fix new comments
xiarixiaoyao committed Mar 25, 2022
1 parent 9a2e4c1 commit 4096466
Showing 17 changed files with 82 additions and 54 deletions.
@@ -264,6 +264,7 @@ protected void commit(HoodieTable table, String commitActionType, String instant
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
// TODO: save the history schema via the metadata table
schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
} else {
evolvedSchema.setSchemaId(Long.parseLong(instantTime));
@@ -155,6 +155,8 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
readerSchema = new Schema.Parser().parse(config.getSchema());
internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
// it is safe to modify the config here, since we are running on the task side.
((HoodieTable) compactionHandler).getConfig().setDefault(config);
} else {
readerSchema = HoodieAvroUtils.addMetadataFields(
new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
@@ -97,6 +97,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
if (schemaPair.getLeft().isPresent()) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get());
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get());
}
compactionMetadata.setWriteStatuses(statuses);
compactionMetadata.setCommitted(false);
@@ -79,6 +79,7 @@ private SparkInternalSchemaConverter() {
public static final String HOODIE_QUERY_SCHEMA = "hoodie.schema.internal.querySchema";
public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
public static final String HOODIE_VALID_COMMITS_LIST = "hoodie.valid.commits.list";

/**
* Converts a Spark schema to a Hudi internal schema. Fields without IDs are kept and assigned fallback IDs.
*
@@ -935,7 +935,7 @@ public boolean hasNext() {

@Override
public GenericRecord next() {
return rewriteRecord(oldRecords.next(), newSchema);
return rewriteRecordWithNewSchema(oldRecords.next(), newSchema);
}
};
}
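This hunk switches the record iterator from rewriteRecord to rewriteRecordWithNewSchema, so old records are projected into the evolved schema as they are read. A rough, hedged illustration of what such a rewrite does, using only the plain Avro API (this is not Hudi's implementation; it ignores nested types and type promotion, which the real function handles):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class RewriteWithNewSchemaSketch {
  // Copy every field of the evolved schema from the old record when it exists there,
  // and leave newly added fields as null so they pick up their (nullable) defaults.
  static GenericRecord rewrite(GenericRecord oldRecord, Schema newSchema) {
    GenericRecord newRecord = new GenericData.Record(newSchema);
    for (Schema.Field field : newSchema.getFields()) {
      Schema.Field oldField = oldRecord.getSchema().getField(field.name());
      newRecord.put(field.name(), oldField == null ? null : oldRecord.get(field.name()));
    }
    return newRecord;
  }
}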
@@ -559,7 +559,7 @@ public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
*/
private Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(HoodieInstant instant) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedInstants();
byte[] data = timeline.getInstantDetails(instant).get();
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
@@ -116,7 +116,7 @@ public abstract class AbstractHoodieLogRecordReader {
// Total log files read - for metrics
private AtomicLong totalLogFiles = new AtomicLong(0);
// Internal schema
private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
private InternalSchema internalSchema;
private final String path;
// Total log blocks read - for metrics
private AtomicLong totalLogBlocks = new AtomicLong(0);
@@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -68,10 +69,9 @@ public InternalSchema(Field... columns) {
public InternalSchema(long versionId, List<Field> cols) {
this.versionId = versionId;
this.record = RecordType.get(cols);
if (versionId >= 0) {
buildIdToName();
maxColumnId = idToName.keySet().stream().max(Comparator.comparing(Integer::valueOf)).get();
}
idToName = cols.isEmpty() ? new HashMap<>() : InternalSchemaBuilder.getBuilder().buildIdToName(record);
nameToId = cols.isEmpty() ? new HashMap<>() : idToName.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
maxColumnId = idToName.isEmpty() ? -1 : idToName.keySet().stream().max(Comparator.comparing(Integer::valueOf)).get();
}

public InternalSchema(long versionId, int maxColumnId, List<Field> cols) {
@@ -90,20 +90,10 @@ public RecordType getRecord() {
}

private Map<Integer, String> buildIdToName() {
if (idToName == null) {
idToName = InternalSchemaBuilder.getBuilder().buildIdToName(record);
}
return idToName;
}

private Map<String, Integer> buildNameToId() {
if (nameToId == null) {
if (idToName != null && !idToName.isEmpty()) {
nameToId = idToName.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
return nameToId;
}
nameToId = InternalSchemaBuilder.getBuilder().buildNameToId(record);
}
return nameToId;
}

@@ -39,6 +39,13 @@ public class InternalSchemaMerger {
// if mergeRequiredFiledForce is true, we will ignore the col's required attribute.
private final boolean ignoreRequiredAttribute;
// Whether to use the column type from the file schema when reading files in which some column type has changed.
// Spark's parquet reader needs the original column type to read the data; otherwise the reader will fail.
// e.g. the current column type is StringType and we have changed it to DecimalType:
// we should not pass DecimalType to the parquet reader, we must pass StringType; once the data is read out, we convert it from String to Decimal and everything is fine.
// For the log reader this parameter is not needed, since our rewriteRecordWithNewSchema function supports rewriting directly:
// e.g. the current column type is StringType and we have changed it to DecimalType,
// we can pass DecimalType to rewriteRecordWithNewSchema directly and everything is fine.
private boolean useColumnTypeFromFileSchema = true;

public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
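A minimal sketch of the parquet-path behaviour the comment above describes: the reader is handed the file schema's original type (String here), and the conversion to the evolved type (Decimal) happens only after the value has been read. The column and the types are illustrative:

import java.math.BigDecimal;

public class ReadThenConvertSketch {
  // value read by the parquet reader using the original StringType column
  static BigDecimal toEvolvedDecimal(String valueReadWithFileSchema) {
    return valueReadWithFileSchema == null ? null : new BigDecimal(valueReadWithFileSchema);
  }

  public static void main(String[] args) {
    // converted after reading, as the comment describes
    System.out.println(toEvolvedDecimal("12.50"));
  }
}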
@@ -37,6 +37,7 @@ abstract class AbstractInternalSchemaStorageManager {
/**
* Get latest history schema string.
* Uses the given validCommits to validate all legal history schema files, and returns the latest one.
* If the passed valid commits list is null or empty, valid instants will be fetched from the file system and used.
*/
public abstract String getHistorySchemaStrByGivenValidCommits(List<String> validCommits);
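A hedged usage sketch of the contract documented above; the manager instance and the instant times are hypothetical, imports of Hudi classes are omitted, and only the method shown here is assumed:

import java.util.Arrays;
import java.util.Collections;

public class HistorySchemaLookupSketch {
  // assumes this sketch sits alongside a concrete AbstractInternalSchemaStorageManager
  static String loadLatestHistorySchema(AbstractInternalSchemaStorageManager manager) {
    // with explicit commits, only history schema files belonging to those instants are considered
    String latest = manager.getHistorySchemaStrByGivenValidCommits(Arrays.asList("20220325091011", "20220325101112"));
    // with a null or empty list, valid instants are first looked up from the file system
    return latest != null && !latest.isEmpty() ? latest
        : manager.getHistorySchemaStrByGivenValidCommits(Collections.emptyList());
  }
}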

@@ -30,12 +30,9 @@
import java.util.stream.Collectors;

/**
* Util methods to support evolve old avro schema based on a given schema.
* Utility methods to support evolving an old Avro schema based on a given schema.
*/
public class AvroSchemaEvolutionUtils {
private AvroSchemaEvolutionUtils() {
}

/**
* Support evolution from a new avroSchema.
* Currently Hoodie supports implicitly adding columns during a write operation,
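The javadoc above (truncated in this hunk) describes implicitly adding columns during a write. A hedged, Avro-only illustration of the kind of evolved schema that produces: the new column is appended as nullable with a null default so that older files stay readable. Field names are made up, and Hudi's InternalSchema machinery is not used:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class ImplicitAddColumnSketch {
  public static void main(String[] args) {
    // schema the table was written with so far
    Schema oldSchema = SchemaBuilder.record("example").fields()
        .requiredLong("id")
        .endRecord();
    // an incoming write carries one extra column, added implicitly as an optional field
    Schema evolvedSchema = SchemaBuilder.record("example").fields()
        .requiredLong("id")
        .name("city").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
        .endRecord();
    System.out.println(oldSchema.getFields().size() + " field(s) -> " + evolvedSchema.getFields().size() + " field(s)");
  }
}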
@@ -46,7 +46,7 @@ private SerDeHelper() {

}

public static final String LATEST_SCHEMA = "latest_Schema";
public static final String LATEST_SCHEMA = "latest_schema";
public static final String SCHEMAS = "schemas";
private static final String MAX_COLUMN_ID = "max_column_id";
private static final String VERSION_ID = "version_id";
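The rename of LATEST_SCHEMA from "latest_Schema" to "latest_schema" is safe only because both the write and read sides go through this constant, as the other hunks in this commit show. A minimal sketch of that pairing, reusing only calls that appear above (imports of the Hudi classes are omitted):

public class LatestSchemaKeySketch {
  static String roundTrip(HoodieCommitMetadata metadata, InternalSchema evolvedSchema) {
    // write side (see the commit() hunk): store the latest internal schema under the shared key
    metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
    // read side (see getTableInternalSchemaFromCommitMetadata): fetch it back with the same key
    return metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
  }
}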
@@ -163,6 +163,13 @@ object HoodieWriterUtils {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
}
// Check schema evolution for bootstrap tables.
// Bootstrap tables are not supported yet.
if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)
&& params.getOrElse(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean) {
throw new HoodieException(String
.format("now schema evolution cannot support bootstrap table, pls set %s to false", HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key()))
}
}

private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieConfig, key: String): String = {
@@ -65,6 +65,11 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
filters: Array[Filter]): HoodieMergeOnReadRDD = {
if (!internalSchema.isEmptySchema) {
// it is safe to enable vectorizedReader
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "false")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
}
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
@@ -155,36 +155,4 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
None
}
}

override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = {
if (SPARK_VERSION.startsWith("3.1") || SPARK_VERSION.startsWith("3.2")) {
val loadClassName = if (SPARK_VERSION.startsWith("3.1")) {
"org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312"
} else {
"org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
}
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]]
} else {
new Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan
}
}
}

override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
if (SPARK_VERSION.startsWith("3.1") || SPARK_VERSION.startsWith("3.2")) {
val loadClassName = if (SPARK_VERSION.startsWith("3.1")) {
"org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
} else {
"org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat"
}
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
} else {
None
}
}
}
@@ -28,4 +28,28 @@ class Spark3_1Adapter extends BaseSpark3Adapter {

override def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark3_1CatalystExpressionUtils

override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = {
if (SPARK_VERSION.startsWith("3.1")) {
val loadClassName = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312"
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]]
} else {
new Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan
}
}
}

override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
if (SPARK_VERSION.startsWith("3.1")) {
val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
} else {
None
}
}

}
@@ -54,4 +54,28 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
(spark: SparkSession, delegate: ParserInterface) => new HoodieSpark3_2ExtendedSqlParser(spark, delegate)
)
}

override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = {
if (SPARK_VERSION.startsWith("3.2")) {
val loadClassName = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]]
} else {
new Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan
}
}
}

override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
if (SPARK_VERSION.startsWith("3.2")) {
val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat"
val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
val ctor = clazz.getConstructors.head
Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
} else {
None
}
}
}
