Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix] Fix fetchLatestNonOptimizedSnapshotTime #2396

Merged
merged 29 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c43bf00
fix fetchLatestNonOptimizedSnapshotTime
XBaith Dec 1, 2023
5cab81e
fix
XBaith Dec 1, 2023
0e9cca8
spotless
XBaith Dec 1, 2023
2959418
fix ut
XBaith Dec 1, 2023
e3018b2
Merge branch 'master' into fix-fetch
XBaith Dec 1, 2023
894aa15
Merge branch 'master' into fix-fetch
XBaith Dec 4, 2023
d59bd69
Merge branch 'master' into fix-fetch
XBaith Dec 4, 2023
8518d58
rename data-expire.since
XBaith Dec 5, 2023
fd3f807
using UTC zone for snapshot timestamp
XBaith Dec 5, 2023
8c2c0cd
Merge branch 'master' into fix-fetch
XBaith Dec 5, 2023
0c178ba
Merge branch 'master' into fix-fetch
XBaith Dec 5, 2023
d350292
Merge branch 'master' into fix-fetch
XBaith Dec 6, 2023
0fc7537
Merge branch 'master' into fix-fetch
XBaith Dec 6, 2023
c1fcbcc
Merge branch 'master' into fix-fetch
XBaith Dec 6, 2023
f479037
resolve conflict
XBaith Dec 6, 2023
b357b3f
Merge branch 'master' into fix-fetch
XBaith Dec 7, 2023
6adf9a2
Merge branch 'master' into fix-fetch
XBaith Dec 8, 2023
b1e4151
Merge branch 'master' into fix-fetch
XBaith Dec 11, 2023
5ed5da9
rerview
XBaith Dec 11, 2023
49a4ef6
spotless
XBaith Dec 11, 2023
6518b33
Merge branch 'master' into fix-fetch
XBaith Dec 11, 2023
760f3cc
Merge branch 'master' into fix-fetch
XBaith Dec 12, 2023
6c2d279
Merge branch 'master' into fix-fetch
XBaith Dec 13, 2023
6e532cd
Merge branch 'master' into fix-fetch
XBaith Dec 13, 2023
10e147a
review
XBaith Dec 13, 2023
778cfc2
review
XBaith Dec 14, 2023
6bff78d
Merge branch 'master' into fix-fetch
XBaith Dec 14, 2023
b832b08
Merge branch 'master' into fix-fetch
XBaith Dec 14, 2023
e8f60e5
Merge branch 'master' into fix-fetch
XBaith Dec 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Optional;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand All @@ -79,6 +78,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
Expand Down Expand Up @@ -216,22 +216,33 @@
if (!expirationConfig.isValid(field, table.name())) {
return;
}
Instant startInstant;
if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) {
startInstant = Instant.now().atZone(getDefaultZoneId(field)).toInstant();
} else {
startInstant =
Instant.ofEpochMilli(fetchLatestNonOptimizedSnapshotTime(table))
.atZone(getDefaultZoneId(field))
.toInstant();
}

expireDataFrom(expirationConfig, startInstant);
expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig, field));

Check warning on line 220 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java#L220

Added line #L220 was not covered by tests
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

/**
* Resolves the instant that data expiration is measured from, according to the base-on rule
* returned by {@code expirationConfig.getBaseOnRule()}.
*
* <p>{@code CURRENT_TIME} yields the current time. {@code LAST_COMMIT_TIME} yields the timestamp
* returned by {@code fetchLatestNonOptimizedSnapshotTime(getTable())}, or {@link Instant#MIN} when
* that call returns {@code Long.MAX_VALUE}.
*
* @param expirationConfig expiration settings that carry the base-on rule
* @param field expiration field; here it is only passed to {@code getDefaultZoneId}
* @return the instant to expire from, or {@link Instant#MIN} when there is nothing to base on
* @throws IllegalArgumentException if the rule is neither {@code CURRENT_TIME} nor {@code
*     LAST_COMMIT_TIME}
*/
protected Instant expireBaseOnRule(
DataExpirationConfig expirationConfig, Types.NestedField field) {
switch (expirationConfig.getBaseOnRule()) {
case CURRENT_TIME:
// NOTE(review): atZone(...).toInstant() converts back to the same instant, so the zone looks
// to have no effect on the returned value - confirm whether that is intended.
return Instant.now().atZone(getDefaultZoneId(field)).toInstant();

Check warning on line 230 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java#L230

Added line #L230 was not covered by tests
case LAST_COMMIT_TIME:
long lastCommitTimestamp = fetchLatestNonOptimizedSnapshotTime(getTable());
// Long.MAX_VALUE is the "no non-optimized snapshot found" sentinel; in that case return
// Instant.MIN, which expireDataFrom treats as "skip the expiration".
if (lastCommitTimestamp != Long.MAX_VALUE) {
// The snapshot timestamp is epoch millis (expected to be UTC), so it is used as-is.
return Instant.ofEpochMilli(lastCommitTimestamp);
} else {
return Instant.MIN;

Check warning on line 238 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java#L238

Added line #L238 was not covered by tests
}
default:
throw new IllegalArgumentException(
"Cannot expire data base on " + expirationConfig.getBaseOnRule().name());

Check warning on line 242 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java#L241-L242

Added lines #L241 - L242 were not covered by tests
}
}

