@@ -473,7 +473,7 @@ case class DataSource(
// will be adjusted within InsertIntoHadoopFsRelation.
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
staticPartitions = Map.empty,
statementStaticPartitions = Map.empty,
ifPartitionNotExists = false,
partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted),
bucketSpec = bucketSpec,
@@ -35,11 +35,13 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.{EqualTo => ExpressionsEqualTo, In => ExpressionsIn}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.trees.TreePattern.{JOIN, UNION}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder}
import org.apache.spark.sql.connector.catalog.SupportsRead
@@ -186,6 +188,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
// values in the PARTITION clause (e.g. b in the above example).
// dynamic_partitioning_columns are partitioning columns that are not assigned
// values in the PARTITION clause (e.g. c in the above example).
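// Partition column -> literal value pairs pulled from top-level predicates of the query;
// passed to InsertIntoHadoopFsRelationCommand below only to narrow its catalog partition listing.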
val pullTopPredicateParts = mutable.Map[String, Option[String]]()
val actualQuery = if (parts.exists(_._2.isDefined)) {
val projectList = convertStaticPartitions(
sourceAttributes = query.output,
@@ -194,6 +197,22 @@
targetPartitionSchema = t.partitionSchema)
Project(projectList, query)
} else {
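// The statement assigns no static partition values, so scan the query for top-level
// deterministic, subquery-free filters (skipping subtrees that contain a UNION or JOIN)
// and record `partitionCol = literal` and single-value `partitionCol IN (literal)`
// predicates. The traversal only populates pullTopPredicateParts; the query itself is
// returned unchanged below.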
query.transformWithPruning(!_.containsAnyPattern(UNION, JOIN)) {
case filter @ LogicalFilter(condition, _) if
condition.deterministic && !SubqueryExpression.hasSubquery(condition) =>
val normalizedFilters = DataSourceStrategy.normalizeExprs(Seq(condition), l.output)
val (partitionKeyFilters, _) = DataSourceUtils
.getPartitionFiltersAndDataFilters(t.partitionSchema, normalizedFilters)

partitionKeyFilters.map {
case ExpressionsEqualTo(AttributeReference(name, _, _, _), Literal(value, _)) =>
pullTopPredicateParts += (name -> Some(value.toString))
case ExpressionsIn(AttributeReference(name, _, _, _), list @ Seq(Literal(value, _)))
if list.size == 1 => pullTopPredicateParts += (name -> Some(value.toString))
case _ => // do nothing
}
filter
}
query
}

@@ -208,11 +227,13 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {

val partitionSchema = actualQuery.resolve(
t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
val pullTopPredicateStaticPartitions = pullTopPredicateParts.filter(_._2.nonEmpty)
.map { case (k, v) => k -> v.get }.toMap
val statementStaticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }

val insertCommand = InsertIntoHadoopFsRelationCommand(
outputPath,
staticPartitions,
statementStaticPartitions,
i.ifPartitionNotExists,
partitionSchema,
t.bucketSpec,
@@ -222,7 +243,8 @@
mode,
table,
Some(t.location),
actualQuery.output.map(_.name))
actualQuery.output.map(_.name),
pullTopPredicateStaticPartitions)

// For dynamic partition overwrite, we do not delete partition directories ahead.
// We write to staging directories and move to final partition directories after writing
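The rule above only records name -> value pairs; the actual narrowing happens in InsertIntoHadoopFsRelationCommand in the next file. A minimal standalone sketch of the extraction idea, using hypothetical stand-in types (Attr, Lit, EqualTo, In) rather than Catalyst's expression classes:

// Standalone sketch of the predicate-pulling step. The Expr hierarchy below is a made-up
// stand-in for Catalyst expressions; only the pattern-matching idea mirrors the patch.
object PullTopPredicateSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Any) extends Expr
  case class EqualTo(left: Expr, right: Expr) extends Expr
  case class In(value: Expr, list: Seq[Expr]) extends Expr

  /** Keep only filters that pin a partition column to a single literal value. */
  def pullTopPredicateParts(
      partitionCols: Set[String],
      partitionFilters: Seq[Expr]): Map[String, String] = {
    partitionFilters.flatMap {
      case EqualTo(Attr(name), Lit(v)) if partitionCols.contains(name) =>
        Some(name -> v.toString)
      case In(Attr(name), Seq(Lit(v))) if partitionCols.contains(name) =>
        // a single-element IN list is equivalent to an equality predicate
        Some(name -> v.toString)
      case _ => None // ranges, multi-value INs, etc. cannot pin a single partition value
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val filters = Seq(
      EqualTo(Attr("dt"), Lit("2024-01-01")),
      In(Attr("hour"), Seq(Lit(7))),
      In(Attr("country"), Seq(Lit("US"), Lit("CA"))))
    // prints Map(dt -> 2024-01-01, hour -> 7): the two-value IN is ignored
    println(pullTopPredicateParts(Set("dt", "hour", "country"), filters))
  }
}

In the patch itself the same matching runs over partitionKeyFilters produced by DataSourceUtils.getPartitionFiltersAndDataFilters, so only predicates that reference partition columns exclusively ever reach the match.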
@@ -37,16 +37,16 @@ import org.apache.spark.sql.util.SchemaUtils
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported.
*
* @param staticPartitions partial partitioning spec for write. This defines the scope of partition
* overwrites: when the spec is empty, all partitions are overwritten.
* When it covers a prefix of the partition keys, only partitions matching
* the prefix are overwritten.
* @param statementStaticPartitions partial partitioning spec for write. This defines the scope
* of partition overwrites: when the spec is empty, all partitions
* are overwritten. When it covers a prefix of the partition keys,
* only partitions matching the prefix are overwritten.
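* For example, with partition keys (p1, p2), a spec of Map("p1" -> "1")
* overwrites only the partitions whose p1 value is 1.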
* @param ifPartitionNotExists If true, only write if the partition does not exist.
* Only valid for static partitions.
*/
case class InsertIntoHadoopFsRelationCommand(
outputPath: Path,
staticPartitions: TablePartitionSpec,
statementStaticPartitions: TablePartitionSpec,
ifPartitionNotExists: Boolean,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
@@ -56,7 +56,8 @@ case class InsertIntoHadoopFsRelationCommand(
mode: SaveMode,
catalogTable: Option[CatalogTable],
fileIndex: Option[FileIndex],
outputColumnNames: Seq[String])
outputColumnNames: Seq[String],
pullTopPredicateStaticPartitions: Map[String, String] = Map.empty)
extends V1WriteCommand {

private lazy val parameters = CaseInsensitiveMap(options)
@@ -71,12 +72,12 @@ case class InsertIntoHadoopFsRelationCommand(
// This config only makes sense when we are overwriting a partitioned dataset with dynamic
// partition columns.
enableDynamicOverwrite && mode == SaveMode.Overwrite &&
staticPartitions.size < partitionColumns.length
statementStaticPartitions.size < partitionColumns.length
}

override def requiredOrdering: Seq[SortOrder] =
V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options,
staticPartitions.size)
statementStaticPartitions.size)

override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
// Most formats don't do well with duplicate columns, so lets not allow that
@@ -101,7 +102,8 @@ case class InsertIntoHadoopFsRelationCommand(
// may be relevant to the insertion job.
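// With this change, the listing is additionally narrowed by partition values pulled from
// top-level query predicates. Only this lookup uses the combined map; the other uses of
// static partitions below (deleteMatchingPartitions, path fragments, numStaticPartitionCols)
// still read statementStaticPartitions alone.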
if (partitionsTrackedByCatalog) {
matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
catalogTable.get.identifier, Some(staticPartitions))
catalogTable.get.identifier,
Some(statementStaticPartitions ++ pullTopPredicateStaticPartitions))
initialMatchingPartitions = matchingPartitions.map(_.spec)
customPartitionLocations = getCustomPartitionLocations(
fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
@@ -187,15 +189,15 @@ case class InsertIntoHadoopFsRelationCommand(
bucketSpec = bucketSpec,
statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
options = options,
numStaticPartitionCols = staticPartitions.size)
numStaticPartitionCols = statementStaticPartitions.size)


// update metastore partition metadata
if (updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
&& partitionColumns.length == staticPartitions.size) {
if (updatedPartitionPaths.isEmpty && statementStaticPartitions.nonEmpty
&& partitionColumns.length == statementStaticPartitions.size) {
// Ensure an empty static partition can still be loaded into the datasource table.
val staticPathFragment =
PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)
PartitioningUtils.getPathFragment(statementStaticPartitions, partitionColumns)
refreshUpdatedPartitions(Set(staticPathFragment))
} else {
refreshUpdatedPartitions(updatedPartitionPaths)
@@ -226,9 +228,9 @@ case class InsertIntoHadoopFsRelationCommand(
qualifiedOutputPath: Path,
customPartitionLocations: Map[TablePartitionSpec, String],
committer: FileCommitProtocol): Unit = {
val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
val staticPartitionPrefix = if (statementStaticPartitions.nonEmpty) {
"/" + partitionColumns.flatMap { p =>
staticPartitions.get(p.name).map(getPartitionPathString(p.name, _))
statementStaticPartitions.get(p.name).map(getPartitionPathString(p.name, _))
}.mkString("/")
} else {
""
@@ -241,7 +243,7 @@
// now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
for ((spec, customLoc) <- customPartitionLocations) {
assert(
(staticPartitions.toSet -- spec).isEmpty,
(statementStaticPartitions.toSet -- spec).isEmpty,
"Custom partition location did not match static partitioning keys")
val path = new Path(customLoc)
if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
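A hypothetical end-to-end illustration, assuming the patch is applied (table names and values below are invented, not taken from this PR): the statement declares no static partitions, so the top-level dt filter is what restricts the catalog partition listing performed by the insert command.

import org.apache.spark.sql.SparkSession

object NarrowedPartitionListingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("narrowed-listing").getOrCreate()
    import spark.implicits._

    spark.sql(
      "CREATE TABLE IF NOT EXISTS sales (amount BIGINT, dt STRING) USING parquet PARTITIONED BY (dt)")
    Seq((100L, "2024-01-01"), (200L, "2024-01-02"))
      .toDF("amount", "dt")
      .createOrReplaceTempView("staged_sales")

    // The statement has no PARTITION clause, so statementStaticPartitions is empty.
    // With the patch applied, the `dt = '2024-01-01'` filter is pulled into
    // pullTopPredicateStaticPartitions, and the partition listing behind the insert
    // is requested for Map("dt" -> "2024-01-01") rather than for every partition.
    spark.sql(
      """INSERT INTO TABLE sales
        |SELECT amount, dt FROM staged_sales WHERE dt = '2024-01-01'""".stripMargin)

    spark.stop()
  }
}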