Spark: CDC does not respect when the table is rolled back. #10247
Labels: bug (Something isn't working)
Comments
javrasya changed the title from "Spark CDC does not respect when the table is rolled back." to "Spark: CDC does not respect when the table is rolled back." on Apr 29, 2024
@javrasya Do you mean when the table is rolled back, as in the following test?

```java
@TestTemplate
public void testQueryWithRollback() {
  createTable();

  sql("INSERT INTO %s VALUES (1, 'a')", tableName);
  Table table = validationCatalog.loadTable(tableIdent);
  Snapshot snap1 = table.currentSnapshot();
  long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis());

  sql("INSERT INTO %s VALUES (2, 'b')", tableName);
  table.refresh();
  Snapshot snap2 = table.currentSnapshot();
  long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis());

  sql("CALL %s.system.rollback_to_snapshot('%s', %d)", catalogName, tableIdent, snap1.snapshotId());
  table.refresh();
  Snapshot snap4 = table.currentSnapshot();
  assertThat(snap4).isEqualTo(snap1);

  sql("INSERT OVERWRITE %s VALUES (-2, 'a')", tableName);
  table.refresh();
  Snapshot snap3 = table.currentSnapshot();
  long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());

  assertEquals(
      "Should have expected changed rows up to snapshot 3",
      ImmutableList.of(
          row(1, "a", "INSERT", 0, snap1.snapshotId()),
          row(1, "a", "DELETE", 1, snap3.snapshotId()),
          row(-2, "a", "INSERT", 1, snap3.snapshotId())),
      changelogRecords(null, rightAfterSnap3));

  assertEquals(
      "Should have expected changed rows up to snapshot 2",
      ImmutableList.of(
          row(1, "a", "INSERT", 0, snap1.snapshotId()),
          row(2, "b", "INSERT", 1, snap2.snapshotId())),
      changelogRecords(null, rightAfterSnap2));
}
```
Exactly @manuzhang. It feels like it should filter that out and this is a bug. Wdyt?
Apache Iceberg version: 1.4.3
Query engine: Spark
Please describe the bug
We had to roll back our table because it had some broken snapshots. Downstream, we turn that table, which receives upserts, into a changelog stream and process it that way, using time boundaries. The way it seems to work is that it looks at the history of the table and does some sort of time-travel query to find the most recent snapshot id as of the end timestamp we pass to the CDC procedure.
The problem is that it only uses the history entries, which do not carry enough information to tell whether a snapshot is still in the ancestry of the main branch reference.
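To illustrate the failure mode, here is a minimal, self-contained sketch (plain Java; the `Entry` record is a stand-in for Iceberg's history entries, not the real class). A lookup that only scans the raw history log for the latest snapshot at or before a timestamp can return a snapshot that a later rollback has abandoned:

```java
import java.util.List;

public class HistoryLookup {
  // Stand-in for a table history entry: (timestampMillis, snapshotId).
  record Entry(long timestampMillis, long snapshotId) {}

  // Mimics a time-travel lookup over the raw history log: pick the last
  // snapshot whose timestamp is <= the requested timestamp.
  static long snapshotIdAsOfTime(List<Entry> history, long timestampMillis) {
    long result = -1;
    for (Entry e : history) {
      if (e.timestampMillis() <= timestampMillis) {
        result = e.snapshotId();
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // snap1 committed at t=100, snap2 at t=200, then the table is rolled
    // back to snap1 at t=300 -- the history log keeps all three entries.
    List<Entry> history = List.of(
        new Entry(100, 1L),   // snap1
        new Entry(200, 2L),   // snap2 (later rolled back)
        new Entry(300, 1L));  // rollback re-activates snap1

    // Asking "as of t=250" returns snap2, even though after the rollback
    // snap2 is no longer an ancestor of the current main branch.
    System.out.println(snapshotIdAsOfTime(history, 250)); // prints 2
  }
}
```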
Here is the problematic line, which calls the function in iceberg-core:
iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
Line 530 in 426818b
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L350-L358
I think it should disregard snapshots that are no longer in the ancestry of the main branch.
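One possible shape for that fix, again sketched against a simplified model (plain Java; the `Snapshot` record with a parent pointer is a stand-in, not Iceberg's class): first collect the ancestor chain of the current snapshot by walking parent ids, then only consider snapshots in that chain when resolving a timestamp:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AncestryAwareLookup {
  // Stand-in for a snapshot: id, parent id (null for the root), commit time.
  record Snapshot(long id, Long parentId, long timestampMillis) {}

  // Walk parent pointers from the current snapshot to collect the ids that
  // are still reachable from the main branch reference.
  static Set<Long> mainBranchAncestors(Map<Long, Snapshot> byId, long currentId) {
    Set<Long> ancestors = new HashSet<>();
    Long id = currentId;
    while (id != null && byId.containsKey(id)) {
      ancestors.add(id);
      id = byId.get(id).parentId();
    }
    return ancestors;
  }

  // Same "as of timestamp" lookup, but snapshots outside the ancestor set
  // (i.e. rolled-back ones) are disregarded.
  static long snapshotIdAsOfTime(Map<Long, Snapshot> byId, long currentId, long ts) {
    Set<Long> ancestors = mainBranchAncestors(byId, currentId);
    long result = -1;
    long resultTs = Long.MIN_VALUE;
    for (Snapshot s : byId.values()) {
      if (ancestors.contains(s.id()) && s.timestampMillis() <= ts && s.timestampMillis() > resultTs) {
        result = s.id();
        resultTs = s.timestampMillis();
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // snap1 (t=100) <- snap2 (t=200); the table was rolled back to snap1,
    // so the current snapshot is snap1 and snap2 is no longer referenced.
    Map<Long, Snapshot> byId = Map.of(
        1L, new Snapshot(1L, null, 100),
        2L, new Snapshot(2L, 1L, 200));

    // "As of t=250" now resolves to snap1, not the rolled-back snap2.
    System.out.println(snapshotIdAsOfTime(byId, 1L, 250)); // prints 1
  }
}
```

If the rollback had not happened (current snapshot still snap2), the same lookup would resolve to snap2, so the filter only changes behavior for abandoned snapshots.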