[CARBONDATA-4317] Fix TPCDS performance issues
Why is this PR needed?
The following issues have degraded TPCDS query performance:
1. If a dynamic filter is not present in the partitionFilters set, that filter is skipped instead of being pushed down to Spark.
2. In some cases, nodes such as Exchange/Shuffle are not reused, because the CarbonDataSourceScan plans do not match.
3. Accessing metadata on the canonicalized plan throws an NPE.

What changes were proposed in this PR?
1. Check whether a dynamic filter is already present in the partitionFilters set; if not, push the filter down to Spark (see the sketch after this list).
2. Match the plans by canonicalizing them and normalizing their expressions.
3. Move the variables used in metadata() to class-level vals, to avoid an NPE while comparing plans.
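
For illustration, a minimal self-contained sketch of fix 1, distilled from the Spark 3.x adapter change in the diff below: a DynamicPruningSubquery filter is now kept for pushdown unless a semantically equal copy is already applied as a partition filter. The helper name pushableDataFilters and the omission of the unused partitionSet parameter are editorial simplifications.

import org.apache.spark.sql.catalyst.expressions.{DynamicPruningSubquery, Expression}

def pushableDataFilters(
    filters: Seq[Expression],
    partitionFilters: Seq[Expression]): Seq[Expression] = {
  filters.filter {
    // keep a dynamic pruning filter for pushdown to Spark only when no
    // semantically equal filter is already applied as a partition filter
    case dp: DynamicPruningSubquery =>
      !partitionFilters.exists(_.semanticEquals(dp))
    // all non-dynamic filters are kept, as before
    case _ => true
  }
}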

This closes #4241
Indhumathi27 authored and kunal642 committed Dec 22, 2021
1 parent d629dc0 commit 0f1d2a45e5f614fd123bd734ab37d7e453c21344
Showing 5 changed files with 54 additions and 16 deletions.
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.execution.WholeStageCodegenExec

 import org.apache.carbondata.core.metadata.schema.BucketingInfo
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.hadoop.CarbonProjection

@@ -44,7 +43,6 @@ case class CarbonDataSourceScan(
     output: Seq[Attribute],
     partitionFiltersWithoutDpp: Seq[SparkExpression],
     dataFilters: Seq[SparkExpression],
-    @transient readCommittedScope: ReadCommittedScope,
     @transient pushedDownProjection: CarbonProjection,
     @transient pushedDownFilters: Seq[Expression],
     directScanSupport: Boolean,
@@ -64,6 +62,10 @@ case class CarbonDataSourceScan(
     partitionFiltersWithDpp,
     segmentIds) {

+  val pushDownFiltersStr: String = seqToString(pushedDownFilters.map(_.getStatement))
+
+  val projectionColStr: String = seqToString(pushedDownProjection.getAllColumns)
+
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val info: BucketingInfo = relation.carbonTable.getBucketingInfo
     if (info != null) {
@@ -91,15 +93,18 @@ case class CarbonDataSourceScan(
     }
   }

+  def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
+
   override lazy val metadata: Map[String, String] = {
-    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val metadata =
       Map(
-        "ReadSchema" -> seqToString(pushedDownProjection.getAllColumns),
+        "ReadSchema" -> projectionColStr,
         "Batched" -> supportsBatchOrColumnar.toString,
         "DirectScan" -> (supportsBatchOrColumnar && directScanSupport).toString,
-        "PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement)))
-    if (relation.carbonTable.isHivePartitionTable) {
+        "PushedFilters" -> pushDownFiltersStr)
+    // if the plan is canonicalized, the filter expressions will be normalized.
+    // In that case, skip adding the selected partitions to the metadata
+    if (!this.isCanonicalizedPlan && relation.carbonTable.isHivePartitionTable) {
       metadata + ("PartitionFilters" -> seqToString(partitionFiltersWithDpp)) +
         ("PartitionCount" -> selectedPartitions.size.toString)
     } else {
@@ -142,14 +147,40 @@ case class CarbonDataSourceScan(
       outputAttibutesAfterNormalizingExpressionIds,
       QueryPlan.normalizePredicates(partitionFiltersWithoutDpp, output),
       QueryPlan.normalizePredicates(dataFilters, output),
-      null,
-      null,
+      pushedDownProjection,
       Seq.empty,
       directScanSupport,
       extraRDD,
       tableIdentifier,
       selectedCatalogPartitions,
+      Seq.empty,
       QueryPlan.normalizePredicates(partitionFiltersWithDpp, output)
     )
   }

+  override def equals(other: Any): Boolean = {
+    other match {
+      case scan: CarbonDataSourceScan =>
+        if (scan.relation == relation) {
+          var currentPlan = this
+          var otherPlan = scan
+          // In some cases, the plans being compared are not canonicalized. Comparing
+          // pushedDownFilters then fails to match, since the objects are different.
+          // Canonicalize the plans before comparison, so that the exchange can be
+          // reused for better performance
+          if (pushedDownFilters.nonEmpty && scan.pushedDownFilters.nonEmpty) {
+            otherPlan = scan.canonicalized.asInstanceOf[CarbonDataSourceScan]
+            currentPlan = this.canonicalized.asInstanceOf[CarbonDataSourceScan]
+          }
+          // compare metadata, partition filter and data filter expressions
+          currentPlan.metadata == otherPlan.metadata &&
+          currentPlan.partitionFiltersWithDpp.toList.asJava
+            .containsAll(otherPlan.partitionFiltersWithDpp.toList.asJava) &&
+          (currentPlan.dataFilters == otherPlan.dataFilters ||
+            QueryPlan.normalizePredicates(currentPlan.dataFilters, currentPlan.output)
+              == QueryPlan.normalizePredicates(otherPlan.dataFilters, otherPlan.output))
+        } else {
+          false
+        }
+      case _ => false
+    }
+  }
 }
@@ -158,9 +158,10 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
         SparkSession.getActiveSession.get,
         relation.catalogTable.get.identifier
       )
-      // remove dynamic partition filter from predicates
-      filterPredicates = CarbonToSparkAdapter.getDataFilter(partitionSet, allPredicates)
     }
+    // remove dynamic partition filters from the predicates
+    filterPredicates = CarbonToSparkAdapter.getDataFilter(partitionSet,
+      allPredicates, partitionsFilter)
     val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     val projects = rawProjects.map { p =>
       p.transform {
@@ -232,7 +233,6 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
           output,
           partitionsFilter.filterNot(SubqueryExpression.hasSubquery),
           handledPredicates,
-          readCommittedScope,
           getCarbonProjection(relationPredicates, requiredColumns, projects),
           pushedFilters,
           directScanSupport,
@@ -173,7 +173,9 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
     }
   }

-  def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): Seq[Expression] = {
+  def getDataFilter(partitionSet: AttributeSet,
+      filter: Seq[Expression],
+      partitionFilter: Seq[Expression]): Seq[Expression] = {
     filter
   }

@@ -207,7 +207,9 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
     }
   }

-  def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): Seq[Expression] = {
+  def getDataFilter(partitionSet: AttributeSet,
+      filter: Seq[Expression],
+      partitionFilter: Seq[Expression]): Seq[Expression] = {
     filter
   }

@@ -180,9 +180,12 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
     }
   }

-  def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): Seq[Expression] = {
+  def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression],
+      partitionFilter: Seq[Expression]): Seq[Expression] = {
     filter.filter {
-      case _: DynamicPruningSubquery => false
+      case dp: DynamicPruningSubquery =>
+        // if the filter does not exist in the partition filters, push it down to Spark
+        !partitionFilter.exists(_.semanticEquals(dp))
       case _ => true
     }
   }
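
As a footnote on fix 3, a stripped-down, hypothetical model (MiniScan is not part of the codebase) of why moving the rendered strings to eager vals helps: the old canonicalized copy received null for the pushed-down projection, so rendering metadata dereferenced null; the fix passes the real projection (and Seq.empty for the filters) and precomputes the rendered strings at construction time.

final case class MiniScan(projection: Seq[String], filters: Seq[String]) {
  private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
  // eager vals: computed once at construction, never against a null field later
  val projectionColStr: String = seqToString(projection)
  val pushDownFiltersStr: String = seqToString(filters)
  def metadata: Map[String, String] =
    Map("ReadSchema" -> projectionColStr, "PushedFilters" -> pushDownFiltersStr)
  // the canonicalized copy keeps the projection instead of null-ing it out
  def canonicalizedCopy: MiniScan = copy(filters = Seq.empty)
}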
