From 5f86fb6ab56333ae1d21cdbcee61e6d244d10ec6 Mon Sep 17 00:00:00 2001
From: Xinli Shang
Date: Fri, 27 Mar 2026 11:37:39 -0700
Subject: [PATCH 01/12] feat: Add Spark streamer validators for phase 3
precommit validation
Implements phase 3 of the precommit validation framework by adding:
- SparkKafkaOffsetValidator: Validates Kafka offset consistency
- SparkValidationContext: Provides Spark-specific validation context
- SparkStreamerValidatorUtils: Utility functions for Spark streamer validation
- Comprehensive test coverage for all validator components
- Integration with StreamSync and HoodiePreCommitValidatorConfig
Co-Authored-By: Claude Opus 4.6
---
bootstrap_register_only_issue.md | 170 +++++++++
.../HoodiePreCommitValidatorConfig.java | 8 +-
.../hudi/utilities/streamer/StreamSync.java | 5 +
.../validator/SparkKafkaOffsetValidator.java | 57 ++++
.../SparkStreamerValidatorUtils.java | 172 ++++++++++
.../validator/SparkValidationContext.java | 137 ++++++++
.../TestSparkKafkaOffsetValidator.java | 322 ++++++++++++++++++
.../TestSparkStreamerValidatorUtils.java | 237 +++++++++++++
.../validator/TestSparkValidationContext.java | 156 +++++++++
9 files changed, 1262 insertions(+), 2 deletions(-)
create mode 100644 bootstrap_register_only_issue.md
create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java
create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java
create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java
create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java
create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java
create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java
diff --git a/bootstrap_register_only_issue.md b/bootstrap_register_only_issue.md
new file mode 100644
index 0000000000000..52914b1bd7ae9
--- /dev/null
+++ b/bootstrap_register_only_issue.md
@@ -0,0 +1,170 @@
+### Feature Description
+
+**What the feature achieves:**
+Adds a `REGISTER_ONLY` bootstrap mode that allows Hudi to register existing partitions and their file listings without reading file contents or creating skeleton files. At query time, Hudi natively reads these partitions as plain Parquet, ensuring `SELECT * FROM table` returns complete results across all tiers — no view wrappers or query changes needed. This enables a three-tier bootstrap strategy for onboarding large Hive tables where historical data resides in cold storage (e.g., S3 Glacier, Azure Archive).
+
+**Why this feature is needed:**
+Problem: Organizations migrating large Hive tables to Hudi often have a tiered storage layout:
+- Recent data (e.g., last 30 days) in hot/standard storage — should be fully rewritten into Hudi
+- Warm data (e.g., 30 days to 1 year) in standard storage — suitable for METADATA_ONLY bootstrap
+- Cold data (e.g., older than 1 year) in archival/cold storage (S3 Glacier, etc.) — cannot be read without expensive retrieval
+
+Current gaps:
+- Bootstrap requires every discovered partition to be either `FULL_RECORD` or `METADATA_ONLY` (enforced by `checkArgument` in `SparkBootstrapCommitActionExecutor.java:292-293`)
+- Both modes require reading file contents: `FULL_RECORD` rewrites all data, `METADATA_ONLY` reads every record to extract record keys
+- For cold storage, reading file contents triggers data retrieval (e.g., restore from Glacier), which is expensive, slow, and often impractical for terabytes of archival data
+- If users bootstrap only recent partitions and skip cold ones entirely, Hudi queries that span into the cold date range silently return incomplete results — **silent data loss**
+- The bootstrap epic ([#14665](https://github.com/apache/hudi/issues/14665)) describes "Onboard for new partitions alone" but there is no implementation that safely handles query completeness for skipped partitions
+
+Real scenario:
+- A Hive table has 3 years of daily partitions (~1,095 partitions)
+- Only the last year of data is in standard storage; older data is in S3 Glacier
+- User wants to onboard to Hudi but cannot afford to restore 2+ years of Glacier data just to extract record keys
+- With today's bootstrap, the user must either: (a) pay the Glacier retrieval cost for all cold data, or (b) skip old partitions and risk silent data loss on queries
+
+### User Experience
+
+**How users will use this feature:**
+
+Configuration:
+```properties
+# Use the date-based 3-tier selector
+hoodie.bootstrap.mode.selector=org.apache.hudi.client.bootstrap.selector.DateBasedBootstrapModeSelector
+
+# Partitions newer than 30 days → FULL_RECORD (full rewrite into Hudi)
+hoodie.bootstrap.mode.selector.days.full_record=30
+
+# Partitions between 30 and 365 days → METADATA_ONLY (skeleton files, read warm storage for record keys)
+hoodie.bootstrap.mode.selector.days.metadata_only=365
+
+# Partitions older than 365 days → REGISTER_ONLY (no file content reading at all)
+# (implicit: anything older than metadata_only threshold)
+
+# Partition date format (to parse partition paths like datestr=2024-01-15)
+hoodie.bootstrap.mode.selector.partition.date.format=yyyy-MM-dd
+hoodie.bootstrap.mode.selector.partition.date.field=datestr
+```
+
+Usage Example — Spark bootstrap:
+```scala
+spark.emptyDataFrame.write
+ .format("hudi")
+ .option("hoodie.bootstrap.base.path", "/data/hive_table")
+ .option("hoodie.table.name", "my_hudi_table")
+ .option("hoodie.datasource.write.operation", "bootstrap")
+ .option("hoodie.bootstrap.mode.selector",
+ "org.apache.hudi.client.bootstrap.selector.DateBasedBootstrapModeSelector")
+ .option("hoodie.bootstrap.mode.selector.days.full_record", "30")
+ .option("hoodie.bootstrap.mode.selector.days.metadata_only", "365")
+ .option("hoodie.bootstrap.mode.selector.partition.date.format", "yyyy-MM-dd")
+ .option("hoodie.bootstrap.mode.selector.partition.date.field", "datestr")
+ .mode(SaveMode.Overwrite)
+ .save("/data/my_hudi_table")
+```
+
+Query behavior — **no query changes needed**:
+```sql
+-- This returns ALL data: hot (FULL_RECORD) + warm (METADATA_ONLY) + cold (REGISTER_ONLY)
+-- No views, no UNION ALL, no special syntax
+SELECT * FROM my_hudi_table;
+
+-- Partition filtering works as expected
+SELECT * FROM my_hudi_table WHERE datestr >= '2024-01-01';
+
+-- Cold partition queries work, just read as plain Parquet (may have different performance)
+SELECT * FROM my_hudi_table WHERE datestr = '2022-06-15';
+```
+
+Performance characteristics by tier:
+
+| Tier | Bootstrap cost | Query performance | Hudi meta columns |
+|------|---------------|-------------------|-------------------|
+| FULL_RECORD (hot) | Full rewrite | Best — native Hudi file | All populated |
+| METADATA_ONLY (warm) | Read for record keys | Moderate — skeleton stitching at read time | All populated |
+| REGISTER_ONLY (cold) | File listing only (no content read) | Same as plain Parquet | Returned as null |
+
+Write guardrails:
+- Upserts/deletes targeting REGISTER_ONLY partitions fail fast with a clear error message
+- This is expected: without record keys, Hudi cannot index or merge records in these partitions
+- If cold data is later restored to warm/hot storage, partitions can be "promoted" via re-bootstrap
+
+API Changes:
+
+New public APIs:
+```java
+// New enum value in BootstrapMode
+public enum BootstrapMode {
+ FULL_RECORD,
+ METADATA_ONLY,
+ REGISTER_ONLY // NEW: register partition file listing without reading contents
+}
+
+// New selector
+public class DateBasedBootstrapModeSelector extends BootstrapModeSelector {
+ @Override
+ public Map<BootstrapMode, List<String>> select(
+ List<Pair<String, List<HoodieFileStatus>>> partitions);
+}
+```
+
+### Hudi RFC Requirements
+
+**RFC PR link:** (if applicable)
+
+Why RFC is needed:
+- Does this change public interfaces/APIs? **Yes**
+ - New `REGISTER_ONLY` value in `BootstrapMode` enum
+ - New `DateBasedBootstrapModeSelector` class
+ - Read path changes for bootstrapped tables to natively serve plain Parquet files without Hudi meta columns
+ - New table property to indicate presence of REGISTER_ONLY partitions
+
+- Does this change storage format? **Minor**
+ - Bootstrap commit metadata will include REGISTER_ONLY partition entries (file listings without skeleton files)
+ - No new file formats; REGISTER_ONLY partitions reference original source Parquet files as-is
+ - Backward compatible: tables without REGISTER_ONLY partitions are unaffected
+
+Justification:
+- Extends the bootstrap mode enum (affects selectors, executor, read path)
+- Read path changes require handling files without Hudi meta columns within a Hudi table
+- Needs design review for query completeness and schema merging guarantees
+- Affects how Hudi defines table boundaries (a Hudi table now includes "unmanaged" partitions)
+
+### Task Breakdown
+
+**Phase 1: Core Bootstrap Changes (write path)**
+- Add `REGISTER_ONLY` to `BootstrapMode` enum (`BootstrapMode.java`)
+- Create `DateBasedBootstrapModeSelector` with configurable date thresholds and partition date parsing
+- Add config properties to `HoodieBootstrapConfig.java` for date-based tier boundaries
+- Modify `SparkBootstrapCommitActionExecutor` to handle 3 modes:
+ - Relax `checkArgument` validation (line 292-293) to accept `REGISTER_ONLY` partitions
+ - For `REGISTER_ONLY`: record partition file listings in commit metadata without reading file contents or creating skeleton files
+ - Skip bootstrap index entries for `REGISTER_ONLY` partitions (`HFileBootstrapIndex`)
+- Add table property `hoodie.bootstrap.has.register.only.partitions`
+- Unit tests for selector and executor changes
+
+**Phase 2: Read Path — Native Query Completeness (critical)**
+- Modify `HoodieBootstrapRelation` (Spark) to handle REGISTER_ONLY partitions:
+ - When a base file has no bootstrap index entry AND its schema has no Hudi meta columns → read as plain Parquet
+ - Return null for Hudi meta columns (`_hoodie_record_key`, `_hoodie_commit_time`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_seqno`)
+- Handle schema merging: queries spanning multiple tiers must produce a unified schema where Hudi meta columns are nullable
+- Ensure partition pruning works correctly for REGISTER_ONLY partitions
+- Integration tests verifying:
+ - `SELECT *` across all 3 tiers returns complete results
+ - `SELECT * WHERE partition_col = <cold_partition_value>` returns correct data
+ - `SELECT * WHERE partition_col = <hot_partition_value>` performance is unaffected
+ - Hudi meta columns are null for REGISTER_ONLY rows, populated for others
+
+**Phase 3: Write Path Guardrails**
+- Fail fast when upsert/delete targets a REGISTER_ONLY partition with a clear error message:
+ `"Cannot upsert/delete in REGISTER_ONLY bootstrap partition [datestr=2022-06-15]. Re-bootstrap with FULL_RECORD or METADATA_ONLY mode to enable writes."`
+- Allow insert-overwrite to "promote" a REGISTER_ONLY partition to a regular Hudi partition (optional, future enhancement)
+
+**Phase 4: Tooling & Documentation** (optional, future)
+- CLI command to list partitions by bootstrap mode
+- CLI command to "promote" REGISTER_ONLY partitions to METADATA_ONLY or FULL_RECORD (when data is restored from cold storage)
+- Documentation and migration guide updates
+
+### Related Issues
+- [#14665](https://github.com/apache/hudi/issues/14665) — Efficient bootstrap and migration of existing non-Hudi dataset (parent epic)
+- [#15974](https://github.com/apache/hudi/issues/15974) — Treat full bootstrap table as regular table
+- [#15856](https://github.com/apache/hudi/issues/15856) — Precombine field is not required for metadata only bootstrap
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
index f85cc44120d4e..169494b7244ac 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
@@ -43,7 +43,10 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig {
.key("hoodie.precommit.validators")
.defaultValue("")
.markAdvanced()
- .withDocumentation("Comma separated list of class names that can be invoked to validate commit");
+ .withDocumentation("Comma separated list of class names that can be invoked to validate commit. "
+ + "Available streaming offset validators: "
+ + "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator (Flink Kafka), "
+ + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator (Spark/HoodieStreamer Kafka)");
public static final String VALIDATOR_TABLE_VARIABLE = "";
public static final ConfigProperty EQUALITY_SQL_QUERIES = ConfigProperty
@@ -71,7 +74,8 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Tolerance percentage for streaming offset validation "
+ "(used by org.apache.hudi.client.validator.StreamingOffsetValidator "
- + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator). "
+ + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator "
+ + "and org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator). "
+ "The validator compares the offset difference (expected records from source) "
+ "with actual records written. If the deviation exceeds this percentage, "
+ "the commit is rejected or warned depending on the validation failure policy. "
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 600891c85dff6..ef5e89fe806b3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -115,6 +115,7 @@
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.HoodieStreamer.Config;
+import org.apache.hudi.utilities.streamer.validator.SparkStreamerValidatorUtils;
import org.apache.hudi.utilities.transform.Transformer;
import com.codahale.metrics.Timer;
@@ -874,6 +875,10 @@ private Pair
+ *
+ * <p>This validator is primarily intended for append-only ingestion from Kafka via HoodieStreamer.
+ * For upsert workloads with deduplication, configure a higher tolerance or use WARN_LOG.
+ */
+public class SparkKafkaOffsetValidator extends StreamingOffsetValidator {
+
+ /**
+ * Create a Spark Kafka offset validator bound to the V1 streamer checkpoint key and the SPARK_KAFKA format.
+ *
+ * @param config Validator configuration, passed through to the {@link StreamingOffsetValidator} constructor
+ */
+ public SparkKafkaOffsetValidator(TypedProperties config) {
+ super(config, StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, CheckpointFormat.SPARK_KAFKA);
+ }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java
new file mode 100644
index 0000000000000..42275f1187a35
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators; returns immediately when none are configured.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations (collected to the driver here)
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info), may be null
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if a validator rejects the commit, or (wrapping the cause) if one cannot be loaded or run
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Collect write statuses once; null stats are skipped here just as buildCommitMetadata skips them
+    List<WriteStatus> allWriteStatus = writeStatusRDD.collect();
+    HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
+    List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+        .map(WriteStatus::getStat)
+        .filter(stat -> stat != null)
+        .collect(Collectors.toList());
+
+    // Load previous commit metadata from timeline
+    Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient);
+
+    ValidationContext context = new SparkValidationContext(
+        instant,
+        Option.of(currentMetadata),
+        Option.of(writeStats),
+        previousCommitMetadata,
+        metaClient);
+
+    // Instantiate and run each validator; blank entries between commas are ignored
+    List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
+
+    for (String className : classNames) {
+      try {
+        BasePreCommitValidator validator = (BasePreCommitValidator)
+            ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+        LOG.info("Running pre-commit validator: {} for instant: {}", className, instant);
+        validator.validateWithMetadata(context);
+        LOG.info("Pre-commit validator {} passed for instant: {}", className, instant);
+      } catch (HoodieValidationException e) {
+        LOG.error("Pre-commit validator {} failed for instant: {}", className, instant, e);
+        throw e;
+      } catch (Exception e) {
+        LOG.error("Failed to instantiate or run validator: {}", className, e);
+        throw new HoodieValidationException("Failed to run pre-commit validator: " + className, e);
+      }
+    }
+  }
+
+  /**
+   * Build HoodieCommitMetadata from write statuses and extra metadata.
+   * This constructs the metadata object that would be committed, giving
+   * validators access to the same data. Write statuses without a stat are skipped.
+   */
+  private static HoodieCommitMetadata buildCommitMetadata(
+      List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    // Add write stats
+    for (WriteStatus status : writeStatuses) {
+      HoodieWriteStat stat = status.getStat();
+      if (stat != null) {
+        metadata.addWriteStat(stat.getPartitionPath(), stat);
+      }
+    }
+
+    // Add extra metadata (includes checkpoint info like deltastreamer.checkpoint.key)
+    if (extraMetadata != null) {
+      extraMetadata.forEach(metadata::addMetadata);
+    }
+
+    return metadata;
+  }
+
+  /**
+   * Load metadata of the last completed write instant; empty if there is none or loading fails (logged at WARN).
+   */
+  private static Option<HoodieCommitMetadata> loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline()
+          .getWriteTimeline()
+          .filterCompletedInstants();
+      Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+      if (lastInstant.isPresent()) {
+        return Option.of(completedTimeline.readCommitMetadata(lastInstant.get()));
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to load previous commit metadata, skipping previous commit comparison", e);
+    }
+    return Option.empty();
+  }
+
+  private SparkStreamerValidatorUtils() {
+    // Utility class
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java
new file mode 100644
index 0000000000000..d8f845bd19789
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+/**
+ * Spark/HoodieStreamer implementation of {@link ValidationContext}.
+ *
+ * <p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()}
+ * before the commit is finalized. Provides validators with access to commit metadata,
+ * write statistics, and previous commit information for streaming offset validation.
+ *
+ * <p>Unlike Flink's implementation, Spark can optionally provide active timeline access
+ * via {@link HoodieTableMetaClient} for richer validation patterns.
+ */
+public class SparkValidationContext implements ValidationContext {
+
+  private final String instantTime;
+  private final Option<HoodieCommitMetadata> commitMetadata;
+  private final Option<List<HoodieWriteStat>> writeStats;
+  private final Option<HoodieCommitMetadata> previousCommitMetadata;
+  private final HoodieTableMetaClient metaClient;
+
+  /**
+   * Create a Spark validation context with full timeline access.
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   * @param metaClient Table meta client for timeline access (may be null for testing)
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata,
+                                HoodieTableMetaClient metaClient) {
+    this.instantTime = instantTime;
+    this.commitMetadata = commitMetadata;
+    this.writeStats = writeStats;
+    this.previousCommitMetadata = previousCommitMetadata;
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Create a Spark validation context without timeline access (for testing).
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from write operations
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   */
+  public SparkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata) {
+    this(instantTime, commitMetadata, writeStats, previousCommitMetadata, null);
+  }
+
+  @Override
+  public String getInstantTime() {
+    return instantTime;
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> getCommitMetadata() {
+    return commitMetadata;
+  }
+
+  @Override
+  public Option<List<HoodieWriteStat>> getWriteStats() {
+    return writeStats;
+  }
+
+  /**
+   * Get the active timeline. Available when metaClient is provided.
+   *
+   * @throws UnsupportedOperationException if metaClient was not provided
+   */
+  @Override
+  public HoodieActiveTimeline getActiveTimeline() {
+    if (metaClient == null) {
+      throw new UnsupportedOperationException(
+          "Active timeline is not available without HoodieTableMetaClient.");
+    }
+    return metaClient.getActiveTimeline();
+  }
+
+  /**
+   * Not directly supported. Use {@link #isFirstCommit()} or
+   * {@link #getPreviousCommitMetadata()} instead.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Option<HoodieInstant> getPreviousCommitInstant() {
+    throw new UnsupportedOperationException(
+        "getPreviousCommitInstant() is not available in HoodieStreamer pre-commit validation context. "
+            + "Use isFirstCommit() or getPreviousCommitMetadata() instead.");
+  }
+
+  @Override
+  public boolean isFirstCommit() {
+    return !previousCommitMetadata.isPresent();
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> getPreviousCommitMetadata() {
+    return previousCommitMetadata;
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java
new file mode 100644
index 0000000000000..d109aa3246f64
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link SparkKafkaOffsetValidator}.
+ */
+public class TestSparkKafkaOffsetValidator {
+
+ // ========== Helper methods ==========
+
+ private static TypedProperties defaultConfig() {
+ TypedProperties props = new TypedProperties();
+ props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+ props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+ return props;
+ }
+
+ private static TypedProperties configWithTolerance(double tolerance) {
+ TypedProperties props = defaultConfig();
+ props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(),
+ String.valueOf(tolerance));
+ return props;
+ }
+
+ private static TypedProperties configWithWarnPolicy() {
+ TypedProperties props = defaultConfig();
+ props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "WARN_LOG");
+ return props;
+ }
+
+ /**
+ * Build a Spark Kafka checkpoint string.
+ * Format: topic,partition:offset,partition:offset,...
+ */
+ private static String buildSparkKafkaCheckpoint(String topic, int[] partitions, long[] offsets) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(topic);
+ for (int i = 0; i < partitions.length; i++) {
+ sb.append(",").append(partitions[i]).append(":").append(offsets[i]);
+ }
+ return sb.toString();
+ }
+
+ private static HoodieCommitMetadata buildMetadata(String checkpointValue) {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ if (checkpointValue != null) {
+ metadata.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, checkpointValue);
+ }
+ return metadata;
+ }
+
+ private static List buildWriteStats(long numInserts, long numUpdates) {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setNumInserts(numInserts);
+ stat.setNumUpdateWrites(numUpdates);
+ stat.setPartitionPath("partition1");
+ return Collections.singletonList(stat);
+ }
+
+ private static SparkValidationContext buildContext(
+ String instantTime,
+ HoodieCommitMetadata currentMetadata,
+ List writeStats,
+ HoodieCommitMetadata previousMetadata) {
+ return new SparkValidationContext(
+ instantTime,
+ Option.of(currentMetadata),
+ Option.of(writeStats),
+ previousMetadata != null ? Option.of(previousMetadata) : Option.empty());
+ }
+
+ // ========== Tests ==========
+
+ @Test
+ public void testExactMatchPasses() {
+ // Previous: partition 0 at offset 100, partition 1 at offset 200
+ // Current: partition 0 at offset 200, partition 1 at offset 300
+ // Diff = (200-100) + (300-200) = 200. Records written = 200.
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{100, 200});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{200, 300});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(200, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testDataLossDetected() {
+ // Diff = 1000 but only 500 records written -> 50% deviation
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(500, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testWithinTolerancePasses() {
+ // Diff = 1000, records = 950 -> 5% deviation, tolerance = 10%
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(950, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithTolerance(10.0));
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testWarnPolicyDoesNotThrow() {
+ // Data loss but WARN_LOG policy
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(0, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithWarnPolicy());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testSkipsFirstCommit() {
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+ // No previous commit
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(buildMetadata(currCheckpoint)),
+ Option.of(buildWriteStats(500, 0)),
+ Option.empty());
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testSkipsWhenNoCheckpointKey() {
+ // Current metadata has no checkpoint key
+ HoodieCommitMetadata currentMeta = new HoodieCommitMetadata();
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{100});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ currentMeta,
+ buildWriteStats(500, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testMultiPartitionValidation() {
+ // 4 partitions, each advancing by 250 = total diff 1000
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events",
+ new int[]{0, 1, 2, 3}, new long[]{0, 0, 0, 0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events",
+ new int[]{0, 1, 2, 3}, new long[]{250, 250, 250, 250});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(800, 200), // 800 inserts + 200 updates = 1000
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testEmptyCommitSkipsValidation() {
+ // Both offsets same and no records written
+ String checkpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{100});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(checkpoint),
+ buildWriteStats(0, 0),
+ buildMetadata(checkpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testPreviousCheckpointMissingSkipsValidation() {
+ // Previous metadata exists but has no checkpoint key
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(500, 0),
+ prevMeta);
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testOvercountingDetected() {
+ // More records written than offset diff
+ // Diff = 100, records = 200 -> |100-200|/100 = 100% deviation
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{100});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(200, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testExactToleranceBoundaryPasses() {
+ // Diff = 1000, records = 900 -> 10% deviation, tolerance = 10%
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(900, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithTolerance(10.0));
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testJustOverToleranceFails() {
+ // Diff = 1000, records = 899 -> 10.1% deviation, tolerance = 10%
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(899, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithTolerance(10.0));
+ assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testOnlyInsertsNoUpdates() {
+ // Pure insert workload
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{0, 0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{500, 500});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(1000, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testUpdatesCountedInRecordTotal() {
+ // Diff = 1000. 600 inserts + 400 updates = 1000 total
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(600, 400),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java
new file mode 100644
index 0000000000000..1b20a1dcef268
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SparkStreamerValidatorUtils}.
+ *
+ * <p>Uses a lightweight Spark context for JavaRDD creation. Tests validate the orchestration
+ * logic (class loading, config passing, error handling) using first-commit scenarios
+ * (no previous commit on timeline) to avoid needing a full HoodieTable setup.
+ */
+public class TestSparkStreamerValidatorUtils {
+
+ private static JavaSparkContext jsc;
+
+ @TempDir
+ Path tempDir;
+
+ @BeforeAll
+ public static void setUp() {
+ SparkConf conf = new SparkConf()
+ .setAppName("TestSparkStreamerValidatorUtils")
+ .setMaster("local[2]")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+ jsc = new JavaSparkContext(conf);
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ if (jsc != null) {
+ jsc.stop();
+ jsc = null;
+ }
+ }
+
+ private static TypedProperties propsWithValidator(String validatorClassName) {
+ TypedProperties props = new TypedProperties();
+ props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+ props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+ props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+ return props;
+ }
+
+ private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setPartitionPath(partitionPath);
+ stat.setNumInserts(numInserts);
+ stat.setNumUpdateWrites(numUpdates);
+
+ WriteStatus ws = new WriteStatus(false, 0.0);
+ ws.setStat(stat);
+ return ws;
+ }
+
+ private JavaRDD toRDD(List writeStatuses) {
+ return jsc.parallelize(writeStatuses);
+ }
+
+ private org.apache.hudi.common.table.HoodieTableMetaClient createMetaClient() throws IOException {
+ return org.apache.hudi.common.testutils.HoodieTestUtils.init(
+ tempDir.toAbsolutePath().toString());
+ }
+
+ // ========== Tests ==========
+
+ @Test
+ public void testNoValidatorsConfigured() throws IOException {
+ TypedProperties props = new TypedProperties();
+ List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses),
+ new HashMap<>(), createMetaClient()));
+ }
+
+ @Test
+ public void testEmptyValidatorString() throws IOException {
+ TypedProperties props = new TypedProperties();
+ props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), "");
+ List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses),
+ new HashMap<>(), createMetaClient()));
+ }
+
+ @Test
+ public void testValidValidatorFirstCommitPasses() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+ Map extraMeta = new HashMap<>();
+ extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+ // First commit (no previous metadata on timeline) — validator should skip and pass
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses), extraMeta, createMetaClient()));
+ }
+
+ @Test
+ public void testInvalidValidatorClassThrows() throws IOException {
+ TypedProperties props = propsWithValidator("com.nonexistent.FakeValidator");
+ List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertThrows(HoodieValidationException.class,
+ () -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses), new HashMap<>(), createMetaClient()));
+ }
+
+ @Test
+ public void testMultipleValidators() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator,"
+ + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+ Map extraMeta = new HashMap<>();
+ extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses), extraMeta, createMetaClient()));
+ }
+
+ @Test
+ public void testValidatorWithWhitespaceInClassNames() throws IOException {
+ TypedProperties props = propsWithValidator(
+ " org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator , ");
+
+ List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses), new HashMap<>(), createMetaClient()));
+ }
+
+ @Test
+ public void testNullExtraMetadataHandled() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses), null, createMetaClient()));
+ }
+
+ @Test
+ public void testMultipleWriteStatusesAggregated() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List writeStatuses = new ArrayList<>();
+ writeStatuses.add(buildWriteStatus("p1", 60, 0));
+ writeStatuses.add(buildWriteStatus("p2", 40, 0));
+
+ Map extraMeta = new HashMap<>();
+ extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses), extraMeta, createMetaClient()));
+ }
+
+ @Test
+ public void testEmptyWriteStatuses() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List writeStatuses = Collections.emptyList();
+ Map extraMeta = new HashMap<>();
+ extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses), extraMeta, createMetaClient()));
+ }
+
+ @Test
+ public void testValidationExceptionPreservedAcrossValidators() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator,"
+ + "com.nonexistent.FakeValidator");
+
+ List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ HoodieValidationException ex = assertThrows(HoodieValidationException.class,
+ () -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", toRDD(writeStatuses), new HashMap<>(), createMetaClient()));
+ assertTrue(ex.getMessage().contains("FakeValidator"));
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java
new file mode 100644
index 0000000000000..7f94262e98c43
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SparkValidationContext}.
+ */
+public class TestSparkValidationContext {
+
+ private static HoodieWriteStat buildStat(long inserts, long updates) { // every stat is created for the same "partition1" path
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setNumInserts(inserts);
+ stat.setNumUpdateWrites(updates);
+ stat.setPartitionPath("partition1");
+ return stat;
+ }
+
+ @Test
+ public void testBasicProperties() {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata("key1", "value1");
+ List<HoodieWriteStat> writeStats = Collections.singletonList(buildStat(100, 50));
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(metadata),
+ Option.of(writeStats),
+ Option.empty());
+
+ assertEquals("20260320120000000", ctx.getInstantTime());
+ assertTrue(ctx.getCommitMetadata().isPresent());
+ assertTrue(ctx.getWriteStats().isPresent());
+ assertEquals(1, ctx.getWriteStats().get().size());
+ }
+
+ @Test
+ public void testRecordCounting() {
+ List<HoodieWriteStat> writeStats = Arrays.asList(
+ buildStat(100, 50), // first stat: 100 inserts, 50 updates
+ buildStat(200, 30)); // second stat: 200 inserts, 30 updates
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(writeStats),
+ Option.empty());
+
+ assertEquals(300, ctx.getTotalInsertRecordsWritten());
+ assertEquals(80, ctx.getTotalUpdateRecordsWritten());
+ assertEquals(380, ctx.getTotalRecordsWritten());
+ }
+
+ @Test
+ public void testFirstCommitDetection() {
+ // No previous commit metadata -> first commit
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.empty());
+
+ assertTrue(ctx.isFirstCommit());
+ }
+
+ @Test
+ public void testNotFirstCommitWhenPreviousExists() {
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.of(prevMeta));
+
+ assertFalse(ctx.isFirstCommit());
+ }
+
+ @Test
+ public void testExtraMetadataAccess() {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:1000");
+ metadata.addMetadata("custom.key", "custom_value");
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(metadata),
+ Option.of(Collections.emptyList()),
+ Option.empty());
+
+ assertEquals("events,0:1000",
+ ctx.getExtraMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1).get());
+ assertEquals("custom_value", ctx.getExtraMetadata("custom.key").get());
+ assertFalse(ctx.getExtraMetadata("nonexistent.key").isPresent());
+ }
+
+ @Test
+ public void testPreviousCommitMetadataAccess() {
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:500");
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.of(prevMeta));
+
+ assertTrue(ctx.getPreviousCommitMetadata().isPresent());
+ assertEquals("events,0:500",
+ ctx.getPreviousCommitMetadata().get().getMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1));
+ }
+
+ @Test
+ public void testEmptyWriteStats() {
+ SparkValidationContext ctx = new SparkValidationContext( // write stats are Option.empty() here, not an empty list
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.empty(),
+ Option.empty());
+
+ assertEquals(0, ctx.getTotalRecordsWritten());
+ assertEquals(0, ctx.getTotalInsertRecordsWritten());
+ assertEquals(0, ctx.getTotalUpdateRecordsWritten());
+ }
+}
From dd19d66b849313c211b4daffaff000564ade35b4 Mon Sep 17 00:00:00 2001
From: Xinli Shang
Date: Tue, 31 Mar 2026 07:50:36 -0700
Subject: [PATCH 02/12] fix: address code review and fix checkstyle violations
- Remove unused imports (java.io.IOException, HoodieCommitMetadata,
HoodieTestTable, Option) that caused checkstyle build failures
- Remove accidentally committed bootstrap_register_only_issue.md
- Cache writeStatusRDD before collect() to prevent second DAG evaluation
and potential driver OOM
- Add comment explaining why validator runs before writeClient.commit():
offset validation is a stronger guard than commitOnErrors and must
prevent the commit when data loss is detected
- Clarify buildCommitMetadata() produces a pre-commit preview object,
not a fully-constructed commit record
- Add Javadoc to SparkKafkaOffsetValidator and SparkStreamerValidatorUtils
explaining incompatibility with SparkValidatorUtils (different interface
and constructor signature) to prevent misconfiguration
- Add two-commit integration tests (testSecondCommitMatchingOffsetsPasses,
testSecondCommitDataLossDetected) using HoodieTestTable to exercise the
real offset comparison path, not just the first-commit skip path
---
bootstrap_register_only_issue.md | 170 ------------------
.../hudi/utilities/streamer/StreamSync.java | 6 +-
.../validator/SparkKafkaOffsetValidator.java | 7 +
.../SparkStreamerValidatorUtils.java | 22 ++-
.../TestSparkStreamerValidatorUtils.java | 50 +++++-
5 files changed, 75 insertions(+), 180 deletions(-)
delete mode 100644 bootstrap_register_only_issue.md
diff --git a/bootstrap_register_only_issue.md b/bootstrap_register_only_issue.md
deleted file mode 100644
index 52914b1bd7ae9..0000000000000
--- a/bootstrap_register_only_issue.md
+++ /dev/null
@@ -1,170 +0,0 @@
-### Feature Description
-
-**What the feature achieves:**
-Adds a `REGISTER_ONLY` bootstrap mode that allows Hudi to register existing partitions and their file listings without reading file contents or creating skeleton files. At query time, Hudi natively reads these partitions as plain Parquet, ensuring `SELECT * FROM table` returns complete results across all tiers — no view wrappers or query changes needed. This enables a three-tier bootstrap strategy for onboarding large Hive tables where historical data resides in cold storage (e.g., S3 Glacier, Azure Archive).
-
-**Why this feature is needed:**
-Problem: Organizations migrating large Hive tables to Hudi often have a tiered storage layout:
-- Recent data (e.g., last 30 days) in hot/standard storage — should be fully rewritten into Hudi
-- Warm data (e.g., 30 days to 1 year) in standard storage — suitable for METADATA_ONLY bootstrap
-- Cold data (e.g., older than 1 year) in archival/cold storage (S3 Glacier, etc.) — cannot be read without expensive retrieval
-
-Current gaps:
-- Bootstrap requires every discovered partition to be either `FULL_RECORD` or `METADATA_ONLY` (enforced by `checkArgument` in `SparkBootstrapCommitActionExecutor.java:292-293`)
-- Both modes require reading file contents: `FULL_RECORD` rewrites all data, `METADATA_ONLY` reads every record to extract record keys
-- For cold storage, reading file contents triggers data retrieval (e.g., restore from Glacier), which is expensive, slow, and often impractical for terabytes of archival data
-- If users bootstrap only recent partitions and skip cold ones entirely, Hudi queries that span into the cold date range silently return incomplete results — **silent data loss**
-- The bootstrap epic ([#14665](https://github.com/apache/hudi/issues/14665)) describes "Onboard for new partitions alone" but there is no implementation that safely handles query completeness for skipped partitions
-
-Real scenario:
-- A Hive table has 3 years of daily partitions (~1,095 partitions)
-- Only the last year of data is in standard storage; older data is in S3 Glacier
-- User wants to onboard to Hudi but cannot afford to restore 2+ years of Glacier data just to extract record keys
-- With today's bootstrap, the user must either: (a) pay the Glacier retrieval cost for all cold data, or (b) skip old partitions and risk silent data loss on queries
-
-### User Experience
-
-**How users will use this feature:**
-
-Configuration:
-```properties
-# Use the date-based 3-tier selector
-hoodie.bootstrap.mode.selector=org.apache.hudi.client.bootstrap.selector.DateBasedBootstrapModeSelector
-
-# Partitions newer than 30 days → FULL_RECORD (full rewrite into Hudi)
-hoodie.bootstrap.mode.selector.days.full_record=30
-
-# Partitions between 30 and 365 days → METADATA_ONLY (skeleton files, read warm storage for record keys)
-hoodie.bootstrap.mode.selector.days.metadata_only=365
-
-# Partitions older than 365 days → REGISTER_ONLY (no file content reading at all)
-# (implicit: anything older than metadata_only threshold)
-
-# Partition date format (to parse partition paths like datestr=2024-01-15)
-hoodie.bootstrap.mode.selector.partition.date.format=yyyy-MM-dd
-hoodie.bootstrap.mode.selector.partition.date.field=datestr
-```
-
-Usage Example — Spark bootstrap:
-```scala
-spark.emptyDataFrame.write
- .format("hudi")
- .option("hoodie.bootstrap.base.path", "/data/hive_table")
- .option("hoodie.table.name", "my_hudi_table")
- .option("hoodie.datasource.write.operation", "bootstrap")
- .option("hoodie.bootstrap.mode.selector",
- "org.apache.hudi.client.bootstrap.selector.DateBasedBootstrapModeSelector")
- .option("hoodie.bootstrap.mode.selector.days.full_record", "30")
- .option("hoodie.bootstrap.mode.selector.days.metadata_only", "365")
- .option("hoodie.bootstrap.mode.selector.partition.date.format", "yyyy-MM-dd")
- .option("hoodie.bootstrap.mode.selector.partition.date.field", "datestr")
- .mode(SaveMode.Overwrite)
- .save("/data/my_hudi_table")
-```
-
-Query behavior — **no query changes needed**:
-```sql
--- This returns ALL data: hot (FULL_RECORD) + warm (METADATA_ONLY) + cold (REGISTER_ONLY)
--- No views, no UNION ALL, no special syntax
-SELECT * FROM my_hudi_table;
-
--- Partition filtering works as expected
-SELECT * FROM my_hudi_table WHERE datestr >= '2024-01-01';
-
--- Cold partition queries work, just read as plain Parquet (may have different performance)
-SELECT * FROM my_hudi_table WHERE datestr = '2022-06-15';
-```
-
-Performance characteristics by tier:
-
-| Tier | Bootstrap cost | Query performance | Hudi meta columns |
-|------|---------------|-------------------|-------------------|
-| FULL_RECORD (hot) | Full rewrite | Best — native Hudi file | All populated |
-| METADATA_ONLY (warm) | Read for record keys | Moderate — skeleton stitching at read time | All populated |
-| REGISTER_ONLY (cold) | File listing only (no content read) | Same as plain Parquet | Returned as null |
-
-Write guardrails:
-- Upserts/deletes targeting REGISTER_ONLY partitions fail fast with a clear error message
-- This is expected: without record keys, Hudi cannot index or merge records in these partitions
-- If cold data is later restored to warm/hot storage, partitions can be "promoted" via re-bootstrap
-
-API Changes:
-
-New public APIs:
-```java
-// New enum value in BootstrapMode
-public enum BootstrapMode {
- FULL_RECORD,
- METADATA_ONLY,
- REGISTER_ONLY // NEW: register partition file listing without reading contents
-}
-
-// New selector
-public class DateBasedBootstrapModeSelector extends BootstrapModeSelector {
- @Override
- public Map<BootstrapMode, List<String>> select(
- List<Pair<String, List<HoodieFileStatus>>> partitions);
-}
-```
-
-### Hudi RFC Requirements
-
-**RFC PR link:** (if applicable)
-
-Why RFC is needed:
-- Does this change public interfaces/APIs? **Yes**
- - New `REGISTER_ONLY` value in `BootstrapMode` enum
- - New `DateBasedBootstrapModeSelector` class
- - Read path changes for bootstrapped tables to natively serve plain Parquet files without Hudi meta columns
- - New table property to indicate presence of REGISTER_ONLY partitions
-
-- Does this change storage format? **Minor**
- - Bootstrap commit metadata will include REGISTER_ONLY partition entries (file listings without skeleton files)
- - No new file formats; REGISTER_ONLY partitions reference original source Parquet files as-is
- - Backward compatible: tables without REGISTER_ONLY partitions are unaffected
-
-Justification:
-- Extends the bootstrap mode enum (affects selectors, executor, read path)
-- Read path changes require handling files without Hudi meta columns within a Hudi table
-- Needs design review for query completeness and schema merging guarantees
-- Affects how Hudi defines table boundaries (a Hudi table now includes "unmanaged" partitions)
-
-### Task Breakdown
-
-**Phase 1: Core Bootstrap Changes (write path)**
-- Add `REGISTER_ONLY` to `BootstrapMode` enum (`BootstrapMode.java`)
-- Create `DateBasedBootstrapModeSelector` with configurable date thresholds and partition date parsing
-- Add config properties to `HoodieBootstrapConfig.java` for date-based tier boundaries
-- Modify `SparkBootstrapCommitActionExecutor` to handle 3 modes:
- - Relax `checkArgument` validation (line 292-293) to accept `REGISTER_ONLY` partitions
- - For `REGISTER_ONLY`: record partition file listings in commit metadata without reading file contents or creating skeleton files
- - Skip bootstrap index entries for `REGISTER_ONLY` partitions (`HFileBootstrapIndex`)
-- Add table property `hoodie.bootstrap.has.register.only.partitions`
-- Unit tests for selector and executor changes
-
-**Phase 2: Read Path — Native Query Completeness (critical)**
-- Modify `HoodieBootstrapRelation` (Spark) to handle REGISTER_ONLY partitions:
- - When a base file has no bootstrap index entry AND its schema has no Hudi meta columns → read as plain Parquet
- - Return null for Hudi meta columns (`_hoodie_record_key`, `_hoodie_commit_time`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_seqno`)
-- Handle schema merging: queries spanning multiple tiers must produce a unified schema where Hudi meta columns are nullable
-- Ensure partition pruning works correctly for REGISTER_ONLY partitions
-- Integration tests verifying:
- - `SELECT *` across all 3 tiers returns complete results
- - `SELECT * WHERE partition_col = <cold_partition>` returns correct data
- - `SELECT * WHERE partition_col = <hot_partition>` performance is unaffected
- - Hudi meta columns are null for REGISTER_ONLY rows, populated for others
-
-**Phase 3: Write Path Guardrails**
-- Fail fast when upsert/delete targets a REGISTER_ONLY partition with a clear error message:
- `"Cannot upsert/delete in REGISTER_ONLY bootstrap partition [datestr=2022-06-15]. Re-bootstrap with FULL_RECORD or METADATA_ONLY mode to enable writes."`
-- Allow insert-overwrite to "promote" a REGISTER_ONLY partition to a regular Hudi partition (optional, future enhancement)
-
-**Phase 4: Tooling & Documentation** (optional, future)
-- CLI command to list partitions by bootstrap mode
-- CLI command to "promote" REGISTER_ONLY partitions to METADATA_ONLY or FULL_RECORD (when data is restored from cold storage)
-- Documentation and migration guide updates
-
-### Related Issues
-- [#14665](https://github.com/apache/hudi/issues/14665) — Efficient bootstrap and migration of existing non-Hudi dataset (parent epic)
-- [#15974](https://github.com/apache/hudi/issues/15974) — Treat full bootstrap table as regular table
-- [#15856](https://github.com/apache/hudi/issues/15856) — Precombine field is not required for metadata only bootstrap
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index ef5e89fe806b3..b20e19d57545b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -875,7 +875,11 @@ private Pair