Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Preconditions;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
Expand Down Expand Up @@ -74,24 +75,32 @@ protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);

// write the new metadata
TableMetadataParser.write(metadata, newMetadataLocation);
// use overwrite to avoid negative caching in S3. this is safe because the metadata location is
// always unique because it includes a UUID.
TableMetadataParser.overwrite(metadata, newMetadataLocation);

return newTableMetadataFilePath;
}

/**
 * Refreshes the table's cached metadata from the given metadata file location,
 * using the default of 20 retries and no custom retry predicate.
 *
 * <p>Fix: this span contained both the pre-change call
 * ({@code refreshFromMetadataLocation(newLocation, 20)}) and the post-change call
 * ({@code refreshFromMetadataLocation(newLocation, null, 20)}), which would have
 * performed the refresh twice. Only the post-change delegation is kept.
 *
 * @param newLocation location of the new table metadata file
 */
protected void refreshFromMetadataLocation(String newLocation) {
  refreshFromMetadataLocation(newLocation, null, 20);
}

/**
 * Refreshes the table's cached metadata from the given metadata file location.
 *
 * <p>Delegates to the three-argument overload with a {@code null} retry predicate,
 * so retry eligibility falls back to the Tasks builder's default exception-class
 * handling rather than a caller-supplied test.
 *
 * @param newLocation location of the new table metadata file
 * @param numRetries number of times to retry reading the metadata file on failure
 */
protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
refreshFromMetadataLocation(newLocation, null, numRetries);
}

protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
int numRetries) {
// use null-safe equality check because new tables have a null metadata location
if (!Objects.equal(currentMetadataLocation, newLocation)) {
LOG.info("Refreshing table metadata from new version: {}", newLocation);

AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
Tasks.foreach(newLocation)
.retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
.suppressFailureWhenFinished()
.throwFailureWhenFinished()
.shouldRetryTest(shouldRetry)
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(this, io().newInputFile(metadataLocation))));

Expand Down
16 changes: 13 additions & 3 deletions core/src/main/java/org/apache/iceberg/TableMetadataParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.util.Comparator;
Expand Down Expand Up @@ -95,10 +96,19 @@ private TableMetadataParser() {}
static final String TIMESTAMP_MS = "timestamp-ms";
static final String SNAPSHOT_LOG = "snapshot-log";

/**
 * Writes table metadata to the given output file, replacing the file if it
 * already exists.
 *
 * <p>Internally uses {@code createOrOverwrite} instead of {@code create}. Per the
 * caller's rationale, overwriting avoids negative caching in object stores such as
 * S3 and is safe only because each metadata location is unique (it embeds a UUID);
 * callers must not reuse locations.
 *
 * @param metadata table metadata to serialize
 * @param outputFile destination file to write (or overwrite)
 */
public static void overwrite(TableMetadata metadata, OutputFile outputFile) {
internalWrite(metadata, outputFile, true);
}

public static void write(TableMetadata metadata, OutputFile outputFile) {
Codec codec = Codec.fromFileName(outputFile.location());
try (OutputStreamWriter writer = new OutputStreamWriter(
codec == Codec.GZIP ? new GZIPOutputStream(outputFile.create()) : outputFile.create())) {
internalWrite(metadata, outputFile, false);
}

public static void internalWrite(
TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
try (OutputStreamWriter writer = new OutputStreamWriter(isGzip ? new GZIPOutputStream(stream) : stream)) {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
toJson(metadata, generator);
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/java/org/apache/iceberg/util/Tasks.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,6 +82,7 @@ public static class Builder<I> {
private List<Class<? extends Exception>> stopRetryExceptions = Lists.newArrayList(
UnrecoverableException.class);
private List<Class<? extends Exception>> onlyRetryExceptions = null;
private Predicate<Exception> shouldRetryPredicate = null;
private int maxAttempts = 1; // not all operations can be retried
private long minSleepTimeMs = 1000; // 1 second
private long maxSleepTimeMs = 600000; // 10 minutes
Expand Down Expand Up @@ -146,6 +148,11 @@ public Builder<I> stopRetryOn(Class<? extends Exception>... exceptions) {
return this;
}

/**
 * Sets a predicate that decides whether a failed attempt should be retried.
 *
 * <p>When non-null, the predicate takes precedence over the
 * {@code onlyRetryExceptions} class list: an attempt is retried only if the
 * predicate returns {@code true} for the thrown exception, otherwise the
 * exception is rethrown. Passing {@code null} (the default) leaves the
 * class-list-based retry behavior in effect.
 *
 * @param shouldRetry predicate tested against the caught exception, or null to disable
 * @return this builder for method chaining
 */
public Builder<I> shouldRetryTest(Predicate<Exception> shouldRetry) {
this.shouldRetryPredicate = shouldRetry;
return this;
}

public Builder<I> noRetry() {
this.maxAttempts = 1;
return this;
Expand Down Expand Up @@ -405,7 +412,12 @@ private <E extends Exception> void runTaskWithRetry(Task<I, E> task, I item)
throw e;
}

if (onlyRetryExceptions != null) {
if (shouldRetryPredicate != null) {
if (!shouldRetryPredicate.test(e)) {
throw e;
}

} else if (onlyRetryExceptions != null) {
// if onlyRetryExceptions are present, then this retries if one is found
boolean matchedRetryException = false;
for (Class<? extends Exception> exClass : onlyRetryExceptions) {
Expand Down