
Promoting Idempotency Through Metadata #47

Open · omervk opened this issue Aug 5, 2018 · 2 comments

Comments

omervk (Contributor) commented Aug 5, 2018

Currently, implementing idempotent jobs over Iceberg is done via data-based predicates. For instance, if a job run is presumed to have written the data for 2018-08-05, you would write something like:

df.write(t).overwrite($"day" === "2018-08-05")

However, this approach may be:

  1. Slow - the filter has to be pushed down to partitions on rewrite, and the boundary values have to be calculated for each write (which may be slow if done on demand)
  2. Incomplete (false negatives) - the predicate doesn't cover everything the job wrote, e.g. late-arriving data included in the previous output
  3. Overzealous (false positives) - the predicate covers data we don't want to overwrite, e.g. data for the same day that arrived before or after this job's run
  4. Entangled with domain knowledge - each job needs to know which field determines what it wrote and what value it wrote, and must preserve that value somewhere

To promote more complete idempotency, we can use the metadata Iceberg keeps to revert previous snapshots. If, for instance, snapshot S1 writes file F1 and we want to re-run the job that wrote it, we can commit a new snapshot S2 that deletes F1 and writes F2 with the new data, effectively reverting S1.

The benefits of this would be:

  1. Snapshot isolation is preserved
  2. No duplicate data can be read (readers never see F1 and F2 together)
  3. No incomplete data can be read (readers are never left with neither F1 nor F2)
  4. We can revert a snapshot regardless of how far back it was committed

Note: This would only be usable in cases where we are only appending new data in snapshots, so cases where we also regularly compact or coalesce files may not be supported.

To achieve this, we could:

  1. Use the com.netflix.iceberg.RewriteFiles operation (sketched after this list), but this would keep us at a very low level, close to Iceberg, and force us to manually manage the files ourselves.
  2. Use the com.netflix.iceberg.Rollback operation, but this only rolls back the previous snapshot, which is something we don't want to be tied to.
  3. Use the com.netflix.iceberg.DeleteFiles operation, but this would create a new snapshot, causing us to either read duplicate or incomplete data.
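For illustration, here is roughly what option 1 could look like. This is only a sketch under assumptions: it assumes RewriteFiles is exposed as Table#newRewrite(), and that we have tracked the DataFile handles f1 (written by the snapshot being re-run) and f2 (its replacement) ourselves, which is exactly the manual bookkeeping that makes this option unattractive.

    import java.util.Collections.singleton

    // delete F1 and add F2 in a single new snapshot
    table.newRewrite()
         .rewriteFiles(singleton(f1), singleton(f2))
         .commit()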

What would be great is an API that gives us some sort of transaction spanning both the high-level (Spark) and low-level (Iceberg) APIs, so that we could delete the files written in a snapshot and write new data using Spark, and only then commit the transaction, creating a single new snapshot.
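To make the request concrete, here is an entirely hypothetical sketch; none of these names exist in Iceberg or Spark today, they only illustrate the shape of the API being asked for:

    // hypothetical: open a transaction that reverts an old snapshot's files
    val txn = table.newIdempotentRewrite(snapshotToRevert)

    // hypothetical: the Spark write stages its files into the open transaction
    // instead of committing its own snapshot
    df.write
      .format("iceberg")
      .option("transaction-id", txn.id)
      .save("db.events")

    // a single new snapshot removes the old snapshot's files and adds the new ones
    txn.commit()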

@rdblue I would love to hear what you think this kind of API would look like.

rdblue (Contributor) commented Aug 8, 2018

I think we're talking about different use cases that are both idempotent:

  • Incremental processing - I wouldn't want to build incremental processing on top of the overwrite you're referring to, for the reasons you cite here. If I have a job that processes all data as it comes in, then to make it idempotent and safe to re-run (e.g. after fixing a bug), I agree that we shouldn't require filtering on the data values.
  • Aggregation processing - Likewise, if I have a job that aggregates another table to produce a daily summary, I would not want to revert snapshots, for nearly the same reasons: what if two days of summaries were written in one snapshot? In this case, because the job knows what output data it creates, it also knows what output it should replace.

I think the right choice depends on the job and how it is written. Our data engineers write jobs all the time that overwrite specific partitions and are idempotent, but we also have a separate system for them to run incremental processing.

Also, the point about overwrite being slow doesn't really apply unless what you're trying to delete is mixed with other data you're not trying to delete. This isn't intended to support that case.

I think there is a use case for incremental processing, so I'd like to see support for writes that also revert a previous commit. I'm skeptical about an API that bridges both Spark and Iceberg, though. To do this, I'd just pass the snapshot to revert through Spark's API as a DataFrameWriter option to Iceberg. Then let Iceberg handle the revert and commit.
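A rough sketch of what that could look like from the Spark side; the option name below is made up for illustration and is not an actual Iceberg write option:

    df.write
      .format("iceberg")
      .option("replace-snapshot-id", snapshotToRevert.toString)  // hypothetical option
      .mode("append")
      .save("db.events")

Iceberg would then be responsible for building a single commit that both reverts the named snapshot and adds the newly written files.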

I think this requires:

  • A RevertCommit action
  • Support for an atomic RevertCommit combined with some other operation, like a ReplaceCommit; a rough shape is sketched below. For revert + append, it could validate that the replacement commit has the same type as the reverted commit.
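The names and signatures below are hypothetical and only illustrate how a revert could be composed atomically with a replacement (DataFile here is Iceberg's existing file handle type):

    // hypothetical interfaces; nothing like this exists in Iceberg yet
    trait RevertCommit {
      def revert(snapshotId: Long): RevertCommit   // choose the snapshot whose files are undone
      def commit(): Unit                           // produce the reverting snapshot
    }

    trait ReplaceCommit extends RevertCommit {
      def appendFile(file: DataFile): ReplaceCommit  // replacement data committed atomically with the revert
    }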

rdblue (Contributor) commented Aug 26, 2018

@omervk, you might want to have a look at #52, which adds single-table transactions. That would enable you to revert a commit and replace its data in the same commit.
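For context, a minimal sketch of how that could look once #52 lands, assuming the transaction exposes the same delete and append operations as the table (the method names follow my understanding of the transaction API and should be treated as an assumption):

    // revert the files written by the snapshot being re-run and add the
    // re-computed replacement files in one atomic commit
    val txn = table.newTransaction()

    txn.newDelete()
       .deleteFile(f1)        // F1: written by the snapshot we are re-running
       .commit()              // staged inside the transaction, not yet visible

    txn.newAppend()
       .appendFile(f2)        // F2: the replacement data
       .commit()              // staged inside the transaction, not yet visible

    txn.commitTransaction()   // one new snapshot: readers never see F1+F2 or neither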

Also, I've updated the Snapshots section of the spec with a set of operations to help identify what changed in a snapshot. It would be great to hear your take on the update and whether revert should be one of those operations. If so, maybe the operation info should be an object instead of a string, so that it can encode more information, like which previous snapshot was reverted or how many data files were added/removed.
