
[SPARK-23503][SS] Enforce sequencing of committed epochs for Continuous Execution #20936

Closed · wants to merge 5 commits

Conversation

@spaced4ndy commented Mar 29, 2018

What changes were proposed in this pull request?

Made changes to EpochCoordinator so that it enforces a commit order. In case a message for epoch n is lost and epoch (n + 1) is ready for commit before epoch n is, epoch (n + 1) will wait for epoch n to be committed first.
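
For context, here is a minimal, self-contained sketch of the sequencing rule (the object and method names are illustrative; lastCommittedEpoch and epochsWaitingToBeCommitted mirror the fields added in the diff reviewed below):

import scala.collection.mutable

// Illustrative sketch of the commit-sequencing idea, not the exact patch.
object EpochSequencingSketch {
  private var lastCommittedEpoch = 0L
  private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]

  // Stand-in for committing the epoch to the sink and the commit log.
  private def commitEpoch(epoch: Long): Unit = println(s"Committing epoch $epoch")

  // Called once every partition of `epoch` has reported its commit message.
  def onEpochReady(epoch: Long): Unit = {
    if (epoch == lastCommittedEpoch + 1) {
      commitEpoch(epoch)
      lastCommittedEpoch = epoch
      // Committing this epoch may unblock the next one if it became ready early.
      if (epochsWaitingToBeCommitted.remove(epoch + 1)) {
        onEpochReady(epoch + 1)
      }
    } else {
      // This epoch became ready before its predecessor was committed; park it.
      epochsWaitingToBeCommitted.add(epoch)
    }
  }
}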

How was this patch tested?

Existing tests in ContinuousSuite and EpochCoordinatorSuite.

@jose-torres (Contributor) left a comment

The general approach looks correct to me.

val nextEpochCommits =
  partitionCommits.collect { case ((e, _), msg) if e == nextEpoch => msg }
logDebug(s"Committing epoch $nextEpoch.")
writer.commit(nextEpoch, nextEpochCommits.toArray)
query.commit(nextEpoch)

Contributor: There's a bit of duplicated logic here - helper methods would probably be nice.
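
One possible shape for such a helper, assuming the surrounding EpochCoordinator fields (partitionCommits, writer, query); the name commitEpoch is only a suggestion at this point:

// Hypothetical helper consolidating the duplicated commit logic: collects the
// per-partition commit messages for `epoch`, commits them via the writer, and
// records the epoch with the query.
private def commitEpoch(epoch: Long): Unit = {
  val epochCommits =
    partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
  logDebug(s"Committing epoch $epoch.")
  writer.commit(epoch, epochCommits.toArray)
  query.commit(epoch)
}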

} else {
  logDebug(s"Epoch $epoch has received commits from all partitions " +
    s"and is waiting for epoch ${epoch - 1} to be committed first.")
  epochsWaitingToBeCommitted.add(epoch)

Contributor: Maybe swap the order of the if and else. I'd forgotten what the condition was for after scrolling down here.
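
A sketch of the suggested reordering, assuming the condition is the epoch-sequencing check (the exact code in the PR may differ):

// Illustrative only: putting the short "waiting" branch first keeps the condition
// and its consequence close together for the reader.
if (epoch != lastCommittedEpoch + 1) {
  logDebug(s"Epoch $epoch has received commits from all partitions " +
    s"and is waiting for epoch ${epoch - 1} to be committed first.")
  epochsWaitingToBeCommitted.add(epoch)
} else {
  commitEpoch(epoch)
}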

@spaced4ndy (Author)

@jose-torres Could you review the latest changes for this PR, please?

@jose-torres (Contributor)

LGTM, once #20983 is committed and we can pull in the tests from that PR.

@spaced4ndy (Author)

@tdas @jose-torres Hi, I tested this with #20983; can we run the Jenkins tests now?

@jose-torres (Contributor)

LGTM

@@ -137,30 +137,65 @@ private[continuous] class EpochCoordinator(
  private val partitionOffsets =
    mutable.Map[(Long, Int), PartitionOffset]()

  private var lastCommittedEpoch = startEpoch - 1
  // Remembers epochs that have to wait for previous epochs to be committed first.
  private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]

Contributor: This is orthogonal to the current PR, but I realized that both this and the commits/offsets maps are unbounded queues. We should probably introduce a SQLConf for the maximum epoch backlog and report an error when too many epochs stack up. I'll file a JIRA ticket for this.
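
A rough sketch of what such a guard could look like; the threshold, any SQLConf key, and the failure behavior are hypothetical and would be settled in that follow-up, not in this PR:

// Hypothetical backlog guard. Assumes the EpochCoordinator fields
// partitionCommits, partitionOffsets and epochsWaitingToBeCommitted.
private val maxEpochBacklog = 10000 // would come from a SQLConf in the follow-up

private def checkEpochBacklog(): Unit = {
  val backlog = Seq(
    epochsWaitingToBeCommitted.size,
    partitionCommits.size,
    partitionOffsets.size).max
  if (backlog > maxEpochBacklog) {
    // Report the problem instead of letting the maps grow without bound.
    logError(s"Epoch backlog of $backlog entries exceeded the limit of $maxEpochBacklog.")
  }
}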

@felixcheung (Member)

Jenkins, ok to test

@SparkQA commented Apr 19, 2018

Test build #89555 has finished for PR 20936 at commit 478291b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@spaced4ndy (Author) commented Apr 24, 2018

@felixcheung Hi, could you merge this please? I'd like to use it to work on a PR for the epoch backlog issue Jose pointed out. Since it passes tests and Jose approves, I guess it's good to go.

As a side note, could you also add me to the whitelist so that I don't take up maintainers' time launching Jenkins tests for future PRs? I don't know if that's usual for new contributors, but I see it's a possible workflow in the Spark contributing guide.

@tdas (Contributor) left a comment

This looks good, just requires a little bit more docs.

      }
    }
  }

  private def findCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = {

Contributor: Add docs explaining what this does? As is, it's hard to tell just from the name what distinguishes findCommitsForEpoch from commitEpoch. I think the term "commit" is overloaded here - commit in findCommitsForEpoch refers to per-partition commits, whereas commit in commitEpoch refers to committing the epoch to the offset log. Maybe it's better to differentiate more clearly: commitEpoch and findPartitionCommitsForEpoch. Adding docs to both methods would also help.
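
A sketch of how the renamed, documented methods could read (doc wording and exact bodies are illustrative, not taken from the final patch):

/**
 * Collects the per-partition commit messages received for `epoch`. "Commit" here
 * means the WriterCommitMessage sent by each partition, not the epoch-level commit.
 */
private def findPartitionCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = {
  partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
}

/**
 * Commits `epoch` as a whole: hands the collected partition commit messages to the
 * writer and records the epoch as committed with the query.
 */
private def commitEpoch(epoch: Long): Unit = {
  logDebug(s"Committing epoch $epoch.")
  writer.commit(epoch, findPartitionCommitsForEpoch(epoch).toArray)
  query.commit(epoch)
}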


Author: @tdas done

@spaced4ndy changed the title from "[SPARK-23503][Structured Streaming] Enforcing sequencing of committed epochs for Continuous Execution" to "[SPARK-23503][SS] Enforce sequencing of committed epochs for Continuous Execution" on Apr 26, 2018
@SparkQA commented Apr 26, 2018

Test build #89876 has finished for PR 20936 at commit b1b9985.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor) commented May 5, 2018

jenkins retest this please.

@SparkQA commented May 5, 2018

Test build #90245 has finished for PR 20936 at commit b1b9985.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@spaced4ndy (Author) commented May 6, 2018

@tdas Hi, could you give me some advice on how to resolve this error? I'm not sure how this PR could cause it, especially considering it was passing tests before, and the failed Jenkins test says "It is not a test it is a sbt.testing.SuiteSelector" in its description.

@jose-torres (Contributor)

retest this please

@jose-torres (Contributor)

(I agree that your PR isn't responsible here; there's a known problem with that suite.)

@SparkQA commented May 6, 2018

Test build #90281 has finished for PR 20936 at commit b1b9985.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@spaced4ndy (Author)

@tdas Hi, seems like it's good to go. Could you merge this when you have time?

@tdas (Contributor) commented May 16, 2018

Jenkins retest this please.

@tdas (Contributor) commented May 16, 2018

@efimpoberezkin I will merge it. Let me do another round of tests.

@spaced4ndy (Author)

retest this please

@spaced4ndy (Author)

@tdas can't start tests

@tdas (Contributor) commented May 17, 2018

jenkins retest this please.

@SparkQA commented May 18, 2018

Test build #90758 has finished for PR 20936 at commit b1b9985.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres (Contributor)

Ugh, that flaky Kafka test. It's already reported, and I've been looking into it this week, albeit with little luck.

@jose-torres (Contributor)

retest this please

@SparkQA commented May 18, 2018

Test build #4181 has finished for PR 20936 at commit b1b9985.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 18, 2018

Test build #90764 has finished for PR 20936 at commit b1b9985.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor) commented May 18, 2018

I merged this to master. Thanks. And sorry for the delay.
