[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job #17124
Conversation
```diff
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
```
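For context, a minimal sketch of how the changed branch fits into the provider's commit path. The helper name `finalizeDeltaFile` and the exception message are illustrative assumptions, not quotes from `HDFSBackedStateStoreProvider`:

```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch (hypothetical helper): finalize a committed temp
// delta file. After this change, a redundant temp file is deleted when
// the final delta file already exists, instead of being left on HDFS.
def finalizeDeltaFile(fs: FileSystem, tempDeltaFile: Path, finalDeltaFile: Path): Unit = {
  if (fs.exists(finalDeltaFile)) {
    // Another run (e.g. the job before a restart, or a speculative task)
    // already produced this delta file; drop the redundant temp file.
    fs.delete(tempDeltaFile, true)
  } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
    throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
  }
  // On both branches the final delta file exists afterwards and the
  // temp file is gone.
}
```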
If the file exists, it is deleted, but no new file is renamed to it -- is that right?
When the streaming job restarts, the `finalDeltaFile` generated by the first batch is the same as the `finalDeltaFile` generated by the last batch before the restart. So there is no need to rename the temp file to recreate an identical file.
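A hedged illustration of why the two names collide: delta files are keyed by batch version, so re-running version N after a restart resolves to the same path the previous run already committed. The `deltaFile` helper below is an assumption about the naming scheme, for illustration only:

```scala
import org.apache.hadoop.fs.Path

// Hypothetical sketch of version-keyed naming: the first batch after a
// restart re-runs the last committed version, so it computes the same
// final path that already exists on HDFS.
def deltaFile(baseDir: Path, version: Long): Path =
  new Path(baseDir, s"$version.delta")

// deltaFile(dir, 5) before the restart == deltaFile(dir, 5) after it,
// so renaming the temp file would only recreate an identical file.
```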
I guess my point is, after this change, the file may not exist after this executes. Before, it always existed after this block. I wasn't sure that was the intended behavior change because the purpose seems to be to delete the temp file.
Yes. This PR just wants to delete the needless temp file; the delta file still needs to exist.
Test build #3589 has finished for PR 17124 at commit
Thanks for doing this. Just some minor issues.
```diff
@@ -282,8 +282,12 @@ private[state] class HDFSBackedStateStoreProvider(
   // target file will break speculation, skipping the rename step is the only choice. It's still
   // semantically correct because Structured Streaming requires rerunning a batch should
   // generate the same output. (SPARK-19677)
+  // Also, a tmp file of delta file that generated by the first batch after restart
```
This comment is not 100% correct: this may also happen in a speculative task.
This PR is just a follow-up to delete the temp file that #17012 forgot to delete. IMO, there's no need to add a comment for it.
```diff
@@ -295,6 +295,28 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     provider.getStore(0).commit()
   }
 
+  test("SPARK-19779: A tmp file of delta file should not be reserved on HDFS " +
```
Instead of adding a new test, I'd prefer to just add several lines to the existing test `SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS`. E.g.
test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
val conf = new Configuration()
conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
conf.set("fs.default.name", "fake:///")
val provider = newStoreProvider(hadoopConf = conf)
provider.getStore(0).commit()
provider.getStore(0).commit()
// Verify we don't leak temp files
val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation), null, true)
.asScala.filter(_.getName.contains("temp-"))
assert(tempFiles.isEmpty)
}
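One note on the `temp-` filter above: it presumes temp delta files carry a `temp-` prefix. A sketch of that assumed convention (the random suffix is illustrative, not quoted from the provider):

```scala
import org.apache.hadoop.fs.Path
import scala.util.Random

// Assumed convention: temp delta files are written under a unique
// "temp-" prefixed name before being renamed (or deleted) on commit,
// which is what the test's getName.contains("temp-") filter targets.
def newTempDeltaFile(baseDir: Path): Path =
  new Path(baseDir, s"temp-${Random.nextLong()}")
```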
@zsxwing I have rewritten the test case.
Force-pushed from 1a0b232 to db3f4db.
retest this please
Test build #73786 has finished for PR 17124 at commit
retest this please
ok to test
Test build #73803 has finished for PR 17124 at commit
LGTM. Merging to master, 2.1 and 2.0. Thanks!
[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job

## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)
PR #17012 fixed restarting a Structured Streaming application that uses HDFS as its file system, but a problem remains: a tmp file for a delta file is still left on HDFS, and Structured Streaming never deletes the tmp file generated when the streaming job restarts.

## How was this patch tested?

Unit tests.

Author: guifeng <guifengleaf@gmail.com>

Closes #17124 from gf53520/SPARK-19779.

(cherry picked from commit e24f21b)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
What changes were proposed in this pull request?
[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)
PR #17012 fixed restarting a Structured Streaming application that uses HDFS as its file system, but a problem remains: a tmp file for a delta file is still left on HDFS, and Structured Streaming never deletes the tmp file generated when the streaming job restarts.
How was this patch tested?
Unit tests.