
Spark 3.3: Support storage-partitioned joins #6371

Merged: 3 commits into apache:master on Dec 24, 2022

Conversation

aokolnychyi (author):

This PR adds support for storage-partitioned joins in Spark 3.3.
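
For context, a rough sketch of how storage-partitioned joins could be exercised once this change is in (hedged: assumes an existing SparkSession named spark and placeholder catalog/table names; the Spark keys are standard Spark 3.3 settings, and the Iceberg key is the property introduced in this PR):

// Spark 3.3 side: enable v2 bucketing and keep broadcast joins out of the way.
spark.conf().set("spark.sql.sources.v2.bucketing.enabled", "true");
spark.conf().set("spark.sql.requireAllClusterKeysForCoPartition", "false");
spark.conf().set("spark.sql.autoBroadcastJoinThreshold", "-1");

// Iceberg side: report the existing data grouping during split planning (off by default).
spark.conf().set("spark.sql.iceberg.split.preserve-data-grouping", "true");

// If both tables are partitioned by bucket(16, id), this join can avoid the shuffle.
spark.sql(
    "SELECT t1.id, t1.data, t2.data "
        + "FROM cat.db.t1 t1 "
        + "JOIN cat.db.t2 t2 ON t1.id = t2.id");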

* @param specs one or many specs
* @return the constructed grouping key type
*/
-  public static StructType groupingKeyType(Collection<PartitionSpec> specs) {
-    return buildPartitionProjectionType("grouping key", specs, commonActiveFieldIds(specs));
+  public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec> specs) {

aokolnychyi (author):

We need to take into account the schema as we may project only particular columns.
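
As a rough illustration (hedged: the helper below is made up, not code from this PR), the grouping key type is computed from the projected schema, so partition fields whose source columns are not projected do not contribute:

import java.util.Collection;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types.StructType;

class GroupingKeyExample {
  // Sketch: derive the grouping key type from only the columns Spark actually projects.
  static StructType groupingKeyFor(Table table, String... projectedColumns) {
    Schema projected = table.schema().select(projectedColumns);
    Collection<PartitionSpec> specs = table.specs().values();
    return Partitioning.groupingKeyType(projected, specs);
  }
}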

@@ -255,74 +256,90 @@ public static org.apache.iceberg.Table toIcebergTable(Table table) {
return sparkTable.table();
}

public static Transform[] toTransforms(Schema schema, List<PartitionField> fields) {
SpecTransformToSparkTransform visitor = new SpecTransformToSparkTransform(schema);

aokolnychyi (author):

I pulled the anonymous class below into SpecTransformToSparkTransform to reuse here.

@@ -42,4 +42,9 @@ private SparkSQLProperties() {}
// Controls whether to check the order of fields during writes
public static final String CHECK_ORDERING = "spark.sql.iceberg.check-ordering";
public static final boolean CHECK_ORDERING_DEFAULT = true;

// Controls whether to preserve the existing grouping of data while planning splits
public static final String PRESERVE_DATA_GROUPING =

aokolnychyi (author):

One alternative name was spark.sql.iceberg.split.preserve-partition-boundaries. I discarded it because we are not really preserving partition boundaries if there are multiple specs. Also, Spark will push down join keys in the future so we won't really respect partition boundaries as such (arguable).

That being said, I'll think more tomorrow. Ideas are always welcome.

Member:

The name is fine to me. Ideally this isn't something that actually gets configured by the end user.

aokolnychyi (author):

I am afraid this one will be pretty public. Users will have to explicitly enable this as we don't know if Spark can benefit from the reported distribution and skip shuffles. Hence, we disable it by default to avoid any performance regressions caused by less dense packing of splits.

Contributor:

I think this name makes sense.

}

@SuppressWarnings("unchecked")
protected synchronized List<T> tasks() {

aokolnychyi (author):

Made tasks and taskGroups synchronized to follow SparkCopyOnWriteScan, which now consumes this logic.

@@ -366,9 +365,9 @@ public Scan buildCopyOnWriteScan() {

Schema expectedSchema = schemaWithMetadataColumns();

-    TableScan scan =
+    BatchScan scan =

aokolnychyi (author):

Had to change this so that SparkCopyOnWriteScan can use the same task planning.

@@ -356,4 +357,23 @@ private <T> GenericArrayData fillArray(

return new GenericArrayData(array);
}

@Override
public boolean equals(Object other) {

aokolnychyi (author):

Had to implement as Spark collects grouping keys and checks their equality.
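
For reference, a hedged sketch of what a value-based equals/hashCode on this row wrapper could look like (the class name StructInternalRow and the struct field are assumptions, not necessarily the exact code in this PR):

// Sketch only: assumes the wrapper holds its values in a field named "struct".
@Override
public boolean equals(Object other) {
  if (this == other) {
    return true;
  }
  if (other == null || getClass() != other.getClass()) {
    return false;
  }
  StructInternalRow that = (StructInternalRow) other;
  return struct.equals(that.struct);
}

@Override
public int hashCode() {
  return struct.hashCode();
}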

Contributor:

What is the contract for equals for InternalRow? This probably works, but does it indicate a bug in Spark where Spark assumes that InternalRow implementations consider one another equal?

aokolnychyi (author):

It is a bug in Spark that will be fixed. Spark uses groupBy in one place, which relies on equals.
All other places use InternalRowSet.

return executeAndKeepPlan(() -> sql(query, args));
}

protected SparkPlan executeAndKeepPlan(Action action) {

aokolnychyi (author):

A utility method to fetch what SparkPlan was actually executed.
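
A hedged sketch of how such a helper can be built on Spark's public QueryExecutionListener API (an approximation, not necessarily the exact code in this PR; Action stands in for the codebase's own callback interface):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.util.QueryExecutionListener;

class ExecutedPlanCapture {
  interface Action {
    void invoke();
  }

  // Runs the action and returns the SparkPlan that was actually executed.
  static SparkPlan executeAndKeepPlan(SparkSession spark, Action action) {
    AtomicReference<SparkPlan> executedPlan = new AtomicReference<>();

    QueryExecutionListener listener =
        new QueryExecutionListener() {
          @Override
          public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
            executedPlan.set(qe.executedPlan());
          }

          @Override
          public void onFailure(String funcName, QueryExecution qe, Exception error) {}
        };

    spark.listenerManager().register(listener);
    try {
      action.invoke();
      // The listener fires asynchronously, so poll briefly until the plan is captured.
      long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10);
      while (executedPlan.get() == null && System.currentTimeMillis() < deadline) {
        Thread.sleep(100);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      spark.listenerManager().unregister(listener);
    }

    return executedPlan.get();
  }
}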

@@ -404,15 +404,15 @@ public void testSparkTableAddDropPartitions() throws Exception {
assertPartitioningEquals(sparkTable(), 1, "bucket(16, id)");

sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName);
-    assertPartitioningEquals(sparkTable(), 2, "truncate(data, 4)");
+    assertPartitioningEquals(sparkTable(), 2, "truncate(4, data)");

aokolnychyi (author):

I had to change Spark3Util.toTransforms to match TruncateFunction, which expects width first.

@Override
public Transform truncate(String sourceName, int sourceId, int width) {
NamedReference column = Expressions.column(quotedName(sourceId));
return Expressions.apply("truncate", Expressions.literal(width), column);

aokolnychyi (author):

Passing width first to match TruncateFunction. Otherwise, the function catalog won't resolve.

Member:

So this is OK as a change because the previous version wouldn't have actually worked correctly? Just making sure we aren't breaking any API here.

aokolnychyi (author):

I think the only place that would change is the string output of partitioning in SparkTable. Otherwise, we handle both combinations in the TruncateTransform extractor.

// Controls whether to preserve the existing grouping of data while planning splits
public static final String PRESERVE_DATA_GROUPING =
"spark.sql.iceberg.split.preserve-data-grouping";
public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;

aokolnychyi (author):

The default is false to avoid introducing a performance regression as split planning may be less dense.

Member:

My question here would be: should we really have the default be false? I have no problem with the name, but it feels like this is probably always the right decision for a scan with possible joins. We should probably look into this more in the future, but my guess is that if a query requires partition columns, we should group by those columns.

That's my long way of saying I'm fine with the default being false for now, but I think it should probably be true.

aokolnychyi (author):

My worry is the performance regressions this may cause. There may be substantially more splits if this config is on. In order to benefit from SPJ, joins must have equality conditions on partition columns. Spark will propagate join conditions in the future. Our long-term plan may be to check whether v2 bucketing is enabled and whether we have join conditions on partition columns, and return true by default in that case. Even that will be suboptimal because we won't know whether SPJ actually applied or we simply produced less dense splits.

Contributor:

I think long term we will want to get rid of this, but that will require some help from Spark. Ideally, Spark should tell the source whether or not it cares about preserving grouping, and on which columns it matters. If we had that information we wouldn't need this at all.

aokolnychyi (author):


// TODO: add tests for truncate transforms once SPARK-40295 is released
// TODO: add tests for cases when one side contains a subset of keys once Spark supports this

Member:

Should we have some tests with multiple buckets or multiple partition transforms?

Member:

I see the example below

Member:

Does this work with a non-Iceberg source as part of the join?

Member:

In theory it's possible. Spark will check whether both sides of the SPJ have compatible partition transforms by checking whether the V2 function identifiers are the same. So if the non-Iceberg source reports the same functions to Spark and uses them in partition transforms, it could work (although I'm not sure whether this is a common use case).

aokolnychyi (author):

We will definitely need to support distributing the other side using the Iceberg function catalog. A common use case for this is MERGE. The incoming relation may be a view, which will never be distributed in a compatible way with Iceberg tables. I hope Spark would be smart enough to shuffle just the smaller relation using the reported partitioning from the target table.

Unfortunately, it does not work even if I distribute the incoming data manually using the Iceberg function catalog. We must have KeyGroupPartitioning on both sides, which can only be reported by data sources.

Member:

It uses an inner join for merges without a WHEN NOT MATCHED clause.

I would think that for an outer join we would similarly just keep all of the non-matching partitions separate, but I haven't really looked at the code.

Member:

Maybe we could just distribute all the non-matching rows "randomly" into the N partitions, but I'll need to check more closely on the Spark side. It's a bit complicated.

aokolnychyi (author):

@sunchao, we can't use the newly added logic for pushing down common keys because we don't know the keys in the relation we will shuffle (until we actually shuffle it)?

aokolnychyi (author):

It'd be great to at least support cases where users manually repartition the incoming source relation using the Iceberg function catalog. I believe that would produce HashPartitioning or RangePartitioning on the other side.

sunchao (Member):

> @sunchao, we can't use the newly added logic for pushing down common keys because we don't know the keys in the relation we will shuffle (until we actually shuffle it)?

Correct. We don't know the keys until runtime, which is too late.

> It'd be great to at least support cases where users manually repartition the incoming source relation using the Iceberg function catalog. I believe that would produce HashPartitioning or RangePartitioning on the other side.

Let me keep a note and do some experiments on these ideas.
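
For reference, the manual repartitioning discussed above might look roughly like this (hedged: catalog, view name, and bucket width are placeholders; Iceberg exposes bucket, truncate, and the time transforms as Spark functions under the catalog's system namespace). As noted in the thread, this alone does not trigger SPJ today, since it yields HashPartitioning while Spark requires KeyGroupedPartitioning reported by the data source on both sides:

// Sketch only: cluster the incoming MERGE source by the target table's partition
// transform (bucket(16, id) here) using Iceberg's function catalog.
spark.sql("SELECT * FROM source_view DISTRIBUTE BY cat.system.bucket(16, id)")
    .createOrReplaceTempView("source_clustered");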

RussellSpitzer (Member) left a comment:

This looks good to me and it's exciting to see the culmination of all the work @aokolnychyi and @sunchao have been doing!

I have a few small notes:
1.) On the grouping key type method in Partitioning.java, we probably want to write a little more documentation about the "schema" argument and what it means when it is null.
2.) There are a lot of Spark SQL configurations required in the test suite. How many of these are actually required for SPJ, and are any of them potentially being removed in the future?
3.) Does SPJ work when one side is not an Iceberg table? All the tests look like Iceberg tables vs. Iceberg tables.

List<T> plannedTasks = Lists.newArrayList();

for (ScanTask task : taskIterable) {
ValidationException.check(

Contributor:

Is it desirable to do the validation in the for loop? The exception will be thrown anyway.

aokolnychyi (author):

This check is to provide a reasonable exception if there is a task of an unsupported type. Since we may have a combination of different tasks, each task is validated.

I don't think it would be super expensive to perform this validation, but it is kind of optional too.

@github-actions github-actions bot added the API label Dec 19, 2022
}

@Override
protected StructType groupingKeyType() {

aokolnychyi (author):

One debatable point is whether groupingKeyType should be part of equals and hashCode.

TestTables.TestTable table =
TestTables.create(tableDir, "test", SCHEMA, BY_CATEGORY_DATA_SPEC, V1_FORMAT_VERSION);

Schema projectedSchema = table.schema().select("id", "data");

Contributor:

Ah, so is the idea to use the pushed projection to limit the fields used? I think that's a great idea!

aokolnychyi (author):

Correct. Otherwise, we will break Spark because we can report non-projected columns.

Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots()));

Snapshot currentSnapshot = table.currentSnapshot();
validateCopyOnWrite(currentSnapshot, "1", "1", "1");

rdblue (Contributor), Dec 20, 2022:

Nit: it would be nice to not pass the counts as strings...

Not that we should fix it in this PR. But maybe we can introduce a version of validateCopyOnWrite that accepts ints and calls the string version.

aokolnychyi (author):

This is what we did historically to simplify the validation, as we compare the summary map (i.e. strings).
I'll check.

aokolnychyi (author):

Doing this properly would touch lots of existing places. I'll follow up to fix this separately.

-  public static StructType groupingKeyType(Collection<PartitionSpec> specs) {
-    return buildPartitionProjectionType("grouping key", specs, commonActiveFieldIds(specs));
+  public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec> specs) {
+    return buildPartitionProjectionType("grouping key", specs, commonActiveFieldIds(schema, specs));

Contributor:

How do we limit the specs passed to this method to just the ones that are used by manifests that are scanned during planning? (I may answer this myself later, but I want to write the question down)

aokolnychyi (author):

It is determined based on scan tasks that match our filter in SparkPartitioningAwareScan.

aokolnychyi (author):

If a table has multiple specs but we scan only tasks that belong to one spec, we should take into account only the one that is being queried.

executeAndKeepPlan(
"DELETE FROM %s t WHERE "
+ "EXISTS (SELECT 1 FROM %s s WHERE t.id = s.id AND t.dep = s.dep) AND "
+ "dep = 'hr'",

Contributor:

Why restrict this to just hr? It seems like that could cause everything to fit in a single task, which Spark handles specially in some cases. If we were to test a delete in both departments, then I think this would be more likely to test what we want in Spark.

aokolnychyi (author):

SPJ requires the same set of keys on both sides. The delta relation contains only records in hr. If I skip this, SPJ would not apply, as the target has both hr and hardware. I make sure we don't have a single partition by configuring the open file cost and writing more than one file.
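
For reference, the kind of setup described here might look roughly like this (hedged: the rows are placeholders; read.split.open-file-cost is an existing Iceberg read property that keeps small files from being packed into one split):

// Sketch: lower the open file cost so each small file becomes its own split,
// and write more than one file so the scan has multiple tasks per partition.
sql("ALTER TABLE %s SET TBLPROPERTIES ('read.split.open-file-cost' = '1')", tableName);
sql("INSERT INTO %s VALUES (1, 'hr'), (2, 'hr')", tableName);
sql("INSERT INTO %s VALUES (3, 'hr'), (4, 'hardware')", tableName);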

import org.junit.BeforeClass;
import org.junit.Test;

public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {

aokolnychyi (author):

@rdblue, this is the primary test suite that covers different transforms. I had just a few tests for row-level operations to make sure it works there too.

return groupingKeyType;
}

private StructType computeGroupingKeyType() {

aokolnychyi (author):

This method only exists to avoid Spotless formatting that would split this into multiple lines.

@@ -473,6 +490,10 @@ private static String parentName(String[] fieldNames) {
return null;
}

public static String describe(List<org.apache.iceberg.expressions.Expression> exprs) {
return exprs.stream().map(Spark3Util::describe).collect(Collectors.joining(", "));

Contributor:

Not a blocker, but should we sanitize the expressions using ExpressionUtil.sanitize here? There is a risk of logging IDs if we log this.

Contributor:

Actually, this is used by toString, so it looks like the right behavior is to not sanitize because this is put into Spark plans where the full filter should be shown. We may want to introduce a sanitized describe later though, for logging purposes.

aokolnychyi (author):

That's exactly my thought process too. I wanted to sanitize first but then realized it is used in the plan and kept the existing behavior.

protected synchronized List<T> tasks() {
if (tasks == null) {
try (CloseableIterable<? extends ScanTask> taskIterable = scan.planFiles()) {
List<T> plannedTasks = Lists.newArrayList();

rdblue (Contributor), Dec 23, 2022:

Minor: It looks like this was already done in SparkBatchQueryScan, but I generally prefer to keep things as a CloseableIterable as long as possible. This could probably return a CloseableIterable instead. That might help if we want to actually forget about tasks and gc them after runtime filtering.

aokolnychyi (author):

I agree with you, but I am not sure it applies here. We have eagerly loaded tasks from day 1. They are used in lots of places and will need to be materialized immediately to compute stats, determine the grouping key, etc.

I think we will be able to GC filtered-out tasks in the current implementation as well.

groupingKeyType());
StructLikeSet plannedGroupingKeys = groupingKeys(plannedTaskGroups);

LOG.debug(

Contributor:

Is this useful? Maybe the number of unique grouping keys and the type.

aokolnychyi (author):

It may be helpful to know how many splits we had before and after runtime filtering.

rdblue (Contributor) left a comment:

This all looks good to me. I had a couple of additional questions in the new SparkPartitioningAwareScan about the transform construction, but I think that should be fairly simple to solve. Nice work, @aokolnychyi and @sunchao!

@github-actions github-actions bot removed the API label Dec 24, 2022
@aokolnychyi aokolnychyi merged commit 31c8011 into apache:master Dec 24, 2022

aokolnychyi (author):

Thanks for reviewing, @RussellSpitzer @sunchao @zinking @rdblue!
