@@ -17,8 +17,11 @@
* under the License.
*/

package org.apache.iceberg;
package org.apache.iceberg.util;

import org.apache.iceberg.NullOrder;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.transforms.SortOrderVisitor;

69 changes: 69 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.util;

import java.util.Collection;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.transforms.SortOrderVisitor;

public class SortOrderUtil {

private SortOrderUtil() {
}

public static SortOrder buildSortOrder(Table table) {
return buildSortOrder(table.spec(), table.sortOrder());
}

public static SortOrder buildSortOrder(PartitionSpec spec, SortOrder sortOrder) {
if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {
return SortOrder.unsorted();
}

Schema schema = spec.schema();

Multimap<Integer, SortField> sortFieldIndex = Multimaps.index(sortOrder.fields(), SortField::sourceId);

// build a sort prefix of partition fields that are not already in the sort order
SortOrder.Builder builder = SortOrder.builderFor(schema);
for (PartitionField field : spec.fields()) {
Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
boolean isSorted = sortFields.stream().anyMatch(sortField ->
field.transform().equals(sortField.transform()) || sortField.transform().satisfiesOrderOf(field.transform()));
if (!isSorted) {
String sourceName = schema.findColumnName(field.sourceId());
builder.asc(Expressions.transform(sourceName, field.transform()));
}
}

// add the configured sort to the partition spec prefix sort
SortOrderVisitor.visit(sortOrder, new CopySortOrderFields(builder));
Contributor Author:

I removed another CopySortOrderFields which I think was a duplicate.


return builder.build();
}
}
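
To make the prefix-building behavior above concrete, here is a minimal usage sketch. The schema, partition spec, and column names (id, ts) are hypothetical, and the expected result is an approximation of what the helper should produce.

import org.apache.iceberg.{PartitionSpec, Schema, SortOrder}
import org.apache.iceberg.types.Types
import org.apache.iceberg.util.SortOrderUtil

object SortOrderUtilSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical schema with an id column and an event timestamp
    val schema = new Schema(
      Types.NestedField.required(1, "id", Types.LongType.get()),
      Types.NestedField.required(2, "ts", Types.TimestampType.withZone()))

    // table partitioned by days(ts); the user-configured sort order is id ASC
    val spec = PartitionSpec.builderFor(schema).day("ts").build()
    val sortOrder = SortOrder.builderFor(schema).asc("id").build()

    // days(ts) is not satisfied by any sort field, so it is prepended as a prefix;
    // the result should be roughly: day(ts) ASC NULLS FIRST, id ASC NULLS FIRST
    val writeOrder = SortOrderUtil.buildSortOrder(spec, sortOrder)
    println(writeOrder)
  }
}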
@@ -19,23 +19,16 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.iceberg.DistributionMode
import org.apache.iceberg.TableProperties
import org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED
import org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT
import org.apache.iceberg.spark.Spark3Util.toClusteredDistribution
import org.apache.iceberg.spark.Spark3Util.toOrderedDistribution
import org.apache.iceberg.spark.source.SparkTable
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.util.PropertyUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.Ascending
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.IsNull
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.NullsFirst
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.FullOuter
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.LeftAnti
@@ -52,16 +45,17 @@ import org.apache.spark.sql.catalyst.plans.logical.MergeIntoParams
import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.Repartition
import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
import org.apache.spark.sql.catalyst.plans.logical.Sort
import org.apache.spark.sql.catalyst.plans.logical.UpdateAction
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils
import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.iceberg.distributions.OrderedDistribution
import org.apache.spark.sql.connector.iceberg.write.MergeBuilder
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.BooleanType

case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with RewriteRowLevelOperationHelper {
@@ -72,7 +66,8 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with

import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
Contributor:

Nit: this doesn't need to move the import above the constants.

Contributor Author:

I think it is more natural to have imports for implicits before variables and methods in a class. I'd be in favor of changing, but I can do that in a separate PR. I'll revert it from here and submit a follow-up.

Contributor:

I agree about the order. We should probably also move the constants into a companion object instead of inline. Does Scala do that automatically, or are these initialized for every instance?

Contributor Author (@aokolnychyi, Jan 26, 2021):

I'd need to check the bytecode, but I agree on moving the constants to the companion object.
Will submit a follow-up.
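
For reference on the question above, a minimal sketch with hypothetical names (not the actual RewriteMergeInto fields): Scala does not hoist class-body vals into the companion object automatically; a val in a class or case class body is evaluated once per instance, while a val in an object is evaluated once, on first access.

object CompanionConstantSketch {

  case class RuleWithInlineConstant(name: String) {
    // evaluated every time the rule is instantiated
    val rowFromSourceRef: String = {
      println(s"initializing inline constant for $name")
      "_row_from_source_"
    }
  }

  object RuleConstants {
    // evaluated once, when RuleConstants is first accessed
    val RowFromSourceRef: String = {
      println("initializing object-level constant")
      "_row_from_source_"
    }
  }

  def main(args: Array[String]): Unit = {
    RuleWithInlineConstant("a")             // prints the inline-constant message
    RuleWithInlineConstant("b")             // prints it again: one evaluation per instance
    println(RuleConstants.RowFromSourceRef) // initializes RuleConstants once, then prints the value
    println(RuleConstants.RowFromSourceRef) // prints the value only: already initialized
  }
}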


override def resolver: Resolver = spark.sessionState.conf.resolver
private val conf: SQLConf = spark.sessionState.conf
override val resolver: Resolver = conf.resolver

override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
@@ -233,47 +228,20 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
!(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
}

def buildWritePlan(
childPlan: LogicalPlan,
table: Table): LogicalPlan = {
val defaultDistributionMode = table match {
case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
case _ =>
TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
}

table match {
case iceTable: SparkTable =>
val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
val table = iceTable.table()
val distributionMode: String = table.properties
.getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, defaultDistributionMode)
val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
DistributionMode.fromName(distributionMode) match {
case DistributionMode.NONE =>
Sort(buildSortOrder(order), global = false, childPlan)
case DistributionMode.HASH =>
val clustering = toCatalyst(toClusteredDistribution(table.spec()), childPlan)
val hashPartitioned = RepartitionByExpression(clustering, childPlan, numShufflePartitions)
Sort(buildSortOrder(order), global = false, hashPartitioned)
case DistributionMode.RANGE =>
val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
Sort(buildSortOrder(order), global = true, roundRobin)
}
private def buildWritePlan(childPlan: LogicalPlan, table: Table): LogicalPlan = {
val icebergTable = Spark3Util.toIcebergTable(table)
val distribution = Spark3Util.buildRequiredDistribution(icebergTable)
val ordering = Spark3Util.buildRequiredOrdering(distribution, icebergTable)
// range partitioning in Spark triggers a skew estimation job prior to shuffling
// we insert a round-robin partitioning to avoid executing the merge join twice
val newChildPlan = distribution match {
case _: OrderedDistribution =>
val numShufflePartitions = conf.numShufflePartitions
Repartition(numShufflePartitions, shuffle = true, childPlan)
case _ =>
childPlan
}
}

private def buildSortOrder(exprs: Seq[Expression]): Seq[SortOrder] = {
exprs.map { expr =>
expr match {
case e: SortOrder => e
case other =>
SortOrder(other, Ascending, NullsFirst, Set.empty)
}
}
DistributionAndOrderingUtils.prepareQuery(distribution, ordering, newChildPlan, conf)
}
}
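
The round-robin repartition above is meant to keep range partitioning's sampling job from re-running the merge join. A minimal, self-contained sketch (assuming Spark 3.x and a local SparkSession; the data is hypothetical) shows the caveat raised in a review comment on DistributionAndOrderingUtils below: the optimizer's CollapseRepartition rule removes a shuffling Repartition that sits directly under a RepartitionByExpression, so the extra shuffle may not survive optimization.

import org.apache.spark.sql.SparkSession

object CollapseRepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("collapse-repartition-sketch")
      .getOrCreate()
    import spark.implicits._

    // hypothetical rows standing in for the MERGE INTO output
    val mergeOutput = Seq((1, "2021-01-01"), (2, "2021-01-02")).toDF("v", "dt")

    // plan shape produced after this PR for a range-distributed write:
    // a round-robin shuffle followed by range repartitioning on dt
    val writePlan = mergeOutput.repartition(200).repartitionByRange($"dt")

    // the analyzed plan contains both repartitions; in the optimized plan,
    // CollapseRepartition drops the inner round-robin Repartition
    writePlan.explain(true)

    spark.stop()
  }
}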

@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.utils

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.IcebergBucketTransform
import org.apache.spark.sql.catalyst.expressions.IcebergDayTransform
import org.apache.spark.sql.catalyst.expressions.IcebergHourTransform
import org.apache.spark.sql.catalyst.expressions.IcebergMonthTransform
import org.apache.spark.sql.catalyst.expressions.IcebergYearTransform
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
import org.apache.spark.sql.catalyst.plans.logical.Sort
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits
import org.apache.spark.sql.connector.expressions.BucketTransform
import org.apache.spark.sql.connector.expressions.DaysTransform
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.connector.expressions.HoursTransform
import org.apache.spark.sql.connector.expressions.IdentityTransform
import org.apache.spark.sql.connector.expressions.MonthsTransform
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.expressions.YearsTransform
import org.apache.spark.sql.connector.iceberg.distributions.ClusteredDistribution
import org.apache.spark.sql.connector.iceberg.distributions.Distribution
import org.apache.spark.sql.connector.iceberg.distributions.OrderedDistribution
import org.apache.spark.sql.connector.iceberg.distributions.UnspecifiedDistribution
import org.apache.spark.sql.connector.iceberg.expressions.NullOrdering
import org.apache.spark.sql.connector.iceberg.expressions.SortDirection
import org.apache.spark.sql.connector.iceberg.expressions.SortOrder
import org.apache.spark.sql.internal.SQLConf

object DistributionAndOrderingUtils {

def prepareQuery(
requiredDistribution: Distribution,
requiredOrdering: Seq[SortOrder],
query: LogicalPlan,
conf: SQLConf): LogicalPlan = {

val resolver = conf.resolver

val distribution = requiredDistribution match {
case d: OrderedDistribution =>
d.ordering.map(e => toCatalyst(e, query, resolver))
case d: ClusteredDistribution =>
d.clustering.map(e => toCatalyst(e, query, resolver))
case _: UnspecifiedDistribution =>
Array.empty[catalyst.expressions.Expression]
}

val queryWithDistribution = if (distribution.nonEmpty) {
val numShufflePartitions = conf.numShufflePartitions
// the conversion to catalyst expressions above produces SortOrder expressions
// for OrderedDistribution and generic expressions for ClusteredDistribution
// this allows RepartitionByExpression to pick either range or hash partitioning
RepartitionByExpression(distribution, query, numShufflePartitions)
Contributor:

When WRITE_DISTRIBUTION_MODE = range, the logical plan before this PR is

Sort [dt#8 ASC NULLS FIRST, v#7 ASC NULLS FIRST], true
+- Repartition 2000, true
   +- MergeInto

After this PR, the logical plan is

Sort [dt#8 ASC NULLS FIRST, v#7 ASC NULLS FIRST], false
+- RepartitionByExpression [dt#8], 2000
   +- Repartition 2000, true
      +- MergeInto

In my opinion, converting the global sort into a local sort plus range partitioning is correct, but we need to consider the CollapseRepartition rule in the Spark optimizer. In this case, that rule will eliminate the Repartition 2000, true node.

Please take a look here, thanks @aokolnychyi @rdblue

} else {
query
}

val ordering = requiredOrdering
.map(e => toCatalyst(e, query, resolver))
.asInstanceOf[Seq[catalyst.expressions.SortOrder]]

val queryWithDistributionAndOrdering = if (ordering.nonEmpty) {
Sort(ordering, global = false, queryWithDistribution)
} else {
queryWithDistribution
}

queryWithDistributionAndOrdering
}

private def toCatalyst(
expr: Expression,
query: LogicalPlan,
resolver: Resolver): catalyst.expressions.Expression = {

// we cannot perform the resolution in the analyzer since we need to optimize expressions
// in nodes like OverwriteByExpression before constructing a logical write
def resolve(parts: Seq[String]): NamedExpression = {
query.resolve(parts, resolver) match {
case Some(attr) =>
attr
case None =>
val ref = parts.map(CatalogV2Implicits.quoteIfNeeded).mkString(".")
throw new AnalysisException(s"Cannot resolve '$ref' using ${query.output}")
}
}

expr match {
case s: SortOrder =>
val catalystChild = toCatalyst(s.expression(), query, resolver)
catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering), Set.empty)
case it: IdentityTransform =>
resolve(it.ref.fieldNames)
case BucketTransform(numBuckets, ref) =>
IcebergBucketTransform(numBuckets, resolve(ref.fieldNames))
case yt: YearsTransform =>
IcebergYearTransform(resolve(yt.ref.fieldNames))
case mt: MonthsTransform =>
IcebergMonthTransform(resolve(mt.ref.fieldNames))
case dt: DaysTransform =>
IcebergDayTransform(resolve(dt.ref.fieldNames))
case ht: HoursTransform =>
IcebergHourTransform(resolve(ht.ref.fieldNames))
case ref: FieldReference =>
resolve(ref.fieldNames)
case _ =>
throw new RuntimeException(s"$expr is not currently supported")

}
}

private def toCatalyst(direction: SortDirection): catalyst.expressions.SortDirection = {
direction match {
case SortDirection.ASCENDING => catalyst.expressions.Ascending
case SortDirection.DESCENDING => catalyst.expressions.Descending
}
}

private def toCatalyst(nullOrdering: NullOrdering): catalyst.expressions.NullOrdering = {
nullOrdering match {
case NullOrdering.NULLS_FIRST => catalyst.expressions.NullsFirst
case NullOrdering.NULLS_LAST => catalyst.expressions.NullsLast
}
}

private object BucketTransform {
def unapply(transform: Transform): Option[(Int, FieldReference)] = transform match {
case bt: BucketTransform => bt.columns match {
case Seq(nf: NamedReference) =>
Some(bt.numBuckets.value(), FieldReference(nf.fieldNames()))
case _ =>
None
}
case _ => None
}
}
}
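
As a sanity check on the comment inside prepareQuery, a minimal sketch (assuming a local SparkSession; the DataFrame calls are stand-ins that build the same two logical shapes): RepartitionByExpression plans range partitioning when its expressions are SortOrder, as produced for an OrderedDistribution, and hash partitioning for plain expressions, as produced for a ClusteredDistribution.

import org.apache.spark.sql.SparkSession

object RepartitionKindSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("repartition-kind-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "2021-01-01"), (2, "2021-01-02")).toDF("v", "dt")

    // plain column expressions -> Exchange hashpartitioning(dt, ...) in the physical plan
    df.repartition($"dt").explain()

    // SortOrder expressions -> Exchange rangepartitioning(dt ASC NULLS FIRST, ...) in the physical plan
    df.repartitionByRange($"dt").explain()

    spark.stop()
  }
}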