/**
* Purge data older than the specified UTC timestamp
*
Expand All @@ -241,6 +252,10 @@
*/
@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
if (instant.equals(Instant.MIN)) {
return;

Check warning on line 256 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java#L256

Added line #L256 was not covered by tests
}

long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
LOG.info(
"Expiring data older than {} in table {} ",
Expand Down Expand Up @@ -428,17 +443,17 @@
}

/**
* When expiring historic data and `data-expire.since` is `CURRENT_SNAPSHOT`, the latest snapshot
* should not be produced by Amoro.
* Finds the commit time used when expiring historic data with `data-expire.base-on-rule` set to
* `LAST_COMMIT_TIME`: that of the latest snapshot not produced by Amoro optimizing.
*
* @param table iceberg table
* @return the timestamp (epoch millis) of the latest non-optimized snapshot, or {@code
*     Long.MAX_VALUE} if the table has no such snapshot
*/
public static long fetchLatestNonOptimizedSnapshotTime(Table table) {
Optional<Snapshot> snapshot =
IcebergTableUtil.findSnapshot(
table, s -> s.summary().containsValue(CommitMetaProducer.OPTIMIZE.name()));
return snapshot.isPresent() ? snapshot.get().timestampMillis() : Long.MAX_VALUE;
IcebergTableUtil.findFirstMatchSnapshot(
table, s -> !s.summary().containsValue(CommitMetaProducer.OPTIMIZE.name()));
return snapshot.map(Snapshot::timestampMillis).orElse(Long.MAX_VALUE);
}

private static int deleteInvalidFilesInFs(
Expand Down Expand Up @@ -598,8 +613,7 @@

Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField());
return CloseableIterable.transform(
CloseableIterable.withNoopClose(
com.google.common.collect.Iterables.concat(dataFiles, deleteFiles)),
CloseableIterable.withNoopClose(Iterables.concat(dataFiles, deleteFiles)),
contentFile -> {
Literal<Long> literal =
getExpireTimestampLiteral(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
Expand All @@ -61,7 +60,6 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -154,30 +152,33 @@
if (!expirationConfig.isValid(field, arcticTable.name())) {
return;
}
ZoneId defaultZone = IcebergTableMaintainer.getDefaultZoneId(field);
Instant startInstant;
if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) {
startInstant = Instant.now().atZone(defaultZone).toInstant();
} else {
long latestBaseTs =
IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(baseMaintainer.getTable());
long latestChangeTs =
changeMaintainer == null
? Long.MAX_VALUE
: IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(
changeMaintainer.getTable());
long latestNonOptimizedTs = Longs.min(latestChangeTs, latestBaseTs);

startInstant = Instant.ofEpochMilli(latestNonOptimizedTs).atZone(defaultZone).toInstant();
}
expireDataFrom(expirationConfig, startInstant);

expireDataFrom(expirationConfig, expireMixedBaseOnRule(expirationConfig, field));

Check warning on line 156 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L156

Added line #L156 was not covered by tests
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

/**
 * Resolves the instant that data expiration is measured from for a mixed table.
 *
 * <p>The base-on rule is evaluated by the change maintainer (when one exists) and by the base
 * maintainer, and the later of the two instants is returned. A missing change maintainer
 * contributes {@link Instant#MIN}, so the base maintainer's result is used on its own.
 *
 * @param expirationConfig expiration settings that carry the base-on rule
 * @param field expiration field, forwarded unchanged to each maintainer
 * @return the later of the change and base instants
 */
protected Instant expireMixedBaseOnRule(
    DataExpirationConfig expirationConfig, Types.NestedField field) {
  // A direct null check; wrapping the field in an Optional only to call isPresent() adds nothing.
  Instant changeInstant =
      changeMaintainer != null
          ? changeMaintainer.expireBaseOnRule(expirationConfig, field)
          : Instant.MIN;
  Instant baseInstant = baseMaintainer.expireBaseOnRule(expirationConfig, field);
  // On a tie the change instant is returned, matching the original >= comparison.
  return changeInstant.compareTo(baseInstant) >= 0 ? changeInstant : baseInstant;
}

@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
if (instant.equals(Instant.MIN)) {
return;

Check warning on line 179 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L179

Added line #L179 was not covered by tests
}

long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
Types.NestedField field = arcticTable.schema().findField(expirationConfig.getExpirationField());
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
private String dateTimePattern;
// data-expire.datetime-number-format
private String numberDateFormat;
// data-expire.since
private Since since;
// data-expire.base-on-rule
private BaseOnRule baseOnRule;

@VisibleForTesting
public enum ExpireLevel {
Expand All @@ -52,14 +52,15 @@
}

@VisibleForTesting
public enum Since {
LATEST_SNAPSHOT,
CURRENT_TIMESTAMP;
public enum BaseOnRule {
LAST_COMMIT_TIME,
CURRENT_TIME;

public static Since fromString(String since) {
Preconditions.checkArgument(null != since, "data-expire.since is invalid: null");
public static BaseOnRule fromString(String since) {
Preconditions.checkArgument(
null != since, TableProperties.DATA_EXPIRATION_BASE_ON_RULE + " is invalid: null");
try {
return Since.valueOf(since.toUpperCase(Locale.ENGLISH));
return BaseOnRule.valueOf(since.toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format("Unable to expire data since: %s", since), e);
Expand All @@ -81,14 +82,14 @@
long retentionTime,
String dateTimePattern,
String numberDateFormat,
Since since) {
BaseOnRule baseOnRule) {

Check warning on line 85 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L85

Added line #L85 was not covered by tests
this.enabled = enabled;
this.expirationField = expirationField;
this.expirationLevel = expirationLevel;
this.retentionTime = retentionTime;
this.dateTimePattern = dateTimePattern;
this.numberDateFormat = numberDateFormat;
this.since = since;
this.baseOnRule = baseOnRule;

Check warning on line 92 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L92

Added line #L92 was not covered by tests
}

public DataExpirationConfig(ArcticTable table) {
Expand Down Expand Up @@ -133,12 +134,12 @@
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT);
since =
Since.fromString(
baseOnRule =
BaseOnRule.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT));
TableProperties.DATA_EXPIRATION_BASE_ON_RULE,
TableProperties.DATA_EXPIRATION_BASE_ON_RULE_DEFAULT));
}

public static DataExpirationConfig parse(Map<String, String> properties) {
Expand Down Expand Up @@ -168,12 +169,12 @@
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT))
.setSince(
Since.fromString(
.setBaseOnRule(
BaseOnRule.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT)));
TableProperties.DATA_EXPIRATION_BASE_ON_RULE,
TableProperties.DATA_EXPIRATION_BASE_ON_RULE_DEFAULT)));
String retention =
CompatiblePropertyUtil.propertyAsString(
properties, TableProperties.DATA_EXPIRATION_RETENTION_TIME, null);
Expand Down Expand Up @@ -238,12 +239,12 @@
return this;
}

