[HUDI-4453] Fix schema to include partition columns in bootstrap operation (apache#6676)

Turn off the type inference of the partition column to be consistent with 
existing behavior. Add notes around partition column type inference.
yihua authored and fengjian committed Apr 5, 2023
1 parent 407bfda commit 442ae75
Showing 2 changed files with 14 additions and 21 deletions.
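
The Spark setting at the center of this change, SQLConf.PARTITION_COLUMN_TYPE_INFERENCE (key: spark.sql.sources.partitionColumnTypeInference.enabled), controls whether Spark guesses the types of Hive-style partition columns from the values embedded in directory names. A minimal standalone sketch of that behavior, separate from the commit itself (the paths, the dt column, and the class name are made up for illustration):

import org.apache.spark.sql.SparkSession;

public class PartitionInferenceDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .appName("partition-inference-demo")
        .getOrCreate();

    // Default: Spark infers the partition column type from the directory name,
    // so dt=2023-04-05 comes back as a DATE column. (Assumes parquet data was
    // written under /tmp/src/dt=2023-04-05 beforehand.)
    spark.read()
        .option("basePath", "/tmp/src")
        .parquet("/tmp/src/dt=2023-04-05")
        .printSchema();

    // Inference turned off, as the commit does for bootstrap: dt stays STRING.
    spark.conf().set("spark.sql.sources.partitionColumnTypeInference.enabled", "false");
    spark.read()
        .option("basePath", "/tmp/src")
        .parquet("/tmp/src/dt=2023-04-05")
        .printSchema();

    spark.stop();
  }
}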
@@ -18,15 +18,14 @@
 
 package org.apache.hudi.client.bootstrap;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.AvroOrcUtils;
-import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -36,8 +35,6 @@
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
-import org.apache.parquet.schema.MessageType;
-import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
 
@@ -72,26 +69,22 @@ protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair
   }
 
   private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
-    Configuration hadoopConf = context.getHadoopConf().get();
-    MessageType parquetSchema = new ParquetUtils().readSchema(hadoopConf, filePath);
-
-    hadoopConf.set(
-        SQLConf.PARQUET_BINARY_AS_STRING().key(),
-        SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
-    hadoopConf.set(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(),
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
-    hadoopConf.set(
-        SQLConf.CASE_SENSITIVE().key(),
-        SQLConf.CASE_SENSITIVE().defaultValueString());
-    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(hadoopConf);
-
-    StructType sparkSchema = converter.convert(parquetSchema);
+    // NOTE: The type inference of partition column in the parquet table is turned off explicitly,
+    // to be consistent with the existing bootstrap behavior, where the partition column is String
+    // typed in Hudi table.
+    // TODO(HUDI-4932): add a config to allow type inference of partition column in bootstrap and
+    // support other types of partition column as well
+    ((HoodieSparkEngineContext) context).getSqlContext()
+        .setConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE(), false);
+    StructType parquetSchema = ((HoodieSparkEngineContext) context).getSqlContext().read()
+        .option("basePath", writeConfig.getBootstrapSourceBasePath())
+        .parquet(filePath.toString())
+        .schema();
     String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
     String structName = tableName + "_record";
     String recordNamespace = "hoodie." + tableName;
 
-    return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, structName, recordNamespace);
+    return AvroConversionUtils.convertStructTypeToAvroSchema(parquetSchema, structName, recordNamespace);
   }
 
   private static Schema getBootstrapSourceSchemaOrc(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
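
The deleted lines read only the parquet file footer through ParquetUtils, and partition columns in a Hive-style layout live in directory names rather than in the data files, so they never reached the bootstrap schema. The replacement reads through Spark's datasource API with the basePath option, which makes Spark discover those columns and append them to the schema. A rough sketch of the difference, not taken from the commit (the paths and the dt column are hypothetical):

import org.apache.spark.sql.SparkSession;

public class BasePathDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();

    String basePath = "/data/src_table";                          // hypothetical
    String filePath = basePath + "/dt=2023-04-05/part-0.parquet"; // hypothetical

    // Without basePath, only the columns physically stored in the file appear.
    System.out.println(spark.read().parquet(filePath).schema().simpleString());

    // With basePath, Spark treats the directories between basePath and the file
    // as partitions, so dt shows up in the schema as well.
    System.out.println(spark.read()
        .option("basePath", basePath)
        .parquet(filePath)
        .schema()
        .simpleString());

    spark.stop();
  }
}

The second changed file, the Scala HoodieBootstrapRelation, follows.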
@@ -146,7 +146,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
     if (fullSchema == null) {
       logInfo("Inferring schema..")
       val schemaResolver = new TableSchemaResolver(metaClient)
-      val tableSchema = TableSchemaResolver.appendPartitionColumns(schemaResolver.getTableAvroSchemaWithoutMetadataFields, metaClient.getTableConfig.getPartitionFields)
+      val tableSchema = schemaResolver.getTableAvroSchema(false)
       dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
       fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
     }
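
With the source schema fixed above, the table schema that HoodieBootstrapRelation resolves already carries the partition columns, so the appendPartitionColumns workaround becomes redundant: getTableAvroSchema(false), where false excludes the Hudi metadata fields, now returns the complete data schema on its own.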
