Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4873][Streaming] Use Future.zip instead of Future.flatMap(for-loop) in WriteAheadLogBasedBlockHandler #3721

Closed
wants to merge 1 commit into from

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Dec 17, 2014

Use Future.zip instead of Future.flatMap(for-loop). zip implies these two Futures will run concurrently, while flatMap usually means one Future depends on the other one.

@SparkQA
Copy link

SparkQA commented Dec 17, 2014

Test build #24542 has started for PR 3721 at commit 860f3a2.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 17, 2014

Test build #24543 has started for PR 3721 at commit 8b6d095.

  • This patch merges cleanly.

@zsxwing zsxwing changed the title WriteAheadLogBasedBlockHandler improvement [SPARK-4873][Streaming] WriteAheadLogBasedBlockHandler improvement Dec 17, 2014
@SparkQA
Copy link

SparkQA commented Dec 17, 2014

Test build #24542 has finished for PR 3721 at commit 860f3a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24542/
Test PASSed.

@SparkQA
Copy link

SparkQA commented Dec 17, 2014

Test build #24543 has finished for PR 3721 at commit 8b6d095.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ParquetTest
    • protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24543/
Test PASSed.

@JoshRosen
Copy link
Contributor

The flaky test fix here looks like it addresses https://issues.apache.org/jira/browse/SPARK-4790.

/cc @harishreedharan, who is investigating that flaky test.

@harishreedharan
Copy link
Contributor

This would not address the test issue as we are still returning from cleanupOldLogs before the deletion is completed. This tracker3.cleanupOldBatches(batchTime2) will still end up hitting an exception if the deletion is not done in between the listFiles and getFileStatus (for the child file) calls in the getFileStatus method.

@harishreedharan
Copy link
Contributor

The eventually ensures that the block is retried - but the failure is happening before the eventually, so the test would still throw.

@harishreedharan
Copy link
Contributor

Hmm, looking at it again - this would fix the test as well, though I think the approach in #3726 is cleaner.

@zsxwing
Copy link
Member Author

zsxwing commented Dec 18, 2014

My first trial was making cleanupOldBatches return a Future so that the caller can use it to wait. But I looked other places and found they used eventually. So I followed their approach. But IMO it's better to return the Future to user.

@zsxwing
Copy link
Member Author

zsxwing commented Dec 18, 2014

Used Future to fix the test and override my previous commits.

@SparkQA
Copy link

SparkQA commented Dec 18, 2014

Test build #24566 has started for PR 3721 at commit d3d8a51.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 18, 2014

Test build #24566 has finished for PR 3721 at commit d3d8a51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24566/
Test PASSed.

@@ -125,7 +125,7 @@ private[streaming] class WriteAheadLogManager(
* between the node calculating the threshTime (say, driver node), and the local system time
* (say, worker node), the caller has to take account of possible time skew.
*/
def cleanupOldLogs(threshTime: Long): Unit = {
def cleanupOldLogs(threshTime: Long): Future[Unit] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be exposing this future. That is internal implementation detail and we'd have to stick with this implementation. At some point, we might want to delete the files synchronously - at which point returning this Future might not make sense.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, I prefer to also return Future for an asynchronous action. Returning Unit hides the asynchronous feature and such method will be misused easily.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is that deleting asynchronously is an implementation detail - which we choose to do now, which can be changed later, don't you think? If we change it later, we'd have to change this method's signature - which can cause pain if there is code that uses Await.result(*) on this future.

If we expose it via a parameter, we can choose to ignore the param and still the calling code will not have to change. Since it is unlikely that the calling code will actually depend on the async nature, it is unlikely to see any difference in functionality and no change in code is required.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is that deleting asynchronously is an implementation detail - which we choose to do now, which can be changed later, don't you think? If we change it later, we'd have to change this method's signature - which can cause pain if there is code that uses Await.result(*) on this future.

I doubt if it will be changed. However, asynchronously is an important implementation detail that the caller should know it, or they may misuse it.

If we expose it via a parameter, we can choose to ignore the param and still the calling code will not have to change. Since it is unlikely that the calling code will actually depend on the async nature, it is unlikely to see any difference in functionality and no change in code is required.

I don't think a parameter is enough. At least, it needs a more parameter, a timeout parameter. In your PR, you used 1 second which may not be enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's more, if we really want to change it to a synchronously deleting. Returning Future does still work. Just simply writing something like:

deleteFiles()
return Promise[Unit]().success(null).future

@@ -169,10 +171,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
}

// Combine the futures, wait for both to complete, and return the write ahead log segment
val combinedFuture = for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good catch. I wasnt aware that for-yield is flatMap which is obviously ordered.

@tdas
Copy link
Contributor

tdas commented Dec 24, 2014

High level comment, I think I completely agree on the Future.zip, but not sure if I find the other changed related to other two valid.

So it would be great if you can reduce the scope of this PR to the Future.zip and accordingly update the title or the PR + JIRA to a more specific than "improvements".

@zsxwing zsxwing changed the title [SPARK-4873][Streaming] WriteAheadLogBasedBlockHandler improvement [SPARK-4873][Streaming] Use Future.zip instead of Future.flatMap(for-loop) in WriteAheadLogBasedBlockHandler Dec 25, 2014
@zsxwing
Copy link
Member Author

zsxwing commented Dec 25, 2014

Now this PR only contains Future.zip change. I also updated the PR & JIRA.

@SparkQA
Copy link

SparkQA commented Dec 25, 2014

Test build #24799 has started for PR 3721 at commit 46a2cd9.

  • This patch merges cleanly.

@tdas
Copy link
Contributor

tdas commented Dec 25, 2014

LGTM. Will merge if tests pass.

@SparkQA
Copy link

SparkQA commented Dec 25, 2014

Test build #24799 has finished for PR 3721 at commit 46a2cd9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24799/
Test PASSed.

@tdas
Copy link
Contributor

tdas commented Dec 25, 2014

Merged this, thanks very much!

asfgit pushed a commit that referenced this pull request Dec 25, 2014
…for-loop) in WriteAheadLogBasedBlockHandler

Use `Future.zip` instead of `Future.flatMap`(for-loop). `zip` implies these two Futures will run concurrently, while `flatMap` usually means one Future depends on the other one.

Author: zsxwing <zsxwing@gmail.com>

Closes #3721 from zsxwing/SPARK-4873 and squashes the following commits:

46a2cd9 [zsxwing] Use Future.zip instead of Future.flatMap(for-loop)

(cherry picked from commit b4d0db8)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in b4d0db8 Dec 25, 2014
@zsxwing zsxwing deleted the SPARK-4873 branch December 25, 2014 03:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
6 participants