feat(common): add core pre-commit validation framework - Phase 1 #18068
shangxinli wants to merge 4 commits into apache:master from
Conversation
Implement engine-agnostic pre-commit validation framework in hudi-common that enables validators to access commit metadata, timeline, and write statistics across all write engines (Spark, Flink, Java).

This is Phase 1 of a 3-phase implementation:
- Phase 1 (this commit): Core framework in hudi-common
- Phase 2: Flink-specific implementation
- Phase 3: Spark/DeltaStreamer implementation

Key components added:
1. BasePreCommitValidator
   - Abstract base class for all validators
   - Supports metadata-based validation
   - Engine-agnostic design
2. ValidationContext (interface)
   - Provides access to commit metadata, timeline, write stats
   - Engine-specific implementations provide concrete access
   - Abstracts engine details from validation logic
3. StreamingOffsetValidator
   - Base class for streaming offset validators
   - Compares source offset differences with record counts
   - Configurable tolerance and warn-only mode
   - Supports multiple checkpoint formats (Kafka, Flink, Pulsar, Kinesis)
4. CheckpointUtils
   - Multi-format checkpoint parsing utility
   - Supports DeltaStreamer Kafka format (Phase 1)
   - Extensible for Flink, Pulsar, Kinesis (future phases)
   - Offset difference calculation with edge case handling
5. Comprehensive unit tests
   - TestCheckpointUtils: 14 test cases
   - TestStreamingOffsetValidator: 9 test cases

Configuration:
- hoodie.precommit.validators.streaming.offset.tolerance.percentage (default: 0.0)
- hoodie.precommit.validators.warn.only (default: false)
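To make the shape of the framework concrete, here is a minimal sketch of a custom validator built on these abstractions. The class and method names (BasePreCommitValidator, ValidationContext, validateWithMetadata, getWriteStats, getInstantTime) follow the PR description, but the constructor shape and exact signatures are assumptions, not the merged API.

```java
// Illustrative sketch only: not the final API surface of this PR.
import java.util.Properties;

public class NonEmptyCommitValidator extends BasePreCommitValidator {

  public NonEmptyCommitValidator(Properties props) {
    super(props); // assumed: validators receive their configuration at construction time
  }

  @Override
  public void validateWithMetadata(ValidationContext context) {
    // ValidationContext abstracts the engine: the same check runs on Spark, Flink, or Java writers.
    boolean hasWriteStats = !context.getWriteStats().isEmpty();
    long recordsWritten = context.getWriteStats().stream()
        .mapToLong(stat -> stat.getNumInserts() + stat.getNumUpdateWrites()) // HoodieWriteStat getters
        .sum();
    if (hasWriteStats && recordsWritten == 0) {
      // assumed failure mode: throwing aborts the commit when validation fails
      throw new IllegalStateException(
          "Write stats present but zero records written for instant " + context.getInstantTime());
    }
  }
}
```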
yihua
left a comment
Thanks for contributing this! This PR adds a well-structured generalized pre-commit validation framework with test coverage for the checkpoint parsing logic. I left a few comments for clarification.
```java
 * Phase 3: Spark-specific implementations in hudi-client/hudi-spark-client
 */
public abstract class BasePreCommitValidator {
```
There's already a validator framework under org.apache.hudi.client.validator in hudi-client/hudi-spark-client (e.g. SparkPreCommitValidator), with HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES to specify the validator implementation classes to use. Have you considered how this new BasePreCommitValidator will integrate with the existing SparkPreCommitValidator and SparkValidatorUtils.runValidators() in Phase 3?
Added Javadoc explaining the plan: in Phase 3, SparkPreCommitValidator will be refactored to extend BasePreCommitValidator, and SparkValidatorUtils.runValidators() will be updated to invoke validateWithMetadata() for validators extending this class. Existing VALIDATOR_CLASS_NAMES config will continue to work.
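A rough illustration of what that Phase 3 bridge could look like; this is purely a sketch of the stated plan, the real SparkPreCommitValidator carries generic parameters and additional hooks that are omitted here, and buildSparkValidationContext is a hypothetical helper name.

```java
// Hypothetical Phase 3 shape: the existing Spark validator delegating to the new
// engine-agnostic base class while keeping the VALIDATOR_CLASS_NAMES entry point.
public abstract class SparkPreCommitValidator extends BasePreCommitValidator {

  // existing Spark-facing entry point, kept for backward compatibility
  public void validate(String instantTime) {
    // build an engine-specific ValidationContext from Spark write client state,
    // then route through the shared metadata-based validation path
    ValidationContext context = buildSparkValidationContext(instantTime); // hypothetical helper
    validateWithMetadata(context);
  }

  protected abstract ValidationContext buildSparkValidationContext(String instantTime);
}
```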
```java
  throw new IllegalArgumentException(
      "Invalid checkpoint format. Expected: topic,partition:offset,... Got: " + checkpointStr);
}
```
The splits.length < 1 check is unreachable; String.split() always returns at least one element. Did you mean splits.length < 2 to validate that partition data is present (consistent with parseDeltaStreamerKafkaCheckpoint)?
Fixed to splits.length < 2. Also added a test case for topic-only input.
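For reference, the corrected guard in extractTopicName looks roughly like this; the method name and error message come from the diff above, while the surrounding body is assumed.

```java
// Sketch of the corrected check; only the length guard changed from < 1 to < 2.
private static String extractTopicName(String checkpointStr) {
  String[] splits = checkpointStr.split(",");
  // splits.length is always >= 1, so the old "< 1" guard could never fire;
  // requiring >= 2 ensures at least one partition:offset entry follows the topic
  if (splits.length < 2) {
    throw new IllegalArgumentException(
        "Invalid checkpoint format. Expected: topic,partition:offset,... Got: " + checkpointStr);
  }
  return splits[0];
}
```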
```java
public interface ValidationContext {

  /**
   * Get the current commit instant time being validated.
```
This is a fairly large interface with 11 methods. Some of these (like getTotalInsertRecordsWritten, getTotalUpdateRecordsWritten) can be derived from getWriteStats(). Have you considered keeping the interface minimal (metadata + timeline + stats access) and providing the computed methods as defaults for common logic across engines?
Good call. Slimmed the interface down to 6 core abstract methods (getInstantTime, getCommitMetadata, getWriteStats, getActiveTimeline, getPreviousCommitInstant, getPreviousCommitMetadata). The 5 computed methods are now default implementations derived from the core methods.
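A sketch of the slimmed-down interface; the six core method names are the ones listed above, while the default-method bodies, return types, and the HoodieWriteStat getters they rely on are illustrative assumptions.

```java
// Sketch: six abstract accessors plus derived defaults, so engine-specific
// implementations only provide the core methods.
import java.util.List;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

public interface ValidationContext {
  String getInstantTime();
  HoodieCommitMetadata getCommitMetadata();
  List<HoodieWriteStat> getWriteStats();
  HoodieActiveTimeline getActiveTimeline();
  Option<HoodieInstant> getPreviousCommitInstant();
  Option<HoodieCommitMetadata> getPreviousCommitMetadata();

  // Computed helpers derived from the core accessors.
  default long getTotalInsertRecordsWritten() {
    return getWriteStats().stream().mapToLong(HoodieWriteStat::getNumInserts).sum();
  }

  default long getTotalUpdateRecordsWritten() {
    return getWriteStats().stream().mapToLong(HoodieWriteStat::getNumUpdateWrites).sum();
  }
}
```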
```java
 * Phase 2: Flink-specific implementations in hudi-flink-datasource
 * Phase 3: Spark-specific implementations in hudi-client/hudi-spark-client
 */
public abstract class BasePreCommitValidator {
```
Is this new validator abstraction intended to be public and user-facing? If so, could you mark the class with @PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING) and the methods with @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)?
Done. Added @PublicAPIClass(maturity = EVOLVING) on BasePreCommitValidator and ValidationContext, and @PublicAPIMethod(maturity = EVOLVING) on public/protected methods.
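Roughly what the annotated declaration ends up looking like; a sketch only, with the import package for the annotations assumed from hudi-common and an illustrative method signature.

```java
// Sketch of the annotated public API surface.
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;

@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public abstract class BasePreCommitValidator {

  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
  public abstract void validateWithMetadata(ValidationContext context);
}
```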
```java
protected boolean supportsMetadataValidation() {
  return false;
}
```
What's the reason for having this? Is exposing validate or validateWithMetadata not good enough?
Agreed, it's unnecessary indirection. Removed supportsMetadataValidation() entirely — validateWithMetadata is sufficient.
```java
protected static final String TOLERANCE_PERCENTAGE_KEY = "hoodie.precommit.validators.streaming.offset.tolerance.percentage";
protected static final String WARN_ONLY_MODE_KEY = "hoodie.precommit.validators.warn.only";
```
Could you define the configs in HoodiePreCommitValidatorConfig so that they are surfaced in the configs documentation on the Hudi website during the release process?
Done. Added STREAMING_OFFSET_TOLERANCE_PERCENTAGE and WARN_ONLY_MODE as ConfigProperty entries in HoodiePreCommitValidatorConfig so they get surfaced to the website docs.
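A sketch of the resulting entries in HoodiePreCommitValidatorConfig; the ConfigProperty builder pattern (key/defaultValue/withDocumentation) mirrors how other Hudi configs are defined, but the documentation strings shown are illustrative, not the merged wording.

```java
// Sketch of the new ConfigProperty entries for the two Phase 1 settings.
import org.apache.hudi.common.config.ConfigProperty;

public class HoodiePreCommitValidatorConfig {

  public static final ConfigProperty<Double> STREAMING_OFFSET_TOLERANCE_PERCENTAGE = ConfigProperty
      .key("hoodie.precommit.validators.streaming.offset.tolerance.percentage")
      .defaultValue(0.0)
      .withDocumentation("Allowed percentage deviation between the streaming source offset "
          + "difference and the number of records written before validation fails.");

  public static final ConfigProperty<Boolean> WARN_ONLY_MODE = ConfigProperty
      .key("hoodie.precommit.validators.warn.only")
      .defaultValue(false)
      .withDocumentation("When true, pre-commit validation failures are logged as warnings "
          + "instead of failing the commit.");
}
```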
```java
LOG.info("Offset validation passed. Offset diff: {}, Records: {}, Deviation: {:.2f}% (within {}%)",
    offsetDiff, recordsWritten, deviation, tolerancePercentage);
```
{:.2f} does not work with SLF4J. Could you double check the format here?
Good catch. Fixed — using String.format("%.2f", deviation) with SLF4J {} placeholder.
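The corrected logging call would look roughly like this, with the variable names taken from the diff above:

```java
// Corrected SLF4J call: pre-format the deviation with String.format and pass it
// through a plain {} placeholder, since SLF4J does not support {:.2f}.
LOG.info("Offset validation passed. Offset diff: {}, Records: {}, Deviation: {}% (within {}%)",
    offsetDiff, recordsWritten, String.format("%.2f", deviation), tolerancePercentage);
```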
```java
private double calculateDeviation(long offsetDiff, long recordsWritten) {
  // Handle edge cases
  if (offsetDiff == 0 && recordsWritten == 0) {
    return 0.0; // Both zero - perfect match (no data processed)
  }
  if (offsetDiff == 0 || recordsWritten == 0) {
    return 100.0; // One is zero - complete mismatch
  }

  long difference = Math.abs(offsetDiff - recordsWritten);
  return (100.0 * difference) / offsetDiff;
}
```
Is this for the append-only case? For upserts with dedup and/or event-time ordering, the deviation could be legitimate.
Good point. Added a note in the class Javadoc clarifying this is primarily for append-only ingestion. For upsert workloads with dedup or event-time ordering, users should configure a higher tolerance or use warn-only mode.
```java
 * @return Map of partition → offset
 * @throws IllegalArgumentException if format is invalid
 */
private static Map<Integer, Long> parseDeltaStreamerKafkaCheckpoint(String checkpointStr) {
```
There is already KafkaOffsetGen.CheckpointUtils#strToOffsets to parse the Kafka offsets. Should that be removed and consolidated to reuse this one?
Added a Javadoc note about the duplication. Cannot consolidate directly right now since hudi-common cannot depend on Kafka client types (TopicPartition) from hudi-utilities. This method returns Map<Integer, Long> to avoid that dependency. Noted for future refactoring.
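For context, a sketch of what the Kafka-client-free parsing looks like; the checkpoint format (topic,partition:offset,partition:offset,...) matches the DeltaStreamer convention referenced above, and the body shown is illustrative rather than the exact CheckpointUtils implementation.

```java
// Sketch: parse the DeltaStreamer Kafka checkpoint string without depending on
// Kafka client types (TopicPartition).
import java.util.HashMap;
import java.util.Map;

final class CheckpointParsingSketch {

  // Format: topic,partition:offset,partition:offset,...
  static Map<Integer, Long> parseDeltaStreamerKafkaCheckpoint(String checkpointStr) {
    String[] splits = checkpointStr.split(",");
    if (splits.length < 2) {
      throw new IllegalArgumentException(
          "Invalid checkpoint format. Expected: topic,partition:offset,... Got: " + checkpointStr);
    }
    Map<Integer, Long> partitionToOffset = new HashMap<>();
    for (int i = 1; i < splits.length; i++) { // index 0 holds the topic name
      String[] partitionOffset = splits[i].split(":");
      partitionToOffset.put(Integer.parseInt(partitionOffset[0]), Long.parseLong(partitionOffset[1]));
    }
    return partitionToOffset;
  }
}
```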
- Add @PublicAPIClass/@PublicAPIMethod annotations to BasePreCommitValidator and ValidationContext
- Add Javadoc on integration plan with existing SparkPreCommitValidator
- Remove unnecessary supportsMetadataValidation() method
- Slim ValidationContext from 11 abstract methods to 6 core + 5 default computed methods
- Add ConfigProperty entries to HoodiePreCommitValidatorConfig for website doc surfacing
- Fix SLF4J format string (was using Python-style {:.2f})
- Add Javadoc clarifying append-only vs upsert/dedup deviation
- Fix unreachable splits.length < 1 check to < 2 in extractTopicName
- Add consolidation note referencing KafkaOffsetGen.CheckpointUtils#strToOffsets
Describe the issue this Pull Request addresses
This PR implements Phase 1 of a pre-commit validation framework that enables validators to access commit metadata, timeline, and write statistics. This addresses the need for data quality validation before commits are finalized, particularly for detecting data loss in streaming ingestion scenarios.
Closes #18067
Summary and Changelog
Summary:
Adds an engine-agnostic pre-commit validation framework in hudi-common that provides validators with access to commit metadata, write statistics, and timeline information. This enables validators to perform sophisticated data quality checks, such as comparing streaming source offsets with actual record counts to detect data loss.
Changelog:
- BasePreCommitValidator abstract class as foundation for all validators
- ValidationContext interface to provide metadata access across engines
- StreamingOffsetValidator base class for streaming offset validation with configurable tolerance and warn-only mode
- CheckpointUtils utility for parsing and comparing multi-format streaming checkpoints (DeltaStreamer Kafka, Flink Kafka, Pulsar, Kinesis)
- TestCheckpointUtils (14 test cases) and TestStreamingOffsetValidator (9 test cases)

Configuration properties introduced (a usage sketch follows the phase list below):
- hoodie.precommit.validators.streaming.offset.tolerance.percentage (default: 0.0)
- hoodie.precommit.validators.warn.only (default: false)

This is Phase 1 of a 3-phase implementation:
- Phase 1 (this PR): Core framework in hudi-common
- Phase 2: Flink-specific implementation
- Phase 3: Spark/DeltaStreamer implementation
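For illustration, once engine-specific validators exist (Phase 2/3), enabling the two Phase 1 settings would amount to supplying them as writer properties; the values and wiring below are an assumption, not something this PR enables on its own.

```java
// Illustrative only: how the two Phase 1 properties could be supplied to a writer.
import java.util.Properties;

Properties writerProps = new Properties();
writerProps.setProperty("hoodie.precommit.validators.streaming.offset.tolerance.percentage", "5.0"); // allow 5% drift
writerProps.setProperty("hoodie.precommit.validators.warn.only", "true"); // log instead of failing the commit
```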
Impact
Public API Changes:
- org.apache.hudi.client.validator package: BasePreCommitValidator (abstract class), ValidationContext (interface), StreamingOffsetValidator (abstract class)
- org.apache.hudi.common.util.CheckpointUtils

User-Facing Changes:
None in Phase 1. This provides the framework foundation; actual validator implementations will be added in Phase 2 (Flink) and Phase 3 (Spark/DeltaStreamer).
Performance Impact:
None. The framework is passive until validators are configured and implemented in future phases.
Risk Level
Risk Level: low
Justification:
Verification:
- mvn clean install -pl hudi-common -am -DskipTests

Documentation Update
Documentation needed for future phases:
When Phase 2 (Flink) and Phase 3 (Spark/DeltaStreamer) are implemented, the following documentation will be added:
For Phase 1:
No user-facing documentation needed as the framework is not yet exposed to users. Code-level documentation is complete with comprehensive Javadocs in all classes.
Contributor's checklist