MOR table rolls out new parquet files at 10MB for new inserts - even though max file size set as 128MB #3676

Closed
FelixKJose opened this issue Sep 16, 2021 · 12 comments
Labels
priority:critical production down; pipelines stalled; Need help asap. writer-core Issues relating to core transactions/write actions

Comments

@FelixKJose

I have been setting up data in a Hudi MOR table for a consumption load test and came across an interesting behavior. I have configured the write operation as 'upsert', even though all my writes are inserts. When I perform a new insert, I can see a new version of the base parquet file being created with the new data appended.
Since my commits-to-retain configuration is set to 1, only one old version of the base file is kept. Everything worked as expected until Hudi started creating new parquet files once the current parquet file reached ~10 MB, even though my max parquet file size is set to 128 MB and the small file size is left at its default (100 MB).

Following is my configuration:
"hoodie.datasource.write.table.type": "MERGE_ON_READ",
"hoodie.datasource.write.precombine.field": "eventDateTime",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.write.streaming.retry.count": 3,
"hoodie.datasource.write.streaming.retry.interval.ms": 2000,
"hoodie.datasource.write.streaming.ignore.failed.batch": "false",
"hoodie.payload.ordering.field": "eventDateTime",
"hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
"hoodie.upsert.shuffle.parallelism": 1,
"hoodie.insert.shuffle.parallelism": 1,
"hoodie.consistency.check.enabled": "false",
"hoodie.index.type": "BLOOM",
"hoodie.bloom.index.filter.type": "DYNAMIC_V0",
"hoodie.index.bloom.num_entries": 60000,
"hoodie.index.bloom.fpp": 1e-09,
"hoodie.parquet.max.file.size": "134217728",
"hoodie.parquet.block.size": "134217728",
"hoodie.parquet.page.size": "1048576",
# "hoodie.datasource.compaction.async.enable": True,
"hoodie.compact.inline": True,
# "hoodie.clean.async": True,
# 'hoodie.clean.automatic': True,
'hoodie.cleaner.commits.retained': 1,
"hoodie.keep.min.commits": 2,
"hoodie.compact.inline.max.delta.commits": 2,
"hoodie.table.name": "flattened_calculations_mor",
"hoodie.datasource.write.recordkey.field": "identifier",
"hoodie.datasource.hive_sync.table": "flattened_calculations_mor",
"hoodie.upsert.shuffle.parallelism": 5,
"hoodie.insert.shuffle.parallelism": 5,
"hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor"

    spark_session = (
        SparkSession.builder.appName("Data_Bulk_ingestion_Job")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.hive.convertMetastoreParquet", "false")
        .config(
            "spark.jars.packages",
            "org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.2",
        )
        .getOrCreate()
    )
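For context, a minimal sketch (assumed, not part of the original report) of how options like the ones above are typically passed to the Hudi Spark datasource writer; df, the abbreviated options dict, and the target path are placeholders:

    # Hypothetical write call using the options listed above; df is a
    # placeholder DataFrame of incoming records.
    hudi_options = {
        "hoodie.table.name": "flattened_calculations_mor",
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "identifier",
        "hoodie.datasource.write.precombine.field": "eventDateTime",
        "hoodie.parquet.max.file.size": "134217728",
        # ... plus the remaining options from the list above
    }

    (
        df.write.format("hudi")
        .options(**hudi_options)
        .mode("append")
        .save("s3://my-bucket/flattened_calculations_mor")  # placeholder path
    )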
Following is a screenshot of the files generated:

[Screenshot: Screen Shot 2021-09-16 at 11 25 35 AM — the generated parquet files]

Following is the .hoodie folder content: [hoodie.zip](https://github.com/apache/hudi/files/7179089/hoodie.zip)

@nsivabalan

@nsivabalan
Contributor

nsivabalan commented Sep 18, 2021

I see you have very aggressive cleaner and archival commits-to-retain settings. Can you leave them at the defaults and try it out?

'hoodie.cleaner.commits.retained': 1,
"hoodie.keep.min.commits": 2,

Maybe we can deduce whether the cleaner/archival interplay affects small file handling.
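For reference, these are roughly the defaults (0.8.x line, to the best of my knowledge), so simply removing the two overrides should have the same effect:

"hoodie.cleaner.commits.retained": 10,
"hoodie.keep.min.commits": 20,
"hoodie.keep.max.commits": 30,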

@nsivabalan nsivabalan added blocked-on-user priority:major degraded perf; unable to move forward; potential bugs labels Sep 18, 2021
@FelixKJose
Author

I have tried setting those two configs to their defaults, but I am seeing the same behavior (10 MB files).

@nsivabalan
Contributor

I found the root cause. It looks like in MOR, when an index that cannot index log files is used (which is the case for all out-of-the-box indexes in Hudi), we choose only the smallest parquet file for every commit. The idea is that, over time, every file will grow to its full size.

source link

@vinothchandar: do you know the reason for this? My understanding is that new inserts will not go into delta logs at all and only updates go into delta logs. If that is true, we don't need special handling based on whether the HoodieIndex can index log files or not.
Or am I missing something here?

@FelixKJose: So, if you keep adding more commits, each parquet data file will slowly grow to its full size; it's just that for any one commit, only one base file per partition will be chosen.

Also, a suggestion regarding some other configs, since we are dealing with small file handling: do pay attention to the record size estimate (hoodie.copyonwrite.record.size.estimate), because only those files whose size is > (record size estimation threshold * small file limit) will be considered when determining the average record size.

To illustrate with an example:
For the first commit, Hudi relies on the record size estimate to pack records into data files. After that, Hudi can calculate the average record size from the previous commit's stats, but even then, only files above a minimum size threshold are considered. For example, with the default values, the record size estimation threshold is 1.0 and the parquet small file size is 100 MB, so only data files larger than 100 MB will be looked at to determine the average record size. Until then, Hudi just takes the value from the record size estimate config.

So, say the average record size in your case is 200 KB, but you did not set the right value for the record size estimate. Until there is at least one data file larger than 100 MB, Hudi will assume the record size is the configured estimate. In summary, Hudi will keep assuming the average record size is 1024 bytes (the default) and assign records to data files based on that estimate.
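Here is a rough sketch of that sizing decision (my simplification, not the actual Hudi code; the defaults shown are to the best of my knowledge):

    # Simplified illustration of how the average record size is chosen for
    # file sizing. Not the real Hudi code; names and defaults are approximate.
    RECORD_SIZE_ESTIMATE = 1024            # hoodie.copyonwrite.record.size.estimate (default)
    ESTIMATION_THRESHOLD = 1.0             # hoodie.record.size.estimation.threshold (default)
    SMALL_FILE_LIMIT = 100 * 1024 * 1024   # hoodie.parquet.small.file.limit (default, 100 MB)

    def record_size_for_planning(prev_commit_file_sizes, avg_size_from_commit_stats):
        # Commit stats are trusted only once some base file has crossed the
        # threshold; until then the static estimate is used.
        big_enough = [s for s in prev_commit_file_sizes
                      if s > ESTIMATION_THRESHOLD * SMALL_FILE_LIMIT]
        return avg_size_from_commit_stats if big_enough else RECORD_SIZE_ESTIMATE

    # With only ~10 MB base files on disk, planning still assumes 1024 bytes
    # per record regardless of the true record size:
    print(record_size_for_planning([10 * 1024 * 1024], 200 * 1024))  # -> 1024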

@JoshuaZhuCN

I've encountered the same problem, which I've temporarily worked around by setting the value of the parameter COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE for each commit, though that parameter doesn't appear in 0.8.0.

@FelixKJose
Author

Thank you so much @nsivabalan
I have tried the following:
"hoodie.copyonwrite.record.size.estimate": "204800", # 200 KB
but now it's generating a large number of even smaller files (around 530 KB each), so I am wondering what is wrong?

@xushiyan xushiyan added spark Issues related to spark awaiting-triage and removed blocked-on-user labels Sep 24, 2021
@vinothchandar vinothchandar added writer-core Issues relating to core transactions/write actions and removed spark Issues related to spark labels Sep 24, 2021
@zhihuihong

zhihuihong commented Sep 27, 2021

Have you tried using clustering after inserting the data?
My job created many 7 MB files as well, and I used clustering to reorganize the data layout. I don't know how to change the 7 MB behavior either, but clustering works. (Maybe the 7 MB depends on some specific ratio of hoodie.parquet.max.file.size?)

Below are some posts from the Hudi website for your reference:
https://hudi.apache.org/blog/2021/01/27/hudi-clustering-intro
https://hudi.apache.org/blog/2021/08/23/async-clustering
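For what it's worth, here is a minimal sketch of the inline clustering options those posts cover (key names taken from the clustering docs; values are illustrative, so double-check them for your Hudi version):

    # Hypothetical clustering options to compact small files after ingestion.
    clustering_options = {
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.inline.max.commits": "4",
        # files below this size become clustering candidates
        "hoodie.clustering.plan.strategy.small.file.limit": str(100 * 1024 * 1024),
        # target size for the rewritten files
        "hoodie.clustering.plan.strategy.target.file.max.bytes": str(128 * 1024 * 1024),
    }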

@nsivabalan
Contributor

@FelixKJose: what I meant is that your configs are generally fine; it's just that for every commit, only one small file will be packed with more inserts, and the rest of the incoming records will go into new files. So each commit tries to grow one small file into a bigger one. As this progresses over 20 to 30 commits, your small file count should eventually go down. That is, the older ones will fill up; new ones could pop up, but newer commits will take care of those.
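To restate that as a minimal sketch (my simplification of the behavior described here, not the actual UpsertPartitioner):

    SMALL_FILE_LIMIT_MB = 100

    def base_files_packed_this_commit(base_file_sizes_mb):
        # With an index that cannot index log files, only the single smallest
        # base file under the small-file limit is topped up with new inserts;
        # the rest of the incoming records open new file groups.
        candidates = [s for s in base_file_sizes_mb if s < SMALL_FILE_LIMIT_MB]
        return [min(candidates)] if candidates else []

    print(base_files_packed_this_commit([10, 12, 95, 130]))  # -> [10]; 12 and 95 wait for later commits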

@FelixKJose
Author

@nsivabalan Ok, got it, but I'm a bit confused because I was expecting all inserts to go into one parquet file until it reaches the given max size (128 MB). It seems my understanding is not correct, and I don't see this behavior described anywhere. Could you point me to any docs describing it?

@nsivabalan
Contributor

I guess we don't have clear documentation around this. I myself had to dig through the code and try it out before confirming some of these nuanced behaviors.

@nsivabalan
Contributor

nsivabalan commented Oct 13, 2021

@FelixKJose: If you are interested in working on a fix, I have filed a tracking JIRA: https://issues.apache.org/jira/browse/HUDI-2550. I can help guide you through it.

@xushiyan xushiyan added jira-filed priority:critical production down; pipelines stalled; Need help asap. and removed awaiting-triage priority:major degraded perf; unable to move forward; potential bugs labels Oct 21, 2021
@xushiyan
Member

@nsivabalan I also filed https://issues.apache.org/jira/browse/HUDI-2609 to make the docs clearer on this.

@gupash

gupash commented Sep 29, 2022

@nsivabalan, I just came across this ticket and have 2 questions:

  1. I understand that the smallest parquet file gets packed with more inserts each commit, and that eventually all the base parquet files will grow in size. My question is: why is more than one base parquet file even getting created per partition in the first place, if the first one is less than 100 MB?
  2. hoodie.copyonwrite.record.size.estimate only works in the case of a COW table; what about MOR tables?
