[CARBONDATA-4271] Support DPP for carbon
Why is this PR needed?
This PR enables Dynamic Partition Pruning (DPP) for Carbon tables.

What changes were proposed in this PR?
CarbonDatasourceHadoopRelation now extends HadoopFsRelation, because Spark applies DPP only to relations that match HadoopFsRelation.
Apply the dynamic partition filter, resolve the runtime partitions, and set them on CarbonScanRDD for pruning.
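As an illustration (not part of the patch), a minimal Scala sketch of the kind of query DPP targets once this change is in place. The tables `sales` (partitioned by `country`) and `dim_country` are assumptions made for the example; the only configuration key used is Spark's standard spark.sql.optimizer.dynamicPartitionPruning.enabled.

import org.apache.spark.sql.SparkSession

object CarbonDppSketch {
  def main(args: Array[String]): Unit = {
    // Spark 3.x session; DPP is enabled by default, set here only for clarity.
    val spark = SparkSession.builder()
      .appName("carbon-dpp-sketch")
      .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
      .getOrCreate()

    // Join on the fact table's partition column: the small dimension side lets
    // Spark build a runtime filter on `country` and prune sales partitions.
    val pruned = spark.sql(
      """SELECT s.amount
        |FROM sales s JOIN dim_country d ON s.country = d.country
        |WHERE d.region = 'EU'""".stripMargin)

    // With this patch, the CarbonDataSourceScan in the physical plan should
    // report the runtime expression under PartitionFilters instead of
    // scanning every partition of the Carbon table.
    pruned.explain(true)
  }
}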

This closes #4199
Indhumathi27 authored and kunal642 committed Sep 1, 2021
1 parent ca659b5 commit bdc9484ac8455e8f53e86367c0e5104364799068
Showing 17 changed files with 426 additions and 75 deletions.
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.CarbonInputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => SparkExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.{ColumnarBatchScan, DataSourceScanExec}
@@ -40,6 +41,8 @@ abstract class CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelat
pushedDownProjection: CarbonProjection,
directScanSupport: Boolean,
extraRDD: Option[(RDD[InternalRow], Boolean)],
selectedCatalogPartitions: Seq[CatalogTablePartition],
partitionFilterWithDpp: Seq[SparkExpression],
segmentIds: Option[String])
extends DataSourceScanExec with ColumnarBatchScan {

@@ -44,7 +44,7 @@ class CarbonDeltaRowScanRDD[T: ClassTag](
@transient private val spark: SparkSession,
@transient private val serializedTableInfo: Array[Byte],
@transient private val tableInfo: TableInfo,
@transient override val partitionNames: Seq[PartitionSpec],
@transient private val newPartitionNames: Seq[PartitionSpec],
override val columnProjection: CarbonProjection,
var filter: IndexFilter,
identifier: AbsoluteTableIdentifier,
@@ -62,7 +62,7 @@ class CarbonDeltaRowScanRDD[T: ClassTag](
serializedTableInfo,
tableInfo,
inputMetricsStats,
partitionNames,
newPartitionNames,
dataTypeConverterClz,
readSupportClz) {
override def internalGetPartitions: Array[Partition] = {
@@ -83,7 +83,7 @@ class CarbonScanRDD[T: ClassTag](
@transient private val serializedTableInfo: Array[Byte],
@transient private val tableInfo: TableInfo,
inputMetricsStats: InitInputMetrics,
@transient val partitionNames: Seq[PartitionSpec],
@transient var partitionNames: Seq[PartitionSpec],
val dataTypeConverterClz: Class[_ <: DataTypeConverter] = classOf[SparkDataTypeConverterImpl],
val readSupportClz: Class[_ <: CarbonReadSupport[_]] = SparkReadSupport.readSupportClass,
@transient var splits: java.util.List[InputSplit] = null,
@@ -17,21 +17,28 @@

package org.apache.spark.sql

import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

case class CarbonDatasourceHadoopRelation(
sparkSession: SparkSession,
paths: Array[String],
parameters: Map[String, String],
tableSchema: Option[StructType],
limit: Int = -1)
extends BaseRelation {
class CarbonDatasourceHadoopRelation(
override val sparkSession: SparkSession,
val paths: Array[String],
val parameters: Map[String, String],
val tableSchema: Option[StructType],
partitionSchema: StructType = new StructType())
extends HadoopFsRelation(null,
partitionSchema,
new StructType(),
None,
new SparkCarbonTableFormat,
Map.empty)(
sparkSession) {

val caseInsensitiveMap: Map[String, String] = parameters.map(f => (f._1.toLowerCase, f._2))
lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
@@ -47,15 +54,39 @@ case class CarbonDatasourceHadoopRelation(

@transient lazy val carbonTable: CarbonTable = carbonRelation.carbonTable

var limit: Int = -1

override def sqlContext: SQLContext = sparkSession.sqlContext

override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
override val schema: StructType = tableSchema.getOrElse(carbonRelation.schema)

override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)

override def toString: String = {
"CarbonDatasourceHadoopRelation"
}

override def equals(other: Any): Boolean = {
other match {
case relation: CarbonDatasourceHadoopRelation =>
relation.carbonRelation == carbonRelation && (relation.paths sameElements this.paths) &&
relation.tableSchema == tableSchema && relation.parameters == this.parameters &&
relation.partitionSchema == this.partitionSchema
case _ => false
}
}

override def sizeInBytes: Long = carbonRelation.sizeInBytes

def getTableSchema: Option[StructType] = {
tableSchema
}

def getLimit: Int = {
limit
}

def setLimit(newLimit: Int): Unit = {
this.limit = newLimit
}
}
@@ -39,7 +39,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.CarbonOption
@@ -63,15 +63,15 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
// Otherwise create datasource relation
val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
newParameters.get("tablePath") match {
case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
case Some(path) => new CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
Array(path),
newParameters,
None)
case _ =>
val options = new CarbonOption(newParameters)
val tablePath =
CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession)
CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
new CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
Array(tablePath),
newParameters,
None)
@@ -136,9 +136,21 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
"Table creation failed. Table name cannot contain blank space")
}
val path = getPathForTable(sqlContext.sparkSession, dbName, tableName, newParameters)

CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), newParameters,
Option(dataSchema))
var carbonTable: CarbonTable = null
try {
carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sqlContext.sparkSession)
} catch {
case _: Exception =>
}
val partitionSchema: StructType =
if (null != carbonTable && carbonTable.isHivePartitionTable) {
StructType(carbonTable.getPartitionInfo.getColumnSchemaList.asScala.map(field =>
dataSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
} else {
new StructType()
}
new CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), newParameters,
Option(dataSchema), partitionSchema)
}

/**
@@ -198,8 +198,6 @@ case class CarbonInsertIntoHadoopFsRelationCommand(

// refresh cached files in FileIndex
fileIndex.foreach(_.refresh())
// refresh data cache if table is cached
sparkSession.catalog.refreshByPath(outputPath.toString)

if (catalogTable.nonEmpty) {
CommandUtils.updateTableStats(sparkSession, catalogTable.get)
@@ -81,7 +81,7 @@ case class CarbonCreateTableAsSelectCommand(
tableName = carbonDataSourceHadoopRelation.carbonRelation.tableName,
options = scala.collection.immutable
.Map("fileheader" ->
carbonDataSourceHadoopRelation.tableSchema.get.fields.map(_.name).mkString(",")),
carbonDataSourceHadoopRelation.getTableSchema.get.fields.map(_.name).mkString(",")),
isOverwriteTable = false,
logicalPlan = query,
tableInfo = tableInfo)
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonDataSourceScanHelper}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortOrder, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -41,17 +42,27 @@ import org.apache.carbondata.hadoop.CarbonProjection
case class CarbonDataSourceScan(
@transient relation: CarbonDatasourceHadoopRelation,
output: Seq[Attribute],
partitionFilters: Seq[SparkExpression],
partitionFiltersWithoutDpp: Seq[SparkExpression],
dataFilters: Seq[SparkExpression],
@transient readCommittedScope: ReadCommittedScope,
@transient pushedDownProjection: CarbonProjection,
@transient pushedDownFilters: Seq[Expression],
directScanSupport: Boolean,
@transient extraRDD: Option[(RDD[InternalRow], Boolean)] = None,
tableIdentifier: Option[TableIdentifier] = None,
@transient selectedCatalogPartitions : Seq[CatalogTablePartition] = Seq.empty,
@transient partitionFiltersWithDpp: Seq[SparkExpression],
segmentIds: Option[String] = None)
extends CarbonDataSourceScanHelper(relation, output, partitionFilters, pushedDownFilters,
pushedDownProjection, directScanSupport, extraRDD, segmentIds) {
extends CarbonDataSourceScanHelper(relation,
output,
partitionFiltersWithoutDpp,
pushedDownFilters,
pushedDownProjection,
directScanSupport,
extraRDD,
selectedCatalogPartitions,
partitionFiltersWithDpp,
segmentIds) {

override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val info: BucketingInfo = relation.carbonTable.getBucketingInfo
@@ -89,7 +100,7 @@ case class CarbonDataSourceScan(
"DirectScan" -> (supportsBatchOrColumnar && directScanSupport).toString,
"PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement)))
if (relation.carbonTable.isHivePartitionTable) {
metadata + ("PartitionFilters" -> seqToString(partitionFilters)) +
metadata + ("PartitionFilters" -> seqToString(partitionFiltersWithDpp)) +
("PartitionCount" -> selectedPartitions.size.toString)
} else {
metadata
@@ -129,13 +140,16 @@ case class CarbonDataSourceScan(
CarbonDataSourceScan(
relation,
outputAttibutesAfterNormalizingExpressionIds,
QueryPlan.normalizePredicates(partitionFilters, output),
QueryPlan.normalizePredicates(partitionFiltersWithoutDpp, output),
QueryPlan.normalizePredicates(dataFilters, output),
null,
null,
null,
directScanSupport,
extraRDD,
tableIdentifier)
tableIdentifier,
selectedCatalogPartitions,
QueryPlan.normalizePredicates(partitionFiltersWithDpp, output)
)
}
}
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, CustomDeterministi
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression, Rand}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.{ExecutedCommandExec, RunnableCommand}
@@ -45,7 +45,7 @@ object CarbonPlanHelper {
databaseNameOp = Some(insertInto.table.carbonRelation.databaseName),
tableName = insertInto.table.carbonRelation.tableName,
options = scala.collection.immutable
.Map("fileheader" -> insertInto.table.tableSchema.get.fields.map(_.name).mkString(",")),
.Map("fileheader" -> insertInto.table.getTableSchema.get.fields.map(_.name).mkString(",")),
isOverwriteTable = insertInto.overwrite,
logicalPlan = insertInto.child,
tableInfo = insertInto.table.carbonRelation.carbonTable.getTableInfo,
@@ -202,12 +202,20 @@ object CarbonPlanHelper {
p.transformAllExpressions {
case a@Alias(exp, _)
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
CarbonToSparkAdapter.createAliasRef(
CustomDeterministicExpression(exp),
a.name,
a.exprId,
a.qualifier,
a.explicitMetadata)
if (SparkUtil.isSparkVersionXAndAbove("3")) {
// create custom deterministic expression for Rand function
a.transform {
case rand: Rand =>
CustomDeterministicExpression(rand)
}
} else {
CarbonToSparkAdapter.createAliasRef(
CustomDeterministicExpression(exp),
a.name,
a.exprId,
a.qualifier,
a.explicitMetadata)
}
case exp: NamedExpression
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
makeDeterministicExp(exp)
@@ -220,12 +228,20 @@ object CarbonPlanHelper {
f.transformAllExpressions {
case a@Alias(exp, _)
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
CarbonToSparkAdapter.createAliasRef(
CustomDeterministicExpression(exp),
a.name,
a.exprId,
a.qualifier,
a.explicitMetadata)
if (SparkUtil.isSparkVersionXAndAbove("3")) {
// create custom deterministic expression for Rand function
a.transform {
case rand: Rand =>
CustomDeterministicExpression(rand)
}
} else {
CarbonToSparkAdapter.createAliasRef(
CustomDeterministicExpression(exp),
a.name,
a.exprId,
a.qualifier,
a.explicitMetadata)
}
case exp: NamedExpression
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
makeDeterministicExp(exp)
