Support non-additive Delta source schema evolution with schema tracking log.

Closes #1690

GitOrigin-RevId: 5b06490b5bb16ea1f92f5e68d67674537ab7cb24
jackierwzhang authored and vkorukanti committed Apr 20, 2023
1 parent c87ea1d commit 3441df1
Showing 17 changed files with 1,892 additions and 331 deletions.
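For orientation, a minimal usage sketch of the feature this commit adds, assuming an active SparkSession and placeholder paths; the schemaTrackingLocation option and its under-the-checkpoint constraint appear in the diffs below, and the feature is additionally gated behind an internal SQL conf (see the DeltaSQLConf change near the end).

// Illustrative usage sketch (not part of the diff): opt a Delta stream into the new
// schema tracking log so non-additive schema changes (rename/drop/type change) can be
// followed across restarts. Paths are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("delta-schema-tracking-example").getOrCreate()

val stream = spark.readStream
  .format("delta")
  // option added for Delta streaming sources; expected to live under the checkpoint
  .option("schemaTrackingLocation", "/checkpoints/my_query/_schema_log")
  .load("/data/source_delta_table")

stream.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoints/my_query")
  .start("/data/target_delta_table")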
40 changes: 32 additions & 8 deletions core/src/main/resources/error/delta-error-classes.json
@@ -80,14 +80,6 @@
],
"sqlState" : "42KD4"
},
"DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION" : {
"message" : [
"<opName> is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>.",
"Although strongly not recommended, you may also force ignore the schema checks during <opName> at your own risk of potentially incorrect results by turning on the SQL conf <escapeConfig>."
],
"sqlState" : "42KD4"
},
"DELTA_BLOOM_FILTER_DROP_ON_NON_EXISTING_COLUMNS" : {
"message" : [
"Cannot drop bloom filter indices for the following non-existent column(s): <unknownColumns>"
@@ -1571,6 +1563,30 @@
],
"sqlState" : "KD002"
},
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE" : {
"message" : [
"Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>."
],
"sqlState" : "42KD4"
},
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG" : {
"message" : [
"Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>.",
"Please provide a 'schemaTrackingLocation' to enable non-additive schema evolution for Delta stream processing.",
"See <docLink> for more details."
],
"sqlState" : "42KD4"
},
"DELTA_STREAMING_SCHEMA_EVOLUTION" : {
"message" : [
"The schema of your Delta table has changed during streaming, and the schema tracking log has been updated",
"Please restart the stream to continue processing using the updated schema:",
"<schema>"
],
"sqlState" : "22000"
},
"DELTA_STREAMING_SCHEMA_LOCATION_CONFLICT" : {
"message" : [
"Detected conflicting schema location '<loc>' while streaming from table or table located at '<table>'.",
@@ -1608,6 +1624,14 @@
],
"sqlState" : "22000"
},
"DELTA_STREAMING_SCHEMA_LOG_INIT_FAILED_INCOMPATIBLE_SCHEMA" : {
"message" : [
"We could not initialize the Delta streaming source schema log with a valid schema because",
"we detected an incompatible schema change while serving a streaming batch from table version <a> to <b>.",
"To continue processing the stream with latest schema, please turn on <config>."
],
"sqlState" : "22000"
},
"DELTA_STREAMING_SCHEMA_LOG_PARSE_SCHEMA_FAILED" : {
"message" : [
"Failed to parse the schema from the Delta streaming source schema log.",
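The DELTA_STREAMING_SCHEMA_EVOLUTION error class above deliberately fails the running query once the tracking log has been updated, so that a restart picks up the evolved schema. A hedged, driver-side sketch of that restart pattern follows; the helper and the string match on the error-class name are illustrative, not an API from this commit.

// Illustrative restart loop: when the query fails because the schema tracking log
// advanced (DELTA_STREAMING_SCHEMA_EVOLUTION), start it again so it resumes with the
// evolved schema. Matching on the error-class name in the message is a heuristic.
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

def runWithSchemaEvolutionRestarts(startQuery: () => StreamingQuery): Unit = {
  var finished = false
  while (!finished) {
    val query = startQuery()
    try {
      query.awaitTermination()
      finished = true // terminated without error
    } catch {
      case e: StreamingQueryException
          if Option(e.getMessage).exists(_.contains("DELTA_STREAMING_SCHEMA_EVOLUTION")) =>
        () // schema evolved and the tracking log was updated; loop and restart the stream
    }
  }
}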
@@ -856,7 +856,7 @@ class DeltaAnalysis(session: SparkSession)
assert(options.get("path").isDefined, "Path for Delta table must be defined")
val log = DeltaLog.forTable(session, options.get("path").get)
val sourceIdOpt = options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)
val schemaTrackingLocation = DeltaSourceSchemaLog.fullSchemaTrackingLocation(
val schemaTrackingLocation = DeltaSourceSchemaTrackingLog.fullSchemaTrackingLocation(
rootSchemaTrackingLocation, log.tableId, sourceIdOpt)
// Make sure schema location is under checkpoint
if (!allowSchemaLocationOutsideOfCheckpoint &&
46 changes: 34 additions & 12 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2552,16 +2552,18 @@ trait DeltaErrorsBase
)
}

def blockStreamingReadsOnColumnMappingEnabledTable(
def blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges(
spark: SparkSession,
readSchema: StructType,
incompatibleSchema: StructType,
isCdfRead: Boolean,
detectedDuringStreaming: Boolean): Throwable = {
new DeltaColumnMappingUnsupportedSchemaIncompatibleException(
if (isCdfRead) "Streaming read of Change Data Feed (CDF)" else "Streaming read",
val enableNonAdditiveSchemaEvolution = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_STREAMING_ENABLE_NON_ADDITIVE_SCHEMA_EVOLUTION)
new DeltaStreamingColumnMappingSchemaIncompatibleException(
readSchema,
incompatibleSchema,
DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES.key,
"",
enableNonAdditiveSchemaEvolution,
additionalProperties = Map(
"detectedDuringStreaming" -> detectedDuringStreaming.toString
))
@@ -2635,6 +2637,24 @@ trait DeltaErrorsBase
cause = cause)
}

def streamingSchemaEvolutionException(newSchema: StructType): Throwable = {
new DeltaRuntimeException(
errorClass = "DELTA_STREAMING_SCHEMA_EVOLUTION",
messageParameters = Array(formatSchema(newSchema)))
}

def streamingSchemaLogInitFailedIncompatibleSchemaException(
startVersion: Long,
endVersion: Long): Throwable = {
new DeltaRuntimeException(
errorClass = "DELTA_STREAMING_SCHEMA_LOG_INIT_FAILED_INCOMPATIBLE_SCHEMA",
messageParameters = Array(
startVersion.toString, endVersion.toString,
DeltaSQLConf.
DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES_DURING_STREAM_START.key)
)
}

def failToDeserializeSchemaLog(location: String): Throwable = {
new DeltaRuntimeException(
errorClass = "DELTA_STREAMING_SCHEMA_LOG_DESERIALIZE_FAILED",
@@ -3076,18 +3096,20 @@ class DeltaChecksumException(
* To stay compatible with existing behavior for users who have already used this
* operation, the user should always be able to use `escapeConfigName` to fall back at their own risk.
*/
class DeltaColumnMappingUnsupportedSchemaIncompatibleException(
val opName: String,
class DeltaStreamingColumnMappingSchemaIncompatibleException(
val readSchema: StructType,
val incompatibleSchema: StructType,
val escapeConfigName: String,
val docLink: String,
val enableNonAdditiveSchemaEvolution: Boolean = false,
val additionalProperties: Map[String, String] = Map.empty)
extends DeltaUnsupportedOperationException(
errorClass = "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION",
errorClass = if (enableNonAdditiveSchemaEvolution) {
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG"
} else {
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE"
},
messageParameters = Array(
opName,
readSchema.json,
incompatibleSchema.json,
opName,
escapeConfigName)
docLink)
)
20 changes: 9 additions & 11 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -457,18 +457,19 @@ class DeltaLog private(
* Returns a [[org.apache.spark.sql.DataFrame]] containing the new files within the specified
* version range.
*
* It can optionally take a customReadSchema which consists of the actual read schema to read
* the files. This is used to support non-additive Delta Source streaming schema evolution.
* The customReadSchema requires that its partitionSchema for the Delta table does not change from
* the snapshot's partitionSchema.
* @param customDataSchema Optional data schema that will be used to read the files.
* This is used when reading multiple snapshots using one all-encompassing
* schema, e.g. during streaming.
* This parameter only modifies the data schema. The partition schema is
* not updated, so the caller should ensure that it does not change
* compared to the snapshot.
*/
def createDataFrame(
snapshot: Snapshot,
addFiles: Seq[AddFile],
isStreaming: Boolean = false,
actionTypeOpt: Option[String] = None,
customReadSchema: Option[PersistedSchema] = None
): DataFrame = {
customDataSchema: Option[StructType] = None): DataFrame = {
val actionType = actionTypeOpt.getOrElse(if (isStreaming) "streaming" else "batch")
// It's ok to not pass down the partitionSchema to TahoeBatchFileIndex. Schema evolution will
// ensure any partitionSchema changes will be captured, and upon restart, the new snapshot will
@@ -479,13 +480,10 @@
val partitionSchema = snapshot.metadata.partitionSchema
var metadata = snapshot.metadata

require(customReadSchema.forall(_.partitionSchema == partitionSchema),
"Cannot specify a custom read schema with different partition schema than the Delta table")

// Replace schema inside snapshot metadata so that later `fileFormat()` can generate the correct
// DeltaParquetFormat with the correct schema to references, the customReadSchema should also
// DeltaParquetFormat with the correct schema to references, the customDataSchema should also
// contain the correct column mapping metadata if needed after being loaded from schema log.
customReadSchema.map(_.dataSchema).foreach { readSchema =>
customDataSchema.foreach { readSchema =>
metadata = snapshot.metadata.copy(schemaString = readSchema.json)
}

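A hedged sketch of how a caller such as the streaming source might thread a tracked schema into the reworked createDataFrame above; the helper below and its wiring are hypothetical, only the customDataSchema parameter comes from this diff, and the caller remains responsible for the unchanged-partition-schema caveat noted in the doc comment.

// Hypothetical caller sketch: read a batch of files from an older snapshot with the
// data schema persisted in the schema tracking log, falling back to the snapshot
// schema when no tracked schema is available. Only `customDataSchema` is from this diff.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.delta.actions.AddFile

def readFilesWithTrackedSchema(
    deltaLog: DeltaLog,
    snapshot: Snapshot,
    files: Seq[AddFile],
    trackedDataSchema: Option[StructType]): DataFrame = {
  // trackedDataSchema would typically be obtained via
  // schemaTrackingLog.flatMap(_.getCurrentTrackedSchema).map(_.dataSchema)
  deltaLog.createDataFrame(
    snapshot,
    files,
    isStreaming = true,
    customDataSchema = trackedDataSchema)
}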
@@ -271,6 +271,10 @@ object DeltaOptions extends DeltaLogging {
* An option to allow column mapping enabled tables to conduct schema evolution during streaming
*/
val SCHEMA_TRACKING_LOCATION = "schemaTrackingLocation"
/**
* Alias for `schemaTrackingLocation`, so users familiar with AutoLoader can migrate easily.
*/
val SCHEMA_TRACKING_LOCATION_ALIAS = "schemaLocation"
/**
* An option to instruct DeltaSource to pick a customized subdirectory for schema log in case of
* rare conflicts such as when a stream needs to do a self-union of two Delta sources from the
@@ -307,6 +311,7 @@
TXN_APP_ID,
TXN_VERSION,
SCHEMA_TRACKING_LOCATION,
SCHEMA_TRACKING_LOCATION_ALIAS,
STREAMING_SOURCE_TRACKING_ID,
"queryName",
"checkpointLocation",
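Before moving on to the next file, a small illustrative example of the alias added above: both keys resolve to the same schema tracking location, the schemaLocation spelling simply matches Auto Loader (assumes an active SparkSession named spark; paths are placeholders).

// Illustrative only: the canonical key and its Auto Loader-style alias are interchangeable.
val viaCanonicalKey = spark.readStream
  .format("delta")
  .option("schemaTrackingLocation", "/checkpoints/q1/_schema_log")
  .load("/data/tbl")

val viaAlias = spark.readStream
  .format("delta")
  .option("schemaLocation", "/checkpoints/q1/_schema_log")
  .load("/data/tbl")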
@@ -1079,6 +1079,9 @@ object SingleAction extends Logging {

lazy val nullLitForAddCDCFile: Column =
new Column(Literal(null, ScalaReflection.schemaFor[AddCDCFile].dataType))

lazy val nullLitForMetadataAction: Column =
new Column(Literal(null, ScalaReflection.schemaFor[Metadata].dataType))
}

/** Serializes Maps containing JSON strings without extra escaping. */
@@ -90,9 +90,8 @@ class DeltaDataSource

val (_, snapshot) = DeltaLog.forTableWithSnapshot(sqlContext.sparkSession, path)
val readSchema = {
getSchemaLogForDeltaSource(sqlContext.sparkSession, snapshot, parameters)
// Use `getSchemaAtLogInit` so it's always consistent between analysis and execution phase
.flatMap(_.getSchemaAtLogInit.map(_.dataSchema))
getSchemaTrackingLogForDeltaSource(sqlContext.sparkSession, snapshot, parameters)
.flatMap(_.getCurrentTrackedSchema.map(_.dataSchema))
.getOrElse(snapshot.schema)
}

Expand Down Expand Up @@ -122,10 +121,12 @@ class DeltaDataSource
})
val options = new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf)
val (deltaLog, snapshot) = DeltaLog.forTableWithSnapshot(sqlContext.sparkSession, path)
val schemaLogOpt =
getSchemaLogForDeltaSource(sqlContext.sparkSession, snapshot, parameters)
val readSchema = schemaLogOpt
.flatMap(_.getSchemaAtLogInit.map(_.dataSchema))

val schemaTrackingLogOpt =
getSchemaTrackingLogForDeltaSource(sqlContext.sparkSession, snapshot, parameters)

val readSchema = schemaTrackingLogOpt
.flatMap(_.getCurrentTrackedSchema.map(_.dataSchema))
.getOrElse(snapshot.schema)

if (readSchema.isEmpty) {
@@ -136,7 +137,7 @@
deltaLog,
options,
snapshot,
schemaLog = schemaLogOpt
schemaTrackingLog = schemaTrackingLogOpt
)
}

@@ -237,14 +238,26 @@
/**
* Create a schema log for Delta streaming source if possible
*/
private def getSchemaLogForDeltaSource(
private def getSchemaTrackingLogForDeltaSource(
spark: SparkSession,
sourceSnapshot: Snapshot,
parameters: Map[String, String]): Option[DeltaSourceSchemaLog] = {
parameters: Map[String, String]): Option[DeltaSourceSchemaTrackingLog] = {
val options = new CaseInsensitiveStringMap(parameters.asJava)
Option(options.get(DeltaOptions.SCHEMA_TRACKING_LOCATION))
.orElse(Option(options.get(DeltaOptions.SCHEMA_TRACKING_LOCATION_ALIAS)))
.map { schemaTrackingLocation =>
DeltaSourceSchemaLog.create(
if (!spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_STREAMING_ENABLE_NON_ADDITIVE_SCHEMA_EVOLUTION)) {
// TODO: remove once non-additive schema evolution is released
throw new UnsupportedOperationException(
"Schema tracking location is not supported for Delta streaming source")
}
if (Option(options.get(DeltaOptions.CDC_READ_OPTION)).exists(_.toBoolean)) {
// TODO: remove once we support CDC streaming with schema log
throw new UnsupportedOperationException(
"Reading change data feed and streaming is not supported with schema tracking log")
}
DeltaSourceSchemaTrackingLog.create(
spark, schemaTrackingLocation, sourceSnapshot,
Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)))
}
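To illustrate the per-source tracking id that getSchemaTrackingLogForDeltaSource passes into DeltaSourceSchemaTrackingLog.create above, here is a hedged sketch of the self-union scenario mentioned in the DeltaOptions comment earlier. The option keys are referenced through the DeltaOptions constants because the literal value of the tracking-id key is not shown in this diff; spark is an assumed active SparkSession.

// Illustrative: two sources reading the same table in one query, each keeping its own
// schema log subdirectory selected by the source tracking id under a shared root location.
import org.apache.spark.sql.delta.DeltaOptions

val left = spark.readStream.format("delta")
  .option(DeltaOptions.SCHEMA_TRACKING_LOCATION, "/checkpoints/q1/_schema_log")
  .option(DeltaOptions.STREAMING_SOURCE_TRACKING_ID, "left")
  .load("/data/tbl")

val right = spark.readStream.format("delta")
  .option(DeltaOptions.SCHEMA_TRACKING_LOCATION, "/checkpoints/q1/_schema_log")
  .option(DeltaOptions.STREAMING_SOURCE_TRACKING_ID, "right")
  .load("/data/tbl")

val selfUnion = left.union(right)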
@@ -901,7 +901,7 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES_DURING_STREAM_SATRT =
val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES_DURING_STREAM_START =
buildConf("streaming.unsafeReadOnIncompatibleSchemaChangesDuringStreamStart.enabled")
.doc(
"""A legacy config to disable schema read-compatibility check on the start version schema
@@ -912,6 +912,16 @@
.booleanConf
.createWithDefault(false)

val DELTA_STREAMING_ENABLE_NON_ADDITIVE_SCHEMA_EVOLUTION =
buildConf("streaming.nonAdditiveSchemaEvolution.enabled")
.doc(
"""If enabled, Delta streaming source can support non-additive schema evolution for
|operations such as rename or drop column on column mapping enabled tables.
|""".stripMargin)
.internal()
.booleanConf
.createWithDefault(false)

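Because the feature is gated behind this internal conf, it has to be switched on before the stream starts; a minimal sketch, assuming Delta's usual spark.databricks.delta. prefix for keys built by DeltaSQLConf.

// Illustrative: enable the gate for non-additive streaming schema evolution.
// The full key assumes Delta's standard "spark.databricks.delta." conf prefix;
// without it, providing schemaTrackingLocation throws UnsupportedOperationException.
spark.conf.set("spark.databricks.delta.streaming.nonAdditiveSchemaEvolution.enabled", "true")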
val DELTA_STREAMING_ALLOW_SCHEMA_LOCATION_OUTSIDE_CHECKPOINT_LOCATION =
buildConf("streaming.allowSchemaLocationOutsideCheckpointLocation")
.doc(