public Since getSince() {
return since;
public BaseOnRule getBaseOnRule() {
return baseOnRule;
}

public DataExpirationConfig setSince(Since since) {
this.since = since;
public DataExpirationConfig setBaseOnRule(BaseOnRule baseOnRule) {
this.baseOnRule = baseOnRule;
return this;
}

Expand All @@ -262,7 +263,7 @@
&& expirationLevel == config.expirationLevel
&& Objects.equal(dateTimePattern, config.dateTimePattern)
&& Objects.equal(numberDateFormat, config.numberDateFormat)
&& since == config.since;
&& baseOnRule == config.baseOnRule;
}

@Override
Expand All @@ -274,7 +275,7 @@
retentionTime,
dateTimePattern,
numberDateFormat,
since);
baseOnRule);
}

public boolean isValid(Types.NestedField field, String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Optional;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -78,9 +80,11 @@ public static Snapshot getSnapshot(Table table, boolean refresh) {
return table.currentSnapshot();
}

public static Optional<Snapshot> findSnapshot(Table table, Predicate<Snapshot> predicate) {
Iterable<Snapshot> snapshots = table.snapshots();
return Iterables.tryFind(snapshots, predicate);
/**
 * Searches the table's snapshots in reverse iteration order and returns the first one accepted by
 * the predicate.
 *
 * @param table table whose snapshots are searched
 * @param predicate condition a snapshot must satisfy
 * @return the first match in reverse order, or an empty Optional when nothing matches
 */
public static Optional<Snapshot> findFirstMatchSnapshot(
    Table table, Predicate<Snapshot> predicate) {
  // Work on a copy so the reversal never touches the iterable handed out by the table.
  List<Snapshot> reversedSnapshots = Lists.newArrayList(table.snapshots());
  Collections.reverse(reversedSnapshots);
  // NOTE(review): presumably table.snapshots() iterates oldest-first, which would make this the
  // most recent match - confirm against the Iceberg API.
  Snapshot firstMatch = Iterables.tryFind(reversedSnapshots, predicate).orNull();
  return Optional.ofNullable(firstMatch);
}

public static Set<String> getAllContentFilePath(Table internalTable) {
Expand Down
Loading