Skip to content

Commit

Permalink
[Hotfix] Fix fetchLatestNonOptimizedSnapshotTime (apache#2396)
Browse files Browse the repository at this point in the history
* fix fetchLatestNonOptimizedSnapshotTime

* fix

* spotless

* fix ut

* rename data-expire.since

* using UTC zone for snapshot timestamp

* resolve conflict

* rerview

* spotless

* review

* review
  • Loading branch information
XBaith authored and ShawHee committed Dec 29, 2023
1 parent 336311f commit 91cea0d
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Optional;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand All @@ -79,6 +78,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
Expand Down Expand Up @@ -216,22 +216,33 @@ public void expireData(TableRuntime tableRuntime) {
if (!expirationConfig.isValid(field, table.name())) {
return;
}
Instant startInstant;
if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) {
startInstant = Instant.now().atZone(getDefaultZoneId(field)).toInstant();
} else {
startInstant =
Instant.ofEpochMilli(fetchLatestNonOptimizedSnapshotTime(table))
.atZone(getDefaultZoneId(field))
.toInstant();
}

expireDataFrom(expirationConfig, startInstant);
expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig, field));
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

protected Instant expireBaseOnRule(
DataExpirationConfig expirationConfig, Types.NestedField field) {
switch (expirationConfig.getBaseOnRule()) {
case CURRENT_TIME:
return Instant.now().atZone(getDefaultZoneId(field)).toInstant();
case LAST_COMMIT_TIME:
long lastCommitTimestamp = fetchLatestNonOptimizedSnapshotTime(getTable());
// if the table does not exist any non-optimized snapshots, should skip the expiration
if (lastCommitTimestamp != Long.MAX_VALUE) {
// snapshot timestamp should be UTC
return Instant.ofEpochMilli(lastCommitTimestamp);
} else {
return Instant.MIN;
}
default:
throw new IllegalArgumentException(
"Cannot expire data base on " + expirationConfig.getBaseOnRule().name());
}
}

/**
* Purge data older than the specified UTC timestamp
*
Expand All @@ -241,6 +252,10 @@ public void expireData(TableRuntime tableRuntime) {
*/
@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
if (instant.equals(Instant.MIN)) {
return;
}

