Try to get internalSchema by parsing the commit file/history file directly, instead of using metaClient, which is time-consuming

address some comments
xiarixiaoyao committed Mar 23, 2022
1 parent f63e59e commit 85cc0c3
Showing 27 changed files with 205 additions and 106 deletions.
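
The gist of the change: instead of each Spark task building a HoodieTableMetaClient (expensive) just to look up the evolved schema, the driver ships everything a task needs through the Hadoop configuration — the table path, the serialized query schema, and the list of valid commit file names (the new hoodie.valid.commits.list key added below) — and the task resolves the InternalSchema from those values alone via the new InternalSchemaCache.getInternalSchemaByVersionId. A rough driver-side sketch of that wiring follows; it is not part of this commit, the exact way Hudi assembles the valid-commits string may differ, and prepareSchemaConf is a hypothetical helper.

import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;

public class SchemaConfSketch {
  // Hypothetical helper: collect the completed instant file names once, on the driver,
  // and stash them in the Hadoop conf so executors never need their own metaClient.
  public static void prepareSchemaConf(Configuration conf, String tablePath, InternalSchema querySchema) {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setBasePath(tablePath).setConf(conf).build();
    // Assumption: "valid commits" are the file names of completed instants, comma separated,
    // e.g. "20220323101010.commit,20220323111111.deltacommit".
    String validCommits = metaClient.getActiveTimeline().filterCompletedInstants()
        .getInstants()
        .map(HoodieInstant::getFileName)
        .collect(Collectors.joining(","));
    conf.set("hoodie.tablePath", tablePath);                                          // HOODIE_TABLE_PATH
    conf.set("hoodie.valid.commits.list", validCommits);                              // HOODIE_VALID_COMMITS_LIST
    conf.set("hoodie.schema.internal.querySchema", SerDeHelper.toJson(querySchema));  // HOODIE_QUERY_SCHEMA
  }
}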
@@ -74,15 +74,18 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);

// try to load internalSchema to support schema Evolution
HoodieWriteConfig configCopy = config;
Pair<Option<String>, Option<String>> schemaPair = InternalSchemaCache
.getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), instantTime);
if (schemaPair.getLeft().isPresent() && schemaPair.getRight().isPresent()) {
config.setInternalSchemaString(schemaPair.getLeft().get());
config.setSchema(schemaPair.getRight().get());
// should not influence the original config, just copy it
configCopy = HoodieWriteConfig.newBuilder().withProperties(config.getProps()).build();
configCopy.setInternalSchemaString(schemaPair.getLeft().get());
configCopy.setSchema(schemaPair.getRight().get());
}

HoodieData<WriteStatus> statuses = compactor.compact(
context, compactionPlan, table, config, instantTime, compactionHandler);
context, compactionPlan, table, configCopy, instantTime, compactionHandler);

compactor.maybePersist(statuses, config);
context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata");

@@ -78,6 +78,7 @@ private SparkInternalSchemaConverter() {

public static final String HOODIE_QUERY_SCHEMA = "hoodie.schema.internal.querySchema";
public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
public static final String HOODIE_VALID_COMMITS_LIST = "hoodie.valid.commits.list";
/**
* Converts a spark schema to an hudi internal schema. Fields without IDs are kept and assigned fallback IDs.
*
@@ -155,7 +156,7 @@ public static Type buildTypeFromStructType(DataType sparkType, Boolean firstVisi
}

/**
* Converts a spark schema to an hudi internal schema, and prunes fields.
* Converts Spark schema to Hudi internal schema, and prune fields.
* Fields without IDs are kept and assigned fallback IDs.
*
* @param sparkSchema a pruned spark schema
@@ -168,7 +169,7 @@ public static InternalSchema convertAndPruneStructTypeToInternalSchema(StructTyp
}

/**
* collect all the leaf nodes names.
* Collect all the leaf nodes names.
*
* @param sparkSchema a spark schema
* @return leaf nodes full names.

@@ -22,6 +22,10 @@
import com.github.benmanes.caffeine.cache.Caffeine;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -33,7 +37,10 @@
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

@@ -138,21 +145,74 @@ public static Pair<Option<String>, Option<String>> getInternalSchemaAndAvroSchem
HoodieTimeline timelineBeforeCurrentCompaction = metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBefore(compactionAndClusteringInstant);
Option<HoodieInstant> lastInstantBeforeCurrentCompaction = timelineBeforeCurrentCompaction.lastInstant();
if (lastInstantBeforeCurrentCompaction.isPresent()) {
// try to find internalSchema
byte[] data = timelineBeforeCurrentCompaction.getInstantDetails(lastInstantBeforeCurrentCompaction.get()).get();
HoodieCommitMetadata metadata;
try {
// try to find internalSchema
byte[] data = timelineBeforeCurrentCompaction.getInstantDetails(lastInstantBeforeCurrentCompaction.get()).get();
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
String internalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
if (internalSchemaStr != null) {
String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
return Pair.of(Option.of(internalSchemaStr), Option.of(existingSchemaStr));
}
metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
} catch (Exception e) {
// swallow this exception. If we has not done some DDL, load internalSchema will failed,
// schema evolution should not affect the main process of current compaction.
throw new HoodieException(String.format("cannot read metadata from commit: %s", lastInstantBeforeCurrentCompaction.get()), e);
}
String internalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
if (internalSchemaStr != null) {
String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
return Pair.of(Option.of(internalSchemaStr), Option.of(existingSchemaStr));
}
}
return Pair.of(Option.empty(), Option.empty());
}

/**
* Given a schema versionId, return the corresponding internalSchema.
* This method is called from Spark tasks, so its time cost must be minimized.
* We try our best not to use metaClient, since initializing a metaClient is expensive.
* step1:
* try to parse the internalSchema from the HoodieInstant commit file directly.
* step2:
* if the internalSchema cannot be parsed in step1,
* try to find the internalSchema in the history schema file.
*
* @param versionId the internalSchema version to search for.
* @param tablePath table path
* @param hadoopConf hadoop configuration
* @param validCommits currently valid commits, used to build the commit file paths and to verify the validity of the history schema files
* @return an internalSchema.
*/
public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, Configuration hadoopConf, String validCommits) {
Set<String> commitSet = Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
List<String> validateCommitList = commitSet.stream().map(fileName -> {
String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
return fileName.replace(fileExtension, "");
}).collect(Collectors.toList());

FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
Path hoodieMetaPath = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME);
//step1:
Path candidateCommitFile = commitSet.stream().filter(fileName -> {
String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
return fileName.replace(fileExtension, "").equals(versionId + "");
}).findFirst().map(f -> new Path(hoodieMetaPath, f)).orElse(null);
if (candidateCommitFile != null) {
try {
byte[] data;
try (FSDataInputStream is = fs.open(candidateCommitFile)) {
data = FileIOUtils.readAsByteArray(is);
} catch (IOException e) {
throw e;
}
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
if (latestInternalSchemaStr != null) {
return SerDeHelper.fromJson(latestInternalSchemaStr).orElse(null);
}
} catch (Exception e1) {
// swallow this exception.
}
}
// step2:
FileBasedInternalSchemaStorageManager fileBasedInternalSchemaStorageManager = new FileBasedInternalSchemaStorageManager(hadoopConf, new Path(tablePath));
String lastestHistorySchema = fileBasedInternalSchemaStorageManager.getHistorySchemaStrByGivenValidCommits(validateCommitList);
return InternalSchemaUtils.searchSchema(versionId, SerDeHelper.parseSchemas(lastestHistorySchema));
}
}
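
On the read/task side, resolving the writer schema then needs only the values shipped in the configuration plus the schema version id, which step 1 above matches against a commit file name (i.e. it is an instant time). A minimal, hypothetical caller of the new method — not code from this commit, and the import path of InternalSchemaCache is assumed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.InternalSchemaCache; // assumed package

public class TaskSideSchemaLookup {
  // Resolve the InternalSchema for a file group written at instant `versionId`
  // without building a HoodieTableMetaClient. Step 1 (parse the commit file) and
  // step 2 (fall back to the history schema file) both happen inside the call.
  public static InternalSchema resolve(Configuration conf, long versionId) {
    String tablePath = conf.get("hoodie.tablePath");             // HOODIE_TABLE_PATH
    String validCommits = conf.get("hoodie.valid.commits.list"); // HOODIE_VALID_COMMITS_LIST
    return InternalSchemaCache.getInternalSchemaByVersionId(versionId, tablePath, conf, validCommits);
  }
}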

@@ -115,7 +115,7 @@ private Map<Integer, Field> buildIdToField() {
}

/**
* get all columns full name.
* Get all columns full name.
*/
public List<String> getAllColsFullName() {
if (nameToId == null) {
@@ -125,7 +125,7 @@ public List<String> getAllColsFullName() {
}

/**
* set the version ID for this schema.
* Set the version ID for this schema.
*/
public InternalSchema setSchemaId(long versionId) {
this.versionId = versionId;
@@ -140,7 +140,7 @@ public long schemaId() {
}

/**
* set the version ID for this schema.
* Set the version ID for this schema.
*/
public void setMax_column_id(int maxColumnId) {
this.maxColumnId = maxColumnId;

@@ -45,7 +45,7 @@ private InternalSchemaBuilder() {


/**
* build a mapping from id to full field name for a internal Type.
* Build a mapping from id to full field name for a internal Type.
* if a field y belong to a struct filed x, then the full name of y is x.y
*
* @param type hoodie internal type
@@ -58,7 +58,7 @@ public Map<Integer, String> buildIdToName(Type type) {
}

/**
* build a mapping from full field name to id for a internal Type.
* Build a mapping from full field name to id for a internal Type.
* if a field y belong to a struct filed x, then the full name of y is x.y
*
* @param type hoodie internal type
@@ -69,7 +69,7 @@ public Map<String, Integer> buildNameToId(Type type) {
}

/**
* use to traverse all types in internalSchema with visitor.
* Use to traverse all types in internalSchema with visitor.
*
* @param schema hoodie internal schema
* @return vistor expected result.
@@ -130,7 +130,7 @@ public <T> T visit(Type type, InternalSchemaVisitor<T> visitor) {
}

/**
* build a mapping from id to field for a internal Type.
* Build a mapping from id to field for a internal Type.
*
* @param type hoodie internal type
* @return a mapping from id to field
@@ -171,7 +171,7 @@ private void visitIdToField(Type type, Map<Integer, Types.Field> index) {
}

/**
* build a mapping which maintain the relation between child field id and it's parent field id.
* Build a mapping which maintain the relation between child field id and it's parent field id.
* if a child field y(which id is 9) belong to a nest field x(which id is 6), then (9 -> 6) will be added to the result map.
* if a field has no parent field, nothings will be added.
*

@@ -35,7 +35,7 @@ public InternalSchemaChangeApplier(InternalSchema latestSchema) {
}

/**
* add columns to table.
* Add columns to table.
*
* @param colName col name to be added. if we want to add col to a nested filed, the fullName should be specify
* @param colType col type to be added.
@@ -80,7 +80,7 @@ public InternalSchema applyAddChange(
}

/**
* delete columns to table.
* Delete columns to table.
*
* @param colNames col name to be deleted. if we want to delete col from a nested filed, the fullName should be specify
*/
@@ -91,7 +91,7 @@ public InternalSchema applyDeleteChange(String... colNames) {
}

/**
* rename col name for hudi table.
* Rename col name for hudi table.
*
* @param colName col name to be renamed. if we want to rename col from a nested filed, the fullName should be specify
* @param newName new name for current col. no need to specify fullName.
@@ -103,7 +103,7 @@ public InternalSchema applyRenameChange(String colName, String newName) {
}

/**
* update col nullability for hudi table.
* Update col nullability for hudi table.
*
* @param colName col name to be changed. if we want to change col from a nested filed, the fullName should be specify
* @param nullable .
@@ -115,7 +115,7 @@ public InternalSchema applyColumnNullabilityChange(String colName, boolean nulla
}

/**
* update col type for hudi table.
* Update col type for hudi table.
*
* @param colName col name to be changed. if we want to change col from a nested filed, the fullName should be specify
* @param newType .
@@ -127,7 +127,7 @@ public InternalSchema applyColumnTypeChange(String colName, Type newType) {
}

/**
* update col comment for hudi table.
* Update col comment for hudi table.
*
* @param colName col name to be changed. if we want to change col from a nested filed, the fullName should be specify
* @param doc .
@@ -139,7 +139,7 @@ public InternalSchema applyColumnCommentChange(String colName, String doc) {
}

/**
* reorder the position of col.
* Reorder the position of col.
*
* @param colName column which need to be reordered. if we want to change col from a nested filed, the fullName should be specify.
* @param referColName reference position.

@@ -27,7 +27,7 @@
import java.util.List;

/**
* auxiliary class.
* Auxiliary class.
* help to merge file schema and query schema to produce final read schema for avro/parquet file
*/
public class InternalSchemaMerger {
@@ -49,7 +49,7 @@ public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchem
}

/**
* create final read schema to read avro/parquet file.
* Create final read schema to read avro/parquet file.
*
* @return read schema to read avro/parquet file.
*/
@@ -59,7 +59,7 @@ public InternalSchema mergeSchema() {
}

/**
* create final read schema to read avro/parquet file.
* Create final read schema to read avro/parquet file.
* this is auxiliary function used by mergeSchema.
*/
private Type mergeType(Type type, int currentTypeId) {

@@ -85,7 +85,7 @@ abstract class BaseColumnChange implements TableChange {
}

/**
* add position change.
* Add position change.
*
* @param srcName column which need to be reordered
* @param dsrName reference position
@@ -143,7 +143,7 @@ public BaseColumnChange addPositionChange(String srcName, String dsrName, String
}

/**
* abstract method.
* Abstract method.
* give a column fullName and return the field id
*
* @param fullName column fullName

@@ -37,7 +37,7 @@

public class TableChanges {

/** deal with update columns changes for table. */
/** Deal with update columns changes for table. */
public static class ColumnUpdateChange extends TableChange.BaseColumnChange {
private final Map<Integer, Types.Field> updates = new HashMap<>();

@@ -172,7 +172,7 @@ public ColumnUpdateChange renameColumn(String name, String newName) {
}

/**
* update nullable for column.
* Update nullable for column.
* only support required type -> optional type
*
* @param name name of the column to update
@@ -228,7 +228,7 @@ protected Integer findIdByFullName(String fullName) {
}
}

/** deal with delete columns changes for table. */
/** Deal with delete columns changes for table. */
public static class ColumnDeleteChange extends TableChange.BaseColumnChange {
private final Set deletes = new HashSet<>();

@@ -282,7 +282,7 @@ protected Integer findIdByFullName(String fullName) {
}
}

/** deal with add columns changes for table. */
/** Deal with add columns changes for table. */
public static class ColumnAddChange extends TableChange.BaseColumnChange {
private final Map<String, Integer> fullColName2Id = new HashMap<>();
private final Map<Integer, ArrayList<Types.Field>> parentId2AddCols = new HashMap<>();

@@ -25,11 +25,11 @@
import java.util.List;

/**
* helper class to support Table schema changes.
* Helper class to support Table schema changes.
*/
public class TableChangesHelper {
/**
* apply add operation and column position change operation.
* Apply add operation and column position change operation.
*
* @param fields origin column fields.
* @param adds column fields to be added.
