diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java index ce577d84ca018..40e7a4f1635f9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java @@ -20,11 +20,13 @@ package org.apache.hudi.client.validator; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.util.CheckpointUtils; import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodiePreCommitValidatorConfig; import org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieValidationException; import lombok.extern.slf4j.Slf4j; @@ -50,7 +52,11 @@ * * Subclasses specify: * - Checkpoint format (SPARK_KAFKA, FLINK_KAFKA, etc.) - * - Checkpoint metadata key + * - Checkpoint metadata key (optional — when omitted, the validator auto-resolves the + * active streamer key from commit metadata using + * {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)}, + * which prefers V2 and falls back to V1. Subclasses that read a custom non-streamer key + * (e.g. Flink's HOODIE_METADATA_KEY) must pass it explicitly.) * - Source-specific parsing logic (if needed) * * Configuration: @@ -66,7 +72,26 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator { protected final CheckpointFormat checkpointFormat; /** - * Create a streaming offset validator. 
+ * Create a streaming offset validator that auto-resolves the checkpoint key from commit + * metadata using {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)}. + * + *

Use this constructor for streamer pipelines (V1 or V2 checkpoint keys). The validator + * will prefer V2 (table version 8+) and fall back to V1 transparently, so subclasses don't + * need to know which key the writer used.

