Fix partition query after alter add partition
Update PartitionSpec path after compaction

Load data to MV after add partition

rework
ShreelekhyaG committed Mar 18, 2021
1 parent d9f69ae commit 248c84d
Showing 8 changed files with 162 additions and 7 deletions.
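For orientation, the user-facing flow these changes target can be condensed into the sketch below, pieced together from the tests added in this commit (the table name, index name, and the '/tmp/def' location are illustrative placeholders, and the statements are assumed to run inside a QueryTest suite, as the tests do):

// A partitioned table with a secondary index; the new MV test follows the same pattern.
sql("create table partition_table (id int, name string) partitioned by (email string) stored as carbondata")
sql("create index partition_table_si on table partition_table (name) as 'carbondata'")
// Carbon files for the new partition are written to an external directory with the SDK,
// then that directory is registered as a partition.
sql("alter table partition_table add partition (email='def') location '/tmp/def'")
// Queries, including those pushed down to the SI or rewritten to the MV, must see the
// externally added rows ...
sql("select * from partition_table where name = 'red'")
// ... and must keep working after compaction, which merges the data and rewrites the
// partition location from the external path back under the table path.
sql("alter table partition_table compact 'minor'")
sql("select * from partition_table where name = 'red'")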
@@ -76,6 +76,11 @@ public Path getLocation() {
return locationPath;
}

public void setLocation(String location) {
locationPath = new Path(location);
this.location = location;
}

public String getUuid() {
return uuid;
}
@@ -16,10 +16,14 @@
*/
package org.apache.carbondata.spark.testsuite.secondaryindex

import org.apache.spark.sql.Row
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.sdk.file.CarbonWriter
import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI

class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
@@ -380,6 +384,37 @@ class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists partition_table")
}

test("test si with add partition based on location on partition table") {
sql("drop table if exists partition_table")
sql("create table partition_table (id int,name String) " +
"partitioned by(email string) stored as carbondata")
sql("insert into partition_table select 1,'blue','abc'")
sql("CREATE INDEX partitionTable_si on table partition_table (name) as 'carbondata'")
val schemaFile =
CarbonTablePath.getSchemaFilePath(
CarbonEnv.getCarbonTable(None, "partition_table")(sqlContext.sparkSession).getTablePath)
val sdkWritePath = target + "/" + "def"
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
val writer = CarbonWriter.builder()
.outputPath(sdkWritePath)
.writtenBy("test")
.withSchemaFile(schemaFile)
.withCsvInput()
.build()
writer.write(Seq("2", "red", "def").toArray)
writer.write(Seq("3", "black", "def").toArray)
writer.close()
sql(s"alter table partition_table add partition (email='def') location '$sdkWritePath'")
var extSegmentQuery = sql("select * from partition_table where name = 'red'")
checkAnswer(extSegmentQuery, Row(2, "red", "def"))
sql("insert into partition_table select 4,'grey','bcd'")
sql("insert into partition_table select 5,'red','abc'")
sql("alter table partition_table compact 'minor'")
extSegmentQuery = sql("select * from partition_table where name = 'red'")
checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def"), Row(5, "red", "abc")))
assert(extSegmentQuery.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
}

override protected def afterAll(): Unit = {
sql("drop index if exists indextable1 on uniqdata1")
sql("drop table if exists uniqdata1")
@@ -17,7 +17,7 @@

package org.apache.carbondata.spark.rdd

import java.io.IOException
import java.io.{File, IOException}
import java.util
import java.util.{Collections, List}
import java.util.concurrent.atomic.AtomicInteger
@@ -98,6 +98,24 @@ class CarbonMergerRDD[K, V](
broadCastSplits = sparkContext.broadcast(new CarbonInputSplitWrapper(splits))
}

// Checks whether the given partition spec was added with an external location path.
// After compaction, such a location must be updated to point under the table path.
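// A hedged illustration with hypothetical values: if the load metadata entry of an externally
// added segment records the path "/tmp/def" and partitionSpec is (email='def') with location
// "/tmp/def", the two match and the location is rewritten to
// createCarbonStoreLocationForPartition(carbonTable, "email=def"), i.e. "<tablePath>/email=def".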
def checkAndUpdatePartitionLocation(partitionSpec: PartitionSpec) : PartitionSpec = {
if (partitionSpec != null) {
carbonLoadModel.getLoadMetadataDetails.asScala.foreach(loadMetaDetail => {
if (loadMetaDetail.getPath != null &&
loadMetaDetail.getPath.split(",").contains(partitionSpec.getLocation.toString)) {
val updatedPartitionLocation = CarbonDataProcessorUtil
.createCarbonStoreLocationForPartition(
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
partitionSpec.getPartitions.toArray.mkString(File.separator))
partitionSpec.setLocation(updatedPartitionLocation)
}
})
}
partitionSpec
}

override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val queryStartTime = System.currentTimeMillis()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -210,6 +228,7 @@
val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false)

checkAndUpdatePartitionLocation(partitionSpec)
if (carbonTable.getSortScope == SortScopeOptions.SortScope.NO_SORT ||
rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX).size() == 0) {

@@ -17,25 +17,28 @@

package org.apache.carbondata.spark.rdd

import java.io.File
import java.util
import java.util.{Collections, List}
import java.util.concurrent.ExecutorService

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.mapreduce.InputSplit
import org.apache.spark.sql.{CarbonThreadUtil, SparkSession, SQLContext}
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, CarbonMergerMapping, CompactionCallableModel, CompactionModel}
import org.apache.spark.sql.execution.command.management.CommonLoadUtils
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.{CollectionAccumulator, MergeIndexUtil}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -247,6 +250,7 @@ class CarbonTableCompactor(
.sparkContext
.collectionAccumulator[Map[String, SegmentMetaDataInfo]]

val updatePartitionSpecs : List[PartitionSpec] = new util.ArrayList[PartitionSpec]
var mergeRDD: CarbonMergerRDD[String, Boolean] = null
if (carbonTable.isHivePartitionTable) {
// collect related partitions
@@ -263,6 +267,16 @@
if (partitionSpecs != null && partitionSpecs.nonEmpty) {
compactionCallableModel.compactedPartitions = Some(partitionSpecs)
}
partitionSpecs.foreach(partitionSpec => {
carbonLoadModel.getLoadMetadataDetails.asScala.foreach(loadMetaDetail => {
if (loadMetaDetail.getPath != null &&
loadMetaDetail.getPath.split(",").contains(partitionSpec.getLocation.toString)) {
// If the partition spec was added with an external path, its location must be
// rewritten to the table path after compaction, so queue it for the metastore update.
updatePartitionSpecs.add(partitionSpec)
}
})
})
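// For example (hypothetical locations): CarbonAlterTableAddHivePartitionCommand stores the
// external locations of an added partition in the load metadata path, e.g. "/tmp/def,/tmp/ghi".
// A partitionSpec whose location is "/tmp/def" matches one of the comma-separated entries and
// is queued here so its metastore entry can be re-registered under the table path after the merge.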
}

val mergeStatus =
@@ -276,7 +290,25 @@
segmentMetaDataAccumulator)
} else {
if (mergeRDD != null) {
mergeRDD.collect
val result = mergeRDD.collect
val tableIdentifier = new TableIdentifier(carbonTable.getTableName,
Some(carbonTable.getDatabaseName))
// To update the partition spec in the Hive metastore, drop it and re-add it with the latest path.
updatePartitionSpecs.asScala.foreach(deletePartitionSpec => {
AlterTableDropPartitionCommand(
tableIdentifier,
deletePartitionSpec.getPartitions
.asScala.map(PartitioningUtils.parsePathFragment),
true, false, true).run(sqlContext.sparkSession)
})
updatePartitionSpecs.asScala.foreach(partitionSpec => {
val addPartition = mergeRDD.checkAndUpdatePartitionLocation(partitionSpec)
AlterTableAddPartitionCommand(tableIdentifier,
Seq(PartitioningUtils.parsePathFragment(addPartition.getPartitions
.toArray.mkString(File.separator)))
.map(p => (p, None)), false).run(sqlContext.sparkSession)
})
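// Each re-added partition now points at the location computed by checkAndUpdatePartitionLocation,
// i.e. under the table path instead of the original external directory.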
result
} else {
new CarbonMergerRDD(
sc.sparkSession,
@@ -25,6 +25,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AlterTableModel, AtomicRunnableCommand}
import org.apache.spark.sql.execution.command.management.CommonLoadUtils
import org.apache.spark.sql.optimizer.CarbonFilters

import org.apache.carbondata.common.logging.LogServiceFactory
@@ -125,6 +126,19 @@ case class CarbonAlterTableAddHivePartitionCommand(
loadModel.setColumnCompressor(columnCompressor)
loadModel.setCarbonTransactionalTable(true)
loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
// create operationContext to fire load events
val operationContext: OperationContext = new OperationContext
val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(
sparkSession = sparkSession,
carbonLoadModel = loadModel,
uuid = "",
factPath = "",
null,
null,
isOverwriteTable = false,
isDataFrame = false,
updateModel = None,
operationContext = operationContext)
// Create new entry in tablestatus file
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
@@ -133,6 +147,9 @@
loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) +
CarbonTablePath.SEGMENT_EXT
newMetaEntry.setSegmentFile(segmentFileName)
// Set the path so this load entry can be identified as an externally added partition
newMetaEntry.setPath(partitionSpecsAndLocsTobeAdded.asScala
.map(_.getLocation.toString).mkString(","))
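// For example (hypothetical locations): adding partitions at '/tmp/def' and '/tmp/ghi' in one
// command records the path as "/tmp/def,/tmp/ghi"; CarbonTableCompactor and CarbonMergerRDD
// later split this value on ',' to recognise such segments as externally added partitions.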
val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
@@ -169,6 +186,13 @@
customSegmentIds = customSegmentIds)
val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
// fire event to load data to materialized views
CommonLoadUtils.firePostLoadEvents(sparkSession,
loadModel,
tableIndexes,
indexOperationContext,
table,
operationContext)
}
}
Seq.empty[Row]
@@ -26,6 +26,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.sdk.file.CarbonWriter

/**
* Test class for MV to verify partition scenarios
@@ -770,5 +772,33 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
assert(sql("select timeseries(b,'day'),c from partitionone group by timeseries(b,'day'),c").collect().length == 1)
sql("drop table if exists partitionone")
}

test("test mv with add partition based on location on partition table") {
sql("drop table if exists partition_table")
sql("create table partition_table (id int,name String) " +
"partitioned by(email string) stored as carbondata")
sql("drop materialized view if exists partitiontable_mv")
sql("CREATE materialized view partitiontable_mv as select name from partition_table")
sql("insert into partition_table select 1,'blue','abc'")
val schemaFile =
CarbonTablePath.getSchemaFilePath(
CarbonEnv.getCarbonTable(None, "partition_table")(sqlContext.sparkSession).getTablePath)
val sdkWritePath = target + "/" + "def"
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
val writer = CarbonWriter.builder()
.outputPath(sdkWritePath)
.writtenBy("test")
.withSchemaFile(schemaFile)
.withCsvInput()
.build()
writer.write(Seq("2", "red", "def").toArray)
writer.write(Seq("3", "black", "def").toArray)
writer.close()
sql(s"alter table partition_table add partition (email='def') location '$sdkWritePath'")
val extSegmentQuery = sql("select name from partition_table")
assert(TestUtil.verifyMVHit(extSegmentQuery.queryExecution.optimizedPlan, "partitiontable_mv"))
checkAnswer(extSegmentQuery, Seq(Row("blue"), Row("red"), Row("black")))
sql("drop table if exists partition_table")
}
// scalastyle:on lineLength
}
@@ -824,9 +824,9 @@ public static List<Segment> getValidSegments(List<LoadMetadataDetails> loadMetad
// Check if this load is an already merged load.
if (null != segment.getMergedLoadName()) {
segments
.add(Segment.getSegment(segment.getMergedLoadName(), segment.getSegmentFile(), null));
.add(new Segment(segment.getMergedLoadName(), segment.getSegmentFile(), null, segment));
} else {
segments.add(Segment.getSegment(segment.getLoadName(), segment.getSegmentFile(), null));
segments.add(new Segment(segment.getLoadName(), segment.getSegmentFile(), null, segment));
}
}
return segments;
@@ -686,6 +686,16 @@ public static String createCarbonStoreLocation(CarbonTable carbonTable, String segmentId) {
return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
}

/**
* Returns the data directory location under the table path for the given partition spec.
*
* @return data directory path
*/
public static String createCarbonStoreLocationForPartition(CarbonTable carbonTable,
String partition) {
return carbonTable.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + partition;
}
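// Example with hypothetical values: for a table located at '/warehouse/db/partition_table'
// and the partition fragment 'email=def', createCarbonStoreLocationForPartition returns
// '/warehouse/db/partition_table/email=def'.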

/**
* initialise data type for measures for their storage format
*/