Skip to content

Commit

Permalink
Add implementation of getCurrentCommitState for Iceberg source (#129)
Browse files Browse the repository at this point in the history
Fixes #66
This change implements the getCurrentCommitState functionality in the Iceberg source. This feature finds all Iceberg snapshots committed after a given timestamp. In addition, the change also includes minor refactoring, renaming of pending commits, and fixes to code comments.
  • Loading branch information
ashvina authored Oct 30, 2023
1 parent cb191da commit f60e6d2
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 23 deletions.
14 changes: 10 additions & 4 deletions api/src/main/java/io/onetable/model/CurrentCommitState.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@
* Represents the current state of commits that are ready for immediate processing and syncing,
* while also tracking pending commits intended for future incremental syncs.
*
* <p>'commitsToProcess' captures commits that are should be processed and synced in the current
* round. 'pendingInstants' tracks instants that are pending at the start of the sync process and
* should be considered for future incremental syncs.
* <p>'commitsToProcess' captures commits that should be processed and synced in the current round.
* 'inFlightInstants' tracks instants that are pending at the start of the sync process and should
* be considered for future incremental syncs.
*/
@Value
@Builder
public class CurrentCommitState<COMMIT> {
@Builder.Default List<COMMIT> commitsToProcess = Collections.emptyList();
@Builder.Default List<Instant> pendingInstants = Collections.emptyList();

/**
* The instants of commits that were incomplete or pending at a given time. For e.g. the commits
* that were started but not completed when performing the sync. Tracking these commits is
* necessary to avoid missing commits in case of concurrent writers in Hudi.
*/
@Builder.Default List<Instant> inFlightInstants = Collections.emptyList();
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public IncrementalTableChanges extractTableChanges(
}
return IncrementalTableChanges.builder()
.tableChanges(tableChangeList)
.pendingCommits(currentCommitState.getPendingInstants())
.pendingCommits(currentCommitState.getInFlightInstants())
.build();
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/io/onetable/client/OneTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private <COMMIT> SyncResultForTableFormats syncIncrementalChanges(
.collect(
Collectors.toMap(
Map.Entry::getKey, entry -> entry.getValue().getLastSyncInstant()));
// TODO: Simplify consolidation of storing lastInstant and pendingInstants together.
// TODO: Simplify consolidation of storing lastInstant and inFlightInstants together.
Map<TableFormat, List<Instant>> pendingInstantsToConsiderForNextSyncByFormat =
syncClientByFormat.entrySet().stream()
.collect(
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/onetable/hudi/HudiClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public CurrentCommitState<HoodieInstant> getCurrentCommitState(
commitsPair.getPendingCommits());
return CurrentCommitState.<HoodieInstant>builder()
.commitsToProcess(commitsToProcessNext)
.pendingInstants(pendingInstantsToProcessNext)
.inFlightInstants(pendingInstantsToProcessNext)
.build();
}

Expand Down
29 changes: 27 additions & 2 deletions core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,32 @@ public TableChange getTableChangeForCommit(Snapshot snapshot) {

@Override
public CurrentCommitState<Snapshot> getCurrentCommitState(
InstantsForIncrementalSync instantsForIncrementalSync) {
return null;
InstantsForIncrementalSync lastSyncInstant) {

long epochMilli = lastSyncInstant.getLastSyncInstant().toEpochMilli();
Table iceTable = getSourceTable();

// There are two ways to fetch Iceberg table's change log; 1) fetch the history using .history()
// method and 2) fetch the snapshots using .snapshots() method and traverse the snapshots in
// reverse chronological order. The issue with #1 is that if transactions are involved, the
// history tracks only the last snapshot of a multi-snapshot transaction. As a result the
// timeline generated for sync would be incomplete. Hence, #2 is used.

Snapshot pendingSnapshot = iceTable.currentSnapshot();
if (pendingSnapshot.timestampMillis() <= epochMilli) {
// Even the latest snapshot was committed before the lastSyncInstant. No new commits were made
// and no new snapshots need to be synced. Return empty state.
return CurrentCommitState.<Snapshot>builder().build();
}

List<Snapshot> snapshots = new ArrayList<>();
while (pendingSnapshot != null && pendingSnapshot.timestampMillis() > epochMilli) {
snapshots.add(pendingSnapshot);
pendingSnapshot =
pendingSnapshot.parentId() != null ? iceTable.snapshot(pendingSnapshot.parentId()) : null;
}
// reverse the list to process the oldest snapshot first
Collections.reverse(snapshots);
return CurrentCommitState.<Snapshot>builder().commitsToProcess(snapshots).build();
}
}
80 changes: 66 additions & 14 deletions core/src/test/java/io/onetable/iceberg/TestIcebergSourceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -47,9 +45,7 @@
import org.apache.iceberg.types.Types;

import io.onetable.client.PerTableConfig;
import io.onetable.model.OneSnapshot;
import io.onetable.model.OneTable;
import io.onetable.model.TableChange;
import io.onetable.model.*;
import io.onetable.model.schema.*;
import io.onetable.model.stat.Range;
import io.onetable.model.storage.FileFormat;
Expand Down Expand Up @@ -266,8 +262,68 @@ public void testGetTableChangeForCommit(@TempDir Path workingDir) throws IOExcep
Assertions.assertEquals(snapshot7, snapshot8);
}

private static long getDataFileCount(Table catalogSales) {
return StreamSupport.stream(catalogSales.newScan().planFiles().spliterator(), false).count();
@Test
public void testGetCurrentCommitState(@TempDir Path workingDir) throws IOException {
Table catalogSales = createTestTableWithData(workingDir.toString());
String tablePath = catalogSales.location();
Snapshot snapshot1 = catalogSales.currentSnapshot();

String dataFilePath = String.join("/", tablePath, "data", UUID.randomUUID() + ".parquet");
catalogSales
.newAppend()
.appendFile(generateTestDataFile(10, catalogSales, dataFilePath))
.commit();
Snapshot snapshot2 = catalogSales.currentSnapshot();

Transaction tx = catalogSales.newTransaction();
tx.newDelete().deleteFromRowFilter(Expressions.lessThan("cs_sold_date_sk", 3)).commit();
AppendFiles appendAction = tx.newAppend();
for (int partition = 6; partition < 7; partition++) {
dataFilePath = String.join("/", tablePath, "data", UUID.randomUUID() + ".parquet");
appendAction.appendFile(generateTestDataFile(partition, catalogSales, dataFilePath));
}
appendAction.commit();
tx.commitTransaction();
// the transaction would result in 2 snapshots, although 3a will not be in the history as only
// the last snapshot of a multi-snapshot transaction is tracked in history.
Snapshot snapshot3b = catalogSales.currentSnapshot();
Snapshot snapshot3a = catalogSales.snapshot(snapshot3b.parentId());

dataFilePath = String.join("/", tablePath, "data", UUID.randomUUID() + ".parquet");
catalogSales
.newAppend()
.appendFile(generateTestDataFile(11, catalogSales, dataFilePath))
.commit();
Snapshot snapshot4 = catalogSales.currentSnapshot();

validatePendingCommits(catalogSales, snapshot1, snapshot2, snapshot3a, snapshot3b, snapshot4);
validatePendingCommits(catalogSales, snapshot3a, snapshot3b, snapshot4);

// TODO this use case is invalid. If a snapshot in the middle of a chain is expired, the chain
// TODO in invalid. This should result in termination of incremental sync?
catalogSales.expireSnapshots().expireSnapshotId(snapshot2.snapshotId()).commit();
validatePendingCommits(catalogSales, snapshot1, snapshot3a, snapshot3b, snapshot4);
// TODO invalid use case below
// even though 3a, 3b belong to same transaction, one of the two can be expired
// catalogSales.expireSnapshots().expireSnapshotId(snapshot3a.snapshotId()).commit();
// validatePendingCommits(catalogSales, snapshot1, snapshot2, snapshot3b, snapshot4);
}

private void validatePendingCommits(Table table, Snapshot lastSync, Snapshot... snapshots) {
InstantsForIncrementalSync instant =
InstantsForIncrementalSync.builder()
.lastSyncInstant(Instant.ofEpochMilli(lastSync.timestampMillis()))
.build();
IcebergSourceClient sourceClient = getIcebergSourceClient(table);
CurrentCommitState<Snapshot> toBeProcessed = sourceClient.getCurrentCommitState(instant);
Assertions.assertEquals(0, toBeProcessed.getInFlightInstants().size());
Assertions.assertArrayEquals(snapshots, toBeProcessed.getCommitsToProcess().toArray());
}

private static long getDataFileCount(Table catalogSales) throws IOException {
try (CloseableIterable<FileScanTask> files = catalogSales.newScan().planFiles()) {
return StreamSupport.stream(files.spliterator(), false).count();
}
}

private void validateTableChangeDiffSize(
Expand All @@ -293,11 +349,7 @@ private void validateSchema(OneSchema readSchema, Schema expectedSchema) {
Assertions.assertEquals(expectedField.fieldId(), column.fieldId());
Assertions.assertEquals(expectedField.type(), column.type());
Assertions.assertEquals(expectedField.isOptional(), column.isOptional());

// TODO: fix this
// Assertions.assertEquals(expectedField.doc(), column.doc());
// Assertions.assertEquals(expectedField.getOrdinal(), column.getOrdinal());
// Assertions.assertEquals(expectedField.getTransform(), column.getTransform());
Assertions.assertEquals(expectedField.doc(), column.doc());
}
}

Expand Down

0 comments on commit f60e6d2

Please sign in to comment.