
Change Data Capture (CDC) [Draft] #4539

Status: Open. Wants to merge 14 commits into base: main.

Conversation

@flyrain (Contributor) commented Apr 11, 2022:

This is the draft PR for change data capture. It largely aligns with the MVP we discussed in the design doc and on the mailing list.

  1. Emits delete and insert CDC records only.
  2. Creates a Spark action for CDC generation.
  3. Leverages the _deleted metadata column for both pos deletes and eq deletes; deleted rows from either path share the same format.
  4. For row-level deletes, supports both non-vectorized reads and Parquet vectorized reads.

This is still a draft PR, so there are limitations:

  1. Multiple optimizations are possible, for example, pushdown of the _deleted metadata column.
  2. The interface needs to be expanded to support queries by timestamp in addition to snapshot IDs.
  3. More test cases are needed.

Happy to take feedback and file the formal PRs.
cc @aokolnychyi @RussellSpitzer @szehon-ho @jackye1995 @kbendick @karuppayya @chenjunjiedada @stevenzwu @rdblue
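For orientation, a minimal usage sketch of the draft action as it appears in this PR (entry point generateCdcRecords, bound ofSnapshot, result accessor cdcRecords); the names were later debated and renamed in the discussion below, and the SparkActions wiring is assumed:

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Cdc;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class CdcActionExample {
  // Build the change set for a single snapshot and hand it back as a Spark DataFrame.
  @SuppressWarnings("unchecked")
  static Dataset<Row> changesOf(Table table, long snapshotId) {
    Cdc.Result result = SparkActions.get()
        .generateCdcRecords(table)   // draft entry point added by this PR
        .ofSnapshot(snapshotId)      // emit changes produced by this snapshot only
        .execute();
    // the draft result exposes the records as a plain Object; Spark callers cast to Dataset<Row>
    return (Dataset<Row>) result.cdcRecords();
  }
}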

/**
* Instantiates an action to generate CDC records.
*/
default Cdc generateCdcRecords(Table table) {
Contributor:

Nit: Consider using CDC as all caps instead, like it is in the javadoc comments. For me, it looks a lot cleaner.

Contributor Author:

I'm also open to other names, e.g., ChangeDataSet, ChangeDataCapture

Contributor:

In Flink it's referred to as Changelog.

Contributor:

I feel we should not use acronyms.

Contributor:

Yeah I agree that using a full word would be better.

In comments and even method names it's fine in my opinion but as the main class name it probably would be best to use the full name.

Contributor:

+1 for Changelog. Here, it means generateChangelog

Contributor:

I am OK with generateChangelog.

Contributor (@stevenzwu, Apr 22, 2022):

Or generateChangeSet, since an action is a batch execution. If it were a long-running streaming execution, changelog would be more accurate, as it implies a stream.

Contributor:

I liked generateChangelog but if it can confuse people, generateChangeSet sounds good too.

Contributor Author:

Combining the feedback, I changed it to GetChangeSet. The name GenerateChangeSet is good, but it is way too long; think about the class name BaseGenerateChangeSetSparkActionResult. I admit the verb get is plain compared to generate, but I think it is fine: a plain name is suitable for a tool.

/**
* Instantiates an action to generate CDC records.
*/
default Cdc generateCdcRecords(Table table) {
Contributor:

I feel we should not use acronyms.

Comment on lines 191 to 194
ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
.filterData(filter)
.ignoreAdded()
.specsById(((HasTableOperations) table).operations().current().specsById());
Contributor:

[question] Should we also set caseSensitive? It's used in ResidualEvaluator when we call planFiles.

Contributor Author (@flyrain, Apr 21, 2022):

caseSensitive is not necessary here.

return null;
}

String groupID = UUID.randomUUID().toString();
Contributor:

Should we also give it a prefix relevant to the data scanned, something like readRowLevelDeletes-{UUID}?

Contributor Author (@flyrain, Apr 19, 2022):

I assume this is for easier debugging. We probably don't need that, since this UUID is used purely internally; it won't show up in any log or UI.

core/src/main/java/org/apache/iceberg/ManifestGroup.java (outdated, resolved)
Comment on lines 84 to 86
for (int i = 0; i < snapshotIds.size(); i++) {
generateCdcRecordsPerSnapshot(snapshotIds.get(i), i);
}
Contributor:

[question] How about doing this in separate threads? (Maybe in a later iteration of the PR.)

Contributor (@singhpk234, Apr 14, 2022):

DataFrames would be computed lazily, though multithreading could bring some value in constructing the DataFrames (as we would be calling planFiles for each of them). Your thoughts?

Contributor Author:

Yes, we can plan them in parallel if planning performance is an issue. It'd be especially useful for a big table and/or a large number of snapshots.
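For illustration, a rough sketch of how per-snapshot planning could run on separate threads while preserving snapshot order; planSnapshot is a hypothetical stand-in for the per-snapshot planning done in generateCdcRecordsPerSnapshot, and this is not part of the PR:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ParallelSnapshotPlanner {
  // Plans each snapshot on its own thread but keeps the resulting DataFrames in snapshot order.
  static List<Dataset<Row>> planInParallel(List<Long> snapshotIds,
                                           Function<Long, Dataset<Row>> planSnapshot,
                                           int parallelism) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Future<Dataset<Row>>> futures = new ArrayList<>();
      for (Long snapshotId : snapshotIds) {
        futures.add(pool.submit(() -> planSnapshot.apply(snapshotId)));
      }
      List<Dataset<Row>> dfs = new ArrayList<>();
      for (Future<Dataset<Row>> future : futures) {
        Dataset<Row> df = future.get();
        if (df != null) {
          dfs.add(df);  // futures are consumed in submission order, so snapshot order is preserved
        }
      }
      return dfs;
    } finally {
      pool.shutdown();
    }
  }
}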

}

for (int i = 0; i < snapshotIds.size(); i++) {
generateCdcRecordsPerSnapshot(snapshotIds.get(i), i);
Contributor:

I think we can derive the commit order from the file sequence number.

Contributor Author:

Yep, it is part of the design. I simplified it in the draft and will add it later. To be clear, v1 doesn't have sequence numbers, so we will assign a zero-based order.

Comment on lines 105 to 122
// new data file as the insert
Dataset<Row> df = readAppendDataFiles(snapshotId, commitOrder);
if (df != null) {
  dfs.add(df);
}

// metadata deleted data files
Dataset<Row> deletedDf = readMetadataDeletedFiles(snapshotId, commitOrder);
if (deletedDf != null) {
  dfs.add(deletedDf);
}

// pos and eq deletes
Dataset<Row> rowLevelDeleteDf = readRowLevelDeletes(snapshotId, commitOrder);
if (rowLevelDeleteDf != null) {
  dfs.add(rowLevelDeleteDf);
}
}
Contributor:

I think dfs should add deleteDf first and then appendDataDf, because the deletes apply to old snapshots. For example:
table xxx (id, data) with primary key id.
snap 1 -> insert (1, aa)
snap 2 -> delete (1, aa), insert (1, bb)
If, for snap 2, we add insert (1, bb) first and delete (1, aa) second in dfs (dfs is a LinkedList), the final result downstream is a delete of primary key id=1.

Contributor Author (@flyrain, Apr 21, 2022):

I can definitely make the change, although I don't think it makes any difference. Basically, these CDC records (e.g., (1, aa, D) and (1, bb, I)) within a snapshot don't have any order; please check the discussion in the design doc. We should treat them the same in terms of order while ingesting them downstream. Here, we cannot delete the row (1, aa) by just checking its id; instead, we may delete it by checking both values.

Contributor (@stevenzwu, Apr 21, 2022):

I agree with @hameizi on emitting eq deletes before the appends, as eq deletes are applied to the previous snapshots. We should apply/emit eq deletes before the appended rows of the current snapshot.

I also left a comment regarding pos deletes in issue #3941. I am not sure we need to emit pos deletes, as they are already applied as a delete filter for the rows inserted in the current snapshot. It is like they are squashed away already.

Contributor Author:

I'm OK with that, will make the change.

For pos deletes, there are two scenarios:

  1. They delete rows from previous snapshots. We need to emit them like other deleted rows.
  2. They delete rows within the same snapshot. We can squash them by default; I use a flag ignoreRowsDeletedWithinSnapshot to control that in case people don't want to squash them.

Contributor:

My understanding is that pos deletes are only applicable to files appended in the same snapshot and eq deletes are only applied to the previous snapshots.

Contributor Author:

Spark does, and it doesn't use eq deletes at all. For merge, delete, and update, Spark uses pos deletes to remove rows from previous snapshots.


package org.apache.iceberg.actions;

public interface Cdc extends Action<Cdc, Cdc.Result> {
Contributor:

Is this an action? Actions typically modify the table, like rewrite or expire snapshots.

Contributor Author:

It is. An action usually modifies the table, but it is not necessarily limited to that. This is the first PR; we will also explore a way to use a scan for CDC.

Contributor:

All the existing actions in Iceberg seem to modify the table. Since this is a public API, I'd like to double-check. Can we keep it in SparkActions only for now while we are in the experimental phase?

Conceptually, this is a scan/read (not a maintenance action).

Contributor Author (@flyrain, Apr 21, 2022):

Agreed. This is the starting point. As I said in another comment, we need a well-designed scan interface first.

Contributor (@aokolnychyi, Apr 22, 2022):

I wouldn't mind having an action like this. We have RemoveOrphanFiles that does not modify the table state, for instance.

I would match the naming style we have in other actions, though. I think it can be GenerateChangelog as all other actions start with a verb.

Contributor (@stevenzwu, Apr 22, 2022):

RemoveOrphanFiles is also a maintenance action on the table. I agree that technically it doesn't modify the table state, but it is a garbage collection action for tables. Those orphaned files may have been part of the table before but got unreferenced due to compaction or retention purges, or they were intended to be added to the table but got aborted in the middle. To me, it is a modification of the broader env/infra of the table.

Contributor:

The primary goal of adding the Action API was to provide scalable and ready-to-use recipes for common scenarios. I am not sure we should strictly restrict the usage to maintenance.

That being said, I'd be happy to discuss alternative ways to expose this.

@@ -41,7 +41,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ParallelIterable;

class ManifestGroup {
public class ManifestGroup {
Contributor:

If we use scan API, we may not need to expose ManifestGroup as public

Contributor Author:

It'd be awesome. We have to design the scan API nicely though.

Contributor:

I am not sure about extending TableScan (can be convinced), but we can definitely create a utility class in core that will internally use ManifestGroup. I don't think we should expose this class. Maybe it is safer to start with a utility and then see if it is something that can be part of TableScan.

Contributor (@stevenzwu, Apr 22, 2022):

@aokolnychyi we are not talking about extending TableScan. We are discussing introducing new scan interfaces to support incremental scan (appends only and CDC). Please see PR 4580 : #4580 (comment)

}

PositionSetDeleteMarker<T> deleteMarker = new PositionSetDeleteMarker<>(rowToPosition, deleteSet, markDeleted);
return deleteMarker.filter(rows);
Contributor:

Since PositionSetDeleteMarker always returns true, we are not actually filtering; it seems we are mainly leveraging the filter to traverse the iterable and get the side effect of calling markDeleted on matched rows. The semantics are a little odd to me. Maybe we don't need to introduce the PositionSetDeleteMarker filter?

Contributor:

Maybe just use this method from CloseableIterable to traverse? It returns the same object after applying the transform function.

  static <I, O> CloseableIterable<O> transform(CloseableIterable<I> iterable, Function<I, O> transform) {
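A minimal sketch of that suggestion, assuming a generic position-lookup predicate in place of the PR's actual delete-index type:

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

import org.apache.iceberg.io.CloseableIterable;

class DeleteMarking {
  // Traverse with CloseableIterable.transform and mark matching rows as deleted,
  // instead of a Filter whose shouldKeep() always returns true.
  static <T> CloseableIterable<T> markDeletedRows(CloseableIterable<T> rows,
                                                  Function<T, Long> rowToPosition,
                                                  Predicate<Long> isDeletedPosition,
                                                  Consumer<T> markRowDeleted) {
    return CloseableIterable.transform(rows, row -> {
      if (isDeletedPosition.test(rowToPosition.apply(row))) {
        markRowDeleted.accept(row);  // side effect: set the row's _deleted metadata flag
      }
      return row;                    // rows are never dropped here, only marked
    });
  }
}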

Collaborator (@chenjunjiedada, Apr 21, 2022):

I think it traverses the rows and sets the is_deleted value on the row. +1 to using transform from CloseableIterable. The Filter interface makes it hard to understand.

Contributor:

Or maybe we can add a new interface called Marker to mark these rows. I think it would be easier to understand than using CloseableIterable.

};
}

protected void markRowDeleted(T item) {
Contributor:

Should this just be an abstract method, forcing implementing classes to provide it?

Contributor Author:

Probably; it needs changes for all subclasses of DeleteFilter. This PR is already big, so maybe we should split it. For example, we can have a separate PR to read deleted rows in the case of row-level changes. cc @chenjunjiedada

Collaborator:

The previous PR that reads the deleted rows has such an implementation; +1 to using another PR.

Contributor:

+1 for using an abstract method and forcing implementations to provide it. I also think the changes to Deletes and DeleteFilter should be implemented in a separate PR, because implementing CDC reads for Flink will need these changes too.
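A sketch of what the abstract-method variant could look like on a heavily simplified DeleteFilter (the real class has many more members):

public abstract class DeleteFilter<T> {

  // ... existing filtering logic ...

  // Each reader (Spark RowDataReader, Flink readers, ...) knows how to set the _deleted
  // metadata column on its own row representation, so it must implement this.
  protected abstract void markRowDeleted(T item);
}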

}

@Override
public Object cdcRecords() {
Contributor:

maybe make Cdc.Result a generic class to get type info?

Contributor Author:

Good idea, will make the change.
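One possible shape for the parameterized result, as a sketch rather than the final API:

import org.apache.iceberg.actions.Action;

public interface Cdc<T> extends Action<Cdc<T>, Cdc.Result<T>> {

  interface Result<T> {
    // Spark would bind T to Dataset<Row>; other engines can bind their own representation.
    T cdcRecords();
  }
}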

"The fromSnapshot(%s) is not an ancestor of the toSnapshot(%s)", fromSnapshotId, toSnapshotId);

snapshotIds.clear();
// include the fromSnapshotId
Contributor (@stevenzwu, Apr 21, 2022):

Exclusive behavior for fromSnapshotId might be easier to work with, assuming the last enumerated position (toSnapshotId) needs to be saved somewhere. The next run could then just set fromSnapshotId to the saved position.

I believe that is the intention behind TableScan#appendsBetween having the (fromSnapshotId, toSnapshotId] semantics.

Contributor Author:

Makes sense. One concern is that it cannot get the first snapshot of a table. Two solutions:

  1. Add a new interface like toSnapshot(), which computes all snapshots up to the toSnapshot.
  2. Add an overload between(long fromSnapshotId, long toSnapshotId, boolean includeFromSnapshotId).

What do you think?

Contributor (@stevenzwu, Apr 22, 2022):

What do you mean by "get the first snapshot of a table"?

Add a new interface like toSnapshot(), which computes all snapshots up to the toSnapshot.

In PR #4580, this is covered when fromSnapshotId is not set, which means null is used. Then IncrementalScan would process all snapshots up to the toSnapshotId: as we trace the ancestor chain of toSnapshotId, we eventually reach a null snapshotId and stop.

Add an overload between(long fromSnapshotId, long toSnapshotId, boolean includeFromSnapshotId).

This API is not necessary; we can just set the fromSnapshotId to the parent snapshot id if we want the inclusive behavior for the very first scan.

Contributor Author:

null can solve the issue. However, I was trying to avoid it since it doesn't have a clear meaning to users:

  1. Users don't know they can pass null.
  2. Users don't know what will happen when they pass null.

So users have to rely on the docs to figure that out. I'd suggest providing a new interface, beforeSnapshot() or toSnapshot(). It does the same thing under the hood, but is more meaningful to users. Another benefit is that it'd be logically complete, since @aokolnychyi also proposed the method afterSnapshot() here: #4539 (comment).

Contributor:

Yeah, @rdblue also made a similar suggestion: #4580 (comment). We will go with fromSnapshotInclusive and fromSnapshotExclusive in the new incremental scan interface.
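A sketch of how those bounds read with the incremental append scan API that PR #4580 was heading toward (the exact method names are assumed from that discussion):

import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;

class IncrementalBounds {
  // Resume from a saved position: exclusive lower bound, inclusive upper bound.
  static IncrementalAppendScan resumeScan(Table table, long lastProcessedSnapshotId, long currentSnapshotId) {
    return table.newIncrementalAppendScan()
        .fromSnapshotExclusive(lastProcessedSnapshotId)  // (lastProcessed, current]
        .toSnapshot(currentSnapshotId);
  }

  // Very first scan of a table: include the starting snapshot itself.
  static IncrementalAppendScan firstScan(Table table, long firstSnapshotId, long currentSnapshotId) {
    return table.newIncrementalAppendScan()
        .fromSnapshotInclusive(firstSnapshotId)
        .toSnapshot(currentSnapshotId);
  }
}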

return new BaseCdcSparkActionResult(outputDf);
}

private void generateCdcRecordsPerSnapshot(long snapshotId, int commitOrder) {
Contributor:

nit: Maybe return the Datasets from this method (instead of updating a class variable); then the caller can do the union, e.g., using the reduce method from the Java Stream API.
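A minimal sketch of that suggestion: each per-snapshot call returns its own Dataset and the caller folds them together:

import java.util.List;
import java.util.Optional;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ChangeSetUnion {
  // Union the per-snapshot change sets; empty input yields Optional.empty().
  static Optional<Dataset<Row>> unionAll(List<Dataset<Row>> perSnapshotChanges) {
    return perSnapshotChanges.stream().reduce(Dataset::unionByName);
  }
}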

generateCdcRecordsPerSnapshot(snapshotIds.get(i), i);
}

Dataset<Row> outputDf = null;
Contributor:

can we use unionAll?

Contributor Author:

We expect the schemas of these datasets to be the same, so there is no difference between unionAll and unionByName. BTW, unionAll only unions one other dataset at a time, not a list of datasets.

}

for (int i = 0; i < snapshotIds.size(); i++) {
generateCdcRecordsPerSnapshot(snapshotIds.get(i), i);
Contributor:

We pass the index of the snapshot list as commitOrder. What is the intended usage of commitOrder?

Contributor Author:


import static org.apache.spark.sql.functions.lit;

public class BaseCdcSparkAction extends BaseSparkAction<Cdc, Cdc.Result> implements Cdc {
Contributor:

A high-level question: since this algorithm doesn't guarantee ordering within a snapshot (for a good reason), does that mean we can only have parallel executors if it scans a single snapshot? If it scans multiple snapshots, parallel executors could mess up the ordering.

Contributor Author:

We have the commit order metadata column to indicate the order of CDC records from multiple snapshots, so the order won't be messed up.

Contributor:

does that mean we can only have parallel executors if it only scans one snapshot? If this scans multiple snapshots, parallel executors could mess up the ordering.

Contributor Author:

No, they won't. The order across multiple snapshots is kept by the metadata column _commit_order, even with parallel executors. Each snapshot gets a unique _commit_order for its change set, no matter how parallel the read is.

return withCdcColumns(scanDF, snapshotId, "I", commitOrder);
}

private List<FileScanTask> planAppendedFiles(long snapshotId) {
Contributor (@stevenzwu, Apr 21, 2022):

Currently, this is how this implementation discovers the files appended in the specified snapshot:

  • discover all files reachable from the snapshotId via time travel
  • find all files added in this snapshot
  • filter the files discovered in the first step against the appended data files discovered in the second step

This method can probably be simplified. We can use TableScan#appendsBetween(parentSnapshotId, snapshotId)#planFiles to get the FileScanTask collection, as in the sketch below.

Then we can apply the ignoreRowsDeletedWithinSnapshot transformation if needed.
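A sketch of the simplified planning, assuming the snapshot has a parent (the first snapshot of a table would need special handling):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class AppendedFilePlanner {
  // Only the files appended between the parent snapshot and this snapshot are planned,
  // instead of time-traveling to the snapshot and filtering a full table scan afterwards.
  static List<FileScanTask> planAppendedFiles(Table table, long snapshotId) {
    Snapshot snapshot = table.snapshot(snapshotId);
    long parentId = snapshot.parentId();  // assumes a parent exists
    try (CloseableIterable<FileScanTask> tasks =
        table.newScan().appendsBetween(parentId, snapshotId).planFiles()) {
      return Lists.newArrayList(tasks);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close file scan tasks", e);
    }
  }
}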

Contributor Author:

sure, let me make the change


private Dataset<Row> withCdcColumns(Dataset<Row> df, long snapshotId, String cdcType, int commitOrder) {
return df.withColumn(RECORD_TYPE, lit(cdcType))
.withColumn(COMMIT_SNAPSHOT_ID, lit(snapshotId))
Contributor:

What is the intended usage of those three commit-related metadata columns?

Contributor Author:

RECORD_TYPE indicates the change type (I, D, -U, +U).
COMMIT_SNAPSHOT_ID indicates which snapshot produced the change record.
COMMIT_TIMESTAMP indicates when the change happened.
COMMIT_ORDER indicates the change order in the case of multiple snapshots.
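For example, a downstream consumer could order the change set across snapshots with these columns (a sketch using the _commit_order and _commit_timestamp constants defined later in this PR):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ChangeSetOrdering {
  // Sort the emitted change records by commit order (and timestamp) before applying them downstream.
  static Dataset<Row> inCommitOrder(Dataset<Row> changeSet) {
    return changeSet.orderBy(col("_commit_order"), col("_commit_timestamp"));
  }
}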

}

@Test
public void testMergeWithOnlyUpdateClause() throws ParseException, NoSuchTableException {
Contributor Author (@flyrain, Apr 21, 2022):

@stevenzwu, here is an example that Spark uses pos deletes in the merge command.

Contributor:

Are you saying Spark MERGE INTO writes pos deletes? If so, it doesn't answer my question of why we need to emit pos deletes, as pos deletes are already applied as delete filters when emitting insert records for the same snapshot.

Contributor Author:

As I mentioned here, #4539 (comment), we only apply the pos deletes for data files generated in the same snapshot. We still emit pos deletes when they delete rows from previous snapshots.

* @param snapshotId id of the snapshot to generate changed data
* @return this for method chaining
*/
Cdc ofSnapshot(long snapshotId);
Contributor:

If we decide to include a verb in the action name, I'd also consider renaming these methods like this:

// changes for a specific snapshot
GenerateChangelog forSnapshot(long snapshotId);

// changes starting from a particular snapshot (exclusive)
GenerateChangelog afterSnapshot(long fromSnapshotId);

// changes from start snapshot (exclusive) to end snapshot (inclusive)
GenerateChangelog betweenSnapshots(long fromSnapshotId, long toSnapshotId);

*
* @return this for method chaining
*/
Cdc ofCurrentSnapshot();
Contributor (@aokolnychyi, Apr 22, 2022):

In which case will this method be useful? I guess consumers will probably always want to consume from a particular point in time so I am not sure CDC records for a current snapshot would be very helpful.

Contributor:

+1. For incremental reads, we need to bookkeep the position. An implicit current snapshot would make that bookkeeping impossible and hence is probably not a valid use case.

* A Dummy Vector Reader which doesn't actually read files, instead it returns a dummy
* VectorHolder which indicates whether the row is deleted.
*/
public static class DeletedVectorReader extends VectorizedArrowReader {
Collaborator:

A basic question, do we need to enable vectorization to read CDC? What about the non-vectorized reader?

Contributor Author:

Vectorized reads are significantly faster than non-vectorized reads; check the benchmark in #3287. We should have it. Non-vectorized reads are handled by the changes I made in RowDataReader, DeleteFilter, and Deletes.

@@ -226,10 +266,14 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {

// if there are fewer deletes than a reasonable number to keep in memory, use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
return Deletes.filter(records, this::pos, Deletes.toPositionIndex(filePath, deletes));
return hasMetadataColumnIsDeleted ?
Deletes.marker(records, this::pos, Deletes.toPositionIndex(filePath, deletes), this::markRowDeleted) :
Collaborator:

I feel we should abstract these later.

if (needWrapDeleteColumn) {
    Deletes.map(records, this::pos, Deletes.toPositionIndex(filePath, deletes), this::markRowDeleted);
} else {
    Deletes.filter(..)
}

What about adding a class DeleteStream that supports filter and map?

Collaborator:

The DeleteStream would contain the filtering logic that is in this DeleteFilter and also the transforming logic.

Contributor Author:

We can definitely refactor here; I've got some ideas in mind. Trying to understand your point: do you mean a class DeleteStream that combines the functionality of both PositionSetDeleteFilter and PositionSetDeleteMarker? I will test these ideas anyway. Thanks for the suggestion.

Collaborator:

Yes, like the Java Stream API.

@@ -70,8 +70,7 @@ public class TestSparkParquetReadMetadataColumns {
private static final Schema PROJECTION_SCHEMA = new Schema(
required(100, "id", Types.LongType.get()),
required(101, "data", Types.StringType.get()),
MetadataColumns.ROW_POSITION,
MetadataColumns.IS_DELETED
Collaborator:

Why do we change these unit tests?

Contributor Author (@flyrain, Apr 22, 2022):

Some tests (e.g., testReadRowNumbersWithDelete) failed because they use PROJECTION_SCHEMA, which contains the column IS_DELETED. The row count was wrong since the tests still assume only undeleted rows are returned, while with the new logic both deleted and undeleted rows are returned.
To fix it, we can either remove the column IS_DELETED so that deleted rows won't be in the results, or change the test cases to filter out the deleted rows from the results. I chose the former so that it fixes multiple failures at once.

Contributor (@aokolnychyi) left a review comment:

I did a review of the action. I think it shows that the algorithm we discussed in the issue can be implemented fairly easily. I'd say supporting is_deleted is actually more challenging.

@flyrain, what about creating a separate PR for is_deleted? I think these two are independent. We can approach the vectorized and non-vectorized readers separately.

/**
* Instantiates an action to generate a change data set.
*/
default GetChangeSet getChangeSet(Table table) {
Contributor:

For other reviewers: we had a discussion on the name in this thread.

I understand the concern about long class names but getXXX usually indicates something already exists and we simply return it. In this case, though, we actually perform quite some computation to build/generate a set of changes. Using generate or build would consume just a few extra chars but will be more descriptive, in my view. All public methods and classes will be short enough.

Contributor:

Anton made a good argument here. generate is not much longer

*
* @return this for method chaining
*/
GetChangeSet forCurrentSnapshot();
Contributor:

@stevenzwu and I had a short discussion on whether this method is useful. @flyrain, do you have a valid use case for this method in mind?

/**
* Returns the change set.
*/
Object changeSet();
Contributor:

I think we may want to parameterize this and use Dataset<Row> in Spark instead of plain Object.

Contributor Author:

Made it return a generic type.

public static final String COMMIT_TIMESTAMP = "_commit_timestamp";
public static final String COMMIT_ORDER = "_commit_order";

private final List<Long> snapshotIds = Lists.newLinkedList();
Contributor:

While using a single snapshot ID list is convenient, the downside is that we can't validate whether there are conflicting calls to betweenSnapshots, forSnapshot, etc. In SparkScanBuilder and BaseTableScan, we took a different approach.

Contributor Author (@flyrain, Apr 22, 2022):

We should consider them incompatible, right? For example, if you do

getChangeSet(table).betweenSnapshots(s1, s2).forCurrentSnapshot(s3).execute()

it should only honor the later call, which is forCurrentSnapshot(s3). If you flip the order like this

getChangeSet(table).forCurrentSnapshot(s3).betweenSnapshots(s1, s2).execute()

it should only pick up snapshots between s1 and s2.
The current logic supports this already; basically, it clears the list every time before adding any snapshot IDs to it.
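For reference, a sketch of the stricter SparkScanBuilder/BaseTableScan-style alternative that fails fast on conflicting calls; the field and method names here are illustrative, not from the PR:

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class ChangeSetBounds {
  private Long forSnapshotId = null;
  private Long fromSnapshotId = null;
  private Long toSnapshotId = null;

  ChangeSetBounds forSnapshot(long snapshotId) {
    Preconditions.checkArgument(fromSnapshotId == null && toSnapshotId == null,
        "Cannot combine forSnapshot with betweenSnapshots");
    this.forSnapshotId = snapshotId;
    return this;
  }

  ChangeSetBounds betweenSnapshots(long fromId, long toId) {
    Preconditions.checkArgument(forSnapshotId == null,
        "Cannot combine betweenSnapshots with forSnapshot");
    this.fromSnapshotId = fromId;
    this.toSnapshotId = toId;
    return this;
  }
}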


String groupID = UUID.randomUUID().toString();
FileScanTaskSetManager manager = FileScanTaskSetManager.get();
manager.stageTasks(table, groupID, fileScanTasks);
Contributor:

We stage tasks but we never invalidate them, which will cause a memory leak. It will be tricky to invalidate them, as we don't know when it is safe to do so. We could instruct the manager to remove tasks after the first access or make the action result closable, as sketched below. Let's think about what will be best.
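A sketch of the closable-result option, assuming the manager exposes a removeTasks(table, setID) cleanup method (as the rewrite flow uses); the exact hook is still open:

import org.apache.iceberg.Table;
import org.apache.iceberg.spark.FileScanTaskSetManager;

class StagedTaskCleanup implements AutoCloseable {
  private final Table table;
  private final String groupID;

  StagedTaskCleanup(Table table, String groupID) {
    this.table = table;
    this.groupID = groupID;
  }

  @Override
  public void close() {
    // free the staged tasks once the caller is done with the change set
    FileScanTaskSetManager.get().removeTasks(table, groupID);
  }
}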

}

private List<FileScanTask> planAppendedFiles(long snapshotId) {
CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
Contributor:

This is a full table scan. If you use a utility based on ManifestGroup, you would get much better performance.

Contributor:

yeah, I made a similar comment here: #4539 (comment)

fileScanTasks.forEach(fileScanTask -> {
if (fileScanTask.file().content().equals(FileContent.DATA) && dataFiles.contains(fileScanTask.file().path())) {
FileScanTask newFileScanTask = fileScanTask;
if (!ignoreRowsDeletedWithinSnapshot) {
Contributor:

I am not sure this is correct. Records added and removed in the same snapshot must have a lesser commit order compared to all other records added in that snapshot. I'd not add this functionality to start with.

Contributor Author:

Yes, order is tricky when we output both deletes and inserts. I will remove the option to do that. It always applies the deletes within the same snapshot.


private List<FileScanTask> planMetadataDeletedFiles(long snapshotId) {
Snapshot snapshot = table.snapshot(snapshotId);
ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
Contributor:

Do we have to access all data manifests? Aren't we interested only in data manifests added in this snapshot? Do we also need delete manifests for this?

ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), manifestFiles)
.filterData(filter)
.onlyWithRowLevelDeletes()
.specsById(((HasTableOperations) table).operations().current().specsById());
Contributor:

nit: why not use table.specs()?

Contributor Author:

Sure, will make the change. Double checked, they point to the same object.

@@ -185,14 +204,35 @@ private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
.reduce(Predicate::and)
.orElse(t -> true);

Filter<T> remainingRowsFilter = new Filter<T>() {
Filter<T> remainingRowsFilter = hasMetadataColumnIsDeleted ? getMarker(remainingRows) : getFilter(remainingRows);
Contributor:

Maybe we shouldn't use this implicit way of deciding whether to keep deleted records. We can add the _deleted column by default, use different methods to decide whether to keep the deleted records, and then use different methods to get the required data in different situations.

Contributor Author:

Yeah, I think the delete logic is cleaner in that way. We are more or less in that direction.

@flyrain (Contributor Author) commented Jan 12, 2023:

There will be no update on this PR. Please check the latest progress here, https://github.com/apache/iceberg/projects/26.

GenerateChangeSet forCurrentSnapshot();

/**
* Emit changed data from a particular snapshot(exclusive).
Member:

I think "from" here is ambiguous, probably "since" is a bit closer?
