Spark 3.3: Uniqueness validation when computing updates of changelogs #7388
Conversation
private Row deletedRow = null;
private long deletedRowCount = 0;
We could use a stack/list for the same purpose, but this solution has a lower memory cost.
Are there any potential issues with reusing the same object given that we mutate that object in place in pre/post image computation? Or is it safe because there will be an exception if multiple DELETE rows get there? Seems like it is going to work, just checking.
We don't mutate the object while removing carry-over rows. For more context, this PR splits the ChangelogIterator into two iterators: the first one (CarryoverRemoveIterator) only handles removing carry-over rows; after that, we apply the second one (the modified ChangelogIterator), which computes updates only.
Correct, we don't mutate them in CarryoverRemoveIterator, but we do in ChangelogIterator when computing update images. However, that seems to happen only for a delete/insert pair, which should work, I guess.
That seems to work correctly.
That's a good point. I overlooked it at the beginning. Thinking about it a bit more: it wouldn't be an issue in the current use case, as you mentioned. The shared object only occurs when there are multiple identical delete rows. For example, assume the following rows go through the removeCarryoverIterator:
{0, "a", "data", DELETE}
{0, "a", "data", DELETE}
{3, "a", "new_data", INSERT}
The iterator output will be identical to the input, with a subtle difference: the first two rows share the same object. That is fine in the current use case, since computeUpdates will throw an exception when there are duplicated delete rows.
There is a minor risk: if a future iterator is concatenated after the RemoveCarryoverIterator and tries to mutate one of the deletes, it would be a surprise, since doing so mutates multiple rows. Overall, we are good here.
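The shared-object behavior described above can be sketched with a simplified, self-contained model (plain Object[] rows instead of Spark Row; the class and helper names here are invented for illustration): identical consecutive DELETE rows are buffered as a single cached object plus a count, so the surviving duplicates come out reference-equal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Simplified, hypothetical model of the carryover-removal pass. Each row is an
// Object[] whose last element is the change type ("DELETE"/"INSERT").
class CarryoverSketch {

  // Removes DELETE/INSERT pairs with identical data (carryovers). Surviving
  // duplicate deletes are emitted from a single cached object, so they are
  // reference-equal, as noted in the discussion above.
  static List<Object[]> removeCarryovers(List<Object[]> rows) {
    List<Object[]> out = new ArrayList<>();
    Object[] cachedDelete = null;
    long deleteCount = 0;

    for (Object[] row : rows) {
      String op = (String) row[row.length - 1];
      if (cachedDelete != null) {
        if (dataEquals(cachedDelete, row) && "DELETE".equals(op)) {
          deleteCount++; // another identical delete: just bump the count
          continue;
        }
        if (dataEquals(cachedDelete, row) && "INSERT".equals(op) && deleteCount > 0) {
          deleteCount--; // a delete/insert carryover pair cancels out
          continue;
        }
        // boundary reached: flush surviving deletes; all copies share one object
        for (long i = 0; i < deleteCount; i++) {
          out.add(cachedDelete);
        }
        cachedDelete = null;
        deleteCount = 0;
      }

      if ("DELETE".equals(op)) {
        cachedDelete = row;
        deleteCount = 1;
      } else {
        out.add(row);
      }
    }

    // input exhausted: flush any remaining buffered deletes
    for (long i = 0; i < deleteCount; i++) {
      out.add(cachedDelete);
    }
    return out;
  }

  // Compares all columns except the trailing change type.
  private static boolean dataEquals(Object[] a, Object[] b) {
    for (int i = 0; i < a.length - 1; i++) {
      if (!Objects.equals(a[i], b[i])) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Object[] d1 = {0, "a", "data", "DELETE"};
    Object[] d2 = {0, "a", "data", "DELETE"};
    Object[] ins = {3, "a", "new_data", "INSERT"};
    List<Object[]> out = removeCarryovers(Arrays.asList(d1, d2, ins));
    // Output matches the input, but the two deletes share one object.
    if (out.size() != 3) throw new AssertionError();
    if (out.get(0) != out.get(1)) throw new AssertionError();
  }
}
```

On the three example rows from the thread, the output matches the input, but `out.get(0) == out.get(1)` holds, which is exactly the shared-object subtlety being discussed.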
private boolean popupDeleteRow() {
  return (!rowIterator.hasNext() || nextCachedRow != null) && hasDeleteRow();
}
We need to return cached delete rows when we hit a boundary.
Thanks @aokolnychyi for the review. Ready for another look.
These failures are not related.
retest this please
/**
 * Pop up the delete rows if there are delete rows cached and the next row is not the same record
I think this needs a better description; I'm not sure what "pop up" means in this context.
I agree. Would it be correct to say that this method returns true if a previously buffered delete row must be returned? If so, can we adjust the comment and the method name?
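To make the suggested rename concrete, here is a minimal, hypothetical harness; the wrapper class and the bufferDelete/cacheNextRow helpers are invented for illustration, and only the boolean expression mirrors the snippet under review:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Invented harness around the boolean expression from the review thread.
class DeleteRowBufferSketch {
  private final Iterator<String> rowIterator;
  private String nextCachedRow = null;
  private String deletedRow = null;

  DeleteRowBufferSketch(List<String> rows) {
    this.rowIterator = rows.iterator();
  }

  // Stand-in for the real code path that buffers a delete row.
  void bufferDelete(String row) {
    this.deletedRow = row;
  }

  // Stand-in for the real code path that caches the next (different) record.
  void cacheNextRow(String row) {
    this.nextCachedRow = row;
  }

  private boolean hasDeleteRow() {
    return deletedRow != null;
  }

  // Renamed from popupDeleteRow(): true when a previously buffered delete row
  // must be returned, i.e. we hit a boundary (the input is exhausted, or the
  // next row was cached because it belongs to a different record).
  boolean shouldReturnCachedDeleteRow() {
    return (!rowIterator.hasNext() || nextCachedRow != null) && hasDeleteRow();
  }

  public static void main(String[] args) {
    DeleteRowBufferSketch empty = new DeleteRowBufferSketch(Arrays.asList());
    if (empty.shouldReturnCachedDeleteRow()) throw new AssertionError(); // no delete buffered

    empty.bufferDelete("d");
    if (!empty.shouldReturnCachedDeleteRow()) throw new AssertionError(); // input exhausted + buffered delete

    DeleteRowBufferSketch pending = new DeleteRowBufferSketch(Arrays.asList("next"));
    pending.bufferDelete("d");
    if (pending.shouldReturnCachedDeleteRow()) throw new AssertionError(); // more input, nothing cached yet

    pending.cacheNextRow("next");
    if (!pending.shouldReturnCachedDeleteRow()) throw new AssertionError(); // boundary reached
  }
}
```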
 * <li>(id=1, data='b', op='UPDATE_AFTER')
 * </ul>
 */
/** An iterator that transforms rows from changelog tables within a single Spark task. */
We need to describe how this iterator transforms rows.
+1. I also think we need some details about the actual algorithm in the doc. Should be added in implementations, I guess?
The detailed algorithms are documented in each subclass: ComputeUpdateIterator and RemoveCarryoverIterator. I also made this class an abstract class. The two public static methods are documented as well. Would that be good?
public static Iterator<Row> computeUpdates()
public static Iterator<Row> removeCarryovers()
As an abstract class I think it's ok to keep this javadoc as is
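The shape described in this thread might be sketched as follows; all names here are invented stand-ins and the subclass bodies are pass-through placeholders, since the real per-pass algorithms live in the Iceberg subclasses:

```java
import java.util.Collections;
import java.util.Iterator;

// Structural skeleton: an abstract base class whose two documented static
// entry points are the public surface, while each subclass would document
// and implement its own algorithm.
abstract class ChangelogIteratorSketch implements Iterator<Object[]> {
  private final Iterator<Object[]> rowIterator;

  protected ChangelogIteratorSketch(Iterator<Object[]> rowIterator) {
    this.rowIterator = rowIterator;
  }

  protected Iterator<Object[]> rowIterator() {
    return rowIterator;
  }

  /** Creates an iterator that computes update pre/post images. */
  public static Iterator<Object[]> computeUpdates(Iterator<Object[]> rows) {
    return new ComputeUpdateSketch(rows);
  }

  /** Creates an iterator that removes carryover rows. */
  public static Iterator<Object[]> removeCarryovers(Iterator<Object[]> rows) {
    return new RemoveCarryoverSketch(rows);
  }

  // Pass-through stand-ins: the real subclasses hold the per-pass algorithms.
  private static class ComputeUpdateSketch extends ChangelogIteratorSketch {
    ComputeUpdateSketch(Iterator<Object[]> rows) { super(rows); }
    @Override public boolean hasNext() { return rowIterator().hasNext(); }
    @Override public Object[] next() { return rowIterator().next(); }
  }

  private static class RemoveCarryoverSketch extends ChangelogIteratorSketch {
    RemoveCarryoverSketch(Iterator<Object[]> rows) { super(rows); }
    @Override public boolean hasNext() { return rowIterator().hasNext(); }
    @Override public Object[] next() { return rowIterator().next(); }
  }

  public static void main(String[] args) {
    // The two passes compose: carryover removal first, then update computation.
    Iterator<Object[]> it = computeUpdates(removeCarryovers(
        Collections.singletonList(new Object[] {1, "a", "INSERT"}).iterator()));
    if (!it.hasNext()) throw new AssertionError();
  }
}
```

A caller would then chain the two passes, e.g. `computeUpdates(removeCarryovers(rows))`.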
 * <li>(id=2, data='b', op='DELETE')
 * </ul>
 */
class CarryoverRemoveIterator extends ChangelogIterator {
Would RemoveCarryoversIterator be a bit more natural and match the procedure param name?
 */
class CarryoverRemoveIterator extends ChangelogIterator {
  private final Iterator<Row> rowIterator;
  private final int[] indicesForIdentifySameRow;
minor: indicesForIdentifySameRow -> indicesToIdentifySameRow?
 * </ul>
 */
class CarryoverRemoveIterator extends ChangelogIterator {
  private final Iterator<Row> rowIterator;
minor: Shall we use rowIterator() from the parent instead of having the same var here too?
return sameLogicalRow(currentRow, nextRow)
    && currentRow.getString(changeTypeIndex).equals(DELETE)
    && nextRow.getString(changeTypeIndex).equals(INSERT);
protected boolean isColumnSame(Row currentRow, Row nextRow, int idx) {
optional: It seems the result of this method is always negated. I am wondering whether we want to switch the logic around.
protected boolean isDifferentValue(Row currentRow, Row nextRow, int idx) {
return !Objects.equals(nextRow.get(idx), currentRow.get(idx));
}
I did a more detailed round. I think the algorithm is correct. I left some minor comments but it seems close.
Thanks @aokolnychyi and @RussellSpitzer for the reviews. Resolved all comments and ready for another look.
/**
 * Creates an iterator for records of a changelog table.
 * Creates an iterator combine with {@link RemoveCarryoverIterator} and {@link
  }
}
return true;
@Override
The class is abstract now, so we can drop these.
private boolean cachedUpdateRecord() {
  return cachedRow != null
can't this only be UPDATE_AFTER?
Yes, it is only UPDATE_AFTER. Is it clearer to do it this way?
private boolean cachedUpdateRecord() {
return cachedRow != null
&& cachedRow.getString(changeTypeIndex()).equals(UPDATE_AFTER);
}
@Override
public Row next() {
  // if there is an updated cached row, return it directly
I wonder if this comment could note:
// An UPDATE_BEFORE record was returned on the last invocation, this time return the UPDATE_AFTER?
Just to be a little clearer about why this branch exists.
I agree. This only returns UPDATE_AFTER
return row;
}

Row currentRow = currentRow();
// Either a cached record which is not an UPDATE or the next record in the iterator.
Not sure if this comment is needed, but I need this to remind me what is going on.
Nice to have one.
Preconditions.checkState(
    nextRowChangeType.equals(INSERT),
    "The next row should be an INSERT row, but it is %s. That means there are multiple"
Cannot X because Y.
"Cannot compute updates because there are multiple rows inserted with the same identifier fields. ...." ?
@Override
public Row next() {
  if (returnCachedDeleteRow()) {
// Non-carryover delete rows found: one or more identical delete rows were seen, followed by a non-identical insert. This means none of the delete rows were carryover rows. Emit one delete row and decrease the count of delete rows seen.
// cache the delete row if there is 0 delete row cached
if (!hasCachedDeleteRow()) {
  deletedRow = currentRow;
  deletedRowCount++;
shouldn't this always be 1?
deletedRowCount = 1
Good point.
Row nextRow = rowIterator().next();

if (isSameRecord(currentRow, nextRow)) {
Shouldn't we be comparing deleteRow here?
The current row is the delete row if there are cached delete rows.
Row currentRow = currentRow();

if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
I think this would be easier to understand as
if (deletedRow != null && rowIterator().hasNext())
and nulling out deletedRow when deletedRowCount is decremented to 0. Or you could use
if (hasCachedDeleteRow() && rowIterator().hasNext())
The current row could also be the cachedNextRecord, which can be a delete or insert row.
if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
  // cache the delete row if there is 0 delete row cached
  if (!hasCachedDeleteRow()) {
I would pull this out of this if branch since it's a separate handling.
Not sure I understand you correctly, but the current row could be an insert. We need to check its change type anyway.
No other comments apart from what Russell noted.
  }
} else {
  // mark the boundary since the next row is not the same record as the current row
  cachedNextRecord = nextRow;
I'm not sure this behavior is correct; my gut says there needs to be a while loop in here.
Rewrote it with while-loop in the new commit.
Resolve the comments
Thanks @aokolnychyi and @RussellSpitzer for the review. Resolved all comments, ready for another look.
Thanks for the updates, it seems much clearer to me now. Hopefully I will feel the same when I look at it in the future :)
Thanks a lot for the reviews, @aokolnychyi @RussellSpitzer !
Iceberg doesn't enforce row uniqueness for identifier field values (a.k.a. primary keys in other systems). That means there can be duplicate rows with the same identifier field values.
We can handle duplicate rows while removing carryover rows, but it is impossible to compute updates in that case. This PR improves the logic to handle duplicated rows when removing carryovers, and throws an exception when computing updates.
cc @RussellSpitzer @szehon-ho @aokolnychyi @rdblue
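A hedged sketch of what such a uniqueness check could look like (the class and method names are invented; it assumes the rows are sorted by identifier fields so duplicates are adjacent, and that the last element of each row is the change type):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of the validation: a second row with the same identifier
// values and the same change type makes update computation ambiguous, so we
// fail fast with a "Cannot X because Y" style message.
class UniquenessCheckSketch {

  // idIndices: positions of the identifier fields within each row.
  static void validateUnique(List<Object[]> sortedRows, int[] idIndices) {
    for (int i = 1; i < sortedRows.size(); i++) {
      Object[] prev = sortedRows.get(i - 1);
      Object[] curr = sortedRows.get(i);
      if (sameKey(prev, curr, idIndices)
          && prev[prev.length - 1].equals(curr[curr.length - 1])) {
        throw new IllegalStateException(
            "Cannot compute updates because there are multiple rows with the same "
                + "identifier fields and change type " + curr[curr.length - 1]);
      }
    }
  }

  private static boolean sameKey(Object[] a, Object[] b, int[] idIndices) {
    for (int idx : idIndices) {
      if (!Objects.equals(a[idx], b[idx])) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // A delete/insert pair for one key with different data is a valid update.
    List<Object[]> ok = Arrays.asList(
        new Object[] {1, "a", "DELETE"}, new Object[] {1, "b", "INSERT"});
    validateUnique(ok, new int[] {0, 1}); // distinct (id, data) keys: passes

    // Two identical deletes for the same key must be rejected.
    boolean threw = false;
    try {
      validateUnique(Arrays.asList(
          new Object[] {1, "a", "DELETE"}, new Object[] {1, "a", "DELETE"}),
          new int[] {0, 1});
    } catch (IllegalStateException e) {
      threw = true;
    }
    if (!threw) throw new AssertionError();
  }
}
```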