[HUDI-6947] Refactored HoodieSchemaUtils.deduceWriterSchema with many flags (#10810)
geserdugarov authored and yihua committed May 14, 2024
1 parent 3d5d274 commit 45923f3
Showing 1 changed file with 93 additions and 83 deletions.
@@ -76,107 +76,117 @@ object HoodieSchemaUtils {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
latestTableSchemaOpt match {
// If table schema is empty, then we use the source schema as a writer's schema.
case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
// Otherwise, we need to make sure we reconcile incoming and latest table schemas
case Some(latestTableSchemaWithMetaFields) =>
// NOTE: Meta-fields will be unconditionally injected by Hudi writing handles, for the sake of deducing proper writer schema
// we're stripping them to make sure we can perform proper analysis
// add call to fix null ordering to ensure backwards compatibility
val latestTableSchema = AvroInternalSchemaConverter.fixNullOrdering(removeMetadataFields(latestTableSchemaWithMetaFields))

// Before validating whether schemas are compatible, we need to "canonicalize" source's schema
// relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
// for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
// in the table's one we want to proceed aligning nullability constraints w/ the table's schema
// Also, we promote types to the latest table schema if possible.

val shouldCanonicalizeSchema = opts.getOrDefault(CANONICALIZE_SCHEMA.key, CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
canonicalizeSchema(sourceSchema, latestTableSchema, opts)
} else {
AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
}

val shouldReconcileSchema = opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(),
DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean
if (shouldReconcileSchema) {
deduceWriterSchemaWithReconcile(sourceSchema, canonicalizedSourceSchema, latestTableSchema, internalSchemaOpt, opts)
} else {
deduceWriterSchemaWithoutReconcile(sourceSchema, canonicalizedSourceSchema, latestTableSchema, opts)
}
}
}
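
After this refactor, deduceWriterSchema itself only canonicalizes the source schema and dispatches on the RECONCILE_SCHEMA flag. Below is a minimal, self-contained sketch of that dispatch (toy code, not Hudi's; the option key string "hoodie.datasource.write.reconcile.schema" is assumed to be the value behind DataSourceWriteOptions.RECONCILE_SCHEMA, and its default is assumed false):

object DispatchSketch {
  // Mirrors the branch above: the reconcile flag alone picks the deduction helper.
  def chosenPath(opts: Map[String, String]): String = {
    val shouldReconcileSchema =
      opts.getOrElse("hoodie.datasource.write.reconcile.schema", "false").toBoolean
    if (shouldReconcileSchema) "deduceWriterSchemaWithReconcile"
    else "deduceWriterSchemaWithoutReconcile"
  }

  def main(args: Array[String]): Unit = {
    println(chosenPath(Map.empty)) // default: deduceWriterSchemaWithoutReconcile
    println(chosenPath(Map("hoodie.datasource.write.reconcile.schema" -> "true")))
  }
}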

/**
* Deduces the writer schema when schema reconciliation is disabled.
* We have to validate that the source's schema is compatible w/ the table's latest schema,
* such that we're able to read existing table's records using [[sourceSchema]].
*/
private def deduceWriterSchemaWithoutReconcile(sourceSchema: Schema,
canonicalizedSourceSchema: Schema,
latestTableSchema: Schema,
opts: Map[String, String]): Schema = {
// NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
// w/ the table's one and allow schemas to diverge. This is required in cases where
// partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
// only incoming dataset's projection has to match the table's schema, and not the whole one
val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
val allowAutoEvolutionColumnDrop = opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
val setNullForMissingColumns = opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean

if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && !allowAutoEvolutionColumnDrop) {
// Default behaviour
val reconciledSchema = if (setNullForMissingColumns) {
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema)
} else {
canonicalizedSourceSchema
}
checkValidEvolution(reconciledSchema, latestTableSchema)
reconciledSchema
} else {
// Either it's a MERGE INTO write, so we check neither projection nor schema compatibility (writers down the line will take care of it);
// or it's not a MERGE INTO write and we skip validation but allow columns to be dropped automatically;
// or it's not a MERGE INTO write, we validate the schema, and the schema is compatible.
if (shouldValidateSchemasCompatibility) {
checkSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, true,
allowAutoEvolutionColumnDrop, java.util.Collections.emptySet())
}
canonicalizedSourceSchema
}
}
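
On the default path above (no MERGE INTO, no validation, no auto column drop), SET_NULL_FOR_MISSING_COLUMNS decides whether columns absent from the source are padded as nullable fields before the evolution check. A rough illustration of that padding in plain Avro follows; padMissingColumns is a hypothetical stand-in for this sketch, not Hudi's AvroSchemaEvolutionUtils.reconcileSchema (which also handles type promotion):

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.Schema.Field
import scala.jdk.CollectionConverters._

object PaddingSketch {
  // Copies the source fields, then appends every table-only field as a
  // nullable union with a null default -- roughly what "set null for
  // missing columns" achieves.
  def padMissingColumns(source: Schema, table: Schema): Schema = {
    val sourceNames = source.getFields.asScala.map(_.name()).toSet
    val kept = source.getFields.asScala
      .map(f => new Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
    val padded = table.getFields.asScala
      .filterNot(f => sourceNames.contains(f.name()))
      .map { f =>
        val nullable =
          if (f.schema().getType == Schema.Type.UNION) f.schema()
          else Schema.createUnion(Schema.create(Schema.Type.NULL), f.schema())
        new Field(f.name(), nullable, f.doc(), Field.NULL_DEFAULT_VALUE)
      }
    Schema.createRecord(source.getName, source.getDoc, source.getNamespace,
      false, (kept ++ padded).asJava)
  }

  def main(args: Array[String]): Unit = {
    val table = SchemaBuilder.record("rec").fields()
      .requiredString("id").optionalLong("ts").endRecord()
    val source = SchemaBuilder.record("rec").fields()
      .requiredString("id").endRecord()
    // Prints a schema with "id" plus a nullable "ts" defaulting to null.
    println(padMissingColumns(source, table).toString(true))
  }
}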

/**
* Deduces the writer schema when schema reconciliation is enabled.
* Marked as deprecated.
*/
private def deduceWriterSchemaWithReconcile(sourceSchema: Schema,
canonicalizedSourceSchema: Schema,
latestTableSchema: Schema,
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
internalSchemaOpt match {
case Some(internalSchema) =>
// Apply schema evolution, by auto-merging write schema and read schema
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName)
val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
if (shouldRemoveMetaDataFromInternalSchema) HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
case None =>
// In case schema reconciliation is enabled we will employ (legacy) reconciliation
// strategy to produce target writer's schema (see definition below)
val (reconciledSchema, isCompatible) =
reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)

// NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
// w/ the table's one and allow schemas to diverge. This is required in cases where
// partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
// only incoming dataset's projection has to match the table's schema, and not the whole one
val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
if (!shouldValidateSchemasCompatibility || isCompatible) {
reconciledSchema
} else {
log.error(
s"""Failed to reconcile incoming batch schema with the table's one.
|Incoming schema ${sourceSchema.toString(true)}
|Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)}
|Table's schema ${latestTableSchema.toString(true)}
|""".stripMargin)
throw new SchemaCompatibilityException("Failed to reconcile incoming schema with the table's one")
}
}
}
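
For context, the opts map these methods consume ultimately comes from the write options a user passes to the datasource. A hedged end-to-end sketch is below: it assumes a local SparkSession with the Hudi Spark bundle on the classpath, and the table name, record key field, and path are placeholders. The RECONCILE_SCHEMA, SET_NULL_FOR_MISSING_COLUMNS, AVRO_SCHEMA_VALIDATE_ENABLE, and SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP constants are the same ones read in the code above.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SparkSession

object WriteOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("deduce-writer-schema-demo").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

    df.write.format("hudi")
      // false => deduceWriterSchemaWithoutReconcile (the default path above)
      .option(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), "false")
      // pad columns missing from the source with nulls before the evolution check
      .option(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(), "true")
      .option(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), "false")
      .option(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), "false")
      .option("hoodie.table.name", "demo_tbl")                  // placeholder
      .option("hoodie.datasource.write.recordkey.field", "id")  // placeholder key field
      .mode("append")
      .save("/tmp/hudi/demo_tbl")                               // placeholder path

    spark.stop()
  }
}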
