
Core: Improve GenericReader performance #4218

Merged: 5 commits into apache:master, Mar 4, 2022

Conversation

pvary (Contributor) commented Feb 24, 2022

@rbalamohan identified that:

    the computation of "get" in the Caffeine cache itself is expensive, with operations like lookups, after-reads, stats updates, and fork-join purging.

This makes calling GenericRecord.create() for every record expensive. Sometimes we cannot reuse the containers in readers, but we still need better performance. If we create a template record in each reader and copy that template for every row, we can avoid the per-record cache retrieval of the nameToPos map.
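
For illustration, a minimal sketch of the template-copy pattern (not the exact PR code; rowType and the populate step are placeholders):

    // One cache lookup at reader construction instead of one per record.
    private final Record template = GenericRecord.create(rowType);

    Record readRecord() {
      // copy() goes through the GenericRecord(GenericRecord) constructor,
      // which reuses the already-resolved nameToPos map: no cache access.
      Record result = template.copy();
      // ... result.set(pos, value) for each column ...
      return result;
    }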

I created some basic JMH performance tests; here are the results:

benchmark-result.txt.orc.base:  mean =      2.571 ±(99.9%) 0.172 s/op
benchmark-result.txt.orc.fix:  mean =      2.297 ±(99.9%) 0.129 s/op
benchmark-result.txt.parquet.base:  mean =      2.649 ±(99.9%) 0.136 s/op
benchmark-result.txt.parquet.fix:  mean =      2.209 ±(99.9%) 0.093 s/op

The change yields a ~10-20% performance improvement for the generic readers.
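
The actual benchmark lives in data/src/jmh/java/org/apache/iceberg/ReaderBenchmark.java; as a rough illustration only, a JMH harness for this kind of single-shot comparison could be shaped like the sketch below (class name is hypothetical, reader and file setup elided):

    import java.util.concurrent.TimeUnit;
    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Fork;
    import org.openjdk.jmh.annotations.Measurement;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;
    import org.openjdk.jmh.annotations.Warmup;

    @Fork(1)
    @State(Scope.Benchmark)
    @Warmup(iterations = 3)
    @Measurement(iterations = 5)
    @BenchmarkMode(Mode.SingleShotTime)
    @OutputTimeUnit(TimeUnit.SECONDS)
    public class ReaderBenchmarkSketch {

      @Benchmark
      public void readRecords() {
        // open the generic reader over a pre-written ORC/Parquet file
        // and drain all records (setup elided)
      }
    }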

pvary (Author) commented Feb 24, 2022

CC: @rbalamohan

pvary (Author) commented Feb 24, 2022

@aokolnychyi: Could you please review when you have some time? I see that you created the other JMH tests, so you might be the best person to take a look.
Thanks,
Peter

return o -> {
  if (o == null) {
    return null;
  }

  List<Object> data = ((StructObjectInspector) pair.sourceInspector()).getStructFieldsDataAsList(o);
- Record result = GenericRecord.create(type);
+ Record result = template.copy();
Collaborator:

Can this result in an NPE?

pvary (Author):

Then there would have been an NPE with the original code as well. The template is only null when the type is null.

  private GenericRecord(StructType struct) {
    this.struct = struct;
    this.size = struct.fields().size();         // NPE here when the struct is null
    this.values = new Object[size];
    this.nameToPos = NAME_MAP_CACHE.get(struct);
  }

Member:

Indeed, the type should never be null here because the SchemaWithPartnerVisitor guarantees it. See:

for (Types.NestedField field : struct.fields()) {
  P fieldPartner = partner != null ? accessors.fieldPartner(partner, field.fieldId(), field.name()) : null;
  visitor.beforeField(field, fieldPartner);
  T result;
  try {
    result = visit(field.type(), fieldPartner, visitor, accessors);
  } finally {
    visitor.afterField(field, fieldPartner);
  }
  results.add(visitor.field(field, fieldPartner, result));
}

I think we can just add a Preconditions.checkNotNull on line 128 instead of checking the null case in an if-else block.
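
For illustration, the suggested check might look like this (a sketch, not the exact diff; Iceberg ships a relocated Guava Preconditions, visible in the stack trace further down):

    // Fail fast at construction instead of branching on null per record.
    Preconditions.checkNotNull(type, "Type cannot be null");
    this.template = GenericRecord.create(type);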

Member:

So here we use the GenericRecord(GenericRecord toCopy) constructor rather than GenericRecord(StructType struct) to eliminate the access to NAME_MAP_CACHE? I'm afraid a future PR could easily fall back to the other constructor, because this tiny difference is hard for the next developer or reviewer to notice. Is it possible to add some documentation?
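
For illustration, the kind of note that could capture this might look like the following (a sketch; the actual comment added in the PR may be worded differently):

    // Uses the GenericRecord(GenericRecord toCopy) copy constructor on purpose:
    // unlike GenericRecord(StructType), it reuses the template's nameToPos map
    // and avoids the NAME_MAP_CACHE lookup on the per-record path. See PR #4218.
    Record result = template.copy();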

pvary (Author):

Done

pvary (Author):

There was an issue with positional deletes where the row was not present in the file. The code below still created a reader for the empty struct:

      Types.StructType struct = iType != null ? iType.asStructType() : null;
      return visitor.struct(struct, group, visitFields(struct, group, visitor));

Changed it to this; let's see if it breaks anything:

      return iType != null ?
              visitor.struct(iType.asStructType(), group, visitFields(iType.asStructType(), group, visitor)) : null;

pvary (Author), Mar 2, 2022:

Had to revert the changes.
In the Deserializer we do not have a null struct, so I added the Preconditions.checkNotNull() check, but I had to revert to the null checks in the readers because the Spark reader expects the visitors/readers to handle null structs when the readers are created.

I had failures with TestSparkReadProjection.testListOfStructsProjection() and TestGenericAppenderFactory.testPosDeleteWriterWithRowSchema()

pvary (Author) commented Feb 26, 2022

Seems like @aokolnychyi is otherwise occupied. @RussellSpitzer or @shardulm94, any chance of taking a look?
Thanks, Peter

pvary (Author) commented Mar 1, 2022

@openinx or @rdblue?
This is a small change, and it would improve generic reader performance by ~10-20%.
I would like to get a review so I can merge.

Thanks,
Peter

pvary (Author) commented Mar 2, 2022

@openinx: Fixed the null check where I was able to, added the comments, and the tests are green. If you have some time, I think the PR is ready for another review. Thanks again for taking the time to check this out!

ben-manes:

private static final LoadingCache<StructType, Map<String, Integer>> NAME_MAP_CACHE =
    Caffeine.newBuilder()
        .weakKeys()
        .build(struct -> {
          Map<String, Integer> idToPos = Maps.newHashMap();
          List<Types.NestedField> fields = struct.fields();
          for (int i = 0; i < fields.size(); i += 1) {
            idToPos.put(fields.get(i).name(), i);
          }
          return idToPos;
        });

Note that this uses identity (==) equality for key lookup. If you are looking up with equivalent (.equals(o)) instances instead, every lookup will be a cache miss and the cache won't provide much value. Since it uses weak keys this seems like a likely mistake, and weak keys also let the GC collect entries aggressively, so you might not get a lot of reuse. Weak keys are often a helpful feature for cases like caching request-scoped data, and I'm not familiar enough with the code base to assess whether this usage is problematic. Typically a bounded cache is better and more explicit, but it does require some mental effort to decide on a decent maximum size.

Otherwise it should perform similarly to a ConcurrentHashMap from the caller's perspective.
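
A self-contained sketch of the identity-lookup behavior described above (illustrative only; String keys are used just to make the identity-vs-equals distinction visible):

    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;

    public class WeakKeysDemo {
      public static void main(String[] args) {
        // With weakKeys(), Caffeine compares keys by identity (==), not equals().
        LoadingCache<String, Integer> cache = Caffeine.newBuilder()
            .weakKeys()
            .build(key -> {
              System.out.println("computing for: " + key);
              return key.length();
            });

        String a = new String("struct");
        String b = new String("struct");   // a.equals(b) but a != b

        cache.get(a);   // computes and caches under a
        cache.get(b);   // identity miss: computes again under b
      }
    }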


RecordReader(List<Type> types,
             List<ParquetValueReader<?>> readers,
             StructType struct) {
  super(types, readers);
  this.structType = struct;
  this.template = struct != null ? GenericRecord.create(struct) : null;
}
Contributor:

Should these also have preconditions that validate the struct is not null?

pvary (Author):

My original change did not contain the null check, and there were failing tests (org.apache.iceberg.TestGenericAppenderFactory > testPosDeleteWriterWithRowSchema[FileFormat=parquet, Partitioned=true] FAILED) with the exception below.

In a nutshell, when we are reading positional delete files, we either have the row in the data or we don't. I tried to fix this by returning a null reader instead of creating a reader for a null struct, but this caused issues with Spark projection, where we create the reader for the full schema: having a null or missing reader caused problems when we tried to match the readers to the projected schema.
Fixing this is not trivial, so I decided to move forward with the null checks to keep backward compatibility:

  • It should be possible to create a reader with a null schema
  • The reader should fail only when we try to actually read from it

The exception mentioned above:

    java.lang.NullPointerException
        at org.apache.iceberg.data.GenericRecord.<init>(GenericRecord.java:63)
        at org.apache.iceberg.data.GenericRecord.create(GenericRecord.java:53)
        at org.apache.iceberg.data.parquet.GenericParquetReaders$RecordReader.<init>(GenericParquetReaders.java:62)
        at org.apache.iceberg.data.parquet.GenericParquetReaders.createStructReader(GenericParquetReaders.java:52)
        at org.apache.iceberg.data.parquet.BaseParquetReaders$ReadBuilder.struct(BaseParquetReaders.java:176)
        at org.apache.iceberg.data.parquet.BaseParquetReaders$ReadBuilder.struct(BaseParquetReaders.java:114)
        at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:141)
        at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitField(TypeWithSchemaVisitor.java:169)
        at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visitFields(TypeWithSchemaVisitor.java:183)
        at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:47)
        at org.apache.iceberg.data.parquet.BaseParquetReaders.createReader(BaseParquetReaders.java:67)
        at org.apache.iceberg.data.parquet.BaseParquetReaders.createReader(BaseParquetReaders.java:58)
        at org.apache.iceberg.data.parquet.GenericParquetReaders.buildReader(GenericParquetReaders.java:41)
        at org.apache.iceberg.data.DeleteFilter.lambda$openDeletes$6(DeleteFilter.java:255)
        at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:118)
        at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
        at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
        at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:38)
        at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:35)
        at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:73)
        at org.apache.iceberg.io.CloseableIterable$4$1.<init>(CloseableIterable.java:99)
        at org.apache.iceberg.io.CloseableIterable$4.iterator(CloseableIterable.java:98)
        at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:152)
        at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:143)
        at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable.iterator(CloseableIterable.java:138)
        at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable.iterator(CloseableIterable.java:129)
        at java.lang.Iterable.forEach(Iterable.java:74)
        at org.apache.iceberg.deletes.Deletes.toPositionIndex(Deletes.java:97)
        at org.apache.iceberg.deletes.Deletes.toPositionIndex(Deletes.java:91)
        at org.apache.iceberg.data.DeleteFilter.applyPosDeletes(DeleteFilter.java:231)
        at org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:126)
        at org.apache.iceberg.data.GenericReader.open(GenericReader.java:77)
        at org.apache.iceberg.relocated.com.google.common.collect.Iterators$6.transform(Iterators.java:826)
        at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:52)
        at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:151)
        at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:143)
        at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable.iterator(CloseableIterable.java:138)
        at org.apache.iceberg.data.GenericReader.open(GenericReader.java:65)
        at org.apache.iceberg.data.TableScanIterable.iterator(TableScanIterable.java:41)
        at org.apache.iceberg.data.TableScanIterable.iterator(TableScanIterable.java:29)
        at java.lang.Iterable.forEach(Iterable.java:74)
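
A minimal sketch of the resulting contract (illustrative, simplified from the RecordReader constructor above): tolerate a null struct at construction and fail only on an actual read.

    // Construction tolerates a null struct, e.g. positional delete files
    // that carry no row data:
    this.template = struct != null ? GenericRecord.create(struct) : null;

    // Only an actual attempt to materialize a record fails:
    Record newRecord() {
      return template.copy();  // NPE here if the reader was built without a struct
    }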

openinx (Member) left a comment:

Almost looks good to me now, except the .gitignore issue and @rdblue's Preconditions issue.

github-actions bot added the INFRA label Mar 3, 2022
pvary (Author) commented Mar 3, 2022

@ben-manes:

    Note that this uses identity (==) equality for key lookup.

We are reusing the struct objects, so == should be fine. @rbalamohan checked and wrote:

    Probed further: It doesn't look like a cache miss, but the computation of "get" in the Caffeine cache itself is expensive, with operations like lookups, after-reads, stats updates, and fork-join purging.

openinx (Member) left a comment:

Looks good to me now. Thanks @pvary for the contribution, and thanks all for reviewing!

@openinx openinx merged commit e75d732 into apache:master Mar 4, 2022
@openinx openinx added this to the Iceberg 0.14.0 Release milestone Mar 4, 2022
pvary (Author) commented Mar 4, 2022

Thanks everyone for the review and the merge!

pvary deleted the genericspeed branch on March 17, 2022
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 9, 2023