
[SUPPORT] Performance Tuning: Slow stages (Building Workload Profile & Getting Small files from partitions) during Hudi Writes #2620

Open
codejoyan opened this issue Mar 2, 2021 · 19 comments
Labels: index, performance, priority:major (degraded perf; unable to move forward; potential bugs)

@codejoyan (Contributor)

Hi,
I am seeing some performance issues while upserting data, especially in the two jobs below:

15 (SparkUpsertCommitActionExecutor)
17 (UpsertPartitioner)

Attached are some of the stats regarding the slow jobs/stages.
Configurations used:
--driver-memory 5G --executor-memory 10G --executor-cores 5 --num-executors 10
Upsert config parameters:
option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
option("hoodie.upsert.shuffle.parallelism","2").
option("hoodie.insert.shuffle.parallelism","2").
option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, 128 * 1024 * 1024).
option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, 128 * 1024 * 1024).
option("hoodie.copyonwrite.record.size.estimate", "40")
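
For completeness, a minimal self-contained sketch of how a write with these options typically looks. The table name, base path, DataFrame, and field names are illustrative placeholders, not the exact job:

import org.apache.spark.sql.SaveMode

// Placeholders for this sketch; df is the incremental DataFrame being upserted.
val tableName = "visits_cow"                 // hypothetical table name
val basePath  = "gs://<bucket>/<path>"       // GCS target path

df.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.recordkey.field", "store_nbr,trip_nbr,bill_item_nbr").  // illustrative fields
  option("hoodie.datasource.write.partitionpath.field", "region,v_date").                 // illustrative fields
  option("hoodie.datasource.write.precombine.field", "event_ts").                         // illustrative field
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
  option("hoodie.upsert.shuffle.parallelism", "2").   // value as reported above
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.table.name", tableName).
  mode(SaveMode.Append).
  save(basePath)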

Could you please guide me on how to approach tuning this performance problem? Let me know if you need any further details.

Below are some of the stats:
[Screenshot: Spark UI job/stage stats, 2021-03-03]

Environment Description

  • Hudi version : 0.7.0
  • Storage : GCS
  • Running on Docker? (yes/no) : No
@bvaradar (Contributor) commented Mar 4, 2021

@nsivabalan is looking into this.

@codejoyan (Contributor, Author) commented Mar 6, 2021

Thanks @bvaradar and @nsivabalan. Please let me know how to improve the performance or if you need any further details to investigate.
I used the below configurations (SIMPLE index and small-file handling turned off) to speed up the inserts and saw much improvement:
hoodie.parquet.small.file.limit 0
hoodie.index.type SIMPLE
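
For reference, a minimal sketch of passing those two settings through the datasource writer (df, tableName, and basePath are the same placeholders as in the sketch above):

// Sketch only: SIMPLE index, small-file handling disabled (small.file.limit = 0).
df.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.index.type", "SIMPLE").
  option("hoodie.parquet.small.file.limit", "0").
  option("hoodie.table.name", tableName).
  mode("append").
  save(basePath)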

But what are the downsides of not using the default BLOOM index? In my use case I will have late-arriving data, so will performance suffer because of this choice?

Also, I would like to understand why these specific steps are taking so long. From the Spark web UI it seems the execution of the methods below is what is taking the time. Any insights into what is happening in the background, please?

org.apache.hudi.index.bloom.SparkHoodieBloomIndex.findMatchingFilesForRecordKeys(SparkHoodieBloomIndex.java:266)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocationBacktoRecords(SparkHoodieBloomIndex.java:287)
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:433)

@nsivabalan (Contributor)

Hi @codejoyan: a few clarifying questions on your use case and record keys.

  • What constitutes your record key? Is it completely random, or does it have any ordering property to it?
  • For example, if your record key contains a timestamp, we can leverage file pruning via the min and max key ranges stored per data file. But if it is completely random, the pruning step becomes pure overhead, since we may not filter out any data files when the min and max ranges are effectively random.
  • Does your ingestion batch contain just inserts, or updates as well?
  • If updates, do they touch only the latest partitions or are they spread evenly across all partitions?

If your record keys are completely random, then using SIMPLE makes sense, as we may not be able to do much filtering anyway. With the default BLOOM index, we filter based on min/max key ranges, which may not be needed in that case (and this step reads parquet footers to parse the min/max ranges).

Once you clarify these details, I can look into it further.

@codejoyan (Contributor, Author) commented Mar 16, 2021

Apologies for the delay, @nsivabalan.
Below are the answers to the questions you asked:

  • What constitutes your record key? - The record key is random within a partition: store number (integer), trip number (string), bill item number (short). However, two of the three columns are numeric and sortable. Will pre-sorting help in this case?
  • Does your ingestion batch contain just inserts or updates as well? - It consists of roughly 90% inserts and 10% updates.
  • If updates, does it touch latest partitions or spread across all partitions equally? - It touches mostly the latest partition.

A few additional questions:
Use Case:

  • The use-case is to track the user visits to different stores for making purchases.
  • The dataset is partitioned by region and visit date.
  • The key generator is ComplexKeyGenerator, and the record key is a combination of store number (integer), trip number (string), and bill item number (short).
  • The pre-combine key is a timestamp column.

Based on the above scenario, a couple of questions:

  1. What other partition strategy or record key strategy could be used to take advantage of the bloom filter?
  2. There are 2 jobs that take time. Are both related to index lookup, or is something else also contributing to the increased load time?

@kimberlyamandalu

I have a similar issue where bloom index performance is very slow for upserts into a Hudi MOR table.
Does anyone know whether, when Hudi performs an upsert, it only looks up the index for the affected partitions or against the entire dataset? I have year/month partitions from 1998 to 2020. My upserts go mostly (95%) to recent partitions. I also notice a lot of calls to build the file system view for older partitions that I know should not have any upserts:

AbstractTableFileSystemView: Building file system view for partition (message_year=2002/message_month=9)

[Screenshot: Spark UI stage details]

Obtain key ranges for file slices (range pruning=on)
collect at HoodieSparkEngineContext.java:73
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
org.apache.hudi.client.common.HoodieSparkEngineContext.map(HoodieSparkEngineContext.java:73)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.loadInvolvedFiles(SparkHoodieBloomIndex.java:176)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.lookupIndex(SparkHoodieBloomIndex.java:119)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:84)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:60)
org.apache.hudi.table.action.commit.AbstractWriteHelper.tag(AbstractWriteHelper.java:69)
org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:51)
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:46)
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:82)
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:74)
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:146)
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:181)
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)

@codejoyan (Contributor, Author)

@nsivabalan, any inputs would be very helpful.

@nsivabalan (Contributor)

@codejoyan: sorry, this somehow slipped off my radar.
May I know what scale of data you are dealing with? I see your parallelism is very low (2). Can you try with 100 or more and see how it goes?

Among the 3 methods you have quoted, 2 are index related and the 3rd is the actual write operation.

The best way to decide a partitioning strategy is to look at what your queries usually filter on. If they are date based, then you definitely need date in your partitioning strategy, which you already have. And if adding region would cut down most of the data to be looked up, sure. I assume this would also blow up your number of partitions in general, since it becomes number of dates * number of regions.

Regarding record keys and bloom:
You can try the regular "BLOOM" index. With it, there are a few config knobs to play with; with the SIMPLE index, we don't have many.
Within a single batch of writes, do the records have some ordering, or are they just random? From your response I guess they are random, so you can turn off range pruning, since it may not help much:
set https://hudi.apache.org/docs/configurations.html#bloomIndexPruneByRanges to false (the default value is true), as sketched below.
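
A minimal sketch of how that could look in the datasource write (df, tableName, and basePath are the same placeholders as in the earlier sketch; the parallelism value of 200 is only one example of "100 or more"):

// Sketch only: regular BLOOM index with range pruning disabled, since keys are random.
df.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.index.type", "BLOOM").
  option("hoodie.bloom.index.prune.by.ranges", "false").  // skip min/max range pruning
  option("hoodie.upsert.shuffle.parallelism", "200").     // raise from the reported value of 2
  option("hoodie.insert.shuffle.parallelism", "200").
  option("hoodie.table.name", tableName).
  mode("append").
  save(basePath)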

@n3nash : do you have any pointers here.

@nsivabalan (Contributor)

@kimberlyamandalu: do you have a support ticket for your question? Let's not pollute this issue; we can create a new one for your use case and discuss it over there.

@kimberlyamandalu

Hi @nsivabalan, no, I do not have a separate ticket for my question. I thought it might be related to this one, so I chimed in. I can open a new ticket for my use case so we can isolate it. Sorry for the confusion. Thanks.

@n3nash n3nash added this to In progress in GI Tracker Board Apr 22, 2021
@njalan commented Apr 23, 2021

I am facing the same issue. It usually takes 1-2 minutes to get small files from partitions in each micro-batch (60-second trigger interval). My storage is S3, but it looks like it works fine on HDFS.

@n3nash (Contributor) commented Apr 27, 2021

@kimberlyamandalu @njalan @codejoyan There are a few problems that can show up when using BLOOM_INDEX:

  1. Depending on the number of entries in each parquet file, if the bloom filter num_entries is not configured correctly, it will lead to lots of false positives, which results in the bloom index spending more time looking up data. You can check the default here:
     public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES = "60000";
     You can either increase this or use a dynamic bloom filter (see the sketch after this list). We are working on adding metrics that report how many such false positives happened.
  2. The BLOOM_INDEX step needs to perform a "listing" of the partitions to find the candidate files. On S3, without the Hudi metadata table being enabled, this listing can take time. Enable that config to eliminate these file listings.
  3. Depending on your workload, BLOOM_INDEX may not be the ideal choice in some cases. For example, if you have updates spread across all your partitions, then SIMPLE_INDEX is better, since bloom would just do some extra work and then still do the work SIMPLE_INDEX would have done anyway.
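
To put point 1 in perspective with the file and record sizes quoted earlier in this thread (treat the numbers as assumptions), here is a back-of-the-envelope check plus the knobs involved:

// Rough estimate only, using figures reported above; not authoritative.
val maxFileBytes = 128L * 1024 * 1024      // target parquet file size (~128 MB)
val avgRecordBytes = 50L                   // reported average record size
val recordsPerFile = maxFileBytes / avgRecordBytes   // ~2.6 million records per file
// That is far above the default bloom filter sizing of 60,000 entries, so the
// filter saturates and most index lookups become false positives.

// Possible knobs (a sketch, assuming the same datasource writer as before):
// .option("hoodie.index.bloom.num_entries", recordsPerFile.toString)
// .option("hoodie.bloom.index.filter.type", "DYNAMIC_V0")   // dynamic bloom filter
// .option("hoodie.metadata.enable", "true")                  // avoid per-partition file listings on S3/GCS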

@nsivabalan (Contributor)

FAQ link on how to configure bloom configs.

@codejoyan (Contributor, Author) commented Jul 22, 2021

Problem statement: I am using a COW table and receiving roughly 1 GB of incremental data. The batch performs a data quality check and an upsert. Spark UI stage screenshots are attached below:

  • The record key is complex: a composite key of (a, b, c) (string, number, number). There is no timestamp ordering, but we could order by the numbers?
  • Yes, the dataset is partitioned (regular partitioning). I tried both regular BLOOM and regular SIMPLE; with SIMPLE, performance is better.
  • Upsert parallelism is the default of 1500. The operation is upsert.
  • As suggested, I have set "hoodie.bloom.index.prune.by.ranges" = false.
  • The batch has inserts into the new partition and updates predominantly into the latest partitions, but a few updates touch many older partitions too.

Snapshot count before the upsert
Below is the snapshot view before running the upsert:

scala> val svsSnapshotDF = spark.read.format("org.apache.hudi").
     | load(targetPath + "/*/*/*")

scala> svsSnapshotDF.groupBy("v_date").count().sort(col("v_date")).show(false)
21/07/22 11:53:55 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
+----------+---------+                                                          
|v_date|count    |
+----------+---------+
|2021-07-02|266836321|
|2021-07-03|270866302|
|2021-07-04|198333856|
|2021-07-05|212205824|
|2021-07-06|198391165|
|2021-07-07|188043723|
|2021-07-08|445      |
+----------+---------+

Incremental Count after the Upsert:

scala> svsSnapshotDF.select(col("_hoodie_commit_time")).distinct.sort($"_hoodie_commit_time".desc).show(false)
+-------------------+                                                           
|_hoodie_commit_time|
+-------------------+
|20210721051130     |
|20210721045241     |
|20210721043446     |
|20210720185844     |
|20210720113928     |
|20210720110235     |
|20210720093310     |
|20210720073405     |
|20210720055244     |
|20210720051405     |
|20210720041607     |
|20210719181512     |
|20210719150715     |
|20210719140407     |
|20210719134750     |
|20210719133012     |
|20210719131145     |
|20210719063351     |
|20210719061724     |
+-------------------+

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> val beginTime = "20210721051130"
beginTime: String = 20210721051130

scala> val svsIncrementalDF = spark.read.format("hudi").
     | option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
     | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
     | load(targetPath)
svsIncrementalDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 125 more fields]

scala> svsIncrementalDF.groupBy("v_date").count().sort(col("v_date")).show(false)
+----------+-------+                                                            
|v_date|count  |
+----------+-------+
|2021-07-07|2680595|
|2021-07-08|25260  |
+----------+-------+

Completed Jobs Screenshot:
[Screenshot: Spark UI completed jobs, 2021-07-22]

Slow Jobs Details Screenshots
[Screenshots: Spark UI slow job details, 2021-07-22]

@codejoyan (Contributor, Author) commented Jul 24, 2021

Some additional details for the above runs.

  1. The configs I am using - REGULAR BLOOM.
  2. Max and Min file size in older partitions - 116 MB and 6 MB respectively
  3. Avg record size - 50 bytes
  4. Avg no of data files in older partitions - between 157 and 225

I then changed the configs as below to get roughly 100k entries per file (see the quick check after the list). But the performance is worse now; it basically gets stuck. Spark web UI screenshot attached.

  1. Configs -
    hoodie.insert.shuffle.parallelism - 1500
    hoodie.upsert.shuffle.parallelism - 1500
    hoodie.parquet.small.file.limit - 4200000
    hoodie.parquet.max.file.size - 5000000
    hoodie.index.type - BLOOM
    hoodie.copyonwrite.record.size.estimate - 50
    hoodie.copyonwrite.insert.split.size - 100000
    hoodie.bloom.index.prune.by.ranges - false
    hoodie.bloom.index.filter.type - DYNAMIC_V0
    hoodie.index.bloom.num_entries - 30000
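
As a quick check on the "roughly 100k entries per file" estimate, using only the values listed above:

// Back-of-the-envelope check of records per file under these settings.
val maxFileBytes = 5000000L        // hoodie.parquet.max.file.size
val recordSizeEstimate = 50L       // hoodie.copyonwrite.record.size.estimate
val approxRecordsPerFile = maxFileBytes / recordSizeEstimate   // = 100000
// matches hoodie.copyonwrite.insert.split.size = 100000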

The performance is now okay with the BLOOM index when the incremental batch size is around 100 MB (around 4-5 minutes for the upsert). But it gets worse when the batch size increases (> 5 GB), and the countByKey at BaseSparkCommitActionExecutor.java:154 step gets stuck.

  • What is the general rule of thumb relating file size, insert split size, and incremental batch size?
  • Also, this seems to suggest that a COW table is not a good fit when the incremental batch size varies a lot. Am I correct?

[Screenshot: Spark UI, 2021-07-24]

@xushiyan xushiyan added awaiting-triage priority:critical production down; pipelines stalled; Need help asap. and removed blocked-on-user labels Sep 24, 2021
@xushiyan xushiyan added priority:major degraded perf; unable to move forward; potential bugs and removed priority:critical production down; pipelines stalled; Need help asap. labels Jan 31, 2022
@xushiyan xushiyan moved this from User Action to Repro Needed in GI Tracker Board Jan 31, 2022
@awplxz commented Jun 2, 2022

Is there any progress on this problem? I have the same problem.
According to my observation, the amount of shuffle data roughly triples on the second upsert.

@codope (Member) commented Aug 1, 2022

@awplxz There were a bunch of fixes regarding performance that we landed recently. Do you see the same behavior with the latest master?

@xushiyan xushiyan moved this from Repro Needed to User Action in GI Tracker Board Oct 30, 2022
@koldic commented Dec 1, 2022

Hi, I have the same problem with slow stages. At first it runs well, but as more and more small files are inserted it slows down, and the "Getting Small files from partitions" stage together with the "Doing partition and writing data" stage can take up to an hour to finish.
I tried setting hoodie.parquet.small.file.limit to the smallest possible value (1 MB) to limit the small files it collects, but that did not help. Changing it to 0 did help, since that value disables the small-file collection stage entirely. Is there any way to turn this setting back on without slowing down all jobs, or should I just try offline compaction?
I also use the SIMPLE index, as the key is random, on version 0.12.1.

@zyclove commented Nov 17, 2023

I also encountered the same problem with 0.14.0. How do I solve it?
Disable metadata?
set hoodie.metadata.table=false;

Change hoodie.parquet.small.file.limit?

Set hoodie.bloom.index.prune.by.ranges = false?

Change hoodie.memory.merge.max.size?

Can this be optimized in Hudi 1.0? This stage is simply too time consuming.

[Screenshot: Spark UI]

@FFCMSouza

I'm having the same problem on Hudi version 0.14.1 and Spark 3.4.1.
