Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,9 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads);
deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
}
if (expireSnapshotsSpec.isExpireByTimestampRange()) {
if (expireSnapshotsSpec == null) {
expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService);
} else if (expireSnapshotsSpec.isExpireByTimestampRange()) {
expireSnapshotByTimestampRange(icebergTable, expireSnapshotsSpec.getFromTimestampMillis(),
expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService);
} else if (expireSnapshotsSpec.isExpireByIds()) {
Expand All @@ -911,6 +913,14 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap
}
}

private void expireSnapshotWithDefaultParams(Table icebergTable, ExecutorService deleteExecutorService) {
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
if (deleteExecutorService != null) {
expireSnapshots.executeDeleteWith(deleteExecutorService);
}
expireSnapshots.commit();
}

private void expireSnapshotRetainLast(Table icebergTable, int numRetainLast, ExecutorService deleteExecutorService) {
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
expireSnapshots.retainLast(numRetainLast);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.junit.Test;

import static org.apache.iceberg.TableProperties.MAX_SNAPSHOT_AGE_MS;
import static org.apache.iceberg.TableProperties.MIN_SNAPSHOTS_TO_KEEP;

/**
* Tests covering the rollback feature
Expand Down Expand Up @@ -117,6 +118,34 @@ public void testExpireSnapshotsWithRetainLast() throws IOException, InterruptedE
Assert.assertEquals(5, IterableUtils.size(table.snapshots()));
}

@Test
public void testExpireSnapshotsWithDefaultParams() throws IOException, InterruptedException {
TableIdentifier identifier = TableIdentifier.of("default", "source");
Table table = testTables.createTableWithVersions(shell, identifier.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 10);
// No snapshot should expire, since the max snapshot age to expire is by default 5 days
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS RETAIN LAST 5");
table.refresh();
Assert.assertEquals(10, IterableUtils.size(table.snapshots()));

// Change max snapshot age to expire to 1 ms & min snapshots to keep as 3 & re-execute
shell.executeStatement(
"ALTER TABLE " + identifier.name() + " SET TBLPROPERTIES('" + MAX_SNAPSHOT_AGE_MS + "'='1'" + ",'" +
MIN_SNAPSHOTS_TO_KEEP + "'='3')");
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS");
table.refresh();
Assert.assertEquals(3, IterableUtils.size(table.snapshots()));

// Change the min snapshot to keep as 2
shell.executeStatement(
"ALTER TABLE " + identifier.name() + " SET TBLPROPERTIES('" + MIN_SNAPSHOTS_TO_KEEP + "'='2')");
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS");
table.refresh();
Assert.assertEquals(2, IterableUtils.size(table.snapshots()));

}

@Test
public void testDeleteOrphanFiles() throws IOException, InterruptedException {
TableIdentifier identifier = TableIdentifier.of("default", "source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ alterStatementSuffixExecute
@after { gParent.popMsg(state); }
: KW_EXECUTE KW_ROLLBACK LPAREN (rollbackParam=(StringLiteral | Number)) RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_ROLLBACK $rollbackParam)
| KW_EXECUTE KW_EXPIRE_SNAPSHOTS LPAREN (expireParam=StringLiteral) RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam)
| KW_EXECUTE KW_EXPIRE_SNAPSHOTS (LPAREN (expireParam=StringLiteral) RPAREN)?
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam?)
| KW_EXECUTE KW_SET_CURRENT_SNAPSHOT LPAREN (snapshotParam=expression) RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_SET_CURRENT_SNAPSHOT $snapshotParam)
| KW_EXECUTE KW_FAST_FORWARD sourceBranch=StringLiteral (targetBranch=StringLiteral)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ private static AlterTableExecuteDesc getSetCurrentSnapshotDesc(TableName tableNa
private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName, Map<String, String> partitionSpec,
List<Node> children) throws SemanticException {
AlterTableExecuteSpec<ExpireSnapshotsSpec> spec;

if (children.size() == 1) {
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, null);
return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
}
ZoneId timeZone = SessionState.get() == null ?
new HiveConf().getLocalTimeZone() :
SessionState.get().getConf().getLocalTimeZone();
Expand Down