long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
LOG.info(
"Expiring data older than {} in table {} ",
Expand Down Expand Up @@ -428,17 +443,17 @@ public static Snapshot findLatestSnapshotContainsKey(Table table, String summary
}

/**
* When expiring historic data and `data-expire.since` is `CURRENT_SNAPSHOT`, the latest snapshot
* should not be produced by Amoro.
* When expiring historic data and `data-expire.base-on-rule` is `LAST_COMMIT_TIME`, the latest
* snapshot should not be produced by Amoro optimizing.
*
* @param table iceberg table
* @return the latest non-optimized snapshot timestamp
*/
public static long fetchLatestNonOptimizedSnapshotTime(Table table) {
Optional<Snapshot> snapshot =
IcebergTableUtil.findSnapshot(
table, s -> s.summary().containsValue(CommitMetaProducer.OPTIMIZE.name()));
return snapshot.isPresent() ? snapshot.get().timestampMillis() : Long.MAX_VALUE;
IcebergTableUtil.findFirstMatchSnapshot(
table, s -> !s.summary().containsValue(CommitMetaProducer.OPTIMIZE.name()));
return snapshot.map(Snapshot::timestampMillis).orElse(Long.MAX_VALUE);
}

private static int deleteInvalidFilesInFs(
Expand Down Expand Up @@ -598,8 +613,7 @@ CloseableIterable<FileEntry> fileScan(

Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField());
return CloseableIterable.transform(
CloseableIterable.withNoopClose(
com.google.common.collect.Iterables.concat(dataFiles, deleteFiles)),
CloseableIterable.withNoopClose(Iterables.concat(dataFiles, deleteFiles)),
contentFile -> {
Literal<Long> literal =
getExpireTimestampLiteral(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
Expand All @@ -61,7 +60,6 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -154,30 +152,33 @@ public void expireData(TableRuntime tableRuntime) {
if (!expirationConfig.isValid(field, arcticTable.name())) {
return;
}
ZoneId defaultZone = IcebergTableMaintainer.getDefaultZoneId(field);
Instant startInstant;
if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) {
startInstant = Instant.now().atZone(defaultZone).toInstant();
} else {
long latestBaseTs =
IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(baseMaintainer.getTable());
long latestChangeTs =
changeMaintainer == null
? Long.MAX_VALUE
: IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(
changeMaintainer.getTable());
long latestNonOptimizedTs = Longs.min(latestChangeTs, latestBaseTs);

startInstant = Instant.ofEpochMilli(latestNonOptimizedTs).atZone(defaultZone).toInstant();
}
expireDataFrom(expirationConfig, startInstant);

expireDataFrom(expirationConfig, expireMixedBaseOnRule(expirationConfig, field));
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

protected Instant expireMixedBaseOnRule(
DataExpirationConfig expirationConfig, Types.NestedField field) {
Instant changeInstant =
Optional.ofNullable(changeMaintainer).isPresent()
? changeMaintainer.expireBaseOnRule(expirationConfig, field)
: Instant.MIN;
Instant baseInstant = baseMaintainer.expireBaseOnRule(expirationConfig, field);
if (changeInstant.compareTo(baseInstant) >= 0) {
return changeInstant;
} else {
return baseInstant;
}
}

@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
if (instant.equals(Instant.MIN)) {
return;
}

long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
Types.NestedField field = arcticTable.schema().findField(expirationConfig.getExpirationField());
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class DataExpirationConfig {
private String dateTimePattern;
// data-expire.datetime-number-format
private String numberDateFormat;
// data-expire.since
private Since since;
// data-expire.base-on-rule
private BaseOnRule baseOnRule;

@VisibleForTesting
public enum ExpireLevel {
Expand All @@ -52,14 +52,15 @@ public static ExpireLevel fromString(String level) {
}

@VisibleForTesting
public enum Since {
LATEST_SNAPSHOT,
CURRENT_TIMESTAMP;
public enum BaseOnRule {
LAST_COMMIT_TIME,
CURRENT_TIME;

public static Since fromString(String since) {
Preconditions.checkArgument(null != since, "data-expire.since is invalid: null");
public static BaseOnRule fromString(String since) {
Preconditions.checkArgument(
null != since, TableProperties.DATA_EXPIRATION_BASE_ON_RULE + " is invalid: null");
try {
return Since.valueOf(since.toUpperCase(Locale.ENGLISH));
return BaseOnRule.valueOf(since.toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format("Unable to expire data since: %s", since), e);
Expand All @@ -81,14 +82,14 @@ public DataExpirationConfig(
long retentionTime,
String dateTimePattern,
String numberDateFormat,
Since since) {
BaseOnRule baseOnRule) {
this.enabled = enabled;
this.expirationField = expirationField;
this.expirationLevel = expirationLevel;
this.retentionTime = retentionTime;
this.dateTimePattern = dateTimePattern;
this.numberDateFormat = numberDateFormat;
this.since = since;
this.baseOnRule = baseOnRule;
}

public DataExpirationConfig(ArcticTable table) {
Expand Down Expand Up @@ -133,12 +134,12 @@ public DataExpirationConfig(ArcticTable table) {
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT);
since =
Since.fromString(
baseOnRule =
BaseOnRule.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT));
TableProperties.DATA_EXPIRATION_BASE_ON_RULE,
TableProperties.DATA_EXPIRATION_BASE_ON_RULE_DEFAULT));
}

public static DataExpirationConfig parse(Map<String, String> properties) {
Expand Down Expand Up @@ -168,12 +169,12 @@ public static DataExpirationConfig parse(Map<String, String> properties) {
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT))
.setSince(
Since.fromString(
.setBaseOnRule(
BaseOnRule.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT)));
TableProperties.DATA_EXPIRATION_BASE_ON_RULE,
TableProperties.DATA_EXPIRATION_BASE_ON_RULE_DEFAULT)));
String retention =
CompatiblePropertyUtil.propertyAsString(
properties, TableProperties.DATA_EXPIRATION_RETENTION_TIME, null);
Expand Down Expand Up @@ -238,12 +239,12 @@ public DataExpirationConfig setNumberDateFormat(String numberDateFormat) {
return this;
}

public Since getSince() {
return since;
public BaseOnRule getBaseOnRule() {
return baseOnRule;
}

public DataExpirationConfig setSince(Since since) {
this.since = since;
public DataExpirationConfig setBaseOnRule(BaseOnRule baseOnRule) {
this.baseOnRule = baseOnRule;
return this;
}

Expand All @@ -262,7 +263,7 @@ public boolean equals(Object o) {
&& expirationLevel == config.expirationLevel
&& Objects.equal(dateTimePattern, config.dateTimePattern)
&& Objects.equal(numberDateFormat, config.numberDateFormat)
&& since == config.since;
&& baseOnRule == config.baseOnRule;
}

@Override
Expand All @@ -274,7 +275,7 @@ public int hashCode() {
retentionTime,
dateTimePattern,
numberDateFormat,
since);
baseOnRule);
}

public boolean isValid(Types.NestedField field, String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Optional;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -78,9 +80,11 @@ public static Snapshot getSnapshot(Table table, boolean refresh) {
return table.currentSnapshot();
}

public static Optional<Snapshot> findSnapshot(Table table, Predicate<Snapshot> predicate) {
Iterable<Snapshot> snapshots = table.snapshots();
return Iterables.tryFind(snapshots, predicate);
public static Optional<Snapshot> findFirstMatchSnapshot(
Table table, Predicate<Snapshot> predicate) {
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
Collections.reverse(snapshots);
return Optional.ofNullable(Iterables.tryFind(snapshots, predicate).orNull());
}

public static Set<String> getAllContentFilePath(Table internalTable) {
Expand Down
Loading

0 comments on commit 91cea0d

Please sign in to comment.