
[HUDI-3478][HUDI-4887] Use Avro as the format of persisted cdc data #6734

Merged 2 commits on Sep 22, 2022
@@ -102,16 +102,11 @@ public HoodieCDCLogger(
this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));

if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_STRING;
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING;
} else {
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING;
}
this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
cdcSupplementalLoggingMode,
dataSchema
);
this.cdcSchemaString = this.cdcSchema.toString();

this.cdcData = new ExternalSpillableMap<>(
maxInMemorySizeInBytes,
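
The net effect of this hunk, as a minimal sketch: one mode-driven derivation replaces the three hard-coded schema constants, and the schema's own toString() replaces the parallel *_STRING constants. CdcSchemaWiring and cdcSchemaJson below are hypothetical names used only for illustration; in the real constructor these are plain field assignments.

import org.apache.avro.Schema;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

class CdcSchemaWiring {
  static String cdcSchemaJson(HoodieCDCSupplementalLoggingMode mode, Schema dataSchema) {
    // One derivation keyed on the logging mode plus the table schema...
    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
    // ...and the serialized form is just the schema's JSON representation.
    return cdcSchema.toString();
  }
}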
@@ -158,18 +153,21 @@ private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
GenericRecord newRecord) {
GenericData.Record record;
if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime,
record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime,
removeCommitMetadata(oldRecord), newRecord);
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey,
record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey,
removeCommitMetadata(oldRecord));
} else {
record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
}
return record;
}

private GenericRecord removeCommitMetadata(GenericRecord record) {
if (record == null) {
return null;
}
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>());
}
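
The new null guard exists because inserts carry no before-image: createCDCRecord passes oldRecord = null, and rewriting a null record must simply yield null. A self-contained sketch (toy schema and values, modeled on the test further down) showing that null images are legal under the new nullable-union CDC schema:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

public class CdcNullImageDemo {
  public static void main(String[] args) {
    Schema dataSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Record\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}");
    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
        HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema);

    GenericRecord row = new GenericData.Record(dataSchema);
    row.put("uuid", 1);
    row.put("name", "apple");

    // Insert: no before-image, so the before slot of the nullable union stays null.
    GenericRecord insert = HoodieCDCUtils.cdcRecord(cdcSchema, "i", "100", null, row);
    // Delete: no after-image.
    GenericRecord delete = HoodieCDCUtils.cdcRecord(cdcSchema, "d", "100", row, null);
    System.out.println(insert);
    System.out.println(delete);
  }
}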

@@ -221,18 +219,6 @@ public void close() {
}
}

public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
long recordsWritten,
long insertRecordsWritten) {
if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// the following cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
// case 2: all the data are new-coming,
return Option.empty();
}
return cdcLogger.writeCDCData();
}

public static void setCDCStatIfNeeded(HoodieWriteStat stat,
Option<AppendResult> cdcResult,
String partitionPath,
@@ -425,6 +425,16 @@ protected void writeIncomingRecords() throws IOException {
}
}

private Option<AppendResult> writeCDCDataIfNeeded() {
if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// Cases where we do not need to write out the cdc file:
// case 1: all data from the previous file slice is deleted and no new data is inserted;
// case 2: all written data is newly inserted.
return Option.empty();
}
return cdcLogger.writeCDCData();
}
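
To make the two skip cases concrete, here is a hypothetical extraction of the guard into a standalone predicate; shouldWriteCdc is an illustrative name, not Hudi API:

public class CdcWriteGuardDemo {
  // Hypothetical extraction of the skip logic from writeCDCDataIfNeeded,
  // named and isolated here purely for illustration.
  static boolean shouldWriteCdc(boolean hasCdcLogger, long recordsWritten, long insertRecordsWritten) {
    return hasCdcLogger && recordsWritten != 0L && recordsWritten != insertRecordsWritten;
  }

  public static void main(String[] args) {
    // case 1: the whole previous file slice was deleted, nothing new inserted.
    System.out.println(shouldWriteCdc(true, 0L, 0L));   // false
    // case 2: every written record is a brand-new insert; the new base file
    // alone already describes the change, so no separate cdc block is needed.
    System.out.println(shouldWriteCdc(true, 10L, 10L)); // false
    // Mixed updates/deletes against existing data: write the cdc block.
    System.out.println(shouldWriteCdc(true, 10L, 4L));  // true
  }
}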

@Override
public List<WriteStatus> close() {
try {
@@ -445,8 +455,7 @@ public List<WriteStatus> close() {
}

// if there are cdc data written, set the CDC-related information.
Option<AppendResult> cdcResult =
HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
Option<AppendResult> cdcResult = writeCDCDataIfNeeded();
HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);

long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
@@ -109,6 +109,11 @@ public static Schema createNullableSchema(Schema.Type avroType) {
return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType));
}

public static Schema createNullableSchema(Schema schema) {
checkState(schema.getType() != Schema.Type.NULL);
return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
}
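
A usage sketch of the new overload (toy Point schema for illustration): it wraps an arbitrary schema in a [null, schema] union, which is what later allows a CDC record's before/after image to be absent:

import org.apache.avro.Schema;
import org.apache.hudi.avro.AvroSchemaUtils;

public class NullableSchemaDemo {
  public static void main(String[] args) {
    Schema point = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Point\",\"fields\":["
            + "{\"name\":\"x\",\"type\":\"int\"}]}");
    Schema nullablePoint = AvroSchemaUtils.createNullableSchema(point);
    // Prints a union with null first: ["null",{"type":"record","name":"Point",...}]
    System.out.println(nullablePoint);
  }
}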

/**
* Returns true in case when schema contains the field w/ provided name
*/
@@ -18,12 +18,17 @@

package org.apache.hudi.common.table.cdc;

import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.exception.HoodieException;

import java.util.Arrays;
import java.util.List;

public class HoodieCDCUtils {

public static final String CDC_LOGFILE_SUFFIX = "-cdc";
@@ -50,33 +55,6 @@ public class HoodieCDCUtils {
CDC_AFTER_IMAGE
};

/**
* This is the standard CDC output format.
* Also, this is the schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
*/
public static final String CDC_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Record\","
+ "\"fields\":["
+ "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"ts_ms\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"before\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}"
+ "]}";

public static final Schema CDC_SCHEMA = new Schema.Parser().parse(CDC_SCHEMA_STRING);

/**
* The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
*/
public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = "{\"type\":\"record\",\"name\":\"Record\","
+ "\"fields\":["
+ "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}"
+ "]}";

public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE =
new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING);

/**
* The schema of the cdc log file when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
*/
@@ -89,32 +67,50 @@ public class HoodieCDCUtils {
public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY =
new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING);

public static final Schema schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode supplementalLoggingMode) {
switch (supplementalLoggingMode) {
case WITH_BEFORE_AFTER:
return CDC_SCHEMA;
case WITH_BEFORE:
return CDC_SCHEMA_OP_RECORDKEY_BEFORE;
case OP_KEY:
return CDC_SCHEMA_OP_AND_RECORDKEY;
default:
throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode);
public static Schema schemaBySupplementalLoggingMode(
HoodieCDCSupplementalLoggingMode supplementalLoggingMode,
Schema tableSchema) {
if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY) {
return CDC_SCHEMA_OP_AND_RECORDKEY;
} else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE) {
return createCDCSchema(tableSchema, false);
} else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER) {
return createCDCSchema(tableSchema, true);
} else {
throw new HoodieException("Unsupported supplemental logging mode: " + supplementalLoggingMode);
}
}

private static Schema createCDCSchema(Schema tableSchema, boolean withAfterImage) {
Schema imageSchema = AvroSchemaUtils.createNullableSchema(tableSchema);
Schema.Field opField = new Schema.Field(CDC_OPERATION_TYPE,
AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
Schema.Field beforeField = new Schema.Field(
CDC_BEFORE_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE);
List<Schema.Field> fields;
if (withAfterImage) {
Schema.Field tsField = new Schema.Field(CDC_COMMIT_TIMESTAMP,
AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
Schema.Field afterField = new Schema.Field(
CDC_AFTER_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE);
fields = Arrays.asList(opField, tsField, beforeField, afterField);
} else {
Schema.Field keyField = new Schema.Field(CDC_RECORD_KEY,
AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
fields = Arrays.asList(opField, keyField, beforeField);
}

Schema mergedSchema = Schema.createRecord("CDC", null, tableSchema.getNamespace(), false);
mergedSchema.setFields(fields);
return mergedSchema;
}
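
A sketch of the mode-driven derivation end to end, assuming the three modes shown in this diff are the whole enum; the toy table schema and the printed field lists are illustrative:

import org.apache.avro.Schema;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

public class CdcSchemaDemo {
  public static void main(String[] args) {
    // Toy table schema; any parsed Avro record schema works here.
    Schema dataSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Record\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]},"
            + "{\"name\":\"name\",\"type\":[\"string\",\"null\"]}]}");
    for (HoodieCDCSupplementalLoggingMode mode : HoodieCDCSupplementalLoggingMode.values()) {
      Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
      // e.g. WITH_BEFORE_AFTER -> fields op, ts_ms, before, after
      System.out.println(mode + " -> " + cdcSchema.getFields());
    }
  }
}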

/**
* Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
*/
public static GenericData.Record cdcRecord(
String op, String commitTime, GenericRecord before, GenericRecord after) {
String beforeJsonStr = recordToJson(before);
String afterJsonStr = recordToJson(after);
return cdcRecord(op, commitTime, beforeJsonStr, afterJsonStr);
}

public static GenericData.Record cdcRecord(
String op, String commitTime, String before, String after) {
GenericData.Record record = new GenericData.Record(CDC_SCHEMA);
public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String commitTime,
GenericRecord before, GenericRecord after) {
GenericData.Record record = new GenericData.Record(cdcSchema);
record.put(CDC_OPERATION_TYPE, op);
record.put(CDC_COMMIT_TIMESTAMP, commitTime);
record.put(CDC_BEFORE_IMAGE, before);
@@ -125,20 +121,20 @@ public static GenericData.Record cdcRecord(
/**
* Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
*/
public static GenericData.Record cdcRecord(String op, String recordKey, GenericRecord before) {
GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_RECORDKEY_BEFORE);
public static GenericData.Record cdcRecord(Schema cdcSchema, String op,
String recordKey, GenericRecord before) {
GenericData.Record record = new GenericData.Record(cdcSchema);
record.put(CDC_OPERATION_TYPE, op);
record.put(CDC_RECORD_KEY, recordKey);
String beforeJsonStr = recordToJson(before);
record.put(CDC_BEFORE_IMAGE, beforeJsonStr);
record.put(CDC_BEFORE_IMAGE, before);
return record;
}
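
A usage sketch of the 'cdc_data_before' builder above; the record key format "uuid:2" is illustrative only:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

public class CdcBeforeOnlyDemo {
  public static void main(String[] args) {
    Schema dataSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Record\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}");
    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
        HoodieCDCSupplementalLoggingMode.WITH_BEFORE, dataSchema);

    GenericRecord before = new GenericData.Record(dataSchema);
    before.put("uuid", 2);
    before.put("name", "banana");

    // Update captured as op + record key + the nested (not JSON-string) before image.
    GenericRecord cdc = HoodieCDCUtils.cdcRecord(cdcSchema, "u", "uuid:2", before);
    System.out.println(cdc);
  }
}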

/**
* Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
*/
public static GenericData.Record cdcRecord(String op, String recordKey) {
GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_AND_RECORDKEY);
public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey) {
GenericData.Record record = new GenericData.Record(cdcSchema);
record.put(CDC_OPERATION_TYPE, op);
record.put(CDC_RECORD_KEY, recordKey);
return record;
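
And the leanest mode, op plus record key with no images. Per the dispatch above, OP_KEY returns the fixed schema and ignores the table schema argument; the toy key is illustrative:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

public class CdcOpKeyDemo {
  public static void main(String[] args) {
    Schema dataSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Record\",\"fields\":[{\"name\":\"uuid\",\"type\":\"int\"}]}");
    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
        HoodieCDCSupplementalLoggingMode.OP_KEY, dataSchema);
    GenericRecord cdc = HoodieCDCUtils.cdcRecord(cdcSchema, "d", "uuid:3");
    System.out.println(cdc); // e.g. {"op": "d", "record_key": "uuid:3"}
  }
}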
@@ -19,6 +19,7 @@
package org.apache.hudi.common.functional;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

@@ -37,6 +38,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFileReader;
@@ -561,46 +563,79 @@ public void testCDCBlock() throws IOException, InterruptedException {
.withFs(fs)
.build();

GenericRecord record1 = HoodieCDCUtils.cdcRecord("i", "100",
null, "{\"uuid\": 1, \"name\": \"apple\"}, \"ts\": 1100}");
GenericRecord record2 = HoodieCDCUtils.cdcRecord("u", "100",
"{\"uuid\": 2, \"name\": \"banana\"}, \"ts\": 1000}",
"{\"uuid\": 2, \"name\": \"blueberry\"}, \"ts\": 1100}");
GenericRecord record3 = HoodieCDCUtils.cdcRecord("d", "100",
"{\"uuid\": 3, \"name\": \"cherry\"}, \"ts\": 1000}", null);
String dataSchemaString = "{\"type\":\"record\",\"name\":\"Record\","
+ "\"fields\":["
+ "{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]},"
+ "{\"name\":\"name\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"ts\",\"type\":[\"long\",\"null\"]}"
+ "]}";
Schema dataSchema = new Schema.Parser().parse(dataSchemaString);
Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema);
GenericRecord insertedRecord = new GenericData.Record(dataSchema);
insertedRecord.put("uuid", 1);
insertedRecord.put("name", "apple");
insertedRecord.put("ts", 1100L);

GenericRecord updateBeforeImageRecord = new GenericData.Record(dataSchema);
updateBeforeImageRecord.put("uuid", 2);
updateBeforeImageRecord.put("name", "banana");
updateBeforeImageRecord.put("ts", 1000L);
GenericRecord updateAfterImageRecord = new GenericData.Record(dataSchema);
updateAfterImageRecord.put("uuid", 2);
updateAfterImageRecord.put("name", "blueberry");
updateAfterImageRecord.put("ts", 1100L);

GenericRecord deletedRecord = new GenericData.Record(dataSchema);
deletedRecord.put("uuid", 3);
deletedRecord.put("name", "cherry");
deletedRecord.put("ts", 1000L);

GenericRecord record1 = HoodieCDCUtils.cdcRecord(cdcSchema, "i", "100",
null, insertedRecord);
GenericRecord record2 = HoodieCDCUtils.cdcRecord(cdcSchema, "u", "100",
updateBeforeImageRecord, updateAfterImageRecord);
GenericRecord record3 = HoodieCDCUtils.cdcRecord(cdcSchema, "d", "100",
deletedRecord, null);
List<IndexedRecord> records = new ArrayList<>(Arrays.asList(record1, record2, record3));
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, HoodieCDCUtils.CDC_SCHEMA_STRING);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString());
HoodieDataBlock dataBlock = getDataBlock(HoodieLogBlockType.CDC_DATA_BLOCK, records, header);
writer.appendBlock(dataBlock);
writer.close();

Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), HoodieCDCUtils.CDC_SCHEMA);
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), cdcSchema);
assertTrue(reader.hasNext());
HoodieLogBlock block = reader.next();
HoodieDataBlock dataBlockRead = (HoodieDataBlock) block;
List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
assertEquals(3, recordsRead.size(),
"Read records size should be equal to the written records size");
assertEquals(dataBlockRead.getSchema(), HoodieCDCUtils.CDC_SCHEMA);
assertEquals(dataBlockRead.getSchema(), cdcSchema);

GenericRecord insert = (GenericRecord) recordsRead.stream()
.filter(record -> record.get(0).toString().equals("i")).findFirst().get();
assertNull(insert.get("before"));
assertNotNull(insert.get("after"));
assertEquals(((GenericRecord) insert.get("after")).get("name").toString(), "apple");

GenericRecord update = (GenericRecord) recordsRead.stream()
.filter(record -> record.get(0).toString().equals("u")).findFirst().get();
assertNotNull(update.get("before"));
assertNotNull(update.get("after"));
assertTrue(update.get("before").toString().contains("banana"));
assertTrue(update.get("after").toString().contains("blueberry"));
GenericRecord uBefore = (GenericRecord) update.get("before");
GenericRecord uAfter = (GenericRecord) update.get("after");
assertEquals(String.valueOf(uBefore.get("name")), "banana");
assertEquals(Long.valueOf(uBefore.get("ts").toString()), 1000L);
assertEquals(String.valueOf(uAfter.get("name")), "blueberry");
assertEquals(Long.valueOf(uAfter.get("ts").toString()), 1100L);

GenericRecord delete = (GenericRecord) recordsRead.stream()
.filter(record -> record.get(0).toString().equals("d")).findFirst().get();
assertNotNull(delete.get("before"));
assertNull(delete.get("after"));
assertEquals(((GenericRecord) delete.get("before")).get("name").toString(), "cherry");

reader.close();
}
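
One subtlety behind the updated assertions: Avro's generic reader returns string fields as org.apache.avro.util.Utf8, so the test normalizes with String.valueOf(...)/toString() before comparing. A micro-demo of why that normalization is needed:

import org.apache.avro.util.Utf8;

public class Utf8CompareDemo {
  public static void main(String[] args) {
    Object fromAvro = new Utf8("banana");          // what a generic read typically yields
    System.out.println(fromAvro.equals("banana")); // false: Utf8 is not a String
    System.out.println(String.valueOf(fromAvro).equals("banana")); // true after normalizing
  }
}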