From 8017c76f7ffad41077a2e1ebc1ca4abe6447018c Mon Sep 17 00:00:00 2001
From: fusheng
Date: Mon, 15 Jul 2024 16:17:23 +0800
Subject: [PATCH 1/2] When writing dynamic partitions, some dynamic partitions
 in InsertIntoHadoopFsRelationCommand can be resolved to specific partition
 values

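The idea: when every partition column in the PARTITION clause is dynamic but the
query's top-level filter pins a partition column to a single literal (an equality,
or an IN with a single value), that value can be reused as a static partition
value. An illustrative sketch (the session, table and column names below are
assumptions made for this example, not part of the patch):

```scala
// Sketch only: `spark` is an existing SparkSession; the tables are made up.
spark.sql("CREATE TABLE src (id INT, pt STRING) USING parquet")
spark.sql("CREATE TABLE tgt (id INT, pt STRING) USING parquet PARTITIONED BY (pt)")

// `pt` is left dynamic in the PARTITION clause, but the top-level filter
// pins it to a single literal value.
spark.sql(
  """
    |INSERT OVERWRITE TABLE tgt PARTITION (pt)
    |SELECT id, pt FROM src WHERE pt = '20240715'
    |""".stripMargin)
```

For an insert like this, the new code records `pt -> 20240715` in
`pullTopPredicateParts` and folds it into the static partition spec passed to
InsertIntoHadoopFsRelationCommand, so the overwrite can be scoped to that
partition rather than to all partitions.
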
---
 .../datasources/DataSourceStrategy.scala      | 24 +++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 5d2310c130703..09a9143708c44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -35,11 +35,13 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{EqualTo => ExpressionsEqualTo, In => ExpressionsIn}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.catalyst.trees.TreePattern.{JOIN, UNION}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.SupportsRead
@@ -186,6 +188,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       // values in the PARTITION clause (e.g. b in the above example).
       // dynamic_partitioning_columns are partitioning columns that do not assigned
       // values in the PARTITION clause (e.g. c in the above example).
+      val pullTopPredicateParts = mutable.Map[String, Option[String]]()
       val actualQuery = if (parts.exists(_._2.isDefined)) {
         val projectList = convertStaticPartitions(
           sourceAttributes = query.output,
@@ -194,6 +197,22 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
           targetPartitionSchema = t.partitionSchema)
         Project(projectList, query)
       } else {
+        query.transformWithPruning(!_.containsAnyPattern(UNION, JOIN)) {
+          case filter @ LogicalFilter(condition, _) if
+              condition.deterministic && !SubqueryExpression.hasSubquery(condition) =>
+            val normalizedFilters = DataSourceStrategy.normalizeExprs(Seq(condition), l.output)
+            val (partitionKeyFilters, _) = DataSourceUtils
+              .getPartitionFiltersAndDataFilters(t.partitionSchema, normalizedFilters)
+
+            partitionKeyFilters.map {
+              case ExpressionsEqualTo(AttributeReference(name, _, _, _), Literal(value, _)) =>
+                pullTopPredicateParts += (name -> Some(value.toString))
+              case ExpressionsIn(AttributeReference(name, _, _, _), list @ Seq(Literal(value, _)))
+                if list.size == 1 => pullTopPredicateParts += (name -> Some(value.toString))
+              case _ => // do nothing
+            }
+            filter
+        }
         query
       }
 
@@ -208,7 +227,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
 
       val partitionSchema = actualQuery.resolve(
         t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
-      val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
+      val staticPartitions = (parts ++ pullTopPredicateParts).filter(_._2.nonEmpty)
+        .map { case (k, v) => k -> v.get }
 
       val insertCommand = InsertIntoHadoopFsRelationCommand(
         outputPath,

From cc722e30858d439a3137f5c2de564d34d7bad8cc Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Mon, 15 Jul 2024 17:11:17 +0800
Subject: [PATCH 2/2] [MINOR][SQL][TESTS] Fix compilation warning `adaptation
 of an empty argument list by inserting () is deprecated`

### What changes were proposed in this pull request?
This PR aims to fix the compilation warning:
`adaptation of an empty argument list by inserting () is deprecated`

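The warning is emitted when a call supplies an empty argument list where a single
argument is expected, and the compiler adapts the call by inserting the unit value
`()` itself. A minimal, self-contained sketch of the pattern (plain Scala;
`withConf` below is only shaped like the `withSQLConf` test helper used in the
suite, it is not Spark code):

```scala
object EmptyArgAdaptationDemo {
  // A curried method shaped like the `withSQLConf` test helper: a varargs
  // first parameter list and a by-name `=> Unit` body in the second one.
  def withConf(pairs: (String, String)*)(body: => Unit): Unit = {
    // A real helper would set the configs, run `body`, then restore them;
    // this sketch just runs the body.
    body
  }

  def main(args: Array[String]): Unit = {
    // Writing `withConf("k" -> "v")()` supplies no argument for `body`, so
    // Scala 2.13 adapts the call by inserting `()` and reports
    // "adaptation of an empty argument list by inserting () is deprecated".
    // Passing the unit value explicitly keeps the behaviour without the warning:
    withConf("k" -> "v")(())
  }
}
```

The `intercept` block in ParquetCommitterSuite has exactly this shape, so the test
now passes `(())` explicitly instead of an empty argument list.
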
### Why are the changes needed?
Fix compilation warning.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually check. Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47350 from panbingkun/ParquetCommitterSuite_deprecated.

Authored-by: panbingkun
Signed-off-by: yangjie01
---
 .../execution/datasources/DataSource.scala    |  2 +-
 .../datasources/DataSourceStrategy.scala      | 30 +++++++++++++---
 .../InsertIntoHadoopFsRelationCommand.scala   | 34 ++++++++++---------
 .../parquet/ParquetCommitterSuite.scala       |  2 +-
 4 files changed, 46 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d88b5ee8877d7..c9b3c24947dd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -473,7 +473,7 @@ case class DataSource(
       // will be adjusted within InsertIntoHadoopFsRelation.
       InsertIntoHadoopFsRelationCommand(
         outputPath = outputPath,
-        staticPartitions = Map.empty,
+        statementStaticPartitions = Map.empty,
         ifPartitionNotExists = false,
         partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted),
         bucketSpec = bucketSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 5d2310c130703..2507079c447f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -35,11 +35,13 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{EqualTo => ExpressionsEqualTo, In => ExpressionsIn}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.catalyst.trees.TreePattern.{JOIN, UNION}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.SupportsRead
@@ -186,6 +188,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       // values in the PARTITION clause (e.g. b in the above example).
       // dynamic_partitioning_columns are partitioning columns that do not assigned
       // values in the PARTITION clause (e.g. c in the above example).
+      val pullTopPredicateParts = mutable.Map[String, Option[String]]()
       val actualQuery = if (parts.exists(_._2.isDefined)) {
         val projectList = convertStaticPartitions(
           sourceAttributes = query.output,
@@ -194,6 +197,22 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
           targetPartitionSchema = t.partitionSchema)
         Project(projectList, query)
       } else {
+        query.transformWithPruning(!_.containsAnyPattern(UNION, JOIN)) {
+          case filter @ LogicalFilter(condition, _) if
+              condition.deterministic && !SubqueryExpression.hasSubquery(condition) =>
+            val normalizedFilters = DataSourceStrategy.normalizeExprs(Seq(condition), l.output)
+            val (partitionKeyFilters, _) = DataSourceUtils
+              .getPartitionFiltersAndDataFilters(t.partitionSchema, normalizedFilters)
+
+            partitionKeyFilters.map {
+              case ExpressionsEqualTo(AttributeReference(name, _, _, _), Literal(value, _)) =>
+                pullTopPredicateParts += (name -> Some(value.toString))
+              case ExpressionsIn(AttributeReference(name, _, _, _), list @ Seq(Literal(value, _)))
+                if list.size == 1 => pullTopPredicateParts += (name -> Some(value.toString))
+              case _ => // do nothing
+            }
+            filter
+        }
         query
       }
 
@@ -208,11 +227,13 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
 
       val partitionSchema = actualQuery.resolve(
         t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
-      val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
+      val pullTopPredicateStaticPartitions = pullTopPredicateParts.filter(_._2.nonEmpty)
+        .map { case (k, v) => k -> v.get }.toMap
+      val statementStaticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
 
       val insertCommand = InsertIntoHadoopFsRelationCommand(
         outputPath,
-        staticPartitions,
+        statementStaticPartitions,
         i.ifPartitionNotExists,
         partitionSchema,
         t.bucketSpec,
@@ -222,7 +243,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
         mode,
         table,
         Some(t.location),
-        actualQuery.output.map(_.name))
+        actualQuery.output.map(_.name),
+        pullTopPredicateStaticPartitions)
 
       // For dynamic partition overwrite, we do not delete partition directories ahead.
       // We write to staging directories and move to final partition directories after writing
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index fe6ec094812e8..585dbe4d43ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -37,16 +37,16 @@ import org.apache.spark.sql.util.SchemaUtils
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
  * Writing to dynamic partitions is also supported.
  *
- * @param staticPartitions partial partitioning spec for write. This defines the scope of partition
- *                         overwrites: when the spec is empty, all partitions are overwritten.
- *                         When it covers a prefix of the partition keys, only partitions matching
- *                         the prefix are overwritten.
+ * @param statementStaticPartitions partial partitioning spec for write. This defines the scope
+ *                                  of partition overwrites: when the spec is empty, all partitions
+ *                                  are overwritten. When it covers a prefix of the partition keys,
+ *                                  only partitions matching the prefix are overwritten.
  * @param ifPartitionNotExists If true, only write if the partition does not exist.
  *                             Only valid for static partitions.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
-    staticPartitions: TablePartitionSpec,
+    statementStaticPartitions: TablePartitionSpec,
     ifPartitionNotExists: Boolean,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
@@ -56,7 +56,8 @@ case class InsertIntoHadoopFsRelationCommand(
     mode: SaveMode,
     catalogTable: Option[CatalogTable],
     fileIndex: Option[FileIndex],
-    outputColumnNames: Seq[String])
+    outputColumnNames: Seq[String],
+    pullTopPredicateStaticPartitions: Map[String, String] = Map.empty)
   extends V1WriteCommand {
 
   private lazy val parameters = CaseInsensitiveMap(options)
@@ -71,12 +72,12 @@ case class InsertIntoHadoopFsRelationCommand(
     // This config only makes sense when we are overwriting a partitioned dataset with dynamic
     // partition columns.
     enableDynamicOverwrite && mode == SaveMode.Overwrite &&
-      staticPartitions.size < partitionColumns.length
+      statementStaticPartitions.size < partitionColumns.length
   }
 
   override def requiredOrdering: Seq[SortOrder] =
     V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options,
-      staticPartitions.size)
+      statementStaticPartitions.size)
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
@@ -101,7 +102,8 @@ case class InsertIntoHadoopFsRelationCommand(
     // may be relevant to the insertion job.
     if (partitionsTrackedByCatalog) {
       matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
-        catalogTable.get.identifier, Some(staticPartitions))
+        catalogTable.get.identifier,
+        Some(statementStaticPartitions ++ pullTopPredicateStaticPartitions))
       initialMatchingPartitions = matchingPartitions.map(_.spec)
       customPartitionLocations = getCustomPartitionLocations(
         fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
@@ -187,15 +189,15 @@ case class InsertIntoHadoopFsRelationCommand(
           bucketSpec = bucketSpec,
           statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
           options = options,
-          numStaticPartitionCols = staticPartitions.size)
+          numStaticPartitionCols = statementStaticPartitions.size)
 
 
       // update metastore partition metadata
-      if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
-        && partitionColumns.length == staticPartitions.size) {
+      if (updatedPartitionPaths.isEmpty && statementStaticPartitions.nonEmpty
+        && partitionColumns.length == statementStaticPartitions.size) {
         // Avoid empty static partition can't loaded to datasource table.
         val staticPathFragment =
-          PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)
+          PartitioningUtils.getPathFragment(statementStaticPartitions, partitionColumns)
         refreshUpdatedPartitions(Set(staticPathFragment))
       } else {
         refreshUpdatedPartitions(updatedPartitionPaths)
@@ -226,9 +228,9 @@ case class InsertIntoHadoopFsRelationCommand(
       qualifiedOutputPath: Path,
       customPartitionLocations: Map[TablePartitionSpec, String],
       committer: FileCommitProtocol): Unit = {
-    val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
+    val staticPartitionPrefix = if (statementStaticPartitions.nonEmpty) {
       "/" + partitionColumns.flatMap { p =>
-        staticPartitions.get(p.name).map(getPartitionPathString(p.name, _))
+        statementStaticPartitions.get(p.name).map(getPartitionPathString(p.name, _))
       }.mkString("/")
     } else {
       ""
@@ -241,7 +243,7 @@ case class InsertIntoHadoopFsRelationCommand(
       // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
       for ((spec, customLoc) <- customPartitionLocations) {
         assert(
-          (staticPartitions.toSet -- spec).isEmpty,
+          (statementStaticPartitions.toSet -- spec).isEmpty,
           "Custom partition location did not match static partitioning keys")
         val path = new Path(customLoc)
         if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
index eadd55bdc3205..fb435e3639fde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
@@ -116,7 +116,7 @@ class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
   test("SPARK-48804: Fail fast on unloadable or invalid committers") {
     Seq("invalid", getClass.getName).foreach { committer =>
       val e = intercept[IllegalArgumentException] {
-        withSQLConf(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> committer)()
+        withSQLConf(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> committer)(())
       }
       assert(e.getMessage.contains(classOf[OutputCommitter].getName))
     }