minor change iceberg source interface #178

Open · wants to merge 1 commit into base: main
core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java (17 additions, 14 deletions)

```diff
@@ -52,7 +52,7 @@

 @Log4j2
 @Builder
-public class IcebergSourceClient implements SourceClient<Snapshot> {
+public class IcebergSourceClient implements SourceClient<Long> {
```
@ashvina (Contributor) commented on Nov 7, 2023:

This change is a bit confusing. Why is a generic required in the contract if all commits have to be identified by a long id? If the justification is alignment with Delta and Hudi, then all future sources will need to ensure a long commit id is available. IMO, a snapshot is a cleaner and less ambiguous representation for a source client. Moreover, just because serialization is efficient with a long, the entire contract should not be changed. I think such changes should have more compelling reasons.
Contributor:

This is allowing us to move towards your suggestion in #135. Maybe we have misunderstood.

Contributor (PR author):

@ashvina The generic is still required because, in the current state, it is HoodieInstant for Hudi and Long for Delta. The entity in the generic semantically represents a way to get the source table's state at that point. Where possible it is better to choose a leaner entity (we can add this to the docs), and for Iceberg the snapshot id fits this.
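For readers following the thread, the contract under discussion looks roughly like the sketch below. This is inferred from the methods touched in this diff, not the authoritative interface in the repo, so the member set and package may differ.

```java
// Inferred sketch of the SourceClient contract. COMMIT is the format-specific
// commit handle: HoodieInstant for Hudi, Long for Delta, and with this PR a
// Long snapshot id for Iceberg.
public interface SourceClient<COMMIT> {
  // Full state of the source table as of the given commit.
  OneTable getTable(COMMIT commit);

  // Schemas known to the table as of the given commit.
  SchemaCatalog getSchemaCatalog(OneTable table, COMMIT commit);

  // Latest snapshot of the table, including its data files.
  OneSnapshot getCurrentSnapshot();

  // Incremental change (files added and removed) introduced by one commit.
  TableChange getTableChangeForCommit(COMMIT commit);

  // Commits made after the last sync instant, to be processed oldest first.
  CommitsBacklog<COMMIT> getCommitsBacklog(InstantsForIncrementalSync lastSyncInstant);
}
```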

@ashvina (Contributor) commented on Nov 8, 2023:

I think we should be careful not to rush this change. These abstractions are fundamental to OneTable and will affect its future extensibility. My main concern is that we are altering the model because of how one of the components behaves, namely the serialization of the last and pending commits. Let me know if I have misunderstood this.

In my opinion, we need a new representation for commits: OneCommit. Just as a table's state is mapped to OneTable, a source client implementation should map the metadata of a table format's commit to OneCommit.

Regarding the tracking of the commit backlog: if OneCommit is available, the serializer or any other component can decide independently which details are best and necessary to serialize. This would cleanly decouple the client's implementation from the behavior of core OneTable components.
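A minimal sketch of what the proposed OneCommit might look like; the type does not exist in this PR, so the class name is taken from the comment above and every field is an assumption.

```java
import java.time.Instant;

// Hypothetical OneCommit, per the suggestion above; not part of this PR.
public final class OneCommit {
  // Format-specific identifier, e.g. an Iceberg snapshot id or a Hudi
  // instant time, normalized to a string.
  private final String identifier;
  // When the commit became visible in the source table.
  private final Instant commitTime;

  public OneCommit(String identifier, Instant commitTime) {
    this.identifier = identifier;
    this.commitTime = commitTime;
  }

  public String getIdentifier() { return identifier; }
  public Instant getCommitTime() { return commitTime; }
}
```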

Contributor:

Let's work through the round-trip conversion testing to see where that leads us. I think that will give us a better vision for where we should be going with respect to commit representation and serialization.

```diff
   @NonNull private final Configuration hadoopConf;
   @NonNull private final PerTableConfig sourceTableConfig;

@@ -89,8 +89,9 @@ private FileIO initTableOps() {
   }

   @Override
-  public OneTable getTable(Snapshot snapshot) {
+  public OneTable getTable(Long snapshotId) {
     Table iceTable = getSourceTable();
+    Snapshot snapshot = iceTable.snapshot(snapshotId);

     Schema iceSchema = iceTable.schemas().get(snapshot.schemaId());
     IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance();
```
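The mechanical core of the change is that callers now pass a snapshot id and the client resolves the Snapshot itself via `iceTable.snapshot(snapshotId)`. Below is a standalone sketch of that lookup against the Iceberg API; the table location and Hadoop-backed loading are assumptions for illustration.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

class SnapshotLookupDemo {
  public static void main(String[] args) {
    // Load an existing Hadoop-backed table; the path is an assumption.
    Table table = new HadoopTables(new Configuration()).load("/tmp/iceberg/catalog_sales");
    long snapshotId = table.currentSnapshot().snapshotId();
    // Metadata-only lookup; returns null for unknown or expired snapshot ids.
    Snapshot snapshot = table.snapshot(snapshotId);
    System.out.println("committed at " + snapshot.timestampMillis()
        + ", schemaId=" + snapshot.schemaId());
  }
}
```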
```diff
@@ -113,8 +114,9 @@ public OneTable getTable(Snapshot snapshot) {
   }

   @Override
-  public SchemaCatalog getSchemaCatalog(OneTable table, Snapshot snapshot) {
+  public SchemaCatalog getSchemaCatalog(OneTable table, Long snapshotId) {
     Table iceTable = getSourceTable();
+    Snapshot snapshot = iceTable.snapshot(snapshotId);
     Integer iceSchemaId = snapshot.schemaId();
     Schema iceSchema = iceTable.schemas().get(iceSchemaId);
     IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance();
@@ -129,8 +131,8 @@ public OneSnapshot getCurrentSnapshot() {
     Table iceTable = getSourceTable();

     Snapshot currentSnapshot = iceTable.currentSnapshot();
-    OneTable irTable = getTable(currentSnapshot);
-    SchemaCatalog schemaCatalog = getSchemaCatalog(irTable, currentSnapshot);
+    OneTable irTable = getTable(currentSnapshot.snapshotId());
+    SchemaCatalog schemaCatalog = getSchemaCatalog(irTable, currentSnapshot.snapshotId());

     TableScan scan = iceTable.newScan().useSnapshot(currentSnapshot.snapshotId());
     PartitionSpec partitionSpec = iceTable.spec();
```
```diff
@@ -162,11 +164,12 @@ private OneDataFile fromIceberg(DataFile file, PartitionSpec partitionSpec, OneT
   }

   @Override
-  public TableChange getTableChangeForCommit(Snapshot snapshot) {
+  public TableChange getTableChangeForCommit(Long snapshotId) {
     FileIO fileIO = getTableOps();
     Table iceTable = getSourceTable();
     PartitionSpec partitionSpec = iceTable.spec();
-    OneTable irTable = getTable(snapshot);
+    Snapshot snapshot = iceTable.snapshot(snapshotId);
+    OneTable irTable = getTable(snapshotId);

     Set<OneDataFile> dataFilesAdded =
         StreamSupport.stream(snapshot.addedDataFiles(fileIO).spliterator(), false)
@@ -184,12 +187,12 @@ public TableChange getTableChangeForCommit(Snapshot snapshot) {
         .filesRemoved(dataFilesRemoved)
         .build();

-    OneTable table = getTable(snapshot);
+    OneTable table = getTable(snapshot.snapshotId());
     return TableChange.builder().tableAsOfChange(table).filesDiff(filesDiff).build();
   }

   @Override
-  public CommitsBacklog<Snapshot> getCommitsBacklog(InstantsForIncrementalSync lastSyncInstant) {
+  public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync lastSyncInstant) {

     long epochMilli = lastSyncInstant.getLastSyncInstant().toEpochMilli();
     Table iceTable = getSourceTable();
@@ -204,18 +207,18 @@ public CommitsBacklog<Snapshot> getCommitsBacklog(InstantsForIncrementalSync las
     if (pendingSnapshot.timestampMillis() <= epochMilli) {
       // Even the latest snapshot was committed before the lastSyncInstant. No new commits were made
       // and no new snapshots need to be synced. Return empty state.
-      return CommitsBacklog.<Snapshot>builder().build();
+      return CommitsBacklog.<Long>builder().build();
     }

-    List<Snapshot> snapshots = new ArrayList<>();
+    List<Long> snapshotIds = new ArrayList<>();
     while (pendingSnapshot != null && pendingSnapshot.timestampMillis() > epochMilli) {
-      snapshots.add(pendingSnapshot);
+      snapshotIds.add(pendingSnapshot.snapshotId());
       pendingSnapshot =
           pendingSnapshot.parentId() != null ? iceTable.snapshot(pendingSnapshot.parentId()) : null;
     }
     // reverse the list to process the oldest snapshot first
-    Collections.reverse(snapshots);
-    return CommitsBacklog.<Snapshot>builder().commitsToProcess(snapshots).build();
+    Collections.reverse(snapshotIds);
+    return CommitsBacklog.<Long>builder().commitsToProcess(snapshotIds).build();
  }

   // TODO(https://github.com/onetable-io/onetable/issues/147): Handle this.
```
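The backlog construction in getCommitsBacklog follows parent pointers from the table's current snapshot back to the last sync instant, then reverses the list so the oldest commit is processed first. A self-contained restatement of that loop against the Iceberg API, with the table assumed to be already loaded:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class BacklogWalk {
  // Collect ids of snapshots committed after lastSyncMillis, oldest first.
  static List<Long> pendingSnapshotIds(Table table, long lastSyncMillis) {
    List<Long> snapshotIds = new ArrayList<>();
    Snapshot pending = table.currentSnapshot();
    while (pending != null && pending.timestampMillis() > lastSyncMillis) {
      snapshotIds.add(pending.snapshotId());
      // parentId() is null once we reach the first snapshot in the history.
      pending = pending.parentId() != null ? table.snapshot(pending.parentId()) : null;
    }
    Collections.reverse(snapshotIds); // process the oldest snapshot first
    return snapshotIds;
  }
}
```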
IcebergSourceClientProvider.java:

```diff
@@ -18,13 +18,11 @@

 package io.onetable.iceberg;

-import org.apache.iceberg.Snapshot;
-
 import io.onetable.client.PerTableConfig;
 import io.onetable.client.SourceClientProvider;

 /** A concrete implementation of {@link SourceClientProvider} for Hudi table format. */
-public class IcebergSourceClientProvider extends SourceClientProvider<Snapshot> {
+public class IcebergSourceClientProvider extends SourceClientProvider<Long> {
   @Override
   public IcebergSourceClient getSourceClientInstance(PerTableConfig sourceTableConfig) {
     return IcebergSourceClient.builder()
```
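For orientation, obtaining a client through the provider after this change might look like the sketch below; `sourceTableConfig` is assumed to be a PerTableConfig built elsewhere, and any provider initialization steps are omitted.

```java
// Sketch only: the provider is now parameterized by Long snapshot ids
// rather than org.apache.iceberg.Snapshot objects.
SourceClientProvider<Long> provider = new IcebergSourceClientProvider();
IcebergSourceClient client = provider.getSourceClientInstance(sourceTableConfig);
OneSnapshot current = client.getCurrentSnapshot();
```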
Tests for IcebergSourceClient:

```diff
@@ -86,7 +86,7 @@ void getTableTest(@TempDir Path workingDir) throws IOException {
     IcebergSourceClient client = clientProvider.getSourceClientInstance(sourceTableConfig);

     Snapshot snapshot = catalogSales.currentSnapshot();
-    OneTable oneTable = client.getTable(snapshot);
+    OneTable oneTable = client.getTable(snapshot.snapshotId());
     Assertions.assertNotNull(oneTable);
     Assertions.assertEquals(TableFormat.ICEBERG, oneTable.getTableFormat());
     Assertions.assertTrue(oneTable.getName().endsWith("catalog_sales"));
@@ -121,7 +121,7 @@ public void getSchemaCatalogTest(@TempDir Path workingDir) throws IOException {
     IcebergSourceClient client = clientProvider.getSourceClientInstance(sourceTableConfig);
     IcebergSourceClient spyClient = spy(client);

-    SchemaCatalog schemaCatalog = spyClient.getSchemaCatalog(null, iceCurrentSnapshot);
+    SchemaCatalog schemaCatalog = spyClient.getSchemaCatalog(null, iceCurrentSnapshot.snapshotId());
     Assertions.assertNotNull(schemaCatalog);
     Map<SchemaVersion, OneSchema> schemas = schemaCatalog.getSchemas();
     Assertions.assertEquals(1, schemas.size());
@@ -156,8 +156,9 @@ public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException
     Assertions.assertEquals(
         String.valueOf(iceCurrentSnapshot.snapshotId()), oneSnapshot.getVersion());
     Assertions.assertNotNull(oneSnapshot.getTable());
-    verify(spyClient, times(1)).getTable(iceCurrentSnapshot);
-    verify(spyClient, times(1)).getSchemaCatalog(oneSnapshot.getTable(), iceCurrentSnapshot);
+    verify(spyClient, times(1)).getTable(iceCurrentSnapshot.snapshotId());
+    verify(spyClient, times(1))
+        .getSchemaCatalog(oneSnapshot.getTable(), iceCurrentSnapshot.snapshotId());
     verify(spyPartitionConverter, times(5)).toOneTable(any(), any(), any());
     verify(spyDataFileExtractor, times(5)).fromIceberg(any(), any(), any());
```

```diff
@@ -315,9 +316,11 @@ private void validatePendingCommits(Table table, Snapshot lastSync, Snapshot...
             .lastSyncInstant(Instant.ofEpochMilli(lastSync.timestampMillis()))
             .build();
     IcebergSourceClient sourceClient = getIcebergSourceClient(table);
-    CommitsBacklog<Snapshot> commitsBacklog = sourceClient.getCommitsBacklog(instant);
+    CommitsBacklog<Long> commitsBacklog = sourceClient.getCommitsBacklog(instant);
     Assertions.assertEquals(0, commitsBacklog.getInFlightInstants().size());
-    Assertions.assertArrayEquals(snapshots, commitsBacklog.getCommitsToProcess().toArray());
+    Long[] snapshotIds =
+        Arrays.stream(snapshots).map(snapshot -> snapshot.snapshotId()).toArray(Long[]::new);
+    Assertions.assertArrayEquals(snapshotIds, commitsBacklog.getCommitsToProcess().toArray());
   }
```

Contributor, on the added `Long[] snapshotIds` lines:

can we just use a list here?
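One way the reviewer's suggestion might be addressed; this is an assumption about intent, not code from the PR, and it presumes `getCommitsToProcess()` returns a `List<Long>` as the builder usage suggests.

```java
// Compare as lists instead of materializing a Long[] for assertArrayEquals.
List<Long> expectedIds =
    Arrays.stream(snapshots).map(Snapshot::snapshotId).collect(Collectors.toList());
Assertions.assertEquals(expectedIds, commitsBacklog.getCommitsToProcess());
```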

```diff
@@ -329,7 +332,7 @@ private static long getDataFileCount(Table catalogSales) throws IOException {

   private void validateTableChangeDiffSize(
       Table table, Snapshot snapshot, int addedFiles, int removedFiles) {
     IcebergSourceClient sourceClient = getIcebergSourceClient(table);
-    TableChange tableChange = sourceClient.getTableChangeForCommit(snapshot);
+    TableChange tableChange = sourceClient.getTableChangeForCommit(snapshot.snapshotId());
     Assertions.assertEquals(addedFiles, tableChange.getFilesDiff().getFilesAdded().size());
     Assertions.assertEquals(removedFiles, tableChange.getFilesDiff().getFilesRemoved().size());
   }
```