[SUPPORT] spark executors died due to underestimated record size #5939

Closed
haoxie-aws opened this issue Jun 22, 2022 · 4 comments · Fixed by #6864
Labels
priority:major (degraded perf; unable to move forward; potential bugs), spark (Issues related to spark), writer-core (Issues relating to core transactions/write actions)

Comments

@haoxie-aws

Describe the problem you faced

Hi Hudi team! Some of my Spark executors die intermittently. When I look into the tasks assigned to the dead executors, those tasks were trying to write parquet files over 320MB, according to the logs of the other executors that completed the tasks afterwards, even though our PARQUET_MAX_FILE_SIZE is set to 100MB. I also noticed “AvgRecordSize => 26” in the driver log for runs where executors die, while AvgRecordSize is usually above 100 for runs without failures. My guess is that the underestimated record size made Hudi decide to load more records into memory than it could handle, so the executors died from out of memory.
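
For illustration, here is a back-of-the-envelope sketch (not the actual UpsertPartitioner code) of why the low estimate hurts: the partitioner roughly divides the target file size by the estimated record size to decide how many inserts to pack into one file, so a too-small estimate packs several times more records than an executor can comfortably buffer.

// Back-of-the-envelope sketch only -- not the actual Hudi UpsertPartitioner logic.
public class RecordSizeEffect {
  public static void main(String[] args) {
    long maxFileSizeBytes = 100L * 1024 * 1024;        // PARQUET_MAX_FILE_SIZE = 100MB

    // With a realistic record size of ~110 bytes, one file holds roughly:
    long recordsAtRealSize = maxFileSizeBytes / 110;   // ~953k records

    // With the underestimated AvgRecordSize of 26, the plan packs roughly:
    long recordsAtEstimate = maxFileSizeBytes / 26;    // ~4.03M records

    System.out.printf("planned %d records vs realistic %d (%.1fx too many)%n",
        recordsAtEstimate, recordsAtRealSize, (double) recordsAtEstimate / recordsAtRealSize);
  }
}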

So I took two steps here.

  • To verify that the underestimated record size is causing the issue, I added a lower bound on the estimated record size of 0.7 * COPY_ON_WRITE_RECORD_SIZE_ESTIMATE (COPY_ON_WRITE_RECORD_SIZE_ESTIMATE is configured to 110 in my setup). With this change the executors stop dying, which I think confirms that the underestimated record size is the cause of the dead executors.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index c54c526253..2cf2b4521b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -383,6 +383,6 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
       // make this fail safe.
       LOG.error("Error trying to compute average bytes/record ", t);
     }
-    return avgSize;
+    return Math.max(avgSize, (long)(0.7 * hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate()));
   }
 }
  • To understand where the small average record sizes come from, I looked into the Hudi commit stats. From the screenshot below we can clearly see that the average record size for a replacecommit is consistently smaller than for a normal commit, and it matches the AvgRecordSize I see in the logs. I also looked into the column sizes of some parquet files and found that a file generated by a replacecommit has significantly fewer distinct values for some dimensions and therefore compresses much better (a sketch of one way to inspect this follows the screenshot below).


[Screenshot: per-commit stats showing that the average record size of replacecommit instants is consistently smaller than that of regular commits]
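
For reference, a minimal sketch of one way to read per-column compressed vs. uncompressed sizes from a parquet footer (assumes parquet-hadoop on the classpath; the class name and file path are placeholders):

// Minimal sketch: dump compressed vs. uncompressed size per column chunk, so the
// effective compression of clustered vs. non-clustered files can be compared.
// The file path below is a placeholder.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ColumnSizeDump {
  public static void main(String[] args) throws Exception {
    Path file = new Path("s3://bucket/table/partition/some-file.parquet"); // placeholder
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.from(file, new Configuration()))) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData col : block.getColumns()) {
          System.out.printf("%s: compressed=%d uncompressed=%d%n",
              col.getPath(), col.getTotalSize(), col.getTotalUncompressedSize());
        }
      }
    }
  }
}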

My setup:

  • Hudi 0.11.0
  • CoW + inline clustering
  • Both PARQUET_MAX_FILE_SIZE and PARQUET_SMALL_FILE_LIMIT are 100MB (see the writer-option sketch after this list).
  • I have a few partitions in my table, each partition has around 200GB data.
  • Spark job runs on AWS Glue G.2X workers.
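
For reference, a rough sketch of the size-related writer options above with the Java DataFrame API (the class, table name, and path are placeholders; the config keys are as they appear in Hudi 0.11, so please verify against your version):

// Sketch of the size-related settings from the setup above. Table name, path, and
// the surrounding class are placeholders; double-check config keys against your
// Hudi version.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class HudiWriteConfigSketch {
  static void upsert(Dataset<Row> df) {
    df.write().format("hudi")
        .option("hoodie.table.name", "my_table")                                       // placeholder
        .option("hoodie.parquet.max.file.size", String.valueOf(100 * 1024 * 1024))     // 100MB
        .option("hoodie.parquet.small.file.limit", String.valueOf(100 * 1024 * 1024))  // 100MB
        .option("hoodie.copyonwrite.record.size.estimate", "110")
        .mode(SaveMode.Append)
        .save("s3://my-bucket/path/my_table");                                         // placeholder
  }
}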

Expected behavior

Hudi should not cause Spark executors to be killed.

Environment Description

  • Hudi version : 0.11.0

  • Spark version : 3.1.2

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

@haoxie-aws (Author)

@xushiyan This issue still seems to be awaiting triage; is there any progress on it?

@xushiyan (Member) commented Oct 4, 2022

@haoxie-aws thanks for the awesome analysis! Your observation is correct and expected: clustering makes better use of compression to reduce file sizes on disk, because records are clustered together on some common fields, and the stats from the replacecommit reflect that. What should be fixed is that the upsert partitioner, which assigns inserts, should only consider the average record size from commit instants. I'll create a patch for this.
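
For illustration, a rough sketch of the direction described (only a sketch, not the actual patch linked above; the class and method names are made up):

// Sketch only: skip replacecommit (clustering) instants when estimating the
// average record size, since clustered files compress better and pull the
// average down. Class and method names here are made up.
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

class AvgRecordSizeInstantFilter {
  // True only for regular write commits, whose file sizes reflect the
  // natural record size of the incoming data.
  static boolean usableForAvgRecordSize(HoodieInstant instant) {
    return HoodieTimeline.COMMIT_ACTION.equals(instant.getAction());
  }
}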

@haoxie-aws (Author)

@xushiyan thanks for making the fix! We have tested it and it solved the problem! Will it be in the next release (probably 0.12.1)?

@xushiyan (Member)

@haoxie-aws sure. Unfortunately it has passed the code freeze for this release and does not qualify as a blocker fix. We still aim to include it in 0.12.2.
