From 73d64499b13a6291d22105b8e8270c233715689e Mon Sep 17 00:00:00 2001
From: Prashant Singh <35593236+singhpk234@users.noreply.github.com>
Date: Tue, 24 May 2022 20:26:55 +0530
Subject: [PATCH] Spark: Backport CommitStateUnknownException handling for
 RewriteManifestSparkAction (#4850)

Co-authored-by: Prashant Singh
---
 .../BaseRewriteManifestsSparkAction.java    |  4 ++
 .../actions/TestRewriteManifestsAction.java | 69 +++++++++++++++++++
 .../BaseRewriteManifestsSparkAction.java    |  4 ++
 .../actions/TestRewriteManifestsAction.java | 69 +++++++++++++++++++
 .../BaseRewriteManifestsSparkAction.java    |  4 ++
 .../actions/TestRewriteManifestsAction.java | 69 +++++++++++++++++++
 6 files changed, 219 insertions(+)

diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c09a6480c4b..c446d42ca06 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -295,6 +296,9 @@ private void replaceManifests(Iterable<ManifestFile> deletedManifests, Iterable<
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      // don't clean up added manifest files, because they may have been successfully committed.
+ throw commitStateUnknownException; } catch (Exception e) { // delete all new manifests because the rewrite failed deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index ec5c54d21e2..40adb7d4c91 100644 --- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -31,6 +32,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.actions.RewriteManifests; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -52,6 +54,9 @@ import org.junit.runners.Parameterized; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; @RunWith(Parameterized.class) public class TestRewriteManifestsAction extends SparkTestBase { @@ -157,6 +162,70 @@ public void testRewriteSmallManifestsNonPartitionedTable() { Assert.assertEquals("Rows must match", expectedRecords, actualRecords); } + @Test + public void testRewriteManifestsWithCommitStateUnknownException() { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + + table.refresh(); + + List manifests = table.currentSnapshot().allManifests(); + Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size()); + + SparkActions actions = SparkActions.get(); + + // create a spy which would throw a CommitStateUnknownException after successful commit. 
+    org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
+    org.apache.iceberg.RewriteManifests spyNewRewriteManifests = spy(newRewriteManifests);
+    doAnswer(invocation -> {
+      newRewriteManifests.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRewriteManifests).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
+
+    AssertHelpers.assertThrowsCause("Should throw a Commit State Unknown Exception",
+        RuntimeException.class,
+        "Datacenter on Fire",
+        () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());
+
+    table.refresh();
+
+    // table should reflect the changes, since the commit was successful
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
+    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
+    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteSmallManifestsPartitionedTable() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c09a6480c4b..c446d42ca06 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -295,6 +296,9 @@ private void replaceManifests(Iterable<ManifestFile> deletedManifests, Iterable<
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      // don't clean up added manifest files, because they may have been successfully committed.
+      throw commitStateUnknownException;
     } catch (Exception e) {
       // delete all new manifests because the rewrite failed
       deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index ec5c54d21e2..40adb7d4c91 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -31,6 +32,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +54,9 @@
 import org.junit.runners.Parameterized;

 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;

 @RunWith(Parameterized.class)
 public class TestRewriteManifestsAction extends SparkTestBase {
@@ -157,6 +162,70 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }

+  @Test
+  public void testRewriteManifestsWithCommitStateUnknownException() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+    SparkActions actions = SparkActions.get();
+
+    // create a spy which would throw a CommitStateUnknownException after successful commit.
+    org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
+    org.apache.iceberg.RewriteManifests spyNewRewriteManifests = spy(newRewriteManifests);
+    doAnswer(invocation -> {
+      newRewriteManifests.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRewriteManifests).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
+
+    AssertHelpers.assertThrowsCause("Should throw a Commit State Unknown Exception",
+        RuntimeException.class,
+        "Datacenter on Fire",
+        () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());
+
+    table.refresh();
+
+    // table should reflect the changes, since the commit was successful
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
+    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
+    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteSmallManifestsPartitionedTable() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c09a6480c4b..c446d42ca06 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -295,6 +296,9 @@ private void replaceManifests(Iterable<ManifestFile> deletedManifests, Iterable<
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      // don't clean up added manifest files, because they may have been successfully committed.
+      throw commitStateUnknownException;
     } catch (Exception e) {
       // delete all new manifests because the rewrite failed
       deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index ec5c54d21e2..40adb7d4c91 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -31,6 +32,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +54,9 @@
 import org.junit.runners.Parameterized;

 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;

 @RunWith(Parameterized.class)
 public class TestRewriteManifestsAction extends SparkTestBase {
@@ -157,6 +162,70 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }

+  @Test
+  public void testRewriteManifestsWithCommitStateUnknownException() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+    SparkActions actions = SparkActions.get();
+
+    // create a spy which would throw a CommitStateUnknownException after successful commit.
+    org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
+    org.apache.iceberg.RewriteManifests spyNewRewriteManifests = spy(newRewriteManifests);
+    doAnswer(invocation -> {
+      newRewriteManifests.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRewriteManifests).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
+
+    AssertHelpers.assertThrowsCause("Should throw a Commit State Unknown Exception",
+        RuntimeException.class,
+        "Datacenter on Fire",
+        () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());
+
+    table.refresh();
+
+    // table should reflect the changes, since the commit was successful
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
+    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
+    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteSmallManifestsPartitionedTable() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)