Spark: Backport CommitStateUnknownException handling for RewriteManifestsSparkAction (#4850)

Co-authored-by: Prashant Singh <psinghvk@amazon.com>
singhpk234 and Prashant Singh committed May 24, 2022
1 parent a911623 commit 73d64499b13a6291d22105b8e8270c233715689e
Showing 6 changed files with 219 additions and 0 deletions.
@@ -40,6 +40,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
@@ -295,6 +296,9 @@ private void replaceManifests(Iterable<ManifestFile> deletedManifests, Iterable<
        // delete new manifests as they were rewritten before the commit
        deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
      }
    } catch (CommitStateUnknownException commitStateUnknownException) {
      // don't clean up added manifest files, because they may have been successfully committed.
      throw commitStateUnknownException;
    } catch (Exception e) {
      // delete all new manifests because the rewrite failed
      deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
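
The behavioral change in this hunk is the catch order: a CommitStateUnknownException is rethrown as-is, leaving the newly written manifests in place because the commit may in fact have succeeded, while any other exception still triggers cleanup of the new manifests before rethrowing. Below is a minimal sketch of that policy; RewriteCommitSketch, commitOrCleanUp, and the two Runnable parameters are hypothetical stand-ins for illustration, not the action's real members.

import org.apache.iceberg.exceptions.CommitStateUnknownException;

class RewriteCommitSketch {
  // commitRewrite performs the metadata commit; deleteNewManifests removes the
  // manifests this rewrite produced (both are hypothetical stand-ins).
  void commitOrCleanUp(Runnable commitRewrite, Runnable deleteNewManifests) {
    try {
      commitRewrite.run();
    } catch (CommitStateUnknownException e) {
      // The commit may have gone through even though the client never saw the
      // result, so the new manifests could already be referenced by table
      // metadata; deleting them here could corrupt the table. Rethrow and keep
      // the files in place.
      throw e;
    } catch (RuntimeException e) {
      // Any other failure means the rewrite definitely did not commit, so the
      // freshly written manifests are safe to remove before rethrowing.
      deleteNewManifests.run();
      throw e;
    }
  }
}
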
@@ -24,13 +24,15 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +54,9 @@
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

@RunWith(Parameterized.class)
public class TestRewriteManifestsAction extends SparkTestBase {
@@ -157,6 +162,70 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
  }

  @Test
  public void testRewriteManifestsWithCommitStateUnknownException() {
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Map<String, String> options = Maps.newHashMap();
    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

    List<ThreeColumnRecord> records1 = Lists.newArrayList(
        new ThreeColumnRecord(1, null, "AAAA"),
        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
    );
    writeRecords(records1);

    List<ThreeColumnRecord> records2 = Lists.newArrayList(
        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
    );
    writeRecords(records2);

    table.refresh();

    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

    SparkActions actions = SparkActions.get();

    // create a spy which would throw a CommitStateUnknownException after successful commit.
    org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
    org.apache.iceberg.RewriteManifests spyNewRewriteManifests = spy(newRewriteManifests);
    doAnswer(invocation -> {
      newRewriteManifests.commit();
      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
    }).when(spyNewRewriteManifests).commit();

    Table spyTable = spy(table);
    when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);

    AssertHelpers.assertThrowsCause("Should throw a Commit State Unknown Exception",
        RuntimeException.class,
        "Datacenter on Fire",
        () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());

    table.refresh();

    // table should reflect the changes, since the commit was successful
    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());

    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());

    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
    expectedRecords.addAll(records1);
    expectedRecords.addAll(records2);

    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
        .as(Encoders.bean(ThreeColumnRecord.class))
        .collectAsList();

    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
  }

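The test above exercises this path with a Mockito spy: commit() on the spied RewriteManifests first performs the real commit and then throws CommitStateUnknownException, simulating a commit that succeeded in the catalog but whose outcome never reached the client. The assertions after the failure (a single rewritten manifest with four existing files, no added or deleted files, and the original rows still readable) verify that the action neither rolled back the commit nor deleted the manifests it had just written.
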
  @Test
  public void testRewriteSmallManifestsPartitionedTable() {
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
