[HUDI-1129] Improving schema evolution support in hudi (#2927)
* Adding support to ingest records with old schema after table's schema is evolved

* Rebasing against latest master

- Trimming test file to be < 800 lines
- Renaming config names

* Addressing feedback

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
nsivabalan and vinothchandar committed Aug 10, 2021
1 parent 73d8983 commit 1196736185272afaeda37328f1c52ccd5c6dc017
Showing 22 changed files with 778 additions and 213 deletions.
@@ -357,7 +357,7 @@ object AvroConversionHelper {
val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
val rowIterator = item.asInstanceOf[Row].toSeq.iterator

while (convertersIterator.hasNext) {
while (convertersIterator.hasNext && rowIterator.hasNext) {
val converter = convertersIterator.next()
record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
}
@@ -92,22 +92,43 @@ object HoodieSparkUtils extends SparkAdapterSupport {
new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
}

def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
createRdd(df, avroSchema, structName, recordNamespace)
def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean, latestTableSchema:
org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
val dfWriteSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
var writeSchema : Schema = null;
var toReconcileSchema : Schema = null;
if (reconcileToLatestSchema && latestTableSchema.isPresent) {
// if reconcileToLatestSchema is set to true and latestSchema is present, then try to leverage latestTableSchema.
// this code path handles situations where records are serialized with an old schema, but callers wish to convert
// to RDD[GenericRecord] using a different schema (could be an evolved schema or the latest table schema)
writeSchema = dfWriteSchema
toReconcileSchema = latestTableSchema.get()
} else {
// there are paths where callers wish to use latestTableSchema (and not the row's schema) to convert to
// RDD[GenericRecord]. So use latestTableSchema if present; if not available, fall back to the row's schema.
writeSchema = if (latestTableSchema.isPresent) { latestTableSchema.get()} else { dfWriteSchema}
}
createRddInternal(df, writeSchema, toReconcileSchema, structName, recordNamespace)
}

def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String)
: RDD[GenericRecord] = {
// Use the Avro schema to derive the StructType which has the correct nullability information
val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder.apply(dataType).resolveAndBind()
// Use the write avro schema to derive the StructType which has the correct nullability information
val writeDataType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
// if records were serialized with old schema, but an evolved schema was passed in with latestTableSchema, we need
// latestTableSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
val reconciledDataType =
if (latestTableSchema != null) SchemaConverters.toSqlType(latestTableSchema).dataType.asInstanceOf[StructType] else writeDataType
// Note: deserializer.deserializeRow(row) is not capable of handling an evolved schema, i.e. if a Row was serialized
// with the old schema, but the deserializer was created with an encoder for the evolved schema, deserialization fails.
// Hence we always need to deserialize with the same schema the rows were serialized with.
df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
val convertor = AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, recordNamespace)
records.map { x => convertor(x).asInstanceOf[GenericRecord] }
}
}
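For reviewers, a minimal caller-side sketch of the new overload, assuming a DataFrame df whose rows still carry the old (smaller) schema while the table schema has already evolved. The DataFrame, struct name, namespace and evolved schema below are illustrative; only createRdd and its parameters come from this patch:

import org.apache.avro.Schema
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.util.{Option => HOption}

// Evolved table schema (illustrative): adds a nullable "value" field that the incoming rows lack.
val evolvedTableSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"hoodie_record","namespace":"hoodie.example","fields":[
    |  {"name":"key","type":"string"},
    |  {"name":"value","type":["null","string"],"default":null}]}""".stripMargin)

// reconcileToLatestSchema = true asks createRdd to rewrite the old-schema rows
// against the evolved table schema passed in via latestTableSchema.
val genericRecords = HoodieSparkUtils.createRdd(
  df, "hoodie_record", "hoodie.example",
  reconcileToLatestSchema = true,
  latestTableSchema = HOption.of(evolvedTableSchema))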
@@ -109,6 +109,16 @@ public static FileSystem getFs(String path, Configuration conf, boolean localByD
return getFs(path, conf);
}

/**
* Check if table already exists in the given path.
* @param path base path of the table.
* @param fs instance of {@link FileSystem}.
* @return {@code true} if table exists. {@code false} otherwise.
*/
public static boolean isTableExists(String path, FileSystem fs) throws IOException {
return fs.exists(new Path(path + "/" + HoodieTableMetaClient.METAFOLDER_NAME));
}
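A quick usage sketch of the helper above; the base path is illustrative, and getFs(path, conf) is the existing two-argument overload shown in this hunk:

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.fs.FSUtils

val basePath = "s3a://bucket/tables/trips"             // illustrative base path
val fs = FSUtils.getFs(basePath, new Configuration())
// true only if <basePath>/.hoodie exists, i.e. the table has already been initialized
val tableExists = FSUtils.isTableExists(basePath, fs)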

public static Path addSchemeIfLocalPath(String path) {
Path providedPath = new Path(path);
File localFile = new File(path);
@@ -414,6 +414,13 @@ public static FileStatus[] scanFiles(FileSystem fs, Path metaPath, PathFilter na
return fs.listStatus(metaPath, nameFilter);
}

/**
* @return {@code true} if any commits are found, else {@code false}.
*/
public boolean isTimelineNonEmpty() {
return getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()).size() > 0;
}

/**
* Get the commit timeline visible for this table.
*/
@@ -18,13 +18,6 @@

package org.apache.hudi.common.table;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -36,11 +29,18 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Functions.Function1;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -49,6 +49,8 @@
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;

/**
* Helper class to read schema from data files and log files and to convert it between different formats.
*/
@@ -381,6 +383,37 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) {
return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
}

/**
* Get the latest schema, from either the incoming write schema or the table schema.
* @param writeSchema incoming batch's write schema.
* @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise.
* @param converterFn converter function to be applied to the table schema (for example, to add a namespace). Each caller can decide if any conversion is required.
* @return the latest schema.
*/
public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace,
Function1<Schema, Schema> converterFn) {
Schema latestSchema = writeSchema;
try {
if (metaClient.isTimelineNonEmpty()) {
Schema tableSchema = getTableAvroSchemaWithoutMetadataFields();
if (convertTableSchemaToAddNamespace && converterFn != null) {
tableSchema = converterFn.apply(tableSchema);
}
if (writeSchema.getFields().size() < tableSchema.getFields().size() && isSchemaCompatible(writeSchema, tableSchema)) {
// incoming schema is a subset (old schema) of the table schema, e.g. one of the ingestion
// pipelines is still producing events with the old schema
latestSchema = tableSchema;
LOG.debug("Using latest table schema to rewrite incoming records " + tableSchema.toString());
}
}
} catch (IllegalArgumentException | InvalidTableException e) {
LOG.warn("Could not find any commits, falling back to using incoming batch's write schema");
} catch (Exception e) {
LOG.warn("Unknown exception thrown " + e.getMessage() + ", Falling back to using incoming batch's write schema");
}
return latestSchema;
}
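A hedged sketch of how a caller could exercise the new resolver method. The base path and incoming schema are illustrative, and the identity converter stands in for the namespace-adding function a real caller might supply (elsewhere in this patch, HoodieSparkSqlWriter passes false and a null converter to skip conversion entirely):

import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.Functions.Function1

val metaClient = HoodieTableMetaClient.builder()
  .setConf(new Configuration())
  .setBasePath("/tmp/hudi_trips")            // illustrative base path
  .build()
val resolver = new TableSchemaResolver(metaClient)

// Incoming batch's write schema (illustrative): an older subset of the table schema.
val incomingWriteSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"hoodie_record","fields":[{"name":"key","type":"string"}]}""")

// Identity converter; a real caller could rewrite the table schema here (e.g. add a namespace).
val identity: Function1[Schema, Schema] = new Function1[Schema, Schema] {
  override def apply(schema: Schema): Schema = schema
}

// Returns the table schema when the incoming schema is an older, compatible subset of it;
// otherwise returns the incoming schema unchanged.
val resolved = resolver.getLatestSchema(incomingWriteSchema, true, identity)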

/**
* Read the parquet schema from a parquet File.
*/
@@ -18,15 +18,18 @@

package org.apache.hudi.integ.testsuite.reader;

import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;

import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.List;

import scala.collection.JavaConverters;


@@ -51,7 +54,7 @@ public static JavaRDD<GenericRecord> readAvro(SparkSession sparkSession, String

return HoodieSparkUtils
.createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE))
nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE), false, Option.empty())
.toJavaRDD();
}

@@ -63,7 +66,7 @@ public static JavaRDD<GenericRecord> readParquet(SparkSession sparkSession, List

return HoodieSparkUtils
.createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)
RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE, false, Option.empty())
.toJavaRDD();
}

@@ -73,10 +76,11 @@ public static JavaRDD<GenericRecord> readOrc(SparkSession sparkSession, List<Str
Dataset<Row> dataSet = sparkSession.read()
.orc((JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq()));

return HoodieSparkUtils
.createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)
.toJavaRDD();
return HoodieSparkUtils.createRdd(dataSet.toDF(),
structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE,
false, Option.empty()
).toJavaRDD();
}

}
@@ -308,6 +308,13 @@ object DataSourceWriteOptions {
.defaultValue(classOf[HiveSyncTool].getName)
.withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.")

val RECONCILE_SCHEMA: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.datasource.write.reconcile.schema")
.defaultValue(false)
.withDocumentation("When a new batch of write has records with old schema, but latest table schema got "
+ "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
+ "injected to missing fields). If not, the write batch would fail.")

// HIVE SYNC SPECIFIC CONFIGS
// NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
// unexpected issues with config getting reset
@@ -27,6 +27,7 @@
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.KeyGenerator;
@@ -66,7 +67,8 @@ public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourc
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
String structName = tableName + "_record";
String namespace = "hoodie." + tableName;
RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace);
RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
Option.empty());
return genericRecords.toJavaRDD().map(gr -> {
String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
gr, props.getString("hoodie.datasource.write.precombine.field"), false);
@@ -17,6 +17,8 @@

package org.apache.hudi


import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -25,9 +27,10 @@ import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
@@ -158,13 +161,17 @@ object HoodieSparkSqlWriter {
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
if (reconcileSchema) {
schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
}
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")

// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)

val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
org.apache.hudi.common.util.Option.of(schema))
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
@@ -212,7 +219,8 @@ object HoodieSparkSqlWriter {
classOf[org.apache.avro.Schema]))

// Convert to RDD[HoodieKey]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace)
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace,
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean)
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()

if (!tableExists) {
@@ -249,6 +257,25 @@ object HoodieSparkSqlWriter {
}
}

/**
* Fetches the latest table schema to reconcile the incoming batch against (when the incoming write schema is old
* while the table schema has evolved).
*
* @param fs instance of FileSystem.
* @param basePath base path of the table.
* @param sparkContext instance of SparkContext.
* @param schema incoming batch's write schema.
* @return the latest table schema if the table exists and the incoming schema is an older compatible subset of it;
*         the incoming schema otherwise.
*/
def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, schema: Schema): Schema = {
var latestSchema: Schema = schema
if (FSUtils.isTableExists(basePath.toString, fs)) {
val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null);
}
latestSchema
}

def bootstrap(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
@@ -17,19 +17,16 @@

package org.apache.hudi

import java.util.Properties

import scala.collection.JavaConverters._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig.{METADATA_ENABLE_PROP, METADATA_VALIDATE_PROP}
import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}

import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters.mapAsScalaMapConverter
import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP
import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}

import java.util.Properties
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters.{mapAsScalaMapConverter, _}

/**
* WriterUtils to assist in write path in Datasource and tests.
*/
@@ -78,7 +75,8 @@ object HoodieWriterUtils {
ASYNC_COMPACT_ENABLE.key -> ASYNC_COMPACT_ENABLE.defaultValue,
INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue,
ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue
ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue,
RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString
) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}
