[SUPPORT] Occasional delay in write stages w/ upsert operation #11821

Open
cbomgit opened this issue Aug 23, 2024 · 5 comments

Comments

@cbomgit

cbomgit commented Aug 23, 2024


We're experiencing a strange issue when deleting records from a table with an UPSERT operation. Occasionally, our job gets stuck on the "Doing partition and writing data" stage for multiple hours. This stage generally takes between 5 and 30 minutes, but sometimes it takes several hours. For example, in the screenshot below, one task took roughly 5 hours (the remaining tasks were skipped).

[screenshot of the Spark UI stage timeline showing the single long-running task]

Our job runs in a loop and works off batches of data, so it is a long-running job. We have not yet identified commonalities among the data batches when we run into this. We have not observed high CPU, OOM errors, loss of data nodes, or high datanode disk usage during this time.

We do not see any errors in the driver application logs or the executor/task logs that indicate something is going wrong. We do see a lot of heartbeat files being written during this time. Digging into the task logs for an example task that took a long time, we see something like:

2024-08-23 09:26:32,501 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.io.HoodieMergeHandle: Number of entries in MemoryBasedMap => 170, Total size in bytes of MemoryBasedMap => 165668, Number of entries in BitCaskDiskMap => 0, Size of file spilled to disk => 0
2024-08-23 09:26:32,501 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.io.HoodieMergeHandle: partitionPath:region=NA/year=2024/month=02/day=14/hour=00, fileId to be merged:b8973e4d-5bc9-4a3a-ba99-8181c9831056-0
2024-08-23 09:26:32,575 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.io.HoodieMergeHandle: Merging new data into oldPath s3://bucket-name/region=NA/year=2024/month=02/day=14/hour=00/b8973e4d-5bc9-4a3a-ba99-8181c9831056-0_4-4995-1712252_20240815132438223.parquet, as newPath s3://bucket-name/region=NA/year=2024/month=02/day=14/hour=00/bucket-name
2024-08-23 09:26:33,241 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.table.marker.DirectWriteMarkers: Creating Marker Path=s3://bucket-name/.hoodie/.temp/20240823091953364/region=NA/year=2024/month=02/day=14/hour=00/bucket-name.marker.MERGE
2024-08-23 09:26:33,275 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream: close closed:false s3://bucket-name/.hoodie/.temp/20240823091953364/region=NA/year=2024/month=02/day=14/hour=00/bucket-name.marker.MERGE
2024-08-23 09:26:33,460 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher: Completed multipart upload of 1 parts 0 bytes
2024-08-23 09:26:33,460 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream: Finished uploading bucket-name/.hoodie/.temp/20240823091953364/region=NA/year=2024/month=02/day=14/hour=00/bucket-name.marker.MERGE. Elapsed seconds: 0.
2024-08-23 09:26:33,460 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.table.marker.DirectWriteMarkers: [direct] Created marker file s3://bucket-name/.hoodie/.temp/20240823091953364/region=NA/year=2024/month=02/day=14/hour=00/bucket-name.marker.MERGE in 885 ms:%

The next time we see this task id in the logs, we see:

2024-08-23 11:18:09,558 [INFO] (producer-thread-1) org.apache.hudi.common.util.queue.IteratorBasedQueueProducer: finished buffering records
2024-08-23 11:18:09,560 [INFO] (consumer-thread-1) org.apache.hudi.common.util.queue.BoundedInMemoryExecutor: Queue Consumption is done; notifying producer threads
2024-08-23 11:18:12,007 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream: close closed:false s3://bucket-name/table_name/region=NA/year=2024/month=02/day=14/hour=00/b8973e4d-5bc9-4a3a-ba99-8181c9831056-0_4-5965-2033103_20240823091953364.parquet
2024-08-23 11:18:13,383 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher: Completed multipart upload of 6 parts 766624225 bytes
2024-08-23 11:18:13,383 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream: Finished uploading bucket-name/table_name/region=NA/year=2024/month=02/day=14/hour=00/b8973e4d-5bc9-4a3a-ba99-8181c9831056-0_4-5965-2033103_20240823091953364.parquet. Elapsed seconds: 6699.
2024-08-23 11:18:13,513 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.io.HoodieMergeHandle: MergeHandle for partitionPath region=NA/year=2024/month=02/day=14/hour=00 fileID b8973e4d-5bc9-4a3a-ba99-8181c9831056-0, took 6701018 ms.

Note the 5th line, which reports "Elapsed seconds: 6699" (6699 seconds ≈ 1.86 hours). Looking at the uploaded file in question, I see that it is about 800 MB in size. Is it expected that a file of this size would bottleneck the job like this?

Here's the config we're using (with sensitive info redacted):

hoodie.parquet.small.file.limit -> 104857600
hoodie.datasource.write.precombine.field -> eventVersion
hoodie.datasource.write.payload.class -> org.apache.hudi.common.model.EmptyHoodieRecordPayload
hoodie.bloom.index.filter.dynamic.max.entries -> 1106137
hoodie.cleaner.fileversions.retained -> 2
hoodie.parquet.max.file.size -> 134217728
hoodie.cleaner.parallelism -> 1500
hoodie.write.lock.client.num_retries -> 10
hoodie.delete.shuffle.parallelism -> 1500
hoodie.bloom.index.prune.by.ranges -> true
hoodie.metadata.enable -> false
hoodie.clean.automatic -> false
hoodie.datasource.write.operation -> upsert
hoodie.write.lock.wait_time_ms -> 600000
hoodie.metrics.reporter.type -> CLOUDWATCH
hoodie.datasource.write.recordkey.field -> timestamp,eventId,subType,trackedItem
hoodie.table.name -> my_table_name
hoodie.datasource.write.table.type -> COPY_ON_WRITE
hoodie.datasource.write.hive_style_partitioning -> true
hoodie.datasource.write.partitions.to.delete -> 
hoodie.write.lock.dynamodb.partition_key -> my_table_name_key
hoodie.cleaner.policy -> KEEP_LATEST_FILE_VERSIONS
hoodie.write.markers.type -> DIRECT
hoodie.metrics.on -> false
hoodie.datasource.write.reconcile.schema -> true
hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.cleaner.policy.failed.writes -> LAZY
hoodie.upsert.shuffle.parallelism -> 1500
hoodie.write.lock.dynamodb.table -> HoodieLockTable
hoodie.write.lock.provider -> org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
hoodie.datasource.write.partitionpath.field -> region,year,month,day,hour
hoodie.bloom.index.filter.type -> DYNAMIC_V0
hoodie.write.lock.wait_time_ms_between_retry -> 30000
hoodie.write.concurrency.mode -> optimistic_concurrency_control
hoodie.write.lock.dynamodb.region -> us-east-1
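
For reference, here is a minimal sketch of how each batch's delete is issued (deleteBatch and deleteKeysDf are placeholder names; the real job passes the full option map shown above):

import org.apache.spark.sql.{DataFrame, SaveMode}

// deleteKeysDf holds the record-key, partition, and precombine columns of the rows to delete for this batch.
def deleteBatch(deleteKeysDf: DataFrame): Unit = {
  deleteKeysDf.write
    .format("hudi")
    .option("hoodie.table.name", "my_table_name")
    .option("hoodie.datasource.write.operation", "upsert")
    // EmptyHoodieRecordPayload causes the upsert to drop the matching records, i.e. a delete.
    .option("hoodie.datasource.write.payload.class",
      "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
    .option("hoodie.datasource.write.recordkey.field", "timestamp,eventId,subType,trackedItem")
    .option("hoodie.datasource.write.partitionpath.field", "region,year,month,day,hour")
    .option("hoodie.datasource.write.precombine.field", "eventVersion")
    // ...plus the remaining options from the dump above...
    .mode(SaveMode.Append)
    .save("s3://bucket-name/table_name")
}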

Environment Description

  • Hudi version : 0.11.1

  • Spark version : 3.2.1

  • Hive version : 3.1.3

  • Hadoop version : 3.2.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Happy to provide more info. Thanks!

Editing to add cluster specifics:

  • Running on a 60-node cluster of r5a.8xlarge instances.
  • Spark configs:
[
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.default.parallelism": "3350",
            "spark.driver.cores": "5",
            "spark.driver.extraJavaOptions": "-Djavax.net.ssl.trustStore=/home/hadoop/trust-store/InternalAndExternalTrustStore.jks -Djavax.net.ssl.trustStorePassword=amazon -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'",
            "spark.driver.extraLibraryPath": "/home/hadoop/caching-shared-objects:/usr/lib/hadoop-lzo/lib/native",
            "spark.driver.memory": "37477M",
            "spark.driver.memoryOverhead": "4164M",
            "spark.dynamicAllocation.enabled": "false",
            "spark.eventLog.logBlockUpdates.enabled": "true",
            "spark.executor.cores": "5",
            "spark.executor.extraJavaOptions": "-Djavax.net.ssl.trustStore=/home/hadoop/trust-store/InternalAndExternalTrustStore.jks -Djavax.net.ssl.trustStorePassword=amazon -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'",
            "spark.executor.extraLibraryPath": "/home/hadoop/caching-shared-objects:/usr/lib/hadoop-lzo/lib/native",
            "spark.executor.heartbeatInterval": "60s",
            "spark.executor.instances": "335",
            "spark.executor.memory": "37477M",
            "spark.executor.memoryOverhead": "4164M",
            "spark.history.fs.cleaner.enabled": "true",
            "spark.history.fs.cleaner.interval": "1d",
            "spark.history.fs.cleaner.maxAge": "7d",
            "spark.memory.fraction": "0.80",
            "spark.memory.storageFraction": "0.30",
            "spark.network.timeout": "800s",
            "spark.rdd.compress": "true",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.shuffle.compress": "true",
            "spark.shuffle.spill.compress": "true",
            "spark.sql.shuffle.partitions": "750",
            "spark.storage.level": "MEMORY_AND_DISK_SER",
            "spark.yarn.scheduler.reporterThread.maxFailures": "5"
        }
    },
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    }
]
@cbomgit
Author

cbomgit commented Aug 23, 2024

Screenshot 2024-08-23 at 4 14 57 PM

I saved a full thread dump, but here is a screenshot of the executor's thread dump, which suggests that it's getting hung up here: https://github.com/apache/hudi/blob/release-0.11.1/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java#L147

@danny0405
Contributor

My speculation is that it is because of false positives from the bloom filter index. Since you are still using the 0.11 release, maybe you can try the simple index instead.
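
For example, something along these lines added to the existing write options (a sketch; hoodie.index.type is the knob that overrides the default bloom index):

// Switch the upsert job from the bloom index to the simple index.
val simpleIndexOverride = Map(
  "hoodie.index.type" -> "SIMPLE"
)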

@cbomgit
Author

cbomgit commented Aug 24, 2024

Thanks for the response! Is there any way to confirm the false-positive rate?

@danny0405
Contributor

There is a stage named something like "buildPartitionProfile" which would take a long time if the false positives happen.

@cbomgit
Author

cbomgit commented Aug 25, 2024

Thanks. Switching to the SIMPLE index doesn't seem to help. The job fails at the following job and stage:

Screenshot 2024-08-25 at 2 08 33 PM
Screenshot 2024-08-25 at 2 09 05 PM
Screenshot 2024-08-25 at 2 09 00 PM

The task logs show an OOM error:

2024-08-25T05:22:43.611+0000: [Full GC (Allocation Failure)  36580M->33919M(40960M), 46.7644028 secs]
   [Eden: 0.0B(1792.0M)->0.0B(2048.0M) Survivors: 256.0M->0.0B Heap: 36580.4M(40960.0M)->33919.2M(40960.0M)], [Metaspace: 117312K->117312K(135168K)]
 [Times: user=77.83 sys=0.20, real=46.76 secs] 
2024-08-25T05:23:30.376+0000: [Full GC (Allocation Failure)  33919M->33904M(40960M), 43.9530911 secs]
   [Eden: 0.0B(2048.0M)->0.0B(2048.0M) Survivors: 0.0B->0.0B Heap: 33919.2M(40960.0M)->33904.1M(40960.0M)], [Metaspace: 117312K->113897K(135168K)]
 [Times: user=74.71 sys=0.03, real=43.96 secs] 
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p
kill -9 %p"
#   Executing /bin/sh -c "kill -9 829
kill -9 829"...

For settings, I set the simple index and an upsert parallelism of 1500, but the job doesn't seem to be using that parallelism setting here. Are there any Hudi config knobs you recommend tweaking to increase the throughput here?
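
For reference, the options changed for this run relative to the config dump above (a sketch; I'm assuming hoodie.simple.index.parallelism is the knob that actually governs the simple-index lookup stage, rather than the upsert shuffle parallelism):

val changedOptions = Map(
  "hoodie.index.type" -> "SIMPLE",
  "hoodie.upsert.shuffle.parallelism" -> "1500",
  // Assumed knob: parallelism of the simple-index lookup/join stage.
  "hoodie.simple.index.parallelism" -> "1500"
)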

@nsivabalan changed the title from "[SUPPORT]" to "[SUPPORT] Occasional delay in write stages w/ upsert operation" on Sep 13, 2024