From 910e326dbeb3e1115f54c3893ab6d2f832add3df Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 14 Aug 2019 14:41:35 -0700 Subject: [PATCH 1/2] Use overwrite to create metadata JSON files for metastore tables. --- .../iceberg/BaseMetastoreTableOperations.java | 4 +++- .../org/apache/iceberg/TableMetadataParser.java | 16 +++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 9013f3923ce5..ad68107d1ae5 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -74,7 +74,9 @@ protected String writeNewMetadata(TableMetadata metadata, int newVersion) { OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath); // write the new metadata - TableMetadataParser.write(metadata, newMetadataLocation); + // use overwrite to avoid negative caching in S3. this is safe because the metadata location + // includes a UUID, so it is always unique. 
+ TableMetadataParser.overwrite(metadata, newMetadataLocation); return newTableMetadataFilePath; } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index 03f947d9d76e..8feaa8a8a0fe 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.StringWriter; import java.util.Comparator; @@ -95,10 +96,19 @@ private TableMetadataParser() {} static final String TIMESTAMP_MS = "timestamp-ms"; static final String SNAPSHOT_LOG = "snapshot-log"; + public static void overwrite(TableMetadata metadata, OutputFile outputFile) { + internalWrite(metadata, outputFile, true); + } + public static void write(TableMetadata metadata, OutputFile outputFile) { - Codec codec = Codec.fromFileName(outputFile.location()); - try (OutputStreamWriter writer = new OutputStreamWriter( - codec == Codec.GZIP ? new GZIPOutputStream(outputFile.create()) : outputFile.create())) { + internalWrite(metadata, outputFile, false); + } + + public static void internalWrite( + TableMetadata metadata, OutputFile outputFile, boolean overwrite) { + boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP; + OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create(); + try (OutputStreamWriter writer = new OutputStreamWriter(isGzip ? new GZIPOutputStream(stream) : stream)) { JsonGenerator generator = JsonUtil.factory().createGenerator(writer); generator.useDefaultPrettyPrinter(); toJson(metadata, generator); From 61607920222dc3319d3905fdf96f585984d3fd1b Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 14 Aug 2019 14:42:43 -0700 Subject: [PATCH 2/2] Add an optional predicate to control refresh retries. 
--- .../iceberg/BaseMetastoreTableOperations.java | 11 +++++++++-- .../main/java/org/apache/iceberg/util/Tasks.java | 14 +++++++++++++- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index ad68107d1ae5..55f3305beb50 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; @@ -82,10 +83,15 @@ protected String writeNewMetadata(TableMetadata metadata, int newVersion) { } protected void refreshFromMetadataLocation(String newLocation) { - refreshFromMetadataLocation(newLocation, 20); + refreshFromMetadataLocation(newLocation, null, 20); } protected void refreshFromMetadataLocation(String newLocation, int numRetries) { + refreshFromMetadataLocation(newLocation, null, numRetries); + } + + protected void refreshFromMetadataLocation(String newLocation, Predicate shouldRetry, + int numRetries) { // use null-safe equality check because new tables have a null metadata location if (!Objects.equal(currentMetadataLocation, newLocation)) { LOG.info("Refreshing table metadata from new version: {}", newLocation); @@ -93,7 +99,8 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) { AtomicReference newMetadata = new AtomicReference<>(); Tasks.foreach(newLocation) .retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... 
*/) - .suppressFailureWhenFinished() + .throwFailureWhenFinished() + .shouldRetryTest(shouldRetry) .run(metadataLocation -> newMetadata.set( TableMetadataParser.read(this, io().newInputFile(metadataLocation)))); diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java index a353f2b5b13e..239044a49e7a 100644 --- a/core/src/main/java/org/apache/iceberg/util/Tasks.java +++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java @@ -35,6 +35,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +82,7 @@ public static class Builder { private List> stopRetryExceptions = Lists.newArrayList( UnrecoverableException.class); private List> onlyRetryExceptions = null; + private Predicate shouldRetryPredicate = null; private int maxAttempts = 1; // not all operations can be retried private long minSleepTimeMs = 1000; // 1 second private long maxSleepTimeMs = 600000; // 10 minutes @@ -146,6 +148,11 @@ public Builder stopRetryOn(Class... exceptions) { return this; } + public Builder shouldRetryTest(Predicate shouldRetry) { + this.shouldRetryPredicate = shouldRetry; + return this; + } + public Builder noRetry() { this.maxAttempts = 1; return this; @@ -405,7 +412,12 @@ private void runTaskWithRetry(Task task, I item) throw e; } - if (onlyRetryExceptions != null) { + if (shouldRetryPredicate != null) { + if (!shouldRetryPredicate.test(e)) { + throw e; + } + + } else if (onlyRetryExceptions != null) { // if onlyRetryExceptions are present, then this retries if one is found boolean matchedRetryException = false; for (Class exClass : onlyRetryExceptions) {