Description
Backend
VL (Velox)
Problem Description
We encountered a task failure when running the same job on Gluten that succeeds on vanilla Spark. The Gluten tasks fail with exit code 137 (OOM killed by OS). The most significant finding: Gluten shows ~64GB spill memory per task, while vanilla Spark only shows ~4GB - a 16x difference for identical workloads.
Questions
- Why is Gluten's spill memory ~64GB while Spark's is ~4GB?
  - Is this a metrics reporting issue, or is Gluten actually spilling significantly more data to memory?
  - The 16x difference seems unusually large for the same workload
- Is this a Sort/Spill issue rather than a Window function issue?
  - Both implementations perform Sort before Window in the operator sequence
  - We suspect the root cause is in the sort and spilling mechanism rather than the window function itself ([VL] UnboundedWindowFunctionFrame OOM when partition is large #10500)
  - Is this assessment correct?
- What are the potential root causes and recommended workarounds?
  - Are there known issues with Gluten's spilling mechanism under memory pressure?
  - What configuration tuning would you recommend (e.g., off-heap vs on-heap allocation, shuffle/spill settings)?
  - Should we adjust `spark.sql.shuffle.partitions` or other memory-related parameters?
Any insights into why Gluten exhibits such different spilling behavior would be greatly appreciated.
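To make the `spark.sql.shuffle.partitions` question concrete, here is a back-of-the-envelope sketch of how the per-task shuffle input would shrink with more partitions. It assumes the stage's total shuffle read is simply the Gluten median per-task value from this report (703.5 MiB) times the current 200 partitions, and that rows spread evenly across partitions. Both are simplifications, the function name is hypothetical, and whether a smaller input actually avoids the OOM is untested.

```python
# Median Gluten shuffle read per task (MiB) at the current partition count,
# taken from the summary metrics in this report.
CURRENT_PARTITIONS = 200
MEDIAN_SHUFFLE_READ_MIB = 703.5

# Assumption: total stage input is per-task median times partition count.
TOTAL_SHUFFLE_READ_MIB = MEDIAN_SHUFFLE_READ_MIB * CURRENT_PARTITIONS


def per_task_shuffle_read_mib(partitions: int) -> float:
    """Estimated shuffle read per task, assuming an even key distribution."""
    return TOTAL_SHUFFLE_READ_MIB / partitions


for candidate in (200, 400, 800):
    estimate = per_task_shuffle_read_mib(candidate)
    print(f"{candidate} partitions -> ~{estimate:.1f} MiB shuffle read per task")
```

Skewed `transid` values would break the even-distribution assumption, so this is only a first-order estimate.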
Key Observations
Spill Memory Discrepancy
The most significant difference we observed is in spill memory metrics:
- Gluten: Spill (Memory) ~64GB per task
- Vanilla Spark: Spill (Memory) ~4GB per task
This represents approximately 16x higher memory spilling in Gluten compared to vanilla Spark for the same workload.
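As a rough sanity check, the sketch below recomputes the ratios from the median per-task values in the summary tables of this report. In vanilla Spark's UI, Spill (Memory) is documented as the deserialized in-memory size of the spilled data and Spill (Disk) as its serialized on-disk size; whether Gluten populates these metrics the same way is part of what we are asking. The dictionary and variable names are only illustrative.

```python
# Median per-task spill values from the summary tables in this report (GiB).
gluten = {"spill_memory_gib": 62.8, "spill_disk_gib": 1.2}
spark = {"spill_memory_gib": 3.7, "spill_disk_gib": 747.8 / 1024}  # 747.8 MiB

# Gluten vs. vanilla Spark, metric by metric.
memory_ratio = gluten["spill_memory_gib"] / spark["spill_memory_gib"]
disk_ratio = gluten["spill_disk_gib"] / spark["spill_disk_gib"]

# Spill (Memory) relative to Spill (Disk) within each engine.
gluten_mem_per_disk = gluten["spill_memory_gib"] / gluten["spill_disk_gib"]
spark_mem_per_disk = spark["spill_memory_gib"] / spark["spill_disk_gib"]

print(f"Spill (Memory), Gluten / Spark: {memory_ratio:.1f}x")
print(f"Spill (Disk),   Gluten / Spark: {disk_ratio:.1f}x")
print(f"Memory / Disk within Gluten:    {gluten_mem_per_disk:.1f}x")
print(f"Memory / Disk within Spark:     {spark_mem_per_disk:.1f}x")
```

On these medians the memory metric differs by roughly 17x while the disk metric differs by less than 2x, which is one reason we wonder whether the two engines count Spill (Memory) differently.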
Stage Operators Comparison
Gluten Operators (Failed)
(189) ShuffleQueryStage
(190) InputAdapter
(191) InputIteratorTransformer
(192) SortExecTransformer
Arguments: [transid#1096 ASC NULLS FIRST, dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST]
(193) WindowExecTransformer
Arguments: [row_number() windowspecdefinition(transid#1096, dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#176]
(194) FilterExecTransformer
(195) ProjectExecTransformer
(196) WholeStageCodegenTransformer (37)
(197) VeloxResizeBatches
(198) ColumnarExchange
Vanilla Spark Operators (Successful)
(119) ReusedExchange [Reuses operator id: 14]
(120) ShuffleQueryStage
(121) Sort [codegen id : 33]
Arguments: [transid#1096 ASC NULLS FIRST, dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST]
(122) Window
Arguments: [row_number() windowspecdefinition(transid#1096, dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#176]
(123) Filter [codegen id : 34]
Condition: (rank#176 = 1)
(124) Project [codegen id : 34]
(125) Exchange
Metrics Comparison
Gluten - Sample Successful Tasks (before failure)
| Index | Task ID | Duration | GC Time | Peak Execution Memory | Shuffle Write Size/Records | Shuffle Read Size/Records | Spill (Memory) | Spill (Disk) | Shuffle Remote Reads |
|---|---|---|---|---|---|---|---|---|---|
| 5 | 345199 | 4.9 min | 2 s | 3.2 GiB | 366.1 MiB / 20,684,642 | 703.5 MiB / 39,438,747 | 62.8 GiB | 1.2 GiB | 670.2 MiB |
| 6 | 345201 | 3.8 min | 3 s | 3.2 GiB | 366 MiB / 20,681,664 | 703.4 MiB / 39,431,462 | 62.8 GiB | 1.2 GiB | 662 MiB |
| 7 | 345203 | 5.3 min | 4 s | 3.2 GiB | 366 MiB / 20,676,880 | 703.2 MiB / 39,421,370 | 62.8 GiB | 1.2 GiB | 677.9 MiB |
Gluten - Summary Metrics for 47 Completed Tasks
| Metric | Min | 25th percentile | Median | 75th percentile | Max |
|---|---|---|---|---|---|
| Duration | 3.5 min | 5.2 min | 6.0 min | 6.6 min | 8.0 min |
| GC Time | 0.1 s | 1 s | 3 s | 4 s | 4 s |
| Peak Execution Memory | 2.6 GiB | 2.6 GiB | 3.2 GiB | 3.2 GiB | 3.3 GiB |
| Spill (Memory) | 62.7 GiB | 62.8 GiB | 62.8 GiB | 62.9 GiB | 62.9 GiB |
| Spill (Disk) | 1.2 GiB | 1.2 GiB | 1.2 GiB | 1.2 GiB | 1.2 GiB |
| Shuffle Read Size/Records | 703.2 MiB / 39,421,370 | 703.4 MiB / 39,432,615 | 703.5 MiB / 39,437,423 | 703.6 MiB / 39,443,119 | 703.9 MiB / 39,460,186 |
| Shuffle Write Size/Records | 366 MiB / 20,676,880 | 366 MiB / 20,682,012 | 366.1 MiB / 20,684,419 | 366.1 MiB / 20,686,668 | 366.3 MiB / 20,695,719 |
Vanilla Spark - Sample Successful Tasks
| Index | Task ID | Duration | GC Time | Peak Execution Memory | Shuffle Write Size/Records | Shuffle Read Size/Records | Spill (Memory) | Spill (Disk) | Shuffle Remote Reads |
|---|---|---|---|---|---|---|---|---|---|
| 5 | 341338 | 3.6 min | 3 s | 4 GiB | 596.8 MiB / 20,555,139 | 1.2 GiB / 39,533,494 | 5.5 GiB | 1.1 GiB | 1.1 GiB |
| 6 | 341340 | 3.8 min | 7 s | 3.6 GiB | 596.7 MiB / 20,552,224 | 1.2 GiB / 39,525,019 | 5.8 GiB | 1.2 GiB | 1.1 GiB |
| 7 | 341342 | 3.4 min | 6 s | 3.7 GiB | 596.6 MiB / 20,547,334 | 1.2 GiB / 39,515,478 | 2.9 GiB | 615.7 MiB | 1.1 GiB |
Vanilla Spark - Summary Metrics for 200 Completed Tasks
| Metric | Min | 25th percentile | Median | 75th percentile | Max |
|---|---|---|---|---|---|
| Duration | 3.0 min | 3.2 min | 3.3 min | 3.5 min | 4.4 min |
| GC Time | 2 s | 3 s | 5 s | 7 s | 9 s |
| Peak Execution Memory | 3 GiB | 3.4 GiB | 3.6 GiB | 4 GiB | 6.8 GiB |
| Spill (Memory) | 2.8 GiB | 3.4 GiB | 3.7 GiB | 5.2 GiB | 6.1 GiB |
| Spill (Disk) | 594.8 MiB | 694.2 MiB | 747.8 MiB | 1.1 GiB | 1.2 GiB |
| Shuffle Read Size/Records | 1.2 GiB / 39,511,291 | 1.2 GiB / 39,526,316 | 1.2 GiB / 39,532,177 | 1.2 GiB / 39,538,317 | 1.2 GiB / 39,553,345 |
| Shuffle Write Size/Records | 596.5 MiB / 20,543,879 | 596.7 MiB / 20,552,224 | 596.8 MiB / 20,555,023 | 596.9 MiB / 20,558,078 | 597.1 MiB / 20,566,005 |
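To put the tables above on a common footing, this small sketch derives two per-task figures from the median rows: shuffle-read bytes per record, and how large Spill (Memory) is relative to the shuffle read. The names are illustrative, and the inputs are the rounded values shown in the UI, so the results are approximate.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3

# Median per-task values from the summary tables in this report.
tasks = {
    "gluten": {
        "shuffle_read_bytes": 703.5 * MIB,
        "records": 39_437_423,
        "spill_memory_bytes": 62.8 * GIB,
    },
    "spark": {
        "shuffle_read_bytes": 1.2 * GIB,
        "records": 39_532_177,
        "spill_memory_bytes": 3.7 * GIB,
    },
}


def bytes_per_record(task: dict) -> float:
    """Average serialized shuffle-read size of one record."""
    return task["shuffle_read_bytes"] / task["records"]


def spill_expansion(task: dict) -> float:
    """Spill (Memory) expressed as a multiple of the shuffle read size."""
    return task["spill_memory_bytes"] / task["shuffle_read_bytes"]


for name, task in tasks.items():
    print(
        f"{name}: {bytes_per_record(task):.1f} B/record shuffled, "
        f"Spill (Memory) = {spill_expansion(task):.1f}x shuffle read"
    )
```

Both engines read nearly the same number of records per task, yet Gluten's reported Spill (Memory) is roughly 91x its shuffle read while Spark's is roughly 3x.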
Configuration
Gluten Configuration
Executor Reqs:
memoryOverhead: 8192 MB
cores: 15
memory: 20725 MB
offHeap: 60375 MB
Task Reqs:
cpus: 1.0
spark.sql.shuffle.partitions: 200 (default)
Spark Configuration
Executor Reqs:
memoryOverhead: 8192 MB
cores: 15
memory: 81920 MB
offHeap: 0 MB
Task Reqs:
cpus: 1.0
spark.sql.shuffle.partitions: 200 (default)
Note: Total memory is similar (~90GB for Spark, ~89GB for Gluten), but allocated differently between on-heap and off-heap.
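The totals in the note can be reproduced with the sketch below, which also shows a naive per-task-slot share of the main execution pool (off-heap for Gluten, on-heap for Spark). The even split across cores is an assumption made only for illustration: neither engine necessarily divides memory this way, and Spark reserves part of the heap outside the execution pool.

```python
# Executor settings from the two configurations in this report (MB).
gluten_exec = {"memory_overhead": 8192, "heap": 20725, "off_heap": 60375, "cores": 15}
spark_exec = {"memory_overhead": 8192, "heap": 81920, "off_heap": 0, "cores": 15}


def total_mb(executor: dict) -> int:
    """Sum of overhead, on-heap, and off-heap memory for one executor."""
    return executor["memory_overhead"] + executor["heap"] + executor["off_heap"]


def naive_share_mb(pool_mb: int, cores: int) -> float:
    """Pool size divided evenly by task slots (a simplifying assumption)."""
    return pool_mb / cores


print(f"Gluten total: {total_mb(gluten_exec)} MB")
print(f"Spark total:  {total_mb(spark_exec)} MB")
print(f"Gluten off-heap per slot: {naive_share_mb(gluten_exec['off_heap'], gluten_exec['cores']):.0f} MB")
print(f"Spark heap per slot:      {naive_share_mb(spark_exec['heap'], spark_exec['cores']):.0f} MB")
```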
Additional Context
The workload involves a window function with row_number() partitioned by transid and ordered by multiple columns (dt DESC, txn_date DESC, ts DESC), followed by filtering for rank = 1 (deduplication pattern).
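For readers unfamiliar with the pattern, here is a plain-Python sketch of what the query computes: the latest row per `transid`. The sample rows are made up, the function names are hypothetical, and it assumes no NULLs and no ties in the ordering columns. The first function mirrors the Sort + Window + Filter plan; the second keeps only the current best row per key in a single pass, which is the idea behind aggregation-based rewrites of this pattern (we have not tried such a rewrite on this job).

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sample rows; only the columns relevant to the window are shown.
rows = [
    {"transid": "t1", "dt": "2024-01-01", "txn_date": "2024-01-01", "ts": 10},
    {"transid": "t2", "dt": "2024-01-03", "txn_date": "2024-01-01", "ts": 1},
    {"transid": "t1", "dt": "2024-01-02", "txn_date": "2024-01-02", "ts": 5},
    {"transid": "t1", "dt": "2024-01-02", "txn_date": "2024-01-02", "ts": 9},
]


def order_key(row):
    """Ordering columns of the window: dt, txn_date, ts (all DESC in the query)."""
    return (row["dt"], row["txn_date"], row["ts"])


def dedup_with_sort(rows):
    """Sort by transid ASC and order columns DESC, then keep row_number() = 1."""
    # Two stable sorts: descending order columns first, then ascending key.
    ordered = sorted(rows, key=order_key, reverse=True)
    ordered = sorted(ordered, key=itemgetter("transid"))
    return [next(group) for _, group in groupby(ordered, key=itemgetter("transid"))]


def dedup_single_pass(rows):
    """Keep only the best row seen so far per transid; no full sort of the input."""
    best = {}
    for row in rows:
        current = best.get(row["transid"])
        if current is None or order_key(row) > order_key(current):
            best[row["transid"]] = row
    return [best[key] for key in sorted(best)]


print(dedup_with_sort(rows))
print(dedup_single_pass(rows))
```

Both functions return the same rows for this input; the difference is that the sort-based version has to materialize and order every row of a partition, which is where the spilling in this stage occurs.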
Vanilla Spark
(119) ReusedExchange [Reuses operator id: 14]
Output [8]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177]
(120) ShuffleQueryStage
Output [8]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177]
Arguments: 10
(121) Sort [codegen id : 33]
Input [8]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177]
Arguments: [transid#1096 ASC NULLS FIRST, dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST], false, 0
(122) Window
Input [8]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177]
Arguments: [row_number() windowspecdefinition(transid#1096, dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#176], [transid#1096], [dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST]
(123) Filter [codegen id : 34]
Input [9]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177, rank#176]
Condition : (rank#176 = 1)
(124) Project [codegen id : 34]
Output [6]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63]
Input [9]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177, rank#176]
(125) Exchange
Input [6]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63]
Arguments: hashpartitioning(cc_id#1117, 200), ENSURE_REQUIREMENTS, [plan_id=18453]
Gluten
(189) ShuffleQueryStage
Output [8]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177]
Arguments: 11
(190) InputAdapter
Input [8]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177]
(191) InputIteratorTransformer
Input [8]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177]
(192) SortExecTransformer
Input [8]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177]
Arguments: [transid#1096 ASC NULLS FIRST, dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST], false, 0
(193) WindowExecTransformer
Input [8]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177]
Arguments: [row_number() windowspecdefinition(transid#1096, dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#176], [transid#1096], [dt#1191 DESC NULLS LAST, txn_date#62 DESC NULLS LAST, ts#1177 DESC NULLS LAST]
(194) FilterExecTransformer
Input [9]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177, rank#176]
Arguments: (rank#176 = 1)
(195) ProjectExecTransformer
Output [7]: [hash(cc_id#1117, 42) AS hash_partition_key#3916, transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63]
Input [9]: [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63, dt#1191, ts#1177, rank#176]
(196) WholeStageCodegenTransformer (37)
Input [7]: [hash_partition_key#3916, transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63]
Arguments: false
(197) VeloxResizeBatches
Input [7]: [hash_partition_key#3916, transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63]
Arguments: 1024, 2147483647
(198) ColumnarExchange
Input [7]: [hash_partition_key#3916, transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63]
Arguments: hashpartitioning(cc_id#1117, 200), ENSURE_REQUIREMENTS, [transid#1096, txn_date#62, trans_type#1101, trans_status#1111, cc_id#1117, realtime_processor#63], [plan_id=20642], [shuffle_writer_type=hash]
Gluten version
Gluten-1.3
Spark version
Spark-3.5.x
Spark configurations
No response
System information
No response