Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = createTable(config, hadoopConf);
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), commitActionType);
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
extraMetadata, operationType, config.getWriteSchema(), commitActionType);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So switching to config.getWriteSchema() here would affect hive sync? We use the schema from the commit file to sync to hive. So if the write schema is a subset of the table schema, then we can have an issue here. Did you run into any issues like that? I think we could actually write both into the commit metadata?

Copy link
Author

@pengzhiwei2018 pengzhiwei2018 Apr 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @vinothchandar, as I described below, the writeSchema is the same as the table schema, so there are no negative effects on the hive sync. I have run this case in our production environment and it works well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below you mentioned that writeSchema will not be the same as inputSchema, right? I think inputSchema is what will be equal to the table schema, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the writeSchema may not be equal to the inputSchema for MergeInto.
The inputSchema is the schema of the incoming records (it comes from hoodie.avro.schema); we use it to parse the bytes into the avro record in the HoodiePayload.
The writeSchema is always equal to the table schema; we use the writeSchema to write the record to the table. The writeSchema comes from hoodie.write.schema if we set this property; otherwise, we get it from hoodie.avro.schema.
So here, we pass the writeSchema to the HoodieCommitMetadata.

// Finalize write
finalizeWrite(table, instantTime, stats);
HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
"hoodie.write.meta.key.prefixes";
public static final String DEFAULT_WRITE_META_KEY_PREFIXES = "";

/**
* The specified write schema. In most cases, we do not need to set this parameter,
* but when the write schema is not equal to the specified table schema, we can
* specify the write schema with this parameter.
*
* Currently, MergeIntoHoodieTableCommand uses this to specify the write schema.
*/
public static final String WRITE_SCHEMA_PROP = "hoodie.write.schema";

/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
Expand Down Expand Up @@ -230,6 +239,20 @@ public void setSchema(String schemaStr) {
props.setProperty(AVRO_SCHEMA, schemaStr);
}

/**
* Get the write schema for written records.
*
* If WRITE_SCHEMA_PROP ("hoodie.write.schema") has been specified, we use it as
* the write schema. Otherwise we fall back to the table's AVRO_SCHEMA via
* {@link #getSchema()}.
*
* @return the avro schema string used for writing records
*/
public String getWriteSchema() {
if (props.containsKey(WRITE_SCHEMA_PROP)) {
return props.getProperty(WRITE_SCHEMA_PROP);
}
return getSchema();
}

