Skip to content

Commit

Permalink
Enable streaming / CDF streaming reads on column mapping enabled tabl…
Browse files Browse the repository at this point in the history
…es with fewer limitations

Resolves #1357

Because streaming uses the latest schema to read historical data batches, and column mapping schema changes (e.g., renaming or dropping a column) can cause that latest schema to diverge from historical schemas, we previously decided to block streaming reads on column mapping tables entirely as a temporary measure.

As a close follow-up, this PR enables at least the following use cases:
1. Read from a column mapping table without rename or drop column operations.
2. Upgrade to column mapping tables.
3. Existing compatible schema change operations such as ADD COLUMN.

Resolves #1358

Adds new unit tests.

GitOrigin-RevId: f40a063dde0329da0750105186cc6711a7b0ea02
  • Loading branch information
jackierwzhang authored and allisonport-db committed Sep 6, 2022
1 parent 0bedbbb commit 8938463
Show file tree
Hide file tree
Showing 14 changed files with 894 additions and 387 deletions.
20 changes: 10 additions & 10 deletions core/src/main/resources/error/delta-error-classes.json
Expand Up @@ -68,15 +68,15 @@
],
"sqlState" : "42000"
},
"DELTA_BLOCK_CDF_COLUMN_MAPPING_READS" : {
"DELTA_BLOCK_COLUMN_MAPPING_AND_CDC_OPERATION" : {
"message" : [
"Change Data Feed (CDF) reads are not supported on tables with column mapping schema changes (e.g. rename or drop). Read schema: <newSchema>. Incompatible schema: <oldSchema>. <hint>"
"Operation \"<opName>\" is not allowed when the table has enabled change data feed (CDF) and has undergone schema changes using DROP COLUMN or RENAME COLUMN."
],
"sqlState" : "0A000"
},
"DELTA_BLOCK_COLUMN_MAPPING_AND_CDC_OPERATION" : {
"DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION" : {
"message" : [
"Operation \"<opName>\" is not allowed when the table has enabled change data feed (CDF) and has undergone schema changes using DROP COLUMN or RENAME COLUMN."
"<opName> is not supported on tables with column mapping schema changes (e.g. rename or drop). Read schema: <readSchema>. Incompatible schema: <incompatibleSchema>. You may force enable streaming read at your own risk by turning on <config>."
],
"sqlState" : "0A000"
},
Expand Down Expand Up @@ -1294,6 +1294,12 @@
],
"sqlState" : "22000"
},
"DELTA_STREAM_CHECK_COLUMN_MAPPING_NO_SNAPSHOT" : {
"message" : [
"Failed to obtain Delta log snapshot for the start version when checking column mapping schema changes. Please choose a different start version, or force enable streaming read at your own risk by setting '<config>' to 'true'."
],
"sqlState" : "22000"
},
"DELTA_TABLE_ALREADY_CONTAINS_CDC_COLUMNS" : {
"message" : [
"Unable to enable Change Data Capture on the table. The table already contains",
Expand Down Expand Up @@ -1541,12 +1547,6 @@
],
"sqlState" : "0A000"
},
"DELTA_UNSUPPORTED_COLUMN_MAPPING_STREAMING_READS" : {
"message" : [
"Streaming reads from a Delta table with column mapping enabled are not supported."
],
"sqlState" : "0A000"
},
"DELTA_UNSUPPORTED_COLUMN_MAPPING_WRITE" : {
"message" : [
"Writing data with column mapping mode is not supported."
Expand Down
Expand Up @@ -522,15 +522,17 @@ trait DeltaColumnMappingBase extends DeltaLogging {
def isColumnMappingReadCompatible(newMetadata: Metadata, oldMetadata: Metadata): Boolean = {
val (oldMode, newMode) = (oldMetadata.columnMappingMode, newMetadata.columnMappingMode)
if (oldMode != NoMapping && newMode != NoMapping) {
require(oldMode == newMode, "changing mode is not supported")
// Both changes are post column mapping enabled
!isRenameColumnOperation(newMetadata, oldMetadata) &&
!isDropColumnOperation(newMetadata, oldMetadata)
!isDropColumnOperation(newMetadata, oldMetadata)
} else if (oldMode == NoMapping && newMode != NoMapping) {
// The old metadata does not have column mapping while the new metadata does, in this case
// we assume an upgrade has happened in between.
// So we manually construct a post-upgrade schema for the old metadata and compare that with
// the new metadata, as the upgrade would use the logical name as the physical name, we could
// easily capture any difference in the schema using the same is{XXX}ColumnOperation utils.
// easily capture any difference in the schema using the same is{Drop,Rename}ColumnOperation
// utils.
var upgradedMetadata = assignColumnIdAndPhysicalName(
oldMetadata, oldMetadata, isChangingModeOnExistingTable = true
)
Expand Down
67 changes: 47 additions & 20 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Expand Up @@ -981,11 +981,6 @@ trait DeltaErrorsBase
)
}

def blockStreamingReadsOnColumnMappingEnabledTable: Throwable = {
new DeltaUnsupportedOperationException(
errorClass = "DELTA_UNSUPPORTED_COLUMN_MAPPING_STREAMING_READS")
}

def bloomFilterOnPartitionColumnNotSupportedException(name: String): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_UNSUPPORTED_PARTITION_COLUMN_IN_BLOOM_FILTER",
Expand Down Expand Up @@ -2356,23 +2351,37 @@ trait DeltaErrorsBase
// scalastyle:on line.size.limit
}

/**
 * Builds the error thrown to block a batch CDF read when the table has column mapping
 * schema changes that make a historical schema incompatible with the read schema.
 *
 * @param readSchema the (latest) schema used to perform the read
 * @param incompatibleSchema the historical schema found to be incompatible with it
 * @return the exception to throw; carries the escape-hatch config key so users can
 *         force-enable the read at their own risk
 */
def blockBatchCdfReadOnColumnMappingEnabledTable(
    readSchema: StructType,
    incompatibleSchema: StructType): Throwable = {
  // Escape hatch allowing users to bypass this block at their own risk.
  val escapeConfig = DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key
  new DeltaColumnMappingUnsupportedSchemaIncompatibleException(
    "Change Data Feed (CDF) read",
    readSchema,
    incompatibleSchema,
    escapeConfig)
}

val columnMappingCDFBatchBlockHint: String =
s"You may force enable batch CDF read at your own risk by turning on " +
s"${DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key}."
/**
 * Builds the error thrown to block a streaming read (plain or CDF) on a table whose
 * column mapping schema changes make a historical schema incompatible with the read schema.
 *
 * @param readSchema the (latest) schema used to perform the read
 * @param incompatibleSchema the historical schema found to be incompatible with it
 * @param isCdfRead whether this is a streaming read of the Change Data Feed
 * @param detectedDuringStreaming whether the incompatibility was detected while the
 *        stream was already running (recorded as an additional property on the error)
 */
def blockStreamingReadsOnColumnMappingEnabledTable(
    readSchema: StructType,
    incompatibleSchema: StructType,
    isCdfRead: Boolean,
    detectedDuringStreaming: Boolean): Throwable = {
  // The operation name shown in the error message differs for CDF vs. plain streaming.
  val operationName = if (isCdfRead) {
    "Streaming read of Change Data Feed (CDF)"
  } else {
    "Streaming read"
  }
  val extraProps = Map("detectedDuringStreaming" -> detectedDuringStreaming.toString)
  new DeltaColumnMappingUnsupportedSchemaIncompatibleException(
    operationName,
    readSchema,
    incompatibleSchema,
    DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key,
    additionalProperties = extraProps)
}

def blockCdfAndColumnMappingReads(
isStreaming: Boolean,
readSchema: Option[StructType] = None,
incompatibleSchema: Option[StructType] = None): Throwable = {
new DeltaUnsupportedOperationException(
errorClass = "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS",
messageParameters = Array(
readSchema.map(_.json).getOrElse(""),
incompatibleSchema.map(_.json).getOrElse(""),
if (isStreaming) "" else columnMappingCDFBatchBlockHint
)
)
/**
 * Builds the error thrown when the Delta log snapshot at a stream's start version could
 * not be obtained while checking for column mapping schema changes.
 *
 * @param cause the underlying failure from snapshot retrieval
 */
def failedToGetSnapshotDuringColumnMappingStreamingReadCheck(cause: Throwable): Throwable = {
  // The message directs users to the unsafe-read escape hatch as a last resort.
  val escapeConfig = DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key
  new DeltaAnalysisException(
    errorClass = "DELTA_STREAM_CHECK_COLUMN_MAPPING_NO_SNAPSHOT",
    Array(escapeConfig),
    Some(cause))
}

def showColumnsWithConflictDatabasesError(db: String, tableID: TableIdentifier): Throwable = {
Expand Down Expand Up @@ -2641,3 +2650,21 @@ class ColumnMappingUnsupportedException(msg: String)
extends UnsupportedOperationException(msg)
/** Analysis error raised for a column mapping failure; carries the active mapping `mode`. */
case class ColumnMappingException(msg: String, mode: DeltaColumnMappingMode)
extends AnalysisException(msg)

/**
 * Error thrown when an operation is not supported in the presence of column mapping
 * schema changes (rename / drop column).
 *
 * To remain compatible with the existing behavior of users who may have already
 * (perhaps accidentally) relied on such an operation, the config named by
 * `escapeConfigName` can always be used to force-unblock the operation at the
 * user's own risk.
 *
 * @param opName human-readable name of the blocked operation (e.g. "Streaming read")
 * @param readSchema the (latest) schema used to perform the read
 * @param incompatibleSchema the historical schema found to be incompatible with it
 * @param escapeConfigName SQL conf key that force-enables the operation when set
 * @param additionalProperties extra key/value context attached to the error
 */
class DeltaColumnMappingUnsupportedSchemaIncompatibleException(
val opName: String,
val readSchema: StructType,
val incompatibleSchema: StructType,
val escapeConfigName: String,
val additionalProperties: Map[String, String] = Map.empty)
extends DeltaUnsupportedOperationException(
errorClass = "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION",
messageParameters = Array(opName, readSchema.json, incompatibleSchema.json, escapeConfigName)
)
Expand Up @@ -303,16 +303,18 @@ object DeltaOperations {
}

/** Recorded when columns are dropped. */
// Operation name extracted as a constant, presumably so other components can match on it
// when inspecting commit history — TODO confirm against callers.
val OP_DROP_COLUMN = "DROP COLUMNS"
case class DropColumns(
colsToDrop: Seq[Seq[String]]) extends Operation(OP_DROP_COLUMN) {

// Each dropped column is a (possibly nested) field path, rendered to its dotted name.
override val parameters: Map[String, Any] = Map(
"columns" -> JsonUtils.toJson(colsToDrop.map(UnresolvedAttribute(_).name)))
}

/** Recorded when column is renamed */
val OP_RENAME_COLUMN = "RENAME COLUMN"
case class RenameColumn(oldColumnPath: Seq[String], newColumnPath: Seq[String])
extends Operation("RENAME COLUMN") {
extends Operation(OP_RENAME_COLUMN) {
override val parameters: Map[String, Any] = Map(
"oldColumnPath" -> UnresolvedAttribute(oldColumnPath).name,
"newColumnPath" -> UnresolvedAttribute(newColumnPath).name
Expand Down
Expand Up @@ -263,14 +263,6 @@ object CDCReader extends DeltaLogging {

val snapshot = deltaLog.snapshot

// If the table has column mapping enabled, throw an error. With column mapping, certain schema
// changes are possible (rename a column or drop a column) which don't work well with CDF.
// TODO: remove this after the proper blocking semantics is rolled out
// This is only blocking streaming CDF, batch CDF will be blocked differently below.
if (isStreaming && snapshot.metadata.columnMappingMode != NoMapping) {
throw DeltaErrors.blockCdfAndColumnMappingReads(isStreaming)
}

// A map from change version to associated commit timestamp.
val timestampsByVersion: Map[Long, Timestamp] =
getTimestampsByVersion(deltaLog, start, end, spark)
Expand All @@ -280,7 +272,7 @@ object CDCReader extends DeltaLogging {
val removeFiles = ListBuffer[CDCDataSpec[RemoveFile]]()

val startVersionSnapshot = deltaLog.getSnapshotAt(start)
if (!isCDCEnabledOnTable(deltaLog.getSnapshotAt(start).metadata)) {
if (!isCDCEnabledOnTable(startVersionSnapshot.metadata)) {
throw DeltaErrors.changeDataNotRecordedException(start, start, end)
}

Expand All @@ -300,11 +292,8 @@ object CDCReader extends DeltaLogging {
if (shouldCheckToBlockBatchReadOnColumnMappingTable &&
!DeltaColumnMapping.isColumnMappingReadCompatible(
snapshot.metadata, startVersionSnapshot.metadata)) {
throw DeltaErrors.blockCdfAndColumnMappingReads(
isStreaming,
Some(snapshot.metadata.schema),
Some(startVersionSnapshot.metadata.schema)
)
throw DeltaErrors.blockBatchCdfReadOnColumnMappingEnabledTable(
snapshot.metadata.schema, startVersionSnapshot.metadata.schema)
}

var totalBytes = 0L
Expand All @@ -328,11 +317,8 @@ object CDCReader extends DeltaLogging {
if (shouldCheckToBlockBatchReadOnColumnMappingTable) {
actions.collect { case a: Metadata => a }.foreach { metadata =>
if (!DeltaColumnMapping.isColumnMappingReadCompatible(snapshot.metadata, metadata)) {
throw DeltaErrors.blockCdfAndColumnMappingReads(
isStreaming,
Some(snapshot.metadata.schema),
Some(metadata.schema)
)
throw DeltaErrors.blockBatchCdfReadOnColumnMappingEnabledTable(
snapshot.metadata.schema, metadata.schema)
}
}
}
Expand Down
Expand Up @@ -793,6 +793,17 @@ trait DeltaSQLConfBase {
.createWithDefault(false)
}

// Escape-hatch flag (internal, default false): force-enables streaming reads on tables
// whose column mapping schema changes would otherwise block them. Users who set it
// accept the risk of data loss / schema confusion described in the doc string below.
val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES =
buildConf("streaming.unsafeReadOnIncompatibleSchemaChanges.enabled")
.doc(
"Streaming read on Delta table with column mapping schema operations " +
"(e.g. rename or drop column) is currently blocked due to potential data loss and " +
"schema confusion. However, existing users may use this flag to force unblock " +
"if they'd like to take the risk.")
.internal()
.booleanConf
.createWithDefault(false)

val DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES =
buildConf("changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled")
.doc(
Expand Down

0 comments on commit 8938463

Please sign in to comment.