[SPARK-11324][STREAMING] Flag for closing Write Ahead Logs after a write #9285
Conversation
Wouldn't this be really expensive?
@harishreedharan There is a related patch that allows batching here.
If there is no other option, this LGTM.
Test build #44375 has finished for PR 9285 at commit
@@ -39,6 +39,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
   val DEFAULT_ROLLING_INTERVAL_SECS = 60
   val DEFAULT_MAX_FAILURES = 3
+  val WAL_CLOSE_AFTER_WRITE = "spark.streaming.writeAheadLog.closeAfterWrite"
We should stick to the existing naming conventions for this configuration, so let's have spark.streaming.driver.wal.closeFileAfterWrite, and the same for ...receiver....
So this should be a configuration like rollingIntervalSecs.
Do receivers by default use local disk for WAL?
Wouldn't the config in that case be something like spark.streaming.driver.writeAheadLog.closeFileAfterWrite?
Yeah, yeah, I shortened it to make the point; sorry if that wasn't clear.
@brkyvz receivers do not use the WAL by default.
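For concreteness, here is a minimal sketch of what those keys could look like as constants, assuming the spark.streaming.{driver,receiver}.writeAheadLog prefix convention discussed above; the constant and object names are illustrative, not necessarily what the patch ends up using.

// Sketch only: constant names are assumed; the key strings follow the naming
// convention suggested in this thread.
object WalCloseAfterWriteConfKeys {
  val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"
  val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"
}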
@tdas Noticed that this is going to be super hard. How are we going to differentiate between a receiver WAL and a driver WAL in the write method of FileBasedWAL?
NVM, worked around it
Take a look at how rollingIntervalSecs is implemented. That also has separate configurations for driver and receiver.
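As a point of reference, here is a hedged sketch of how a close-after-write lookup could mirror that pattern: the caller passes isDriver and the utility picks the matching key. The helper name, object name, and default value are assumptions for illustration, not the exact code in this PR.

import org.apache.spark.SparkConf

object WalCloseAfterWriteSketch {
  // Resolve the flag for either the driver or the receiver WAL, defaulting to
  // false, in the same spirit as the rollingIntervalSecs lookup.
  def shouldCloseFileAfterWrite(conf: SparkConf, isDriver: Boolean): Boolean = {
    val key =
      if (isDriver) "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"
      else "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"
    conf.getBoolean(key, defaultValue = false)
  }
}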
Done
@@ -47,7 +47,8 @@ private[streaming] class FileBasedWriteAheadLog(
     logDirectory: String,
     hadoopConf: Configuration,
     rollingIntervalSecs: Int,
-    maxFailures: Int
+    maxFailures: Int,
+    closeAfterWrite: Boolean = false
I didn't want to break existing code. Apparently FileBasedWriteAheadLog
is instantiated in only 4 spots, so I can manually add the closeAfterWrite = false
parameter to all of them. It could be okay to keep the default though, since the class is private[streaming] and we don't need to keep Java compatibility, as it is not exposed.
Yeah, it is not exposed. So it's better to keep it as a non-default parameter and add the parameter judiciously in all 4 spots.
Done
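To make the semantics of the new closeAfterWrite parameter concrete, here is a stripped-down sketch of how a write path could honor such a flag by closing the underlying stream after every record. This is only an illustration of the idea, not the actual FileBasedWriteAheadLog code: the class name is made up and plain java.io streams stand in for the Hadoop/HDFS streams the real implementation uses.

import java.io.{DataOutputStream, FileOutputStream}

// Simplified stand-in for a file-based WAL writer.
class SketchWriteAheadLog(logPath: String, closeAfterWrite: Boolean) {
  private var stream: DataOutputStream = _

  private def getOrCreateStream(): DataOutputStream = {
    if (stream == null) {
      stream = new DataOutputStream(new FileOutputStream(logPath, /* append = */ true))
    }
    stream
  }

  def write(record: Array[Byte]): Unit = {
    val out = getOrCreateStream()
    out.writeInt(record.length)
    out.write(record)
    out.flush()
    if (closeAfterWrite) {
      // On stores like S3 the data only becomes visible once the stream is
      // closed, so closing here gives read-after-write visibility.
      out.close()
      stream = null
    }
  }

  def close(): Unit = {
    if (stream != null) {
      stream.close()
      stream = null
    }
  }
}

Since closing and reopening the stream for every record is expensive (the concern raised at the top of this thread), the flag stays off by default and is aimed at stores like S3 where flushing alone does not persist data.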
Test build #44394 has finished for PR 9285 at commit
Test build #44439 has finished for PR 9285 at commit
Test build #44444 has finished for PR 9285 at commit
LGTM. Merging this to master. Thanks @brkyvz
Currently, the Write Ahead Log in Spark Streaming flushes data as writes are made. S3 does not support flushing: data is only persisted once the stream is actually closed.
In case of failure, the data for the last minute (the default rolling interval) will not be properly written. Therefore we need a flag to close the stream after each write, so that we achieve read-after-write consistency.
cc @tdas @zsxwing
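For illustration, enabling this behavior from an application would come down to setting the corresponding keys on the SparkConf. The closeFileAfterWrite key names below assume the driver/receiver naming convention settled on in the review comments; check the Spark Streaming configuration docs for the exact names in your version.

import org.apache.spark.SparkConf

object WalCloseAfterWriteConfExample {
  val conf = new SparkConf()
    .setAppName("wal-close-after-write-example")
    // Assumed key names, following the driver/receiver convention discussed
    // in the review comments above.
    .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
    .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
    // Receivers do not use the WAL by default, so it has to be enabled too.
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
}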