[HUDI-1160] Support update partial fields for CoW table #2666

Closed · wants to merge 2 commits
@@ -175,7 +175,8 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = createTable(config, hadoopConf);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), commitActionType);
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(),
commitActionType, config.updatePartialFields(), table.getMetaClient());
// Finalize write
finalizeWrite(table, instantTime, stats);
HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
@@ -35,6 +35,7 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
@@ -79,6 +80,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
public static final String BASE_PATH_PROP = "hoodie.base.path";
public static final String AVRO_SCHEMA = "hoodie.avro.schema";
public static final String LAST_AVRO_SCHEMA = "hoodie.last.avro.schema";
Contributor: Maybe we can call this "latest table schema".

Contributor Author: OK, I think that's good.

Contributor Author: @danny0405 Can you help answer the questions about SQL?

public static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
public static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
public static final String DEFAULT_PARALLELISM = "1500";
@@ -113,6 +115,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
.toString();

// Enable partial field updates
public static final String UPDATE_PARTIAL_FIELDS = "hoodie.update.partial.fields";
public static final String DEFAULT_UPDATE_PARTIAL_FIELDS = "false";

public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
public static final String EMBEDDED_TIMELINE_SERVER_PORT = "hoodie.embed.timeline.server.port";
@@ -221,13 +227,24 @@ public String getBasePath() {
}

public String getSchema() {
if (updatePartialFields() && !StringUtils.isNullOrEmpty(getLastSchema())) {
return getLastSchema();
}
return props.getProperty(AVRO_SCHEMA);
}

public void setSchema(String schemaStr) {
props.setProperty(AVRO_SCHEMA, schemaStr);
}

public String getLastSchema() {
return props.getProperty(LAST_AVRO_SCHEMA);
}

public void setLastSchema(String schemaStr) {
props.setProperty(LAST_AVRO_SCHEMA, schemaStr);
}

public boolean getAvroSchemaValidate() {
return Boolean.parseBoolean(props.getProperty(AVRO_SCHEMA_VALIDATE));
}
@@ -369,6 +386,10 @@ public BulkInsertSortMode getBulkInsertSortMode() {
return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
}

public Boolean updatePartialFields() {
return Boolean.parseBoolean(props.getProperty(UPDATE_PARTIAL_FIELDS));
}

public boolean isMergeDataValidationCheckEnabled() {
return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
}
@@ -1086,6 +1107,11 @@ public Builder withSchema(String schemaStr) {
return this;
}

public Builder withLastSchema(String schemaStr) {
props.setProperty(LAST_AVRO_SCHEMA, schemaStr);
return this;
}

public Builder withAvroSchemaValidate(boolean enable) {
props.setProperty(AVRO_SCHEMA_VALIDATE, String.valueOf(enable));
return this;
@@ -1291,6 +1317,11 @@ public Builder withExternalSchemaTrasformation(boolean enabled) {
return this;
}

public Builder withUpdatePartialFields(boolean updatePartialFields) {
props.setProperty(UPDATE_PARTIAL_FIELDS, String.valueOf(updatePartialFields));
return this;
}

public Builder withMergeDataValidationCheckEnabled(boolean enabled) {
props.setProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED, String.valueOf(enabled));
return this;
@@ -1333,6 +1364,7 @@ protected void setDefaults() {
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPDATE_PARTIAL_FIELDS), UPDATE_PARTIAL_FIELDS, DEFAULT_UPDATE_PARTIAL_FIELDS);

setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
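
Note: for context, a hedged usage sketch of the new flag. The base path and schema string below are illustrative placeholders; only withUpdatePartialFields(...) is introduced by this PR, the remaining builder calls are the existing HoodieWriteConfig builder API.

import org.apache.hudi.config.HoodieWriteConfig;

// Sketch only: the base path and PARTIAL_WRITE_SCHEMA are illustrative placeholders.
public class PartialUpdateConfigExample {
  private static final String PARTIAL_WRITE_SCHEMA =
      "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
          + "{\"name\":\"uuid\",\"type\":\"string\"},"
          + "{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null}]}";

  public static HoodieWriteConfig buildConfig() {
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/partial_update_table")   // placeholder base path
        .withSchema(PARTIAL_WRITE_SCHEMA)               // only the fields being updated
        .withUpdatePartialFields(true)                  // new flag: hoodie.update.partial.fields
        .build();
  }
}
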
@@ -34,7 +34,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.generic.GenericRecord;
@@ -82,7 +81,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.taskContextSupplier);
this.fileWriter = createNewFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.taskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
@@ -18,6 +18,7 @@

package org.apache.hudi.io;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -29,10 +30,13 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
@@ -106,7 +110,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
super(config, instantTime, partitionPath, fileId, hoodieTable, getWriterSchemaIncludingAndExcludingMetadataPair(config, hoodieTable), taskContextSupplier);
Contributor: Can you help me understand something: we have HoodieWriteConfig in two places here, as a separate entity (the first constructor argument) and via HoodieTable.getConfig(). Within getWriterSchemaIncludingAndExcludingMetadataPair(...) we update lastSchema on the HoodieWriteConfig, which updates the first argument, but table.getConfig() may not be updated, right? If that is the case, how can we rely on table.getConfig().getLastSchema() in the *MergeHelper classes? Maybe I am missing something; can you shed some light?

Contributor Author: I found through debugging that when config.setLastSchema succeeds, table.getConfig().getLastSchema() is updated as well; they both take their values from the same Properties object.

init(fileId, recordItr);
init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
}
@@ -123,6 +127,22 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
init(fileId, this.partitionPath, dataFileToBeMerged);
}

protected static Pair<Schema, Schema> getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config, HoodieTable hoodieTable) {
Schema originalSchema = new Schema.Parser().parse(config.getSchema());
Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
boolean updatePartialFields = config.updatePartialFields();
if (updatePartialFields) {
try {
TableSchemaResolver resolver = new TableSchemaResolver(hoodieTable.getMetaClient());
Schema lastSchema = resolver.getTableAvroSchema();
config.setLastSchema(lastSchema.toString());
Contributor: lastSchema could be null if this is the first commit, or if the last commit is an operation that does not inject a schema into the commit metadata. Can we check for null and only then set the last schema? Otherwise .toString() could throw a NullPointerException.

Contributor: In fact, I would prefer to throw an exception if the table schema is not present and someone is trying to use updatePartialFields.

Contributor Author: (replying to the null-check suggestion) OK.

Contributor Author: (replying to the suggestion to throw) Isn't that too strict? I don't think it's necessary to throw an exception here.


} catch (Exception e) {
// Ignore exception.
}
}
return Pair.of(originalSchema, hoodieSchema);
}

@Override
public Schema getWriterSchemaWithMetafields() {
return writerSchemaWithMetafields;
@@ -144,6 +164,7 @@ protected String generatesDataFileName() {
*/
private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
writerSchemaWithMetafields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
this.baseFileToMerge = baseFileToMerge;
this.writtenRecordKeys = new HashSet<>();
writeStatus.setStat(new HoodieWriteStat());
@@ -266,6 +287,13 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
return false;
}

protected GenericRecord rewriteRecord(GenericRecord record) {
if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) {
return HoodieAvroUtils.rewriteRecord(record, new Schema.Parser().parse(config.getLastSchema()));
}
return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields);
}

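Note: the rewriteRecord override above projects each record onto the last full table schema when partial updates are enabled. Below is a hedged, standalone illustration of that projection using HoodieAvroUtils.rewriteRecord; the schemas are invented, and it assumes the columns missing from the incoming record are nullable with null defaults in the target schema (otherwise the rewrite cannot produce a valid record).

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

class RewriteRecordSketch {
  public static void main(String[] args) {
    // Partial write schema: only the fields the writer is updating.
    Schema partial = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":\"string\"},"
            + "{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null}]}");
    // Last full table schema: a superset whose extra column is nullable.
    Schema full = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":\"string\"},"
            + "{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null},"
            + "{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    GenericRecord partialRecord = new GenericData.Record(partial);
    partialRecord.put("uuid", "key-1");
    partialRecord.put("fare", 12.5d);

    // Project onto the full schema; "driver" falls back to its null default here and is
    // presumably reconciled against the existing base-file record during the merge.
    GenericRecord projected = HoodieAvroUtils.rewriteRecord(partialRecord, full);
    System.out.println(projected);
  }
}
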
/**
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
*/
@@ -56,7 +56,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);

protected final Schema writerSchema;
protected final Schema writerSchemaWithMetafields;
protected Schema writerSchemaWithMetafields;
protected HoodieTimer timer;
protected WriteStatus writeStatus;
protected final String partitionPath;
@@ -148,7 +148,7 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType(), config.updatePartialFields(), table.getMetaClient());

activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
@@ -79,7 +80,11 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
} else {
gReader = null;
gWriter = null;
readSchema = mergeHandle.getWriterSchemaWithMetafields();
if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) {
readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema());
} else {
readSchema = upsertHandle.getWriterSchemaWithMetafields();
}
}

BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
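
Note: the readSchema selection above matters because Avro schema resolution silently drops any field the reader schema does not declare, which appears to be why the old base file is read with the last full table schema instead of the partial write schema when hoodie.update.partial.fields is on. A small, self-contained demonstration with plain Avro and invented schemas:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

class ReaderSchemaSketch {
  public static void main(String[] args) throws Exception {
    Schema full = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":\"string\"},"
            + "{\"name\":\"fare\",\"type\":\"double\"},"
            + "{\"name\":\"driver\",\"type\":\"string\"}]}");
    Schema partial = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":\"string\"},"
            + "{\"name\":\"fare\",\"type\":\"double\"}]}");

    GenericRecord stored = new GenericData.Record(full);
    stored.put("uuid", "key-1");
    stored.put("fare", 12.5d);
    stored.put("driver", "alice");

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(full).write(stored, encoder);
    encoder.flush();

    // Read back with the narrower (partial) reader schema: "driver" is silently dropped.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord narrowed = new GenericDatumReader<GenericRecord>(full, partial).read(null, decoder);
    System.out.println(narrowed);  // {"uuid": "key-1", "fare": 12.5}
  }
}
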
@@ -204,7 +204,7 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType(), config.updatePartialFields(), table.getMetaClient());

activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
@@ -76,7 +77,11 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
} else {
gReader = null;
gWriter = null;
readSchema = mergeHandle.getWriterSchemaWithMetafields();
if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) {
readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema());
} else {
readSchema = upsertHandle.getWriterSchemaWithMetafields();
}
}

BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
@@ -94,7 +94,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
.map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams()))
.map(CompletableFuture::join)
.reduce((rdd1, rdd2) -> rdd1.union(rdd2)).orElse(engineContext.emptyRDD());

HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = buildWriteMetadata(writeStatusRDD);
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
@@ -103,7 +103,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType(), config.updatePartialFields(), table.getMetaClient());
writeMetadata.setCommitMetadata(Option.of(commitMetadata));
}
return writeMetadata;
@@ -112,7 +112,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
/**
* Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
* But we can extend this to add more validation. E.g. number of records read = number of records written etc.
*
*
* We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
*/
private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
@@ -211,7 +211,7 @@ private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroup
iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
});

@@ -252,7 +252,7 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType(), config.updatePartialFields(), table.getMetaClient());

activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
@@ -75,7 +76,11 @@ public void runMerge(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>
} else {
gReader = null;
gWriter = null;
readSchema = mergeHandle.getWriterSchemaWithMetafields();
if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) {
readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema());
} else {
readSchema = upsertHandle.getWriterSchemaWithMetafields();
}
}

BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;