[CARBONDATA-4322] Apply local sort task level property for insert
Why is this PR needed?
Currently, when carbon.partition.data.on.tasklevel is enabled with
local sort, the number of tasks launched for a data load is based on
node locality. For the insert command, however, the local sort
task-level property is not applied, so the number of tasks launched
is based on the number of input files.

What changes were proposed in this PR?
Included changes to apply the carbon.partition.data.on.tasklevel
property for the insert command as well. Used DataLoadCoalescedRDD to
coalesce the partitions and a new DataLoadCoalescedUnwrapRDD to unwrap
partitions from DataLoadPartitionWrap and iterate over them.
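
A minimal usage sketch (the property constant and addProperty call are taken from the new test case below; the table names targetTable and sourceTable are placeholders):

// enable task-level partitioning of load data; the default is false
CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL, "TRUE")
// with this property set, an insert into a partitioned table created with
// 'sort_scope'='local_sort' launches load tasks based on node locality
// instead of one task per input file
sql("insert into targetTable select * from sourceTable")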

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #4248
ShreelekhyaG authored and Indhumathi27 committed Feb 14, 2022
1 parent 05aff87 commit 59f23c0dfea74199f97b6d3626c31d8fba4a2e1f
Showing 2 changed files with 124 additions and 5 deletions.
@@ -23,10 +23,12 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, DataLoadPartitionWrap, RDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
@@ -36,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.UpdateTableModel
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparkSQLUtil
@@ -896,6 +899,9 @@ object CommonLoadUtils {
Array[String]()
}
var persistedRDD: Option[RDD[InternalRow]] = None
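// carbon.partition.data.on.tasklevel: when enabled with local sort, the input is
// repartitioned per node so that load tasks are launched based on node locality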
val partitionBasedOnLocality = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT).toBoolean
try {
val query: LogicalPlan = if ((loadParams.dataFrame.isDefined) ||
loadParams.scanResultRDD.isDefined) {
@@ -982,9 +988,26 @@ object CommonLoadUtils {
persistedRDD = persistedRDDLocal
transformedPlan
} else {
val rdd = loadParams.scanResultRDD.get
val newRdd =
if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
}.distinct.length
val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
nodeNumOfData,
loadParams.sparkSession.sqlContext.sparkContext)
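// coalesce the input partitions by node so that the number of load tasks depends on
// node locality rather than on the number of input partitions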
val coalescedRdd = new DataLoadCoalescedRDD[InternalRow](
loadParams.sparkSession,
rdd,
nodes.toArray.distinct)
new DataLoadCoalescedUnwrapRDD(coalescedRdd)
} else {
rdd
}
val (transformedPlan, partitions, persistedRDDLocal) =
CommonLoadUtils.transformQueryWithInternalRow(
loadParams.scanResultRDD.get,
newRdd,
loadParams.sparkSession,
loadParams.carbonLoadModel,
partitionValues,
@@ -999,9 +1022,6 @@ object CommonLoadUtils {
}
} else {
val columnCount = loadParams.carbonLoadModel.getCsvHeaderColumns.length
val partitionBasedOnLocality = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT).toBoolean
val rdd =
if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
CsvRDDHelper.csvFileScanRDDForLocalSort(
@@ -1208,3 +1228,48 @@ object CommonLoadUtils {
segmentMetaDataInfo
}
}

/**
* It unwraps partitions from DataLoadPartitionWrap
*/
class DataLoadCoalescedUnwrapRDD[T: ClassTag](@transient var prev: RDD[T])
extends RDD[InternalRow](prev) {
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
new Iterator[InternalRow] {
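// each coalesced partition wraps one or more parent partitions as DataLoadPartitionWrap;
// unwrap them one by one and iterate over their rows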

final val partitionIterator = firstParent[DataLoadPartitionWrap[InternalRow]].iterator(split,
context)
private var rddIter: Iterator[InternalRow] = null

override def hasNext: Boolean = {
// If rddIter is already initialised, check and return
if (rddIter != null && rddIter.hasNext) {
true
} else {
internalHasNext()
}
}

def internalHasNext(): Boolean = {
if (partitionIterator.hasNext) {
val value = partitionIterator.next()
rddIter = value.rdd.iterator(value.partition, context)
var hasNext = rddIter.hasNext
// If iterator is finished then check for next iterator.
if (!hasNext) {
hasNext = internalHasNext()
}
hasNext
} else {
false
}
}

override def next(): InternalRow = {
rddIter.next()
}
}
}

override protected def getPartitions: Array[Partition] = prev.partitions
}
@@ -140,6 +140,59 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
}
}

test("data insert and compaction for local sort partition table based on task id ") {
sql("drop table if exists partitionthree")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL, "TRUE")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,3")
try {
sql("""
| CREATE TABLE partitionthree (empno int, doj Timestamp,
| workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
| utilization int,salary int)
| PARTITIONED BY (workgroupcategory int, empname String, designation String)
| STORED AS carbondata
| tblproperties('sort_scope'='local_sort', 'sort_columns'='deptname,empname')
""".stripMargin)
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 1)
sql("drop table if exists partitionthree_hive")
sql("CREATE TABLE partitionthree_hive (empno int, doj Timestamp, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) PARTITIONED BY (workgroupcategory int, empname String, designation String)")
sql("set hive.exec.dynamic.partition.mode=nonstrict")
sql("insert into partitionthree_hive select * from partitionthree")
sql("insert into partitionthree_hive select * from partitionthree")
sql("insert into partitionthree_hive select * from partitionthree")

sql("insert into partitionthree select * from partitionthree_hive")
assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 2)
sql("ALTER TABLE partitionthree COMPACT 'MINOR'").collect()
sql("clean files for table partitionthree options('force'='true')")
assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 1)
checkExistence(sql("show segments for table partitionthree"), true, "0.1")
checkAnswer(sql(
"select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
"deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization," +
" salary from partitionthree where workgroupcategory=1 and empname='arvind' and " +
"designation='SE' order by empno"),
sql(
"select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
"deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, " +
"utilization, salary from originTable where workgroupcategory=1 and empname='arvind' " +
"and designation='SE' order by empno"))
} finally {
sql("drop table if exists partitionthree")
sql("drop table if exists partitionthree_hive")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
}
}

private def getPartitionDataFilesCount(tableName: String, partition: String) = {
val table: CarbonTable = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
FileFactory.getCarbonFile(table.getTablePath + partition)
@@ -280,6 +333,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
sql("drop table if exists partitionone")
sql("drop table if exists partitiontwo")
sql("drop table if exists partitionthree")
sql("drop table if exists partitionthree_hive")
sql("drop table if exists partitionmajor")
sql("drop table if exists staticpartition")
sql("drop table if exists staticpartitioncompaction")
