Update offset for delta source within latestOffset for no data changes in CDF case

GitOrigin-RevId: fea46124c400d6a94beee5e6e29864cd1f520509
anishshri-db authored and scottsand-db committed Oct 14, 2022
1 parent 90aef09 commit 0bbec37
Showing 4 changed files with 116 additions and 11 deletions.
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSource, DeltaSQ

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
@@ -412,10 +413,12 @@ trait CDCReaderImpl extends DeltaLogging {
snapshot, isStreaming, spark)
dfs.append(deletedAndAddedRows: _*)

-    // build an empty DS. This DS retains the table schema
-    val emptyDf = spark.createDataFrame(
-      spark.sparkContext.emptyRDD[Row],
-      cdcReadSchema(snapshot.metadata.schema))
+    // build an empty DS. This DS retains the table schema and the isStreaming property
+    val emptyDf = spark.sqlContext.internalCreateDataFrame(
+      spark.sparkContext.emptyRDD[InternalRow],
+      cdcReadSchema(snapshot.metadata.schema),
+      isStreaming)

CDCVersionDiffInfo(
dfs.reduceOption((df1, df2) => df1.union(df2)).getOrElse(emptyDf),
totalFiles,
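Why the switch to internalCreateDataFrame matters: spark.createDataFrame always produces a batch plan, so when a CDF version contains no row changes the fallback empty DataFrame would report isStreaming = false and could not serve as a streaming micro-batch. The sketch below is illustrative only and not part of this commit; the package and object names are hypothetical, and it assumes a local SparkSession.

package org.apache.spark.sql.sketch // hypothetical subpackage of org.apache.spark.sql,
                                    // so the private[sql] method below is accessible

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object EmptyDfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val schema = StructType(Seq(StructField("id", LongType)))

    // The public API always yields a batch DataFrame.
    val batchEmpty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    assert(!batchEmpty.isStreaming)

    // internalCreateDataFrame can tag the empty relation as streaming, which is
    // what the CDCReader change above relies on.
    val streamingEmpty = spark.sqlContext.internalCreateDataFrame(
      spark.sparkContext.emptyRDD[InternalRow], schema, isStreaming = true)
    assert(streamingEmpty.isStreaming)

    spark.stop()
  }
}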
@@ -83,6 +83,10 @@ private[delta] case class IndexedFile(
}
}

+  def hasFileAction: Boolean = {
+    getFileAction != null
+  }

def getFileSize: Long = {
if (add != null) {
add.size
@@ -889,8 +893,24 @@ case class DeltaSource(
}
}

-    if (fileAction.isEmpty) return true
val shouldAdmit = filesToTake > 0 && bytesToTake > 0

+    /**
+     * If a fileAction is not present, admit it only if we still have capacity to
+     * accept new files or bytes.
+     * For example, if we get an AddFile at version 0 with maxFilesPerTrigger set to 1,
+     * we admit that file. If we then get another AddFile, we do not admit it. However,
+     * if we next get a CommitInfo entry with no file action at a higher version, we
+     * would previously have admitted it, so the endOffset could be advanced to version 2.
+     * When we create the DataFrame for the batch, we call filterFiles, which only checks
+     * for a valid offset range because we pass None as the limits for that call. Hence,
+     * the rate limiting has to happen here, so that an empty file action is admitted
+     * only if our capacity is not yet exhausted.
+     */
+    if (fileAction.isEmpty) {
+      return shouldAdmit
+    }

filesToTake -= 1

bytesToTake -= getSize(fileAction.get)
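Read on its own, the new admission logic says: an entry with no file action consumes no capacity but is admitted only while capacity remains. A simplified, self-contained sketch of that behavior follows; FileActionStub and the class names are stand-ins, not Delta's real types, and the real AdmissionLimits in DeltaSource.scala tracks more state.

final case class FileActionStub(size: Long) // hypothetical stand-in for FileAction

final class AdmissionLimitsSketch(var filesToTake: Long, var bytesToTake: Long) {
  // Returns whether this (possibly empty) action may be admitted into the batch.
  def admit(fileAction: Option[FileActionStub]): Boolean = {
    val shouldAdmit = filesToTake > 0 && bytesToTake > 0
    // An entry with no file action (e.g. a CommitInfo-only commit) is admitted
    // only while capacity remains, so the offset cannot race past the limits.
    if (fileAction.isEmpty) return shouldAdmit
    filesToTake -= 1
    bytesToTake -= fileAction.get.size
    shouldAdmit
  }
}

object AdmissionSketch extends App {
  val limits = new AdmissionLimitsSketch(filesToTake = 1, bytesToTake = Long.MaxValue)
  assert(limits.admit(Some(FileActionStub(10))))  // first file fits
  assert(!limits.admit(Some(FileActionStub(10)))) // over the file limit
  assert(!limits.admit(None))                     // empty action also rejected now
}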
@@ -63,13 +63,19 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
}

private def noMatchesRegex(indexedFile: IndexedFile): Boolean = {
+    if (hasNoFileActionAndStartIndex(indexedFile)) return true

excludeRegex.forall(_.findFirstIn(indexedFile.getFileAction.path).isEmpty)
}

private def hasFileAction(indexedFile: IndexedFile): Boolean = {
indexedFile.getFileAction != null
}

+  private def hasNoFileActionAndStartIndex(indexedFile: IndexedFile): Boolean = {
+    !indexedFile.hasFileAction && indexedFile.index == -1
+  }

private def hasAddsOrRemoves(indexedFile: IndexedFile): Boolean = {
indexedFile.add != null || indexedFile.remove != null
}
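The index == -1 entries these helpers match are the synthetic per-version entries that carry no file action; admitting them lets latestOffset advance across commits that changed no data. A minimal sketch with a hypothetical trimmed-down IndexedFile:

object SentinelSketch extends App {
  // Hypothetical stand-in for Delta's IndexedFile; the real one carries
  // AddFile/RemoveFile/AddCDCFile actions rather than strings.
  final case class IndexedFileSketch(
      version: Long,
      index: Long,
      add: Option[String] = None,
      remove: Option[String] = None) {
    def hasFileAction: Boolean = add.isDefined || remove.isDefined
  }

  def hasNoFileActionAndStartIndex(f: IndexedFileSketch): Boolean =
    !f.hasFileAction && f.index == -1

  // A no-data commit still yields one admissible sentinel, so the offset can move.
  assert(hasNoFileActionAndStartIndex(IndexedFileSketch(version = 11, index = -1)))
  assert(!hasNoFileActionAndStartIndex(IndexedFileSketch(11, 0, add = Some("part-0"))))
}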
@@ -79,7 +85,8 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
fromVersion: Long,
fromIndex: Long,
endOffset: Option[DeltaSourceOffset]): Boolean = {
-    !indexedFile.shouldSkip && hasFileAction(indexedFile) &&
+    !indexedFile.shouldSkip && (hasFileAction(indexedFile) ||
+      hasNoFileActionAndStartIndex(indexedFile)) &&
moreThanFrom(indexedFile, fromVersion, fromIndex) &&
lessThanEnd(indexedFile, endOffset) && noMatchesRegex(indexedFile) &&
lessThanEnd(indexedFile, Option(lastOffsetForTriggerAvailableNow))
@@ -125,11 +132,15 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
}
} else {
// CDC is recorded as AddFile or RemoveFile
-      fileActions.filter(hasAddsOrRemoves(_))
+      // We also allow entries with no file action and index = -1; these are used
+      // primarily to update the latest offset when no other file-action-based
+      // entries are present.
+      fileActions.filter(indexedFile => hasAddsOrRemoves(indexedFile) ||
+        hasNoFileActionAndStartIndex(indexedFile))
.filter(
isValidIndexedFile(_, fromVersion, fromIndex, endOffset)
).takeWhile { indexedFile =>
-        admissionControl.admit(Some(indexedFile.getFileAction))
+        admissionControl.admit(Option(indexedFile.getFileAction))
}.toIterator
}
}
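The one-token change from Some to Option above is load-bearing: the sentinel entries admitted by the new filter have a null getFileAction, and only Option normalizes null away. A two-line illustration:

object OptionVsSome extends App {
  val action: String = null          // stand-in for a sentinel's null FileAction
  assert(Option(action).isEmpty)     // None: admit() takes its empty-action branch
  assert(Some(action).isDefined)     // Some(null): would look like a real action
}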
@@ -160,7 +171,7 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>

val groupedFileActions: Iterator[(Long, Seq[FileAction])] =
changes.map { case (v, indexFiles) =>
-        (v, indexFiles.map { _.getFileAction }.toSeq)
+        (v, indexFiles.filter(_.hasFileAction).map { _.getFileAction }.toSeq)
}

val cdcInfo = CDCReader.changesToDF(
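Likewise, the filter(_.hasFileAction) guard added just above keeps those sentinels out of the per-version action groups handed to CDCReader.changesToDF. A small sketch with hypothetical types:

object GroupSketch extends App {
  // Hypothetical trimmed-down IndexedFile with a string standing in for the action.
  final case class IF(version: Long, index: Long, action: String) {
    def hasFileAction: Boolean = action != null
    def getFileAction: String = action
  }
  val changes = Iterator(
    (10L, Seq(IF(10, 0, "add-part-0"), IF(10, -1, null))),
    (11L, Seq(IF(11, -1, null))))                 // no-data commit
  // Dropping the sentinels before mapping avoids nulls in the grouped actions.
  val grouped = changes.map { case (v, files) =>
    (v, files.filter(_.hasFileAction).map(_.getFileAction))
  }.toList
  assert(grouped == List((10L, Seq("add-part-0")), (11L, Seq())))
}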
@@ -25,10 +25,10 @@ import scala.language.implicitConversions

import org.apache.spark.sql.delta.actions.AddCDCFile
import org.apache.spark.sql.delta.commands.cdc.CDCReader
-import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.sources.{DeltaSourceOffset, DeltaSQLConf}
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import io.delta.tables._
import org.apache.hadoop.fs.Path

@@ -417,6 +417,77 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
}
}

Seq(true, false).foreach { readChangeFeed =>
test(s"streams updating latest offset with readChangeFeed=$readChangeFeed") {
withTempDirs { (inputDir, checkpointDir, outputDir) =>
withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") {

// save some rows to input table.
spark.range(10).withColumn("value", lit("a"))
.write.format("delta").mode("overwrite")
.option("enableChangeDataFeed", "true").save(inputDir.getAbsolutePath)

// process the input table in a CDC manner
val df = spark.readStream
.option(DeltaOptions.CDC_READ_OPTION, readChangeFeed)
.format("delta")
.load(inputDir.getAbsolutePath)

val query = df
.select("id")
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", checkpointDir.toString)
.start(outputDir.getAbsolutePath)

query.processAllAvailable()
query.stop()
query.awaitTermination()

// Create a temp view and write to input table as a no-op merge
spark.range(20, 30).withColumn("value", lit("b"))
.createOrReplaceTempView("source_table")

for (i <- 0 to 10) {
sql(s"MERGE INTO delta.`${inputDir.getAbsolutePath}` AS tgt " +
s"USING source_table src ON tgt.id = src.id " +
s"WHEN MATCHED THEN UPDATE SET * " +
s"WHEN NOT MATCHED AND src.id < 10 THEN INSERT *")
}

// Read again from input table and no new data should be generated
val df1 = spark.readStream
.option("readChangeFeed", readChangeFeed)
.format("delta")
.load(inputDir.getAbsolutePath)

val query1 = df1
.select("id")
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", checkpointDir.toString)
.start(outputDir.getAbsolutePath)

query1.processAllAvailable()
query1.stop()
query1.awaitTermination()

// check that the last batch was committed and that the
// reservoirVersion for the table was updated to latest
// in both cdf and non-cdf cases.
assert(query1.lastProgress.batchId === 1)
val endOffset = JsonUtils.mapper.readValue[DeltaSourceOffset](
query1.lastProgress.sources.head.endOffset
)
assert(endOffset.reservoirVersion === 11)
assert(endOffset.index === -1)
}
}
}
}

test("cdc streams should be able to get offset when there only RemoveFiles") {
withTempDir { inputDir =>
// version 0
