diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
index 50c80e59b3..5ef3becad7 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSource, DeltaSQ
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
@@ -412,10 +413,12 @@ trait CDCReaderImpl extends DeltaLogging {
       snapshot, isStreaming, spark)
     dfs.append(deletedAndAddedRows: _*)
 
-    // build an empty DS. This DS retains the table schema
-    val emptyDf = spark.createDataFrame(
-      spark.sparkContext.emptyRDD[Row],
-      cdcReadSchema(snapshot.metadata.schema))
+    // build an empty DS. This DS retains the table schema and the isStreaming property
+    val emptyDf = spark.sqlContext.internalCreateDataFrame(
+      spark.sparkContext.emptyRDD[InternalRow],
+      cdcReadSchema(snapshot.metadata.schema),
+      isStreaming)
+
     CDCVersionDiffInfo(
       dfs.reduceOption((df1, df2) => df1.union(df2)).getOrElse(emptyDf),
       totalFiles,
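Note on the hunk above: spark.createDataFrame always produces a batch DataFrame, so using it for the empty fallback would drop the streaming property of a CDC scan; internalCreateDataFrame lets the fallback carry the isStreaming flag. Below is a minimal, self-contained sketch of that single point (the object name, local master and schema are illustrative and not part of the patch):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object EmptyDfIsBatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("empty-df-sketch").getOrCreate()
    val schema = StructType(Seq(StructField("id", LongType)))
    // The public API builds a batch DataFrame regardless of the surrounding plan,
    // which is why CDCReader now threads isStreaming through the internal constructor.
    val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    println(emptyDf.isStreaming) // prints false
    spark.stop()
  }
}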
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index 893ccd410b..a3c167e253 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -83,6 +83,10 @@ private[delta] case class IndexedFile(
     }
   }
 
+  def hasFileAction: Boolean = {
+    getFileAction != null
+  }
+
   def getFileSize: Long = {
     if (add != null) {
       add.size
@@ -889,8 +893,24 @@ case class DeltaSource(
         }
       }
 
-      if (fileAction.isEmpty) return true
       val shouldAdmit = filesToTake > 0 && bytesToTake > 0
+
+      /**
+       * If a fileAction is not present, admit the entry only if we still have capacity
+       * to accept new files or bytes.
+       * For example, if we get an AddFile at version 0 with maxFilesPerTrigger set to 1,
+       * we admit that file, and a second AddFile is not admitted. However, a CommitInfo
+       * entry with no file action at a higher version used to be admitted unconditionally,
+       * which allowed the endOffset to advance to version 2. When we later create the
+       * data frame for the batch, we call filterFiles, which only checks for a valid
+       * offset range because we pass limits as None for that call. Hence, the rate
+       * limiting has to happen here: an entry with an empty file action is admitted only
+       * if our capacity is not exhausted.
+       */
+      if (fileAction.isEmpty) {
+        return shouldAdmit
+      }
+
       filesToTake -= 1
       bytesToTake -= getSize(fileAction.get)
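A reading aid for the admission rule described in the comment above, sketched with invented names (Limits, sizeIfFileAction) rather than the real DeltaSource.AdmissionLimits API: an entry with no file action consumes no capacity but is admitted only while the file and byte budgets are unspent, so a trailing CommitInfo-only entry can no longer push the end offset past the rate limit.

object AdmissionRuleSketch {
  // Simplified stand-in for the admission limits; field names are illustrative.
  final class Limits(var filesToTake: Long, var bytesToTake: Long) {
    // sizeIfFileAction is None for entries that carry no AddFile/RemoveFile/AddCDCFile.
    def admit(sizeIfFileAction: Option[Long]): Boolean = {
      val shouldAdmit = filesToTake > 0 && bytesToTake > 0
      sizeIfFileAction.foreach { size =>
        filesToTake -= 1
        bytesToTake -= size
      }
      shouldAdmit
    }
  }

  def main(args: Array[String]): Unit = {
    val limits = new Limits(filesToTake = 1, bytesToTake = 1000)
    println(limits.admit(Some(100))) // true: first file fits the budget
    println(limits.admit(Some(100))) // false: maxFilesPerTrigger budget already spent
    println(limits.admit(None))      // false: a no-file-action entry is no longer waved through
  }
}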
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
index 1353fa6785..fb7066cf8e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
@@ -63,6 +63,8 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
   }
 
   private def noMatchesRegex(indexedFile: IndexedFile): Boolean = {
+    if (hasNoFileActionAndStartIndex(indexedFile)) return true
+
     excludeRegex.forall(_.findFirstIn(indexedFile.getFileAction.path).isEmpty)
   }
 
@@ -70,6 +72,10 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
     indexedFile.getFileAction != null
   }
 
+  private def hasNoFileActionAndStartIndex(indexedFile: IndexedFile): Boolean = {
+    !indexedFile.hasFileAction && indexedFile.index == -1
+  }
+
   private def hasAddsOrRemoves(indexedFile: IndexedFile): Boolean = {
     indexedFile.add != null || indexedFile.remove != null
   }
@@ -79,7 +85,8 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
       fromVersion: Long,
       fromIndex: Long,
       endOffset: Option[DeltaSourceOffset]): Boolean = {
-    !indexedFile.shouldSkip && hasFileAction(indexedFile) &&
+    !indexedFile.shouldSkip && (hasFileAction(indexedFile) ||
+      hasNoFileActionAndStartIndex(indexedFile)) &&
       moreThanFrom(indexedFile, fromVersion, fromIndex) &&
       lessThanEnd(indexedFile, endOffset) && noMatchesRegex(indexedFile) &&
       lessThanEnd(indexedFile, Option(lastOffsetForTriggerAvailableNow))
@@ -125,11 +132,15 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
         }
       } else {
         // CDC is recorded as AddFile or RemoveFile
-        fileActions.filter(hasAddsOrRemoves(_))
+        // We also allow entries with no file action and index == -1; they are used
+        // primarily to update the latest offset when no other file-action-based
+        // entries are present.
+        fileActions.filter(indexedFile => hasAddsOrRemoves(indexedFile) ||
+            hasNoFileActionAndStartIndex(indexedFile))
          .filter(
            isValidIndexedFile(_, fromVersion, fromIndex, endOffset)
          ).takeWhile { indexedFile =>
-          admissionControl.admit(Some(indexedFile.getFileAction))
+          admissionControl.admit(Option(indexedFile.getFileAction))
        }.toIterator
       }
     }
@@ -160,7 +171,7 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
 
     val groupedFileActions: Iterator[(Long, Seq[FileAction])] =
       changes.map { case (v, indexFiles) =>
-        (v, indexFiles.map { _.getFileAction }.toSeq)
+        (v, indexFiles.filter(_.hasFileAction).map { _.getFileAction }.toSeq)
       }
 
     val cdcInfo = CDCReader.changesToDF(
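The shape of the DeltaSourceCDCSupport change, sketched with invented types (Entry and isSentinel are not Delta classes): entries with no file action and index == -1 survive the validity filter so they can advance the streaming offset, but they are stripped out again before the per-version file actions are handed to CDCReader.changesToDF.

object SentinelFilterSketch {
  // Invented, simplified stand-in for IndexedFile: `path` is None when the commit
  // carried no AddFile/RemoveFile/AddCDCFile.
  case class Entry(version: Long, index: Long, path: Option[String])

  // Mirrors the intent of hasNoFileActionAndStartIndex in the patch above.
  def isSentinel(e: Entry): Boolean = e.path.isEmpty && e.index == -1

  def main(args: Array[String]): Unit = {
    val entries = Seq(
      Entry(version = 1, index = 0, path = Some("part-0001.parquet")),
      Entry(version = 2, index = -1, path = None) // commit with only CommitInfo
    )
    // Keep real file actions plus sentinels when selecting the admissible range...
    val admissible = entries.filter(e => e.path.nonEmpty || isSentinel(e))
    // ...but strip sentinels before grouping file actions per version.
    val grouped = admissible.filter(_.path.nonEmpty).groupBy(_.version)
    println(admissible.map(_.version)) // List(1, 2): version 2 can still advance the offset
    println(grouped.keySet)            // Set(1): no rows are produced for version 2
  }
}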
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
index a16b6cd7b4..194751d70a 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
@@ -25,10 +25,10 @@ import scala.language.implicitConversions
 
 import org.apache.spark.sql.delta.actions.AddCDCFile
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
-import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.sources.{DeltaSourceOffset, DeltaSQLConf}
 import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
 import io.delta.tables._
 import org.apache.hadoop.fs.Path
 
@@ -417,6 +417,77 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
     }
   }
 
+  Seq(true, false).foreach { readChangeFeed =>
+    test(s"streams updating latest offset with readChangeFeed=$readChangeFeed") {
+      withTempDirs { (inputDir, checkpointDir, outputDir) =>
+        withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") {
+
+          // save some rows to the input table
+          spark.range(10).withColumn("value", lit("a"))
+            .write.format("delta").mode("overwrite")
+            .option("enableChangeDataFeed", "true").save(inputDir.getAbsolutePath)
+
+          // process the input table in a CDC manner
+          val df = spark.readStream
+            .option(DeltaOptions.CDC_READ_OPTION, readChangeFeed)
+            .format("delta")
+            .load(inputDir.getAbsolutePath)
+
+          val query = df
+            .select("id")
+            .writeStream
+            .format("delta")
+            .outputMode("append")
+            .option("checkpointLocation", checkpointDir.toString)
+            .start(outputDir.getAbsolutePath)
+
+          query.processAllAvailable()
+          query.stop()
+          query.awaitTermination()
+
+          // Create a temp view and write to the input table as a no-op merge
+          spark.range(20, 30).withColumn("value", lit("b"))
+            .createOrReplaceTempView("source_table")
+
+          for (i <- 0 to 10) {
+            sql(s"MERGE INTO delta.`${inputDir.getAbsolutePath}` AS tgt " +
+              s"USING source_table src ON tgt.id = src.id " +
+              s"WHEN MATCHED THEN UPDATE SET * " +
+              s"WHEN NOT MATCHED AND src.id < 10 THEN INSERT *")
+          }
+
+          // Read again from the input table; no new data should be generated
+          val df1 = spark.readStream
+            .option("readChangeFeed", readChangeFeed)
+            .format("delta")
+            .load(inputDir.getAbsolutePath)
+
+          val query1 = df1
+            .select("id")
+            .writeStream
+            .format("delta")
+            .outputMode("append")
+            .option("checkpointLocation", checkpointDir.toString)
+            .start(outputDir.getAbsolutePath)
+
+          query1.processAllAvailable()
+          query1.stop()
+          query1.awaitTermination()
+
+          // Check that the last batch was committed and that the reservoirVersion
+          // for the table was updated to the latest version in both the CDF and
+          // non-CDF cases.
+          assert(query1.lastProgress.batchId === 1)
+          val endOffset = JsonUtils.mapper.readValue[DeltaSourceOffset](
+            query1.lastProgress.sources.head.endOffset
+          )
+          assert(endOffset.reservoirVersion === 11)
+          assert(endOffset.index === -1)
+        }
+      }
+    }
+  }
+
   test("cdc streams should be able to get offset when there only RemoveFiles") {
     withTempDir { inputDir =>
       // version 0
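A hedged way to observe end to end what the new test asserts (the paths and sink name below are made up, and this snippet assumes an existing Delta table with change data feed enabled plus delta-core on the classpath): start a CDF stream and print the source's end offset from the query progress; after this change, commits that contain no file actions still move reservoirVersion forward and reset index to -1.

import org.apache.spark.sql.SparkSession

object InspectCdcEndOffset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("inspect-end-offset")
      .getOrCreate()
    val query = spark.readStream
      .option("readChangeFeed", "true")
      .format("delta")
      .load("/tmp/cdf_table") // assumed existing CDF-enabled Delta table
      .writeStream
      .format("memory")
      .queryName("cdf_sink")
      .option("checkpointLocation", "/tmp/cdf_checkpoint")
      .start()
    query.processAllAvailable()
    // endOffset is the JSON form of the Delta source offset; after the fix it should
    // report the table's latest version with index -1 even when the most recent
    // commits added no files.
    println(query.lastProgress.sources.head.endOffset)
    query.stop()
    spark.stop()
  }
}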