[HUDI-3921] Fixed schema evolution cannot work with HUDI-3855 (#5376)
- When column names are renamed (with schema evolution enabled), renamed columns were not handled correctly while copying records from the old data file with HoodieMergeHandle.
xiarixiaoyao committed Apr 21, 2022
1 parent de5fa1f commit 037f89e
Showing 8 changed files with 137 additions and 39 deletions.
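
For illustration, here is a minimal sketch of the case this commit addresses. The schemas, field names, and values are hypothetical, and the org.apache.hudi.avro.HoodieAvroUtils import path is an assumption; only the rewriteRecordWithNewSchema overload added in this diff is exercised.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class RenameRewriteSketch {
  public static void main(String[] args) {
    // Schema persisted in the existing base file.
    Schema fileSchema = SchemaBuilder.record("rec").fields()
        .requiredString("uuid")
        .requiredInt("age")
        .endRecord();
    // Query/write schema after "age" was renamed to "user_age" via schema evolution.
    Schema querySchema = SchemaBuilder.record("rec").fields()
        .requiredString("uuid")
        .requiredInt("user_age")
        .endRecord();

    GenericRecord oldRecord = new GenericData.Record(fileSchema);
    oldRecord.put("uuid", "id-1");
    oldRecord.put("age", 30);

    // (colNameFromNewSchema -> colNameFromOldSchema), as produced by InternalSchemaUtils.collectRenameCols.
    Map<String, String> renameCols = new HashMap<>();
    renameCols.put("user_age", "age");

    GenericRecord rewritten =
        HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, querySchema, renameCols);
    System.out.println(rewritten.get("user_age")); // 30 -- the renamed column keeps its old value
  }
}
```

Before this change, the merge path had no rename information, so a value stored under the old column name was not carried over to the renamed column when old records were copied forward.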
@@ -46,6 +46,9 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;

import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;

/**
* Base class for all write operations logically performed at the file group level.
@@ -98,6 +101,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
protected final String fileId;
protected final String writeToken;
protected final TaskContextSupplier taskContextSupplier;
// For full schema evolution
protected final boolean schemaOnReadEnabled;

public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
@@ -120,6 +125,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
}

/**
@@ -224,11 +230,13 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
*/
protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
: HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
}

protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
: HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
}

public abstract List<WriteStatus> close();
@@ -36,6 +36,7 @@
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -52,6 +53,8 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
@@ -93,6 +96,7 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood

Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
boolean needToReWriteRecord = false;
Map<String, String> renameCols = new HashMap<>();
// TODO support bootstrap
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
// check implicitly added columns and position reordering (Spark SQL may change column order)
@@ -109,10 +113,14 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
&& writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
&& writeInternalSchema.findIdByName(f) != -1
&& writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
readSchema = AvroInternalSchemaConverter
.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
|| SchemaCompatibility.checkReaderWriterCompatibility(writeSchemaFromFile, readSchema).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
|| SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
if (needToReWriteRecord) {
renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
}
}

try {
@@ -121,7 +129,7 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
if (needToReWriteRecord) {
readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema);
readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
} else {
readerIterator = reader.getRecordIterator(readSchema);
}
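
To make the direction of the compatibility check above concrete, here is a small self-contained sketch (hypothetical schemas): the evolved read schema plays the Avro reader and the schema reconstructed from the file plays the writer.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;

public class CompatibilityCheckSketch {
  public static void main(String[] args) {
    // Schema as written in the data file.
    Schema writeSchemaFromFile = SchemaBuilder.record("rec").fields()
        .requiredString("uuid")
        .requiredInt("age")
        .endRecord();
    // Evolved read schema: a nullable column was added.
    Schema readSchema = SchemaBuilder.record("rec").fields()
        .requiredString("uuid")
        .requiredInt("age")
        .optionalString("city")
        .endRecord();

    SchemaCompatibility.SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile);
    // COMPATIBLE here: records from the file can be rewritten into the evolved read schema.
    System.out.println(result.getType());
  }
}
```

Swapping the arguments (the order used before this change) would instead ask whether data written with the evolved schema could be read back with the file schema.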
@@ -70,6 +70,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Deque;
import java.util.LinkedList;
import java.util.TimeZone;
import java.util.stream.Collectors;

@@ -405,6 +407,14 @@ public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecor
return newRecord;
}

// TODO Unify the logic of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function.
public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
// do not preserve the old FILENAME_METADATA_FIELD; always set it to the current file name
newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
return newRecord;
}

/**
* Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the
* provided {@code newSchema}.
@@ -719,14 +729,28 @@ public static Object getRecordColumnValues(HoodieRecord<? extends HoodieRecordPa
*
* @param oldRecord oldRecord to be rewritten
* @param newSchema newSchema used to rewrite oldRecord
* @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
* @return newRecord for new Schema
*/
public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema) {
Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema);
public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
return (GenericData.Record) newRecord;
}

private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema) {
/**
* Given an Avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
* Supports deep rewrites of nested records and handles renamed columns.
* This particular method does the following things :
* a) Create a new empty GenericRecord with the new schema.
* b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
*
* @param oldRecord oldRecord to be rewritten
* @param newSchema newSchema used to rewrite oldRecord
* @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
* @param fieldNames tracks the full name of the field currently visited while traversing the new schema
* @return newRecord for new Schema
*/
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
if (oldRecord == null) {
return null;
}
@@ -741,10 +765,23 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch

for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
String fieldName = field.name();
fieldNames.push(fieldName);
if (oldSchema.getField(field.name()) != null) {
Schema.Field oldField = oldSchema.getField(field.name());
helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema()));
helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
} else {
String fieldFullName = createFullName(fieldNames);
String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
// deal with rename
if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
// find rename
Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
}
}
fieldNames.pop();
}
GenericData.Record newRecord = new GenericData.Record(newSchema);
for (int i = 0; i < fields.size(); i++) {
@@ -765,27 +802,41 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
}
Collection array = (Collection)oldRecord;
List<Object> newArray = new ArrayList();
fieldNames.push("element");
for (Object element : array) {
newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType()));
newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
}
fieldNames.pop();
return newArray;
case MAP:
if (!(oldRecord instanceof Map)) {
throw new IllegalArgumentException("cannot rewrite record with different type");
}
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
Map<Object, Object> newMap = new HashMap<>();
fieldNames.push("value");
for (Map.Entry<Object, Object> entry : map.entrySet()) {
newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType()));
newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
}
fieldNames.pop();
return newMap;
case UNION:
return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
default:
return rewritePrimaryType(oldRecord, oldSchema, newSchema);
}
}

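/**
 * Joins the field names currently on the traversal stack into a dot-separated full name,
 * outermost field first (e.g. "address.street_name"), matching the keys used in renameCols.
 */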
private static String createFullName(Deque<String> fieldNames) {
String result = "";
if (!fieldNames.isEmpty()) {
List<String> parentNames = new ArrayList<>();
fieldNames.descendingIterator().forEachRemaining(parentNames::add);
result = parentNames.stream().collect(Collectors.joining("."));
}
return result;
}

private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schema newSchema) {
Schema realOldSchema = oldSchema;
if (realOldSchema.getType() == UNION) {
@@ -958,9 +1009,10 @@ private static Schema getActualSchemaFromUnion(Schema schema, Object data) {
*
* @param oldRecords oldRecords to be rewritten
* @param newSchema newSchema used to rewrite oldRecord
* @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
* @return an iterator of rewritten GenericRecords
*/
public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema) {
public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema, Map<String, String> renameCols) {
if (oldRecords == null || newSchema == null) {
return Collections.emptyIterator();
}
Expand All @@ -972,7 +1024,7 @@ public boolean hasNext() {

@Override
public GenericRecord next() {
return rewriteRecordWithNewSchema(oldRecords.next(), newSchema);
return rewriteRecordWithNewSchema(oldRecords.next(), newSchema, renameCols);
}
};
}
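
The fieldNames deque above exists so that a rename inside a nested record can be looked up by its dot-separated full name in the new schema (built by createFullName). A sketch with a hypothetical nested schema, again assuming the org.apache.hudi.avro.HoodieAvroUtils import path:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class NestedRenameSketch {
  public static void main(String[] args) {
    Schema oldAddress = SchemaBuilder.record("address").fields()
        .requiredString("street").endRecord();
    Schema oldSchema = SchemaBuilder.record("rec").fields()
        .requiredString("uuid")
        .name("address").type(oldAddress).noDefault()
        .endRecord();

    // "address.street" was renamed to "address.street_name".
    Schema newAddress = SchemaBuilder.record("address").fields()
        .requiredString("street_name").endRecord();
    Schema newSchema = SchemaBuilder.record("rec").fields()
        .requiredString("uuid")
        .name("address").type(newAddress).noDefault()
        .endRecord();

    GenericRecord oldAddr = new GenericData.Record(oldAddress);
    oldAddr.put("street", "main st");
    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("uuid", "id-1");
    oldRecord.put("address", oldAddr);

    // Keys are dot-separated full names from the new schema, matching createFullName(fieldNames).
    Map<String, String> renameCols = new HashMap<>();
    renameCols.put("address.street_name", "address.street");

    GenericRecord rewritten =
        HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
    System.out.println(((GenericRecord) rewritten.get("address")).get("street_name")); // main st
  }
}
```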
@@ -58,6 +58,7 @@
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -379,7 +380,7 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpec
Option<Schema> schemaOption = getMergedSchema(dataBlock);
while (recordIterator.hasNext()) {
IndexedRecord currentRecord = recordIterator.next();
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : currentRecord;
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord;
processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
totalLogRecords.incrementAndGet();
@@ -48,6 +48,25 @@ public class InternalSchemaMerger {
// we can pass decimalType to reWriteRecordWithNewSchema directly, everything is ok.
private boolean useColumnTypeFromFileSchema = true;

// Deals with renames.
// Whether to keep the column name from the file schema when a column has been renamed.
// The Spark parquet reader needs the original (file-schema) column name to read the data;
// if it is handed the new name it will read nothing.
// e.g.: the column colOldName has been renamed to colNewName -> the parquet reader must be
// given colOldName, not colNewName, when reading the data back.
// The log reader does not need this flag, because reWriteRecordWithNewSchema can rewrite
// records directly: colNewName can be passed to it as-is.
private boolean useColNameFromFileSchema = true;

public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema, boolean useColNameFromFileSchema) {
this.fileSchema = fileSchema;
this.querySchema = querySchema;
this.ignoreRequiredAttribute = ignoreRequiredAttribute;
this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
this.useColNameFromFileSchema = useColNameFromFileSchema;
}

public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
this.fileSchema = fileSchema;
this.querySchema = querySchema;
@@ -131,12 +150,15 @@ private List<Types.Field> buildRecordType(List<Types.Field> oldFields, List<Type
private Types.Field dealWithRename(int fieldId, Type newType, Types.Field oldField) {
Types.Field fieldFromFileSchema = fileSchema.findField(fieldId);
String nameFromFileSchema = fieldFromFileSchema.name();
String nameFromQuerySchema = querySchema.findField(fieldId).name();
Type typeFromFileSchema = fieldFromFileSchema.type();
// Current design mechanism guarantees nestedType change is not allowed, so no need to consider.
if (newType.isNestedType()) {
return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, newType, oldField.doc());
return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc());
} else {
return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
}
}
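
A sketch of how the two constructors above are meant to be picked, assuming fileSchema and querySchema are InternalSchema instances resolved elsewhere (imports omitted); only the flag combinations differ:

```java
// Sketch only: fileSchema and querySchema are assumed to be resolved elsewhere.
static InternalSchema mergeKeepingFileNames(InternalSchema fileSchema, InternalSchema querySchema) {
  // 4-arg constructor: useColNameFromFileSchema stays true, so a renamed column keeps the name
  // stored in the data file -- the name the Spark parquet reader needs to actually find the data.
  return new InternalSchemaMerger(fileSchema, querySchema, true, false).mergeSchema();
}

static InternalSchema mergeUsingQueryNames(InternalSchema fileSchema, InternalSchema querySchema) {
  // 5-arg constructor with useColNameFromFileSchema = false, as used by HoodieMergeHelper in this
  // commit: a renamed column takes its query-schema name, and rewriteRecordWithNewSchema maps the
  // old name to it through the renameCols map.
  return new InternalSchemaMerger(fileSchema, querySchema, true, false, false).mergeSchema();
}
```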

@@ -267,4 +267,20 @@ public static String createFullName(String name, Deque<String> fieldNames) {
}
return result;
}

/**
* Finds all columns that were renamed between oldSchema and newSchema.
*
* @param oldSchema oldSchema
* @param newSchema newSchema, derived from oldSchema through schema changes
* @return renameCols map, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
*/
public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) {
List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
return colNamesFromWriteSchema.stream().filter(f -> {
int filedIdFromWriteSchema = oldSchema.findIdByName(f);
// find the columns that kept the same id but have a different column name
return newSchema.getAllIds().contains(filedIdFromWriteSchema) && !newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f);
}).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> e));
}
}
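
Because a column keeps its id across a rename, the map comes back keyed by the new full name. A sketch of the contract (InternalSchema instances assumed to come from the table's schema metadata, hypothetical column names, imports omitted):

```java
static Map<String, String> renameMapFor(InternalSchema writeInternalSchema, InternalSchema querySchema) {
  // e.g. if "age" was renamed to "user_age" and the nested "address.street" to
  // "address.street_name", the result is:
  //   { "user_age" -> "age", "address.street_name" -> "address.street" }
  return InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
}
```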
@@ -284,7 +284,7 @@ public void testReWriteRecordWithTypeChanged() {
.updateColumnType("col6", Types.StringType.get());
InternalSchema newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange);
Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName());
GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());

Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newRecord), true);
}
@@ -349,7 +349,7 @@ public void testReWriteNestRecord() {
);

Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName());
GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
// test the correctness of the rewrite
Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newAvroRecord), true);
}