diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
index 4d57ce18f3caa..0b0d4f6191bc0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java
@@ -18,97 +18,58 @@
package org.apache.hudi.source.reader.function;
-import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
-import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
-import org.apache.hudi.common.table.read.BufferedRecord;
-import org.apache.hudi.common.table.read.BufferedRecordMerger;
-import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
-import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.reader.BatchRecords;
import org.apache.hudi.source.reader.HoodieRecordWithPosition;
import org.apache.hudi.source.split.HoodieCdcSourceSplit;
import org.apache.hudi.source.split.HoodieSourceSplit;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.FilePathUtils;
-import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.table.format.cdc.CdcImageManager;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
+import org.apache.hudi.table.format.cdc.CdcIterators;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
-import org.apache.hudi.util.AvroToRowDataConverters;
-import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.util.RowDataProjection;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.Path;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
-
/**
* CDC reader function for source V2. Reads CDC splits ({@link HoodieCdcSourceSplit}) and
- * emits change-log {@link RowData} records tagged with the appropriate {@link RowKind}.
+ * emits change-log {@link RowData} records tagged with the appropriate {@link org.apache.flink.types.RowKind}.
*
*
The implementation mirrors the logic in {@link CdcInputFormat}, adapted for the
* {@link SplitReaderFunction} contract.
@@ -158,10 +119,9 @@ public RecordsWithSplitIds> read(HoodieSourceS
HoodieCdcSourceSplit cdcSplit = (HoodieCdcSourceSplit) split;
HoodieCDCSupplementalLoggingMode mode = OptionsResolver.getCDCSupplementalLoggingMode(conf);
- HoodieTableMetaClient client = getMetaClient();
- HoodieWriteConfig wConfig = getWriteConfig();
- ImageManager imageManager = new ImageManager(tableState.getRowType(), wConfig, this::getFileSliceIterator);
+ CdcImageManager imageManager = new CdcImageManager(
+ tableState.getRowType(), getWriteConfig(), this::getFileSliceIterator);
Function> recordIteratorFunc =
cdcFileSplit -> createRecordIteratorSafe(
@@ -169,10 +129,9 @@ public RecordsWithSplitIds> read(HoodieSourceS
cdcSplit.getMaxCompactionMemoryInBytes(),
cdcFileSplit,
mode,
- imageManager,
- client);
+ imageManager);
- currentIterator = new CdcFileSplitsIterator(cdcSplit.getChanges(), imageManager, recordIteratorFunc);
+ currentIterator = new CdcIterators.CdcFileSplitsIterator(cdcSplit.getChanges(), imageManager, recordIteratorFunc);
BatchRecords records = BatchRecords.forRecords(
split.splitId(), currentIterator, split.getFileOffset(), split.getConsumed());
records.seek(split.getConsumed());
@@ -212,10 +171,9 @@ private ClosableIterator createRecordIteratorSafe(
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
HoodieCDCSupplementalLoggingMode mode,
- ImageManager imageManager,
- HoodieTableMetaClient client) {
+ CdcImageManager imageManager) {
try {
- return createRecordIterator(tablePath, maxCompactionMemoryInBytes, fileSplit, mode, imageManager, client);
+ return createRecordIterator(tablePath, maxCompactionMemoryInBytes, fileSplit, mode, imageManager);
} catch (IOException e) {
throw new HoodieException("Failed to create CDC record iterator for split: " + fileSplit, e);
}
@@ -226,8 +184,7 @@ private ClosableIterator createRecordIterator(
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
HoodieCDCSupplementalLoggingMode mode,
- ImageManager imageManager,
- HoodieTableMetaClient client) throws IOException {
+ CdcImageManager imageManager) throws IOException {
final HoodieSchema tableSchema = HoodieSchema.parse(tableState.getTableSchema());
final HoodieSchema requiredSchema = HoodieSchema.parse(tableState.getRequiredSchema());
@@ -236,29 +193,33 @@ private ClosableIterator createRecordIterator(
ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1,
"CDC file path should exist and be singleton for BASE_FILE_INSERT");
String path = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
- return new AddBaseFileIterator(getBaseFileIterator(path));
+ return new CdcIterators.AddBaseFileIterator(getBaseFileIterator(path));
}
case BASE_FILE_DELETE: {
ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
"Before file slice should exist for BASE_FILE_DELETE");
- FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
- MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes);
- return new RemoveBaseFileIterator(tableState.getRequiredRowType(), tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
+ MergeOnReadInputSplit inputSplit = CdcIterators.fileSlice2Split(
+ tablePath, fileSplit.getBeforeFileSlice().get(), maxCompactionMemoryInBytes);
+ return new CdcIterators.RemoveBaseFileIterator(
+ tableState.getRequiredRowType(), tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
}
case AS_IS: {
HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(tableSchema);
HoodieSchema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
switch (mode) {
case DATA_BEFORE_AFTER:
- return new BeforeAfterImageIterator(
- getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getRequiredRowType(), cdcSchema, fileSplit);
+ return new CdcIterators.BeforeAfterImageIterator(
+ getHadoopConf(), tablePath, tableSchema, requiredSchema,
+ tableState.getRequiredRowType(), cdcSchema, fileSplit);
case DATA_BEFORE:
- return new BeforeImageIterator(
- conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getRequiredRowType(),
+ return new CdcIterators.BeforeImageIterator(
+ getHadoopConf(), tablePath, tableSchema, requiredSchema,
+ tableState.getRequiredRowType(), tableState.getRequiredPositions(),
maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
case OP_KEY_ONLY:
- return new RecordKeyImageIterator(
- conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getRequiredRowType(),
+ return new CdcIterators.RecordKeyImageIterator(
+ getHadoopConf(), tablePath, tableSchema, requiredSchema,
+ tableState.getRequiredRowType(), tableState.getRequiredPositions(),
maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
default:
throw new AssertionError("Unexpected CDC supplemental logging mode: " + mode);
@@ -268,16 +229,17 @@ conf, getHadoopConf(), tablePath, tableSchema, requiredSchema, tableState.getReq
ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1,
"CDC file path should exist and be singleton for LOG_FILE");
String logFilePath = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
- MergeOnReadInputSplit split = CdcInputFormat.singleLogFile2Split(tablePath, logFilePath, maxCompactionMemoryInBytes);
+ MergeOnReadInputSplit split = CdcIterators.singleLogFile2Split(tablePath, logFilePath, maxCompactionMemoryInBytes);
ClosableIterator> recordIterator = getFileSliceHoodieRecordIterator(split);
- return new DataLogFileIterator(
- maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema, tableState.getRequiredRowType(), tableState.getRequiredPositions(),
- recordIterator, client, getWriteConfig());
+ return new CdcIterators.DataLogFileIterator(
+ maxCompactionMemoryInBytes, imageManager, fileSplit, tableSchema,
+ tableState.getRequiredRowType(), tableState.getRequiredPositions(),
+ recordIterator, getMetaClient(), getWriteConfig());
}
case REPLACE_COMMIT: {
- return new ReplaceCommitIterator(
- conf, tablePath, tableState.getRequiredRowType(), tableState.getRequiredPositions(), maxCompactionMemoryInBytes,
- fileSplit, this::getFileSliceIterator);
+ return new CdcIterators.ReplaceCommitIterator(
+ tablePath, tableState.getRequiredRowType(), tableState.getRequiredPositions(),
+ maxCompactionMemoryInBytes, fileSplit, this::getFileSliceIterator);
}
default:
throw new AssertionError("Unexpected CDC file split infer case: " + fileSplit.getCdcInferCase());
@@ -355,689 +317,10 @@ private static FileSlice buildFileSlice(MergeOnReadInputSplit split) {
.orElse(Collections.emptyList()));
}
- private static int[] computeRequiredPositions(RowType rowType, RowType requiredRowType) {
- List allNames = rowType.getFieldNames();
- return requiredRowType.getFieldNames().stream()
- .map(allNames::indexOf)
- .mapToInt(i -> i)
- .toArray();
- }
-
private HoodieTableMetaClient getMetaClient() {
if (metaClient == null) {
metaClient = StreamerUtil.metaClientForReader(conf, getHadoopConf());
}
return metaClient;
}
-
- // -------------------------------------------------------------------------
- // Inner iterators (adapted from CdcInputFormat inner classes)
- // -------------------------------------------------------------------------
-
- /** Iterates over an ordered list of {@link HoodieCDCFileSplit}s, delegating record reading to a factory. */
- private static class CdcFileSplitsIterator implements ClosableIterator {
- private ImageManager imageManager;
- private final Iterator fileSplitIterator;
- private final Function> recordIteratorFunc;
- private ClosableIterator recordIterator;
-
- CdcFileSplitsIterator(
- HoodieCDCFileSplit[] changes,
- ImageManager imageManager,
- Function> recordIteratorFunc) {
- this.fileSplitIterator = Arrays.asList(changes).iterator();
- this.imageManager = imageManager;
- this.recordIteratorFunc = recordIteratorFunc;
- }
-
- @Override
- public boolean hasNext() {
- if (recordIterator != null) {
- if (recordIterator.hasNext()) {
- return true;
- } else {
- recordIterator.close();
- recordIterator = null;
- }
- }
- if (fileSplitIterator.hasNext()) {
- recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
- return recordIterator.hasNext();
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return recordIterator.next();
- }
-
- @Override
- public void close() {
- if (recordIterator != null) {
- recordIterator.close();
- }
- if (imageManager != null) {
- imageManager.close();
- imageManager = null;
- }
- }
- }
-
- /** Wraps a base-file parquet iterator and marks every record as {@link RowKind#INSERT}. */
- private static class AddBaseFileIterator implements ClosableIterator {
- private ClosableIterator nested;
- private RowData currentRecord;
-
- AddBaseFileIterator(ClosableIterator nested) {
- this.nested = nested;
- }
-
- @Override
- public boolean hasNext() {
- if (nested.hasNext()) {
- currentRecord = nested.next();
- currentRecord.setRowKind(RowKind.INSERT);
- return true;
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return currentRecord;
- }
-
- @Override
- public void close() {
- if (nested != null) {
- nested.close();
- nested = null;
- }
- }
- }
-
- /** Wraps a file-slice iterator and marks every record as {@link RowKind#DELETE}, with projection. */
- private static class RemoveBaseFileIterator implements ClosableIterator {
- private ClosableIterator nested;
- private final RowDataProjection projection;
-
- RemoveBaseFileIterator(RowType requiredRowType, int[] requiredPositions, ClosableIterator iterator) {
- this.nested = iterator;
- this.projection = RowDataProjection.instance(requiredRowType, requiredPositions);
- }
-
- @Override
- public boolean hasNext() {
- return nested.hasNext();
- }
-
- @Override
- public RowData next() {
- RowData row = nested.next();
- row.setRowKind(RowKind.DELETE);
- return projection.project(row);
- }
-
- @Override
- public void close() {
- if (nested != null) {
- nested.close();
- nested = null;
- }
- }
- }
-
- /**
- * Handles the {@code LOG_FILE} CDC inference case: compares records from the log file
- * against before-image snapshots to emit INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE events.
- */
- private static class DataLogFileIterator implements ClosableIterator {
- private final HoodieSchema tableSchema;
- private final long maxCompactionMemoryInBytes;
- private final ImageManager imageManager;
- private final RowDataProjection projection;
- private final BufferedRecordMerger recordMerger;
- private final ClosableIterator> logRecordIterator;
- private final DeleteContext deleteContext;
- private final HoodieReaderContext readerContext;
- private final String[] orderingFields;
- private final TypedProperties props;
-
- private ExternalSpillableMap beforeImages;
- private RowData currentImage;
- private RowData sideImage;
-
- DataLogFileIterator(
- long maxCompactionMemoryInBytes,
- ImageManager imageManager,
- HoodieCDCFileSplit cdcFileSplit,
- HoodieSchema tableSchema,
- RowType requiredRowType,
- int[] requiredPositions,
- ClosableIterator> logRecordIterator,
- HoodieTableMetaClient metaClient,
- HoodieWriteConfig writeConfig) throws IOException {
- this.tableSchema = tableSchema;
- this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
- this.imageManager = imageManager;
- this.projection = HoodieSchemaConverter.convertToRowType(tableSchema).equals(requiredRowType)
- ? null : RowDataProjection.instance(requiredRowType, requiredPositions);
- this.props = writeConfig.getProps();
- this.readerContext = new FlinkReaderContextFactory(metaClient).getContext();
- readerContext.initRecordMerger(props);
- this.orderingFields = ConfigUtils.getOrderingFields(props);
- this.recordMerger = BufferedRecordMergerFactory.create(
- readerContext,
- readerContext.getMergeMode(),
- false,
- Option.of(writeConfig.getRecordMerger()),
- tableSchema,
- Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(), writeConfig.getPayloadClass())),
- props,
- metaClient.getTableConfig().getPartialUpdateMode());
- this.logRecordIterator = logRecordIterator;
- this.deleteContext = new DeleteContext(props, tableSchema).withReaderSchema(tableSchema);
- initImages(cdcFileSplit, writeConfig);
- }
-
- private void initImages(HoodieCDCFileSplit fileSplit, HoodieWriteConfig writeConfig) throws IOException {
- if (fileSplit.getBeforeFileSlice().isPresent() && !fileSplit.getBeforeFileSlice().get().isEmpty()) {
- this.beforeImages = this.imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
- } else {
- this.beforeImages = FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, getClass().getSimpleName());
- }
- }
-
- @Override
- public boolean hasNext() {
- if (sideImage != null) {
- currentImage = sideImage;
- sideImage = null;
- return true;
- }
- while (logRecordIterator.hasNext()) {
- HoodieRecord record = logRecordIterator.next();
- RowData existed = imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
- if (isDelete(record)) {
- if (existed != null) {
- existed.setRowKind(RowKind.DELETE);
- currentImage = existed;
- return true;
- }
- } else {
- if (existed == null) {
- RowData newRow = record.getData();
- newRow.setRowKind(RowKind.INSERT);
- currentImage = newRow;
- return true;
- } else {
- HoodieOperation operation = HoodieOperation.fromValue(existed.getRowKind().toByteValue());
- HoodieRecord historyRecord = new HoodieFlinkRecord(record.getKey(), operation, existed);
- HoodieRecord merged = mergeRowWithLog(historyRecord, record).get();
- if (merged.getData() != existed) {
- existed.setRowKind(RowKind.UPDATE_BEFORE);
- currentImage = existed;
- RowData mergedRow = merged.getData();
- mergedRow.setRowKind(RowKind.UPDATE_AFTER);
- imageManager.updateImageRecord(record.getRecordKey(), beforeImages, mergedRow);
- sideImage = mergedRow;
- return true;
- }
- }
- }
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return projection != null ? projection.project(currentImage) : currentImage;
- }
-
- @Override
- public void close() {
- logRecordIterator.close();
- imageManager.close();
- }
-
- @SuppressWarnings("unchecked")
- private Option> mergeRowWithLog(
- HoodieRecord historyRecord, HoodieRecord newRecord) {
- try {
- BufferedRecord histBuf = BufferedRecords.fromHoodieRecord(
- historyRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
- BufferedRecord newBuf = BufferedRecords.fromHoodieRecord(
- newRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
- BufferedRecord merged = recordMerger.finalMerge(histBuf, newBuf);
- return Option.ofNullable(readerContext.getRecordContext()
- .constructHoodieRecord(merged, historyRecord.getPartitionPath()));
- } catch (IOException e) {
- throw new HoodieIOException("Merge base and delta payloads exception", e);
- }
- }
-
- private boolean isDelete(HoodieRecord record) {
- return record.isDelete(deleteContext, CollectionUtils.emptyProps());
- }
- }
-
- /**
- * Base iterator for CDC log files stored with supplemental logging (AS_IS inference case).
- * Reads {@link HoodieCDCLogRecordIterator} and resolves before/after images using
- * subclass-specific logic.
- */
- private abstract static class BaseImageIterator implements ClosableIterator {
- private final HoodieSchema requiredSchema;
- private final int[] requiredPos;
- private final GenericRecordBuilder recordBuilder;
- private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
- private HoodieCDCLogRecordIterator cdcItr;
-
- private GenericRecord cdcRecord;
- private RowData sideImage;
- private RowData currentImage;
-
- BaseImageIterator(
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- HoodieSchema tableSchema,
- HoodieSchema requiredSchema,
- RowType requiredRowType,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit) {
- this.requiredSchema = requiredSchema;
- this.requiredPos = computeRequiredPos(tableSchema, requiredSchema);
- this.recordBuilder = new GenericRecordBuilder(requiredSchema.getAvroSchema());
- this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
-
- StoragePath hadoopTablePath = new StoragePath(tablePath);
- HoodieStorage storage = HoodieStorageUtils.getStorage(
- tablePath, HadoopFSUtils.getStorageConf(hadoopConf));
- HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream()
- .map(cdcFile -> {
- try {
- return new HoodieLogFile(storage.getPathInfo(new StoragePath(hadoopTablePath, cdcFile)));
- } catch (IOException e) {
- throw new HoodieIOException("Failed to get file status for CDC log: " + cdcFile, e);
- }
- })
- .toArray(HoodieLogFile[]::new);
- this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);
- }
-
- private static int[] computeRequiredPos(HoodieSchema tableSchema, HoodieSchema requiredSchema) {
- HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(tableSchema);
- List fields = dataSchema.getFields().stream()
- .map(HoodieSchemaField::name)
- .collect(Collectors.toList());
- return requiredSchema.getFields().stream()
- .map(f -> fields.indexOf(f.name()))
- .mapToInt(i -> i)
- .toArray();
- }
-
- @Override
- public boolean hasNext() {
- if (sideImage != null) {
- currentImage = sideImage;
- sideImage = null;
- return true;
- } else if (cdcItr.hasNext()) {
- cdcRecord = (GenericRecord) cdcItr.next();
- String op = String.valueOf(cdcRecord.get(0));
- resolveImage(op);
- return true;
- }
- return false;
- }
-
- protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord);
-
- protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord);
-
- @Override
- public RowData next() {
- return currentImage;
- }
-
- @Override
- public void close() {
- if (cdcItr != null) {
- cdcItr.close();
- cdcItr = null;
- }
- }
-
- private void resolveImage(String op) {
- switch (op) {
- case "i":
- currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
- break;
- case "u":
- currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
- sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
- break;
- case "d":
- currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
- break;
- default:
- throw new AssertionError("Unexpected CDC operation: " + op);
- }
- }
-
- protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
- GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
- avroRecord, requiredSchema, requiredPos, recordBuilder);
- RowData resolved = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
- resolved.setRowKind(rowKind);
- return resolved;
- }
- }
-
- /** Reads CDC log files that contain both before and after images ({@code DATA_BEFORE_AFTER} mode). */
- private static class BeforeAfterImageIterator extends BaseImageIterator {
- BeforeAfterImageIterator(
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- HoodieSchema tableSchema,
- HoodieSchema requiredSchema,
- RowType requiredRowType,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit) {
- super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType, cdcSchema, fileSplit);
- }
-
- @Override
- protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
- }
- }
-
- /**
- * Reads CDC log files containing op + key + before_image ({@code DATA_BEFORE} mode).
- * The after-image is loaded from the after file-slice via the {@link ImageManager}.
- */
- private static class BeforeImageIterator extends BaseImageIterator {
- protected ExternalSpillableMap afterImages;
- protected final long maxCompactionMemoryInBytes;
- protected final RowDataProjection projection;
- protected final ImageManager imageManager;
-
- BeforeImageIterator(
- org.apache.flink.configuration.Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- HoodieSchema tableSchema,
- HoodieSchema requiredSchema,
- RowType requiredRowType,
- long maxCompactionMemoryInBytes,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit,
- ImageManager imageManager) throws IOException {
- super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType, cdcSchema, fileSplit);
- this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
- this.projection = RowDataProjection.instance(requiredRowType,
- computePositions(tableSchema, requiredRowType));
- this.imageManager = imageManager;
- initImages(fileSplit);
- }
-
- protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
- ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
- "Current file slice does not exist for instant: " + fileSplit.getInstant());
- this.afterImages = imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
- }
-
- @Override
- protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
- String recordKey = cdcRecord.get(1).toString();
- RowData row = imageManager.getImageRecord(recordKey, afterImages, rowKind);
- row.setRowKind(rowKind);
- return projection.project(row);
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
- }
-
- private static int[] computePositions(HoodieSchema tableSchema, RowType requiredRowType) {
- List allFields = tableSchema.getFields().stream()
- .map(HoodieSchemaField::name)
- .collect(Collectors.toList());
- return requiredRowType.getFieldNames().stream()
- .map(allFields::indexOf)
- .mapToInt(i -> i)
- .toArray();
- }
- }
-
- /**
- * Reads CDC log files containing only op + key ({@code OP_KEY_ONLY} mode).
- * Both before and after images are loaded from file-slice snapshots via {@link ImageManager}.
- */
- private static class RecordKeyImageIterator extends BeforeImageIterator {
- protected ExternalSpillableMap beforeImages;
-
- RecordKeyImageIterator(
- org.apache.flink.configuration.Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- HoodieSchema tableSchema,
- HoodieSchema requiredSchema,
- RowType requiredRowType,
- long maxCompactionMemoryInBytes,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit,
- ImageManager imageManager) throws IOException {
- super(flinkConf, hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType,
- maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
- }
-
- @Override
- protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
- super.initImages(fileSplit);
- ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
- "Before file slice does not exist for instant: " + fileSplit.getInstant());
- this.beforeImages = imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
- String recordKey = cdcRecord.get(1).toString();
- RowData row = imageManager.getImageRecord(recordKey, beforeImages, rowKind);
- row.setRowKind(rowKind);
- return projection.project(row);
- }
- }
-
- /** Handles the {@code REPLACE_COMMIT} CDC inference case: emits all records from before-slice as DELETE. */
- private static class ReplaceCommitIterator implements ClosableIterator {
- private final ClosableIterator itr;
- private final RowDataProjection projection;
-
- ReplaceCommitIterator(
- org.apache.flink.configuration.Configuration flinkConf,
- String tablePath,
- RowType requiredRowType,
- int[] requiredPositions,
- long maxCompactionMemoryInBytes,
- HoodieCDCFileSplit fileSplit,
- Function> splitIteratorFunc) {
- ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
- "Before file slice does not exist for instant: " + fileSplit.getInstant());
- MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
- tablePath, fileSplit.getBeforeFileSlice().get(), maxCompactionMemoryInBytes);
- this.itr = splitIteratorFunc.apply(inputSplit);
- this.projection = RowDataProjection.instance(requiredRowType, requiredPositions);
- }
-
- @Override
- public boolean hasNext() {
- return itr.hasNext();
- }
-
- @Override
- public RowData next() {
- RowData row = itr.next();
- row.setRowKind(RowKind.DELETE);
- return projection.project(row);
- }
-
- @Override
- public void close() {
- itr.close();
- }
- }
-
- // -------------------------------------------------------------------------
- // ImageManager - caches full-schema row images keyed by record key
- // -------------------------------------------------------------------------
-
- /**
- * Manages serialized before/after image snapshots for a file group, cached by instant time.
- * At most two versions (before and after) are kept in memory; older entries are spilled to disk.
- */
- private static class ImageManager implements AutoCloseable {
- private final HoodieWriteConfig writeConfig;
- private final RowDataSerializer serializer;
- private final Function> splitIteratorFunc;
- private final Map> cache;
-
- ImageManager(
- RowType rowType,
- HoodieWriteConfig writeConfig,
- Function> splitIteratorFunc) {
- this.serializer = new RowDataSerializer(rowType);
- this.writeConfig = writeConfig;
- this.splitIteratorFunc = splitIteratorFunc;
- this.cache = new TreeMap<>();
- }
-
- ExternalSpillableMap getOrLoadImages(
- long maxCompactionMemoryInBytes, FileSlice fileSlice) throws IOException {
- final String instant = fileSlice.getBaseInstantTime();
- if (cache.containsKey(instant)) {
- return cache.get(instant);
- }
- if (cache.size() > 1) {
- String oldest = cache.keySet().iterator().next();
- cache.remove(oldest).close();
- }
- ExternalSpillableMap images = loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
- cache.put(instant, images);
- return images;
- }
-
- private ExternalSpillableMap loadImageRecords(
- long maxCompactionMemoryInBytes, FileSlice fileSlice) throws IOException {
- MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
- writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
- ExternalSpillableMap imageRecordsMap =
- FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, getClass().getSimpleName());
- try (ClosableIterator itr = splitIteratorFunc.apply(inputSplit)) {
- while (itr.hasNext()) {
- RowData row = itr.next();
- String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- serializer.serialize(row, new BytesArrayOutputView(baos));
- imageRecordsMap.put(recordKey, baos.toByteArray());
- }
- }
- return imageRecordsMap;
- }
-
- RowData getImageRecord(
- String recordKey, ExternalSpillableMap imageCache, RowKind rowKind) {
- byte[] bytes = imageCache.get(recordKey);
- ValidationUtils.checkState(bytes != null,
- "Key " + recordKey + " does not exist in current file group image");
- try {
- RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
- row.setRowKind(rowKind);
- return row;
- } catch (IOException e) {
- throw new HoodieException("Failed to deserialize image record for key: " + recordKey, e);
- }
- }
-
- void updateImageRecord(
- String recordKey, ExternalSpillableMap imageCache, RowData row) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- try {
- serializer.serialize(row, new BytesArrayOutputView(baos));
- } catch (IOException e) {
- throw new HoodieException("Failed to serialize image record for key: " + recordKey, e);
- }
- imageCache.put(recordKey, baos.toByteArray());
- }
-
- RowData removeImageRecord(
- String recordKey, ExternalSpillableMap imageCache) {
- byte[] bytes = imageCache.remove(recordKey);
- if (bytes == null) {
- return null;
- }
- try {
- return serializer.deserialize(new BytesArrayInputView(bytes));
- } catch (IOException e) {
- throw new HoodieException("Failed to deserialize image record for key: " + recordKey, e);
- }
- }
-
- @Override
- public void close() {
- cache.values().forEach(ExternalSpillableMap::close);
- cache.clear();
- }
- }
-
- // -------------------------------------------------------------------------
- // I/O view adapters for RowDataSerializer
- // -------------------------------------------------------------------------
-
- private static final class BytesArrayInputView extends DataInputStream
- implements org.apache.flink.core.memory.DataInputView {
- BytesArrayInputView(byte[] data) {
- super(new ByteArrayInputStream(data));
- }
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- while (numBytes > 0) {
- int skipped = skipBytes(numBytes);
- numBytes -= skipped;
- }
- }
- }
-
- private static final class BytesArrayOutputView extends DataOutputStream
- implements org.apache.flink.core.memory.DataOutputView {
- BytesArrayOutputView(ByteArrayOutputStream baos) {
- super(baos);
- }
-
- @Override
- public void skipBytesToWrite(int numBytes) throws IOException {
- for (int i = 0; i < numBytes; i++) {
- write(0);
- }
- }
-
- @Override
- public void write(org.apache.flink.core.memory.DataInputView source, int numBytes) throws IOException {
- byte[] buffer = new byte[numBytes];
- source.readFully(buffer);
- write(buffer);
- }
- }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
new file mode 100644
index 0000000000000..cad323b7fb815
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import lombok.Getter;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+
+/**
+ * Manages serialized before/after image snapshots for a CDC file group, cached by instant time.
+ *
+ * At most two versions (before and after) are kept in memory at once; older entries are
+ * evicted and spilled to disk via {@link ExternalSpillableMap}.
+ *
+ *
Also owns the I/O-view adapters ({@link BytesArrayInputView} / {@link BytesArrayOutputView})
+ * used for serialising {@link RowData} records into byte arrays.
+ */
+public class CdcImageManager implements AutoCloseable {
+ @Getter
+ private final HoodieWriteConfig writeConfig;
+ private final RowDataSerializer serializer;
+ private final Function> splitIteratorFunc;
+ private final Map> cache;
+
+ public CdcImageManager(
+ RowType rowType,
+ HoodieWriteConfig writeConfig,
+ Function> splitIteratorFunc) {
+ this.serializer = new RowDataSerializer(rowType);
+ this.writeConfig = writeConfig;
+ this.splitIteratorFunc = splitIteratorFunc;
+ this.cache = new TreeMap<>();
+ }
+
+ public ExternalSpillableMap getOrLoadImages(
+ long maxCompactionMemoryInBytes,
+ FileSlice fileSlice) throws IOException {
+ final String instant = fileSlice.getBaseInstantTime();
+ if (cache.containsKey(instant)) {
+ return cache.get(instant);
+ }
+ // evict the earliest version when more than two are cached (keep before & after)
+ if (cache.size() > 1) {
+ String oldest = cache.keySet().iterator().next();
+ cache.remove(oldest).close();
+ }
+ ExternalSpillableMap images = loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
+ cache.put(instant, images);
+ return images;
+ }
+
+ private ExternalSpillableMap loadImageRecords(
+ long maxCompactionMemoryInBytes,
+ FileSlice fileSlice) throws IOException {
+ MergeOnReadInputSplit inputSplit = CdcIterators.fileSlice2Split(
+ writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
+ ExternalSpillableMap imageRecordsMap =
+ FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, getClass().getSimpleName());
+ try (ClosableIterator itr = splitIteratorFunc.apply(inputSplit)) {
+ while (itr.hasNext()) {
+ RowData row = itr.next();
+ String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ serializer.serialize(row, new BytesArrayOutputView(baos));
+ imageRecordsMap.put(recordKey, baos.toByteArray());
+ }
+ }
+ return imageRecordsMap;
+ }
+
+ public RowData getImageRecord(
+ String recordKey,
+ ExternalSpillableMap imageCache,
+ RowKind rowKind) {
+ byte[] bytes = imageCache.get(recordKey);
+ ValidationUtils.checkState(bytes != null,
+ "Key " + recordKey + " does not exist in current file group image");
+ try {
+ RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
+ row.setRowKind(rowKind);
+ return row;
+ } catch (IOException e) {
+ throw new HoodieException("Failed to deserialize image record for key: " + recordKey, e);
+ }
+ }
+
+ public void updateImageRecord(
+ String recordKey,
+ ExternalSpillableMap imageCache,
+ RowData row) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ try {
+ serializer.serialize(row, new BytesArrayOutputView(baos));
+ } catch (IOException e) {
+ throw new HoodieException("Failed to serialize image record for key: " + recordKey, e);
+ }
+ imageCache.put(recordKey, baos.toByteArray());
+ }
+
+ public RowData removeImageRecord(
+ String recordKey,
+ ExternalSpillableMap imageCache) {
+ byte[] bytes = imageCache.remove(recordKey);
+ if (bytes == null) {
+ return null;
+ }
+ try {
+ return serializer.deserialize(new BytesArrayInputView(bytes));
+ } catch (IOException e) {
+ throw new HoodieException("Failed to deserialize image record for key: " + recordKey, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ cache.values().forEach(ExternalSpillableMap::close);
+ cache.clear();
+ }
+
+ // -------------------------------------------------------------------------
+ // I/O view adapters for RowDataSerializer
+ // -------------------------------------------------------------------------
+
+ static final class BytesArrayInputView extends DataInputStream implements DataInputView {
+ BytesArrayInputView(byte[] data) {
+ super(new ByteArrayInputStream(data));
+ }
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ while (numBytes > 0) {
+ int skipped = skipBytes(numBytes);
+ numBytes -= skipped;
+ }
+ }
+ }
+
+ static final class BytesArrayOutputView extends DataOutputStream implements DataOutputView {
+ BytesArrayOutputView(ByteArrayOutputStream baos) {
+ super(baos);
+ }
+
+ @Override
+ public void skipBytesToWrite(int numBytes) throws IOException {
+ for (int i = 0; i < numBytes; i++) {
+ write(0);
+ }
+ }
+
+ @Override
+ public void write(DataInputView source, int numBytes) throws IOException {
+ byte[] buffer = new byte[numBytes];
+ source.readFully(buffer);
+ write(buffer);
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 5aa29e3d99dcb..27bb2b3a56e8d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -18,92 +18,39 @@
package org.apache.hudi.table.format.cdc;
-import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
-import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
-import org.apache.hudi.common.table.read.BufferedRecord;
-import org.apache.hudi.common.table.read.BufferedRecordMerger;
-import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
-import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.source.ExpressionPredicates.Predicate;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.format.FlinkReaderContextFactory;
-import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
-import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.FlinkWriteClients;
-import org.apache.hudi.util.RowDataProjection;
-import org.apache.hudi.util.StreamerUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.Path;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
/**
* The base InputFormat class to read Hoodie data set as change logs.
*/
-@Slf4j
public class CdcInputFormat extends MergeOnReadInputFormat {
private static final long serialVersionUID = 1L;
@@ -121,17 +68,20 @@ private CdcInputFormat(
protected ClosableIterator initIterator(MergeOnReadInputSplit split) throws IOException {
if (split instanceof CdcInputSplit) {
HoodieCDCSupplementalLoggingMode mode = OptionsResolver.getCDCSupplementalLoggingMode(conf);
- ImageManager manager = new ImageManager(conf, tableState.getRowType(), this::getFileSliceIterator);
+ CdcImageManager manager = new CdcImageManager(
+ tableState.getRowType(),
+ FlinkWriteClients.getHoodieClientConfig(conf),
+ this::getFileSliceIterator);
Function> recordIteratorFunc =
- cdcFileSplit -> getRecordIteratorV2(split.getTablePath(), split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
- return new CdcFileSplitsIterator((CdcInputSplit) split, manager, recordIteratorFunc);
+ cdcFileSplit -> getRecordIteratorSafe(split.getTablePath(), split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
+ return new CdcIterators.CdcFileSplitsIterator(((CdcInputSplit) split).getChanges(), manager, recordIteratorFunc);
} else {
return super.initIterator(split);
}
}
/**
- * Returns the builder for {@link MergeOnReadInputFormat}.
+ * Returns the builder for {@link CdcInputFormat}.
*/
public static Builder builder() {
return new Builder();
@@ -139,22 +89,21 @@ public static Builder builder() {
private ClosableIterator getFileSliceIterator(MergeOnReadInputSplit split) {
try {
- // get full schema iterator.
final HoodieSchema schema = HoodieSchemaCache.intern(
HoodieSchema.parse(tableState.getTableSchema()));
- // before/after images have assumption of snapshot scan, so `emitDelete` is set as false
+ // before/after images use snapshot scan semantics, so emitDelete is false
return getSplitRowIterator(split, schema, schema, FlinkOptions.REALTIME_PAYLOAD_COMBINE, false);
} catch (IOException e) {
throw new HoodieException("Failed to create iterator for split: " + split, e);
}
}
- private ClosableIterator getRecordIteratorV2(
+ private ClosableIterator getRecordIteratorSafe(
String tablePath,
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
HoodieCDCSupplementalLoggingMode mode,
- ImageManager imageManager) {
+ CdcImageManager imageManager) {
try {
return getRecordIterator(tablePath, maxCompactionMemoryInBytes, fileSplit, mode, imageManager);
} catch (IOException e) {
@@ -167,41 +116,56 @@ private ClosableIterator getRecordIterator(
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
HoodieCDCSupplementalLoggingMode mode,
- ImageManager imageManager) throws IOException {
+ CdcImageManager imageManager) throws IOException {
switch (fileSplit.getCdcInferCase()) {
case BASE_FILE_INSERT:
ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1,
"CDC file path should exist and be singleton");
String path = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
- return new AddBaseFileIterator(getBaseFileIterator(path));
+ return new CdcIterators.AddBaseFileIterator(getBaseFileIterator(path));
case BASE_FILE_DELETE:
ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
"Before file slice should exist");
FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
- MergeOnReadInputSplit inputSplit = fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes);
- return new RemoveBaseFileIterator(tableState, getFileSliceIterator(inputSplit));
- case AS_IS:
- HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getTableSchema()));
+ MergeOnReadInputSplit inputSplit = CdcIterators.fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes);
+ return new CdcIterators.RemoveBaseFileIterator(
+ tableState.getRequiredRowType(), tableState.getRequiredPositions(), getFileSliceIterator(inputSplit));
+ case AS_IS: {
+ HoodieSchema tblSchema = HoodieSchema.parse(tableState.getTableSchema());
+ HoodieSchema reqSchema = HoodieSchema.parse(tableState.getRequiredSchema());
+ HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(tblSchema);
HoodieSchema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
switch (mode) {
case DATA_BEFORE_AFTER:
- return new BeforeAfterImageIterator(tablePath, tableState, hadoopConf, cdcSchema, fileSplit);
+ return new CdcIterators.BeforeAfterImageIterator(
+ hadoopConf, tablePath, tblSchema, reqSchema, tableState.getRequiredRowType(), cdcSchema, fileSplit);
case DATA_BEFORE:
- return new BeforeImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
+ return new CdcIterators.BeforeImageIterator(
+ hadoopConf, tablePath, tblSchema, reqSchema, tableState.getRequiredRowType(),
+ tableState.getRequiredPositions(), maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
case OP_KEY_ONLY:
- return new RecordKeyImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
+ return new CdcIterators.RecordKeyImageIterator(
+ hadoopConf, tablePath, tblSchema, reqSchema, tableState.getRequiredRowType(),
+ tableState.getRequiredPositions(), maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
default:
- throw new AssertionError("Unexpected mode" + mode);
+ throw new AssertionError("Unexpected mode: " + mode);
}
+ }
case LOG_FILE:
ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1,
"CDC file path should exist and be singleton");
String logFilepath = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString();
- MergeOnReadInputSplit split = singleLogFile2Split(tablePath, logFilepath, maxCompactionMemoryInBytes);
+ MergeOnReadInputSplit split = CdcIterators.singleLogFile2Split(tablePath, logFilepath, maxCompactionMemoryInBytes);
ClosableIterator> recordIterator = getSplitRecordIterator(split);
- return new DataLogFileIterator(maxCompactionMemoryInBytes, imageManager, fileSplit, tableState, recordIterator, metaClient);
+ return new CdcIterators.DataLogFileIterator(
+ maxCompactionMemoryInBytes, imageManager, fileSplit,
+ HoodieSchema.parse(tableState.getTableSchema()),
+ tableState.getRequiredRowType(), tableState.getRequiredPositions(),
+ recordIterator, metaClient, imageManager.getWriteConfig());
case REPLACE_COMMIT:
- return new ReplaceCommitIterator(conf, tablePath, tableState, fileSplit, this::getFileSliceIterator);
+ return new CdcIterators.ReplaceCommitIterator(
+ tablePath, tableState.getRequiredRowType(), tableState.getRequiredPositions(),
+ maxCompactionMemoryInBytes, fileSplit, this::getFileSliceIterator);
default:
throw new AssertionError("Unexpected cdc file split infer case: " + fileSplit.getCdcInferCase());
}
@@ -222,656 +186,6 @@ private ClosableIterator> getSplitRecordIterator(MergeOnRe
return fileGroupReader.getClosableHoodieRecordIterator();
}
- // -------------------------------------------------------------------------
- // Inner Class
- // -------------------------------------------------------------------------
- static class CdcFileSplitsIterator implements ClosableIterator {
- private ImageManager imageManager; // keep a reference to release resource
- private final Iterator fileSplitIterator;
- private final Function> recordIteratorFunc;
- private ClosableIterator recordIterator;
-
- CdcFileSplitsIterator(
- CdcInputSplit inputSplit,
- ImageManager imageManager,
- Function> recordIteratorFunc) {
- this.fileSplitIterator = Arrays.asList(inputSplit.getChanges()).iterator();
- this.imageManager = imageManager;
- this.recordIteratorFunc = recordIteratorFunc;
- }
-
- @Override
- public boolean hasNext() {
- if (recordIterator != null) {
- if (recordIterator.hasNext()) {
- return true;
- } else {
- recordIterator.close(); // release resource
- recordIterator = null;
- }
- }
- if (fileSplitIterator.hasNext()) {
- HoodieCDCFileSplit fileSplit = fileSplitIterator.next();
- recordIterator = recordIteratorFunc.apply(fileSplit);
- return recordIterator.hasNext();
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return recordIterator.next();
- }
-
- @Override
- public void close() {
- if (recordIterator != null) {
- recordIterator.close();
- }
- if (imageManager != null) {
- imageManager.close();
- imageManager = null;
- }
- }
- }
-
- static class AddBaseFileIterator implements ClosableIterator {
- // base file record iterator
- private ClosableIterator nested;
-
- private RowData currentRecord;
-
- AddBaseFileIterator(ClosableIterator nested) {
- this.nested = nested;
- }
-
- @Override
- public boolean hasNext() {
- if (this.nested.hasNext()) {
- currentRecord = this.nested.next();
- currentRecord.setRowKind(RowKind.INSERT);
- return true;
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return currentRecord;
- }
-
- @Override
- public void close() {
- if (this.nested != null) {
- this.nested.close();
- this.nested = null;
- }
- }
- }
-
- static class RemoveBaseFileIterator implements ClosableIterator {
- private ClosableIterator nested;
- private final RowDataProjection projection;
-
- RemoveBaseFileIterator(MergeOnReadTableState tableState, ClosableIterator iterator) {
- this.nested = iterator;
- this.projection = RowDataProjection.instance(tableState.getRequiredRowType(), tableState.getRequiredPositions());
- }
-
- @Override
- public boolean hasNext() {
- return nested.hasNext();
- }
-
- @Override
- public RowData next() {
- RowData row = nested.next();
- row.setRowKind(RowKind.DELETE);
- return this.projection.project(row);
- }
-
- @Override
- public void close() {
- if (this.nested != null) {
- this.nested.close();
- this.nested = null;
- }
- }
- }
-
- // accounting to HoodieCDCInferenceCase.LOG_FILE
- static class DataLogFileIterator implements ClosableIterator {
- private final HoodieSchema tableSchema;
- private final long maxCompactionMemoryInBytes;
- private final ImageManager imageManager;
- private final RowDataProjection projection;
- private final BufferedRecordMerger recordMerger;
- private final ClosableIterator> logRecordIterator;
- private final DeleteContext deleteContext;
-
- private ExternalSpillableMap beforeImages;
- private RowData currentImage;
- private RowData sideImage;
- private HoodieReaderContext readerContext;
- private String[] orderingFields;
- private TypedProperties props;
-
- DataLogFileIterator(
- long maxCompactionMemoryInBytes,
- ImageManager imageManager,
- HoodieCDCFileSplit cdcFileSplit,
- MergeOnReadTableState tableState,
- ClosableIterator> logRecordIterator,
- HoodieTableMetaClient metaClient) throws IOException {
- this.tableSchema = HoodieSchema.parse(tableState.getTableSchema());
- this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
- this.imageManager = imageManager;
- this.projection = tableState.getRequiredRowType().equals(tableState.getRowType())
- ? null
- : RowDataProjection.instance(tableState.getRequiredRowType(), tableState.getRequiredPositions());
- HoodieWriteConfig writeConfig = this.imageManager.writeConfig;
- this.props = writeConfig.getProps();
- this.readerContext = new FlinkReaderContextFactory(metaClient).getContext();
- readerContext.initRecordMerger(props);
- this.orderingFields = ConfigUtils.getOrderingFields(props);
- this.recordMerger = BufferedRecordMergerFactory.create(
- readerContext,
- readerContext.getMergeMode(),
- false,
- Option.of(imageManager.writeConfig.getRecordMerger()),
- tableSchema,
- Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(), writeConfig.getPayloadClass())),
- props,
- metaClient.getTableConfig().getPartialUpdateMode());
- this.logRecordIterator = logRecordIterator;
- initImages(cdcFileSplit);
- this.deleteContext = new DeleteContext(props, tableSchema).withReaderSchema(tableSchema);
- }
-
- private void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
- // init before images
- if (fileSplit.getBeforeFileSlice().isPresent() && !fileSplit.getBeforeFileSlice().get().isEmpty()) {
- this.beforeImages = this.imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
- } else {
- // still initializes an empty map
- this.beforeImages = FormatUtils.spillableMap(this.imageManager.writeConfig, maxCompactionMemoryInBytes, getClass().getSimpleName());
- }
- }
-
- @Override
- public boolean hasNext() {
- if (this.sideImage != null) {
- this.currentImage = this.sideImage;
- this.sideImage = null;
- return true;
- }
- while (logRecordIterator.hasNext()) {
- HoodieRecord record = logRecordIterator.next();
- RowData existed = imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
- if (isDelete(record)) {
- // it's a deleted record.
- if (existed != null) {
- // there is a real record deleted.
- existed.setRowKind(RowKind.DELETE);
- this.currentImage = existed;
- return true;
- }
- } else {
- if (existed == null) {
- // a new record is inserted.
- RowData newRow = record.getData();
- newRow.setRowKind(RowKind.INSERT);
- this.currentImage = newRow;
- return true;
- } else {
- // an existed record is updated, assuming new record and existing record share the same hoodie key
- HoodieOperation operation = HoodieOperation.fromValue(existed.getRowKind().toByteValue());
- HoodieRecord historyRecord = new HoodieFlinkRecord(record.getKey(), operation, existed);
- HoodieRecord merged = mergeRowWithLog(historyRecord, record).get();
- if (merged.getData() != existed) {
- // update happens
- existed.setRowKind(RowKind.UPDATE_BEFORE);
- this.currentImage = existed;
- RowData mergedRow = merged.getData();
- mergedRow.setRowKind(RowKind.UPDATE_AFTER);
- this.imageManager.updateImageRecord(record.getRecordKey(), beforeImages, mergedRow);
- this.sideImage = mergedRow;
-
- return true;
- }
- }
- }
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return this.projection != null ? this.projection.project(this.currentImage) : this.currentImage;
- }
-
- @Override
- public void close() {
- this.logRecordIterator.close();
- this.imageManager.close();
- }
-
- @SuppressWarnings("unchecked")
- private Option> mergeRowWithLog(HoodieRecord historyRecord, HoodieRecord newRecord) {
- try {
- BufferedRecord historyBufferedRecord = BufferedRecords.fromHoodieRecord(historyRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
- BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
- BufferedRecord mergedRecord = recordMerger.finalMerge(historyBufferedRecord, newBufferedRecord);
- return Option.ofNullable(readerContext.getRecordContext().constructHoodieRecord(mergedRecord, historyRecord.getPartitionPath()));
- } catch (IOException e) {
- throw new HoodieIOException("Merge base and delta payloads exception", e);
- }
- }
-
- private boolean isDelete(HoodieRecord record) {
- return record.isDelete(deleteContext, CollectionUtils.emptyProps());
- }
- }
-
- abstract static class BaseImageIterator implements ClosableIterator {
- private final HoodieSchema requiredSchema;
- private final int[] requiredPos;
- private final GenericRecordBuilder recordBuilder;
- private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
-
- // the changelog records iterator
- private HoodieCDCLogRecordIterator cdcItr;
-
- private GenericRecord cdcRecord;
-
- private RowData sideImage;
-
- private RowData currentImage;
-
- BaseImageIterator(
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- MergeOnReadTableState tableState,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit) {
- this.requiredSchema = HoodieSchema.parse(tableState.getRequiredSchema());
- this.requiredPos = getRequiredPos(tableState.getTableSchema(), this.requiredSchema);
- this.recordBuilder = new GenericRecordBuilder(requiredSchema.getAvroSchema());
- this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
- StoragePath hadoopTablePath = new StoragePath(tablePath);
- HoodieStorage storage = HoodieStorageUtils.getStorage(tablePath, HadoopFSUtils.getStorageConf(hadoopConf));
- HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream().map(cdcFile -> {
- try {
- return new HoodieLogFile(
- storage.getPathInfo(new StoragePath(hadoopTablePath, cdcFile)));
- } catch (IOException e) {
- throw new HoodieIOException("Fail to call getFileStatus", e);
- }
- }).toArray(HoodieLogFile[]::new);
- this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);
- }
-
- private int[] getRequiredPos(String tableSchema, HoodieSchema required) {
- HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableSchema));
- List fields = dataSchema.getFields().stream().map(HoodieSchemaField::name).collect(Collectors.toList());
- return required.getFields().stream()
- .map(f -> fields.indexOf(f.name()))
- .mapToInt(i -> i)
- .toArray();
- }
-
- @Override
- public boolean hasNext() {
- if (this.sideImage != null) {
- currentImage = this.sideImage;
- this.sideImage = null;
- return true;
- } else if (this.cdcItr.hasNext()) {
- cdcRecord = (GenericRecord) this.cdcItr.next();
- String op = String.valueOf(cdcRecord.get(0));
- resolveImage(op);
- return true;
- }
- return false;
- }
-
- protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord);
-
- protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord);
-
- @Override
- public RowData next() {
- return currentImage;
- }
-
- @Override
- public void close() {
- if (this.cdcItr != null) {
- this.cdcItr.close();
- this.cdcItr = null;
- }
- }
-
- private void resolveImage(String op) {
- switch (op) {
- case "i":
- currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
- break;
- case "u":
- currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
- sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
- break;
- case "d":
- currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
- break;
- default:
- throw new AssertionError("Unexpected");
- }
- }
-
- protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
- GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
- avroRecord,
- requiredSchema,
- requiredPos,
- recordBuilder);
- RowData resolved = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
- resolved.setRowKind(rowKind);
- return resolved;
- }
- }
-
- // op, ts, before_image, after_image
- static class BeforeAfterImageIterator extends BaseImageIterator {
- BeforeAfterImageIterator(
- String tablePath,
- MergeOnReadTableState tableState,
- org.apache.hadoop.conf.Configuration hadoopConf,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit) {
- super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
- }
-
- @Override
- protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
- }
- }
-
- // op, key, before_image
- static class BeforeImageIterator extends BaseImageIterator {
- protected ExternalSpillableMap afterImages;
-
- protected final long maxCompactionMemoryInBytes;
-
- protected final RowDataProjection projection;
- protected final ImageManager imageManager;
-
- BeforeImageIterator(
- Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- MergeOnReadTableState tableState,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit,
- ImageManager imageManager) throws IOException {
- super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
- this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf);
- this.projection = RowDataProjection.instance(tableState.getRequiredRowType(), tableState.getRequiredPositions());
- this.imageManager = imageManager;
- initImages(fileSplit);
- }
-
- protected void initImages(
- HoodieCDCFileSplit fileSplit) throws IOException {
- ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
- "Current file slice does not exist for instant: " + fileSplit.getInstant());
- this.afterImages = this.imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
- }
-
- @Override
- protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
- String recordKey = cdcRecord.get(1).toString();
- RowData row = imageManager.getImageRecord(recordKey, this.afterImages, rowKind);
- row.setRowKind(rowKind);
- return this.projection.project(row);
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
- return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
- }
- }
-
- // op, key
- static class RecordKeyImageIterator extends BeforeImageIterator {
- protected ExternalSpillableMap beforeImages;
-
- RecordKeyImageIterator(
- Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- String tablePath,
- MergeOnReadTableState tableState,
- HoodieSchema cdcSchema,
- HoodieCDCFileSplit fileSplit,
- ImageManager imageManager) throws IOException {
- super(flinkConf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
- }
-
- protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
- // init after images
- super.initImages(fileSplit);
- // init before images
- ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
- "Before file slice does not exist for instant: " + fileSplit.getInstant());
- this.beforeImages = this.imageManager.getOrLoadImages(
- maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
- }
-
- @Override
- protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
- String recordKey = cdcRecord.get(1).toString();
- RowData row = this.imageManager.getImageRecord(recordKey, this.beforeImages, rowKind);
- row.setRowKind(rowKind);
- return this.projection.project(row);
- }
- }
-
- static class ReplaceCommitIterator implements ClosableIterator {
- private final ClosableIterator itr;
- private final RowDataProjection projection;
-
- ReplaceCommitIterator(
- Configuration flinkConf,
- String tablePath,
- MergeOnReadTableState tableState,
- HoodieCDCFileSplit fileSplit,
- Function> splitIteratorFunc) {
- this.itr = initIterator(tablePath, StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), fileSplit, splitIteratorFunc);
- this.projection = RowDataProjection.instance(tableState.getRequiredRowType(), tableState.getRequiredPositions());
- }
-
- private ClosableIterator initIterator(
- String tablePath,
- long maxCompactionMemoryInBytes,
- HoodieCDCFileSplit fileSplit,
- Function> splitIteratorFunc) {
- // init before images
-
- // the before file slice must exist,
- // see HoodieCDCExtractor#extractCDCFileSplits for details
- ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
- "Before file slice does not exist for instant: " + fileSplit.getInstant());
- MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
- tablePath, fileSplit.getBeforeFileSlice().get(), maxCompactionMemoryInBytes);
- return splitIteratorFunc.apply(inputSplit);
- }
-
- @Override
- public boolean hasNext() {
- return this.itr.hasNext();
- }
-
- @Override
- public RowData next() {
- RowData row = this.itr.next();
- row.setRowKind(RowKind.DELETE);
- return this.projection.project(row);
- }
-
- @Override
- public void close() {
- this.itr.close();
- }
- }
-
- public static final class BytesArrayInputView extends DataInputStream implements DataInputView {
- public BytesArrayInputView(byte[] data) {
- super(new ByteArrayInputStream(data));
- }
-
- public void skipBytesToRead(int numBytes) throws IOException {
- while (numBytes > 0) {
- int skipped = this.skipBytes(numBytes);
- numBytes -= skipped;
- }
- }
- }
-
- public static final class BytesArrayOutputView extends DataOutputStream implements DataOutputView {
- public BytesArrayOutputView(ByteArrayOutputStream baos) {
- super(baos);
- }
-
- public void skipBytesToWrite(int numBytes) throws IOException {
- for (int i = 0; i < numBytes; ++i) {
- this.write(0);
- }
- }
-
- public void write(DataInputView source, int numBytes) throws IOException {
- byte[] buffer = new byte[numBytes];
- source.readFully(buffer);
- this.write(buffer);
- }
- }
-
- /**
- * A before/after image manager
- * that caches the image records by versions(file slices).
- */
- private static class ImageManager implements AutoCloseable {
- private final HoodieWriteConfig writeConfig;
-
- private final RowDataSerializer serializer;
- private final Function> splitIteratorFunc;
-
- private final Map> cache;
-
- public ImageManager(
- Configuration flinkConf,
- RowType rowType,
- Function> splitIteratorFunc) {
- this.serializer = new RowDataSerializer(rowType);
- this.splitIteratorFunc = splitIteratorFunc;
- this.cache = new TreeMap<>();
- this.writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf);
- }
-
- public ExternalSpillableMap getOrLoadImages(
- long maxCompactionMemoryInBytes,
- FileSlice fileSlice) throws IOException {
- final String instant = fileSlice.getBaseInstantTime();
- if (this.cache.containsKey(instant)) {
- return cache.get(instant);
- }
- // clean the earliest file slice first
- if (this.cache.size() > 1) {
- // keep at most 2 versions: before & after
- String instantToClean = this.cache.keySet().iterator().next();
- this.cache.remove(instantToClean).close();
- }
- ExternalSpillableMap images = loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
- this.cache.put(instant, images);
- return images;
- }
-
- private ExternalSpillableMap loadImageRecords(
- long maxCompactionMemoryInBytes,
- FileSlice fileSlice) throws IOException {
- MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(writeConfig.getBasePath(), fileSlice, maxCompactionMemoryInBytes);
- // initialize the image records map
- ExternalSpillableMap imageRecordsMap =
- FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, getClass().getSimpleName());
- try (ClosableIterator itr = splitIteratorFunc.apply(inputSplit)) {
- while (itr.hasNext()) {
- RowData row = itr.next();
- String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- serializer.serialize(row, new BytesArrayOutputView(baos));
- imageRecordsMap.put(recordKey, baos.toByteArray());
- }
- }
- return imageRecordsMap;
- }
-
- public RowData getImageRecord(
- String recordKey,
- ExternalSpillableMap cache,
- RowKind rowKind) {
- byte[] bytes = cache.get(recordKey);
- ValidationUtils.checkState(bytes != null,
- "Key " + recordKey + " does not exist in current file group");
- try {
- RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
- row.setRowKind(rowKind);
- return row;
- } catch (IOException e) {
- throw new HoodieException("Deserialize bytes into row data exception", e);
- }
- }
-
- public void updateImageRecord(
- String recordKey,
- ExternalSpillableMap cache,
- RowData row) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- try {
- serializer.serialize(row, new BytesArrayOutputView(baos));
- } catch (IOException e) {
- throw new HoodieException("Serialize row data into bytes exception", e);
- }
- cache.put(recordKey, baos.toByteArray());
- }
-
- public RowData removeImageRecord(
- String recordKey,
- ExternalSpillableMap cache) {
- byte[] bytes = cache.remove(recordKey);
- if (bytes == null) {
- return null;
- }
- try {
- return serializer.deserialize(new BytesArrayInputView(bytes));
- } catch (IOException e) {
- throw new HoodieException("Deserialize bytes into row data exception", e);
- }
- }
-
- @Override
- public void close() {
- this.cache.values().forEach(ExternalSpillableMap::close);
- this.cache.clear();
- }
- }
-
/**
* Builder for {@link CdcInputFormat}.
*/
@@ -911,30 +225,4 @@ public CdcInputFormat build() {
return new CdcInputFormat(conf, tableState, fieldTypes, predicates, limit, emitDelete);
}
}
-
- // -------------------------------------------------------------------------
- // Utilities
- // -------------------------------------------------------------------------
- public static MergeOnReadInputSplit fileSlice2Split(
- String tablePath,
- FileSlice fileSlice,
- long maxCompactionMemoryInBytes) {
- Option> logPaths = Option.ofNullable(fileSlice.getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile -> logFile.getPath().toString())
- // filter out the cdc logs
- .filter(path -> !path.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
- .collect(Collectors.toList()));
- String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
- return new MergeOnReadInputSplit(0, basePath, logPaths, fileSlice.getLatestInstantTime(),
- tablePath, maxCompactionMemoryInBytes, FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
- fileSlice.getFileId(), fileSlice.getPartitionPath());
- }
-
- public static MergeOnReadInputSplit singleLogFile2Split(String tablePath, String filePath, long maxCompactionMemoryInBytes) {
- return new MergeOnReadInputSplit(0, null, Option.of(Collections.singletonList(filePath)),
- FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), tablePath, maxCompactionMemoryInBytes,
- FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, FSUtils.getFileIdFromLogPath(new StoragePath(filePath)),
- FSUtils.getRelativePartitionPath(new StoragePath(tablePath), new StoragePath(filePath).getParent()));
- }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
new file mode 100644
index 0000000000000..5f47814942173
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcIterators.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.client.model.HoodieFlinkRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.format.FlinkReaderContextFactory;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Shared iterator implementations for CDC record reading, used by both
+ * {@link CdcInputFormat} and the Source V2 CDC split reader.
+ */
+public final class CdcIterators {
+
+ private CdcIterators() {
+ }
+
+ // -------------------------------------------------------------------------
+ // Top-level iterator: fans out over an ordered list of CDC file-splits
+ // -------------------------------------------------------------------------
+
+ /**
+ * Iterates over an ordered sequence of {@link HoodieCDCFileSplit}s, delegating
+ * per-split record reading to a user-supplied factory function.
+ */
+ public static class CdcFileSplitsIterator implements ClosableIterator {
+ private CdcImageManager imageManager;
+ private final Iterator fileSplitIterator;
+ private final Function> recordIteratorFunc;
+ private ClosableIterator recordIterator;
+
+ public CdcFileSplitsIterator(
+ HoodieCDCFileSplit[] changes,
+ CdcImageManager imageManager,
+ Function> recordIteratorFunc) {
+ this.fileSplitIterator = Arrays.asList(changes).iterator();
+ this.imageManager = imageManager;
+ this.recordIteratorFunc = recordIteratorFunc;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (recordIterator != null) {
+ if (recordIterator.hasNext()) {
+ return true;
+ } else {
+ recordIterator.close();
+ recordIterator = null;
+ }
+ }
+ if (fileSplitIterator.hasNext()) {
+ recordIterator = recordIteratorFunc.apply(fileSplitIterator.next());
+ return recordIterator.hasNext();
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return recordIterator.next();
+ }
+
+ @Override
+ public void close() {
+ if (recordIterator != null) {
+ recordIterator.close();
+ }
+ if (imageManager != null) {
+ imageManager.close();
+ imageManager = null;
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // BASE_FILE_INSERT / BASE_FILE_DELETE
+ // -------------------------------------------------------------------------
+
+ /**
+ * Wraps a base-file parquet iterator and marks every record as {@link RowKind#INSERT}.
+ * Used for the {@code BASE_FILE_INSERT} CDC inference case.
+ */
+ public static class AddBaseFileIterator implements ClosableIterator {
+ private ClosableIterator nested;
+ private RowData currentRecord;
+
+ public AddBaseFileIterator(ClosableIterator nested) {
+ this.nested = nested;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nested.hasNext()) {
+ currentRecord = nested.next();
+ currentRecord.setRowKind(RowKind.INSERT);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return currentRecord;
+ }
+
+ @Override
+ public void close() {
+ if (nested != null) {
+ nested.close();
+ nested = null;
+ }
+ }
+ }
+
+ /**
+ * Wraps a file-slice iterator and marks every record as {@link RowKind#DELETE}, applying
+ * required-column projection. Used for the {@code BASE_FILE_DELETE} CDC inference case.
+ */
+ public static class RemoveBaseFileIterator implements ClosableIterator {
+ private ClosableIterator nested;
+ private final RowDataProjection projection;
+
+ public RemoveBaseFileIterator(
+ RowType requiredRowType,
+ int[] requiredPositions,
+ ClosableIterator iterator) {
+ this.nested = iterator;
+ this.projection = RowDataProjection.instance(requiredRowType, requiredPositions);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nested.hasNext();
+ }
+
+ @Override
+ public RowData next() {
+ RowData row = nested.next();
+ row.setRowKind(RowKind.DELETE);
+ return projection.project(row);
+ }
+
+ @Override
+ public void close() {
+ if (nested != null) {
+ nested.close();
+ nested = null;
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // LOG_FILE
+ // -------------------------------------------------------------------------
+
+ /**
+ * Handles the {@code LOG_FILE} CDC inference case: compares records from the log file against
+ * before-image snapshots to emit INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE events.
+ */
+ public static class DataLogFileIterator implements ClosableIterator {
+ private final HoodieSchema tableSchema;
+ private final long maxCompactionMemoryInBytes;
+ private final CdcImageManager imageManager;
+ private final RowDataProjection projection;
+ private final BufferedRecordMerger recordMerger;
+ private final ClosableIterator> logRecordIterator;
+ private final DeleteContext deleteContext;
+ private final HoodieReaderContext readerContext;
+ private final String[] orderingFields;
+ private final TypedProperties props;
+
+ private ExternalSpillableMap beforeImages;
+ private RowData currentImage;
+ private RowData sideImage;
+
+ public DataLogFileIterator(
+ long maxCompactionMemoryInBytes,
+ CdcImageManager imageManager,
+ HoodieCDCFileSplit cdcFileSplit,
+ HoodieSchema tableSchema,
+ RowType requiredRowType,
+ int[] requiredPositions,
+ ClosableIterator> logRecordIterator,
+ HoodieTableMetaClient metaClient,
+ HoodieWriteConfig writeConfig) throws IOException {
+ this.tableSchema = tableSchema;
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.imageManager = imageManager;
+ this.projection = HoodieSchemaConverter.convertToRowType(tableSchema).equals(requiredRowType)
+ ? null : RowDataProjection.instance(requiredRowType, requiredPositions);
+ this.props = writeConfig.getProps();
+ this.readerContext = new FlinkReaderContextFactory(metaClient).getContext();
+ readerContext.initRecordMerger(props);
+ this.orderingFields = ConfigUtils.getOrderingFields(props);
+ this.recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ readerContext.getMergeMode(),
+ false,
+ Option.of(writeConfig.getRecordMerger()),
+ tableSchema,
+ Option.ofNullable(Pair.of(metaClient.getTableConfig().getPayloadClass(), writeConfig.getPayloadClass())),
+ props,
+ metaClient.getTableConfig().getPartialUpdateMode());
+ this.logRecordIterator = logRecordIterator;
+ this.deleteContext = new DeleteContext(props, tableSchema).withReaderSchema(tableSchema);
+ initImages(cdcFileSplit, writeConfig);
+ }
+
+ private void initImages(HoodieCDCFileSplit fileSplit, HoodieWriteConfig writeConfig) throws IOException {
+ if (fileSplit.getBeforeFileSlice().isPresent() && !fileSplit.getBeforeFileSlice().get().isEmpty()) {
+ this.beforeImages = imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+ } else {
+ this.beforeImages = FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes, getClass().getSimpleName());
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (sideImage != null) {
+ currentImage = sideImage;
+ sideImage = null;
+ return true;
+ }
+ while (logRecordIterator.hasNext()) {
+ HoodieRecord record = logRecordIterator.next();
+ RowData existed = imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
+ if (isDelete(record)) {
+ if (existed != null) {
+ existed.setRowKind(RowKind.DELETE);
+ currentImage = existed;
+ return true;
+ }
+ } else {
+ if (existed == null) {
+ RowData newRow = record.getData();
+ newRow.setRowKind(RowKind.INSERT);
+ currentImage = newRow;
+ return true;
+ } else {
+ HoodieOperation operation = HoodieOperation.fromValue(existed.getRowKind().toByteValue());
+ HoodieRecord historyRecord = new HoodieFlinkRecord(record.getKey(), operation, existed);
+ HoodieRecord merged = mergeRowWithLog(historyRecord, record).get();
+ if (merged.getData() != existed) {
+ existed.setRowKind(RowKind.UPDATE_BEFORE);
+ currentImage = existed;
+ RowData mergedRow = merged.getData();
+ mergedRow.setRowKind(RowKind.UPDATE_AFTER);
+ imageManager.updateImageRecord(record.getRecordKey(), beforeImages, mergedRow);
+ sideImage = mergedRow;
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return projection != null ? projection.project(currentImage) : currentImage;
+ }
+
+ @Override
+ public void close() {
+ logRecordIterator.close();
+ imageManager.close();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Option> mergeRowWithLog(
+ HoodieRecord historyRecord, HoodieRecord newRecord) {
+ try {
+ BufferedRecord histBuf = BufferedRecords.fromHoodieRecord(
+ historyRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
+ BufferedRecord newBuf = BufferedRecords.fromHoodieRecord(
+ newRecord, tableSchema, readerContext.getRecordContext(), props, orderingFields, deleteContext);
+ BufferedRecord merged = recordMerger.finalMerge(histBuf, newBuf);
+ return Option.ofNullable(readerContext.getRecordContext()
+ .constructHoodieRecord(merged, historyRecord.getPartitionPath()));
+ } catch (IOException e) {
+ throw new HoodieIOException("Merge base and delta payloads exception", e);
+ }
+ }
+
+ private boolean isDelete(HoodieRecord record) {
+ return record.isDelete(deleteContext, CollectionUtils.emptyProps());
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // AS_IS — supplemental logging modes
+ // -------------------------------------------------------------------------
+
+ /**
+ * Base iterator for CDC log files stored with supplemental logging (AS_IS inference case).
+ * Reads a {@link HoodieCDCLogRecordIterator} and resolves before/after images using
+ * subclass-specific logic.
+ */
+ public abstract static class BaseImageIterator implements ClosableIterator {
+ private final HoodieSchema requiredSchema;
+ private final int[] requiredPos;
+ private final GenericRecordBuilder recordBuilder;
+ private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
+ private HoodieCDCLogRecordIterator cdcItr;
+
+ private GenericRecord cdcRecord;
+ private RowData sideImage;
+ private RowData currentImage;
+
+ protected BaseImageIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit) {
+ this.requiredSchema = requiredSchema;
+ this.requiredPos = computeRequiredPos(tableSchema, requiredSchema);
+ this.recordBuilder = new GenericRecordBuilder(requiredSchema.getAvroSchema());
+ this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
+
+ StoragePath hadoopTablePath = new StoragePath(tablePath);
+ HoodieStorage storage = HoodieStorageUtils.getStorage(
+ tablePath, HadoopFSUtils.getStorageConf(hadoopConf));
+ HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream()
+ .map(cdcFile -> {
+ try {
+ return new HoodieLogFile(storage.getPathInfo(new StoragePath(hadoopTablePath, cdcFile)));
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get file status for CDC log: " + cdcFile, e);
+ }
+ })
+ .toArray(HoodieLogFile[]::new);
+ this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);
+ }
+
+ private static int[] computeRequiredPos(HoodieSchema tableSchema, HoodieSchema requiredSchema) {
+ HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(tableSchema);
+ List fields = dataSchema.getFields().stream()
+ .map(HoodieSchemaField::name)
+ .collect(Collectors.toList());
+ return requiredSchema.getFields().stream()
+ .map(f -> fields.indexOf(f.name()))
+ .mapToInt(i -> i)
+ .toArray();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (sideImage != null) {
+ currentImage = sideImage;
+ sideImage = null;
+ return true;
+ } else if (cdcItr.hasNext()) {
+ cdcRecord = (GenericRecord) cdcItr.next();
+ String op = String.valueOf(cdcRecord.get(0));
+ resolveImage(op);
+ return true;
+ }
+ return false;
+ }
+
+ protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord);
+
+ protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord);
+
+ @Override
+ public RowData next() {
+ return currentImage;
+ }
+
+ @Override
+ public void close() {
+ if (cdcItr != null) {
+ cdcItr.close();
+ cdcItr = null;
+ }
+ }
+
+ private void resolveImage(String op) {
+ switch (op) {
+ case "i":
+ currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
+ break;
+ case "u":
+ currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
+ sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
+ break;
+ case "d":
+ currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
+ break;
+ default:
+ throw new AssertionError("Unexpected CDC operation: " + op);
+ }
+ }
+
+ protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
+ GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+ avroRecord, requiredSchema, requiredPos, recordBuilder);
+ RowData resolved = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+ resolved.setRowKind(rowKind);
+ return resolved;
+ }
+ }
+
+ /**
+ * Reads CDC log files that contain both before and after images ({@code DATA_BEFORE_AFTER} mode).
+ * CDC record layout: [op, ts, before_image, after_image].
+ */
+ public static class BeforeAfterImageIterator extends BaseImageIterator {
+ public BeforeAfterImageIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit) {
+ super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType, cdcSchema, fileSplit);
+ }
+
+ @Override
+ protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+ }
+ }
+
+ /**
+ * Reads CDC log files containing op + key + before_image ({@code DATA_BEFORE} mode).
+ * The after-image is loaded from the after file-slice snapshot via {@link CdcImageManager}.
+ * CDC record layout: [op, key, before_image].
+ */
+ public static class BeforeImageIterator extends BaseImageIterator {
+ protected ExternalSpillableMap afterImages;
+ protected final long maxCompactionMemoryInBytes;
+ protected final RowDataProjection projection;
+ protected final CdcImageManager imageManager;
+
+ public BeforeImageIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ int[] requiredPositions,
+ long maxCompactionMemoryInBytes,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit,
+ CdcImageManager imageManager) throws IOException {
+ super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType, cdcSchema, fileSplit);
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.projection = RowDataProjection.instance(requiredRowType, requiredPositions);
+ this.imageManager = imageManager;
+ initImages(fileSplit);
+ }
+
+ protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
+ ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
+ "Current file slice does not exist for instant: " + fileSplit.getInstant());
+ this.afterImages = imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
+ }
+
+ @Override
+ protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+ String recordKey = cdcRecord.get(1).toString();
+ RowData row = imageManager.getImageRecord(recordKey, afterImages, rowKind);
+ row.setRowKind(rowKind);
+ return projection.project(row);
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+ }
+ }
+
+ /**
+ * Reads CDC log files containing only op + key ({@code OP_KEY_ONLY} mode).
+ * Both before and after images are loaded from file-slice snapshots via {@link CdcImageManager}.
+ * CDC record layout: [op, key].
+ */
+ public static class RecordKeyImageIterator extends BeforeImageIterator {
+ protected ExternalSpillableMap beforeImages;
+
+ public RecordKeyImageIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
+ RowType requiredRowType,
+ int[] requiredPositions,
+ long maxCompactionMemoryInBytes,
+ HoodieSchema cdcSchema,
+ HoodieCDCFileSplit fileSplit,
+ CdcImageManager imageManager) throws IOException {
+ super(hadoopConf, tablePath, tableSchema, requiredSchema, requiredRowType,
+ requiredPositions, maxCompactionMemoryInBytes, cdcSchema, fileSplit, imageManager);
+ }
+
+ @Override
+ protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
+ super.initImages(fileSplit);
+ ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+ "Before file slice does not exist for instant: " + fileSplit.getInstant());
+ this.beforeImages = imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord) {
+ String recordKey = cdcRecord.get(1).toString();
+ RowData row = imageManager.getImageRecord(recordKey, beforeImages, rowKind);
+ row.setRowKind(rowKind);
+ return projection.project(row);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // REPLACE_COMMIT
+ // -------------------------------------------------------------------------
+
+ /**
+ * Handles the {@code REPLACE_COMMIT} CDC inference case: emits all records from the
+ * before-slice as {@link RowKind#DELETE}.
+ */
+ public static class ReplaceCommitIterator implements ClosableIterator {
+ private final ClosableIterator itr;
+ private final RowDataProjection projection;
+
+ public ReplaceCommitIterator(
+ String tablePath,
+ RowType requiredRowType,
+ int[] requiredPositions,
+ long maxCompactionMemoryInBytes,
+ HoodieCDCFileSplit fileSplit,
+ Function> splitIteratorFunc) {
+ ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+ "Before file slice does not exist for instant: " + fileSplit.getInstant());
+ MergeOnReadInputSplit inputSplit = fileSlice2Split(
+ tablePath, fileSplit.getBeforeFileSlice().get(), maxCompactionMemoryInBytes);
+ this.itr = splitIteratorFunc.apply(inputSplit);
+ this.projection = RowDataProjection.instance(requiredRowType, requiredPositions);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return itr.hasNext();
+ }
+
+ @Override
+ public RowData next() {
+ RowData row = itr.next();
+ row.setRowKind(RowKind.DELETE);
+ return projection.project(row);
+ }
+
+ @Override
+ public void close() {
+ itr.close();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ public static MergeOnReadInputSplit fileSlice2Split(
+ String tablePath,
+ FileSlice fileSlice,
+ long maxCompactionMemoryInBytes) {
+ Option> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString())
+ // filter out the cdc logs
+ .filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+ .collect(Collectors.toList()));
+ String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+ return new MergeOnReadInputSplit(0, basePath, logPaths, fileSlice.getLatestInstantTime(),
+ tablePath, maxCompactionMemoryInBytes, FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
+ fileSlice.getFileId(), fileSlice.getPartitionPath());
+ }
+
+ public static MergeOnReadInputSplit singleLogFile2Split(String tablePath, String filePath, long maxCompactionMemoryInBytes) {
+ return new MergeOnReadInputSplit(0, null, Option.of(Collections.singletonList(filePath)),
+ FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), tablePath, maxCompactionMemoryInBytes,
+ FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, FSUtils.getFileIdFromLogPath(new StoragePath(filePath)),
+ FSUtils.getRelativePartitionPath(new StoragePath(tablePath), new StoragePath(filePath).getParent()));
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
index d0de3f930d37a..b0d0579da0846 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieCdcSplitReaderFunction.java
@@ -237,13 +237,7 @@ public void testReadAcceptsCdcSourceSplitType() {
1, tempDir.getAbsolutePath(), 128 * 1024 * 1024L, "file-cdc",
EMPTY_PARTITION_PATH, changes, "read_optimized", "20230101000000000");
- // The call should NOT throw IllegalArgumentException (type guard passes).
- // It will throw some other exception when trying to do real I/O.
- Exception ex = assertThrows(Exception.class, () -> function.read(cdcSplit));
- assertNotNull(ex);
- // Must not be an IllegalArgumentException (which the type guard throws)
- if (ex instanceof IllegalArgumentException) {
- throw new AssertionError("read() should not throw IllegalArgumentException for a CdcSourceSplit", ex);
- }
+ // Should not throw exception
+ function.read(cdcSplit);
}
}