/**
* Whether avro schema validation is enabled, parsed from the
* AVRO_SCHEMA_VALIDATE property.
*
* Note: {@code Boolean.parseBoolean(null)} returns {@code false}, so validation
* is disabled by default when the property is unset.
*
* @return true if avro schema validation is enabled
*/
public boolean getAvroSchemaValidate() {
return Boolean.parseBoolean(props.getProperty(AVRO_SCHEMA_VALIDATE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.hudi.execution;

import java.util.Properties;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
Expand Down Expand Up @@ -82,10 +84,10 @@ public static class HoodieInsertValueGenResult<T extends HoodieRecord> {
// It caches the exception seen while fetching insert value.
public Option<Exception> exception = Option.empty();

public HoodieInsertValueGenResult(T record, Schema schema) {
public HoodieInsertValueGenResult(T record, Schema schema, Properties properties) {
this.record = record;
try {
this.insertValue = record.getData().getInsertValue(schema);
this.insertValue = record.getData().getInsertValue(schema, properties);
} catch (Exception e) {
this.exception = Option.of(e);
}
Expand All @@ -96,9 +98,14 @@ public HoodieInsertValueGenResult(T record, Schema schema) {
* Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
* expensive operations of transformation to the reader thread.
*/
static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
Schema schema, HoodieWriteConfig config) {
return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, config.getProps());
}

static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
Schema schema) {
return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema);
return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, CollectionUtils.EMPTY_PROPERTIES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,12 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
try {
Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(writerSchema);
Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(tableSchema,
config.getProps());
if (avroRecord.isPresent()) {
if (avroRecord.get().equals(IGNORE_RECORD)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still curious how this equals() works with real shuffles, i.e. data transferred across machines. IIUC the comparison is delegated to Object.equals(). Could this be different on different JVMs? I.e. we get an IGNORE_RECORD out of the network after a shuffle, and it is different from the hashCode of the local JVM's HoodieWriteHandle.IGNORE_RECORD.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, the IGNORE_RECORD is always used within a single JVM currently. It is used in the HoodieXXHandler, so we do not need to consider the shuffle case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please leave a comment around this in the code? If something changes, we will be in trouble.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will add some comments here.

return avroRecord;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
String seqId =
Expand Down Expand Up @@ -336,7 +340,7 @@ public void doAppend() {
protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header) {
try {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchemaWithMetafields.toString());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
List<HoodieLogBlock> blocks = new ArrayList<>(2);
if (recordList.size() > 0) {
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
Expand Down Expand Up @@ -444,7 +448,10 @@ private void writeToBuffer(HoodieRecord<T> record) {
}
Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
if (indexedRecord.isPresent()) {
recordList.add(indexedRecord.get());
// Skip the Ignore Record.
if (!indexedRecord.get().equals(IGNORE_RECORD)) {
recordList.add(indexedRecord.get());
}
} else {
keysToDelete.add(record.getKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

Expand All @@ -38,8 +38,7 @@ public class HoodieBootstrapHandle<T extends HoodieRecordPayload, I, K, O> exten
public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
super(config, commitTime, hoodieTable, partitionPath, fileId,
Pair.of(HoodieAvroUtils.RECORD_KEY_SCHEMA,
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.RECORD_KEY_SCHEMA)), taskContextSupplier);
Option.of(HoodieAvroUtils.RECORD_KEY_SCHEMA), taskContextSupplier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
Expand Down Expand Up @@ -63,14 +62,14 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
taskContextSupplier);
}

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
String partitionPath, String fileId, Option<Schema> overriddenSchema,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, writerSchemaIncludingAndExcludingMetadataPair,
super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema,
taskContextSupplier);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
Expand All @@ -82,7 +81,8 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.taskContextSupplier);
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config,
writeSchemaWithMetaFields, this.taskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
Expand Down Expand Up @@ -113,6 +113,9 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
Option recordMetadata = record.getData().getMetadata();
try {
if (avroRecord.isPresent()) {
if (avroRecord.get().equals(IGNORE_RECORD)) {
return;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
Expand Down Expand Up @@ -154,9 +157,9 @@ public void write() {
final String key = keyIterator.next();
HoodieRecord<T> record = recordMap.get(key);
if (useWriterSchema) {
write(record, record.getData().getInsertValue(writerSchemaWithMetafields));
write(record, record.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
} else {
write(record, record.getData().getInsertValue(writerSchema));
write(record, record.getData().getInsertValue(tableSchema, config.getProps()));
}
}
} catch (IOException io) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
}

@Override
public Schema getWriterSchemaWithMetafields() {
return writerSchemaWithMetafields;
public Schema getWriterSchemaWithMetaFields() {
return writeSchemaWithMetaFields;
}

public Schema getWriterSchema() {
return writerSchema;
return writeSchema;
}

/**
Expand Down Expand Up @@ -171,7 +171,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
createMarkerFile(partitionPath, newFileName);

// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
writeSchemaWithMetaFields, taskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
Expand All @@ -198,7 +199,7 @@ protected void initializeIncomingRecordsMap() {
long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
Expand Down Expand Up @@ -284,11 +285,17 @@ public void write(GenericRecord oldRecord) {
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
try {
Option<IndexedRecord> combinedAvroRecord =
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchemaWithMetafields : writerSchema,
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
config.getPayloadConfig().getProps());
if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {

if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
copyOldRecord = true;
} else if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully write the combined new
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
* write the combined new
* value
*
* We no longer need to copy the old record over.
Expand All @@ -308,7 +315,7 @@ public void write(GenericRecord oldRecord) {
fileWriter.writeAvro(key, oldRecord);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writerSchemaWithMetafields.toString(true));
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
LOG.debug("Old record is " + oldRecord);
throw new HoodieUpsertException(errMsg, e);
}
Expand All @@ -325,11 +332,14 @@ public List<WriteStatus> close() {
while (newRecordsItr.hasNext()) {
HoodieRecord<T> hoodieRecord = newRecordsItr.next();
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
Schema schema = useWriterSchema ? tableSchemaWithMetaFields : tableSchema;
Option<IndexedRecord> insertRecord =
hoodieRecord.getData().getInsertValue(schema, config.getProps());
// just skip the ignore record
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
continue;
}
writeRecord(hoodieRecord, insertRecord);
insertRecordsWritten++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ public void write(GenericRecord oldRecord) {
}
try {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema));
}
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
Expand All @@ -109,9 +109,9 @@ public List<WriteStatus> close() {
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchemaWithMetafields));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema));
}
insertRecordsWritten++;
}
Expand Down
Loading