[SPARK-18790][SS] Keep a general offset history of stream batches #16219
Conversation
Test build #69879 has finished for PR 16219 at commit
Test build #69882 has finished for PR 16219 at commit
Test build #69898 has finished for PR 16219 at commit
Overall looks good. Just left some comments.
@@ -58,6 +58,8 @@ class StreamExecution(

   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay

+  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
It's better to add a `require(...)` here to make sure `minBatchesToRetain >= 1`.
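The suggested guard might look like the following sketch (the exact error message is illustrative, not the merged wording):

```scala
// Sketch of the suggested validation, placed next to the field it guards.
private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
require(minBatchesToRetain >= 1,
  s"Parameter minBatchesToRetain must be at least 1, but was $minBatchesToRetain")
```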
 if (isCompactionBatch(batchId, compactInterval)) {
-  compact(batchId, logs)
+  batchAdded = compact(batchId, logs)
nit: you can rewrite it like
val batchAdded =
if (isCompactionBatch(batchId, compactInterval)) {
compact(batchId, logs)
} else {
super.add(batchId, logs)
}
-val storeConf = StateStoreConf.empty
+val sqlConf = new SQLConf()
+sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
+val storeConf = StateStoreConf(sqlConf) // StateStoreConf.empty
nit: this is no longer `StateStoreConf.empty`, so the trailing comment is stale. Please remove it.
@@ -303,7 +303,6 @@ private[state] class HDFSBackedStateStoreProvider(
   val mapFromFile = readSnapshotFile(version).getOrElse {
     val prevMap = loadMap(version - 1)
     val newMap = new MapType(prevMap)
-    newMap.putAll(prevMap)
Why remove this line? `newMap` should be `prevMap` + delta file in such a case.
new MapType(prevMap) will make a call that is equivalent to newMap.putAll(prevMap). So basically, newMap.putAll(prevMap) is redundant work.
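For illustration, using `java.util.HashMap` as a stand-in for `MapType`: the copy constructor already copies every entry from its argument, so the explicit `putAll` repeats the same work:

```scala
import java.util.{HashMap => JHashMap}

val prevMap = new JHashMap[String, Int]()
prevMap.put("a", 1)

// The copy constructor internally performs the equivalent of putAll(prevMap),
// so newMap already contains every entry from prevMap...
val newMap = new JHashMap[String, Int](prevMap)
assert(newMap.get("a") == 1)

// ...which makes this second bulk copy redundant work:
newMap.putAll(prevMap)
```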
+spark.conf.set("spark.sql.streaming.minBatchesToRetain", 1)
nit: you can use

withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
  ...
}

to simplify the code. `withSQLConf` will restore the conf afterwards.
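A simplified sketch of how such a helper restores the previous values (condensed from the shape of the test-utility pattern, given a `SparkSession` named `spark`; not the exact SQLTestUtils implementation):

```scala
// Simplified sketch: set each conf for the duration of `body`, then restore
// the original value (or unset it) even if the body throws.
def withSQLConf[T](pairs: (String, String)*)(body: => T): T = {
  val originals = pairs.map { case (k, _) => k -> spark.conf.getOption(k) }
  pairs.foreach { case (k, v) => spark.conf.set(k, v) }
  try body
  finally originals.foreach {
    case (k, Some(v)) => spark.conf.set(k, v)
    case (k, None)    => spark.conf.unset(k)
  }
}
```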
 } catch {
   case _: NumberFormatException =>
     false
+private def deleteExpiredLog(currentBatchId: Long): Unit = {
Could you also update the comments since they are out of date?
 withSQLConf(
   SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
-  SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
+  SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
+  SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
Could you also test other values of `SQLConf.MIN_BATCHES_TO_RETAIN`? Testing only 1 is not enough.
Test build #69938 has finished for PR 16219 at commit
Test build #69943 has finished for PR 16219 at commit
LGTM
Test build #3493 has finished for PR 16219 at commit
Thanks! Merging to master and 2.1.
Author: Tyson Condie <tcondie@gmail.com>
Closes #16219 from tcondie/offset_hist.
(cherry picked from commit 83a4289)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
## What changes were proposed in this pull request?

Instead of only keeping the minimum number of offsets around, we should keep enough information to allow us to roll back n batches and re-execute the stream starting from a given point. In particular, we should create a config in SQLConf, spark.sql.streaming.retainedBatches, that defaults to 100, and ensure that we keep enough log files in the following places to roll back the specified number of batches:

- the offsets that are present in each batch
- versions of the state store
- the file lists stored for the FileStreamSource
- the metadata log stored by the FileStreamSink
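A sketch of the bookkeeping this implies once a batch commits: each log drops entries older than the rollback window. The log names and the `purge(thresholdBatchId)` call here are illustrative, not necessarily the exact merged API:

```scala
// Illustrative: after committing `currentBatchId`, every metadata log may
// drop entries below the oldest batch that must remain recoverable.
val thresholdBatchId = currentBatchId - minBatchesToRetain + 1
if (thresholdBatchId > 0) {
  offsetLog.purge(thresholdBatchId)      // offsets present in each batch
  fileSourceLog.purge(thresholdBatchId)  // FileStreamSource file lists
  fileSinkLog.purge(thresholdBatchId)    // FileStreamSink metadata log
  // state store snapshot/delta versions are bounded by the same threshold
}
```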
@marmbrus @zsxwing
## How was this patch tested?

The following tests were added.

### StreamExecution offset metadata

A test was added to StreamingQuerySuite that ensures offset metadata is garbage collected according to minBatchesToRetain.

### CompactibleFileStreamLog

Tests were added in CompactibleFileStreamLogSuite to ensure that logs are purged starting before the first compaction file that precedes the current batch id - minBatchesToRetain.
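The boundary being asserted can be sketched as follows (illustrative arithmetic only; the real cleanup also honors the log's exact compaction-batch-id convention):

```scala
// Illustrative: a compacting log must keep a full compaction snapshot, so the
// purge point is the nearest compaction boundary at or before the oldest
// batch the retention policy requires.
def purgeBoundary(currentBatchId: Long, minBatchesToRetain: Int,
                  compactInterval: Int): Long = {
  val oldestRequired = math.max(0L, currentBatchId - minBatchesToRetain + 1)
  (oldestRequired / compactInterval) * compactInterval
}
```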
Please review http://spark.apache.org/contributing.html before opening a pull request.