fix: Honor SparkSession overrides for rebase mode and timezone in compaction tasks #18675
Conversation
When MOR compaction runs outside a Spark SQL execution context (e.g. a
standalone CompactTask runner), `SQLConf.get` on the executor task thread
returns a fresh fallback `SQLConf` with default values, not the user's
SparkSession overrides. As a result, `Spark{3_3,3_4,3_5,4_0}Adapter
.getDateTimeRebaseMode()` resolved to `EXCEPTION` even when the user had
set `spark.sql.parquet.datetimeRebaseModeInWrite=LEGACY`, producing
`SparkUpgradeException [INCONSISTENT_BEHAVIOR_CROSS_VERSION
.WRITE_ANCIENT_DATETIME]` during compaction of MOR tables containing
pre-1900 timestamps. The same gap affected
`HoodieRowParquetWriteSupport.init()`'s `sessionLocalTimeZone` read.
Adapter and WriteSupport now resolve the value in this order:
1. SQLConf override (so `spark.conf.set(...)` on the SparkSession takes
effect on the driver and inside SQL execution contexts).
2. SparkConf via SparkEnv.get.conf (broadcast to every executor at
startup, so user-set keys are honored on executor tasks running
outside a SQL execution context).
3. The ConfigEntry's own default (or SQLConf.sessionLocalTimeZone for
the timezone helper).
Adds TestSparkAdapterRebaseModePropagation (3 methods) covering rebase
mode and timezone propagation into vanilla parallelize().map() task
closures. Each test fails without the fix.
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds a SparkConf fallback for rebase-mode and session timezone reads so they survive Spark's missing SQLConf propagation in non-SQL-execution executor tasks (e.g., compaction dispatched via `parallelize().map()`). The 3-step resolution looks correct, the new helper is exposed for testing in a clean way, and the tests are well-constructed (the JVM-default-vs-custom timezone trick is a nice touch).

One thing worth double-checking is whether `Spark4_1Adapter.getDateTimeRebaseMode()` should be in this fix as well; see the inline comment. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. A single minor Scala idiom nit; otherwise the code is clean and well-documented.
Codecov Report
❌ Patch coverage is
Additional details and impacted files

@@             Coverage Diff              @@
##             master   #18675      +/-   ##
============================================
  Coverage     68.07%   68.08%
- Complexity    28916    28937      +21
============================================
  Files          2519     2519
  Lines        140611   140645      +34
  Branches      17423    17426       +3
============================================
+ Hits          95726    95754      +28
- Misses        37026    37031       +5
- Partials       7859     7860       +1
Describe the issue this Pull Request addresses
When MOR compaction runs outside a Spark SQL execution context (for example, a standalone compaction runner that dispatches per-file-group work via `JavaSparkContext.parallelize(...).map(...)`), `SQLConf.get` on the executor task thread returns a fresh fallback `SQLConf` with default values rather than the user's SparkSession overrides. As a result, `Spark{3_3,3_4,3_5,4_0}Adapter.getDateTimeRebaseMode()` resolves to `EXCEPTION` even when the user has set `spark.sql.parquet.datetimeRebaseModeInWrite=LEGACY`, producing `SparkUpgradeException [INCONSISTENT_BEHAVIOR_CROSS_VERSION.WRITE_ANCIENT_DATETIME]` during compaction of MOR tables containing pre-1900 timestamps (a common Oracle DATE sentinel pattern). The same gap affects `HoodieRowParquetWriteSupport.init()`'s `sessionLocalTimeZone` read when LEGACY rebase metadata is being recorded.
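To illustrate the gap (a hand-written sketch, not code from this PR): reading the config through `SQLConf.get` inside a plain RDD closure sees only defaults, even though the session override is visible on the driver.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().master("local[2]").getOrCreate()
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

// No SQL execution context here, so Spark does not propagate the session's
// SQLConf to the task thread; SQLConf.get falls back to a default-valued conf.
val seen = spark.sparkContext.parallelize(Seq(0), 1).map { _ =>
  SQLConf.get.getConfString("spark.sql.parquet.datetimeRebaseModeInWrite", "EXCEPTION")
}.collect().head

// seen == "EXCEPTION" (the default), while spark.conf.get on the driver
// still returns "LEGACY": the override never reached the task thread.
```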
Summary and Changelog

This change resolves the value in three steps (a sketch of the lookup follows the list):
1. SQLConf override: `spark.conf.set(...)` on the SparkSession (and any task thread that IS inside a Spark SQL execution context, where Spark already propagated SQLConf for us).
2. SparkConf via `SparkEnv.get.conf`: catches values set in SparkConf at startup. SparkConf is broadcast to every executor, so user overrides reach tasks dispatched outside any SQL execution context.
3. The `ConfigEntry`'s own default (or `SQLConf.sessionLocalTimeZone` for the timezone helper).
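A minimal sketch of that lookup order, with a hypothetical helper name (the PR's adapter code may differ in shape):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.sql.internal.SQLConf

// Hypothetical helper illustrating the three-step resolution.
def resolveWithSparkConfFallback(key: String, entryDefault: String): String = {
  val sqlConf = SQLConf.get
  if (sqlConf.contains(key)) {
    // 1. SQLConf override: covers the driver and task threads inside a SQL
    //    execution context, where Spark already propagated the session conf.
    sqlConf.getConfString(key)
  } else {
    // 2. SparkConf via SparkEnv: set at startup and shipped to every executor,
    //    so it is visible even to tasks outside any SQL execution context.
    Option(SparkEnv.get).flatMap(_.conf.getOption(key))
      // 3. The ConfigEntry's own default.
      .getOrElse(entryDefault)
  }
}

// e.g. resolveWithSparkConfFallback("spark.sql.parquet.datetimeRebaseModeInWrite", "EXCEPTION")
```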
The inline compaction path via `df.write` was already correct because Spark's SQL execution wrapper had already populated the task thread's SQLConf. No call-site changes; the fix is localized to the adapter read and a small helper in `HoodieRowParquetWriteSupport`:

- `Spark{3_3,3_4,3_5,4_0}Adapter.getDateTimeRebaseMode()`: SQLConf → SparkConf → ConfigEntry default.
- `HoodieRowParquetWriteSupport`: extracted Parquet metadata key strings as named constants; a new `resolveSessionLocalTimeZone()` helper applies the same resolution order (sketched below); called from `init()` when LEGACY rebase metadata is being recorded.
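A possible shape for the timezone helper (hypothetical body; the real `resolveSessionLocalTimeZone()` in `HoodieRowParquetWriteSupport` may differ):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.sql.internal.SQLConf

// Same three-step order, but step 3 falls back to SQLConf.sessionLocalTimeZone
// (which itself defaults to the JVM timezone) instead of a literal default.
def resolveSessionLocalTimeZone(): String = {
  val key = "spark.sql.session.timeZone" // SQLConf.SESSION_LOCAL_TIMEZONE's key
  val sqlConf = SQLConf.get
  if (sqlConf.contains(key)) {
    sqlConf.getConfString(key)
  } else {
    Option(SparkEnv.get).flatMap(_.conf.getOption(key))
      .getOrElse(sqlConf.sessionLocalTimeZone)
  }
}
```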
New tests: `TestSparkAdapterRebaseModePropagation` (3 methods), covering:

- rebase mode propagation into vanilla `parallelize().map()` task closures,
- `SparkEnv.get.conf` as the delivery path (mechanism check),
- `sessionLocalTimeZone` reaching executor task threads.

Each test fails without the fix.
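Roughly, the propagation check looks like this (our sketch, not the PR's exact test code; assertion wiring simplified):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
  .getOrCreate()

// The closure runs as a plain task, outside any SQL execution context.
val modes = spark.sparkContext.parallelize(Seq(1), 1).map { _ =>
  // SparkConf is broadcast to executors, so the override is visible here
  // even though the task thread's SQLConf is not.
  SparkEnv.get.conf
    .getOption("spark.sql.parquet.datetimeRebaseModeInWrite")
    .getOrElse("EXCEPTION")
}.collect()

assert(modes.sameElements(Array("LEGACY")))
```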
Impact
User-facing: users who set
spark.sql.parquet.datetimeRebaseModeInWrite(orspark.sql.session.timeZone) on the SparkSession — at startup via SparkConf or at runtime viaspark.conf.set(...)— will have the override honored by Hudi compaction tasks dispatched outside a Spark SQL execution context. No behavior change for callers already inside a SQL execution context, and no change to default behavior when no override is set.Risk Level
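For concreteness, both override styles are covered (a short sketch; the keys are standard Spark configs):

```scala
import org.apache.spark.sql.SparkSession

// Style 1: at startup, via SparkConf (e.g. --conf on spark-submit, or builder config).
val spark = SparkSession.builder()
  .config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
  .getOrCreate()

// Style 2: at runtime, on an existing session.
spark.conf.set("spark.sql.session.timeZone", "UTC")
```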
Risk Level

Low. The SparkConf fallback only fires when `SQLConf.get` does not have the key set; SparkConf is the canonical broadcast source for cluster-wide configuration. No call-site or API changes.
Documentation Update
None required; the fix preserves the documented behavior of the underlying Spark configs.
Contributor's checklist