
Spark: Add DistributionAndOrderingUtils #2141

Merged
aokolnychyi merged 4 commits into apache:master from aokolnychyi:refactor-required-distribution
Jan 26, 2021

Conversation

@aokolnychyi
Contributor

This PR adds DistributionAndOrderingUtils, which is being proposed for master, and migrates RewriteMergeInto to use it.
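
For context, here is a rough sketch of what such a utility does, based on the snippets discussed below; the method name, parameters, and overall shape are assumptions for illustration, not the PR's exact code. The idea is to turn the table's required distribution into a repartition node and its required ordering into a sort node on top of the incoming query.

import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression, Sort}

// sketch only: distribution is the table's required distribution already converted
// to catalyst expressions, ordering is its required sort order
def prepareQuery(
    distribution: Seq[Expression],
    ordering: Seq[SortOrder],
    numShufflePartitions: Int,
    query: LogicalPlan): LogicalPlan = {
  // RepartitionByExpression picks range partitioning for SortOrder expressions
  // and hash partitioning for generic expressions (see the comment in the diff below)
  val repartitioned = if (distribution.isEmpty) {
    query
  } else {
    RepartitionByExpression(distribution, query, numShufflePartitions)
  }
  // once the data is partitioned, a local sort is enough to satisfy the ordering
  if (ordering.isEmpty) repartitioned else Sort(ordering, global = false, child = repartitioned)
}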

@aokolnychyi
Contributor Author

cc @rdblue @dilipbiswal

}

// add the configured sort to the partition spec prefix sort
SortOrderVisitor.visit(sortOrder, new CopySortOrderFields(builder));
Contributor Author

I removed another CopySortOrderFields which I think was a duplicate.

val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
Sort(buildSortOrder(order), global = true, roundRobin)
case iceberg: SparkTable =>
val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)
Contributor Author

The intention is to reuse Spark3Util during inserts in Spark 3.2.

Contributor

Can we expose the same interface as in Spark 3.2 from Table and use that here instead? Then Spark3Util calls remain in SparkTable.

Contributor Author

Could you elaborate a bit more? The interface in Spark 3.2 is implemented by Write, not Table. Are you thinking of passing SparkTable as an arg?

Contributor

You're right. I was thinking it was on Table instead. Let's go with this then.
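
For reference, a hedged sketch of the Spark 3.2 hook being discussed, as I recall it; the interface, package, and method names below should be treated as assumptions rather than verified API. The point is that the distribution and ordering requirements are declared by the Write, not by the Table, which is why they cannot simply be exposed from Table here.

import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.SortOrder
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}

// minimal sketch of a write that declares its required distribution and ordering
class ExampleWrite extends Write with RequiresDistributionAndOrdering {
  override def requiredDistribution(): Distribution = Distributions.unspecified()
  override def requiredOrdering(): Array[SortOrder] = Array.empty[SortOrder]
}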


protected def buildSimpleScanPlan(
    relation: DataSourceV2Relation,
    cond: catalyst.expressions.Expression): LogicalPlan = {
Contributor Author

Reverted back.

case _: OrderedDistribution =>
  // insert a round robin partitioning to avoid executing the join twice
  val numShufflePartitions = conf.numShufflePartitions
  Repartition(numShufflePartitions, shuffle = true, childPlan)
Contributor

@rdblue Jan 25, 2021


This is here and not in prepareQuery because we don't want to assume that a global ordering always requires an extra round-robin repartition? If so, it would be good to move the comment above newChildPlan and make it a bit more clear why this extra step is happening.

Contributor Author

Yes, we don't want to add a round-robin repartition during inserts, for example. I'll add more info.

Contributor Author

Done. Could you check it once, @rdblue?
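
To make the intent concrete, here is a rough sketch of the pattern this thread settles on, reusing the names from the diff hunks above (the exact shape is an assumption, not the PR's code): only the MERGE rewrite inserts the round-robin repartition, so plain inserts never pay for it.

// newChildPlan is the name referenced in the review comment; childPlan is the MERGE join output
val newChildPlan = distribution match {
  case _: OrderedDistribution =>
    // round-robin repartition first so the subsequent range partitioning and sort
    // do not execute the MERGE join twice
    Repartition(numShufflePartitions, shuffle = true, childPlan)
  case _ =>
    childPlan
}
// the shared utility then adds the required distribution and ordering nodes on top of newChildPlan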

case NONE:
  return Distributions.unspecified();
case HASH:
  return Distributions.clustered(toTransforms(table.spec()));
Contributor

Is it correct to return a clustered distribution with no expressions if the spec is unpartitioned? I think I would rather return Distributions.unspecified just to be safe when passing this back to Spark.

Contributor Author

Good point, let me handle this.

Contributor Author

Added a check at the beginning of this method. Could you check, @rdblue?

Contributor

A sorted table may not be partitioned, but it would pass the check you added. Then if the distribution mode is hash, it would return an empty clustered distribution. I think it would be more correct and easier to reason about if the check was done here.

Contributor Author

You are right. Updated.
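
A minimal sketch, in Scala for brevity, of the check agreed on here; Distributions, toTransforms, and table.spec() come from the snippet above, while the helper name and placement are assumptions. The point is to fall back to an unspecified distribution for unpartitioned tables so Spark never receives an empty clustered distribution.

// sketch of the HASH branch only; not the PR's exact code
def hashDistribution(table: org.apache.iceberg.Table): Distribution =
  if (table.spec().isUnpartitioned) {
    // an empty clustered distribution would be meaningless to Spark
    Distributions.unspecified()
  } else {
    Distributions.clustered(toTransforms(table.spec()))
  }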


object IcebergImplicits {
  implicit class TableHelper(table: Table) {
    def asIcebergTable: org.apache.iceberg.Table = {
Contributor Author

I think it looks cleaner with implicits.

Contributor

Should this be named toIcebergTable or icebergTable? This is doing more than just a cast, it is accessing the underlying table.

Contributor Author

I think this logic will be needed in a few places so I moved it to Spark3Util and got rid of implicits.
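
A hypothetical shape of the relocated helper, written in Scala here for brevity (the method name is an assumption; SparkTable and its table accessor appear in the earlier snippet): it unwraps the underlying Iceberg table rather than merely casting, which is why the asIcebergTable name read oddly.

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.connector.catalog.Table

// hypothetical helper shape; not the PR's exact signature
def toIcebergTable(table: Table): org.apache.iceberg.Table = table match {
  case sparkTable: SparkTable => sparkTable.table()
  case _ => throw new IllegalArgumentException(s"not an Iceberg table: $table")
}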

@aokolnychyi force-pushed the refactor-required-distribution branch from 7f52312 to 5baec72 on January 26, 2021 00:36
private val TRUE_LITERAL = Literal(true, BooleanType)
private val FALSE_LITERAL = Literal(false, BooleanType)

import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
Contributor

Nit: this doesn't need to move the import above the constants.

Contributor Author

I think it is more natural to have imports for implicits before variables and methods in a class. I'd be in favor of changing that, but I can do it in a separate PR. I'll revert it here and submit a follow-up.

Contributor

I agree about order. We should probably also move the constants into a companion object instead of keeping them inline. Does Scala do that automatically or are these initialized for every instance?

Contributor Author

@aokolnychyi Jan 26, 2021


I'd need to check the bytecode but I agree on moving constants to the companion object.
Will submit a follow-up.
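
For reference on the question above: vals declared in the class body are instance fields and are initialized for every instance of the rule, while vals in the companion object are initialized once when the object is first loaded. A sketch of the follow-up shape (which class hosts these constants is an assumption):

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.BooleanType

object RewriteMergeInto {
  // initialized once and shared by all instances of the rule
  private val TRUE_LITERAL = Literal(true, BooleanType)
  private val FALSE_LITERAL = Literal(false, BooleanType)
}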

@rdblue
Contributor

rdblue commented Jan 26, 2021

Looks correct to me. I'd prefer to fix the unnecessary changes, but I'll leave it up to you whether to merge or fix and then merge. I usually allow nits through on the last pass to avoid blocking.

@aokolnychyi merged commit e5e1c8a into apache:master Jan 26, 2021
@aokolnychyi
Contributor Author

Thanks for reviewing, @rdblue!

// the conversion to catalyst expressions above produces SortOrder expressions
// for OrderedDistribution and generic expressions for ClusteredDistribution
// this allows RepartitionByExpression to pick either range or hash partitioning
RepartitionByExpression(distribution, query, numShufflePartitions)
Contributor

When WRITE_DISTRIBUTION_MODE = range, the logical plan before this PR is:

Sort [dt#8 ASC NULLS FIRST, v#7 ASC NULLS FIRST], true
+- Repartition 2000, true
   +- MergeInto 

After this PR, the logical plan is:

Sort [dt#8 ASC NULLS FIRST, v#7 ASC NULLS FIRST], false
+- RepartitionByExpression [dt#8], 2000
   +- Repartition 2000, true
      +- MergeInto

In my opinion, the conversion of the global sort into a local sort plus range partitioning is correct, but we also need to consider the CollapseRepartition rule in the Spark optimizer. In this case, that rule will eliminate the Repartition 2000, true node.

Please take a look here, thanks @aokolnychyi @rdblue
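
If CollapseRepartition fires as described, the optimized plan would presumably lose the round-robin shuffle and look like this (a sketch inferred from the comment above, not captured output):

Sort [dt#8 ASC NULLS FIRST, v#7 ASC NULLS FIRST], false
+- RepartitionByExpression [dt#8], 2000
   +- MergeInto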
