[HUDI-6947] Refactored HoodieSchemaUtils.deduceWriterSchema with many flags #10810

Merged
@@ -76,107 +76,117 @@ object HoodieSchemaUtils {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
latestTableSchemaOpt match {
// If table schema is empty, then we use the source schema as a writer's schema.
case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
// Otherwise, we need to make sure we reconcile incoming and latest table schemas
case Some(latestTableSchemaWithMetaFields) =>
// NOTE: Meta-fields will be unconditionally injected by Hudi writing handles, for the sake of deducing proper writer schema
// we're stripping them to make sure we can perform proper analysis
// add call to fix null ordering to ensure backwards compatibility
val latestTableSchema = AvroInternalSchemaConverter.fixNullOrdering(removeMetadataFields(latestTableSchemaWithMetaFields))

// Before validating whether schemas are compatible, we need to "canonicalize" source's schema
// relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
// for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
// in the table's one we want to proceed aligning nullability constraints w/ the table's schema
// Also, we promote types to the latest table schema if possible.
val shouldCanonicalizeSchema = opts.getOrDefault(CANONICALIZE_SCHEMA.key, CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
canonicalizeSchema(sourceSchema, latestTableSchema, opts)
} else {
AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
}

val shouldReconcileSchema = opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(),
DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean
if (shouldReconcileSchema) {
deduceWriterSchemaWithReconcile(sourceSchema, canonicalizedSourceSchema, latestTableSchema, internalSchemaOpt, opts)
} else {
deduceWriterSchemaWithoutReconcile(sourceSchema, canonicalizedSourceSchema, latestTableSchema, opts)
}
}
}
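
The following is an illustrative sketch (not part of this PR) of how a caller might drive the non-reconcile path; the variable names sourceSchema and tableSchema and the direct call through the HoodieSchemaUtils object are assumptions, while the option constants are the ones referenced in the diff above.

// Hypothetical caller, shown only to illustrate which options steer deduceWriterSchema.
val opts: Map[String, String] = Map(
  DataSourceWriteOptions.RECONCILE_SCHEMA.key() -> "false",                // take the deduceWriterSchemaWithoutReconcile path
  CANONICALIZE_SCHEMA.key -> "true",                                       // align nullability/type promotion with the table schema
  DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key() -> "true",     // back-fill columns missing from the source as null
  HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
  HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "false"
)
val writerSchema: Schema =
  HoodieSchemaUtils.deduceWriterSchema(sourceSchema, Some(tableSchema), internalSchemaOpt = None, opts)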

/**
* Deducing with disabled reconciliation.
* We have to validate that the source's schema is compatible w/ the table's latest schema,
* such that we're able to read existing table's records using [[sourceSchema]].
*/
private def deduceWriterSchemaWithoutReconcile(sourceSchema: Schema,
canonicalizedSourceSchema: Schema,
latestTableSchema: Schema,
opts: Map[String, String]): Schema = {
// NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
// w/ the table's one and allow schemas to diverge. This is required in cases where
// partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
// only incoming dataset's projection has to match the table's schema, and not the whole one
val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
val allowAutoEvolutionColumnDrop = opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
val setNullForMissingColumns = opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean

if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && !allowAutoEvolutionColumnDrop) {
// Default behaviour
val reconciledSchema = if (setNullForMissingColumns) {
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema)
} else {
canonicalizedSourceSchema
}
checkValidEvolution(reconciledSchema, latestTableSchema)
reconciledSchema
} else {
// If it's a MERGE INTO write, we don't check for projection or schema compatibility; writers down the line will take care of it.
// Otherwise, either we skip schema validation but allow columns to be dropped automatically,
// or we validate the schema and it is compatible.
if (shouldValidateSchemasCompatibility) {
checkSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, true,
allowAutoEvolutionColumnDrop, java.util.Collections.emptySet())
}
canonicalizedSourceSchema
}
}
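
As a reading aid only, the branch logic above can be restated as a small standalone function; this is a sketch under assumed names, not code from the PR, and the string results merely name the action taken in each branch.

// Illustrative restatement of deduceWriterSchemaWithoutReconcile's flag interplay.
def describeWithoutReconcile(mergeIntoWrites: Boolean,
                             validate: Boolean,
                             allowColumnDrop: Boolean,
                             setNullForMissing: Boolean): String =
  if (!mergeIntoWrites && !validate && !allowColumnDrop) {
    // Default behaviour: optionally back-fill missing columns, then check the evolution is valid.
    if (setNullForMissing) "reconcileSchema(source, table), checkValidEvolution, return reconciled schema"
    else "checkValidEvolution on the canonicalized source schema, return it"
  } else if (validate) {
    // Validation enabled: enforce compatibility before accepting the source schema.
    "checkSchemaCompatible(table, source), return canonicalized source schema"
  } else {
    // MERGE INTO writes, or auto column drop allowed: accept the source schema as-is.
    "return canonicalized source schema unchanged"
  }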

/**
* Deducing with enabled reconciliation.
* Marked as Deprecated.
*/
private def deduceWriterSchemaWithReconcile(sourceSchema: Schema,
canonicalizedSourceSchema: Schema,
latestTableSchema: Schema,
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
internalSchemaOpt match {
case Some(internalSchema) =>
// Apply schema evolution, by auto-merging write schema and read schema
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName)
val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
if (shouldRemoveMetaDataFromInternalSchema) HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
case None =>
// In case schema reconciliation is enabled we will employ (legacy) reconciliation
// strategy to produce target writer's schema (see definition below)
val (reconciledSchema, isCompatible) =
reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)

// NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
// w/ the table's one and allow schemas to diverge. This is required in cases where
// partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
// only incoming dataset's projection has to match the table's schema, and not the whole one
val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
if (!shouldValidateSchemasCompatibility || isCompatible) {
reconciledSchema
} else {
log.error(
s"""Failed to reconcile incoming batch schema with the table's one.
|Incoming schema ${sourceSchema.toString(true)}
|Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)}
|Table's schema ${latestTableSchema.toString(true)}
|""".stripMargin)
throw new SchemaCompatibilityException("Failed to reconcile incoming schema with the table's one")
}
}
}
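
To round this out, a hedged sketch of how the reconcile-enabled path might be driven end to end; the driver variables (opts, sourceSchema, tableSchema, internalSchema) are hypothetical, and the exception type is the one thrown in the error branch above.

// Hypothetical driver for the reconcile-enabled path.
val reconcileOpts = opts + (DataSourceWriteOptions.RECONCILE_SCHEMA.key() -> "true")

// With an InternalSchema available, write and read schemas are auto-merged via AvroSchemaEvolutionUtils.reconcileSchema.
val evolvedSchema: Schema =
  HoodieSchemaUtils.deduceWriterSchema(sourceSchema, Some(tableSchema), Some(internalSchema), reconcileOpts)

// Without an InternalSchema, the legacy reconcileSchemasLegacy strategy runs; if
// HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE is on and the schemas cannot be reconciled,
// a SchemaCompatibilityException is thrown, as in the error branch above.
try {
  HoodieSchemaUtils.deduceWriterSchema(sourceSchema, Some(tableSchema), None, reconcileOpts)
} catch {
  case _: SchemaCompatibilityException => () // the incoming batch schema diverged from the table schema
}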