
Core: Add BaseDeltaWriter #1888

Merged
rdblue merged 9 commits into apache:master from openinx:delta-writer
Dec 11, 2020

Conversation

@openinx openinx (Member) commented Dec 8, 2020

This is a separate PR to add the BaseDeltaWriter to BaseTaskWriter (https://github.com/apache/iceberg/pull/1818/files#diff-fc9a9fd84d24c607fd85e053b08a559f56dd2dd2a46f1341c528e7a0269f873cR92). The delta writer can accept both inserts and equality deletes.

For the CDC and upsert cases, compute engines such as Flink and Spark can use this delta writer to write streaming records (INSERT or DELETE) to an Apache Iceberg table.
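
For illustration, a delta-writing task might be driven roughly like the sketch below; write() and delete() follow the API discussed in this PR, while deltaWriter, ChangeEvent, Record, and the surrounding loop are assumptions made only for this example:

  // Hypothetical CDC apply loop; deltaWriter stands for an instance of the writer added here.
  void applyChangeStream(Iterable<ChangeEvent<Record>> events) throws IOException {
    for (ChangeEvent<Record> event : events) {
      if (event.isDelete()) {
        // Emits a position delete (for rows written by this task) or an equality delete.
        deltaWriter.delete(event.row());
      } else {
        // Appends to the current data file and records the row's path/offset for later deletes.
        deltaWriter.write(event.row());
      }
    }
    // Completing the task (not shown) hands the data files and delete files back for the commit.
  }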

private StructLikeMap<PathOffset> insertedRowMap;

public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
Preconditions.checkNotNull(eqDeleteSchema, "equality-delete schema could not be null.");
Contributor

Nit: the error message is slightly misleading because it uses "could", which implies that there is some case where it could be null. How about "Equality delete schema cannot be null"?
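
Applied to the constructor check quoted above, the suggestion would read like this (a sketch of just that one line, not necessarily the final merged wording):

  Preconditions.checkNotNull(eqDeleteSchema, "Equality delete schema cannot be null");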

/**
 * Base delta writer to write both insert records and equality-deletes.
 */
protected abstract class BaseDeltaWriter implements Closeable {
@rdblue rdblue (Contributor) Dec 8, 2020

How about BaseEqualityDeltaWriter?

I think Spark MERGE INTO will likely use a delta writer that doesn't create the equality writer or use the SortedPosDeleteWriter because it will request that the rows are already ordered.

 *
 * @param key the projected values, which can be written to the eq-delete file directly.
 */
public void delete(T key) throws IOException {
@rdblue rdblue (Contributor) Dec 8, 2020

Why pass a key here instead of a row?

I think it would be easier to assume that this is a row, so that the write and delete methods accept the same data. That also provides a way to write the row to the delete file, or just the key based on configuration. The way it is here, there is no way to write the whole row in the delete.

Contributor

From the tests, I see that this can be used when the deleted row won't be passed. But I don't think this leaves an option for writing an entire row to the delete file and using a subset of its columns for the delete. This assumes that whatever struct is passed here is the entire delete key.

To support both use cases (writing an already projected key and writing a full row and projecting), I think we should have two methods: deleteKey and delete. That way both are supported. We would also need a test for passing the full row to the delete file, but deleting by key.
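
For clarity, the two entry points being proposed could be sketched as below; the interface form and the javadoc wording are illustrative only, not the literal code that was merged:

  import java.io.Closeable;
  import java.io.IOException;

  // Sketch of the proposed delete surface alongside write().
  interface EqualityDeltaWriterSketch<T> extends Closeable {
    /** Write an insert row. */
    void write(T row) throws IOException;

    /** Delete by a full row; the equality key is projected from the row's equality fields. */
    void delete(T row) throws IOException;

    /** Delete by an already-projected key; the key is written to the eq-delete file as-is. */
    void deleteKey(T key) throws IOException;
  }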

Member Author

I agree that it's better to provide a delete(T row) method which accepts an entire row to write to the eq-delete file, because it's possible to have a table with (a,b,c,d) columns whose equality fields are the (a,c) columns, while someone wants to write the full (a,b,c,d) values into the eq-delete file.

The problem is: how can we project the generic data type T to match the expected eqDeleteRowSchema (so that the eqDeleteWriter can write the correct column values)? It sounds like we will need an extra abstract method to accomplish that generic projection.
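
One possible shape for that projection inside the base class, assuming the row is already exposed as a StructLike via asStructLike (a sketch only, with schema and eqDeleteRowSchema assumed to be fields of the writer):

  // Project a row's StructLike view down to the equality-delete schema.
  private final StructProjection structProjection = StructProjection.create(schema, eqDeleteRowSchema);

  private StructLike projectKey(T row) {
    // wrap() returns a view over the underlying struct, so a caller that retains the
    // key (e.g. as a map key) must copy it first.
    return structProjection.wrap(asStructLike(row));
  }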

Member Author

Thinking about it again: if the table has the (a,b,c,d) columns and the equality fields are (b,c), then I think it's enough to provide two ways to write the equality-delete file:

  1. Only write the (b,c) column values into the equality-delete file. That is enough to maintain correct deletion semantics if people don't expect to do any real incremental pulling.

  2. Write the (a,b,c,d) column values into the equality-delete file. That's suitable if people want to consume the incremental inserts and deletes for future streaming analysis, data pipelines, etc.

Beyond those two cases, I can't think of other scenarios that require a custom equality-delete schema.
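
To make those two options concrete, the delete row schema handed to the writer could be either a projection of the equality fields or the full table schema; this is only an illustrative sketch using Schema.select, and the variable names are assumptions:

  // Table with columns (a, b, c, d); equality fields are (b, c).
  Schema tableSchema = table.schema();

  // Option 1: the equality-delete file carries only the key columns.
  Schema keyOnlyDeleteSchema = tableSchema.select("b", "c");

  // Option 2: the equality-delete file carries the whole row, which suits
  // downstream incremental or streaming consumers of the deletes.
  Schema fullRowDeleteSchema = tableSchema;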

Contributor

Yes, I think those two cases are correct: the caller may want to pass the key to delete, or an entire row to delete.


// Check records in the eq-delete file.
DeleteFile eqDeleteFile = result.deleteFiles()[0];
Assert.assertEquals(ImmutableList.of(record, record), readRecordsAsList(eqDeleteSchema, eqDeleteFile.path()));
Contributor

Is it necessary to encode the record twice? What about detecting already deleted keys and omitting the second delete? That might be over-complicating this for a very small optimization though.

Member Author

Agreed, we don't have to upsert the same key twice; the first upsert is enough.
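
For reference, the optimization mentioned above could look roughly like the sketch below. Only insertedRowMap appears in the quoted snippets; the PathOffset field names, deletedKeySet, posDeleteWriter, and eqDeleteWriter are assumptions for illustration and were not part of the merged change:

  // Hypothetical in-task dedup of equality deletes.
  private final StructLikeSet deletedKeySet = StructLikeSet.create(eqDeleteSchema.asStruct());

  public void delete(T row) throws IOException {
    StructLike key = asCopiedKey(row);
    PathOffset previous = insertedRowMap.remove(key);
    if (previous != null) {
      // The row was written by this task, so a position delete is sufficient.
      posDeleteWriter.delete(previous.path, previous.rowOffset);
    } else if (deletedKeySet.add(key)) {
      // First delete of this key against earlier files: write an equality delete.
      eqDeleteWriter.write(row);
    }
    // A repeated delete of the same key is silently dropped.
  }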

createRecord(4, "ccc")
)), actualRowSet("*"));

// Start the 2th transaction.
Contributor

Nit: it would normally be "2nd"

*/
protected abstract StructLike asStructLike(T data);

protected abstract StructLike asCopiedKey(T row);
Contributor

I think this could be removed from the API because this already has asStructLike. Rather than relying on the implementation to create the projection and copy it, this could be implemented here like this:

  protected StructLike asCopiedKey(T row) {
    return structProjection.copy().wrap(asStructLike(row));
  }

We would need to create StructProjection in this class, but it would be fairly easy.

Member Author

I think the above implementation of asCopiedKey may have problems, because some compute engines, Flink for example, use a singleton RowDataWrapper instance to implement asStructLike. The old key and the copied key would then share the same RowDataWrapper, which has already wrapped the new RowData, and that messes up the keys in insertedRowMap.

Considering this issue, I finally decided to abstract a separate asCopiedKey that clones a completely independent key which won't share object values with the old one.
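
To illustrate the difference, an engine-specific subclass could materialize the projected values into its own struct so the map key shares no state with a reused wrapper; this is only a sketch, and GenericRecord is used purely for illustration rather than as the actual Flink implementation:

  // Hypothetical asCopiedKey: copy the projected key into a standalone struct so it
  // does not share state with a singleton wrapper such as RowDataWrapper.
  @Override
  protected StructLike asCopiedKey(T row) {
    StructLike projected = structProjection.wrap(asStructLike(row));
    GenericRecord copy = GenericRecord.create(eqDeleteSchema);
    for (int pos = 0; pos < eqDeleteSchema.columns().size(); pos += 1) {
      copy.set(pos, projected.get(pos, Object.class));  // shallow copy of each key column
    }
    return copy;
  }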

Contributor

Good catch. I like the idea of copying the data into a new struct.

}

public Set<CharSequence> referencedDataFiles() {
return referencedDataFiles;
Contributor

Good catch. Should this set be part of the WriteResult instead of separate? I think that tasks are going to need to pass the set back to the commit for validation, so adding it to the WriteResult seems like the right way to handle it.
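
A sketch of what folding the set into the result could look like; the builder method names here are illustrative and not necessarily the merged WriteResult API:

  // Hypothetical WriteResult carrying the referenced data files alongside the
  // data and delete files produced by the task.
  WriteResult result = WriteResult.builder()
      .addDataFiles(completedDataFiles)
      .addDeleteFiles(completedDeleteFiles)
      .addReferencedDataFiles(referencedDataFiles)
      .build();

  // The committer can then read result.referencedDataFiles() to validate that the
  // files referenced by position deletes still exist at commit time.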

@rdblue rdblue merged commit 47b6b05 into apache:master Dec 11, 2020
@rdblue rdblue (Contributor) commented Dec 11, 2020

Merged! Great work, @openinx!
