HIVE-27309: Large number of partitions and small files causes OOM in … #4645
Conversation
Resolved review threads:
- iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
- iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
```java
}

if (conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + conf.get(InputFormatConfig.TABLE_IDENTIFIER)) == null) {
  conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + conf.get(InputFormatConfig.TABLE_IDENTIFIER),
```
IIUC, every split has a conf, and each conf would contain a serialized Iceberg table. I want to know whether the serialized Iceberg table makes the conf bigger, and whether it could also cause an AM OOM in the case of many splits.
Did we run a benchmark test to validate this change?
- The conf contained the serialized Iceberg table even before this change. All q-tests except one were passing after removing the serialized table from the splits and without adding it to the conf, because it was already there. So these changes do not increase memory usage anywhere; they strictly decrease it, because the serialized table is removed from the splits.
- The one q-test that was failing was show_partitions_test.q. It was failing because it does a select from table.partitions.
- There were 2 unit test classes (not q-tests) that were failing because the serialized table wasn't in the conf, but that was something present only in unit tests.

For these 2 edge cases, I made the change to conditionally add the serialized table into the conf in IcebergInputFormat.getSplits(), only when it is missing from the conf.
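The "add only if missing" behavior described above can be sketched in isolation. This is a minimal illustration: a plain `Properties` object stands in for the Hadoop `Configuration`, and the key names and helper are hypothetical, merely mirroring the `InputFormatConfig` pattern:

```java
import java.util.Properties;

public class SerializedTableConfSketch {
    // Hypothetical keys mirroring the InputFormatConfig constants
    static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";

    // Add the serialized table to the conf only when absent, so repeated
    // getSplits() calls never overwrite or re-serialize an existing entry.
    static void ensureSerializedTable(Properties conf, String tableName, String serialized) {
        String key = SERIALIZED_TABLE_PREFIX + tableName;
        if (conf.getProperty(key) == null) {
            conf.setProperty(key, serialized);
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        ensureSerializedTable(conf, "db.tbl", "base64payload");
        // A second call must not overwrite the existing entry
        ensureSerializedTable(conf, "db.tbl", "otherPayload");
        System.out.println(conf.getProperty(SERIALIZED_TABLE_PREFIX + "db.tbl"));
    }
}
```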
I almost see what you mean. thx
```java
.orElseGet(() -> Catalogs.loadTable(conf));

if (conf.get(InputFormatConfig.TABLE_IDENTIFIER) == null) {
  conf.set(InputFormatConfig.TABLE_IDENTIFIER, table.name());
```
I think we do not need this param. We can just use table.name().
We can skip the if check as well.
yes, this is also what i mean ;)
```diff
  this.conf = newContext.getConfiguration();
- this.table = ((IcebergSplit) split).table();
+ this.table = SerializationUtil.deserializeFromBase64(
+     conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
```
Suggested change:
```diff
- conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
+ conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + table.name()));
```
deniskuzZ left a comment:
LGTM +1
```java
.ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
.orElseGet(() -> Catalogs.loadTable(conf));

conf.set(InputFormatConfig.TABLE_IDENTIFIER, table.name());
```
Could we do the conf setup when loading the table?

```java
Table table = Optional
    .ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
    .orElseGet(() -> {
      Table tbl = Catalogs.loadTable(conf);
      conf.set(InputFormatConfig.TABLE_IDENTIFIER, tbl.name());
      conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tbl.name(), SerializationUtil.serializeToBase64(tbl));
      return tbl;
    });
```
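The load-then-cache pattern used in the suggestion above can be illustrated on its own. In this sketch a plain `HashMap` stands in for the Hadoop `Configuration`, and all names are illustrative, not the Iceberg API: the point is that `Optional.ofNullable(...).orElseGet(...)` performs the expensive load only on a cache miss and populates the cache in the same step:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class LoadOrCacheSketch {
    static final Map<String, String> cache = new HashMap<>();
    static int loadCount = 0;

    // Stand-in for the expensive Catalogs.loadTable(conf) call
    static String loadTable() {
        loadCount++;
        return "tableObject";
    }

    static String getTable(String name) {
        return Optional.ofNullable(cache.get(name))
            .orElseGet(() -> {
                String tbl = loadTable();
                cache.put(name, tbl); // populate the cache on first load
                return tbl;
            });
    }

    public static void main(String[] args) {
        getTable("db.tbl");
        getTable("db.tbl"); // second call hits the cache, no reload
        System.out.println(loadCount);
    }
}
```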
Kudos, SonarCloud Quality Gate passed!
…query coordinator (Dmitriy Fingerman, reviewed by Butao Zhang, Denys Kuzmenko) Closes apache#4645









What changes were proposed in this pull request?
Removed the serialized table from the Iceberg split and instead used the serialized table already present in the config.
Why are the changes needed?
"org.apache.iceberg.SerializableTable" is getting serialized in every split takes a hit with large number of small splits. E.g in the case provided in the ticket, i had 100,000+ splits which were grouped to 41 splits.
However, during serialization "Table" information is serialized and each entity is around 7 KB. With 100,000 entries it will be easily 700 MB where as AM size itself is 2 GB.
When large number of nested partitions (with small files) are read, AM bails out with OOM.
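The figures above can be checked with a back-of-the-envelope calculation (split count and per-entry size are taken from the ticket as stated; the exact per-entry size is approximate):

```java
public class SplitMemoryEstimate {
    public static void main(String[] args) {
        long splits = 100_000;          // number of splits from the ticket
        long bytesPerTable = 7 * 1024;  // ~7 KB of serialized Table per split
        // Total bytes of duplicated Table payload, converted to whole MB
        long totalMB = splits * bytesPerTable / (1024 * 1024);
        System.out.println(totalMB + " MB");
    }
}
```

This prints roughly 683 MB, consistent with the ~700 MB cited against a 2 GB AM heap.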
Does this PR introduce any user-facing change?
No
Is the change a dependency upgrade?
No
How was this patch tested?
Passing pre-commit testing.