From 826e229f98de4f45bfb603c0451aa47105f69d01 Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Fri, 24 Nov 2023 23:14:32 +0800 Subject: [PATCH 1/5] Datasource V2 data lake read support --- .github/workflows/velox_be.yml | 22 +-- .../backendsapi/clickhouse/CHBackend.scala | 2 + .../execution/FilterExecTransformer.scala | 6 +- docs/get-started/Velox.md | 17 ++ .../backendsapi/BackendSettingsApi.scala | 2 + .../execution/BaseDataSource.scala | 37 ++++ .../BasicPhysicalOperatorTransformer.scala | 66 +++---- .../execution/BasicScanExecTransformer.scala | 46 ++--- .../execution/BatchScanExecTransformer.scala | 6 +- .../execution/DatasourceScanTransformer.scala | 29 +++ .../FileSourceScanExecTransformer.scala | 14 +- .../execution/ScanTransformerFactory.scala | 117 ++++++++++++ .../execution/WholeStageTransformer.scala | 2 +- .../expression/ExpressionConverter.scala | 54 ++++-- .../extension/ColumnarOverrides.scala | 31 +--- .../columnar/TransformHintRule.scala | 27 ++- .../ColumnarCollapseTransformStages.scala | 4 +- .../hive/HiveTableScanExecTransformer.scala | 8 +- gluten-delta/pom.xml | 6 + gluten-iceberg/pom.xml | 168 ++++++++++++++++++ .../rel/IcebergLocalFilesBuilder.java | 37 ++++ .../substrait/rel/IcebergLocalFilesNode.java | 63 +++++++ .../execution/IcebergScanTransformer.scala | 71 ++++++++ .../source/GlutenIcebergSourceUtil.scala | 100 +++++++++++ .../execution/VeloxTPCHIcebergSuite.scala | 56 ++++++ pom.xml | 10 ++ .../execution/FileSourceScanExecShim.scala | 14 -- .../execution/FileSourceScanExecShim.scala | 14 -- .../execution/FileSourceScanExecShim.scala | 14 -- 29 files changed, 835 insertions(+), 208 deletions(-) create mode 100644 gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala create mode 100644 gluten-core/src/main/scala/io/glutenproject/execution/DatasourceScanTransformer.scala create mode 100644 gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala create mode 100644 gluten-iceberg/pom.xml create mode 100644 gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java create mode 100644 gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java create mode 100644 gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala create mode 100644 gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala create mode 100644 gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala diff --git a/.github/workflows/velox_be.yml b/.github/workflows/velox_be.yml index 959b79d900b0..29377aa5ff2f 100644 --- a/.github/workflows/velox_be.yml +++ b/.github/workflows/velox_be.yml @@ -112,7 +112,7 @@ jobs: run: | docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' + mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 run: | docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -148,7 +148,7 @@ jobs: - name: Build and Run unit test for Spark 3.3.1(slow tests) run: | docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \ - mvn clean install -Pspark-3.3 
-Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' + mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.3 run: | docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \ @@ -184,7 +184,7 @@ jobs: - name: Build and Run unit test for Spark 3.3.1(other tests) run: | docker exec ubuntu2004-test-spark33-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \ - mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \ + mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \ mvn test -Pspark-3.3 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest' - name: Exit docker container if: ${{ always() }} @@ -214,7 +214,7 @@ jobs: - name: Build and Run unit test for Spark 3.4.1(slow tests) run: | docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \ - mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' + mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.4 run: | docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \ @@ -250,7 +250,7 @@ jobs: - name: Build and Run unit test for Spark 3.4.1(other tests) run: | docker exec ubuntu2004-test-spark34-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \ - mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \ + mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \ mvn test -Pspark-3.4 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest' - name: Exit docker container if: ${{ always() }} @@ -280,7 +280,7 @@ jobs: run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests' + mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -307,7 +307,7 @@ jobs: run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -DskipTests' + mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.3 run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID 
bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -320,7 +320,7 @@ jobs: run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -DskipTests' + mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.4 run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -360,7 +360,7 @@ jobs: run: | docker exec centos8-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests' + mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2 run: | docker exec centos8-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -411,7 +411,7 @@ jobs: run: | docker exec centos7-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests' + mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2 run: | docker exec centos7-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -493,7 +493,7 @@ jobs: run: | docker exec static-build-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests && \ + mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests && \ cd /opt/gluten/tools/gluten-it && \ mvn clean install -Pspark-3.2' - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 (centos 8) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala index ad65c65e64e0..1f7dbb9cd905 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala @@ -217,4 +217,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging { override def needOutputSchemaForPlan(): Boolean = true override def allowDecimalArithmetic: Boolean = !SQLConf.get.decimalOperationsAllowPrecisionLoss + + override def requiredInputFilePaths(): Boolean = true } diff --git a/backends-velox/src/main/scala/io/glutenproject/execution/FilterExecTransformer.scala b/backends-velox/src/main/scala/io/glutenproject/execution/FilterExecTransformer.scala index 1c9dc1f9041f..2d23943fd55d 100644 --- a/backends-velox/src/main/scala/io/glutenproject/execution/FilterExecTransformer.scala +++ b/backends-velox/src/main/scala/io/glutenproject/execution/FilterExecTransformer.scala @@ -66,10 +66,8 @@ case class FilterExecTransformer(condition: Expression, child: SparkPlan) private def getLeftCondition: Expression = { val scanFilters = child match { // Get the filters including the manually pushed down ones. - case batchScanTransformer: BatchScanExecTransformer => - batchScanTransformer.filterExprs() - case fileScanTransformer: FileSourceScanExecTransformer => - fileScanTransformer.filterExprs() + case basicScanExecTransformer: BasicScanExecTransformer => + basicScanExecTransformer.filterExprs() // For fallback scan, we need to keep original filter. 
case _ => Seq.empty[Expression] diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index acfec93fad37..16e8132c2643 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -260,6 +260,23 @@ After the two steps, you can query delta table by gluten/velox without scan's fa Gluten with velox backends also support the column mapping of delta tables. About column mapping, see more [here](https://docs.delta.io/latest/delta-column-mapping.html). +## Iceberg Support + +Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported. + +### How to use + +First of all, compile gluten-iceberg module by a `iceberg` profile, as follows: + +``` +mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests +``` + +Then, put the additional gluten-iceberg jar to the class path (usually it's `$SPARK_HOME/jars`). +The gluten-iceberg jar is in `gluten-iceberg/target` directory. + +After the two steps, you can query iceberg table by gluten/velox without scan's fallback. + # Coverage Spark3.3 has 387 functions in total. ~240 are commonly used. Velox's functions have two category, Presto and Spark. Presto has 124 functions implemented. Spark has 62 functions. Spark functions are verified to have the same result as Vanilla Spark. Some Presto functions have the same result as Vanilla Spark but some others have different. Gluten prefer to use Spark functions firstly. If it's not in Spark's list but implemented in Presto, we currently offload to Presto one until we noted some result mismatch, then we need to reimplement the function in Spark category. Gluten currently offloads 94 functions and 14 operators, more details refer to [Velox Backend's Supported Operators & Functions](../velox-backend-support-progress.md). diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala index fcd1bbfe8453..a6443060a4ed 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala @@ -109,4 +109,6 @@ trait BackendSettingsApi { def requiredChildOrderingForWindow(): Boolean = false def staticPartitionWriteOnly(): Boolean = false + + def requiredInputFilePaths(): Boolean = false } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala new file mode 100644 index 000000000000..10dc23dd0ff1 --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import io.glutenproject.substrait.SupportFormat + +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.types.StructType + +trait BaseDataSource extends SupportFormat { + + /** Returns the actual schema of this data source scan. */ + def getDataSchema: StructType + + /** Returns the required partition schema, used to generate partition column. */ + def getPartitionSchema: StructType + + /** Returns the partitions generated by this data source scan. */ + def getPartitions: Seq[InputPartition] + + /** Returns the input file paths, used to validate the partition column path */ + def getInputFilePathsInternal: Seq[String] +} diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala index a5cef1a21921..3524c020fed3 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala @@ -21,7 +21,6 @@ import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, Express import io.glutenproject.extension.{GlutenPlan, ValidationResult} import io.glutenproject.extension.columnar.TransformHints import io.glutenproject.metrics.MetricsUpdater -import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.substrait.`type`.TypeBuilder import io.glutenproject.substrait.SubstraitContext import io.glutenproject.substrait.extensions.ExtensionBuilder @@ -416,52 +415,29 @@ object FilterHandler { // Separate and compare the filter conditions in Scan and Filter. // Push down the left conditions in Filter into Scan. 
- def applyFilterPushdownToScan(plan: FilterExec, reuseSubquery: Boolean): SparkPlan = - plan.child match { + def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): SparkPlan = + filter.child match { case fileSourceScan: FileSourceScanExec => - val leftFilters = - getLeftFilters(fileSourceScan.dataFilters, flattenCondition(plan.condition)) - // transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters - val newPartitionFilters = - ExpressionConverter.transformDynamicPruningExpr( - fileSourceScan.partitionFilters, - reuseSubquery) - new FileSourceScanExecTransformer( - fileSourceScan.relation, - fileSourceScan.output, - fileSourceScan.requiredSchema, - newPartitionFilters, - fileSourceScan.optionalBucketSet, - fileSourceScan.optionalNumCoalescedBuckets, - fileSourceScan.dataFilters ++ leftFilters, - fileSourceScan.tableIdentifier, - fileSourceScan.disableBucketedScan - ) + ScanTransformerFactory.createFileSourceScanTransformer( + fileSourceScan, + reuseSubquery, + filter) case batchScan: BatchScanExec => - batchScan.scan match { - case scan: FileScan => - val leftFilters = - getLeftFilters(scan.dataFilters, flattenCondition(plan.condition)) - val newPartitionFilters = - ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters, reuseSubquery) - new BatchScanExecTransformer( - batchScan.output, - scan, - leftFilters ++ newPartitionFilters, - table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan)) - case _ => - if (batchScan.runtimeFilters.isEmpty) { - throw new UnsupportedOperationException( - s"${batchScan.scan.getClass.toString} is not supported.") - } else { - // IF filter expressions aren't empty, we need to transform the inner operators. - val newSource = batchScan.copy(runtimeFilters = ExpressionConverter - .transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery)) - TransformHints.tagNotTransformable( - newSource, - "The scan in BatchScanExec is not a FileScan") - newSource - } + if (ScanTransformerFactory.supportedBatchScan(batchScan.scan)) { + ScanTransformerFactory.createBatchScanTransformer(batchScan, reuseSubquery) + } else { + if (batchScan.runtimeFilters.isEmpty) { + throw new UnsupportedOperationException( + s"${batchScan.scan.getClass.toString} is not supported.") + } else { + // IF filter expressions aren't empty, we need to transform the inner operators. 
+ val newSource = batchScan.copy(runtimeFilters = ExpressionConverter + .transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery)) + TransformHints.tagNotTransformable( + newSource, + "The scan in BatchScanExec is not a FileScan") + newSource + } } case other => throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.") diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala index 7bb32df6f7cf..7e2d23d09506 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala @@ -20,43 +20,41 @@ import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.expression.{ConverterUtils, ExpressionConverter} import io.glutenproject.extension.ValidationResult import io.glutenproject.substrait.`type`.ColumnTypeNode -import io.glutenproject.substrait.{SubstraitContext, SupportFormat} +import io.glutenproject.substrait.SubstraitContext import io.glutenproject.substrait.plan.PlanBuilder import io.glutenproject.substrait.rel.{ReadRelNode, RelBuilder, SplitInfo} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.execution.InSubqueryExec -import org.apache.spark.sql.types._ +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression} import org.apache.spark.sql.vectorized.ColumnarBatch import com.google.common.collect.Lists import scala.collection.JavaConverters._ -trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { - - // The key of merge schema option in Parquet reader. - protected val mergeSchemaOptionKey = "mergeschema" +trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource { + /** Returns the filters that can be pushed down to native file scan */ def filterExprs(): Seq[Expression] def outputAttributes(): Seq[Attribute] - def getPartitions: Seq[InputPartition] - - def getPartitionSchemas: StructType - - def getDataSchemas: StructType - // TODO: Remove this expensive call when CH support scan custom partition location. - def getInputFilePaths: Seq[String] + def getInputFilePaths: Seq[String] = { + // This is a heavy operation, and only the required backend executes the corresponding logic. + if (BackendsApiManager.getSettings.requiredInputFilePaths()) { + getInputFilePathsInternal + } else { + Seq.empty + } + } - def getSplitInfos: Seq[SplitInfo] = + /** Returns the split infos that will be processed by the underlying native engine. 
*/ + def getSplitInfos: Seq[SplitInfo] = { getPartitions.map( BackendsApiManager.getIteratorApiInstance - .genSplitInfo(_, getPartitionSchemas, fileFormat)) + .genSplitInfo(_, getPartitionSchema, fileFormat)) + } def doExecuteColumnarInternal(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("outputRows") @@ -85,13 +83,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { .supportFileFormatRead( fileFormat, schema.fields, - getPartitionSchemas.nonEmpty, + getPartitionSchema.nonEmpty, getInputFilePaths) ) { return ValidationResult.notOk( s"Not supported file format or complex type for scan: $fileFormat") } - val substraitContext = new SubstraitContext val relNode = doTransform(substraitContext).root @@ -102,10 +99,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { val output = outputAttributes() val typeNodes = ConverterUtils.collectAttributeTypeNodes(output) val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output) - val partitionSchemas = getPartitionSchemas val columnTypeNodes = output.map { attr => - if (partitionSchemas.exists(_.name.equals(attr.name))) { + if (getPartitionSchema.exists(_.name.equals(attr.name))) { new ColumnTypeNode(1) } else { new ColumnTypeNode(0) @@ -125,11 +121,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { exprNode, context, context.nextOperatorId(this.nodeName)) - relNode.asInstanceOf[ReadRelNode].setDataSchema(getDataSchemas) + relNode.asInstanceOf[ReadRelNode].setDataSchema(getDataSchema) TransformContext(output, output, relNode) } - - def executeInSubqueryForDynamicPruningExpression(inSubquery: InSubqueryExec): Unit = { - if (inSubquery.values().isEmpty) inSubquery.updateResult() - } } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala index 1d4551afa4b2..051ba9779084 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala @@ -68,17 +68,17 @@ class BatchScanExecTransformer( override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions - override def getPartitionSchemas: StructType = scan match { + override def getPartitionSchema: StructType = scan match { case fileScan: FileScan => fileScan.readPartitionSchema case _ => new StructType() } - override def getDataSchemas: StructType = scan match { + override def getDataSchema: StructType = scan match { case fileScan: FileScan => fileScan.readDataSchema case _ => new StructType() } - override def getInputFilePaths: Seq[String] = { + override def getInputFilePathsInternal: Seq[String] = { scan match { case fileScan: FileScan => fileScan.fileIndex.inputFiles.toSeq case _ => Seq.empty diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/DatasourceScanTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/DatasourceScanTransformer.scala new file mode 100644 index 000000000000..e4c73436afb7 --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/execution/DatasourceScanTransformer.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.sources.BaseRelation + +trait DatasourceScanTransformer extends BasicScanExecTransformer { + + /** The file-based relation to scan. */ + val relation: BaseRelation + + /** Identifier for the table in the metastore. */ + val tableIdentifier: Option[TableIdentifier] +} diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala index 822f98c47986..80e5e7e6478b 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala @@ -38,14 +38,14 @@ import org.apache.spark.util.collection.BitSet import scala.collection.JavaConverters class FileSourceScanExecTransformer( - @transient relation: HadoopFsRelation, + @transient override val relation: HadoopFsRelation, output: Seq[Attribute], requiredSchema: StructType, partitionFilters: Seq[Expression], optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], dataFilters: Seq[Expression], - tableIdentifier: Option[TableIdentifier], + override val tableIdentifier: Option[TableIdentifier], disableBucketedScan: Boolean = false) extends FileSourceScanExecShim( relation, @@ -57,7 +57,7 @@ class FileSourceScanExecTransformer( dataFilters, tableIdentifier, disableBucketedScan) - with BasicScanExecTransformer { + with DatasourceScanTransformer { // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. 
@transient override lazy val metrics: Map[String, SQLMetric] = @@ -89,11 +89,11 @@ class FileSourceScanExecTransformer( optionalNumCoalescedBuckets, disableBucketedScan) - override def getPartitionSchemas: StructType = relation.partitionSchema + override def getPartitionSchema: StructType = relation.partitionSchema - override def getDataSchemas: StructType = relation.dataSchema + override def getDataSchema: StructType = relation.dataSchema - override def getInputFilePaths: Seq[String] = { + override def getInputFilePathsInternal: Seq[String] = { relation.location.inputFiles.toSeq } @@ -150,7 +150,7 @@ class FileSourceScanExecTransformer( } val readRelNode = transformCtx.root.asInstanceOf[ReadRelNode] - readRelNode.setDataSchema(getDataSchemas) + readRelNode.setDataSchema(getDataSchema) readRelNode.setProperties(JavaConverters.mapAsJavaMap(options)) } transformCtx diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala new file mode 100644 index 000000000000..9b21c8e7e096 --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.execution + +import io.glutenproject.execution.FilterHandler.{flattenCondition, getLeftFilters} +import io.glutenproject.expression.ExpressionConverter +import io.glutenproject.sql.shims.SparkShimLoader + +import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec} +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} + +import scala.reflect.runtime.{universe => ru} + +object ScanTransformerFactory { + + private val IcebergScanClassName = "org.apache.iceberg.spark.source.SparkBatchQueryScan" + private val IcebergTransformerClassName = "io.glutenproject.execution.IcebergScanTransformer" + + def createFileSourceScanTransformer( + scanExec: FileSourceScanExec, + reuseSubquery: Boolean, + validation: Boolean = false): FileSourceScanExecTransformer = { + // TODO: Add delta match here + val newPartitionFilters = if (validation) { + scanExec.partitionFilters + } else { + ExpressionConverter.transformDynamicPruningExpr(scanExec.partitionFilters, reuseSubquery) + } + new FileSourceScanExecTransformer( + scanExec.relation, + scanExec.output, + scanExec.requiredSchema, + newPartitionFilters, + scanExec.optionalBucketSet, + scanExec.optionalNumCoalescedBuckets, + scanExec.dataFilters, + scanExec.tableIdentifier, + scanExec.disableBucketedScan + ) + } + + def createFileSourceScanTransformer( + scanExec: FileSourceScanExec, + reuseSubquery: Boolean, + filter: FilterExec): FileSourceScanExecTransformer = { + val leftFilters = + getLeftFilters(scanExec.dataFilters, flattenCondition(filter.condition)) + // transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters + val newPartitionFilters = + ExpressionConverter.transformDynamicPruningExpr(scanExec.partitionFilters, reuseSubquery) + new FileSourceScanExecTransformer( + scanExec.relation, + scanExec.output, + scanExec.requiredSchema, + newPartitionFilters, + scanExec.optionalBucketSet, + scanExec.optionalNumCoalescedBuckets, + scanExec.dataFilters ++ leftFilters, + scanExec.tableIdentifier, + scanExec.disableBucketedScan + ) + } + + def createBatchScanTransformer( + batchScanExec: BatchScanExec, + reuseSubquery: Boolean, + validation: Boolean = false): BatchScanExecTransformer = { + val newPartitionFilters = if (validation) { + batchScanExec.runtimeFilters + } else { + ExpressionConverter.transformDynamicPruningExpr(batchScanExec.runtimeFilters, reuseSubquery) + } + val scan = batchScanExec.scan + scan match { + case _ if scan.getClass.getName == IcebergScanClassName => + createBatchScanTransformer(IcebergTransformerClassName, batchScanExec, newPartitionFilters) + case _ => + new BatchScanExecTransformer( + batchScanExec.output, + batchScanExec.scan, + newPartitionFilters, + table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)) + } + } + + def supportedBatchScan(scan: Scan): Boolean = scan match { + case _: FileScan => true + case _ if scan.getClass.getName == IcebergScanClassName => true + case _ => false + } + + private def createBatchScanTransformer( + className: String, + params: Any*): BatchScanExecTransformer = { + val classMirror = ru.runtimeMirror(getClass.getClassLoader) + val classModule = classMirror.staticModule(className) + val mirror = classMirror.reflectModule(classModule) + val apply = mirror.symbol.typeSignature.member(ru.TermName("apply")).asMethod + val objMirror = classMirror.reflect(mirror.instance) + objMirror.reflectMethod(apply)(params: 
_*).asInstanceOf[BatchScanExecTransformer] + } +} diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala index 9dd6b2e0ed1e..a73527d2f313 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala @@ -226,7 +226,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f context } - /** Find all BasicScanExecTransformers in one WholeStageTransformer */ + /** Find all BasicScanExecTransformer in one WholeStageTransformer */ private def findAllScanTransformers(): Seq[BasicScanExecTransformer] = { val basicScanExecTransformers = new mutable.ListBuffer[BasicScanExecTransformer]() diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala index 7fbc34e973bc..4a9e48c6aea1 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, _} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero -import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.{ScalarSubquery, _} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec import org.apache.spark.sql.hive.HiveSimpleUDFTransformer @@ -455,7 +455,9 @@ object ExpressionConverter extends SQLConfHelper with Logging { * Transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in DynamicPruningExpression. * * @param partitionFilters + * The partition filter of Scan * @return + * Transformed partition filter */ def transformDynamicPruningExpr( partitionFilters: Seq[Expression], @@ -468,15 +470,13 @@ object ExpressionConverter extends SQLConfHelper with Logging { case c2r: ColumnarToRowExecBase => c2r.child // in fallback case case plan: UnaryExecNode if !plan.isInstanceOf[GlutenPlan] => - if (plan.child.isInstanceOf[ColumnarToRowExec]) { - val wholeStageTransformer = exchange.find(_.isInstanceOf[WholeStageTransformer]) - if (wholeStageTransformer.nonEmpty) { - wholeStageTransformer.get - } else { + plan.child match { + case _: ColumnarToRowExec => + val wholeStageTransformer = exchange.find(_.isInstanceOf[WholeStageTransformer]) + wholeStageTransformer.getOrElse( + BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan)) + case _ => BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan) - } - } else { - BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan) } } ColumnarBroadcastExchangeExec(exchange.mode, newChild) @@ -486,7 +486,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { // Disable ColumnarSubqueryBroadcast for scan-only execution. partitionFilters } else { - partitionFilters.map { + val newPartitionFilters = partitionFilters.map { case dynamicPruning: DynamicPruningExpression => dynamicPruning.transform { // Lookup inside subqueries for duplicate exchanges. 
@@ -516,13 +516,12 @@ object ExpressionConverter extends SQLConfHelper with Logging { // On the other hand, it needs to use // the AdaptiveSparkPlanExec.AdaptiveExecutionContext to hold the reused map // for each query. - if (newIn.child.isInstanceOf[AdaptiveSparkPlanExec] && reuseSubquery) { - // When AQE is on and reuseSubquery is on. - newIn.child - .asInstanceOf[AdaptiveSparkPlanExec] - .context - .subqueryCache - .update(newIn.canonicalized, transformSubqueryBroadcast) + newIn.child match { + case a: AdaptiveSparkPlanExec if reuseSubquery => + // When AQE is on and reuseSubquery is on. + a.context.subqueryCache + .update(newIn.canonicalized, transformSubqueryBroadcast) + case _ => } in.copy(plan = transformSubqueryBroadcast.asInstanceOf[BaseSubqueryExec]) case r: ReusedSubqueryExec if r.child.isInstanceOf[SubqueryBroadcastExec] => @@ -545,7 +544,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { logWarning(errMsg) throw new UnsupportedOperationException(errMsg) } - case other => + case _ => val errMsg = "Can not get the reused ColumnarSubqueryBroadcastExec" + "by the ${newIn.canonicalized}" logWarning(errMsg) @@ -556,6 +555,25 @@ object ExpressionConverter extends SQLConfHelper with Logging { } case e: Expression => e } + updateSubqueryResult(newPartitionFilters) + newPartitionFilters + } + } + + private def updateSubqueryResult(partitionFilters: Seq[Expression]): Unit = { + // When it includes some DynamicPruningExpression, + // it needs to execute InSubqueryExec first, + // because doTransform path can't execute 'doExecuteColumnar' which will + // execute prepare subquery first. + partitionFilters.foreach { + case DynamicPruningExpression(inSubquery: InSubqueryExec) => + if (inSubquery.values().isEmpty) inSubquery.updateResult() + case e: Expression => + e.foreach { + case s: ScalarSubquery => s.updateResult() + case _ => + } + case _ => } } } diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index f6c2b2ac90a9..8c9248695bb8 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -22,7 +22,6 @@ import io.glutenproject.execution._ import io.glutenproject.expression.ExpressionConverter import io.glutenproject.extension.columnar._ import io.glutenproject.metrics.GlutenTimeMetric -import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.utils.{LogLevelUtil, PhysicalPlanSelector} import org.apache.spark.api.python.EvalPythonExecTransformer @@ -533,41 +532,19 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean) */ def applyScanTransformer(plan: SparkPlan): SparkPlan = plan match { case plan: FileSourceScanExec => - val newPartitionFilters = - ExpressionConverter.transformDynamicPruningExpr(plan.partitionFilters, reuseSubquery) - val transformer = new FileSourceScanExecTransformer( - plan.relation, - plan.output, - plan.requiredSchema, - newPartitionFilters, - plan.optionalBucketSet, - plan.optionalNumCoalescedBuckets, - plan.dataFilters, - plan.tableIdentifier, - plan.disableBucketedScan - ) + val transformer = ScanTransformerFactory.createFileSourceScanTransformer(plan, reuseSubquery) val validationResult = transformer.doValidate() if (validationResult.isValid) { logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") transformer } else { logDebug(s"Columnar Processing for 
${plan.getClass} is currently unsupported.") - val newSource = plan.copy(partitionFilters = newPartitionFilters) + val newSource = plan.copy(partitionFilters = transformer.partitionFilters) TransformHints.tagNotTransformable(newSource, validationResult.reason.get) newSource } case plan: BatchScanExec => - val newPartitionFilters: Seq[Expression] = plan.scan match { - case scan: FileScan => - ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters, reuseSubquery) - case _ => - ExpressionConverter.transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery) - } - val transformer = new BatchScanExecTransformer( - plan.output, - plan.scan, - newPartitionFilters, - table = SparkShimLoader.getSparkShims.getBatchScanExecTable(plan)) + val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery) val validationResult = transformer.doValidate() if (validationResult.isValid) { @@ -575,7 +552,7 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean) transformer } else { logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - val newSource = plan.copy(runtimeFilters = newPartitionFilters) + val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters) TransformHints.tagNotTransformable(newSource, validationResult.reason.get) newSource } diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala index 1431af2dda34..13fa420ef70e 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala @@ -20,7 +20,6 @@ import io.glutenproject.GlutenConfig import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.execution._ import io.glutenproject.extension.{GlutenPlan, ValidationResult} -import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.utils.PhysicalPlanSelector import org.apache.spark.api.python.EvalPythonExecTransformer @@ -336,11 +335,11 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { if (plan.runtimeFilters.nonEmpty) { TransformHints.tagTransformable(plan) } else { - val transformer = new BatchScanExecTransformer( - plan.output, - plan.scan, - plan.runtimeFilters, - table = SparkShimLoader.getSparkShims.getBatchScanExecTable(plan)) + val transformer = + ScanTransformerFactory.createBatchScanTransformer( + plan, + reuseSubquery = false, + validation = true) TransformHints.tag(plan, transformer.doValidate().toTransformHint) } } @@ -354,17 +353,11 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { if (plan.partitionFilters.nonEmpty) { TransformHints.tagTransformable(plan) } else { - val transformer = new FileSourceScanExecTransformer( - plan.relation, - plan.output, - plan.requiredSchema, - plan.partitionFilters, - plan.optionalBucketSet, - plan.optionalNumCoalescedBuckets, - plan.dataFilters, - plan.tableIdentifier, - plan.disableBucketedScan - ) + val transformer = + ScanTransformerFactory.createFileSourceScanTransformer( + plan, + reuseSubquery = false, + validation = true) TransformHints.tag(plan, transformer.doValidate().toTransformHint) } } diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala index a5be60241fcf..9878043e0a34 100644 --- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala @@ -118,13 +118,13 @@ case class ColumnarCollapseTransformStages( * When it's the ClickHouse backend, BasicScanExecTransformer will not be included in * WholeStageTransformer. */ - private def isSeparateBasicScanExecTransformer(plan: SparkPlan): Boolean = plan match { + private def isSeparateBaseScanExecTransformer(plan: SparkPlan): Boolean = plan match { case _: BasicScanExecTransformer if separateScanRDD => true case _ => false } private def supportTransform(plan: SparkPlan): Boolean = plan match { - case plan: TransformSupport if !isSeparateBasicScanExecTransformer(plan) => true + case plan: TransformSupport if !isSeparateBaseScanExecTransformer(plan) => true case _ => false } diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index bf8af495af97..9f5e07e2940d 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -70,11 +70,11 @@ class HiveTableScanExecTransformer( override def getPartitions: Seq[InputPartition] = partitions - override def getPartitionSchemas: StructType = relation.tableMeta.partitionSchema + override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema - override def getDataSchemas: StructType = relation.tableMeta.dataSchema + override def getDataSchema: StructType = relation.tableMeta.dataSchema - override def getInputFilePaths: Seq[String] = { + override def getInputFilePathsInternal: Seq[String] = { // FIXME how does a hive table expose file paths? 
Seq.empty } @@ -172,7 +172,7 @@ class HiveTableScanExecTransformer( case (_, _) => } val readRelNode = transformCtx.root.asInstanceOf[ReadRelNode] - readRelNode.setDataSchema(getDataSchemas) + readRelNode.setDataSchema(getDataSchema) readRelNode.setProperties(JavaConverters.mapAsJavaMap(options)) } transformCtx diff --git a/gluten-delta/pom.xml b/gluten-delta/pom.xml index 76e64e9e3b4c..e2b47f8e1792 100755 --- a/gluten-delta/pom.xml +++ b/gluten-delta/pom.xml @@ -93,6 +93,12 @@ ${hadoop.version} test + + com.google.protobuf + protobuf-java + ${protobuf.version} + test + org.scalatest scalatest_${scala.binary.version} diff --git a/gluten-iceberg/pom.xml b/gluten-iceberg/pom.xml new file mode 100644 index 000000000000..8cae66d7ead2 --- /dev/null +++ b/gluten-iceberg/pom.xml @@ -0,0 +1,168 @@ + + + + gluten-parent + io.glutenproject + 1.1.0-SNAPSHOT + ../pom.xml + + 4.0.0 + + gluten-iceberg + jar + Gluten Iceberg + + + ${project.basedir}/src/main/resources + + + + + io.glutenproject + gluten-core + ${project.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + provided + + + org.apache.iceberg + iceberg-spark-${sparkbundle.version}_${scala.binary.version} + ${iceberg.version} + provided + + + + + io.glutenproject + gluten-core + ${project.version} + test-jar + test + + + io.glutenproject + backends-velox + ${project.version} + test + + + io.glutenproject + backends-velox + ${project.version} + test-jar + test + + + org.apache.spark + spark-core_${scala.binary.version} + + + org.apache.spark + spark-core_${scala.binary.version} + test-jar + + + org.apache.spark + spark-sql_${scala.binary.version} + test-jar + + + org.apache.spark + spark-catalyst_${scala.binary.version} + test-jar + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + test + + + org.apache.iceberg + iceberg-spark-runtime-${sparkbundle.version}_${scala.binary.version} + ${iceberg.version} + test + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + test + + + com.google.protobuf + protobuf-java + ${protobuf.version} + test + + + org.scalatest + scalatest_${scala.binary.version} + test + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + ${resource.dir} + + + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + com.diffplug.spotless + spotless-maven-plugin + + + org.scalatest + scalatest-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + test-compile + + test-jar + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + + diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java new file mode 100644 index 000000000000..3452836cfd83 --- /dev/null +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.substrait.rel; + +import java.util.List; +import java.util.Map; + +public class IcebergLocalFilesBuilder { + + // TODO: Add makeIcebergLocalFiles for MOR iceberg table + + public static IcebergLocalFilesNode makeIcebergLocalFiles( + Integer index, + List paths, + List starts, + List lengths, + List> partitionColumns, + LocalFilesNode.ReadFileFormat fileFormat, + List preferredLocations) { + return new IcebergLocalFilesNode( + index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations); + } +} diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java new file mode 100644 index 000000000000..c763a46b1915 --- /dev/null +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.substrait.rel; + +import java.util.List; +import java.util.Map; + +public class IcebergLocalFilesNode extends LocalFilesNode { + + class DeleteFile { + private final String path; + private final Integer fileContent; + private final ReadFileFormat fileFormat; + private final Long fileSize; + private final Long recordCount; + private final Map lowerBounds; + private final Map upperBounds; + + DeleteFile( + String path, + Integer fileContent, + ReadFileFormat fileFormat, + Long fileSize, + Long recordCount, + Map lowerBounds, + Map upperBounds) { + this.path = path; + this.fileContent = fileContent; + this.fileFormat = fileFormat; + this.fileSize = fileSize; + this.recordCount = recordCount; + this.lowerBounds = lowerBounds; + this.upperBounds = upperBounds; + } + } + + // TODO: Add delete file support for MOR iceberg table + + IcebergLocalFilesNode( + Integer index, + List paths, + List starts, + List lengths, + List> partitionColumns, + ReadFileFormat fileFormat, + List preferredLocations) { + super(index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations); + } +} diff --git a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala new file mode 100644 index 000000000000..ee4835ddc20e --- /dev/null +++ b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.execution + +import io.glutenproject.sql.shims.SparkShimLoader +import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat +import io.glutenproject.substrait.rel.SplitInfo + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil + +class IcebergScanTransformer( + output: Seq[AttributeReference], + @transient scan: Scan, + runtimeFilters: Seq[Expression], + @transient table: Table) + extends BatchScanExecTransformer( + output = output, + scan = scan, + runtimeFilters = runtimeFilters, + table = table) { + + override def filterExprs(): Seq[Expression] = Seq.empty + + override def getPartitionSchema: StructType = new StructType() + + override def getDataSchema: StructType = new StructType() + + override def getInputFilePathsInternal: Seq[String] = Seq.empty + + override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan) + + override def doExecuteColumnar(): RDD[ColumnarBatch] = throw new UnsupportedOperationException() + + override def getSplitInfos: Seq[SplitInfo] = { + getPartitions.zipWithIndex.map { + case (p, index) => GlutenIcebergSourceUtil.genSplitInfo(p, index) + } + } +} + +object IcebergScanTransformer { + def apply(batchScan: BatchScanExec, partitionFilters: Seq[Expression]): IcebergScanTransformer = { + new IcebergScanTransformer( + batchScan.output, + batchScan.scan, + partitionFilters, + table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan)) + } +} diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala new file mode 100644 index 000000000000..5ce4d837c791 --- /dev/null +++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.iceberg.spark.source + +import io.glutenproject.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo} +import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat + +import org.apache.spark.softaffinity.SoftAffinityUtil +import org.apache.spark.sql.connector.read.{InputPartition, Scan} + +import org.apache.iceberg.{FileFormat, FileScanTask, ScanTask} + +import java.lang.{Long => JLong} +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} + +import scala.collection.JavaConverters._ + +object GlutenIcebergSourceUtil { + def genSplitInfo(inputPartition: InputPartition, index: Int): SplitInfo = inputPartition match { + case partition: SparkInputPartition => + val paths = new JArrayList[String]() + val starts = new JArrayList[JLong]() + val lengths = new JArrayList[JLong]() + val partitionColumns = new JArrayList[JMap[String, String]]() + var fileFormat = ReadFileFormat.UnknownFormat + + val tasks = partition.taskGroup[ScanTask]().tasks().asScala + if (tasks.forall(_.isInstanceOf[FileScanTask])) { + tasks.map(_.asInstanceOf[FileScanTask]).foreach { + task => + paths.add(task.file().path().toString) + starts.add(task.start()) + lengths.add(task.length()) + partitionColumns.add(new JHashMap[String, String]()) + val currentFileFormat = task.file().format() match { + case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat + case FileFormat.ORC => ReadFileFormat.OrcReadFormat + case _ => + throw new UnsupportedOperationException( + "Iceberg Only support parquet and orc file format.") + } + if (fileFormat == ReadFileFormat.UnknownFormat) { + fileFormat = currentFileFormat + } else if (fileFormat != currentFileFormat) { + throw new UnsupportedOperationException( + s"Only one file format is supported, " + + s"find different file format $fileFormat and $currentFileFormat") + } + } + val preferredLoc = SoftAffinityUtil.getFilePartitionLocations( + paths.asScala.toArray, + inputPartition.preferredLocations()) + IcebergLocalFilesBuilder.makeIcebergLocalFiles( + index, + paths, + starts, + lengths, + partitionColumns, + fileFormat, + preferredLoc.toList.asJava + ) + } else { + throw new UnsupportedOperationException("Only support iceberg FileScanTask.") + } + case _ => + throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.") + } + + def getFileFormat(sparkScan: Scan): ReadFileFormat = sparkScan match { + case scan: SparkBatchQueryScan => + val tasks = scan.tasks().asScala + tasks.map(_.asCombinedScanTask()).foreach { + task => + val file = task.files().asScala.head.file() + file.format() match { + case FileFormat.PARQUET => return ReadFileFormat.ParquetReadFormat + case FileFormat.ORC => return ReadFileFormat.OrcReadFormat + case _ => + } + } + throw new UnsupportedOperationException("Iceberg Only support parquet and orc file format.") + case _ => + throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.") + } + +} diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala new file mode 100644 index 000000000000..0a76a30e0dfb --- /dev/null +++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import org.apache.spark.SparkConf + +import java.io.File + +class VeloxTPCHIcebergSuite extends VeloxTPCHSuite { + + protected val tpchBasePath: String = new File( + "../backends-velox/src/test/resources").getAbsolutePath + + override protected val resourcePath: String = + new File(tpchBasePath, "tpch-data-parquet-velox").getCanonicalPath + + override protected val veloxTPCHQueries: String = + new File(tpchBasePath, "tpch-queries-velox").getCanonicalPath + + override protected val queriesResults: String = + new File(tpchBasePath, "queries-output").getCanonicalPath + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") + .set("spark.sql.catalog.spark_catalog.type", "hadoop") + .set("spark.sql.catalog.spark_catalog.warehouse", s"file://$rootPath/tpch-data-iceberg-velox") + } + + override protected def createTPCHNotNullTables(): Unit = { + TPCHTables = TPCHTableNames.map { + table => + val tablePath = new File(resourcePath, table).getAbsolutePath + val tableDF = spark.read.format(fileFormat).load(tablePath) + tableDF.write.format("iceberg").mode("append").saveAsTable(table) + (table, tableDF) + }.toMap + } +} diff --git a/pom.xml b/pom.xml index 78c4c2e6f369..9f98a856e34f 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,7 @@ 2.9.3 2.0.1 + 1.3.1 20 2.12 2.12.15 @@ -207,6 +208,15 @@ gluten-delta + + iceberg + + false + + + gluten-iceberg + + backends-velox diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index 9e32b35b8f3b..d380b0bd3e5c 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -121,20 +121,6 @@ class FileSourceScanExecShim( val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter) val selected = if (dynamicPartitionFilters.nonEmpty) { - // When it includes some DynamicPruningExpression, - // it needs to execute InSubqueryExec first, - // because doTransform path can't execute 'doExecuteColumnar' which will - // execute prepare subquery first. 
- dynamicPartitionFilters.foreach { - case DynamicPruningExpression(inSubquery: InSubqueryExec) => - if (inSubquery.values().isEmpty) inSubquery.updateResult() - case e: Expression => - e.foreach { - case s: ScalarSubquery => s.updateResult() - case _ => - } - case _ => - } GlutenTimeMetric.withMillisTime { // call the file index for the files matching all filters except dynamic partition filters val predicate = dynamicPartitionFilters.reduce(And) diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index cfbf91bc2188..b8a14701bb90 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -122,20 +122,6 @@ class FileSourceScanExecShim( val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter) val selected = if (dynamicPartitionFilters.nonEmpty) { - // When it includes some DynamicPruningExpression, - // it needs to execute InSubqueryExec first, - // because doTransform path can't execute 'doExecuteColumnar' which will - // execute prepare subquery first. - dynamicPartitionFilters.foreach { - case DynamicPruningExpression(inSubquery: InSubqueryExec) => - if (inSubquery.values().isEmpty) inSubquery.updateResult() - case e: Expression => - e.foreach { - case s: ScalarSubquery => s.updateResult() - case _ => - } - case _ => - } GlutenTimeMetric.withMillisTime { // call the file index for the files matching all filters except dynamic partition filters val predicate = dynamicPartitionFilters.reduce(And) diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index 0a62c41a69df..ede6570b0777 100644 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -88,20 +88,6 @@ class FileSourceScanExecShim( val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter) val selected = if (dynamicPartitionFilters.nonEmpty) { - // When it includes some DynamicPruningExpression, - // it needs to execute InSubqueryExec first, - // because doTransform path can't execute 'doExecuteColumnar' which will - // execute prepare subquery first. 
- dynamicPartitionFilters.foreach { - case DynamicPruningExpression(inSubquery: InSubqueryExec) => - if (inSubquery.values().isEmpty) inSubquery.updateResult() - case e: Expression => - e.foreach { - case s: ScalarSubquery => s.updateResult() - case _ => - } - case _ => - } GlutenTimeMetric.withMillisTime { // call the file index for the files matching all filters except dynamic partition filters val predicate = dynamicPartitionFilters.reduce(And) From da4177296326a724b8cf2bfa65367a37642f7aee Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Tue, 28 Nov 2023 10:43:42 +0800 Subject: [PATCH 2/5] Remove SupportFormat --- .../execution/BaseDataSource.scala | 4 +-- .../execution/BasicScanExecTransformer.scala | 7 +++-- .../substrait/SupportFormat.scala | 27 ------------------- 3 files changed, 6 insertions(+), 32 deletions(-) delete mode 100644 gluten-core/src/main/scala/io/glutenproject/substrait/SupportFormat.scala diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala index 10dc23dd0ff1..bd5f4e2940eb 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala @@ -16,12 +16,10 @@ */ package io.glutenproject.execution -import io.glutenproject.substrait.SupportFormat - import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.types.StructType -trait BaseDataSource extends SupportFormat { +trait BaseDataSource { /** Returns the actual schema of this data source scan. */ def getDataSchema: StructType diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala index 7e2d23d09506..bd1ea255f82d 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala @@ -27,11 +27,11 @@ import io.glutenproject.substrait.rel.{ReadRelNode, RelBuilder, SplitInfo} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression} import org.apache.spark.sql.vectorized.ColumnarBatch - import com.google.common.collect.Lists - import scala.collection.JavaConverters._ +import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat + trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource { /** Returns the filters that can be pushed down to native file scan */ @@ -39,6 +39,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def outputAttributes(): Seq[Attribute] + /** This can be used to report FileFormat for a file based scan operator. */ + val fileFormat: ReadFileFormat + // TODO: Remove this expensive call when CH support scan custom partition location. def getInputFilePaths: Seq[String] = { // This is a heavy operation, and only the required backend executes the corresponding logic. diff --git a/gluten-core/src/main/scala/io/glutenproject/substrait/SupportFormat.scala b/gluten-core/src/main/scala/io/glutenproject/substrait/SupportFormat.scala deleted file mode 100644 index d0ea8648d494..000000000000 --- a/gluten-core/src/main/scala/io/glutenproject/substrait/SupportFormat.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.glutenproject.substrait - -import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat - -/** - * A mix-in interface for BasicScanExecTransformer. This can be used to report FileFormat for a file - * based scan operator. - */ -trait SupportFormat { - @transient val fileFormat: ReadFileFormat -} From 0bc25ebafba15b56fea63762e7bb3342a70fcb07 Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Wed, 29 Nov 2023 12:15:35 +0800 Subject: [PATCH 3/5] use service loader --- .../execution/BasicScanExecTransformer.scala | 5 +- .../DataSourceV2TransformerRegister.scala | 42 ++++++++++ .../execution/ScanTransformerFactory.scala | 48 +++++++---- ....execution.DataSourceV2TransformerRegister | 1 + .../IcebergTransformerProvider.scala | 31 +++++++ .../source/GlutenIcebergSourceUtil.scala | 84 ++++++++++--------- .../execution/VeloxIcebergSuite.scala | 56 +++++++++++++ .../execution/VeloxTPCHIcebergSuite.scala | 27 ++++++ 8 files changed, 238 insertions(+), 56 deletions(-) create mode 100644 gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala create mode 100644 gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister create mode 100644 gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala create mode 100644 gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala index bd1ea255f82d..822d656ff182 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala @@ -23,14 +23,15 @@ import io.glutenproject.substrait.`type`.ColumnTypeNode import io.glutenproject.substrait.SubstraitContext import io.glutenproject.substrait.plan.PlanBuilder import io.glutenproject.substrait.rel.{ReadRelNode, RelBuilder, SplitInfo} +import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression} import org.apache.spark.sql.vectorized.ColumnarBatch + import com.google.common.collect.Lists -import scala.collection.JavaConverters._ -import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat +import scala.collection.JavaConverters._ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource { diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala b/gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala new file mode 100644 index 
000000000000..da0305acd148 --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.execution + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +/** + * Data source v2 transformers should implement this trait so that they can register an alias for + * their data source v2 transformer. This allows users to reference the data source v2 transformer + * by its registered scan class name rather than the fully qualified transformer class name. + */ +trait DataSourceV2TransformerRegister { + + /** + * The scan class name that this data source v2 transformer provider adapts. This is overridden by + * children to provide an alias for the data source v2 transformer. For example: + * + * {{{ + * override def scanClassName(): String = "org.apache.iceberg.spark.source.SparkBatchQueryScan" + * }}} + */ + def scanClassName(): String + + def createDataSourceV2Transformer( + batchScan: BatchScanExec, + partitionFilters: Seq[Expression]): BatchScanExecTransformer +} diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala index 9b21c8e7e096..e0a8766029a6 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala @@ -24,12 +24,14 @@ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} -import scala.reflect.runtime.{universe => ru} +import java.util.ServiceLoader +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ object ScanTransformerFactory { - private val IcebergScanClassName = "org.apache.iceberg.spark.source.SparkBatchQueryScan" - private val IcebergTransformerClassName = "io.glutenproject.execution.IcebergScanTransformer" + private val dataSourceV2TransformerMap = new ConcurrentHashMap[String, Class[_]]() def createFileSourceScanTransformer( scanExec: FileSourceScanExec, @@ -87,8 +89,12 @@ } val scan = batchScanExec.scan scan match { - case _ if scan.getClass.getName == IcebergScanClassName => - createBatchScanTransformer(IcebergTransformerClassName, batchScanExec, newPartitionFilters) + case _ if dataSourceV2TransformerExists(scan.getClass.getName) => + val cls = lookupDataSourceV2Transformer(scan.getClass.getName) + cls + .newInstance() + .asInstanceOf[DataSourceV2TransformerRegister] +
.createDataSourceV2Transformer(batchScanExec, newPartitionFilters) case _ => new BatchScanExecTransformer( batchScanExec.output, @@ -100,18 +106,30 @@ object ScanTransformerFactory { def supportedBatchScan(scan: Scan): Boolean = scan match { case _: FileScan => true - case _ if scan.getClass.getName == IcebergScanClassName => true + case _ if dataSourceV2TransformerExists(scan.getClass.getName) => true case _ => false } - private def createBatchScanTransformer( - className: String, - params: Any*): BatchScanExecTransformer = { - val classMirror = ru.runtimeMirror(getClass.getClassLoader) - val classModule = classMirror.staticModule(className) - val mirror = classMirror.reflectModule(classModule) - val apply = mirror.symbol.typeSignature.member(ru.TermName("apply")).asMethod - val objMirror = classMirror.reflect(mirror.instance) - objMirror.reflectMethod(apply)(params: _*).asInstanceOf[BatchScanExecTransformer] + private def lookupDataSourceV2Transformer(scanClassName: String): Class[_] = { + dataSourceV2TransformerMap.computeIfAbsent( + scanClassName, + _ => { + val loader = Option(Thread.currentThread().getContextClassLoader) + .getOrElse(getClass.getClassLoader) + val serviceLoader = ServiceLoader.load(classOf[DataSourceV2TransformerRegister], loader) + serviceLoader.asScala + .filter(_.scanClassName().equalsIgnoreCase(scanClassName)) + .toList match { + case head :: Nil => + // there is exactly one registered alias + head.getClass + case _ => null + } + } + ) + } + + private def dataSourceV2TransformerExists(scanClassName: String): Boolean = { + lookupDataSourceV2Transformer(scanClassName) != null } } diff --git a/gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister b/gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister new file mode 100644 index 000000000000..658967bb99b6 --- /dev/null +++ b/gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister @@ -0,0 +1 @@ +io.glutenproject.execution.IcebergTransformerProvider \ No newline at end of file diff --git a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala new file mode 100644 index 000000000000..17d146da2021 --- /dev/null +++ b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.execution + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +class IcebergTransformerProvider extends DataSourceV2TransformerRegister { + + override def scanClassName(): String = "org.apache.iceberg.spark.source.SparkBatchQueryScan" + + override def createDataSourceV2Transformer( + batchScan: BatchScanExec, + partitionFilters: Seq[Expression]): BatchScanExecTransformer = { + IcebergScanTransformer.apply(batchScan, partitionFilters) + } +} diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala index 5ce4d837c791..2dcfe832e157 100644 --- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala +++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala @@ -22,7 +22,7 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.softaffinity.SoftAffinityUtil import org.apache.spark.sql.connector.read.{InputPartition, Scan} -import org.apache.iceberg.{FileFormat, FileScanTask, ScanTask} +import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask} import java.lang.{Long => JLong} import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} @@ -30,6 +30,7 @@ import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} import scala.collection.JavaConverters._ object GlutenIcebergSourceUtil { + def genSplitInfo(inputPartition: InputPartition, index: Int): SplitInfo = inputPartition match { case partition: SparkInputPartition => val paths = new JArrayList[String]() @@ -39,43 +40,39 @@ object GlutenIcebergSourceUtil { var fileFormat = ReadFileFormat.UnknownFormat val tasks = partition.taskGroup[ScanTask]().tasks().asScala - if (tasks.forall(_.isInstanceOf[FileScanTask])) { - tasks.map(_.asInstanceOf[FileScanTask]).foreach { - task => - paths.add(task.file().path().toString) - starts.add(task.start()) - lengths.add(task.length()) - partitionColumns.add(new JHashMap[String, String]()) - val currentFileFormat = task.file().format() match { - case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat - case FileFormat.ORC => ReadFileFormat.OrcReadFormat - case _ => - throw new UnsupportedOperationException( - "Iceberg Only support parquet and orc file format.") - } - if (fileFormat == ReadFileFormat.UnknownFormat) { - fileFormat = currentFileFormat - } else if (fileFormat != currentFileFormat) { + asFileScanTask(tasks.toList).foreach { + task => + paths.add(task.file().path().toString) + starts.add(task.start()) + lengths.add(task.length()) + partitionColumns.add(new JHashMap[String, String]()) + val currentFileFormat = task.file().format() match { + case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat + case FileFormat.ORC => ReadFileFormat.OrcReadFormat + case _ => throw new UnsupportedOperationException( - s"Only one file format is supported, " + - s"find different file format $fileFormat and $currentFileFormat") - } - } - val preferredLoc = SoftAffinityUtil.getFilePartitionLocations( - paths.asScala.toArray, - inputPartition.preferredLocations()) - IcebergLocalFilesBuilder.makeIcebergLocalFiles( - index, - paths, - starts, - lengths, - partitionColumns, - fileFormat, - preferredLoc.toList.asJava - ) - } else { - throw new UnsupportedOperationException("Only support iceberg 
FileScanTask.") + "Iceberg Only support parquet and orc file format.") + } + if (fileFormat == ReadFileFormat.UnknownFormat) { + fileFormat = currentFileFormat + } else if (fileFormat != currentFileFormat) { + throw new UnsupportedOperationException( + s"Only one file format is supported, " + + s"find different file format $fileFormat and $currentFileFormat") + } } + val preferredLoc = SoftAffinityUtil.getFilePartitionLocations( + paths.asScala.toArray, + inputPartition.preferredLocations()) + IcebergLocalFilesBuilder.makeIcebergLocalFiles( + index, + paths, + starts, + lengths, + partitionColumns, + fileFormat, + preferredLoc.toList.asJava + ) case _ => throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.") } @@ -83,10 +80,9 @@ object GlutenIcebergSourceUtil { def getFileFormat(sparkScan: Scan): ReadFileFormat = sparkScan match { case scan: SparkBatchQueryScan => val tasks = scan.tasks().asScala - tasks.map(_.asCombinedScanTask()).foreach { + asFileScanTask(tasks.toList).foreach { task => - val file = task.files().asScala.head.file() - file.format() match { + task.file().format() match { case FileFormat.PARQUET => return ReadFileFormat.ParquetReadFormat case FileFormat.ORC => return ReadFileFormat.OrcReadFormat case _ => @@ -97,4 +93,14 @@ object GlutenIcebergSourceUtil { throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.") } + private def asFileScanTask(tasks: List[ScanTask]): List[FileScanTask] = { + if (tasks.forall(_.isFileScanTask)) { + tasks.map(_.asFileScanTask()) + } else if (tasks.forall(_.isInstanceOf[CombinedScanTask])) { + tasks.flatMap(_.asCombinedScanTask().tasks().asScala) + } else { + throw new UnsupportedOperationException( + "Only support iceberg CombinedScanTask and FileScanTask.") + } + } } diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala new file mode 100644 index 000000000000..97c590dce212 --- /dev/null +++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.glutenproject.execution + +import org.apache.spark.SparkConf + +class VeloxIcebergSuite extends WholeStageTransformerSuite { + + protected val rootPath: String = getClass.getResource("/").getPath + override protected val backend: String = "velox" + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") + .set("spark.sql.catalog.spark_catalog.type", "hadoop") + .set("spark.sql.catalog.spark_catalog.warehouse", s"file://$rootPath/tpch-data-iceberg-velox") + } + + test("iceberg transformer exists") { + spark.sql(""" + |create table iceberg_tb using iceberg as + |(select 1 as col1, 2 as col2, 3 as col3) + |""".stripMargin) + + runQueryAndCompare(""" + |select * from iceberg_tb; + |""".stripMargin) { + checkOperatorMatch[IcebergScanTransformer] + } + } +} diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala index 0a76a30e0dfb..b8693a48ccab 100644 --- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala @@ -53,4 +53,31 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite { (table, tableDF) }.toMap } + + test("iceberg transformer exists") { + runQueryAndCompare(""" + |SELECT + | l_orderkey, + | o_orderdate + |FROM + | orders, + | lineitem + |WHERE + | l_orderkey = o_orderkey + |ORDER BY + | l_orderkey, + | o_orderdate + |LIMIT + | 10; + |""".stripMargin) { + df => + { + assert( + getExecutedPlan(df).count( + plan => { + plan.isInstanceOf[IcebergScanTransformer] + }) == 2) + } + } + } } From 495dea1d178f5b5b61d09d10d59be255ded74f52 Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Thu, 30 Nov 2023 11:42:55 +0800 Subject: [PATCH 4/5] combine api --- .../BasicPhysicalOperatorTransformer.scala | 4 ++- .../execution/ScanTransformerFactory.scala | 31 +++---------------- 2 files changed, 8 insertions(+), 27 deletions(-) diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala index 3524c020fed3..d2406bc06cb2 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala @@ -418,10 +418,12 @@ object FilterHandler { def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): SparkPlan = filter.child match { case fileSourceScan: FileSourceScanExec => + val leftFilters = + getLeftFilters(fileSourceScan.dataFilters, flattenCondition(filter.condition)) ScanTransformerFactory.createFileSourceScanTransformer( fileSourceScan, reuseSubquery, - filter) + extraFilters = leftFilters) case batchScan: 
BatchScanExec => if (ScanTransformerFactory.supportedBatchScan(batchScan.scan)) { ScanTransformerFactory.createBatchScanTransformer(batchScan, reuseSubquery) diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala index e0a8766029a6..0ec305d589f2 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala @@ -16,12 +16,12 @@ */ package io.glutenproject.execution -import io.glutenproject.execution.FilterHandler.{flattenCondition, getLeftFilters} import io.glutenproject.expression.ExpressionConverter import io.glutenproject.sql.shims.SparkShimLoader +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.read.Scan -import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec} +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import java.util.ServiceLoader @@ -36,8 +36,9 @@ object ScanTransformerFactory { def createFileSourceScanTransformer( scanExec: FileSourceScanExec, reuseSubquery: Boolean, + extraFilters: Seq[Expression] = Seq.empty, validation: Boolean = false): FileSourceScanExecTransformer = { - // TODO: Add delta match here + // transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters val newPartitionFilters = if (validation) { scanExec.partitionFilters } else { @@ -50,29 +51,7 @@ object ScanTransformerFactory { newPartitionFilters, scanExec.optionalBucketSet, scanExec.optionalNumCoalescedBuckets, - scanExec.dataFilters, - scanExec.tableIdentifier, - scanExec.disableBucketedScan - ) - } - - def createFileSourceScanTransformer( - scanExec: FileSourceScanExec, - reuseSubquery: Boolean, - filter: FilterExec): FileSourceScanExecTransformer = { - val leftFilters = - getLeftFilters(scanExec.dataFilters, flattenCondition(filter.condition)) - // transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters - val newPartitionFilters = - ExpressionConverter.transformDynamicPruningExpr(scanExec.partitionFilters, reuseSubquery) - new FileSourceScanExecTransformer( - scanExec.relation, - scanExec.output, - scanExec.requiredSchema, - newPartitionFilters, - scanExec.optionalBucketSet, - scanExec.optionalNumCoalescedBuckets, - scanExec.dataFilters ++ leftFilters, + scanExec.dataFilters ++ extraFilters, scanExec.tableIdentifier, scanExec.disableBucketedScan ) From d879fd097dada9aba9c21046b01b1104f916188a Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Fri, 1 Dec 2023 23:57:00 +0800 Subject: [PATCH 5/5] remove BatchScanExec in filter pushdown --- .../BasicPhysicalOperatorTransformer.scala | 20 +------ .../extension/ColumnarOverrides.scala | 55 +++++++++---------- 2 files changed, 28 insertions(+), 47 deletions(-) diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala index d2406bc06cb2..29654d99d709 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala @@ -19,7 +19,6 @@ package io.glutenproject.execution import io.glutenproject.backendsapi.BackendsApiManager import 
io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer} import io.glutenproject.extension.{GlutenPlan, ValidationResult} -import io.glutenproject.extension.columnar.TransformHints import io.glutenproject.metrics.MetricsUpdater import io.glutenproject.substrait.`type`.TypeBuilder import io.glutenproject.substrait.SubstraitContext @@ -415,7 +414,7 @@ object FilterHandler { // Separate and compare the filter conditions in Scan and Filter. // Push down the left conditions in Filter into Scan. - def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): SparkPlan = + def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): GlutenPlan = filter.child match { case fileSourceScan: FileSourceScanExec => val leftFilters = @@ -424,23 +423,6 @@ object FilterHandler { fileSourceScan, reuseSubquery, extraFilters = leftFilters) - case batchScan: BatchScanExec => - if (ScanTransformerFactory.supportedBatchScan(batchScan.scan)) { - ScanTransformerFactory.createBatchScanTransformer(batchScan, reuseSubquery) - } else { - if (batchScan.runtimeFilters.isEmpty) { - throw new UnsupportedOperationException( - s"${batchScan.scan.getClass.toString} is not supported.") - } else { - // IF filter expressions aren't empty, we need to transform the inner operators. - val newSource = batchScan.copy(runtimeFilters = ExpressionConverter - .transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery)) - TransformHints.tagNotTransformable( - newSource, - "The scan in BatchScanExec is not a FileScan") - newSource - } - } case other => throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.") } diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 8c9248695bb8..412b29085e15 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -136,30 +136,20 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean) private def genFilterExec(plan: FilterExec): SparkPlan = { // FIXME: Filter push-down should be better done by Vanilla Spark's planner or by // a individual rule. - // Push down the left conditions in Filter into Scan. - val newChild: SparkPlan = - if ( - plan.child.isInstanceOf[FileSourceScanExec] || - plan.child.isInstanceOf[BatchScanExec] - ) { - TransformHints.getHint(plan.child) match { + // Push down the left conditions in Filter into FileSourceScan. 
+ val newChild: SparkPlan = plan.child match { + case scan: FileSourceScanExec => + TransformHints.getHint(scan) match { case TRANSFORM_SUPPORTED() => val newScan = FilterHandler.applyFilterPushdownToScan(plan, reuseSubquery) newScan match { - case ts: TransformSupport => - if (ts.doValidate().isValid) { - ts - } else { - replaceWithTransformerPlan(plan.child) - } - case p: SparkPlan => p + case ts: TransformSupport if ts.doValidate().isValid => ts + case _ => replaceWithTransformerPlan(scan) } - case _ => - replaceWithTransformerPlan(plan.child) + case _ => replaceWithTransformerPlan(scan) } - } else { - replaceWithTransformerPlan(plan.child) - } + case _ => replaceWithTransformerPlan(plan.child) + } logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") BackendsApiManager.getSparkPlanExecApiInstance .genFilterExecTransformer(plan.condition, newChild) @@ -544,18 +534,27 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean) newSource } case plan: BatchScanExec => - val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery) - - val validationResult = transformer.doValidate() - if (validationResult.isValid) { - logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - transformer + if (ScanTransformerFactory.supportedBatchScan(plan.scan)) { + val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery) + val validationResult = transformer.doValidate() + if (validationResult.isValid) { + logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") + transformer + } else { + logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") + val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters) + TransformHints.tagNotTransformable(newSource, validationResult.reason.get) + newSource + } } else { - logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters) - TransformHints.tagNotTransformable(newSource, validationResult.reason.get) + // If filter expressions aren't empty, we need to transform the inner operators, + // and fallback the BatchScanExec itself. + val newSource = plan.copy(runtimeFilters = ExpressionConverter + .transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery)) + TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.") newSource } + case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) => // TODO: Add DynamicPartitionPruningHiveScanSuite.scala val newPartitionFilters: Seq[Expression] = ExpressionConverter.transformDynamicPruningExpr(