[HUDI-2429][Stacked On HUDI-2560]Support full schema evolution for spark
xiarixiaoyao committed Feb 25, 2022
1 parent b395803 commit 9c09a92
Showing 41 changed files with 3,051 additions and 89 deletions.
@@ -20,6 +20,7 @@

import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -71,6 +72,7 @@
import org.apache.hudi.internal.schema.action.TableChange;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.internal.schema.utils.AvroSchemaUtil;
import org.apache.hudi.internal.schema.utils.SchemaChangePersistHelper;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -100,6 +102,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

+import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
+
/**
* Abstract Write Client providing functionality for performing commit, index updates and rollback
* Reused for regular write operations like upsert/insert/bulk-insert.. as well as bootstrap
@@ -235,6 +239,30 @@ protected void commit(HoodieTable table, String commitActionType, String instant
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
finalizeWrite(table, instantTime, stats);
+    // Save the internal schema to support implicitly adding columns during the write process
+    if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATESTSCHEMA)
+        && metadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) {
+      TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
+      String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+      FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
+      if (!historySchemaStr.isEmpty()) {
+        InternalSchema internalSchema = SerDeHelper.searchSchema(Long.parseLong(instantTime),
+            SerDeHelper.parseSchemas(historySchemaStr));
+        Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
+        InternalSchema evolutionSchema = AvroSchemaUtil.evolutionSchemaFromNewAvroSchema(avroSchema, internalSchema);
+        if (evolutionSchema.equals(internalSchema)) {
+          metadata.addMetadata(SerDeHelper.LATESTSCHEMA, SerDeHelper.toJson(evolutionSchema));
+          schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
+        } else {
+          evolutionSchema.setSchemaId(Long.parseLong(instantTime));
+          String newSchemaStr = SerDeHelper.toJson(evolutionSchema);
+          metadata.addMetadata(SerDeHelper.LATESTSCHEMA, newSchemaStr);
+          schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolutionSchema, historySchemaStr));
+        }
+        // Update SCHEMA_KEY with the evolved Avro schema
+        metadata.addMetadata(SCHEMA_KEY, AvroInternalSchemaConverter.convert(evolutionSchema, avroSchema.getName()).toString());
+      }
+    }
// update Metadata table
writeTableMetadata(table, instantTime, commitActionType, metadata);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
@@ -1276,8 +1304,8 @@ protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
-      if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) {
-        config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY));
+      if (commitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY)) {
+        config.setSchema(commitMetadata.getExtraMetadata().get(SCHEMA_KEY));
} else {
throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
}
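The net effect of the commit() hunk above is that a completed commit's extra metadata carries two schema entries: the evolved Avro schema under SCHEMA_KEY and the serialized InternalSchema under SerDeHelper.LATESTSCHEMA. A minimal read-back sketch, using only calls visible in this diff; the helper name readLatestInternalSchema is illustrative, and it assumes SerDeHelper.fromJson returns Option.empty() for a missing entry:

  // Sketch: recover the InternalSchema persisted by commit(). The evolved Avro
  // schema sits next to it under SCHEMA_KEY in the same extra-metadata map.
  static Option<InternalSchema> readLatestInternalSchema(HoodieActiveTimeline activeTimeline,
                                                         HoodieInstant lastInstant) throws IOException {
    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
        activeTimeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
    return SerDeHelper.fromJson(commitMetadata.getExtraMetadata().get(SerDeHelper.LATESTSCHEMA));
  }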
@@ -161,6 +161,17 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "implementations of HoodieRecordPayload to convert incoming records to avro. This is also used as the write schema "
+ "evolving records during an update.");

+  public static final ConfigProperty<String> INTERNAL_SCHEMA_STRING = ConfigProperty
+      .key("hoodie.internal.schema")
+      .noDefaultValue()
+      .withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to "
+          + "implementations of schema evolution.");
+
+  public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
+      .key("hoodie.schema.evolution.enable")
+      .defaultValue(false)
+      .withDocumentation("Enables full schema evolution for Hudi tables.");
+
public static final ConfigProperty<String> AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty
.key("hoodie.avro.schema.validate")
.defaultValue("false")
@@ -869,6 +880,22 @@ public void setSchema(String schemaStr) {
setValue(AVRO_SCHEMA_STRING, schemaStr);
}

+  public String getInternalSchema() {
+    return getString(INTERNAL_SCHEMA_STRING);
+  }
+
+  public void setInternalSchemaString(String internalSchemaString) {
+    setValue(INTERNAL_SCHEMA_STRING, internalSchemaString);
+  }
+
+  public boolean getSchemaEvolutionEnable() {
+    return getBoolean(SCHEMA_EVOLUTION_ENABLE);
+  }
+
+  public void setSchemaEvolutionEnable(boolean enable) {
+    setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
+  }
+
/**
* Get the write schema for written records.
*
@@ -2011,6 +2038,16 @@ public Builder withSchema(String schemaStr) {
return this;
}

+    public Builder withInternalSchema(String internalSchemaStr) {
+      writeConfig.setValue(INTERNAL_SCHEMA_STRING, internalSchemaStr);
+      return this;
+    }
+
+    public Builder withSchemaEvolutionEnable(boolean enable) {
+      writeConfig.setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
+      return this;
+    }
+
public Builder withAvroSchemaValidate(boolean enable) {
writeConfig.setValue(AVRO_SCHEMA_VALIDATE_ENABLE, String.valueOf(enable));
return this;
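Taken together, the new config surface is small: one flag and one schema string, plus builder access. A minimal wiring sketch, assuming the existing HoodieWriteConfig.newBuilder()/withPath entry points; basePath, avroSchemaStr, and internalSchemaJson are placeholder values:

  HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
      .withPath(basePath)                       // placeholder: table base path
      .withSchema(avroSchemaStr)                // pre-existing option: Avro write schema
      .withSchemaEvolutionEnable(true)          // new: hoodie.schema.evolution.enable
      .withInternalSchema(internalSchemaJson)   // new: hoodie.internal.schema, serialized InternalSchema
      .build();

With the flag left at its false default, the commit-time reconciliation shown in the first file is skipped entirely.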
@@ -47,6 +47,8 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
@@ -117,8 +119,10 @@ public HoodieData<WriteStatus> compact(
// log file. That is because in the case of MergeInto, the config.getSchema may not
// be the same as the table schema.
try {
-      Schema readerSchema = schemaResolver.getTableAvroSchema(false);
-      config.setSchema(readerSchema.toString());
+      if (config.getInternalSchema() == null) {
+        Schema readerSchema = schemaResolver.getTableAvroSchema(false);
+        config.setSchema(readerSchema.toString());
+      }
} catch (Exception e) {
// If there is no commit in the table, just ignore the exception.
}
@@ -145,9 +149,15 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
String instantTime,
TaskContextSupplier taskContextSupplier) throws IOException {
FileSystem fs = metaClient.getFs();
-
-    Schema readerSchema = HoodieAvroUtils.addMetadataFields(
-        new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
+    Schema readerSchema;
+    Option<InternalSchema> internalSchemaOption = Option.empty();
+    if (config.getInternalSchema() != null) {
+      readerSchema = new Schema.Parser().parse(config.getSchema());
+      internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
+    } else {
+      readerSchema = HoodieAvroUtils.addMetadataFields(
+          new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
+    }
LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
// TODO - FIX THIS
@@ -172,6 +182,7 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
        .withLogFilePaths(logFiles)
        .withReaderSchema(readerSchema)
        .withLatestInstantTime(maxInstantTime)
+       .withInternalSchema(internalSchemaOption.orElse(null))
        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
        .withReverseReader(config.getCompactionReverseLogReadEnabled())
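The builder chain above feeds Hudi's merged log record scanner. A condensed sketch of the construction with the new hook, assuming the scanner is HoodieMergedLogRecordScanner (its class name and newBuilder() factory are not shown in this diff) and reusing the compact() variables fs, logFiles, readerSchema, maxInstantTime, internalSchemaOption, and maxMemoryPerCompaction:

  HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
      .withFileSystem(fs)
      .withBasePath(metaClient.getBasePath())
      .withLogFilePaths(logFiles)
      .withReaderSchema(readerSchema)
      .withLatestInstantTime(maxInstantTime)
      // New in this commit: null means no internal schema, i.e. legacy behavior.
      .withInternalSchema(internalSchemaOption.orElse(null))
      .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
      .build();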
@@ -30,8 +30,11 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TableInternalSchemaUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
@@ -70,6 +73,14 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
HoodieCompactionPlan compactionPlan =
CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);

+    // Try to load the internal schema to support schema evolution
+    Pair<Option<String>, Option<String>> schemaPair = TableInternalSchemaUtils
+        .getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), instantTime);
+    if (schemaPair.getLeft().isPresent() && schemaPair.getRight().isPresent()) {
+      config.setInternalSchemaString(schemaPair.getLeft().get());
+      config.setSchema(schemaPair.getRight().get());
+    }
+
HoodieData<WriteStatus> statuses = compactor.compact(
context, compactionPlan, table, config, instantTime, compactionHandler);

@@ -81,7 +92,9 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
-
+    if (schemaPair.getLeft().isPresent()) {
+      metadata.addMetadata(SerDeHelper.LATESTSCHEMA, schemaPair.getLeft().get());
+    }
compactionMetadata.setWriteStatuses(statuses);
compactionMetadata.setCommitted(false);
compactionMetadata.setCommitMetadata(Option.of(metadata));
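After this executor runs, a schema-evolved compaction commit carries the same two extra-metadata entries the write client produces: SCHEMA_KEY always, SerDeHelper.LATESTSCHEMA only when the lookup succeeded. An illustrative sketch of reading them off the metadata object built above (assuming java.util.Map is imported):

  // Illustrative: both entries live in the commit's extra metadata.
  Map<String, String> extra = metadata.getExtraMetadata();
  String avroSchemaJson = extra.get(HoodieCommitMetadata.SCHEMA_KEY);   // always written
  String internalSchemaJson = extra.get(SerDeHelper.LATESTSCHEMA);      // null unless schemaPair.getLeft() was present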
