Read deleted rows with metadata column IS_DELETED #4683

Merged (21 commits) on May 27, 2022

Contributor

@flyrain flyrain commented May 2, 2022

Per the discussion in #4539, created this new PR. With the change in #2538 and this PR, we can read deleted rows from row-level deletes.
Please note that there is no IS_DELETED column filter pushdown at this moment. This could be done in a follow-up PR. On second thought, we may not need pushdown, especially for the non-vectorized read. It shouldn't make any difference whether Spark or Iceberg filters out the rows, since we iterate through all rows anyway.

cc @aokolnychyi @RussellSpitzer @chenjunjiedada @stevenzwu @Reo-LEI @hameizi @singhpk234 @rajarshisarkar @kbendick @rdblue

@github-actions github-actions bot added the API label May 4, 2022
@flyrain
Contributor Author

flyrain commented May 4, 2022

Do we still need class EqualityDeleteRowReader? Its functionality can be achieved by a filter on the IS_DELETED column. cc @chenjunjiedada

* @param fieldId a column id in this schema
* @return the index of the field in the schema, or -1 if one wasn't found
*/
public int idToIndex(Integer fieldId) {
Contributor

@stevenzwu stevenzwu May 4, 2022

idToPosition? position might be more clear.

Also we are using Integer arg type, which means it can be null. Probably following the pattern from the idToAlias above. But I am wondering if the fieldId arg should be primitive int type. If we stay with Integer type, we probably need null check here.

Also, should the return type be Integer and nullable just to be consistent with other methods in this class?

Contributor Author

Thanks for the name suggestion. Boxed types always raise performance concerns, though I'm OK with either a primitive or a boxed type here. In terms of consistency, these two public methods also return int, so I will keep the return type as int unless we have a strong reason to use Integer.

public int schemaId()
public int highestFieldId()

Contributor Author

Even this place uses int, I'd prefer to use int for the parameter as well. It is unnecessary to do this boxing and unboxing.

Contributor

> Even this place uses int, I'd prefer to use int for the parameter as well. It is unnecessary to do this boxing and unboxing.

That is different. For NestedField, fieldId will never be null. hence it returns primitive int, which makes sense. But here, we may not find the field with the provided id, we are returning a special value -1. If we follow the style of other find methods in the Schema class, we can see aliasToId returns null if no matching field is found.
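
The two lookup styles being compared can be sketched as follows. This is only an illustration under stated assumptions: the class FieldPositionLookupSketch and both method names are hypothetical, and the code is not taken from Iceberg's Schema class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an id-to-position lookup; not Iceberg's Schema class.
final class FieldPositionLookupSketch {
  private final Map<Integer, Integer> idToPosition = new HashMap<>();

  FieldPositionLookupSketch(int... fieldIds) {
    for (int pos = 0; pos < fieldIds.length; pos++) {
      idToPosition.put(fieldIds[pos], pos);
    }
  }

  // Sentinel style: primitive parameter and return type, -1 signals "not found".
  int positionOrMinusOne(int fieldId) {
    Integer pos = idToPosition.get(fieldId);
    return pos != null ? pos : -1;
  }

  // Nullable style: null signals "not found", so callers must check before unboxing.
  Integer positionOrNull(int fieldId) {
    return idToPosition.get(fieldId);
  }

  public static void main(String[] args) {
    FieldPositionLookupSketch lookup = new FieldPositionLookupSketch(10, 20, 30);
    System.out.println(lookup.positionOrMinusOne(99));
    System.out.println(lookup.positionOrNull(99));
  }
}
```

The sentinel style avoids boxing on the return path, while the nullable style matches lookups that already return null for a missing entry.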

Contributor Author

Thanks for the comment. With @szehon-ho's suggestion #4683 (comment), I removed the new method.

@@ -94,6 +95,12 @@ protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSc
this.eqDeletes = eqDeleteBuilder.build();
this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
this.hasColumnIsDeleted = requestedSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
Contributor

nit: I saw a lot of places in Iceberg just use null to indicate the condition. should we use null for columnIsDeletedIndex too?

Contributor Author

A boolean variable should be expressive.

@@ -227,6 +237,39 @@ public void close() {
}
}

private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteFilter<T> {
Contributor

as mentioned in the draft PR, this is not actually a filter as it always returns true. It is only to use the iteration part of the filter. Maybe use CloseableIterable#transform instead for iteration?
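
The distinction drawn here, a filter that drops rows versus a transform that keeps every row and records its delete status, can be sketched with plain java.util collections. The class and method names below are hypothetical and do not use Iceberg's CloseableIterable API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FilterVersusTransformSketch {
  private FilterVersusTransformSketch() {}

  // Filter semantics: deleted positions are removed from the output.
  static List<Long> filterLive(List<Long> positions, Set<Long> deleted) {
    List<Long> live = new ArrayList<>();
    for (Long pos : positions) {
      if (!deleted.contains(pos)) {
        live.add(pos);
      }
    }
    return live;
  }

  // Transform semantics: every position is emitted, paired with an is-deleted flag.
  static List<Map.Entry<Long, Boolean>> markDeleted(List<Long> positions, Set<Long> deleted) {
    List<Map.Entry<Long, Boolean>> marked = new ArrayList<>();
    for (Long pos : positions) {
      marked.add(new SimpleEntry<>(pos, deleted.contains(pos)));
    }
    return marked;
  }
}
```

In this sketch the marking variant never changes the row count, which is why modelling it as a filter that always returns true reads oddly.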

Contributor Author

@flyrain flyrain May 5, 2022

Yeah, I've replaced filter with transform for all places except this one. This is trickier to change. Let me think a bit more.

Contributor Author

Refactored

Collaborator

Nit: Do you think it will be cleaner to have most of the logic at:

abstract class PositionStreamDeleteIterable {
   CloseableIterator createPosDeleteIterator(CloseableIterator<T> items);
}

and have the two concrete subclass (PositionStreamDeleteMarker and PositionStreamDeleteFilter) extend it?

I think having the Marker extend the Filter still seems a bit strange, though the logic is correctly refactored now.
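
A rough sketch of the suggested shape, with the shared iteration in an abstract base class and the filter and marker as sibling subclasses. All names here are hypothetical, the delete check is injected as a Predicate to keep the example short, and a Consumer is assumed as the way to mark a row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical base class: owns the iteration and asks subclasses what to do with each row.
abstract class DeleteIterableSketch<T> {
  private final Predicate<T> isDeleted; // stand-in for the streaming position check

  protected DeleteIterableSketch(Predicate<T> isDeleted) {
    this.isDeleted = isDeleted;
  }

  final List<T> apply(Iterable<T> rows) {
    List<T> result = new ArrayList<>();
    for (T row : rows) {
      if (keep(row, isDeleted.test(row))) {
        result.add(row);
      }
    }
    return result;
  }

  // Hook: return true to emit the row, false to drop it.
  protected abstract boolean keep(T row, boolean deleted);
}

// Drops deleted rows.
final class DeleteFilterSketch<T> extends DeleteIterableSketch<T> {
  DeleteFilterSketch(Predicate<T> isDeleted) {
    super(isDeleted);
  }

  @Override
  protected boolean keep(T row, boolean deleted) {
    return !deleted;
  }
}

// Emits every row and marks the deleted ones.
final class DeleteMarkerSketch<T> extends DeleteIterableSketch<T> {
  private final Consumer<T> markDeleted;

  DeleteMarkerSketch(Predicate<T> isDeleted, Consumer<T> markDeleted) {
    super(isDeleted);
    this.markDeleted = markDeleted;
  }

  @Override
  protected boolean keep(T row, boolean deleted) {
    if (deleted) {
      markDeleted.accept(row);
    }
    return true;
  }
}
```

With this layout neither subclass inherits behavior it has to neutralize, which is the oddity of having the marker extend the filter.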

@chenjunjiedada
Collaborator

@flyrain, I agree the implementation of EqualityDeleteRowReader can be replaced, but we are using the class to rewrite equality deletes. If we have utilities to produce only equality-deleted rows or position-deleted rows, I'm OK with deleting it.

@flyrain
Contributor Author

flyrain commented May 5, 2022

> @flyrain, I agree the implementation of EqualityDeleteRowReader can be replaced, but we are using the class to rewrite equality deletes. If we have utilities to produce only equality-deleted rows or position-deleted rows, I'm OK with deleting it.

We don't plan to add those utilities in this PR, so let's not change anything about EqualityDeleteRowReader. Thanks for the input.


PositionSetDeleteFilter<T> filter = new PositionSetDeleteFilter<>(rowToPosition, deleteSet);
public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Predicate<T> shouldKeep) {
Contributor

I like the new filter method API. only concern is for compatibility. this is a public static method from a public class. Probably very few users call this API directly. if we want to be safe, we can keep the old API and mark it as deprecated.
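
One way to follow this suggestion, sketched with plain java.util collections instead of CloseableIterable. The three-argument overload is a hypothetical stand-in for the old signature, which this thread does not show in full; the class name is also made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

final class DeletesCompatSketch {
  private DeletesCompatSketch() {}

  // New-style entry point: the caller supplies the keep/drop decision.
  static <T> List<T> filter(List<T> rows, Predicate<T> shouldKeep) {
    List<T> kept = new ArrayList<>();
    for (T row : rows) {
      if (shouldKeep.test(row)) {
        kept.add(row);
      }
    }
    return kept;
  }

  /**
   * Hypothetical old-style entry point, kept only for callers that have not migrated.
   *
   * @deprecated use {@link #filter(List, Predicate)} instead
   */
  @Deprecated
  static <T> List<T> filter(List<T> rows, Function<T, Long> rowToPosition, Set<Long> deleteSet) {
    // Delegate so both entry points share one implementation.
    return filter(rows, row -> !deleteSet.contains(rowToPosition.apply(row)));
  }
}
```

Because the deprecated overload only delegates, it can be removed in a later release without touching the filtering logic.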

Contributor Author

That's a good point. I had the same concern. However, we discussed what should be considered an API in one of the community syncs, and we agreed to consider only the public things in the API module as APIs. cc @rdblue. With that, it should be OK to change this public method?

@aokolnychyi
Contributor

Let me take another look too. Sorry for the delay.

keep = false;
}
@Override
protected CloseableIterator createPosDeleteIterator(CloseableIterator<T> items) {
Contributor

nit: raw type

Contributor

I'd say it's a bit more than a nit! Iceberg PRs should always use the type system.
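
For context on the raw-type remark, a small sketch of the parameterized form. It uses java.util.Iterator in place of CloseableIterator, and both class names are hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;

abstract class TypedDeleteStepSketch<T> {
  // A raw return type (plain Iterator) would also compile, but callers would then
  // receive Object elements and need unchecked casts. Iterator<T> keeps T visible.
  protected abstract Iterator<T> applyDelete(Iterator<T> items);
}

final class PassThroughStepSketch<T> extends TypedDeleteStepSketch<T> {
  @Override
  protected Iterator<T> applyDelete(Iterator<T> items) {
    return items; // no deletes are applied in this sketch
  }

  public static void main(String[] args) {
    Iterator<String> rows =
        new PassThroughStepSketch<String>().applyDelete(Arrays.asList("a", "b").iterator());
    String first = rows.next(); // no cast needed because the element type is preserved
    System.out.println(first);
  }
}
```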

Contributor Author

Fixed it in a new commit.

return isDeleted;
}

protected abstract CloseableIterator createPosDeleteIterator(CloseableIterator<T> items);
Contributor

nit: raw type

} catch (IOException e) {
throw new UncheckedIOException("Failed to close delete positions iterator", e);
@Override
protected CloseableIterator createPosDeleteIterator(CloseableIterator<T> items) {
Contributor

nit: raw type

return isDeleted;
}

protected abstract CloseableIterator createPosDeleteIterator(CloseableIterator<T> items);
Contributor

Do we really create a position delete iterator? Don't we iterate over records here?

Contributor Author

We do iterate the pos delete records, for example, line 191, this.nextDeletePos = deletePosIterator.next();.

Contributor

@aokolnychyi aokolnychyi May 26, 2022

We do iterate over position deletes but I think this iterator is for remaining data records, no?

Contributor Author

Discussed offline with @aokolnychyi; I've changed the name to applyDelete.

@@ -67,6 +66,8 @@
private final List<DeleteFile> eqDeletes;
private final Schema requiredSchema;
private final Accessor<StructLike> posAccessor;
private final boolean hasColumnIsDeleted;
Contributor

nit: hasIsDeletedColumn?

@@ -67,6 +66,8 @@
private final List<DeleteFile> eqDeletes;
private final Schema requiredSchema;
private final Accessor<StructLike> posAccessor;
private final boolean hasColumnIsDeleted;
private final int columnIsDeletedPosition;
Contributor

nit: isDeletedColumnPosition?

this.columnIsDeletedPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED);
}

protected int columnIsDeletedPosition() {
Contributor

@aokolnychyi aokolnychyi May 26, 2022

nit: isDeletedColumnPosition()?

Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes));
}

private CloseableIterable<T> createDeleteIterable(CloseableIterable<T> records, Predicate<T> isDeleted) {
Contributor

Is there a better name? Is it an iterable of remaining rows?

@aokolnychyi
Contributor

This looks close to me. I'd switch to an explicit call to set the _deleted value to avoid any assumptions about default values.
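
One situation where relying on a default can go wrong is a reused row container. The sketch below is only an illustration under that assumption: ReusableRow and both methods are hypothetical and say nothing about how the readers in this PR actually hold rows.

```java
import java.util.Set;

final class ExplicitDeletedFlagSketch {
  private ExplicitDeletedFlagSketch() {}

  // Hypothetical reusable row container with an is-deleted slot.
  static final class ReusableRow {
    long pos;
    boolean deleted;
  }

  // Relies on the default: the flag is written only for deleted rows.
  static boolean[] setOnlyWhenDeleted(long[] positions, Set<Long> deletedPositions) {
    ReusableRow row = new ReusableRow();
    boolean[] observed = new boolean[positions.length];
    for (int i = 0; i < positions.length; i++) {
      row.pos = positions[i];
      if (deletedPositions.contains(positions[i])) {
        row.deleted = true; // never reset, so the value leaks into later rows
      }
      observed[i] = row.deleted;
    }
    return observed;
  }

  // Explicit: the flag is written for every row, true or false.
  static boolean[] setExplicitly(long[] positions, Set<Long> deletedPositions) {
    ReusableRow row = new ReusableRow();
    boolean[] observed = new boolean[positions.length];
    for (int i = 0; i < positions.length; i++) {
      row.pos = positions[i];
      row.deleted = deletedPositions.contains(positions[i]);
      observed[i] = row.deleted;
    }
    return observed;
  }
}
```

In this sketch, the first variant reports a live row as deleted once an earlier row has set the flag, while the explicit variant does not.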

this.deletePosIterator = deletePositions;
// consume delete positions until the next is past the current position
boolean isDeleted = currentPos == nextDeletePos;
while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
this.nextDeletePos = deletePosIterator.next();
Member

This is now the only place in the code which refers to it as "this.nextDeletePos"

protected PositionFilterIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
super(items);
this.deletePosIterator = deletePositions;
// consume delete positions until the next is past the current position
Member

@RussellSpitzer RussellSpitzer May 27, 2022

This would let us remove one if statement, though I'm not sure it is any simpler or clearer.

    // Consume nextDeletePos until it is past currentPos; if currentPos equals any consumed nextDeletePos, the current row has been deleted
    boolean isDeleted = currentPos == nextDeletePos;
    while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
      this.nextDeletePos = deletePosIterator.next();
      isDeleted |= currentPos == nextDeletePos;
    }
    return isDeleted;
  }

Contributor Author

I'm OK with either one, though I think the if statement is easier to read. If we want to make it simpler, here is another way that doesn't have to check isDeleted first. What do you think?

    while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
      this.nextDeletePos = deletePosIterator.next();
      if (currentPos == nextDeletePos) {
        isDeleted = true;
      }
    }

Contributor

Let's keep it as-is to avoid last-minute changes in this tricky place.
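
For readers following the loop variants in this thread, here is a self-contained sketch of the streaming merge between ascending row positions and ascending delete positions. It is a reconstruction under assumptions, not the merged code: the class name, the array-based signature, the useOrAssign switch, and the Long.MAX_VALUE sentinel for an empty delete stream are all choices made for this example.

```java
import java.util.Iterator;

final class PositionMergeSketch {
  private PositionMergeSketch() {}

  // rowPositions must be ascending; deletePositions must yield ascending
  // (possibly repeated) positions. Returns one is-deleted flag per row.
  static boolean[] deletedFlags(long[] rowPositions, Iterator<Long> deletePositions, boolean useOrAssign) {
    boolean[] flags = new boolean[rowPositions.length];
    // Assumption of this sketch: Long.MAX_VALUE stands for "no delete position available".
    long nextDeletePos = deletePositions.hasNext() ? deletePositions.next() : Long.MAX_VALUE;

    for (int i = 0; i < rowPositions.length; i++) {
      long currentPos = rowPositions[i];
      if (currentPos < nextDeletePos) {
        continue; // the next delete is ahead of this row, so the row is live
      }

      // Consume delete positions until the next one is past the current row.
      boolean isDeleted = currentPos == nextDeletePos;
      while (deletePositions.hasNext() && nextDeletePos <= currentPos) {
        nextDeletePos = deletePositions.next();
        if (useOrAssign) {
          isDeleted |= currentPos == nextDeletePos; // the |= form
        } else if (currentPos == nextDeletePos) {
          isDeleted = true; // the plain if form
        }
      }
      flags[i] = isDeleted;
    }
    return flags;
  }
}
```

Both branches of the switch compute the same flags. The check inside the loop matters when row positions skip ahead, because the matching delete position may only be reached while consuming.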

}

@Test
public void testIsDeletedColumnWithoutDeleteFile() {
Contributor Author

@flyrain flyrain May 27, 2022

Added a test to project is_deleted column when there is no delete file.

@aokolnychyi aokolnychyi merged commit 534f3c9 into apache:master May 27, 2022
@aokolnychyi
Contributor

Thanks, @flyrain! Great to have this done. Thanks everyone who reviewed!

@flyrain
Contributor Author

flyrain commented May 27, 2022

Thanks, @aokolnychyi! Thanks everyone for the review.

8 participants