SAMZA-1359: Handle phantom container notifications cleanly during an RM fail-over #243
vjagadish1989 wants to merge 9 commits into
vjagadish1989 commented Jul 14, 2017
- Improved our container handling logic to be resilient to phantom notifications.
- Added a new metric to Samza's ContainerProcessManager module that tracks the number of such invalid notifications.
- Added a couple of tests that simulate the exact scenario we encountered during the cluster upgrade (container starts -> container fails -> legitimate notification for the failure -> container restart -> RM fail-over -> phantom notification with a different exit code).
- As an aside, there are a whole bunch of tests in ContainerProcessManager that rely on Thread.sleep to ensure that threads get to run in a certain order. Removed this non-determinism and made them predictable.
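The idea behind ignoring phantom notifications can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the class `CompletionTracker`, its method names, and the use of plain string container IDs are all hypothetical. It assumes that a completion notification is only meaningful for a container currently tracked as running, and that anything else is counted and dropped.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical tracker for illustration only; not the actual ContainerProcessManager code.
class CompletionTracker {
    // IDs of containers we believe are currently running.
    private final Set<String> running = new HashSet<>();
    // Counts completion notifications that did not match a running container.
    private final AtomicInteger duplicateNotifications = new AtomicInteger();

    synchronized void onContainerStarted(String containerId) {
        running.add(containerId);
    }

    /** Returns true if the notification was acted on, false if it was ignored. */
    synchronized boolean onContainerCompleted(String containerId, int exitCode) {
        if (!running.remove(containerId)) {
            // Already handled (or never known): a phantom notification, so only count it.
            duplicateNotifications.incrementAndGet();
            return false;
        }
        // A real manager would inspect exitCode here and request a replacement container.
        return true;
    }

    int duplicateNotificationCount() {
        return duplicateNotifications.get();
    }
}
```

In the scenario above, the first failure notification for a container is acted on, while a later notification for the same container (arriving after the fail-over, possibly with a different exit code) only increments the counter.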
val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
val mContainers = newGauge("container-count", () => state.containerCount)
val mInvalidNotifications = newGauge("invalid-notifications", () => state.invalidNotifications.get())
From what I can tell, these notifications are not "invalid". They are "duplicate", "redundant" or "ignored", but not "invalid".
I'd suggest a rename, but it's your call.
Makes sense, I'll rename them to be duplicate.
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
void awaitContainersStart(int numExpectedContainers) throws InterruptedException {
  latch = new CountDownLatch(numExpectedContainers);
How does the code guarantee that this latch reassignment doesn't happen AFTER the latch.countDown in runStreamProcessor()?
- awaitContainersStart is not meant to be used in a multi-threaded context.
- The only guarantee is that we will wait until 2 containers have started after the call to awaitContainersStart. The caller should ensure that awaitContainersStart is called before any action that triggers the startup. This is just an alternative to doing a Thread.sleep().
Certainly better than Thread.sleep(). Thanks for explaining and +10000 to getting rid of Thread.sleep(). Appreciate it!
I would suggest against an unbounded await. How about some reasonable timeout and then fail the test?
Instead of reassigning the count down latch you could use a semaphore and acquire the expected number of permits here and release them as containers start up.
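A rough sketch of what the semaphore suggestion could look like, assuming a hypothetical helper class `ContainerStartSignal` (the class name, `containerStarted()`, and the extra timeout parameters are illustrative and not part of this PR). It relies on `Semaphore.tryAcquire(int, long, TimeUnit)`, which returns false when the permits do not become available in time, so the test can fail instead of hanging.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper for illustration; names are assumptions, not the PR's code.
class ContainerStartSignal {
    // Starts with zero permits; each container start adds one.
    private final Semaphore started = new Semaphore(0);

    // Called from the code path that launches a container (e.g. a mock in a test).
    void containerStarted() {
        started.release();
    }

    /**
     * Waits until the expected number of containers have started, or the timeout elapses.
     * Returns true if all expected starts were observed, false on timeout.
     */
    boolean awaitContainersStart(int numExpectedContainers, long timeout, TimeUnit unit)
            throws InterruptedException {
        return started.tryAcquire(numExpectedContainers, timeout, unit);
    }
}
```

Because released permits accumulate, a container that starts before the await call is still counted, which also sidesteps the ordering concern with reassigning the latch. A test could then assert on the return value, for example failing when `awaitContainersStart(2, 5, TimeUnit.SECONDS)` returns false.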
cpettitt-linkedin left a comment
Kudos for improving the Thread.sleep situation!
@santhoshvenkat1988 @jmakes Review please!
shanthoosh left a comment
LGTM.
Just curious: I wonder if it makes sense to make this layer capable of handling any kind of duplicate (bad) YARN notification (successful_completion, preempted, etc.). We could check a received container notification against the valid possible state transitions before acting on it.
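One way to picture the state-transition check is the sketch below. It is purely illustrative: the `ContainerState` values, the transition table, and the `ContainerStateMachine` class are assumptions made up for this example and do not reflect YARN's or Samza's actual container lifecycle.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified lifecycle; real YARN/Samza states differ.
enum ContainerState { REQUESTED, RUNNING, COMPLETED }

// Hypothetical per-container state machine that rejects invalid transitions.
class ContainerStateMachine {
    // For each state, the set of states it may legally move to.
    private static final Map<ContainerState, Set<ContainerState>> ALLOWED =
            new EnumMap<>(ContainerState.class);

    static {
        ALLOWED.put(ContainerState.REQUESTED, EnumSet.of(ContainerState.RUNNING));
        ALLOWED.put(ContainerState.RUNNING, EnumSet.of(ContainerState.COMPLETED));
        ALLOWED.put(ContainerState.COMPLETED, EnumSet.noneOf(ContainerState.class));
    }

    private ContainerState current = ContainerState.REQUESTED;

    /** Applies the transition if it is valid; returns false and changes nothing otherwise. */
    synchronized boolean tryTransition(ContainerState next) {
        if (!ALLOWED.get(current).contains(next)) {
            return false; // e.g. a second COMPLETED notification for the same container
        }
        current = next;
        return true;
    }

    synchronized ContainerState current() {
        return current;
    }
}
```

With a table like this, any notification kind that maps to a terminal state would be rejected the second time it arrives, regardless of its exit status.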