Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package org.apache.hudi.client.validator;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.CheckpointUtils;
import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -50,7 +52,11 @@
*
* Subclasses specify:
* - Checkpoint format (SPARK_KAFKA, FLINK_KAFKA, etc.)
* - Checkpoint metadata key
* - Checkpoint metadata key (optional — when omitted, the validator auto-resolves the
* active streamer key from commit metadata using
* {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)},
* which prefers V2 and falls back to V1. Subclasses that read a custom non-streamer key
* (e.g. Flink's HOODIE_METADATA_KEY) must pass it explicitly.)
* - Source-specific parsing logic (if needed)
*
* Configuration:
Expand All @@ -66,7 +72,26 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
protected final CheckpointFormat checkpointFormat;

/**
* Create a streaming offset validator.
* Create a streaming offset validator that auto-resolves the checkpoint key from commit
* metadata using {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)}.
*
* <p>Use this constructor for streamer pipelines (V1 or V2 checkpoint keys). The validator
* will prefer V2 (table version 8+) and fall back to V1 transparently, so subclasses don't
* need to know which key the writer used.</p>
*
* @param config Validator configuration
* @param checkpointFormat Format of the checkpoint string
*/
protected StreamingOffsetValidator(TypedProperties config,
CheckpointFormat checkpointFormat) {
this(config, null, checkpointFormat);
}

/**
* Create a streaming offset validator with an explicit checkpoint metadata key.
*
* <p>Use this constructor when the writer stores its checkpoint under a custom key that
* is not the standard streamer V1/V2 key (e.g. Flink's HOODIE_METADATA_KEY).</p>
*
* @param config Validator configuration
* @param checkpointKey Key to extract checkpoint from extraMetadata
Expand Down Expand Up @@ -95,10 +120,12 @@ public void validateWithMetadata(ValidationContext context) throws HoodieValidat
return;
}

// Extract current checkpoint
Option<String> currentCheckpointOpt = context.getExtraMetadata(checkpointKey);
// Extract current checkpoint — either from the explicit key (custom writers like Flink) or
// by auto-resolving from commit metadata (streamer pipelines, V2-then-V1 fallback).
Option<String> currentCheckpointOpt = resolveCheckpoint(context.getCommitMetadata());
if (!currentCheckpointOpt.isPresent()) {
log.warn("Current checkpoint not found with key: {}. Skipping validation.", checkpointKey);
log.warn("Current checkpoint not found (key: {}). Skipping validation.",
checkpointKey == null ? "<auto-resolved>" : checkpointKey);
return;
}
String currentCheckpoint = currentCheckpointOpt.get();
Expand All @@ -110,8 +137,7 @@ public void validateWithMetadata(ValidationContext context) throws HoodieValidat
}

// Extract previous checkpoint
Option<String> previousCheckpointOpt = context.getPreviousCommitMetadata()
.flatMap(metadata -> Option.ofNullable(metadata.getMetadata(checkpointKey)));
Option<String> previousCheckpointOpt = resolveCheckpoint(context.getPreviousCommitMetadata());

if (!previousCheckpointOpt.isPresent()) {
log.info("Previous checkpoint not found. May be first streaming commit. Skipping validation.");
Expand Down Expand Up @@ -139,6 +165,10 @@ public void validateWithMetadata(ValidationContext context) throws HoodieValidat
long recordsWritten = context.getTotalInsertRecordsWritten()
+ context.getTotalUpdateRecordsWritten();

// Track write errors so callers can distinguish write-failure deviation (write errors > 0)
// from silent data loss (write errors == 0) when the validator fires.
long writeErrors = context.getTotalWriteErrors();

// For empty commits (e.g., no new data from source), both offsetDiff and recordsWritten
// can be zero. This is a valid scenario — skip validation to avoid false positives.
if (offsetDifference == 0 && recordsWritten == 0) {
Expand All @@ -147,20 +177,21 @@ public void validateWithMetadata(ValidationContext context) throws HoodieValidat
}

// Validate offset vs record consistency
validateOffsetConsistency(offsetDifference, recordsWritten,
validateOffsetConsistency(offsetDifference, recordsWritten, writeErrors,
currentCheckpoint, previousCheckpoint);
}

/**
* Validate that offset difference matches record count within tolerance.
*
* @param offsetDiff Expected records based on offset difference
* @param recordsWritten Actual records written
* @param recordsWritten Actual records written (inserts + updates)
* @param writeErrors Records that failed to write (tracked in write status errors)
* @param currentCheckpoint Current checkpoint string (for error messages)
* @param previousCheckpoint Previous checkpoint string (for error messages)
* @throws HoodieValidationException if validation fails and policy is FAIL
*/
protected void validateOffsetConsistency(long offsetDiff, long recordsWritten,
protected void validateOffsetConsistency(long offsetDiff, long recordsWritten, long writeErrors,
String currentCheckpoint, String previousCheckpoint)
throws HoodieValidationException {

Expand All @@ -169,10 +200,13 @@ protected void validateOffsetConsistency(long offsetDiff, long recordsWritten,
if (deviation > tolerancePercentage) {
String errorMsg = String.format(
"Streaming offset validation failed. "
+ "Offset difference: %d, Records written: %d, Deviation: %.2f%%, Tolerance: %.2f%%. "
+ "This may indicate data loss or filtering. "
+ "Offset difference: %d, Records written: %d, Write errors: %d, Deviation: %.2f%%, Tolerance: %.2f%%. "
+ "%s"
+ "Previous checkpoint: %s, Current checkpoint: %s",
offsetDiff, recordsWritten, deviation, tolerancePercentage,
offsetDiff, recordsWritten, writeErrors, deviation, tolerancePercentage,
writeErrors > 0
? "Non-zero write errors suggest records failed to write rather than silent data loss. "
: "This may indicate data loss or filtering. ",
previousCheckpoint, currentCheckpoint);

if (failurePolicy == ValidationFailurePolicy.WARN_LOG) {
Expand All @@ -181,8 +215,8 @@ protected void validateOffsetConsistency(long offsetDiff, long recordsWritten,
throw new HoodieValidationException(errorMsg);
}
} else {
log.info("Offset validation passed. Offset diff: {}, Records: {}, Deviation: {}% (within {}%)",
offsetDiff, recordsWritten, String.format("%.2f", deviation), tolerancePercentage);
log.info("Offset validation passed. Offset diff: {}, Records: {}, Write errors: {}, Deviation: {}% (within {}%)",
offsetDiff, recordsWritten, writeErrors, String.format("%.2f", deviation), tolerancePercentage);
}
}

Expand Down Expand Up @@ -210,4 +244,33 @@ private double calculateDeviation(long offsetDiff, long recordsWritten) {
long difference = Math.abs(offsetDiff - recordsWritten);
return (100.0 * difference) / offsetDiff;
}

/**
* Resolve the checkpoint string from commit metadata.
*
* <p>When the validator was constructed with an explicit {@code checkpointKey}, that key
* is read directly. Otherwise, {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)}
* is used to locate the active streamer checkpoint (V2 first, V1 fallback), so callers
* don't need to know which key the writer used.</p>
*
* @param commitMetadataOpt Optional commit metadata containing extraMetadata
* @return Optional checkpoint string (empty if metadata is absent or no checkpoint key matches)
*/
private Option<String> resolveCheckpoint(Option<HoodieCommitMetadata> commitMetadataOpt) {
if (!commitMetadataOpt.isPresent()) {
return Option.empty();
}
HoodieCommitMetadata metadata = commitMetadataOpt.get();
if (checkpointKey != null) {
return Option.ofNullable(metadata.getMetadata(checkpointKey));
}
try {
return Option.ofNullable(
org.apache.hudi.common.table.checkpoint.CheckpointUtils.getCheckpoint(metadata)
.getCheckpointKey());
} catch (HoodieException e) {
// No V1 or V2 streamer checkpoint key present in extraMetadata.
return Option.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig {
.key("hoodie.precommit.validators")
.defaultValue("")
.markAdvanced()
.withDocumentation("Comma separated list of class names that can be invoked to validate commit");
.withDocumentation("Comma separated list of class names that can be invoked to validate commit. "
+ "Available streaming offset validators: "
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator (Flink Kafka), "
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator (Spark/HoodieStreamer Kafka)");
public static final String VALIDATOR_TABLE_VARIABLE = "<TABLE_NAME>";

public static final ConfigProperty<String> EQUALITY_SQL_QUERIES = ConfigProperty
Expand Down Expand Up @@ -71,7 +74,8 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Tolerance percentage for streaming offset validation "
+ "(used by org.apache.hudi.client.validator.StreamingOffsetValidator "
+ "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator). "
+ "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator "
+ "and org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator). "
+ "The validator compares the offset difference (expected records from source) "
+ "with actual records written. If the deviation exceeds this percentage, "
+ "the commit is rejected or warned depending on the validation failure policy. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public MockOffsetValidator(TypedProperties config) {
// Expose protected method for testing
public void testValidateOffsetConsistency(long offsetDiff, long recordsWritten,
String current, String previous) {
validateOffsetConsistency(offsetDiff, recordsWritten, current, previous);
validateOffsetConsistency(offsetDiff, recordsWritten, 0L, current, previous);
}

// Expose validateWithMetadata for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,28 @@ public static void runValidators(HoodieWriteConfig config,
Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table, afterState.schema());

Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
.map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
table, context, config)));
.map(String::trim)
.filter(s -> !s.isEmpty())
.flatMap(validatorClass -> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: this flatMap lambda has grown to ~16 lines doing class loading, type checking, reflection and two distinct exception translations. Could you extract it into a small private helper like instantiateSparkValidator(String validatorClass, HoodieSparkTable table, HoodieEngineContext context, HoodieWriteConfig config) returning Stream<SparkPreCommitValidator>? Would make the stream pipeline read at a glance.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

try {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Unlike SparkStreamerValidatorUtils which has .filter(s -> !s.isEmpty()), this path will call Class.forName("") if the config has a trailing comma (e.g. "ValidatorA,"), turning a benign config typo into a HoodieValidationException. Could you add an empty-string filter before the Class.forName call, like the new utility does?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Added .filter(s -> !s.isEmpty()) before the Class.forName call, matching the guard already present in SparkStreamerValidatorUtils.

Class<?> clazz = Class.forName(validatorClass);
if (!SparkPreCommitValidator.class.isAssignableFrom(clazz)) {
LOG.warn("Skipping validator {} — it does not implement SparkPreCommitValidator. "
+ "If this is a streaming offset validator (e.g. SparkKafkaOffsetValidator), "
+ "it will be invoked by SparkStreamerValidatorUtils instead.", validatorClass);
return Stream.empty();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: after the isAssignableFrom check, could you use ReflectionUtils.loadClass(validatorClass, new Class<?>[] {...}, table, context, config) for instantiation instead of getDeclaredConstructor(...).newInstance(...)? That would keep this consistent with the rest of the codebase (and with SparkStreamerValidatorUtils just below), while also getting Hudi's wrapper around instantiation errors for free.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

SparkPreCommitValidator validator = (SparkPreCommitValidator) ReflectionUtils.loadClass(
validatorClass,
new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
table, context, config);
return Stream.of(validator);
} catch (ClassNotFoundException e) {
throw new HoodieValidationException("Cannot find validator class: " + validatorClass, e);
} catch (ReflectiveOperationException e) {
throw new HoodieValidationException("Failed to instantiate validator: " + validatorClass, e);
}
});

boolean allSuccess = validators.map(v -> runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime)).map(CompletableFuture::join)
.reduce(true, Boolean::logicalAnd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,20 @@ default long getTotalUpdateRecordsWritten() {
.orElse(0L);
}

/**
* Calculate total write errors in the current commit.
* Records that failed to write are tracked in {@link org.apache.hudi.common.model.HoodieWriteStat#getTotalWriteErrors()}.
* A non-zero error count alongside a deviation in offset validation indicates write failures
* rather than silent data loss — useful context for distinguishing the two failure modes.
*
* @return Total count of records that failed to write
*/
default long getTotalWriteErrors() {
return getWriteStats()
.map(stats -> stats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum())
.orElse(0L);
}

/**
* Check if this is the first commit (no previous commits exist).
* Derived from {@link #getPreviousCommitInstant()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.data.HoodieJavaRDD;
Expand Down Expand Up @@ -115,6 +116,7 @@
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.HoodieStreamer.Config;
import org.apache.hudi.utilities.streamer.validator.SparkStreamerValidatorUtils;
import org.apache.hudi.utilities.transform.Transformer;

import com.codahale.metrics.Timer;
Expand All @@ -128,6 +130,7 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -874,8 +877,38 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
totalSuccessfulRecords);
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));

boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
Option.of(writeStatusValidator));
// Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
// before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
// When no validators are configured, commit consumes the RDD once and caching adds no value.
// shouldUnpersist is true only when we created the cache here (validators present and storage
// level was NONE), so the finally block knows to release it.
boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
Comment thread
codope marked this conversation as resolved.
if (shouldUnpersist) {
writeStatusRDD.cache();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. can we trigger the cache only when the validators are not empty;
  2. I see we already have a writeStatusValidator for validating the write errors, it should be feasible that we migrate it as one of the pre-commit validators.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @danny0405!

1. Conditional cache — done in 5f4bca6. The cache/unpersist cycle is now guarded by a validatorsConfigured check derived from hoodie.precommit.validators, so the no-validator path stays a single-pass DAG with zero overhead.

2. Migrating writeStatusValidator to a pre-commit validator — agree with the direction, but I'd like to do this as a follow-up PR rather than expanding scope here. The reason: HoodieStreamerWriteStatusValidator (StreamSync.java:1403) does substantially more than pure validation:

  • Counts records/errors and writes to a shared AtomicLong totalSuccessfulRecords consumed after commit (drives the runMetaSync() decision)
  • Commits the error table as a side effect, with ROLLBACK_COMMIT/LOG_ERROR strategies
  • Rolls back the instant on validation failure and logs the top-100 errors
  • Runs inside writeClient.commit() via the WriteStatusValidator callback hook (not before commit), so the framework difference matters

A BasePreCommitValidator only takes TypedProperties and exposes state via ValidationContext — there's no path today to surface the raw JavaRDD<WriteStatus> (needed for error-table commit) or the post-commit counter through that interface. A clean migration would need to either (a) extend ValidationContext to expose the RDD and inject the error-table writer, or (b) split the side-effects out of the callback into a separate post-validation step.

Would you be OK if I file a follow-up issue + PR for that refactor so this PR can stay focused on the streaming-offset framework? Happy to take it on right after this lands.

Copy link
Copy Markdown
Contributor Author

@shangxinli shangxinli May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #18750 to track the writeStatusValidator migration as a follow-up. Will pick it up after this PR lands.

}
boolean success;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 If SparkStreamerValidatorUtils.runValidators(...) throws (e.g. HoodieValidationException under FAIL policy, or HoodieIOException from loadPreviousCommitMetadata), control leaves the method before the try { ... } finally { unpersist() } block is entered, so the cached RDD is never unpersisted. Could you move the runValidators(...) call inside the same try whose finally does the unpersist() (or wrap the cache+validate+commit together) so the cache is always released on validation failure?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

try {
if (validatorsConfigured) {
List<WriteStatus> writeStatuses = writeStatusRDD.collect();

// Run pre-commit streaming offset validators (if configured).
// Placement before writeClient.commit() is intentional: offset validation is a stronger
// guard than commitOnErrors — if offset deviation indicates potential data loss, the commit
// must be prevented regardless of the commitOnErrors policy.
SparkStreamerValidatorUtils.runValidators(props, instantTime, writeStatuses,
checkpointCommitMetadata, metaClient);
}

success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
Option.of(writeStatusValidator));
} finally {
if (shouldUnpersist) {
writeStatusRDD.unpersist();
}
}
releaseResourcesInvoked = true;
if (success) {
LOG.info("Commit " + instantTime + " successful!");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 If runValidators() or writeClient.commit() throws an exception, the unpersist() call is skipped and the cached RDD stays pinned in executor memory. Could you wrap the validate+commit+unpersist block in a try/finally so unpersist() always runs when weOwnCache is true?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Expand Down
Loading
Loading