Core, AWS: Auto optimize table using post commit notifications #7194

Closed
wants to merge 1 commit

Conversation

@rajarshisarkar
Contributor

rajarshisarkar commented Mar 24, 2023

This PR uses MetricsReporter post-commit notifications to auto optimize tables. The solution lets users collect table activity during writes and make better decisions on how to optimize each table differently. This is an opt-in feature and would be helpful in scenarios where users do not want to maintain separate optimisations as scheduled pipelines.

The overall approach is to form the rewrite data files SQL command and submit it to EMR-on-EC2, EMR-on-EKS, or Athena after write operations. Users can plug in any of these implementations (via table or catalog properties): RewriteUsingEMROnEC2, RewriteUsingEMROnEKS, or RewriteUsingAthena to rewrite the tables when defined thresholds are met (commit-based or time-based).
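
To make the flow concrete, here is a minimal, illustrative sketch of the reporter shape. The class and interface names (OptimizeTableReporterSketch, RewriteRunner) and the hard-coded threshold are placeholders for this example, not the classes added by this PR:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;

public class OptimizeTableReporterSketch implements MetricsReporter {
  private static final int COMMIT_THRESHOLD = 1; // would come from catalog/table properties

  private final ConcurrentMap<String, AtomicLong> commitsSinceRewrite = new ConcurrentHashMap<>();
  private final RewriteRunner runner; // e.g. an EMR or Athena submitter

  public OptimizeTableReporterSketch(RewriteRunner runner) {
    this.runner = runner;
  }

  @Override
  public void report(MetricsReport report) {
    if (!(report instanceof CommitReport)) {
      return; // only post-commit notifications are relevant here
    }

    CommitReport commit = (CommitReport) report;
    AtomicLong counter =
        commitsSinceRewrite.computeIfAbsent(commit.tableName(), name -> new AtomicLong());

    if (counter.incrementAndGet() >= COMMIT_THRESHOLD) {
      // Form the rewrite-data-files command and submit it to the configured service.
      String sql = String.format(
          "CALL my_catalog.system.rewrite_data_files(table => '%s')", commit.tableName());
      runner.submit(sql);
      counter.set(0);
    }
  }

  /** Hypothetical abstraction over EMR-on-EC2, EMR-on-EKS, or Athena submission. */
  public interface RewriteRunner {
    void submit(String sql);
  }
}

The concrete implementations are wired in through the metrics-reporter-impl catalog property, as shown in the commands below.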

Looking forward to the community feedback.


Command to launch session (EMR-on-EC2):

spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://bucket/warehouse/test-table \
    --conf spark.sql.catalog.my_catalog.metrics-reporter-impl=org.apache.iceberg.aws.reporter.OptimizeTableReporter \
    --conf spark.sql.catalog.my_catalog.auto.optimize.rewrite-data-files.synchronous.enabled=true \
    --conf spark.sql.catalog.my_catalog.auto.optimize.rewrite-data-files.impl=org.apache.iceberg.aws.emr.RewriteUsingEMROnEC2 \
    --conf spark.sql.catalog.my_catalog.auto.optimize.rewrite-data-files.emr.cluster-id=j-3QLCP3UJJ7IOZ \
    --conf spark.sql.catalog.my_catalog.auto.optimize.rewrite-data-files.commit.threshold=1

Command to launch session (Athena):

spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://bucket/warehouse/test-table \
    --conf spark.sql.catalog.my_catalog.metrics-reporter-impl=org.apache.iceberg.aws.reporter.OptimizeTableReporter \
    --conf spark.sql.catalog.my_catalog.auto.optimize.rewrite-data-files.synchronous.enabled=true \
    --conf spark.sql.catalog.my_catalog.auto.optimize.rewrite-data-files.impl=org.apache.iceberg.aws.athena.RewriteUsingAthena \
    --conf spark.sql.catalog.my_catalog.auto.optimize.rewrite-data-files.emr.cluster-id=j-3QLCP3UJJ7IOZ \
    --conf spark.sql.catalog.my_catalog.auto.optimize.rewrite-data-files.commit.threshold=1 \
    --conf spark.sql.catalog.my_catalog.auto.optimize.rewrite-data-files.athena.output-bucket=s3://bucket

cc: @jackye1995 @singhpk234 @amogh-jahagirdar

import org.immutables.value.Value;

/** A commit report that contains all relevant information from a Snapshot. */
@Value.Immutable
public interface CommitReport extends MetricsReport {

TableOperations tableOperations();
Contributor

that shouldn't be added to a CommitReport. why is this required here?

Contributor Author

TableOperations is required in the MetricsReporter implementation to fetch:

  1. Catalog/table properties.
  2. List of snapshots (to know the difference between the current snapshot and the latest rewrite snapshot).
  3. Configuration object (to know hive.metastore.uris).
  4. IO class name.

If we cannot add TableOperations to CommitReport, can we have a new type of MetricsReport for it? Please let me know your thoughts.
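
For illustration, a hedged sketch of how points 1, 2 and 4 could be consumed from TableOperations; the last-rewrite-snapshot-id property name is invented for this example:

import java.util.List;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

class RewriteThresholdCheck {
  static boolean commitThresholdReached(TableOperations ops, int commitThreshold) {
    TableMetadata metadata = ops.current();

    // 1. Catalog/table properties drive per-table configuration.
    long lastRewriteSnapshotId =
        Long.parseLong(metadata.properties().getOrDefault("last-rewrite-snapshot-id", "-1"));

    // 2. The snapshot list tells us how many commits happened since the last rewrite
    //    (assumes snapshots() is ordered oldest to newest).
    int commitsSinceRewrite = 0;
    for (Snapshot snapshot : metadata.snapshots()) {
      if (snapshot.snapshotId() == lastRewriteSnapshotId) {
        commitsSinceRewrite = 0;
      } else {
        commitsSinceRewrite++;
      }
    }

    // 4. The IO class name, if needed, would come from ops.io().getClass().getName().
    return commitsSinceRewrite >= commitThreshold;
  }
}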

Contributor

I don't think there should be any coupling between TableOperations and any subtype of MetricsReport. The purpose of a MetricsReport is to only carry final results from a scan/commit

Contributor Author

Curious to know if we can have a new interface for such type of operations. Thoughts?

@@ -440,6 +440,7 @@ private void notifyListeners() {

reporter.report(
ImmutableCommitReport.builder()
.tableOperations(ops)
Contributor

that shouldn't be added to a CommitReport

core/src/main/java/org/apache/iceberg/TableProperties.java (outdated review thread, resolved)
@@ -489,6 +489,9 @@ acceptedBreaks:
\ @ org.apache.iceberg.SnapshotScan<ThisT, T extends org.apache.iceberg.ScanTask,\
\ G extends org.apache.iceberg.ScanTaskGroup<T extends org.apache.iceberg.ScanTask>>"
justification: "Removing deprecations for 1.3.0"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.TableOperations org.apache.iceberg.metrics.CommitReport::tableOperations()"
Contributor

as I mentioned further below, TableOperations shouldn't be added to CommitReport. A CommitReport represents some final results from a commit that happened

@rajarshisarkar force-pushed the auto-optimize branch 2 times, most recently from 948c841 to 2b609df on March 24, 2023 11:45
@jackye1995
Contributor

Thanks for putting this up @rajarshisarkar.

For some background, Brooklyn Data's blog reported that Iceberg read workloads were 7x-8x slower than Delta when an UPSERT command added 92,000 small files.

We reproduced the setup internally and noticed a speed-up of up to 6.8x in the Iceberg read queries after combining the small files. The community also saw a 6.25x improvement in read query performance after compaction on a 25 MB dataset consisting of 100,000 records in #5997.

I understand the difference is there because we want to decouple optimization from read and write, but I am curious to see if we could provide some out-of-the-box optimization vendor integrations in this way through the metrics reporter if the user does not want to use any auto-optimization solution.

@nastra @Fokko @rdblue @danielcweeks please let us know if this is something that the community is interested in taking, or if not, how we could add some integrations in a community-friendly way to close the gap in table format comparisons like this.

@danielcweeks
Contributor

I think there's a more fundamental consideration that we need to address here which is that this is introducing something that we don't do in the Iceberg project: Invoke an external provider to perform operations on behalf of the library.

While Iceberg provides procedures and integration to perform certain executions (like rewrite data files), it does not actually execute them in response to some form of stimulus.

While I understand the issues associated with small files, I don't feel that this is a generalizable approach that we would want to standardize on especially because it involves a very specific provider and services.

@jackye1995
Contributor

this is introducing something that we don't do in the Iceberg project: Invoke an external provider to perform operations on behalf of the library.

Yes, I 100% agree with that. But first of all, this is within the aws module, which is an external provider integration. And secondly, without this PR, we will likely also contribute things like CloudWatch, SQS and SNS metrics reporters.

So the questions I would like your opinions on are:

  1. Would it work if we contribute CloudWatch, SQS and SNS metrics reporters, such that these compaction actions could be triggered from that point, outside the Iceberg library?
  2. Would the community be okay with us publishing a separate open source repository for metrics reporters like this, such that they become part of the EMR distribution and users can plug them into a catalog if necessary?
  3. We could also develop a metrics reporter for a specific engine like Spark, and contact the Spark endpoint to submit such a job instead of going through an external service provider's API. Would that be a viable approach?

@nastra
Contributor

nastra commented Mar 27, 2023

but I am curious to see if we could provide some out-of-the-box optimization vendor integrations in this way through the metrics reporter if the user does not want to use any auto-optimization solution.

One of the problems with the proposed approach is that optimizations are being triggered as an immediate result of a commit. The implication is that whatever happens in the metric report consumer needs to happen in a way that doesn't affect the commit path. For example, failures in the consumer should not lead to commit failures.
Additionally, every single commit triggers additional workload, so I think consuming a metrics report and actually performing some workload should be completely decoupled from one another.
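
As a generic sketch (not something in this PR) of what keeping the consumer off the commit path could look like: wrap the reporter so the work runs on a background executor and its failures never propagate to the committer.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;

public class AsyncReporter implements MetricsReporter {
  private final MetricsReporter delegate;
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  public AsyncReporter(MetricsReporter delegate) {
    this.delegate = delegate;
  }

  @Override
  public void report(MetricsReport report) {
    // Hand off asynchronously; a slow or failing consumer must never fail or slow the commit.
    executor.submit(() -> {
      try {
        delegate.report(report);
      } catch (RuntimeException e) {
        // log and drop
      }
    });
  }
}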

@rajarshisarkar
Contributor Author

rajarshisarkar commented Mar 30, 2023

One of the problems with the proposed approach is that optimizations are being triggered as an immediate result of a commit. The implication is that whatever happens in the metric report consumer needs to happen in a way that doesn't affect the commit path. For example, failures in the consumer should not lead to commit failures.
Additionally, every single commit triggers additional workload, so I think consuming a metrics report and actually performing some workload should be completely decoupled from one another.

This is an opt-in feature and would be helpful in scenarios where users do not want to maintain separate optimisations as scheduled pipelines. The feature would actually take away the operational overhead of maintaining the extra pipelines. Yes, the consumer should not affect the commit path (for the incoming commits), which makes it suitable for batch workloads. Regarding additional workload, every commit would just do some basic threshold checks on the table history, and only when the user opts in to auto optimisation. We can arrange the thresholds so that the quickest checks run first and we exit early where possible. Since the approach is suited to batch workloads, the user shouldn't mind this latency after the commit. Thoughts?
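
For illustration, the cheapest-first, early-exit ordering could be as simple as the following sketch (the individual checks are placeholders, not APIs from this PR):

import java.util.List;
import java.util.function.BooleanSupplier;

class ThresholdChecks {
  // Checks are ordered cheapest first, e.g. opt-in property lookup, then commit count,
  // then any more expensive inspection; the loop exits on the first failing check.
  static boolean shouldRewrite(List<BooleanSupplier> checksCheapestFirst) {
    for (BooleanSupplier check : checksCheapestFirst) {
      if (!check.getAsBoolean()) {
        return false;
      }
    }
    return true;
  }
}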

@danielcweeks
Contributor

this is introducing something that we don't do in the Iceberg project: Invoke an external provider to perform operations on behalf of the library.

Yes, I 100% agree with that. But first of all, this is within the aws module, which is an external provider integration. And secondly, without this PR, we will likely also contribute things like CloudWatch, SQS and SNS metrics reporters.

So the questions I would like your opinions on are:

  1. Would it work if we contribute CloudWatch, SQS and SNS metrics reporters, such that these compaction actions could be triggered from that point, outside the Iceberg library?
  2. Would the community be okay with us publishing a separate open source repository for metrics reporters like this, such that they become part of the EMR distribution and users can plug them into a catalog if necessary?
  3. We could also develop a metrics reporter for a specific engine like Spark, and contact the Spark endpoint to submit such a job instead of going through an external service provider's API. Would that be a viable approach?

I think we should clarify that the iceberg-aws module was originally introduced to integrate the storage aspects of AWS S3 with Iceberg (similar to GCP, etc.), since cloud providers are ubiquitous for data warehousing and that aligns well with the requirements to use Iceberg. The existence of the module isn't justification to include anything AWS-related. We should focus on things that are highly aligned and generalizable.

For items 1 and 2 above, I would lean towards publishing them separately since they are pluggable and can be integrated with other AWS libraries and tools.

For number 3, I think invocations and automatic callbacks into systems take us out of the format+integration space and start down the path of defining the runtime activity, which is where we've historically said we draw the line on what should, or should not, be part of Iceberg.

This is very much a slippery slope because if we allow vendor solutions or hooks like this (regardless of how convenient/optional it may be) we would need to be open to the same contributions from all vendors (cloud or otherwise).

@aokolnychyi
Contributor

aokolnychyi commented Apr 4, 2023

There can be many different ways to handle table management. By adding specific table properties we are forcing a particular implementation, and I don't think that is a good idea. In my view, it is OK for vendors to provide their own implementations, and Iceberg as a library should allow those implementations to plug in. That means if the Iceberg repo needs to adapt to inject extra metrics reporters during writes, or if we need another AutoOptimize API to make this happen, we should do it. However, this seems too specific to AWS to live in the Iceberg repo directly. When we created actions, the assumption was that we would share common table management blocks but would not force how those blocks are used/triggered.

We could discuss some form of auto optimize API that would kick in after writes (sync or async) and use the same cluster and resources. That could make sense, but starting an async compaction job on EMR seems too specific to live here directly.

Two questions:

  • Is the existing metrics reporter API sufficient to build external auto optimize? If not, what is missing?
  • Do we see value in non-vendor specific auto-optimize that would automatically invoke actions?

@rdblue
Contributor

rdblue commented Apr 23, 2023

I'm going to close this PR because I don't think it is an approach that makes sense for the Iceberg project.

One of the reasons Iceberg exists is that it is important to solve problems in the right place. Before, we needed to solve problems in the processing engine or in a file format, and those created awkward, half-baked solutions. Similarly, I think that this is not the right place or a good approach for optimization.

First, the ideal approach is to write data correctly in the first place. That's why Iceberg defines table-level tuning settings and write order, and why we request distribution and ordering in engines like Spark. We want to be able to asynchronously optimize tables, but we don't want to require it if we don't need to. Focusing effort on fixing the underlying problem (creating too many files) is a better approach. I think we should see if we can address the problem in the write path by coalescing outputs and aligning write distribution with table partitioning.
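
For readers less familiar with those knobs, an illustrative example of the table-level settings being referred to, using the Java API (the table handle and the event_ts sort column are hypothetical):

import org.apache.iceberg.Table;

class WritePathTuning {
  static void tune(Table table) {
    // Cluster data on write and aim for reasonably sized files.
    table.updateProperties()
        .set("write.distribution-mode", "hash")
        .set("write.target-file-size-bytes", "536870912") // ~512 MB
        .commit();

    // Declare a write order that engines like Spark can request at write time.
    table.replaceSortOrder()
        .asc("event_ts")
        .commit();
  }
}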

Second, kicking off a job in a specific downstream system through an API intended to collect metrics is not a good design for asynchronous optimization. Quite a few comments question aspects of this, and those are valid concerns. But ignoring the specifics, I think that the choices here were made because this is attempting to solve a problem in the wrong place. Rather than going that direction and then getting pulled deeper into a mess -- adding more compute options or rules for how to take action -- I think the right approach is to have APIs that enable people to build optimizers, similar to how we handle catalogs. That's why we built metrics reporting as an API: to get important information to downstream systems.

@rdblue closed this Apr 23, 2023