
Spark 3.4: Remove no longer needed write extensions #7443

Merged

Conversation

aokolnychyi (Contributor)

This PR removes write extensions that are no longer needed. Notable changes:

  • Switches to the function catalog instead of custom Catalyst expressions for distribution and ordering.
  • Extensions are no longer required to request a proper distribution and ordering.
  • Adds support for coalescing too-small files with AQE (skew handling is coming in a separate PR).
  • Adds a new function catalog that can be used without a metastore.
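As a hedged illustration of the function-catalog approach (the catalog and table names here are hypothetical, and the exact invocation may differ from what this PR wires up), transform-based expressions resolve through catalog functions in Spark SQL rather than custom Catalyst expressions:

```sql
-- `my_catalog` is an assumed Iceberg catalog name; `system` is the
-- function namespace Iceberg exposes for its transform functions
SELECT my_catalog.system.bucket(16, id) AS id_bucket,
       my_catalog.system.truncate(4, data) AS data_trunc
FROM my_catalog.db.tbl;
```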

@github-actions github-actions bot added the spark label Apr 27, 2023
}
val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
o.copy(write = Some(write), query = newQuery)

case rd @ ReplaceIcebergData(r: DataSourceV2Relation, query, _, None) =>
Contributor Author

We have to keep custom plans for row-level operations for now, as Spark's plans don't support runtime filtering for UPDATE and MERGE. That support will be part of Spark 3.5.

@BeforeClass
public static void setupSpark() {
  // Disable AQE as these tests assume writes generate a particular number of files
  spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false");
}
Contributor Author

After this PR, AQE coalesces small tasks. Hence, I had to disable it.
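For context, these are the standard Spark AQE settings involved (Spark's own configuration keys, not anything added by this PR):

```properties
# Disable AQE entirely, as the test setup above does
spark.sql.adaptive.enabled=false

# Alternatively, keep AQE but turn off only the coalescing of small partitions
spark.sql.adaptive.coalescePartitions.enabled=false
```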

@@ -53,7 +53,7 @@ public SortOrder truncate(
     String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
   return Expressions.sort(
       Expressions.apply(
-          "truncate", Expressions.column(quotedName(id)), Expressions.literal(width)),
+          "truncate", Expressions.literal(width), Expressions.column(quotedName(id))),
Contributor Author

I had to switch the argument order so that truncate is resolvable; Spark did not support transforms with multiple arguments before.
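For reference, here is a small Spark-free sketch of what Iceberg's truncate transform computes (semantics per the Iceberg spec as I understand them; this is illustrative code, not the PR's):

```java
public class TruncateDemo {

  // Integers truncate down to the nearest lower multiple of the width
  // (floor semantics, so negative values round toward negative infinity).
  static int truncateInt(int value, int width) {
    return value - (((value % width) + width) % width);
  }

  // Strings are cut to at most `width` characters.
  static String truncateString(String value, int width) {
    return value.length() > width ? value.substring(0, width) : value;
  }

  public static void main(String[] args) {
    System.out.println(truncateInt(17, 5));            // 15
    System.out.println(truncateInt(-1, 5));            // -5
    System.out.println(truncateString("iceberg", 4));  // iceb
  }
}
```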

/**
* A function catalog that can be used to resolve Iceberg functions without a metastore connection.
*/
public class SparkFunctionCatalog implements SupportsFunctions {
Contributor Author

This class is used directly in the compaction code but can also be configured as a proper catalog.
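If configured as a catalog, the registration would presumably follow the usual Spark catalog configuration pattern (the catalog name below is hypothetical, and the fully qualified class name is assumed from the package shown elsewhere in this PR; verify before use):

```properties
spark.sql.catalog.iceberg_system_functions=org.apache.iceberg.spark.SparkFunctionCatalog
```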

return namespace.length == 0;
}

default Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
Contributor Author

Copied from BaseCatalog to reuse in different places.


@Override
public int requiredNumPartitions() {
  return numShufflePartitions;
}
Contributor Author

This is used to request a particular number of partitions instead of setting a SQL conf.
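As a sketch of the mechanism: this relies on Spark's DSv2 RequiresDistributionAndOrdering hook, mirrored locally below rather than imported from Spark, so the interface and class here are illustrative stand-ins, not Iceberg's actual code:

```java
// Local stand-in for Spark's RequiresDistributionAndOrdering contract,
// where requiredNumPartitions() lets a write request an exact shuffle
// partition count instead of relying on spark.sql.shuffle.partitions.
interface RequiresNumPartitions {
  // In Spark's contract, 0 means "no specific requirement".
  default int requiredNumPartitions() {
    return 0;
  }
}

public class SketchWrite implements RequiresNumPartitions {
  private final int numShufflePartitions;

  public SketchWrite(int numShufflePartitions) {
    this.numShufflePartitions = numShufflePartitions;
  }

  @Override
  public int requiredNumPartitions() {
    return numShufflePartitions;
  }

  public static void main(String[] args) {
    System.out.println(new SketchWrite(200).requiredNumPartitions()); // 200
  }
}
```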

@aokolnychyi (Contributor Author)

@aokolnychyi aokolnychyi force-pushed the simplify-spark-write-extensions branch from 920c364 to 72181b8 Compare April 27, 2023 17:18
@aokolnychyi aokolnychyi force-pushed the simplify-spark-write-extensions branch from 72181b8 to 395cd06 Compare April 27, 2023 17:48
@@ -16,23 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

Member

I wonder a little about the naming here. I know I'm always confused about SparkCatalog (our class) versus Spark's Catalog class. IcebergFunctionCatalog? I don't feel strongly about this, though.

Contributor Author (@aokolnychyi, Apr 27, 2023)

I also thought a bit about it, but then looked at all our other classes like SparkCatalog, SparkWrite, etc. I guess it makes sense to follow this pattern because catalogs are referred to using the qualified name, which includes the package.

Member (@RussellSpitzer, Apr 27, 2023)

Yeah, the main issue is whenever we discuss it in docs or with users. Then I'm constantly trying to explain the difference between Spark's Catalog, SparkCatalog, and of course Hive's Catalog and HiveCatalog :)

Contributor Author (@aokolnychyi, Apr 27, 2023)

Yeah, I agree, it was a questionable decision on our end to go with this naming in the first place.

df.select("c1", "c2", "c3")
.write()
.format("iceberg")
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
Member

Why do we need to turn this off for the test?

Contributor Author

The check below compares rows, and it started to fail. Before this PR, the incoming data was not ordered because the table had a truncate transform (unsupported without extensions). After this PR, there is a sort, which breaks the test.

Contributor Author

We could compare counts too. No preference on my side, so I went with the smallest change.

Member

I would probably just compare counts; otherwise it makes it seem like there is some relationship between distribution and ordering and this test.

Contributor Author

Sounds good, I'll switch.

Contributor Author

Switched.

@aokolnychyi aokolnychyi merged commit 91327e7 into apache:master Apr 27, 2023
@aokolnychyi (Contributor Author)

Thanks, @RussellSpitzer!
