
Spec: Add column equality delete files #360

Closed

rdblue opened this issue Aug 7, 2019 · 16 comments

Comments

@rdblue
Contributor

rdblue commented Aug 7, 2019

One option for encoding row-level deletes is to track a tuple of values that indicates a row is deleted. The delete file contains a subset of the data columns, and each of its rows is matched against data rows using equality.

For example, a delete file may contain a single 2: id column with 2 rows, 18 and 19. This indicates that any row with ID 18 or 19 is deleted in all files to which the delete is applied (limited by sequence number and partition scope).

This format should be documented in the spec.
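
To illustrate, here is a minimal sketch (added for clarity, not part of the spec text; the Row type and file contents are hypothetical) of how a reader might apply such an equality delete during merge-on-read: drop any data row whose delete columns match a row in the delete file.

```java
import java.util.List;
import java.util.Set;

public class EqualityDeleteExample {
  // Hypothetical data rows: (id, data) pairs from one data file.
  record Row(long id, String data) {}

  public static void main(String[] args) {
    List<Row> dataRows = List.of(
        new Row(17, "a"), new Row(18, "b"), new Row(19, "c"), new Row(20, "d"));

    // Contents of the equality delete file: a single id column (field id 2)
    // with two rows, 18 and 19.
    Set<Long> deletedIds = Set.of(18L, 19L);

    // Merge-on-read: drop any data row whose delete columns match a delete row.
    List<Row> liveRows = dataRows.stream()
        .filter(row -> !deletedIds.contains(row.id()))
        .toList();

    System.out.println(liveRows); // [Row[id=17, data=a], Row[id=20, data=d]]
  }
}
```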

@rdblue rdblue added this to the Row-level Delete milestone Aug 7, 2019
@chenjunjiedada
Collaborator

Hi @rdblue @aokolnychyi,

Are we currently working on implementing the updates/deletes/upserts feature? Is this the design doc we are following?

@rdblue
Contributor Author

rdblue commented Sep 25, 2019

@chenjunjiedada, that's an early doc that outlines options, but I wouldn't say it captures the current design. The best design doc we have right now is the one @erikwright wrote: https://docs.google.com/document/d/1FMKh_SQ6xSUUmoCA8LerTkzIxDUN5JbStQp5Hzot4eo/edit#

@openinx
Member

openinx commented Mar 27, 2020

Let me link this issue to the file_type specification. After discussing with @chenjunjiedada: for a table with columns (a, b, c, d), where the field_ids of a, b, c, d are 0, 1, 2, 3, we may have delete queries such as:

1. delete from table where a=100 and b = 101;
2. delete from table where b = 101 and c=102 and d=103;
3. delete from table where d=104;

For the delete differential file, we would have two columns: 1. field_ids; 2. values. The row-delete operations above can then be encoded as follows:

<list(0, 1), list(100, 101)>
<list(1, 2, 3), list(101, 102, 103)>
<list(3), list(104)>

For the field_ids, we can optimize with a bitmap, so the encoding becomes the following (field_ids becomes a bitmap and values a list<binary>):

<binary<1100>, list(100, 101)>
<binary<0111>, list(101, 102, 103)>
<binary<0001>, list(104)>

FYI @rdblue

@rdblue
Contributor Author

rdblue commented Apr 6, 2020

@openinx, what is the motivating use case for a format like that one?

The use cases that we've been considering for equality deletes mostly have a stable column set that will be used for deletes. For example, a CDC use case typically uses the primary key of the table that is changing. In that case, it's easier if the delete file is a data file whose schema is just the primary key columns. So if I had a users table, I might have a delete file with a single column, user_id, with a field ID that matches the data files.

What are the use cases for a format that allows a dynamic column set for every row? I would want to make sure that it is worth the extra complexity.

@openinx
Member

openinx commented Jun 17, 2020

What are the use cases for a format that allows a dynamic column set for every row?

Take the GDPR use case as an example: a user record has many properties, and a few deletions each need to match on a different combination of properties, such as (a, b), (a, b, c), (a, b).

In that case, it's easier if the delete file is a data file whose schema is just the primary key columns. So if I had a users table, I might have a delete file with a single column, user_id, with a field ID that matches the data files.

Thinking about the design again: assume a CDC case where we have a table with two columns (id, a), id is the primary key, and the CDC events come in this order within an Iceberg transaction:

t0,  INSERT(1,2);
t1,  DELETE(1,2);
t2,  INSERT(1,3);

As you said, we will produce two different files for this transaction: one data file and one delete differential file. The data file will have:

(1,2);
(1,3);

and the delete file will only have the primary key column (as you said, if I understand correctly):

(1);

When we do the merge on read, I guess both (1,2) and (1,3) will be deleted by (1), while we should actually return the row (1,3) because DELETE(1,2) should only remove the record inserted by INSERT(1,2).

So I'm thinking that for equality deletes, we will need to keep all columns in the delete differential files so that we can avoid this issue (that is, INSERTing and DELETEing the same record a few times in a single Iceberg transaction).

@rdblue
Contributor Author

rdblue commented Jun 27, 2020

@openinx, sorry I haven't had a chance to reply before now. We missed you at the last sync up, where we talked about this. I added this to the notes, but I wanted to follow up here as well.

Sounds like there are 2 main reasons to use a dynamic schema. The first is that you might want to delete with different filters, like dropping a user by first/last name, or dropping by user id. I think it's reasonable to say there may be more than one way to delete data, but I don't think that there is much value to encoding those deletes in the same file. There is going to be a small number of column sets used to identify and delete records, and I think it's worth just using a different delete for each set of columns. Then we wouldn't need the complexity of encoding a map with various value types, and can take advantage of columnar formats.

The second argument for a dynamic schema -- the CDC case with rapid upserts for the same row -- is interesting, and something I've been thinking about how to solve as well. Let me present the approach I've been thinking about.

Continuing with your example, I would actually encode the delete using a positional delete rather than an equality delete. That removes the concern about deleting the row with ID 1 twice, because the delete would be for (file=data_file.avro, pos=0). The second record would not be deleted by accident.

The position-based approach has a few benefits as well:

  1. Position delete files are more efficient to apply because we're using information that we already know -- exactly where the row is
  2. Although we would need to keep a mapping from ID to position for every row in the data file, this would still require less memory than the approach where we encode a delete with an equality predicate with all columns of the original row. Keeping all columns of the original row is more expensive than the identity columns and an int or long position.
  3. Encoding a delete using all of the columns from the previous copy of the row may delete all copies of a row. Consider the following sequence:
t0, UPSERT(1, 2)
t1, UPSERT(1, 3)
t2, UPSERT(1, 4)
t3, UPSERT(1, 3)

The second upsert would be encoded as id=1 and data=3, which would actually delete the last row because the data was reset. This situation could also happen if an upsert is replayed because a stream has at-least-once semantics (which should be fine, since upsert is idempotent).

I think that using a positional delete is a good solution to this problem. That would also mean that we won't need to apply equality deletes to any data files with the same sequence number, which is a small win. Only positional delete files would need to be applied to data files with the same sequence number.
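
To make the position-based idea concrete, here is a minimal writer-side sketch (hypothetical names, not the actual Iceberg writer API): while a data file is open, keep a map from the row's ID to the position it was written at, and when an upsert replaces a row already in the open file, emit a position delete for the earlier position.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertPositionTracker {
  record PositionDelete(String dataFile, long position) {}

  private final String dataFile;
  private final Map<Long, Long> idToPosition = new HashMap<>(); // id -> last written position
  private final List<PositionDelete> positionDeletes = new ArrayList<>();
  private long nextPosition = 0;

  public UpsertPositionTracker(String dataFile) {
    this.dataFile = dataFile;
  }

  // Called for each upserted row as it is appended to the open data file.
  public void upsert(long id) {
    Long previous = idToPosition.get(id);
    if (previous != null) {
      // The previous copy is in this same open file: delete it by position,
      // so a later identical row is never deleted by accident.
      positionDeletes.add(new PositionDelete(dataFile, previous));
    }
    idToPosition.put(id, nextPosition);
    nextPosition += 1;
  }

  public List<PositionDelete> positionDeletes() {
    return positionDeletes;
  }
}
```

Deletes that target rows in previously committed files would still be handled separately (for example, by equality deletes on the ID columns); the map only needs to live as long as the data file is open.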

What do you think?

@openinx
Member

openinx commented Jun 28, 2020

Well, it seems I was thinking about this differently.

As I understand it, your solution divides deletes into two parts: 1. equality deletes; 2. positional deletes. Equality deletes would only be applied to data files with a sequence_number < the delete file's, and positional deletes would only be applied to data files with the same sequence number.

There are several problems, in my view:

  1. As you said, keeping the index from ID to position is expensive, especially when the data can't fit in limited memory. In that case, we may need to spill to disk, which could produce many random seeks when looking up the position for a given ID while generating the positional delete files.
  2. Mixing equality deletes and positional deletes seems to make the JOIN algorithm complex; both the read and replay implementations need to handle both of them. I'd prefer to use one kind of delete if possible.
  3. If equality deletes only keep the primary key columns in the delta files, then replaying to a downstream Iceberg table becomes a problem. For example, for a table with two columns (a, b), where a is the primary key and b is the partition key, the operation DELETE(a=1) needs to be applied to all partitions of the downstream Iceberg table, while DELETE(a=1, b=2) only needs to be applied to the partition b=2. Keeping all columns in equality deletes is good for replaying.

I'm writing a document about equality deletes and will post it to the mailing list in the next few days.

@rdblue
Contributor Author

rdblue commented Jun 29, 2020

Sounds like there is a slight difference in how we are framing the problem. I have been thinking of an upsert case, where we get an entire replacement row but not the previous row values. From your comments, it sounds like you are thinking of a case where we have both the previous row and the replacement row (possibly as separate delete and insert events). It's good to think through that case because it does change what we might do here.

Without the previous row, I think you would have to keep track of previously written values, so the position deletes are attractive because you have to keep less information -- a position and the id columns, rather than all columns. But with the previous row, you wouldn't need to keep any state. I agree that this makes it a good alternative if you have that information in your CDC stream.

There's still a major problem with using equality deletes that I pointed out: deleting with row values would potentially delete future rows when two copies of a row have the same values. Using a unique ID for each CDC event could mitigate the problem, but duplicate events from at-least-once processing would still incorrectly delete data rows. How would you avoid this problem?

To your point about replaying events, I think I agree that it would be good to keep track of the previous column values if they are present in a CDC delete event, but I'm not sure it would enable a source to replay exactly the events that were received -- after all, we're still separating inserts and deletes into separate files so we can't easily reconstruct an upsert event.

Assuming that it is likely valuable to track the deleted data values, I don't think that it makes a case for dynamic columns:

  1. It would be good to track deleted data whether the delete is encoded using position or equality. We can add additional columns for deleted values to both formats, so this doesn't strictly require equality deletes.
  2. If we introduce additional columns for deleted row values, then why wouldn't we add more columns to the equality delete file? They could be normal data files that contain deleted rows and are used differently. Then we get columnar encoding. Making the column list dynamic doesn't help, unless for some reason the CDC event is missing some columns. And in that case, it's not too much overhead to have a delete file for each combination of columns the CDC event emitter produces.

Clarifications

the positional-deletion will only be applied to data files with the same sequence number.

Position deletes would apply to data files with a sequence number <= the delete file's sequence number. They would still be used for other delete cases, which require <.

If we want to use equality delete files for this, then we would similarly apply an equality delete file when a data file's sequence number is <= the equality delete's sequence number.

The optimization I was suggesting is if we don't use equality deletes to encode deletes within the same commit, we can use < instead of <=.

keeping the index from ID to position is expensive, especially when the data couldn't fit in the limited memory

You could avoid spilling to disk by closing the data file when tracking becomes too expensive. If we assume that the ID column is a UUID and the position is a long, then each (id, pos) pair takes about 24 bytes. If we double that for JVM overhead, then we can track about 2.8 million rows in 128 MB of memory. While that's significant, it isn't unreasonable to cut off data files well before 2.8 million rows. Even if the checkpoint interval is as long as 5 minutes, the rate of rows to the file would need to be 9,300 rows per second to exceed that limit.
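
As a quick check of those numbers, here is a back-of-the-envelope sketch (assuming a 16-byte UUID key plus an 8-byte position, doubled for JVM overhead):

```java
public class PositionMapSizing {
  public static void main(String[] args) {
    long bytesPerEntry = 2 * (16 + 8);                      // ~24 bytes, doubled for JVM overhead = 48
    long memoryBudget = 128L * 1024 * 1024;                 // 128 MB
    long trackableRows = memoryBudget / bytesPerEntry;      // ~2.8 million entries

    long checkpointSeconds = 5 * 60;                        // 5-minute checkpoint interval
    long rowsPerSecond = trackableRows / checkpointSeconds; // ~9,300 rows/sec to exceed the budget

    System.out.printf("trackable rows: %,d; rows/sec to exceed: %,d%n",
        trackableRows, rowsPerSecond);
  }
}
```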

The equality-deletion and positional-deletion seems make the JOIN algorithm complex

This would happen anyway because data files will need to merge multiple delete files. Position deletes are more efficient because they are more targeted, so deployments will very likely have regular compaction from equality deletes to position deletes. That means at any given time, a file may have both delete formats to merge in. This is not that much complexity as long as we can filter the results of another filter.
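
For example, merging both delete formats into one scan is just a matter of composing two row filters. This is a minimal sketch with hypothetical types, not the actual reader code:

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class MixedDeleteFilters {
  // Hypothetical row: its position in the data file plus its columns.
  record Row(long position, long id, String data) {}

  public static void main(String[] args) {
    List<Row> dataRows = List.of(
        new Row(0, 1, "a"), new Row(1, 2, "b"), new Row(2, 3, "c"));

    Set<Long> deletedPositions = Set.of(0L); // from a position delete file
    Set<Long> deletedIds = Set.of(3L);       // from an equality delete file

    Predicate<Row> notPositionDeleted = row -> !deletedPositions.contains(row.position());
    Predicate<Row> notEqualityDeleted = row -> !deletedIds.contains(row.id());

    // One filter applied to the results of the other.
    List<Row> liveRows = dataRows.stream()
        .filter(notPositionDeleted.and(notEqualityDeleted))
        .toList();

    System.out.println(liveRows); // [Row[position=1, id=2, data=b]]
  }
}
```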

@openinx
Member

openinx commented Jun 30, 2020

I have been thinking of an upsert case, where we get an entire replacement row but not the previous row values.

Upserting without the previous row values depends on a primary key specification; otherwise an UPSERT(1,2) doesn't know which row it should change. The primary key also introduces other questions:

  1. Should the primary key columns include the partition column and bucket column? If not, then for a table with columns (a, b), where a is the primary key and b is the partition key, the old row for an UPSERT(1,2) could be in any partition of the Iceberg table, so would we need to broadcast to all partitions? That seems wasteful of resources. If they should, then it limits usage: for example, we may adjust the bucket policy based on the fields being JOINed between two tables to avoid a massive data shuffle, so when adjusting the bucket policy we would also need to adjust the primary key to include the new bucket columns?
  2. How do we ensure uniqueness in the Iceberg table? Within a given bucket, two different versions of the same row may appear in two different data files, so would we need a costly JOIN between data files (previously, we only needed to JOIN between data files and delete files)? Within a given Iceberg table, if the primary key doesn't include the partition column, then two rows with the same primary key may appear in two different partitions, so primary key deduplication is a problem too.

In my opinion, CDC events from an RDBMS should always provide both old and new values (as a row-level binlog does). It would be good to handle this case correctly first. The other CDC events, such as UPSERT with only a primary key, seem to need more consideration.

Using a unique ID for each CDC event could mitigate the problem, but duplicate events from at-least-once processing would still incorrectly delete data rows. How would you avoid this problem?

For Spark Streaming, it only guarantees at-least-once semantics. It seems hard to maintain consistent data in the sink table unless we provide a global timestamp or identifier to indicate the before-after order; let me think more about this. Flink provides exactly-once semantics, so it seems easier there.

but I'm not sure it would enable a source to replay exactly the events that were received -- after all, we're still separating inserts and deletes into separate files so we can't easily reconstruct an upsert event.

I think the mixed equality-delete/positional-delete approach you described makes it hard to reconstruct the correct event order. The pure positional-delete approach described in this document could restore the original order.

I don't think that it makes a case for dynamic columns.

Yes, I agree that we don't need to care about dynamic columns now. Real CDC events always provide all column values for a row. Please ignore the proposal about dynamic column encoding.

@rdblue
Contributor Author

rdblue commented Jul 3, 2020

I think we're in agreement on a few points for moving forward:

  • We will use a static schema for equality deletes
  • We need to be able to reconstruct an equivalent stream of changes for streaming CDC pipelines
  • We should add a way to encode all of the columns for an equality delete and identify the subset used for deletion (for efficiency)
  • We may need to add a way to encode all columns into position delete files (if used for CDC)
  • For the CDC case, we'll first assume that we have the entire deleted row in delete events
  • We should handle a stream of upserts as a separate use case

The doc describes ways to use both equality and position deletes for CDC streams. Sounds like equality would be ideal if (1) events have a unique ID, and (2) the execution has exactly-once semantics. Otherwise, I think it is possible to use position deletes. Which do you plan to target?

@JingsongLi
Contributor

JingsongLi commented Jul 3, 2020

Thanks for your discussion and summary. Sorry for joining this discussion late. Here is my thinking; correct me if I am wrong:
IIUC, there can be two user-oriented modes for equality deletes:

Mode1

Iceberg as a database: users use SQL to modify this Iceberg table: "insert into", "update ... where x=a", "delete from ... where x=a". For equality deletes to perform well, Iceberg should write only a constant number of physical records for each of these SQL statements.

What is the solution for mode1?

  • For an Iceberg transaction, just write logs, including inserts and deletes; but merging efficiency does not look good.
  • For an Iceberg transaction, write just an inserts file and an equality deletes file: this doesn't look workable. Consider insert(1, 2), delete(1, 2), insert(1, 2). If there are only an inserts file and a deletes file, it is hard to know that we need to emit an insert(1, 2) after merging. In other words, we can't distinguish the order between inserts and deletes.
  • For an Iceberg transaction, write an inserts file, an equality deletes file, and a position deletes file. For a delete(1, 2), we may need to write this record to both the equality deletes file and the position deletes file, because a delete should delete records from old files and also delete records written earlier in this transaction, in order. When merging, old transactions can just join against the equality deletes file.
    The equality deletes file could have just the key fields. The position deletes file should have just file_id and position.

Mode2

Iceberg as a CDC receiver: theoretically, every record from the CDC stream should affect just a single record in the database.

  • With a primary key (unique ID), Iceberg can let a CDC record affect all matching records in the database; the semantics are the same.
  • Without a primary key (unique ID), this is hard to implement: storing all fields in memory looks expensive, and it may need some special merging algorithm. I think it may be too demanding.
    We should support both batch reading and streaming reading for the CDC input stream, and also produce a CDC stream for streaming reads.

What is the solution for mode2?

  • For batch reading: the solution can be similar to mode1. As said above, with a primary key (unique ID), CDC records can affect all matching records in the database, so the stream can be treated as a mode1 stream.
  • For streaming reading: if we want to output a CDC stream, it should include delete records with all columns, so the equality deletes files should have all the columns.

@openinx
Member

openinx commented Jul 3, 2020

@rdblue, our team also had another discussion about the use cases and solutions. We see three use cases:

  • Case 1: the table has a primary key (the unique ID as you said), and all CDC events are INSERT/UPDATE/DELETE operations (the UPDATE event has both old and new values), such as classic MySQL binlog events (assuming t has (id, data) columns and id is the primary key):
INSERT INTO t SET @1=1 @2='apple';
DELETE FROM t WHERE @1=1 @2='apple';
UPDATE t WHERE @1=1 @2='apple' SET @1=1 @2='pear';
  • Case 2: the table has a primary key, and all CDC events are UPSERT/DELETE; the UPSERT doesn't have to provide old values, such as:
UPSERT(1, 'apple')
DELETE(1, 'apple')
DELETE(1) // DELETE doesn't have to provide all the column values; the primary key is enough.
  • Case 3: the table doesn't have a primary key, and all CDC events are INSERT/UPDATE/DELETE (UPDATE should provide both old and new values). Supporting UPSERT without old values is impossible here.

These three cases should cover the CDC requirements from users, and we have three goals to meet them: 1) fast ingestion; 2) acceptable batch read performance; 3) an equivalent stream so that we can keep eventual consistency between the source table and the sink table.

Now, we have three solutions:

  1. The mixed equality-delete and positional-delete solution you proposed.
  2. The pure positional-delete solution I described in the document.
  3. The incremental log files solution.

In summary, solution #2 and solution #3 resolve the before-after order for downstream consumers, but they both have a few issues with batch read performance. Solution #2 needs to check whether there exists a delete row whose <file_id, offset> is not less than the candidate row's in the INSERT data files; it would use a data structure like HashMap<primaryKey, TreeSet<T>> (maintaining those delete markers) to implement the JOIN algorithm. It doesn't need any map<primary_key, pos> when writing, but solution #1 does; on the other hand, solution #1 is simpler when considering merge-on-read.

Solution #3 needs to JOIN against the newly written INSERT files, while solution #1 only JOINs between INSERT and DELETE files.

For solution #1, the problem is: how do we reconstruct the stream events so they can be replayed downstream correctly? Take an example with these CDC events:

t0, INSERT(1,2)
t1, DELETE(2,2)
t2, UPSERT(1,3)
(The first field is primary key)

It will generate three files:

  • the INSERT file will have:
INSERT(1,2)
INSERT(1,3)
  • the equality-delete file will have:
DELETE(2,2)
  • the positional-delete file will have:
<data-file=insert-file-id, position=0, primary_key_column=1>

NOTICE: here we add primary_key_column=1 in the positional-delete file because, if this is an UPSERT, the DELETE(1,2) should also mask data files that have a smaller sequence number. Keeping the primary key value here avoids searching the original insert data file to compare the primary key when doing the JOIN between the data file and the delete file.

Now let's consider the replay issue again. We can regard the equality-delete file as deleting rows that existed before this transaction, so it should be replayed first. Then we can restore the correct event order by JOINing the INSERT file and the positional-delete file. In theory, we can maintain eventual consistency between the source table and the sink table because we guarantee:

  1. Events of the same row will be consumed exactly as the order they are written in the source table.
  2. We don’t guarantee the consumption order of different rows.
  3. If an UPDATE event updates from one row to another row, then we don’t guarantee the before-after order between two rows.

So it seems solution #1 could be a candidate for the primary-key cases #1 and #2.
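
To sketch the replay order described above (hypothetical types and event formatting, not a real implementation): equality deletes are emitted first, then the insert file is walked in write order, and a positional delete pointing at an insert's position produces a DELETE event right after that insert.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ReplaySketch {
  record Row(long id, int data) {}

  public static void main(String[] args) {
    // Files written for the commit: INSERT(1,2); DELETE(2,2); UPSERT(1,3).
    List<Row> insertFile = List.of(new Row(1, 2), new Row(1, 3));
    List<Row> equalityDeleteFile = List.of(new Row(2, 2));
    Map<Long, Long> positionDeleteFile = Map.of(0L, 1L); // position -> primary key

    List<String> replayed = new ArrayList<>();

    // 1. Equality deletes first: they only remove rows that existed before this commit.
    equalityDeleteFile.forEach(row -> replayed.add("DELETE " + row));

    // 2. Inserts in write order; a positional delete for an insert's position means
    //    that row was replaced later in the same commit, so emit its DELETE right after.
    for (int pos = 0; pos < insertFile.size(); pos++) {
      Row row = insertFile.get(pos);
      replayed.add("INSERT " + row);
      if (positionDeleteFile.containsKey((long) pos)) {
        replayed.add("DELETE " + row);
      }
    }

    replayed.forEach(System.out::println);
    // DELETE Row[id=2, data=2]
    // INSERT Row[id=1, data=2]
    // DELETE Row[id=1, data=2]
    // INSERT Row[id=1, data=3]
  }
}
```

This only illustrates the ordering argument; the real JOIN would operate on the file formats, sequence numbers, and primary key columns discussed above.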

@rdblue
Contributor Author

rdblue commented Jul 4, 2020

@openinx, I agree with the guarantees that you propose for the reconstructed CDC stream. It sounds like the first solution, with mixed equality and position deletes, is probably the design we should use, since it will have good read performance and good write performance, with the cost being the ID-to-position map we need while a data file is open.

For UPSERT, I think the main difference is that we don't have the deleted column data, so the stream provides a slightly different guarantee when it is replayed. I don't think that we need to keep track of the primary key column in the position delete file. Isn't that table-level configuration that won't change across data files?

@aokolnychyi
Contributor

Is this document up to date, and does it discuss how to solve the open questions?

@openinx
Member

openinx commented Aug 3, 2020

Thanks @aokolnychyi for bringing this up. Part of the document is out of date now; I will update those parts in the next few days. Thanks.

@rdblue
Contributor Author

rdblue commented Oct 28, 2020

The spec was updated with equality delete files in #1499.