+ * + * @param config Validator configuration + * @param checkpointFormat Format of the checkpoint string + */ + protected StreamingOffsetValidator(TypedProperties config, + CheckpointFormat checkpointFormat) { + this(config, null, checkpointFormat); + } + + /** + * Create a streaming offset validator with an explicit checkpoint metadata key. + * + *

Use this constructor when the writer stores its checkpoint under a custom key that + * is not the standard streamer V1/V2 key (e.g. Flink's HOODIE_METADATA_KEY).

* * @param config Validator configuration * @param checkpointKey Key to extract checkpoint from extraMetadata @@ -95,10 +120,12 @@ public void validateWithMetadata(ValidationContext context) throws HoodieValidat return; } - // Extract current checkpoint - Option currentCheckpointOpt = context.getExtraMetadata(checkpointKey); + // Extract current checkpoint — either from the explicit key (custom writers like Flink) or + // by auto-resolving from commit metadata (streamer pipelines, V2-then-V1 fallback). + Option currentCheckpointOpt = resolveCheckpoint(context.getCommitMetadata()); if (!currentCheckpointOpt.isPresent()) { - log.warn("Current checkpoint not found with key: {}. Skipping validation.", checkpointKey); + log.warn("Current checkpoint not found (key: {}). Skipping validation.", + checkpointKey == null ? "" : checkpointKey); return; } String currentCheckpoint = currentCheckpointOpt.get(); @@ -110,8 +137,7 @@ public void validateWithMetadata(ValidationContext context) throws HoodieValidat } // Extract previous checkpoint - Option previousCheckpointOpt = context.getPreviousCommitMetadata() - .flatMap(metadata -> Option.ofNullable(metadata.getMetadata(checkpointKey))); + Option previousCheckpointOpt = resolveCheckpoint(context.getPreviousCommitMetadata()); if (!previousCheckpointOpt.isPresent()) { log.info("Previous checkpoint not found. May be first streaming commit. Skipping validation."); @@ -139,6 +165,10 @@ public void validateWithMetadata(ValidationContext context) throws HoodieValidat long recordsWritten = context.getTotalInsertRecordsWritten() + context.getTotalUpdateRecordsWritten(); + // Track write errors so callers can distinguish write-failure deviation (write errors > 0) + // from silent data loss (write errors == 0) when the validator fires. + long writeErrors = context.getTotalWriteErrors(); + // For empty commits (e.g., no new data from source), both offsetDiff and recordsWritten // can be zero. 
This is a valid scenario — skip validation to avoid false positives. if (offsetDifference == 0 && recordsWritten == 0) { @@ -147,7 +177,7 @@ public void validateWithMetadata(ValidationContext context) throws HoodieValidat } // Validate offset vs record consistency - validateOffsetConsistency(offsetDifference, recordsWritten, + validateOffsetConsistency(offsetDifference, recordsWritten, writeErrors, currentCheckpoint, previousCheckpoint); } @@ -155,12 +185,13 @@ public void validateWithMetadata(ValidationContext context) throws HoodieValidat * Validate that offset difference matches record count within tolerance. * * @param offsetDiff Expected records based on offset difference - * @param recordsWritten Actual records written + * @param recordsWritten Actual records written (inserts + updates) + * @param writeErrors Records that failed to write (tracked in write status errors) * @param currentCheckpoint Current checkpoint string (for error messages) * @param previousCheckpoint Previous checkpoint string (for error messages) * @throws HoodieValidationException if validation fails and policy is FAIL */ - protected void validateOffsetConsistency(long offsetDiff, long recordsWritten, + protected void validateOffsetConsistency(long offsetDiff, long recordsWritten, long writeErrors, String currentCheckpoint, String previousCheckpoint) throws HoodieValidationException { @@ -169,10 +200,13 @@ protected void validateOffsetConsistency(long offsetDiff, long recordsWritten, if (deviation > tolerancePercentage) { String errorMsg = String.format( "Streaming offset validation failed. " - + "Offset difference: %d, Records written: %d, Deviation: %.2f%%, Tolerance: %.2f%%. " - + "This may indicate data loss or filtering. " + + "Offset difference: %d, Records written: %d, Write errors: %d, Deviation: %.2f%%, Tolerance: %.2f%%. 
" + + "%s" + "Previous checkpoint: %s, Current checkpoint: %s", - offsetDiff, recordsWritten, deviation, tolerancePercentage, + offsetDiff, recordsWritten, writeErrors, deviation, tolerancePercentage, + writeErrors > 0 + ? "Non-zero write errors suggest records failed to write rather than silent data loss. " + : "This may indicate data loss or filtering. ", previousCheckpoint, currentCheckpoint); if (failurePolicy == ValidationFailurePolicy.WARN_LOG) { @@ -181,8 +215,8 @@ protected void validateOffsetConsistency(long offsetDiff, long recordsWritten, throw new HoodieValidationException(errorMsg); } } else { - log.info("Offset validation passed. Offset diff: {}, Records: {}, Deviation: {}% (within {}%)", - offsetDiff, recordsWritten, String.format("%.2f", deviation), tolerancePercentage); + log.info("Offset validation passed. Offset diff: {}, Records: {}, Write errors: {}, Deviation: {}% (within {}%)", + offsetDiff, recordsWritten, writeErrors, String.format("%.2f", deviation), tolerancePercentage); } } @@ -210,4 +244,33 @@ private double calculateDeviation(long offsetDiff, long recordsWritten) { long difference = Math.abs(offsetDiff - recordsWritten); return (100.0 * difference) / offsetDiff; } + + /** + * Resolve the checkpoint string from commit metadata. + * + *

When the validator was constructed with an explicit {@code checkpointKey}, that key + * is read directly. Otherwise, {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)} + * is used to locate the active streamer checkpoint (V2 first, V1 fallback), so callers + * don't need to know which key the writer used.

+ * + * @param commitMetadataOpt Optional commit metadata containing extraMetadata + * @return Optional checkpoint string (empty if metadata is absent or no checkpoint key matches) + */ + private Option resolveCheckpoint(Option commitMetadataOpt) { + if (!commitMetadataOpt.isPresent()) { + return Option.empty(); + } + HoodieCommitMetadata metadata = commitMetadataOpt.get(); + if (checkpointKey != null) { + return Option.ofNullable(metadata.getMetadata(checkpointKey)); + } + try { + return Option.ofNullable( + org.apache.hudi.common.table.checkpoint.CheckpointUtils.getCheckpoint(metadata) + .getCheckpointKey()); + } catch (HoodieException e) { + // No V1 or V2 streamer checkpoint key present in extraMetadata. + return Option.empty(); + } + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java index f85cc44120d4e..169494b7244ac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java @@ -43,7 +43,10 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig { .key("hoodie.precommit.validators") .defaultValue("") .markAdvanced() - .withDocumentation("Comma separated list of class names that can be invoked to validate commit"); + .withDocumentation("Comma separated list of class names that can be invoked to validate commit. 
" + + "Available streaming offset validators: " + + "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator (Flink Kafka), " + + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator (Spark/HoodieStreamer Kafka)"); public static final String VALIDATOR_TABLE_VARIABLE = ""; public static final ConfigProperty EQUALITY_SQL_QUERIES = ConfigProperty @@ -71,7 +74,8 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Tolerance percentage for streaming offset validation " + "(used by org.apache.hudi.client.validator.StreamingOffsetValidator " - + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator). " + + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator " + + "and org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator). " + "The validator compares the offset difference (expected records from source) " + "with actual records written. If the deviation exceeds this percentage, " + "the commit is rejected or warned depending on the validation failure policy. 
" diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java index 818bc2087a3c1..bc402e5ee5a0e 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java @@ -62,7 +62,7 @@ public MockOffsetValidator(TypedProperties config) { // Expose protected method for testing public void testValidateOffsetConsistency(long offsetDiff, long recordsWritten, String current, String previous) { - validateOffsetConsistency(offsetDiff, recordsWritten, current, previous); + validateOffsetConsistency(offsetDiff, recordsWritten, 0L, current, previous); } // Expose validateWithMetadata for testing diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java index c7804631d3f65..40ee7d8cefff9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java @@ -84,9 +84,28 @@ public static void runValidators(HoodieWriteConfig config, Dataset beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table, afterState.schema()); Stream validators = Arrays.stream(config.getPreCommitValidators().split(",")) - .map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass, - new Class[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, - table, context, config))); + .map(String::trim) + .filter(s -> !s.isEmpty()) + .flatMap(validatorClass -> { + try { + Class clazz = 
Class.forName(validatorClass); + if (!SparkPreCommitValidator.class.isAssignableFrom(clazz)) { + LOG.warn("Skipping validator {} — it does not implement SparkPreCommitValidator. " + + "If this is a streaming offset validator (e.g. SparkKafkaOffsetValidator), " + + "it will be invoked by SparkStreamerValidatorUtils instead.", validatorClass); + return Stream.empty(); + } + SparkPreCommitValidator validator = (SparkPreCommitValidator) ReflectionUtils.loadClass( + validatorClass, + new Class[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, + table, context, config); + return Stream.of(validator); + } catch (ClassNotFoundException e) { + throw new HoodieValidationException("Cannot find validator class: " + validatorClass, e); + } catch (ReflectiveOperationException e) { + throw new HoodieValidationException("Failed to instantiate validator: " + validatorClass, e); + } + }); boolean allSuccess = validators.map(v -> runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime)).map(CompletableFuture::join) .reduce(true, Boolean::logicalAnd); diff --git a/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java index 30fdbb3ba3b4a..b85218e587e87 100644 --- a/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java @@ -169,6 +169,20 @@ default long getTotalUpdateRecordsWritten() { .orElse(0L); } + /** + * Calculate total write errors in the current commit. + * Records that failed to write are tracked in {@link org.apache.hudi.common.model.HoodieWriteStat#getTotalWriteErrors()}. + * A non-zero error count alongside a deviation in offset validation indicates write failures + * rather than silent data loss — useful context for distinguishing the two failure modes. 
+ * + * @return Total count of records that failed to write + */ + default long getTotalWriteErrors() { + return getWriteStats() + .map(stats -> stats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum()) + .orElse(0L); + } + /** * Check if this is the first commit (no previous commits exist). * Derived from {@link #getPreviousCommitInstant()}. diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 600891c85dff6..2b1406c7deab5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -74,6 +74,7 @@ import org.apache.hudi.config.HoodieErrorTableConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.data.HoodieJavaRDD; @@ -115,6 +116,7 @@ import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.sources.Source; import org.apache.hudi.utilities.streamer.HoodieStreamer.Config; +import org.apache.hudi.utilities.streamer.validator.SparkStreamerValidatorUtils; import org.apache.hudi.utilities.transform.Transformer; import com.codahale.metrics.Timer; @@ -128,6 +130,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.StructType; +import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -874,8 +877,38 @@ private Pair, JavaRDD> writeToSinkAndDoMetaSync(Hood totalSuccessfulRecords); String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType)); - boolean success = 
writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(), - Option.of(writeStatusValidator)); + // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD + // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit(). + // When no validators are configured, commit consumes the RDD once and caching adds no value. + // shouldUnpersist is true only when we created the cache here (validators present and storage + // level was NONE), so the finally block knows to release it. + boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString( + HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), + HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue())); + boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE()); + if (shouldUnpersist) { + writeStatusRDD.cache(); + } + boolean success; + try { + if (validatorsConfigured) { + List writeStatuses = writeStatusRDD.collect(); + + // Run pre-commit streaming offset validators (if configured). + // Placement before writeClient.commit() is intentional: offset validation is a stronger + // guard than commitOnErrors — if offset deviation indicates potential data loss, the commit + // must be prevented regardless of the commitOnErrors policy. 
+ SparkStreamerValidatorUtils.runValidators(props, instantTime, writeStatuses, + checkpointCommitMetadata, metaClient); + } + + success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(), + Option.of(writeStatusValidator)); + } finally { + if (shouldUnpersist) { + writeStatusRDD.unpersist(); + } + } releaseResourcesInvoked = true; if (success) { LOG.info("Commit " + instantTime + " successful!"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java new file mode 100644 index 0000000000000..589e09e96de89 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.validator.StreamingOffsetValidator; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat; + +/** + * Spark/HoodieStreamer-specific Kafka offset validator. + * + *

Validates that the number of records written matches the Kafka offset difference + * between the current and previous HoodieStreamer checkpoints. The active checkpoint key + * (V1 {@code deltastreamer.checkpoint.key} or V2 {@code streamer.checkpoint.key.v2}) is + * resolved at validation time via {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint}, + * so this validator works against tables written with either checkpoint key version.

+ * + *

Configuration: + *

    + *
  • {@code hoodie.precommit.validators}: Include + * {@code org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator}
  • + *
  • {@code hoodie.precommit.validators.streaming.offset.tolerance.percentage}: + * Acceptable deviation (default: 0.0 = strict)
  • + *
  • {@code hoodie.precommit.validators.failure.policy}: + * FAIL (default) or WARN_LOG
  • + *

+ * + *

This validator is primarily intended for append-only ingestion from Kafka via HoodieStreamer. + * For upsert workloads with deduplication, configure a higher tolerance or use WARN_LOG.

+ * + *

Important: This class extends {@link org.apache.hudi.client.validator.BasePreCommitValidator} + * and is invoked by {@link SparkStreamerValidatorUtils}, NOT by {@code SparkValidatorUtils} + * (which expects {@code SparkPreCommitValidator} with a different constructor signature). + * When this class is listed in {@code hoodie.precommit.validators}, {@code SparkValidatorUtils} + * logs a warning and skips it because it does not implement {@code SparkPreCommitValidator}. + * Use this validator exclusively with HoodieStreamer pipelines.

+ */ +public class SparkKafkaOffsetValidator extends StreamingOffsetValidator { + + public SparkKafkaOffsetValidator(TypedProperties config) { + super(config, CheckpointFormat.SPARK_KAFKA); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java new file mode 100644 index 0000000000000..474215ec0f3c1 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.validator.BasePreCommitValidator; +import org.apache.hudi.client.validator.ValidationContext; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieValidationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Utility for running pre-commit validators in the HoodieStreamer commit flow. + * + *

Instantiates and executes validators configured via + * {@code hoodie.precommit.validators}. Each validator must extend + * {@link BasePreCommitValidator} and have a constructor that accepts + * {@link TypedProperties}.

+ * + *

Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before + * the commit is finalized.

+ * + *

Note on validator compatibility: This utility uses a different instantiation + * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path). + * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator} + * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor. + * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend + * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and + * are NOT compatible with {@code SparkValidatorUtils}. When both kinds are listed under the same + * {@code hoodie.precommit.validators} config, each path logs a warning and skips the validators it cannot run.

+ */ +public class SparkStreamerValidatorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class); + + /** + * Run all configured pre-commit validators. + * + *

The caller is responsible for caching and unpersisting the source RDD if needed. + * This method accepts pre-collected write statuses to avoid a second DAG evaluation — + * the caller should cache the RDD, collect to this list, call this method, then pass + * the same RDD to {@code writeClient.commit()}, and unpersist after commit completes.

+ * + * @param props Configuration properties containing validator class names + * @param instantTime Commit instant time + * @param writeStatuses Pre-collected write statuses from Spark write operations + * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info) + * @param metaClient Table meta client for timeline access and previous commit lookup + * @throws HoodieValidationException if any validator fails with FAIL policy + */ + public static void runValidators(TypedProperties props, + String instantTime, + List writeStatuses, + Map checkpointCommitMetadata, + HoodieTableMetaClient metaClient) { + String validatorClassNames = props.getString( + HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), + HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()); + + if (StringUtils.isNullOrEmpty(validatorClassNames)) { + return; + } + + HoodieCommitMetadata currentMetadata = buildCommitMetadata(writeStatuses, checkpointCommitMetadata); + List writeStats = writeStatuses.stream() + .map(WriteStatus::getStat) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + Option previousCommitMetadata = loadPreviousCommitMetadata(metaClient); + + ValidationContext context = new SparkValidationContext( + instantTime, + Option.of(currentMetadata), + Option.of(writeStats), + previousCommitMetadata, + metaClient); + + List classNames = Arrays.stream(validatorClassNames.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + + for (String className : classNames) { + try { + Class clazz = Class.forName(className); + if (!BasePreCommitValidator.class.isAssignableFrom(clazz)) { + LOG.warn("Skipping validator {} in HoodieStreamer path — it does not extend BasePreCommitValidator. " + + "If this is a SparkPreCommitValidator (e.g. 
SqlQueryEqualityPreCommitValidator), " + + "it must be invoked via SparkValidatorUtils in the standard Spark write path instead.", className); + continue; + } + BasePreCommitValidator validator = (BasePreCommitValidator) + ReflectionUtils.loadClass(className, new Class[] {TypedProperties.class}, props); + LOG.info("Running pre-commit validator: {} for instant: {}", className, instantTime); + validator.validateWithMetadata(context); + LOG.info("Pre-commit validator {} passed for instant: {}", className, instantTime); + } catch (HoodieValidationException e) { + LOG.error("Pre-commit validator {} failed for instant: {}", className, instantTime, e); + throw e; + } catch (Exception e) { + LOG.error("Failed to instantiate or run validator: {}", className, e); + throw new HoodieValidationException( + "Failed to run pre-commit validator: " + className, e); + } + } + } + + /** + * Build a pre-commit snapshot of {@link HoodieCommitMetadata} from write statuses and extra metadata. + * + *

This is intentionally a partial/preview object used only for validation — it contains + * write stats and checkpoint extra-metadata, but omits fields that are not available before the + * commit (e.g. schema, operation type). Validators should treat this as a read-only snapshot + * of what will be committed, not a fully-constructed commit record.

+ */ + private static HoodieCommitMetadata buildCommitMetadata( + List writeStatuses, Map extraMetadata) { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + + // Add write stats + for (WriteStatus status : writeStatuses) { + HoodieWriteStat stat = status.getStat(); + if (stat != null) { + metadata.addWriteStat(stat.getPartitionPath(), stat); + } + } + + // Add extra metadata (includes checkpoint info like deltastreamer.checkpoint.key) + if (extraMetadata != null) { + extraMetadata.forEach(metadata::addMetadata); + } + + return metadata; + } + + /** + * Load the previous completed commit metadata from the timeline. + */ + private static Option loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) { + try { + HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline() + .getWriteTimeline() + .filterCompletedInstants(); + Option lastInstant = completedTimeline.lastInstant(); + if (lastInstant.isPresent()) { + return Option.of(completedTimeline.readCommitMetadata(lastInstant.get())); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to load previous commit metadata", e); + } + return Option.empty(); + } + + private SparkStreamerValidatorUtils() { + // Utility class + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java new file mode 100644 index 0000000000000..1d46e13aeedbd --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.validator.ValidationContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; + +import java.util.List; + +/** + * Spark/HoodieStreamer implementation of {@link ValidationContext}. + * + *

<p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()} + * before the commit is finalized. Provides validators with access to commit metadata, + * write statistics, and previous commit information for streaming offset validation.</p>

+ * + *

<p>Unlike Flink's implementation, Spark can optionally provide active timeline access + * via {@link HoodieTableMetaClient} for richer validation patterns.</p>

+ */ +public class SparkValidationContext implements ValidationContext { + + private final String instantTime; + private final Option commitMetadata; + private final Option> writeStats; + private final Option previousCommitMetadata; + private final HoodieTableMetaClient metaClient; + + /** + * Create a Spark validation context with full timeline access. + * + * @param instantTime Current commit instant time + * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints) + * @param writeStats Write statistics from write operations + * @param previousCommitMetadata Metadata from the previous completed commit + * @param metaClient Table meta client for timeline access (may be null for testing) + */ + public SparkValidationContext(String instantTime, + Option commitMetadata, + Option> writeStats, + Option previousCommitMetadata, + HoodieTableMetaClient metaClient) { + this.instantTime = instantTime; + this.commitMetadata = commitMetadata; + this.writeStats = writeStats; + this.previousCommitMetadata = previousCommitMetadata; + this.metaClient = metaClient; + } + + /** + * Create a Spark validation context without timeline access (for testing). + * + * @param instantTime Current commit instant time + * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints) + * @param writeStats Write statistics from write operations + * @param previousCommitMetadata Metadata from the previous completed commit + */ + public SparkValidationContext(String instantTime, + Option commitMetadata, + Option> writeStats, + Option previousCommitMetadata) { + this(instantTime, commitMetadata, writeStats, previousCommitMetadata, null); + } + + @Override + public String getInstantTime() { + return instantTime; + } + + @Override + public Option getCommitMetadata() { + return commitMetadata; + } + + @Override + public Option> getWriteStats() { + return writeStats; + } + + /** + * Get the active timeline. 
Available when metaClient is provided. + * + * @throws UnsupportedOperationException if metaClient was not provided + */ + @Override + public HoodieActiveTimeline getActiveTimeline() { + if (metaClient == null) { + throw new UnsupportedOperationException( + "Active timeline is not available without HoodieTableMetaClient."); + } + return metaClient.getActiveTimeline(); + } + + /** + * Get the previous completed commit instant by querying the timeline. + * Returns {@link Option#empty()} if this is the first commit or metaClient is unavailable. + */ + @Override + public Option getPreviousCommitInstant() { + if (metaClient == null) { + return Option.empty(); + } + return metaClient.getActiveTimeline() + .getWriteTimeline() + .filterCompletedInstants() + .lastInstant(); + } + + @Override + public boolean isFirstCommit() { + return !previousCommitMetadata.isPresent(); + } + + @Override + public Option getPreviousCommitMetadata() { + return previousCommitMetadata; + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java new file mode 100644 index 0000000000000..d109aa3246f64 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; +import org.apache.hudi.exception.HoodieValidationException; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests for {@link SparkKafkaOffsetValidator}. 
+ */ +public class TestSparkKafkaOffsetValidator { + + // ========== Helper methods ========== + + private static TypedProperties defaultConfig() { + TypedProperties props = new TypedProperties(); + props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0"); + props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL"); + return props; + } + + private static TypedProperties configWithTolerance(double tolerance) { + TypedProperties props = defaultConfig(); + props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), + String.valueOf(tolerance)); + return props; + } + + private static TypedProperties configWithWarnPolicy() { + TypedProperties props = defaultConfig(); + props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "WARN_LOG"); + return props; + } + + /** + * Build a Spark Kafka checkpoint string. + * Format: topic,partition:offset,partition:offset,... 
+ */ + private static String buildSparkKafkaCheckpoint(String topic, int[] partitions, long[] offsets) { + StringBuilder sb = new StringBuilder(); + sb.append(topic); + for (int i = 0; i < partitions.length; i++) { + sb.append(",").append(partitions[i]).append(":").append(offsets[i]); + } + return sb.toString(); + } + + private static HoodieCommitMetadata buildMetadata(String checkpointValue) { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + if (checkpointValue != null) { + metadata.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, checkpointValue); + } + return metadata; + } + + private static List buildWriteStats(long numInserts, long numUpdates) { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setNumInserts(numInserts); + stat.setNumUpdateWrites(numUpdates); + stat.setPartitionPath("partition1"); + return Collections.singletonList(stat); + } + + private static SparkValidationContext buildContext( + String instantTime, + HoodieCommitMetadata currentMetadata, + List writeStats, + HoodieCommitMetadata previousMetadata) { + return new SparkValidationContext( + instantTime, + Option.of(currentMetadata), + Option.of(writeStats), + previousMetadata != null ? Option.of(previousMetadata) : Option.empty()); + } + + // ========== Tests ========== + + @Test + public void testExactMatchPasses() { + // Previous: partition 0 at offset 100, partition 1 at offset 200 + // Current: partition 0 at offset 200, partition 1 at offset 300 + // Diff = (200-100) + (300-200) = 200. Records written = 200. 
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{100, 200}); + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{200, 300}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(200, 0), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testDataLossDetected() { + // Diff = 1000 but only 500 records written -> 50% deviation + String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0}); + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(500, 0), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testWithinTolerancePasses() { + // Diff = 1000, records = 950 -> 5% deviation, tolerance = 10% + String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0}); + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(950, 0), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithTolerance(10.0)); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testWarnPolicyDoesNotThrow() { + // Data loss but WARN_LOG policy + String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0}); + String 
currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(0, 0), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithWarnPolicy()); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testSkipsFirstCommit() { + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000}); + + // No previous commit + SparkValidationContext ctx = new SparkValidationContext( + "20260320120000000", + Option.of(buildMetadata(currCheckpoint)), + Option.of(buildWriteStats(500, 0)), + Option.empty()); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testSkipsWhenNoCheckpointKey() { + // Current metadata has no checkpoint key + HoodieCommitMetadata currentMeta = new HoodieCommitMetadata(); + String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{100}); + + SparkValidationContext ctx = buildContext("20260320120000000", + currentMeta, + buildWriteStats(500, 0), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testMultiPartitionValidation() { + // 4 partitions, each advancing by 250 = total diff 1000 + String prevCheckpoint = buildSparkKafkaCheckpoint("events", + new int[]{0, 1, 2, 3}, new long[]{0, 0, 0, 0}); + String currCheckpoint = buildSparkKafkaCheckpoint("events", + new int[]{0, 1, 2, 3}, new long[]{250, 250, 250, 250}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(800, 200), // 800 inserts + 200 updates = 1000 + 
buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testEmptyCommitSkipsValidation() { + // Both offsets same and no records written + String checkpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{100}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(checkpoint), + buildWriteStats(0, 0), + buildMetadata(checkpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testPreviousCheckpointMissingSkipsValidation() { + // Previous metadata exists but has no checkpoint key + HoodieCommitMetadata prevMeta = new HoodieCommitMetadata(); + + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(500, 0), + prevMeta); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testOvercountingDetected() { + // More records written than offset diff + // Diff = 100, records = 200 -> |100-200|/100 = 100% deviation + String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0}); + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{100}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(200, 0), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx)); + } + + @Test + public 
void testExactToleranceBoundaryPasses() { + // Diff = 1000, records = 900 -> 10% deviation, tolerance = 10% + String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0}); + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(900, 0), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithTolerance(10.0)); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testJustOverToleranceFails() { + // Diff = 1000, records = 899 -> 10.1% deviation, tolerance = 10% + String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0}); + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(899, 0), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(configWithTolerance(10.0)); + assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testOnlyInsertsNoUpdates() { + // Pure insert workload + String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{0, 0}); + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{500, 500}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(1000, 0), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } + + @Test + public void testUpdatesCountedInRecordTotal() { + // Diff = 1000. 
600 inserts + 400 updates = 1000 total + String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{0}); + String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new long[]{1000}); + + SparkValidationContext ctx = buildContext("20260320120000000", + buildMetadata(currCheckpoint), + buildWriteStats(600, 400), + buildMetadata(prevCheckpoint)); + + SparkKafkaOffsetValidator validator = new SparkKafkaOffsetValidator(defaultConfig()); + assertDoesNotThrow(() -> validator.validateWithMetadata(ctx)); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java new file mode 100644 index 0000000000000..69d5d228dab60 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; +import org.apache.hudi.exception.HoodieValidationException; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link SparkStreamerValidatorUtils}. + * + *

<p>Tests cover orchestration logic (class loading, config passing, error handling) + * as well as end-to-end offset validation using a two-commit timeline to verify + * the real comparison path is exercised.</p>

+ */ +public class TestSparkStreamerValidatorUtils { + + @TempDir + Path tempDir; + + private static TypedProperties propsWithValidator(String validatorClassName) { + TypedProperties props = new TypedProperties(); + props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName); + props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0"); + props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL"); + return props; + } + + private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setPartitionPath(partitionPath); + stat.setNumInserts(numInserts); + stat.setNumUpdateWrites(numUpdates); + + WriteStatus ws = new WriteStatus(false, 0.0); + ws.setStat(stat); + return ws; + } + + private HoodieTableMetaClient createMetaClient() throws IOException { + return HoodieTestUtils.init(tempDir.toAbsolutePath().toString()); + } + + private HoodieTableMetaClient createMetaClient(HoodieTableVersion version) throws IOException { + return HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), HoodieTableType.COPY_ON_WRITE, version); + } + + // ========== Tests ========== + + @Test + public void testNoValidatorsConfigured() throws IOException { + TypedProperties props = new TypedProperties(); + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0)); + + assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, + new HashMap<>(), createMetaClient())); + } + + @Test + public void testEmptyValidatorString() throws IOException { + TypedProperties props = new TypedProperties(); + props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), ""); + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0)); + + assertDoesNotThrow(() -> 
SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, + new HashMap<>(), createMetaClient())); + } + + @Test + public void testValidValidatorFirstCommitPasses() throws IOException { + TypedProperties props = propsWithValidator( + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator"); + + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0)); + Map extraMeta = new HashMap<>(); + extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100"); + + // First commit (no previous metadata on timeline) — validator should skip and pass + assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, extraMeta, createMetaClient())); + } + + @Test + public void testInvalidValidatorClassThrows() throws IOException { + TypedProperties props = propsWithValidator("com.nonexistent.FakeValidator"); + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0)); + + assertThrows(HoodieValidationException.class, + () -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, new HashMap<>(), createMetaClient())); + } + + @Test + public void testMultipleValidators() throws IOException { + TypedProperties props = propsWithValidator( + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator," + + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator"); + + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0)); + Map extraMeta = new HashMap<>(); + extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100"); + + assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, extraMeta, createMetaClient())); + } + + @Test + public void testValidatorWithWhitespaceInClassNames() throws IOException { + TypedProperties props = propsWithValidator( + " 
org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator , "); + + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0)); + + assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, new HashMap<>(), createMetaClient())); + } + + @Test + public void testNullExtraMetadataHandled() throws IOException { + TypedProperties props = propsWithValidator( + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator"); + + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0)); + + assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, null, createMetaClient())); + } + + @Test + public void testMultipleWriteStatusesAggregated() throws IOException { + TypedProperties props = propsWithValidator( + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator"); + + List writeStatuses = new ArrayList<>(); + writeStatuses.add(buildWriteStatus("p1", 60, 0)); + writeStatuses.add(buildWriteStatus("p2", 40, 0)); + + Map extraMeta = new HashMap<>(); + extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100"); + + assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, extraMeta, createMetaClient())); + } + + @Test + public void testEmptyWriteStatuses() throws IOException { + TypedProperties props = propsWithValidator( + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator"); + + List writeStatuses = Collections.emptyList(); + Map extraMeta = new HashMap<>(); + extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100"); + + assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, extraMeta, createMetaClient())); + } + + @Test + public void testValidationExceptionPreservedAcrossValidators() throws IOException { + 
TypedProperties props = propsWithValidator( + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator," + + "com.nonexistent.FakeValidator"); + + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0)); + + HoodieValidationException ex = assertThrows(HoodieValidationException.class, + () -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, new HashMap<>(), createMetaClient())); + assertTrue(ex.getMessage().contains("FakeValidator")); + } + + @ParameterizedTest + @ValueSource(strings = { + StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, + StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2 + }) + public void testSecondCommitMatchingOffsetsPasses(String checkpointKey) throws Exception { + TypedProperties props = propsWithValidator( + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator"); + + // Create table with a previous committed instant: offset 0 -> 500 + HoodieTableMetaClient metaClient = createMetaClient(); + HoodieCommitMetadata prevMeta = new HoodieCommitMetadata(); + prevMeta.addMetadata(checkpointKey, "events,0:500"); + HoodieTestTable.of(metaClient).addCommit("20260320110000000", Option.of(prevMeta)); + + // Second commit: offset 500 -> 600, 100 records written — matches diff exactly + Map extraMeta = new HashMap<>(); + extraMeta.put(checkpointKey, "events,0:600"); + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0)); + + assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, extraMeta, metaClient)); + } + + @ParameterizedTest + @ValueSource(strings = { + StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, + StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2 + }) + public void testSecondCommitDataLossDetected(String checkpointKey) throws Exception { + TypedProperties props = propsWithValidator( + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator"); + + // Create 
table with a previous committed instant: offset 0 -> 1000 + HoodieTableMetaClient metaClient = createMetaClient(); + HoodieCommitMetadata prevMeta = new HoodieCommitMetadata(); + prevMeta.addMetadata(checkpointKey, "events,0:1000"); + HoodieTestTable.of(metaClient).addCommit("20260320110000000", Option.of(prevMeta)); + + // Second commit: offset 1000 -> 2000 (diff=1000) but only 500 records written — data loss + Map extraMeta = new HashMap<>(); + extraMeta.put(checkpointKey, "events,0:2000"); + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 500, 0)); + + assertThrows(HoodieValidationException.class, + () -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, extraMeta, metaClient)); + } + + @Test + public void testV2CheckpointKeyOnTableVersionEightFires() throws Exception { + // Verifies the validator actually fires on a writeTableVersion=8 table that uses the + // V2 checkpoint key — i.e. the auto-resolution in StreamingOffsetValidator picks up V2 + // and runs the comparison instead of silently skipping. 
+ TypedProperties props = propsWithValidator( + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator"); + + HoodieTableMetaClient metaClient = createMetaClient(HoodieTableVersion.EIGHT); + HoodieCommitMetadata prevMeta = new HoodieCommitMetadata(); + prevMeta.addMetadata(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2, "events,0:1000"); + HoodieTestTable.of(metaClient).addCommit("20260320110000000", Option.of(prevMeta)); + + // Offset diff = 1000 but only 200 records written — must fail + Map extraMeta = new HashMap<>(); + extraMeta.put(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2, "events,0:2000"); + List writeStatuses = Collections.singletonList(buildWriteStatus("p1", 200, 0)); + + assertThrows(HoodieValidationException.class, + () -> SparkStreamerValidatorUtils.runValidators( + props, "20260320120000000", writeStatuses, extraMeta, metaClient)); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java new file mode 100644 index 0000000000000..7f94262e98c43 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.util.Option; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link SparkValidationContext}. 
+ */ +public class TestSparkValidationContext { + + private static HoodieWriteStat buildStat(long inserts, long updates) { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setNumInserts(inserts); + stat.setNumUpdateWrites(updates); + stat.setPartitionPath("partition1"); + return stat; + } + + @Test + public void testBasicProperties() { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + metadata.addMetadata("key1", "value1"); + List writeStats = Collections.singletonList(buildStat(100, 50)); + + SparkValidationContext ctx = new SparkValidationContext( + "20260320120000000", + Option.of(metadata), + Option.of(writeStats), + Option.empty()); + + assertEquals("20260320120000000", ctx.getInstantTime()); + assertTrue(ctx.getCommitMetadata().isPresent()); + assertTrue(ctx.getWriteStats().isPresent()); + assertEquals(1, ctx.getWriteStats().get().size()); + } + + @Test + public void testRecordCounting() { + List writeStats = Arrays.asList( + buildStat(100, 50), // partition1: 100 inserts, 50 updates + buildStat(200, 30)); // partition2: 200 inserts, 30 updates + + SparkValidationContext ctx = new SparkValidationContext( + "20260320120000000", + Option.of(new HoodieCommitMetadata()), + Option.of(writeStats), + Option.empty()); + + assertEquals(300, ctx.getTotalInsertRecordsWritten()); + assertEquals(80, ctx.getTotalUpdateRecordsWritten()); + assertEquals(380, ctx.getTotalRecordsWritten()); + } + + @Test + public void testFirstCommitDetection() { + // No previous commit metadata -> first commit + SparkValidationContext ctx = new SparkValidationContext( + "20260320120000000", + Option.of(new HoodieCommitMetadata()), + Option.of(Collections.emptyList()), + Option.empty()); + + assertTrue(ctx.isFirstCommit()); + } + + @Test + public void testNotFirstCommitWhenPreviousExists() { + HoodieCommitMetadata prevMeta = new HoodieCommitMetadata(); + + SparkValidationContext ctx = new SparkValidationContext( + "20260320120000000", + Option.of(new 
HoodieCommitMetadata()), + Option.of(Collections.emptyList()), + Option.of(prevMeta)); + + assertFalse(ctx.isFirstCommit()); + } + + @Test + public void testExtraMetadataAccess() { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + metadata.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:1000"); + metadata.addMetadata("custom.key", "custom_value"); + + SparkValidationContext ctx = new SparkValidationContext( + "20260320120000000", + Option.of(metadata), + Option.of(Collections.emptyList()), + Option.empty()); + + assertEquals("events,0:1000", + ctx.getExtraMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1).get()); + assertEquals("custom_value", ctx.getExtraMetadata("custom.key").get()); + assertFalse(ctx.getExtraMetadata("nonexistent.key").isPresent()); + } + + @Test + public void testPreviousCommitMetadataAccess() { + HoodieCommitMetadata prevMeta = new HoodieCommitMetadata(); + prevMeta.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:500"); + + SparkValidationContext ctx = new SparkValidationContext( + "20260320120000000", + Option.of(new HoodieCommitMetadata()), + Option.of(Collections.emptyList()), + Option.of(prevMeta)); + + assertTrue(ctx.getPreviousCommitMetadata().isPresent()); + assertEquals("events,0:500", + ctx.getPreviousCommitMetadata().get().getMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1)); + } + + @Test + public void testEmptyWriteStats() { + SparkValidationContext ctx = new SparkValidationContext( + "20260320120000000", + Option.of(new HoodieCommitMetadata()), + Option.empty(), + Option.empty()); + + assertEquals(0, ctx.getTotalRecordsWritten()); + assertEquals(0, ctx.getTotalInsertRecordsWritten()); + assertEquals(0, ctx.getTotalUpdateRecordsWritten()); + } +}