diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json index 443a30a822..707369ba2d 100644 --- a/core/src/main/resources/error/delta-error-classes.json +++ b/core/src/main/resources/error/delta-error-classes.json @@ -68,15 +68,15 @@ ], "sqlState" : "42000" }, - "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS" : { + "DELTA_BLOCK_COLUMN_MAPPING_AND_CDC_OPERATION" : { "message" : [ - "Change Data Feed (CDF) reads are not supported on tables with column mapping schema changes (e.g. rename or drop). Read schema: . Incompatible schema: . " + "Operation \"\" is not allowed when the table has enabled change data feed (CDF) and has undergone schema changes using DROP COLUMN or RENAME COLUMN." ], "sqlState" : "0A000" }, - "DELTA_BLOCK_COLUMN_MAPPING_AND_CDC_OPERATION" : { + "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION" : { "message" : [ - "Operation \"\" is not allowed when the table has enabled change data feed (CDF) and has undergone schema changes using DROP COLUMN or RENAME COLUMN." + " is not supported on tables with column mapping schema changes (e.g. rename or drop). Read schema: . Incompatible schema: . You may force enable streaming read at your own risk by turning on ." ], "sqlState" : "0A000" }, @@ -1294,6 +1294,12 @@ ], "sqlState" : "22000" }, + "DELTA_STREAM_CHECK_COLUMN_MAPPING_NO_SNAPSHOT" : { + "message" : [ + "Failed to obtain Delta log snapshot for the start version when checking column mapping schema changes. Please choose a different start version, or force enable streaming read at your own risk by setting '' to 'true'." + ], + "sqlState" : "22000" + }, "DELTA_TABLE_ALREADY_CONTAINS_CDC_COLUMNS" : { "message" : [ "Unable to enable Change Data Capture on the table. The table already contains", @@ -1541,12 +1547,6 @@ ], "sqlState" : "0A000" }, - "DELTA_UNSUPPORTED_COLUMN_MAPPING_STREAMING_READS" : { - "message" : [ - "Streaming reads from a Delta table with column mapping enabled are not supported." - ], - "sqlState" : "0A000" - }, "DELTA_UNSUPPORTED_COLUMN_MAPPING_WRITE" : { "message" : [ "Writing data with column mapping mode is not supported." diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index fbc12b7665..5ed56e70be 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -522,15 +522,17 @@ trait DeltaColumnMappingBase extends DeltaLogging { def isColumnMappingReadCompatible(newMetadata: Metadata, oldMetadata: Metadata): Boolean = { val (oldMode, newMode) = (oldMetadata.columnMappingMode, newMetadata.columnMappingMode) if (oldMode != NoMapping && newMode != NoMapping) { + require(oldMode == newMode, "changing mode is not supported") // Both changes are post column mapping enabled !isRenameColumnOperation(newMetadata, oldMetadata) && - !isDropColumnOperation(newMetadata, oldMetadata) + !isDropColumnOperation(newMetadata, oldMetadata) } else if (oldMode == NoMapping && newMode != NoMapping) { // The old metadata does not have column mapping while the new metadata does, in this case // we assume an upgrade has happened in between. 
// So we manually construct a post-upgrade schema for the old metadata and compare that with // the new metadata, as the upgrade would use the logical name as the physical name, we could - // easily capture any difference in the schema using the same is{XXX}ColumnOperation utils. + // easily capture any difference in the schema using the same is{Drop,Rename}ColumnOperation + // utils. var upgradedMetadata = assignColumnIdAndPhysicalName( oldMetadata, oldMetadata, isChangingModeOnExistingTable = true ) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index bf079bfc33..38981239b9 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -981,11 +981,6 @@ trait DeltaErrorsBase ) } - def blockStreamingReadsOnColumnMappingEnabledTable: Throwable = { - new DeltaUnsupportedOperationException( - errorClass = "DELTA_UNSUPPORTED_COLUMN_MAPPING_STREAMING_READS") - } - def bloomFilterOnPartitionColumnNotSupportedException(name: String): Throwable = { new DeltaAnalysisException( errorClass = "DELTA_UNSUPPORTED_PARTITION_COLUMN_IN_BLOOM_FILTER", @@ -2356,23 +2351,37 @@ trait DeltaErrorsBase // scalastyle:on line.size.limit } + def blockBatchCdfReadOnColumnMappingEnabledTable( + readSchema: StructType, + incompatibleSchema: StructType): Throwable = { + new DeltaColumnMappingUnsupportedSchemaIncompatibleException( + "Change Data Feed (CDF) read", + readSchema, + incompatibleSchema, + DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key + ) + } - val columnMappingCDFBatchBlockHint: String = - s"You may force enable batch CDF read at your own risk by turning on " + - s"${DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key}." 
+  def blockStreamingReadsOnColumnMappingEnabledTable( + readSchema: StructType, + incompatibleSchema: StructType, + isCdfRead: Boolean, + detectedDuringStreaming: Boolean): Throwable = { + new DeltaColumnMappingUnsupportedSchemaIncompatibleException( + if (isCdfRead) "Streaming read of Change Data Feed (CDF)" else "Streaming read", + readSchema, + incompatibleSchema, + DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key, + additionalProperties = Map( + "detectedDuringStreaming" -> detectedDuringStreaming.toString + )) + } - def blockCdfAndColumnMappingReads( - isStreaming: Boolean, - readSchema: Option[StructType] = None, - incompatibleSchema: Option[StructType] = None): Throwable = { - new DeltaUnsupportedOperationException( - errorClass = "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS", - messageParameters = Array( - readSchema.map(_.json).getOrElse(""), - incompatibleSchema.map(_.json).getOrElse(""), - if (isStreaming) "" else columnMappingCDFBatchBlockHint - ) - ) + def failedToGetSnapshotDuringColumnMappingStreamingReadCheck(cause: Throwable): Throwable = { + new DeltaAnalysisException( + errorClass = "DELTA_STREAM_CHECK_COLUMN_MAPPING_NO_SNAPSHOT", + Array(DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key), + Some(cause)) } def showColumnsWithConflictDatabasesError(db: String, tableID: TableIdentifier): Throwable = { @@ -2641,3 +2650,21 @@ class ColumnMappingUnsupportedException(msg: String) extends UnsupportedOperationException(msg) case class ColumnMappingException(msg: String, mode: DeltaColumnMappingMode) extends AnalysisException(msg) + +/** + * Errors thrown when an operation is not supported with column mapping schema changes + * (rename / drop column). + * + * To stay compatible with existing behavior for users who have accidentally already used this + * operation, they should always be able to use `escapeConfigName` to fall back at their own risk. + */ +class DeltaColumnMappingUnsupportedSchemaIncompatibleException( + val opName: String, + val readSchema: StructType, + val incompatibleSchema: StructType, + val escapeConfigName: String, + val additionalProperties: Map[String, String] = Map.empty) + extends DeltaUnsupportedOperationException( + errorClass = "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION", + messageParameters = Array(opName, readSchema.json, incompatibleSchema.json, escapeConfigName) + ) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala index 33cc33664f..457a7b8e0a 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala @@ -303,16 +303,18 @@ object DeltaOperations { } /** Recorded when columns are dropped. 
*/ + val OP_DROP_COLUMN = "DROP COLUMNS" case class DropColumns( - colsToDrop: Seq[Seq[String]]) extends Operation("DROP COLUMNS") { + colsToDrop: Seq[Seq[String]]) extends Operation(OP_DROP_COLUMN) { override val parameters: Map[String, Any] = Map( "columns" -> JsonUtils.toJson(colsToDrop.map(UnresolvedAttribute(_).name))) } /** Recorded when column is renamed */ + val OP_RENAME_COLUMN = "RENAME COLUMN" case class RenameColumn(oldColumnPath: Seq[String], newColumnPath: Seq[String]) - extends Operation("RENAME COLUMN") { + extends Operation(OP_RENAME_COLUMN) { override val parameters: Map[String, Any] = Map( "oldColumnPath" -> UnresolvedAttribute(oldColumnPath).name, "newColumnPath" -> UnresolvedAttribute(newColumnPath).name diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala index da81643cc6..4a4372d937 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala @@ -263,14 +263,6 @@ object CDCReader extends DeltaLogging { val snapshot = deltaLog.snapshot - // If the table has column mapping enabled, throw an error. With column mapping, certain schema - // changes are possible (rename a column or drop a column) which don't work well with CDF. - // TODO: remove this after the proper blocking semantics is rolled out - // This is only blocking streaming CDF, batch CDF will be blocked differently below. - if (isStreaming && snapshot.metadata.columnMappingMode != NoMapping) { - throw DeltaErrors.blockCdfAndColumnMappingReads(isStreaming) - } - // A map from change version to associated commit timestamp. val timestampsByVersion: Map[Long, Timestamp] = getTimestampsByVersion(deltaLog, start, end, spark) @@ -280,7 +272,7 @@ object CDCReader extends DeltaLogging { val removeFiles = ListBuffer[CDCDataSpec[RemoveFile]]() val startVersionSnapshot = deltaLog.getSnapshotAt(start) - if (!isCDCEnabledOnTable(deltaLog.getSnapshotAt(start).metadata)) { + if (!isCDCEnabledOnTable(startVersionSnapshot.metadata)) { throw DeltaErrors.changeDataNotRecordedException(start, start, end) } @@ -300,11 +292,8 @@ object CDCReader extends DeltaLogging { if (shouldCheckToBlockBatchReadOnColumnMappingTable && !DeltaColumnMapping.isColumnMappingReadCompatible( snapshot.metadata, startVersionSnapshot.metadata)) { - throw DeltaErrors.blockCdfAndColumnMappingReads( - isStreaming, - Some(snapshot.metadata.schema), - Some(startVersionSnapshot.metadata.schema) - ) + throw DeltaErrors.blockBatchCdfReadOnColumnMappingEnabledTable( + snapshot.metadata.schema, startVersionSnapshot.metadata.schema) } var totalBytes = 0L @@ -328,11 +317,8 @@ object CDCReader extends DeltaLogging { if (shouldCheckToBlockBatchReadOnColumnMappingTable) { actions.collect { case a: Metadata => a }.foreach { metadata => if (!DeltaColumnMapping.isColumnMappingReadCompatible(snapshot.metadata, metadata)) { - throw DeltaErrors.blockCdfAndColumnMappingReads( - isStreaming, - Some(snapshot.metadata.schema), - Some(metadata.schema) - ) + throw DeltaErrors.blockBatchCdfReadOnColumnMappingEnabledTable( + snapshot.metadata.schema, metadata.schema) } } } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 337f49cde6..082eb38ad3 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ 
b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -793,6 +793,17 @@ trait DeltaSQLConfBase { .createWithDefault(false) } + val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES = + buildConf("streaming.unsafeReadOnIncompatibleSchemaChanges.enabled") + .doc( + "Streaming read on Delta table with column mapping schema operations " + + "(e.g. rename or drop column) is currently blocked due to potential data loss and " + + "schema confusion. However, existing users may use this flag to force unblock " + + "if they'd like to take the risk.") + .internal() + .booleanConf + .createWithDefault(false) + val DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES = buildConf("changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled") .doc( diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala index 177b157109..79efb611bd 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.delta.sources import java.io.FileNotFoundException import java.sql.Timestamp +import scala.collection.mutable +import scala.util.control.NonFatal + import scala.util.matching.Regex import org.apache.spark.sql.delta._ @@ -96,9 +99,30 @@ trait DeltaSourceBase extends Source with SupportsTriggerAvailableNow with DeltaLogging { self: DeltaSource => + /** + * Pin down the snapshot during initialization of DeltaSource so we could consistently use this + * same snapshot across the lifespan of this Delta Source. + * + * Visible for testing. + */ + protected[delta] val snapshotAtSourceInit: Snapshot = deltaLog.snapshot + + /** + * Flag that allows user to force enable unsafe streaming read on Delta table with + * column mapping enabled AND drop/rename actions. + */ + protected lazy val forceEnableStreamingRead: Boolean = spark.sessionState.conf + .getConf(DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES) + + /** + * A global flag to mark whether we have done a per-stream start check for column mapping + * schema changes (rename / drop). + */ + protected var hasCheckedColumnMappingChangesOnStreamStart: Boolean = false + override val schema: StructType = { val schemaWithoutCDC = - ColumnWithDefaultExprUtils.removeDefaultExpressions(deltaLog.snapshot.metadata.schema) + ColumnWithDefaultExprUtils.removeDefaultExpressions(snapshotAtSourceInit.schema) if (options.readChangeFeed) { CDCReader.cdcReadSchema(schemaWithoutCDC) } else { @@ -135,7 +159,7 @@ trait DeltaSourceBase extends Source // Take each change until we've seen the configured number of addFiles. Some changes don't // represent file additions; we retain them for offset tracking, but they don't count towards // the maxFilesPerTrigger conf. 
-    var admissionControl = limits.get + val admissionControl = limits.get changes.withClose { it => it.takeWhile { index => admissionControl.admit(Option(index.add)) @@ -191,7 +215,7 @@ trait DeltaSourceBase extends Source .asInstanceOf[Iterator[AddFile]].toArray deltaLog.createDataFrame( - deltaLog.snapshot, + snapshotAtSourceInit, addFilesList, isStreaming = true) } @@ -287,11 +311,91 @@ trait DeltaSourceBase extends Source protected def cleanUpSnapshotResources(): Unit = { if (initialState != null) { - initialState.close(unpersistSnapshot = initialStateVersion < deltaLog.snapshot.version) + initialState.close(unpersistSnapshot = initialStateVersion < snapshotAtSourceInit.version) initialState = null } } + /** + * Check column mapping changes during stream (re)start so we can fail fast. + * + * Note that this won't block on serving the initial snapshot when isStartingVersion=true, + * because serving the initial snapshot of the table is considered the same as serving a + * batch-load of the table's data, so if we have any drop/rename prior to that, we won't block. + * + * TODO: unblock this after we roll out the proper semantics. + */ + protected def checkColumnMappingSchemaChangesOnStreamStart(startVersion: Long): Unit = { + if (snapshotAtSourceInit.metadata.columnMappingMode != NoMapping && + !forceEnableStreamingRead) { + + val snapshotAtStartVersionToScan = try { + getSnapshotFromDeltaLog(startVersion) + } catch { + case NonFatal(e) => + // If we could not construct a snapshot, unfortunately there isn't much we could do + // to completely ensure data consistency. + throw DeltaErrors.failedToGetSnapshotDuringColumnMappingStreamingReadCheck(e) + } + + // Compare the stream metadata with the start version's metadata to detect column mapping + // schema changes + if (!DeltaColumnMapping.isColumnMappingReadCompatible( + snapshotAtSourceInit.metadata, snapshotAtStartVersionToScan.metadata)) { + throw DeltaErrors.blockStreamingReadsOnColumnMappingEnabledTable( + readSchema = snapshotAtSourceInit.schema, + incompatibleSchema = snapshotAtStartVersionToScan.schema, + isCdfRead = options.readChangeFeed, + detectedDuringStreaming = false + ) + } + } + } + + /** + * Check column mapping schema changes during stream execution. It does the following: + * 1. Unifies the error messages thrown when encountering rename/drop column during streaming. + * 2. Prevents a tricky case in which, when a drop column happened, the read compatibility check + * could NOT detect it, so the stream would move PAST that version and later MicroBatchExecution + * would throw a different exception. BUT, if the stream restarts, it would start serving + * the batches POST the drop column and none of our checks would be able to capture that. + * This check, however, makes sure the stream does NOT move past the drop column change, so + * the next time the stream restarts, it will be blocked right away again. + * + * We need to compare change versions so that we won't accidentally block an ADD column by + * detecting it as a reverse DROP column. + * + * TODO: unblock this after we roll out the proper semantics. + */ + protected def checkColumnMappingSchemaChangesDuringStreaming( + curMetadata: Metadata, + curVersion: Long): Unit = { + + val metadataAtSourceInit = snapshotAtSourceInit.metadata + + if (metadataAtSourceInit.columnMappingMode != NoMapping && !forceEnableStreamingRead) { + if (curVersion < snapshotAtSourceInit.version) { + // Stream version is newer, ensure there's no column mapping schema changes + // from cur -> stream. 
+ if (!DeltaColumnMapping.isColumnMappingReadCompatible(metadataAtSourceInit, curMetadata)) { + throw DeltaErrors.blockStreamingReadsOnColumnMappingEnabledTable( + curMetadata.schema, + metadataAtSourceInit.schema, + isCdfRead = options.readChangeFeed, + detectedDuringStreaming = true) + } + } else { + // Current metadata action version is newer, ensure there's no column mapping schema changes + // from stream -> cur. + if (!DeltaColumnMapping.isColumnMappingReadCompatible(curMetadata, metadataAtSourceInit)) { + throw DeltaErrors.blockStreamingReadsOnColumnMappingEnabledTable( + metadataAtSourceInit.schema, + curMetadata.schema, + isCdfRead = options.readChangeFeed, + detectedDuringStreaming = true) + } + } + } + } } /** @@ -332,7 +436,7 @@ case class DeltaSource( // This was checked before creating ReservoirSource assert(schema.nonEmpty) - protected val tableId = deltaLog.snapshot.metadata.id + protected val tableId = snapshotAtSourceInit.metadata.id private var previousOffset: DeltaSourceOffset = null @@ -343,11 +447,15 @@ case class DeltaSource( /** * Get the changes starting from (startVersion, startIndex). The start point should not be * included in the result. + * + * If verifyMetadataAction = true, we will break the stream when we detect any read-incompatible + * metadata changes. */ protected def getFileChanges( fromVersion: Long, fromIndex: Long, - isStartingVersion: Boolean): ClosableIterator[IndexedFile] = { + isStartingVersion: Boolean, + verifyMetadataAction: Boolean = true): ClosableIterator[IndexedFile] = { /** Returns matching files that were added on or after startVersion among delta logs. */ def filterAndIndexDeltaLogs(startVersion: Long): ClosableIterator[IndexedFile] = { @@ -358,7 +466,8 @@ case class DeltaSource( // entire file can be read into memory val actions = deltaLog.store.read(filestatus, deltaLog.newDeltaHadoopConf()) .map(Action.fromJson) - val addFiles = verifyStreamHygieneAndFilterAddFiles(actions, version) + val addFiles = verifyStreamHygieneAndFilterAddFiles( + actions, version, verifyMetadataAction) (Iterator.single(IndexedFile(version, -1, null)) ++ addFiles .map(_.asInstanceOf[AddFile]) @@ -371,7 +480,7 @@ case class DeltaSource( filestatus, deltaLog.newDeltaHadoopConf()) try { - verifyStreamHygiene(fileIterator.map(Action.fromJson), version) + verifyStreamHygiene(fileIterator.map(Action.fromJson), version, verifyMetadataAction) } finally { fileIterator.close() } @@ -394,12 +503,6 @@ case class DeltaSource( } } - // If the table has column mapping enabled, throw an error. With column mapping, certain schema - // changes are possible (rename a column or drop a column) which don't work well with streaming. 
-    if (deltaLog.snapshot.metadata.columnMappingMode != NoMapping) { - throw DeltaErrors.blockStreamingReadsOnColumnMappingEnabledTable - } - var iter = if (isStartingVersion) { Iterator(1, 2).flatMapWithClose { // so that the filterAndIndexDeltaLogs call is lazy case 1 => getSnapshotAt(fromVersion).toClosable @@ -438,6 +541,9 @@ case class DeltaSource( initialState.iterator() } + /** + * Narrow-waist for generating snapshot from Delta Log within Delta Source + */ protected def getSnapshotFromDeltaLog(version: Long): Snapshot = { try { deltaLog.getSnapshotAt(version) @@ -464,7 +570,7 @@ case class DeltaSource( val (version, isStartingVersion) = getStartingVersion match { case Some(v) => (v, false) - case None => (deltaLog.snapshot.version, true) + case None => (snapshotAtSourceInit.version, true) } if (version < 0) { return None @@ -494,12 +600,19 @@ case class DeltaSource( "latestOffset(Offset, ReadLimit) should be called instead of this method") } + /** + * Check the stream for any constraint violations. + * + * If verifyMetadataAction = true, we will break the stream when we detect any read-incompatible + * metadata changes. + */ protected def verifyStreamHygiene( actions: Iterator[Action], - version: Long): Unit = { + version: Long, + verifyMetadataAction: Boolean = true): Unit = { var seenFileAdd = false var removeFileActionPath: Option[String] = None - actions.foreach{ + actions.foreach { case a: AddFile if a.dataChange => seenFileAdd = true case r: RemoveFile if r.dataChange => @@ -507,8 +620,11 @@ case class DeltaSource( removeFileActionPath = Some(r.path) } case m: Metadata => - if (!SchemaUtils.isReadCompatible(m.schema, schema)) { - throw DeltaErrors.schemaChangedException(schema, m.schema, false) + if (verifyMetadataAction) { + checkColumnMappingSchemaChangesDuringStreaming(m, version) + if (!SchemaUtils.isReadCompatible(m.schema, schema)) { + throw DeltaErrors.schemaChangedException(schema, m.schema, false) + } } case protocol: Protocol => deltaLog.protocolRead(protocol) @@ -523,9 +639,16 @@ case class DeltaSource( } } + /** + * Check the stream for any constraint violations. + * + * If verifyMetadataAction = true, we will break the stream when we detect any read-incompatible + * metadata changes. + */ protected def verifyStreamHygieneAndFilterAddFiles( actions: Seq[Action], - version: Long): Seq[Action] = { + version: Long, + verifyMetadataAction: Boolean = true): Seq[Action] = { var seenFileAdd = false var removeFileActionPath: Option[String] = None val filteredActions = actions.filter { @@ -545,8 +668,11 @@ case class DeltaSource( case _: RemoveFile => false case m: Metadata => - if (!SchemaUtils.isReadCompatible(m.schema, schema)) { - throw DeltaErrors.schemaChangedException(schema, m.schema, false) + if (verifyMetadataAction) { + checkColumnMappingSchemaChangesDuringStreaming(m, version) + if (!SchemaUtils.isReadCompatible(m.schema, schema)) { + throw DeltaErrors.schemaChangedException(schema, m.schema, false) + } } false case protocol: Protocol => @@ -609,6 +735,16 @@ case class DeltaSource( } logDebug(s"start: $startOffsetOption end: $end") + // Block streaming read if we detected any column mapping schema changes during stream start. + // This is necessary to detect cases where verifyStreamHygiene could not identify any violating + // metadata actions, such as when a table only had ONE rename/drop, which could result in a + // metadata action that looks exactly the same as the stream snapshot schema. 
+ if (!hasCheckedColumnMappingChangesOnStreamStart) { + checkColumnMappingSchemaChangesOnStreamStart(startVersion) + // Turn on the flag so we don't do this for every single batch + hasCheckedColumnMappingChangesOnStreamStart = true + } + val createdDf = createDataFrameBetweenOffsets(startVersion, startIndex, isStartingVersion, startSourceVersion, startOffsetOption, endOffset) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala index 7a1a6b7fbb..3391585cb6 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala @@ -176,19 +176,23 @@ trait DeltaSourceCDCSupport { self: DeltaSource => /** * Get the changes starting from (fromVersion, fromIndex). fromVersion is included. - * It returns an iterator of (log_version, fileActions) + * It returns an iterator of (log_version, fileActions) + * + * If verifyMetadataAction = true, we will break the stream when we detect any read-incompatible + * metadata changes. */ protected def getFileChangesForCDC( fromVersion: Long, fromIndex: Long, isStartingVersion: Boolean, limits: Option[AdmissionLimits], - endOffset: Option[DeltaSourceOffset]): Iterator[(Long, Iterator[IndexedFile])] = { + endOffset: Option[DeltaSourceOffset], + verifyMetadataAction: Boolean = true): Iterator[(Long, Iterator[IndexedFile])] = { /** Returns matching files that were added on or after startVersion among delta logs. */ def filterAndIndexDeltaLogs(startVersion: Long): Iterator[(Long, IndexedChangeFileSeq)] = { deltaLog.getChanges(startVersion, options.failOnDataLoss).map { case (version, actions) => - val fileActions = filterCDCActions(actions, version) + val fileActions = filterCDCActions(actions, version, verifyMetadataAction) val itr = Iterator(IndexedFile(version, -1, null)) ++ fileActions .zipWithIndex.map { case (action: AddFile, index) => @@ -247,10 +251,14 @@ trait DeltaSourceCDCSupport { self: DeltaSource => /** * Filter out non CDC actions and only return CDC ones. This will either be AddCDCFiles * or AddFile and RemoveFiles + * + * If verifyMetadataAction = true, we will break the stream when we detect any read-incompatible + * metadata changes. 
*/ private def filterCDCActions( actions: Seq[Action], - version: Long): Seq[FileAction] = { + version: Long, + verifyMetadataAction: Boolean = true): Seq[FileAction] = { if (actions.exists(_.isInstanceOf[AddCDCFile])) { actions.filter(_.isInstanceOf[AddCDCFile]).asInstanceOf[Seq[FileAction]] } else { @@ -262,9 +270,12 @@ trait DeltaSourceCDCSupport { self: DeltaSource => case cdc: AddCDCFile => false case m: Metadata => - val cdcSchema = CDCReader.cdcReadSchema(m.schema) - if (!SchemaUtils.isReadCompatible(cdcSchema, schema)) { - throw DeltaErrors.schemaChangedException(schema, cdcSchema, false) + if (verifyMetadataAction) { + checkColumnMappingSchemaChangesDuringStreaming(m, version) + val cdcSchema = CDCReader.cdcReadSchema(m.schema) + if (!SchemaUtils.isReadCompatible(cdcSchema, schema)) { + throw DeltaErrors.schemaChangedException(schema, cdcSchema, false) + } } false case protocol: Protocol => diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala index 4d44ada576..8eda17c8e2 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala @@ -26,11 +26,12 @@ import scala.language.implicitConversions import org.apache.spark.sql.delta.actions.AddCDCFile import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest} import org.apache.spark.sql.delta.util.FileNames import org.apache.hadoop.fs.Path -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkThrowable} +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger} import org.apache.spark.sql.types.StructType @@ -820,69 +821,30 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest } } } +} - test("should block CDC reads when Column Mapping enabled - streaming") { - def assertError(f: => Any): Unit = { - val e = intercept[StreamingQueryException] { - f - }.getCause.getMessage - assert(e.contains("Change Data Feed (CDF) reads are not supported on tables with " + - "column mapping schema changes (e.g. 
rename or drop)")) - } +class DeltaCDCStreamSuite extends DeltaCDCStreamSuiteBase +abstract class DeltaCDCStreamColumnMappingSuiteBase extends DeltaCDCStreamSuite + with ColumnMappingStreamingWorkflowSuiteBase with DeltaColumnMappingSelectedTestMixin { + + override protected def isCdcTest: Boolean = true + + + override def runOnlyTests: Seq[String] = Seq( + "no startingVersion should result fetch the entire snapshot", + "user provided startingVersion", + "maxFilesPerTrigger - 2 successive AddCDCFile commits", + + // streaming blocking semantics test + "deltaLog snapshot should not be updated outside of the stream", + "column mapping + streaming - allowed workflows - column addition", + "column mapping + streaming - allowed workflows - upgrade to name mode", + "column mapping + streaming: blocking workflow - drop column", + "column mapping + streaming: blocking workflow - rename column" + ) - Seq(0, 1).foreach { startingVersion => - withClue(s"using CDC starting version $startingVersion") { - withTable("t1") { - withTempDir { dir => - val path = dir.getCanonicalPath - sql( - s""" - |CREATE TABLE t1 (id LONG) USING DELTA - |TBLPROPERTIES( - | '${DeltaConfigs.CHANGE_DATA_FEED.key}'='true', - | '${DeltaConfigs.MIN_READER_VERSION.key}'='2', - | '${DeltaConfigs.MIN_WRITER_VERSION.key}'='5' - |) - |LOCATION '$path' - |""".stripMargin) - - spark.range(10).write.format("delta").mode("append").save(path) - spark.range(10, 20).write.format("delta").mode("append").save(path) - - val df = spark.readStream - .format("delta") - .option(DeltaOptions.CDC_READ_OPTION, "true") - .option("startingVersion", startingVersion) - .load(path) - - // case 1: column-mapping is enabled mid-stream - testStream(df)( - ProcessAllAvailable(), - Execute { _ => - sql(s""" - |ALTER TABLE t1 - |SET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}'='name') - |""".stripMargin) - }, - AddToReservoir(dir, spark.range(10, 20).toDF()), - Execute { q => - assertError { - q.processAllAvailable() - } - } - ) - - // case 2: perform CDC stream read on table with column mapping already enabled - assertError { - val stream = df.writeStream.format("console").start() - stream.awaitTermination(2000) - stream.stop() - } - } - } - } - } - } } -class DeltaCDCStreamSuite extends DeltaCDCStreamSuiteBase + +class DeltaCDCStreamNameColumnMappingSuite extends DeltaCDCStreamColumnMappingSuiteBase + with DeltaColumnMappingEnableNameMode diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala index 9e15f1f700..272b1f9444 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala @@ -799,10 +799,10 @@ abstract class DeltaCDCColumnMappingSuiteBase extends DeltaCDCScalaSuite with DeltaColumnMappingTestUtils { private def assertBlocked(f: => Unit): Unit = { - val e = intercept[DeltaUnsupportedOperationException] { + val e = intercept[DeltaColumnMappingUnsupportedSchemaIncompatibleException] { f } - assert(e.getErrorClass == "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS" && + assert(e.getErrorClass == "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION" && e.getMessage.contains( DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key)) } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala index 5a18e71dde..1e00682e27 100644 
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala @@ -352,6 +352,21 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession { sql(s"CONVERT TO DELTA $tableOrPath") } + /** + * Force enable streaming read (with possible data loss) on column mapping enabled table with + * drop / rename schema changes. + */ + protected def withStreamingReadOnColumnMappingTableEnabled(f: => Unit): Unit = { + if (columnMappingEnabled) { + withSQLConf( + DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key -> "true") { + f + } + } else { + f + } + } + } trait DeltaColumnMappingTestUtils extends DeltaColumnMappingTestUtilsBase diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index c811636ecb..9c8a9bb425 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -2418,15 +2418,6 @@ trait DeltaErrorsSuiteBase assert(e.getMessage == "Can only drop nested columns from StructType. Found StructField(invalid1,StringType,true)") } - { - val e = intercept[DeltaUnsupportedOperationException] { - throw DeltaErrors.blockStreamingReadsOnColumnMappingEnabledTable - } - assert(e.getErrorClass == "DELTA_UNSUPPORTED_COLUMN_MAPPING_STREAMING_READS") - assert(e.getSqlState == "0A000") - assert(e.getMessage == - "Streaming reads from a Delta table with column mapping enabled are not supported.") - } { val columnsThatNeedRename = Set("c0", "c1") val schema = StructType(Seq(StructField("schema1", StringType))) @@ -2449,15 +2440,56 @@ trait DeltaErrorsSuiteBase assert(e.getMessage == s"Can't set location multiple times. Found ${locations}") } { - val e = intercept[DeltaUnsupportedOperationException] { - throw DeltaErrors.blockCdfAndColumnMappingReads(isStreaming = false) + val e = intercept[DeltaColumnMappingUnsupportedSchemaIncompatibleException] { + throw DeltaErrors.blockStreamingReadsOnColumnMappingEnabledTable( + StructType.fromDDL("id int"), + StructType.fromDDL("id2 int"), + isCdfRead = true, + detectedDuringStreaming = true + ) } - assert(e.getErrorClass == "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS") + assert(e.getErrorClass == "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION") assert(e.getSqlState == "0A000") - assert(e.getMessage.contains("Change Data Feed (CDF) reads are not supported on tables with" + - " column mapping schema changes (e.g. 
rename or drop)")) - assert(e.getMessage.contains( - DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key)) + assert(e.opName == "Streaming read of Change Data Feed (CDF)") + assert(e.readSchema == StructType.fromDDL("id int")) + assert(e.incompatibleSchema == StructType.fromDDL("id2 int")) + assert(e.escapeConfigName == + DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key) + assert(e.additionalProperties("detectedDuringStreaming").toBoolean) + } + { + val e = intercept[DeltaColumnMappingUnsupportedSchemaIncompatibleException] { + throw DeltaErrors.blockStreamingReadsOnColumnMappingEnabledTable( + StructType.fromDDL("id int"), + StructType.fromDDL("id2 int"), + isCdfRead = false, + detectedDuringStreaming = false + ) + } + assert(e.getErrorClass == "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION") + assert(e.getSqlState == "0A000") + assert(e.opName == "Streaming read") + assert(e.readSchema == StructType.fromDDL("id int")) + assert(e.incompatibleSchema == StructType.fromDDL("id2 int")) + assert(e.escapeConfigName == + DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key) + assert(!e.additionalProperties("detectedDuringStreaming").toBoolean) + } + { + val e = intercept[DeltaColumnMappingUnsupportedSchemaIncompatibleException] { + throw DeltaErrors.blockBatchCdfReadOnColumnMappingEnabledTable( + readSchema = StructType.fromDDL("id int"), + incompatibleSchema = StructType.fromDDL("id2 int")) + } + assert(e.getErrorClass == "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION") + assert(e.getSqlState == "0A000") + assert(e.opName == "Change Data Feed (CDF) read") + assert(e.readSchema == StructType.fromDDL("id int")) + assert(e.incompatibleSchema == StructType.fromDDL("id2 int")) + assert(e.escapeConfigName == + DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key) + assert(e.additionalProperties.isEmpty) + } { val e = intercept[DeltaUnsupportedOperationException] { diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala new file mode 100644 index 0000000000..1e5b368577 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala @@ -0,0 +1,509 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta + +import java.io.File +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.delta.commands.cdc.CDCReader +import org.apache.spark.sql.delta.sources.{DeltaSource, DeltaSQLConf} +import org.apache.spark.sql.delta.util.JsonUtils +import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation +import org.apache.spark.sql.streaming.{DataStreamReader, StreamTest} +import org.apache.spark.sql.types.StructType + +trait ColumnMappingStreamingWorkflowSuiteBase extends StreamTest + with DeltaColumnMappingTestUtils { + + import testImplicits._ + + // Whether we are requesting CDC streaming changes + protected def isCdcTest: Boolean + + // Drop CDC fields because they are not useful for testing the blocking behavior + private def dropCDCFields(df: DataFrame): DataFrame = + df.drop(CDCReader.CDC_COMMIT_TIMESTAMP) + .drop(CDCReader.CDC_TYPE_COLUMN_NAME) + .drop(CDCReader.CDC_COMMIT_VERSION) + + // DataStreamReader to use + // Set a small max file per trigger to ensure we could catch failures ASAP + private def dsr: DataStreamReader = if (isCdcTest) { + spark.readStream.format("delta") + .option(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION, "1") + .option(DeltaOptions.CDC_READ_OPTION, "true") + } else { + spark.readStream.format("delta") + .option(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION, "1") + } + + private val ProcessAllAvailableIgnoreError = Execute { q => + try { + q.processAllAvailable() + } catch { + case _: Throwable => + // swallow the errors so we could check answer and failure on the query later + } + } + + private def isColumnMappingSchemaIncompatibleFailure( + t: Throwable, + detectedDuringStreaming: Boolean): Boolean = t match { + case e: DeltaColumnMappingUnsupportedSchemaIncompatibleException => + if (isCdcTest) { + e.opName == "Streaming read of Change Data Feed (CDF)" + } else { + e.opName == "Streaming read" + } && e.additionalProperties.get("detectedDuringStreaming") + .exists(_.toBoolean == detectedDuringStreaming) + case _ => false + } + + private val ExpectStreamStartInCompatibleSchemaFailure = + ExpectFailure[DeltaColumnMappingUnsupportedSchemaIncompatibleException] { t => + assert(isColumnMappingSchemaIncompatibleFailure(t, detectedDuringStreaming = false)) + } + + private val ExpectInStreamSchemaChangeFailure = + ExpectFailure[DeltaColumnMappingUnsupportedSchemaIncompatibleException] { t => + assert(isColumnMappingSchemaIncompatibleFailure(t, detectedDuringStreaming = true)) + } + + private val ExpectGenericColumnMappingFailure = + ExpectFailure[DeltaColumnMappingUnsupportedSchemaIncompatibleException]() + + // Failure thrown by the current DeltaSource schema change incompatible check + private val existingRetryableInStreamSchemaChangeFailure = Execute { q => + // Similar to ExpectFailure but allows more fine-grained checking of exceptions + failAfter(streamingTimeout) { + try { + q.awaitTermination() + } catch { + case _: Throwable => + // swallow the exception + } + val cause = ExceptionUtils.getRootCause(q.exception.get) + assert(cause.getMessage.contains("Detected schema change")) + } + } + + private def checkStreamStartBlocked( + df: DataFrame, + ckpt: File, + expectedFailure: StreamAction): Unit = { + // Restart the stream from the same checkpoint will pick up the dropped schema and our + // column mapping check will kick in and error out. 
+ testStream(df)( + StartStream(checkpointLocation = ckpt.getCanonicalPath), + ProcessAllAvailableIgnoreError, + // No batches have been served + CheckLastBatch(Nil: _*), + expectedFailure + ) + } + + private def writeDeltaData( + data: Seq[Int], + deltaLog: DeltaLog, + userSpecifiedSchema: Option[StructType] = None): Unit = { + val schema = userSpecifiedSchema.getOrElse(deltaLog.update().schema) + data.foreach { i => + val data = Seq(Row(schema.map(_ => i.toString): _*)) + spark.createDataFrame(data.asJava, schema) + .write.format("delta").mode("append").save(deltaLog.dataPath.toString) + } + } + + test("deltaLog snapshot should not be updated outside of the stream") { + withTempDir { dir => + val tablePath = dir.getCanonicalPath + // write initial data + Seq(1).toDF("id").write.format("delta").mode("overwrite").save(tablePath) + // record initial snapshot version and warm DeltaLog cache + val initialDeltaLog = DeltaLog.forTable(spark, tablePath) + // start streaming + val df = spark.readStream.format("delta").load(tablePath) + testStream(df)( + StartStream(), + ProcessAllAvailable(), + AssertOnQuery { q => + // write more data + Seq(2).toDF("id").write.format("delta").mode("append").save(tablePath) + // update deltaLog externally + initialDeltaLog.update() + assert(initialDeltaLog.snapshot.version == 1) + // query start snapshot should not change + val source = q.logicalPlan.collectFirst { + case r: StreamingExecutionRelation => + r.source.asInstanceOf[DeltaSource] + }.get + // same delta log but stream start version not affected + source.deltaLog == initialDeltaLog && source.snapshotAtSourceInit.version == 0 + } + ) + } + } + + test("column mapping + streaming - allowed workflows - column addition") { + // column addition schema evolution should not be blocked upon restart + withTempDir { inputDir => + val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) + writeDeltaData(0 until 5, deltaLog, Some(StructType.fromDDL("id string, value string"))) + + val checkpointDir = new File(inputDir, "_checkpoint") + + def df: DataFrame = dropCDCFields(dsr.load(inputDir.getCanonicalPath)) + + testStream(df)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + ProcessAllAvailable(), + CheckAnswer((0 until 5).map(i => (i.toString, i.toString)): _*), + Execute { _ => + sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` ADD COLUMN (value2 string)") + }, + Execute { _ => + writeDeltaData(5 until 10, deltaLog) + }, + existingRetryableInStreamSchemaChangeFailure + ) + + testStream(df)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + ProcessAllAvailable(), + // Sink is reinitialized, only 5-10 are ingested + CheckAnswer( + (5 until 10).map(i => (i.toString, i.toString, i.toString)): _*) + ) + } + + } + + test("column mapping + streaming - allowed workflows - upgrade to name mode") { + // upgrade should not blocked both during the stream AND during stream restart + withTempDir { inputDir => + val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) + withColumnMappingConf("none") { + writeDeltaData(0 until 5, deltaLog, Some(StructType.fromDDL("id string, name string"))) + } + + def df: DataFrame = dropCDCFields(dsr.load(inputDir.getCanonicalPath)) + + val checkpointDir = new File(inputDir, "_checkpoint") + + testStream(df)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + ProcessAllAvailable(), + CheckAnswer((0 until 5).map(i => (i.toString, i.toString)): _*), + Execute { _ => + sql( + s""" + |ALTER TABLE 
delta.`${inputDir.getCanonicalPath}` + |SET TBLPROPERTIES ( + | ${DeltaConfigs.COLUMN_MAPPING_MODE.key} = "name", + | ${DeltaConfigs.MIN_READER_VERSION.key} = "2", + | ${DeltaConfigs.MIN_WRITER_VERSION.key} = "5")""".stripMargin) + }, + Execute { _ => + writeDeltaData(5 until 10, deltaLog) + }, + ProcessAllAvailable(), + CheckAnswer((0 until 10).map(i => (i.toString, i.toString)): _*), + // add column schema evolution should fail the stream + Execute { _ => + sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` ADD COLUMN (value2 string)") + }, + Execute { _ => + writeDeltaData(10 until 15, deltaLog) + }, + existingRetryableInStreamSchemaChangeFailure + ) + + // but should not block after restarting, now in column mapping mode + testStream(df)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + ProcessAllAvailable(), + // Sink is reinitialized, only 10-15 are ingested + CheckAnswer( + (10 until 15).map(i => (i.toString, i.toString, i.toString)): _*) + ) + + // use a different checkpoint to simulate a clean stream restart + val checkpointDir2 = new File(inputDir, "_checkpoint2") + + testStream(df)( + StartStream(checkpointLocation = checkpointDir2.getCanonicalPath), + ProcessAllAvailable(), + // Since the latest schema contain the additional column, it is null for previous batches. + // This is fine as it is consistent with the current semantics. + CheckAnswer((0 until 10).map(i => (i.toString, i.toString, null)) ++ + (10 until 15).map(i => (i.toString, i.toString, i.toString)): _*), + StopStream + ) + } + } + + /** + * Setup the test table for testing blocked workflow, this will create a id or name mode table + * based on which tests it is run. + */ + protected def setupTestTable(deltaLog: DeltaLog): Unit = { + require(columnMappingModeString != NoMapping.name) + val tablePath = deltaLog.dataPath.toString + + // For name mapping, we use upgrade to stir things up a little + if (columnMappingModeString == NameMapping.name) { + // initialize with no column mapping + withColumnMappingConf("none") { + writeDeltaData(0 until 5, deltaLog, Some(StructType.fromDDL("id string, value string"))) + } + + // upgrade to name mode + sql( + s""" + |ALTER TABLE delta.`${tablePath}` + |SET TBLPROPERTIES ( + | ${DeltaConfigs.COLUMN_MAPPING_MODE.key} = "name", + | ${DeltaConfigs.MIN_READER_VERSION.key} = "2", + | ${DeltaConfigs.MIN_WRITER_VERSION.key} = "5")""".stripMargin) + + // write more data post upgrade + writeDeltaData(5 until 10, deltaLog) + } + } + + test("column mapping + streaming: blocking workflow - drop column") { + val schemaAlterQuery = "DROP COLUMN value" + val schemaRestoreQuery = "ADD COLUMN (value string)" + + withTempDir { inputDir => + val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) + setupTestTable(deltaLog) + + // change schema + sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` $schemaAlterQuery") + + // write more data post change schema + writeDeltaData(10 until 15, deltaLog) + + // Test the two code paths below + // Case 1 - Restart did not specify a start version, this will successfully serve the initial + // entire existing data based on the initial snapshot's schema, which is basically + // the stream schema, all schema changes in between are ignored. + // But once the initial snapshot is served, all subsequent batches will fail if + // encountering a schema change during streaming, and all restart effort should fail. 
+ val checkpointDir = new File(inputDir, "_checkpoint") + val df = dropCDCFields(dsr.load(inputDir.getCanonicalPath)) + + testStream(df)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + ProcessAllAvailable(), + // Initial data (pre + post upgrade + post change schema) all served + CheckAnswer((0 until 15).map(i => i.toString): _*), + Execute { _ => + // write more data in new schema during streaming + writeDeltaData(15 until 20, deltaLog) + }, + ProcessAllAvailable(), + // can still work because the schema is still compatible + CheckAnswer((0 until 20).map(i => i.toString): _*), + // But a new schema change would cause stream to fail + // Note here we are restoring back the original schema, see next case for how we test + // some extra special cases when schemas are reverted. + Execute { _ => + sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` $schemaRestoreQuery") + }, + // write more data in updated schema again + Execute { _ => + writeDeltaData(20 until 25, deltaLog) + }, + // The last batch should not be processed and stream should fail + ProcessAllAvailableIgnoreError, + // sink data did not change + CheckAnswer((0 until 20).map(i => i.toString): _*), + // The schemaRestoreQuery for DROP column is ADD column so it fails a more benign error + existingRetryableInStreamSchemaChangeFailure + ) + + val df2 = dropCDCFields(dsr.load(inputDir.getCanonicalPath)) + // Since the initial snapshot ignores all schema changes, the most recent schema change + // is just ADD COLUMN, which can be retried. + testStream(df2)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + // but an additional drop should fail the stream as we are capturing data changes now + Execute { _ => + sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` $schemaAlterQuery") + }, + ProcessAllAvailableIgnoreError, + ExpectInStreamSchemaChangeFailure + ) + // The latest DROP columns blocks the stream. + if (isCdcTest) { + checkStreamStartBlocked(df2, checkpointDir, ExpectGenericColumnMappingFailure) + } else { + checkStreamStartBlocked(df2, checkpointDir, ExpectStreamStartInCompatibleSchemaFailure) + } + + // Case 2 - Specifically we use startingVersion=0 to simulate serving the entire table's data + // in a streaming fashion, ignoring the initialSnapshot. + // Here we test the special case when the latest schema is "restored". 
+      val checkpointDir2 = new File(inputDir, "_checkpoint2") + val dfStartAtZero = dropCDCFields(dsr + .option(DeltaOptions.STARTING_VERSION_OPTION, "0") + .load(inputDir.getCanonicalPath)) + + if (isCdcTest) { + checkStreamStartBlocked(dfStartAtZero, checkpointDir2, ExpectGenericColumnMappingFailure) + } else { + // In the case when we drop a column and then add it back, the restart should still fail + // directly, because all the historical batches with the same old logical name now have a + // different physical name, so we would have data loss + + // let's add back the column we just dropped before + sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` $schemaRestoreQuery") + assert(DeltaLog.forTable(spark, inputDir.getCanonicalPath).snapshot.schema.size == 2) + + // restart should block right away + checkStreamStartBlocked( + dfStartAtZero, checkpointDir, ExpectStreamStartInCompatibleSchemaFailure) + } + } + } + + test("column mapping + streaming: blocking workflow - rename column") { + val schemaAlterQuery = "RENAME COLUMN value TO value2" + val schemaRestoreQuery = "RENAME COLUMN value2 TO value" + + withTempDir { inputDir => + val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) + setupTestTable(deltaLog) + + // change schema + sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` $schemaAlterQuery") + + // write more data post change schema + writeDeltaData(10 until 15, deltaLog) + + // Test the two code paths below + // Case 1 - Restart did not specify a start version, this will successfully serve the initial + // entire existing data based on the initial snapshot's schema, which is basically + // the stream schema, all schema changes in between are ignored. + // But once the initial snapshot is served, all subsequent batches will fail if + // encountering a schema change during streaming, and all restart effort should fail. + val checkpointDir = new File(inputDir, "_checkpoint") + val df = dropCDCFields(dsr.load(inputDir.getCanonicalPath)) + + testStream(df)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + ProcessAllAvailable(), + // Initial data (pre + post upgrade + post change schema) all served + CheckAnswer((0 until 15).map(i => (i.toString, i.toString)): _*), + Execute { _ => + // write more data in new schema during streaming + writeDeltaData(15 until 20, deltaLog) + }, + ProcessAllAvailable(), + // can still work because the schema is still compatible + CheckAnswer((0 until 20).map(i => (i.toString, i.toString)): _*), + // But a new schema change would cause stream to fail + // Note here we are restoring back the original schema, see next case for how we test + // some extra special cases when schemas are reverted. + Execute { _ => + sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` $schemaRestoreQuery") + }, + // write more data in updated schema again + Execute { _ => + writeDeltaData(20 until 25, deltaLog) + }, + // the last batch should not be processed and stream should fail + ProcessAllAvailableIgnoreError, + // sink data did not change + CheckAnswer((0 until 20).map(i => (i.toString, i.toString)): _*), + // detected schema change + ExpectInStreamSchemaChangeFailure + ) + + val df2 = dropCDCFields(dsr.load(inputDir.getCanonicalPath)) + // A renamed column could not proceed after the restore because its schema change is not read + // compatible at all. 
+ checkStreamStartBlocked(df2, checkpointDir, ExpectStreamStartInCompatibleSchemaFailure) + + // Case 2 - Specifically we use startingVersion=0 to simulate serving the entire table's data + // in a streaming fashion, ignoring the initialSnapshot. + // Here we test the special case when the latest schema is "restored". + if (isCdcTest) { + val checkpointDir2 = new File(inputDir, "_checkpoint2") + val dfStartAtZero = dropCDCFields(dsr + .option(DeltaOptions.STARTING_VERSION_OPTION, "0") + .load(inputDir.getCanonicalPath)) + checkStreamStartBlocked(dfStartAtZero, checkpointDir2, ExpectGenericColumnMappingFailure) + } else { + // In the trickier case when we rename a column and rename back, we could not + // immediately detect the schema incompatibility at stream start, so we will move on. + // This is fine because the batches served will be compatible until the in-stream check + // finds another schema change action and fail. + val checkpointDir2 = new File(inputDir, s"_checkpoint_${UUID.randomUUID.toString}") + val dfStartAtZero = dropCDCFields(dsr + .option(DeltaOptions.STARTING_VERSION_OPTION, "0") + .load(inputDir.getCanonicalPath)) + testStream(dfStartAtZero)( + // The stream could not move past version 10, because batches after which + // will be incompatible with the latest schema. + StartStream(checkpointLocation = checkpointDir2.getCanonicalPath), + ProcessAllAvailableIgnoreError, + AssertOnQuery { q => + val latestLoadedVersion = JsonUtils.fromJson[Map[String, Any]]( + q.committedOffsets.values.head.json() + ).apply("reservoirVersion").asInstanceOf[Number].longValue() + latestLoadedVersion <= 10 + }, + ExpectInStreamSchemaChangeFailure + ) + // restart won't move forward either + val df2 = dropCDCFields(dsr.load(inputDir.getCanonicalPath)) + checkStreamStartBlocked(df2, checkpointDir2, ExpectInStreamSchemaChangeFailure) + } + } + } +} + + +class DeltaSourceNameColumnMappingSuite extends DeltaSourceSuite + with ColumnMappingStreamingWorkflowSuiteBase + with DeltaColumnMappingEnableNameMode { + + override protected def isCdcTest: Boolean = false + + override protected def runOnlyTests = Seq( + "basic", + "maxBytesPerTrigger: metadata checkpoint", + "maxFilesPerTrigger: metadata checkpoint", + "allow to change schema before starting a streaming query", + + // streaming blocking semantics test + "deltaLog snapshot should not be updated outside of the stream", + "column mapping + streaming - allowed workflows - column addition", + "column mapping + streaming - allowed workflows - upgrade to name mode", + "column mapping + streaming: blocking workflow - drop column", + "column mapping + streaming: blocking workflow - rename column" + ) +} diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala index 6916d224dd..44b1794938 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala @@ -41,7 +41,9 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{ManualClock, Utils} -class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest { +class DeltaSourceSuite extends DeltaSourceSuiteBase + with DeltaColumnMappingTestUtils + with DeltaSQLCommandTest { import testImplicits._ @@ -1182,11 +1184,15 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest { }.getMessage.contains("Cannot time 
 
     // Can start from version 1 even if it's not recreatable
-    withTempView("startingVersion_test") {
-      testStartingVersion(1L)
-      checkAnswer(
-        spark.table("startingVersion_test"),
-        (10 until 20).map(_.toLong).toDF())
+    // TODO: currently we would error out if we couldn't construct the snapshot to check column
+    // mapping enabled tables. Unblock this once we roll out the proper semantics.
+    withStreamingReadOnColumnMappingTableEnabled {
+      withTempView("startingVersion_test") {
+        testStartingVersion(1L)
+        checkAnswer(
+          spark.table("startingVersion_test"),
+          (10 until 20).map(_.toLong).toDF())
+      }
     }
   }
 }
@@ -1242,19 +1248,23 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest {
 
     // Although version 1 has been deleted, restarting the query should still work as we have
    // processed files in version 1. In other words, query restart should ignore "startingVersion"
-    testStartingVersion(1L)
-    checkAnswer(
-      spark.read.format("delta").load(outputDir.getCanonicalPath),
-      ((10 until 30) ++ (40 until 50)).map(_.toLong).toDF()) // the gap caused by "alter table"
+    // TODO: currently we would error out if we couldn't construct the snapshot to check column
+    // mapping enabled tables. Unblock this once we roll out the proper semantics.
+    withStreamingReadOnColumnMappingTableEnabled {
+      testStartingVersion(1L)
+      checkAnswer(
+        spark.read.format("delta").load(outputDir.getCanonicalPath),
+        ((10 until 30) ++ (40 until 50)).map(_.toLong).toDF()) // the gap caused by "alter table"
 
-    // But if we start a new query, it should fail.
-    val newCheckpointDir = Utils.createTempDir()
-    try {
-      assert(intercept[StreamingQueryException] {
-        testStartingVersion(1L, newCheckpointDir.getCanonicalPath)
-      }.getMessage.contains("[2, 4]"))
-    } finally {
-      Utils.deleteRecursively(newCheckpointDir)
+      // But if we start a new query, it should fail.
+      val newCheckpointDir = Utils.createTempDir()
+      try {
+        assert(intercept[StreamingQueryException] {
+          testStartingVersion(1L, newCheckpointDir.getCanonicalPath)
+        }.getMessage.contains("[2, 4]"))
+      } finally {
+        Utils.deleteRecursively(newCheckpointDir)
+      }
     }
   }
 }
@@ -1329,11 +1339,15 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest {
       new File(FileNames.deltaFile(deltaLog.logPath, 0).toUri).delete()
 
     // Can start from version 1 even if it's not recreatable
-    withTempView("startingTimestamp_test") {
-      testStartingTimestamp("2020-07-14")
-      checkAnswer(
-        spark.table("startingTimestamp_test"),
-        (10 until 20).map(_.toLong).toDF())
+    // TODO: currently we would error out if we couldn't construct the snapshot to check column
+    // mapping enabled tables. Unblock this once we roll out the proper semantics.
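[Editor's note] The withStreamingReadOnColumnMappingTableEnabled guard referenced by these TODOs is presumably a small test helper that force-enables streaming reads on column mapping tables for the duration of a block. A rough sketch of that shape, assuming a suite that mixes in Spark's SQLTestUtils for withSQLConf; the conf key string below is a placeholder, not the actual DeltaSQLConf constant:

    // Hypothetical helper: run `f` with the unsafe-read escape hatch turned on for the
    // duration of the block, then restore the previous setting. The key is illustrative only.
    protected def withStreamingReadOnColumnMappingTableEnabled(f: => Unit): Unit = {
      withSQLConf(
        "spark.databricks.delta.streaming.unsafeReadOnIncompatibleSchemaChanges" -> "true") {
        f
      }
    }

Once the proper semantics mentioned in the TODO are rolled out, guards like this can presumably be removed from these tests.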
+ withStreamingReadOnColumnMappingTableEnabled { + withTempView("startingTimestamp_test") { + testStartingTimestamp("2020-07-14") + checkAnswer( + spark.table("startingTimestamp_test"), + (10 until 20).map(_.toLong).toDF()) + } } } } @@ -1413,80 +1427,6 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest { } } - test(s"block streaming reads from a column mapping enabled table") { - withTempDir { inputDir => - val path = inputDir.getCanonicalPath - withTable("t1") { - sql( - s""" - |CREATE TABLE t1 (value STRING) USING DELTA - |TBLPROPERTIES( - |${DeltaConfigs.COLUMN_MAPPING_MODE.key} = 'name', - |${DeltaConfigs.MIN_READER_VERSION.key} = '2', - |${DeltaConfigs.MIN_WRITER_VERSION.key} = '5' - |) LOCATION '$path' - |""".stripMargin) - - Seq("keep1", "keep2", "keep3", "drop1").toDF("value") - .write.format("delta").mode("append").saveAsTable("t1") - - Seq(true, false).foreach { isStartVersion0 => - withClue(s"isStartVersion0 = $isStartVersion0") { - var dfr = spark.readStream.format("delta") - if (isStartVersion0) { - // By default the stream starts at the latest version in the table - dfr = dfr.option("startingVersion", "0") - } - val df = dfr.load(path).filter($"value" contains "keep") - - val ex = intercept[Exception] { - testStream(df)(ProcessAllAvailable()) - } - assert(ex.getMessage contains - "Streaming reads from a Delta table with column mapping enabled are not supported.") - } - } - } - } - } - - test("block streaming reads after a table is upgraded with column mapping") { - withTempDir { inputDir => - val path = inputDir.getCanonicalPath - withTable("t1") { - sql(s"CREATE TABLE t1 (value STRING) USING DELTA LOCATION '$path'") - - Seq("keep1", "keep2", "keep3", "drop1").toDF("value") - .write.format("delta").mode("append").saveAsTable("t1") - - val df = spark.readStream - .format("delta") - .load(path) - .filter($"value" contains "keep") - - val ex = intercept[Exception] { - testStream(df)( - ProcessAllAvailable(), - Execute { _ => - sql( - s""" - |ALTER TABLE t1 - |SET TBLPROPERTIES ( - | '${DeltaConfigs.MIN_READER_VERSION.key}' = '2', - | '${DeltaConfigs.MIN_WRITER_VERSION.key}' = '5', - | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}'='name') - |""".stripMargin) - }, - AddToReservoir(inputDir, Seq("keep7", "drop2").toDF()), - ProcessAllAvailable() - ) - } - assert(ex.getMessage contains - "Streaming reads from a Delta table with column mapping enabled are not supported.") - } - } - } - test("startingVersion latest") { withTempDir { dir => withTempView("startingVersionTest") { @@ -1940,132 +1880,6 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest { } } -abstract class DeltaSourceColumnMappingSuiteBase extends DeltaSourceSuite { - import testImplicits._ - - testQuietly("drop column from source disallowed by MicroBatchExecution") { - withSQLConf(DeltaSQLConf.DELTA_ALTER_TABLE_DROP_COLUMN_ENABLED.key -> "true") { - withTempDir { inputDir => - val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) - (0 until 5).foreach { i => - val v = Seq((i.toString, i.toString)).toDF("id", "value") - v.write.mode("append").format("delta").save(deltaLog.dataPath.toString) - } - - val checkpointDir = new File(inputDir, "_checkpoint") - - // reinitialize stream after restart - def df: DataFrame = spark.readStream - .format("delta") - .load(inputDir.getCanonicalPath) - - testStream(df)( - StartStream(checkpointLocation = checkpointDir.getCanonicalPath), - ProcessAllAvailable(), - CheckAnswerRows((0 until 5).map(i => Row(i.toString, 
i.toString)), false, false), - Execute { _ => - sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` DROP COLUMN value") - }, - Execute { _ => - // write more data - (5 until 10).foreach { i => - val v = Seq(i.toString).toDF("id") - v.write.mode("append").format("delta").save(deltaLog.dataPath.toString) - } - }, - // should have another batch with diff schema and should fail - ExpectFailure[SparkException] { t => - assert(t.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR") - assert(t.getCause.getMessage.contains("Invalid batch")) - } - ) - - // Restart the stream from the same checkpoint should pick up the new schema - // Since `testStream` creates a new sink every time, the rows for the new data will - // be refreshed as well. - testStream(df)( - StartStream(checkpointLocation = checkpointDir.getCanonicalPath), - ProcessAllAvailable(), - // 5-10 is recovered due to the previous failure, and is the only data in the sink - // note they only have one item, which is the new schema - CheckAnswerRows((5 until 10).map(i => Row(i.toString)), false, false), - Execute { _ => - // write more data - (10 until 15) - .map(i => i.toString) - .toDF("id") - .write - .format("delta") - .mode("append") - .save(deltaLog.dataPath.toString) - }, - ProcessAllAvailable(), - CheckAnswerRows((5 until 15).map(i => Row(i.toString)), false, false) - ) - } - } - } - - testQuietly("rename a column disallowed by DeltaSource's schema check") { - withTempDir { inputDir => - val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) - (0 until 5).foreach { i => - val v = Seq((i.toString, i.toString)).toDF("id", "value") - v.write.mode("append").format("delta").save(deltaLog.dataPath.toString) - } - - val checkpointDir = new File(inputDir, "_checkpoint") - - // reinitialize stream after restart - def df: DataFrame = spark.readStream - .format("delta") - .load(inputDir.getCanonicalPath) - - testStream(df)( - StartStream(checkpointLocation = checkpointDir.getCanonicalPath), - ProcessAllAvailable(), - CheckAnswer((0 until 5).map(i => (i.toString, i.toString)): _*), - Execute { _ => - sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` " + - s"RENAME COLUMN value TO new_value") - }, - Execute { _ => - (5 until 10).foreach { i => - val v = Seq((i.toString, i.toString)).toDF("id", "new_value") - v.write.mode("append").format("delta").save(deltaLog.dataPath.toString) - } - }, - ExpectFailure[IllegalStateException](t => - assert(t.getMessage.contains("Detected schema change"))) - ) - - // Restart the stream from the same checkpoint should pick up the new schema - // Since `testStream` creates a new sink every time, the rows for the new data will - // be refreshed as well. - testStream(df)( - StartStream(checkpointLocation = checkpointDir.getCanonicalPath), - ProcessAllAvailable(), - // 5-10 is recovered due to the previous failure, and is the only data in the sink - CheckAnswerRows((5 until 10).map(i => Row(i.toString, i.toString)), false, false), - Execute { _ => - // write more data - (10 until 15) - .map(i => (i.toString, i.toString)) - .toDF("id", "new_value") - .write - .format("delta") - .mode("append") - .save(deltaLog.dataPath.toString) - }, - ProcessAllAvailable(), - CheckAnswerRows((5 until 15).map(i => Row(i.toString, i.toString)), false, false) - ) - } - } - -} - - /** * A FileSystem implementation that returns monotonically increasing timestamps for file creation. * Note that we may return a different timestamp for the same file. This is okay for the tests