[HUDI-1771] Propagate CDC format for hoodie (#3285)
swuferhong committed Aug 10, 2021
1 parent b4441ab commit 21db6d7a84d4a83ec98c110e92ff9c92d05dd530
Showing 50 changed files with 1,081 additions and 199 deletions.
@@ -373,6 +373,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. "
+ "It's useful in cases where extra metadata needs to be published regardless, e.g., tracking source offsets when ingesting data");

public static final ConfigProperty<Boolean> ALLOW_OPERATION_METADATA_FIELD = ConfigProperty
.key("hoodie.allow.operation.metadata.field")
.defaultValue(false)
.sinceVersion("0.9")
.withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. "
+ "Once enabled, all the changes of a record are persisted to the delta log directly without merge");

private ConsistencyGuardConfig consistencyGuardConfig;

// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -1309,6 +1316,10 @@ public boolean allowEmptyCommit() {
return getBooleanOrDefault(ALLOW_EMPTY_COMMIT);
}

public boolean allowOperationMetadataField() {
return getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD);
}

public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -1615,6 +1626,11 @@ public Builder withPopulateMetaFields(boolean populateMetaFields) {
return this;
}

public Builder withAllowOperationMetadataField(boolean allowOperationMetadataField) {
writeConfig.setValue(ALLOW_OPERATION_METADATA_FIELD, Boolean.toString(allowOperationMetadataField));
return this;
}

public Builder withProperties(Properties properties) {
this.writeConfig.getProps().putAll(properties);
return this;
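
For orientation, here is a minimal usage sketch of the new option (not part of this commit). The base path, table name, and schema argument are placeholders; newBuilder(), withPath(), withSchema(), and forTable() are taken from the existing HoodieWriteConfig builder, and withAllowOperationMetadataField() is the method added above.

import org.apache.hudi.config.HoodieWriteConfig;

public class OperationFieldConfigSketch {
  // Minimal sketch: build a write config that persists the '_hoodie_operation' meta field.
  // The base path and table name are placeholders; avroSchemaString is the table schema.
  public static HoodieWriteConfig build(String avroSchemaString) {
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/cdc_table")          // placeholder base path
        .withSchema(avroSchemaString)               // Avro schema of the table, as a String
        .forTable("cdc_table")                      // placeholder table name
        .withAllowOperationMetadataField(true)      // builder method added in this commit
        .build();
  }
}

Write handles then check config.allowOperationMetadataField() to decide whether the '_hoodie_operation' meta column is emitted, as the hunks below show.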
@@ -27,6 +27,7 @@
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -197,20 +198,26 @@ private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
// Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
// whether it is an update or an insert record.
boolean isUpdateRecord = isUpdateRecord(hoodieRecord);
// If the format cannot record the operation field, nullify the DELETE payload manually.
boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
Option<IndexedRecord> avroRecord = nullifyPayload ? Option.empty() : hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
if (avroRecord.isPresent()) {
if (avroRecord.get().equals(IGNORE_RECORD)) {
return avroRecord;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
GenericRecord rewriteRecord = rewriteRecord((GenericRecord) avroRecord.get());
avroRecord = Option.of(rewriteRecord);
String seqId =
HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
if (config.populateMetaFields()) {
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(),
HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, hoodieRecord.getRecordKey(),
hoodieRecord.getPartitionPath(), fileId);
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId);
HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
}
if (config.allowOperationMetadataField()) {
HoodieAvroUtils.addOperationToRecord(rewriteRecord, hoodieRecord.getOperation());
}
if (isUpdateRecord(hoodieRecord)) {
updatedRecordsWritten++;
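
Read together, the changes above do two things: a DELETE record is collapsed to an empty payload when the operation field cannot be persisted, and otherwise the operation is stamped onto the rewritten record via the new HoodieAvroUtils.addOperationToRecord helper. The following standalone sketch of that decision is illustrative only and assumes the record has already been rewritten to the meta-field schema containing '_hoodie_operation':

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.util.Option;

public class OperationPayloadSketch {
  // Sketch of the decision above: a DELETE either becomes an empty payload (when the
  // operation field is not persisted) or is kept and stamped with '_hoodie_operation'.
  static Option<IndexedRecord> resolve(Option<IndexedRecord> avroRecord,
                                       HoodieOperation operation,
                                       boolean allowOperationField) {
    if (HoodieOperation.isDelete(operation) && !allowOperationField) {
      return Option.empty();  // format cannot carry the flag: fall back to a hard delete
    }
    if (avroRecord.isPresent() && allowOperationField) {
      // assumes the record already uses the meta-field schema containing '_hoodie_operation'
      HoodieAvroUtils.addOperationToRecord((GenericRecord) avroRecord.get(), operation);
    }
    return avroRecord;
  }
}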
@@ -22,6 +22,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -127,6 +128,9 @@ public boolean canWrite(HoodieRecord record) {
@Override
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
Option recordMetadata = record.getData().getMetadata();
if (HoodieOperation.isDelete(record.getOperation())) {
avroRecord = Option.empty();
}
try {
if (avroRecord.isPresent()) {
if (avroRecord.get().equals(IGNORE_RECORD)) {
@@ -22,6 +22,7 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -264,6 +265,9 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
return false;
}
if (HoodieOperation.isDelete(hoodieRecord.getOperation())) {
indexedRecord = Option.empty();
}
try {
if (indexedRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
@@ -112,9 +112,9 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
this.partitionPath = partitionPath;
this.fileId = fileId;
this.tableSchema = overriddenSchema.orElseGet(() -> getSpecifiedTableSchema(config));
this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema);
this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema, config.allowOperationMetadataField());
this.writeSchema = overriddenSchema.orElseGet(() -> getWriteSchema(config));
this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema);
this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
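
Both the table schema and the write schema now go through the new two-argument HoodieAvroUtils.addMetadataFields overload. A small sketch of the expected result (not part of the patch); the inline Avro schema is a placeholder, and placing '_hoodie_operation' after the five standard meta columns is an assumption based on the HoodieRowData layout later in this diff:

import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;

public class MetaFieldSchemaSketch {
  public static void main(String[] args) {
    // Placeholder user schema with a single 'id' field.
    String avsc = "{\"type\":\"record\",\"name\":\"rec\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";
    Schema withOperation =
        HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(avsc), true);
    // Expected field order (operation column appended after the standard meta columns):
    // _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
    // _hoodie_partition_path, _hoodie_file_name, _hoodie_operation, id
    withOperation.getFields().forEach(f -> System.out.println(f.name()));
  }
}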
@@ -18,7 +18,7 @@

package org.apache.hudi.client.model;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieOperation;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
@@ -35,12 +35,7 @@
* copy rather than fetching from {@link RowData}.
*/
public class HoodieRowData implements RowData {

private final String commitTime;
private final String commitSeqNumber;
private final String recordKey;
private final String partitionPath;
private final String fileName;
private final String[] metaColumns;
private final RowData row;
private final int metaColumnsNum;

@@ -49,14 +44,19 @@ public HoodieRowData(String commitTime,
String recordKey,
String partitionPath,
String fileName,
RowData row) {
this.commitTime = commitTime;
this.commitSeqNumber = commitSeqNumber;
this.recordKey = recordKey;
this.partitionPath = partitionPath;
this.fileName = fileName;
RowData row,
boolean withOperation) {
this.metaColumnsNum = withOperation ? 6 : 5;
this.metaColumns = new String[metaColumnsNum];
metaColumns[0] = commitTime;
metaColumns[1] = commitSeqNumber;
metaColumns[2] = recordKey;
metaColumns[3] = partitionPath;
metaColumns[4] = fileName;
if (withOperation) {
metaColumns[5] = HoodieOperation.fromValue(row.getRowKind().toByteValue()).getName();
}
this.row = row;
this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
}

@Override
@@ -74,28 +74,6 @@ public void setRowKind(RowKind kind) {
this.row.setRowKind(kind);
}

private String getMetaColumnVal(int ordinal) {
switch (ordinal) {
case 0: {
return commitTime;
}
case 1: {
return commitSeqNumber;
}
case 2: {
return recordKey;
}
case 3: {
return partitionPath;
}
case 4: {
return fileName;
}
default:
throw new IllegalArgumentException("Not expected");
}
}

@Override
public boolean isNullAt(int ordinal) {
if (ordinal < metaColumnsNum) {
@@ -181,4 +159,8 @@ public ArrayData getArray(int ordinal) {
public MapData getMap(int ordinal) {
return row.getMap(ordinal - metaColumnsNum);
}

private String getMetaColumnVal(int ordinal) {
return this.metaColumns[ordinal];
}
}
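
An illustrative view of the new layout (not part of the patch): the meta columns occupy the first metaColumnsNum ordinals and the wrapped RowData is shifted behind them. The sample values and the GenericRowData payload are placeholders, and getString is assumed to apply the same ordinal shifting as getArray and getMap above:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;
import org.apache.hudi.client.model.HoodieRowData;

public class HoodieRowDataSketch {
  public static void main(String[] args) {
    // One-column payload row carrying a DELETE change flag (placeholder data).
    GenericRowData payload = new GenericRowData(1);
    payload.setField(0, StringData.fromString("value-1"));
    payload.setRowKind(RowKind.DELETE);

    RowData withMeta = new HoodieRowData("20210810120000", "20210810120000_0_1",
        "key-1", "2021/08/10", "file-1.parquet", payload, true /* withOperation */);

    // Ordinals 0..5 are the meta columns (commit time .. operation);
    // the wrapped payload starts at ordinal 6.
    System.out.println(withMeta.getString(5));  // operation name derived from RowKind.DELETE
    System.out.println(withMeta.getString(6));  // "value-1" from the wrapped row
  }
}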
@@ -79,6 +79,9 @@ protected boolean needsUpdateLocation() {

@Override
protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
// Do not use the HoodieRecord operation here because the hoodie writer has its own
// INSERT/MERGE buckets for 'UPSERT' semantics. For example, a hoodie record with a fresh key
// and operation HoodieOperation.DELETE could be put into either an INSERT or an UPDATE bucket.
return hoodieRecord.getCurrentLocation() != null
&& hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
}
@@ -117,7 +117,7 @@ public void write(String recordKey, String partitionPath, RowData record) throws
try {
String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
HoodieRowData rowData = new HoodieRowData(instantTime, seqId, recordKey, partitionPath, path.getName(),
record);
record, writeConfig.allowOperationMetadataField());
try {
fileWriter.writeRow(recordKey, rowData);
writeStatus.markSuccess(recordKey);
@@ -131,7 +131,7 @@ public void write(String recordKey, String partitionPath, RowData record) throws
}

/**
* @returns {@code true} if this handle can take in more writes. else {@code false}.
* Returns {@code true} if this handle can take in more writes, else {@code false}.
*/
public boolean canWrite() {
return fileWriter.canWrite();
@@ -354,7 +354,7 @@ protected HoodieMergeHandle getUpdateHandle(String instantTime, String partition
dataFileToBeMerged, taskContextSupplier, Option.empty());
} else {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged,taskContextSupplier, Option.empty());
dataFileToBeMerged, taskContextSupplier, Option.empty());
}
}

@@ -21,6 +21,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
@@ -45,7 +46,7 @@
* <p>Computing the locations for a whole batch of records at once puts pressure on the engine,
* which we should avoid in a streaming system.
*/
public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
public class FlinkWriteHelper<T extends HoodieRecordPayload, R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>, R> {

private FlinkWriteHelper() {
@@ -80,22 +81,26 @@ public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime, List<Hoo

@Override
public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> records,
HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
int parallelism) {
HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
int parallelism) {
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
// If index used is global, then records are expected to differ in their partitionPath
final Object key = record.getKey().getRecordKey();
return Pair.of(key, record);
}).collect(Collectors.groupingBy(Pair::getLeft));

return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
final T data1 = rec1.getData();
final T data2 = rec2.getData();

@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1);
// we cannot allow the user to change the key or partitionPath,
// since that will affect everything, so pick it from one of the records.
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(reducedKey, reducedData);
boolean choosePrev = data1.equals(reducedData);
HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(reducedKey, reducedData, operation);
// reuse the location from the first record.
hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
return hoodieRecord;
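
The behavioral change here is that the change flag now follows the record chosen by preCombine instead of being dropped. The standalone sketch below mirrors the reduce body above; it is illustrative only, assumes HoodieOperation exposes INSERT and DELETE constants (as the isDelete checks elsewhere in this patch suggest), and uses OverwriteWithLatestAvroPayload with a null record body purely as a placeholder payload:

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;

public class DeduplicateOperationSketch {
  public static void main(String[] args) {
    // Two changes for the same key inside one batch; the ordering values 1 and 2 stand in
    // for a precombine field, and the null record bodies are placeholders only.
    HoodieKey key = new HoodieKey("key-1", "2021/08/10");
    HoodieRecord<OverwriteWithLatestAvroPayload> first = new HoodieRecord<>(
        key, new OverwriteWithLatestAvroPayload(null, 1), HoodieOperation.INSERT);
    HoodieRecord<OverwriteWithLatestAvroPayload> second = new HoodieRecord<>(
        key, new OverwriteWithLatestAvroPayload(null, 2), HoodieOperation.DELETE);

    // Mirror of the reduce body above: preCombine picks the surviving payload, and the
    // key and operation of the matching input record travel with it.
    OverwriteWithLatestAvroPayload reduced = second.getData().preCombine(first.getData());
    boolean choosePrev = first.getData().equals(reduced);
    HoodieRecord<OverwriteWithLatestAvroPayload> merged = new HoodieRecord<>(
        choosePrev ? first.getKey() : second.getKey(),
        reduced,
        choosePrev ? first.getOperation() : second.getOperation());
    System.out.println(merged.getOperation());  // DELETE survives instead of degrading to an upsert
  }
}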
@@ -86,7 +86,7 @@

@Override
public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, "
+ "the function works as a separate pipeline");
}
@@ -98,7 +98,7 @@ public List<WriteStatus> compact(HoodieFlinkCopyOnWriteTable hoodieCopyOnWriteTa
String instantTime) throws IOException {
FileSystem fs = metaClient.getFs();

Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
// TODO - FIX THIS
