SAMZA-1212 - Refactor interaction between StreamProcessor, JobCoordinator and SamzaContainer #148

Closed
wants to merge 34 commits into from

@navina navina commented Apr 28, 2017

See SAMZA-1212 for motivation toward this refactoring.
Changes here are:

  • Removed awaitStart (blocking) method in StreamProcessor, JobCoordinator and SamzaContainer
  • Introduced SamzaContainerListener and JobCoordinatorListener interface implemented by StreamProcessor
  • Introduced SamzaContainerStatus to handle failures and lifecycle using Listener interfaces

navina commented Apr 28, 2017

@prateekm @xinyuiscool Can you please review?

Note I am still working on adding unit tests for StreamProcessor. Just wanted to get the review started as the changes are quite significant. Thanks!

STOPPED,

/**
* Indicates that the container failed during any of its 3 active states -

In the above diagram, "FAILED" does not come from "STOPPED"

Contributor Author

Yes. It is expected to be in either one of these states. A "failed" job cannot transition to "stopped". These are 2 different end states of the container.
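
To illustrate the point about two separate end states, here is a minimal sketch. Only the four status names come from this thread; the enum name, the helper methods and the exact transition table are assumptions made for illustration, not the actual SamzaContainerStatus.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch; only the four constant names appear in the PR.
enum ContainerStatusSketch {
  NOT_STARTED, RUNNING, STOPPED, FAILED;

  // STOPPED and FAILED are separate end states: nothing follows either of them.
  boolean isTerminal() {
    return this == STOPPED || this == FAILED;
  }

  // Assumed transition table, for illustration only.
  Set<ContainerStatusSketch> next() {
    switch (this) {
      case NOT_STARTED:
        return EnumSet.of(RUNNING, FAILED);
      case RUNNING:
        return EnumSet.of(STOPPED, FAILED);
      default:
        return EnumSet.noneOf(ContainerStatusSketch.class);
    }
  }

  boolean canTransitionTo(ContainerStatusSketch target) {
    return next().contains(target);
  }
}
```

With this table, FAILED.canTransitionTo(STOPPED) and STOPPED.canTransitionTo(FAILED) are both false, which matches the reply above.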

* Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} when it is shutting without any errors
* </p>
*/
void onCoordinatorStop();

Since this is just JobCoordinatorListener, would it be cleaner to just call this and the following one onStop() and onFailure()?

Contributor Author

Maybe. It was getting pretty confusing when I initially started, with 3 groups of listeners (container listener, coordinator listener and processorListener) that each had the same onStart and onStop APIs :) That's why I decided to keep the names more explicit in this case.

* {@link org.apache.samza.container.RunLoop}
* </p>
*/
void onContainerStart();

Would it be cleaner to call it onStart()? And also onStop() and onFailure() for the following ones?

Contributor Author

Same explanation as above :)


@Override
public void onJobModelExpired() {
if (container != null && container.getStatus().equals(SamzaContainerStatus.RUNNING)) {

Better to use SamzaContainerStatus.RUNNING.equals(container.getStatus()), which avoids a NullPointerException if container.getStatus() is null.

Contributor Author

Good catch. Will fix it.
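
For readers less familiar with the idiom, the difference between the two comparison orders can be sketched as follows. The enum and class names are stand-ins invented for this example, not the Samza types.

```java
// Hypothetical stand-in for the status enum discussed in this thread.
enum StatusForNullCheck { NOT_STARTED, RUNNING, STOPPED, FAILED }

class NullSafeStatusCheck {
  // Constant-first equals: returns false instead of throwing when status is null.
  static boolean isRunning(StatusForNullCheck status) {
    return StatusForNullCheck.RUNNING.equals(status);
  }

  // Value-first equals: throws NullPointerException when status is null.
  static boolean isRunningUnsafe(StatusForNullCheck status) {
    return status.equals(StatusForNullCheck.RUNNING);
  }
}
```

Since these are enum constants, `status == StatusForNullCheck.RUNNING` would be null-safe as well.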

Contributor

If container's not null but not running (hypothetical example: jobModelUpdate followed immediately by a jobModelExpired before container transitioned to running), shouldn't we try to stop and replace it regardless? Might be safer to remove this check and log the container status if not RUNNING in the body instead.

Also, maybe log in the else branch that expired was called but the container was null?

Contributor Author

shouldn't we try to stop and replace it regardless?

Fair point about container not having transitioned to RUNNING state.

Might be safer to remove this check and log the container status if not RUNNING in the body instead.

I don't understand what you mean by logging the container status in the body. Are you suggesting that we invoke shutdown irrespective of the state of the container?

Contributor

Just realized we still need the state check since calling shutdown on a stopped/failed container won't countdown the latch.

I think we can change this check to if (container != null && container != stopped or failed)
That way if it's not started yet, it'll go to RUNNING -> STOPPED immediately. Does that sound reasonable?

As an aside, we should throw in SamzaContainer if run is called on a STOPPED | FAILED container.

Contributor Author

I think we can change this check to if (container != null && container != stopped or failed)

Yeah. Sounds good. Negative test condition works better here.

As an aside, we should throw in SamzaContainer if run is called on a STOPPED | FAILED container.

Ok. Let me try adding these checks. Thanks!
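
The two checks agreed on in this exchange could look roughly like the sketch below. All class and method names are hypothetical, and the real SamzaContainer runs and shuts down asynchronously; this version only shows the state checks.

```java
// Hypothetical sketch: the status names mirror the thread, the bodies are assumptions.
enum RunStatus { NOT_STARTED, RUNNING, STOPPED, FAILED }

class ContainerSketch {
  private volatile RunStatus status = RunStatus.NOT_STARTED;

  RunStatus getStatus() {
    return status;
  }

  // Refuse to run a container that has already reached an end state.
  void run() {
    if (status == RunStatus.STOPPED || status == RunStatus.FAILED) {
      throw new IllegalStateException("Cannot run a container in state " + status);
    }
    status = RunStatus.RUNNING;
  }

  void shutdown() {
    status = RunStatus.STOPPED;
  }
}

class ExpiryHandlerSketch {
  // Returns true when a shutdown was requested.
  static boolean onJobModelExpired(ContainerSketch container) {
    if (container == null) {
      return false; // a real implementation would log that no container exists
    }
    RunStatus status = container.getStatus();
    // Negative check: anything that is not already in an end state gets shut down.
    if (status != RunStatus.STOPPED && status != RunStatus.FAILED) {
      container.shutdown();
      return true;
    }
    return false;
  }
}
```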

public synchronized void stop() {
if (container != null) {
LOGGER.info("Shutting down container " + container.toString() + " from StreamProcessor");
container.shutdown(false);

Besides container.shutdown, should we stop the JobCoordinator as well?

Contributor Author

Container shutdown should automatically trigger the onContainerStop callback, which will trigger the JobCoordinator to stop. There are multiple ways in which the StreamProcessor stops:

  1. caller of StreamProcessor invokes stop()
  2. Container completes processing (e.g. bounded input) and shuts down
  3. Container fails
  4. Coordinator fails

When either the container or the coordinator stops (cleanly or due to an exception), it will try to shut down the StreamProcessor. This needs to be synchronized so that only one code path gets triggered for shutdown.

If container is running,

  • container is shut down cleanly -> onContainerStop will trigger jobcoordinator.stop
  • container fails to shut down cleanly -> onContainerFailure will trigger jobcoordinator.stop

If container is not running, then we should simply shut down the jobcoordinator.
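
The flow described above can be sketched as follows. This is a simplified, single-threaded illustration with invented names: the real callbacks arrive asynchronously from the container thread, and the event list exists only to make the order of calls visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the stop flow; not the actual StreamProcessor.
class StopFlowSketch {
  final List<String> events = new ArrayList<>();
  private boolean containerRunning;
  private boolean coordinatorStopped = false;

  StopFlowSketch(boolean containerRunning) {
    this.containerRunning = containerRunning;
  }

  // Entry point for the caller of the processor.
  synchronized void stop() {
    if (containerRunning) {
      events.add("container.shutdown");
      containerRunning = false;
      onContainerStop(); // here called inline; in the PR it is a callback
    } else {
      stopCoordinator();
    }
  }

  // Callback for a clean container shutdown.
  synchronized void onContainerStop() {
    stopCoordinator();
  }

  // Callback for a container failure.
  synchronized void onContainerFailed(Throwable t) {
    events.add("container failed: " + t.getMessage());
    stopCoordinator();
  }

  // Guarded so that only one code path actually stops the coordinator.
  private void stopCoordinator() {
    if (!coordinatorStopped) {
      coordinatorStopped = true;
      events.add("coordinator.stop");
    }
  }
}
```

The `synchronized` methods are reentrant, so stop() can call onContainerStop() on the same thread without deadlocking.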

Contributor

@navina let's document this as part of implementation notes in the method javadoc?

Contributor Author

Ok sure.

for (String systemName: systemConfig.getSystemNames()) {
String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
if (systemFactoryClassName == null) {
LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));

Do we have to both log and throw an exception with the same message here? I think throwing the exception and letting the caller handle it should be good enough.

Contributor Author

I don't think the exception will get logged if the caller doesn't handle the exception, will it?
If the exception gets logged either way, then I can remove the redundant log statement.

// TODO - make a loop here with some number of attempts.
// possibly split into two method - becomeLeader() and becomeParticipant()
leaderElector.tryBecomeLeader(new LeaderElectorListener() {
@Override
public void onBecomingLeader() {
onBecomeLeader();
listenToProcessorLiveness(); // subscribe for adding new processors

The comment here does not match the method, or it does not explain it well.

Contributor Author

Sure. I will fix it.

@xinyuiscool xinyuiscool left a comment

The refactoring looks great. Thanks for the clean up.

/**
* Indicates that the container has not been started
*/
NOT_STARTED,

Contributor

In ApplicationStatus we have a status "NEW", which has the same meaning as NOT_STARTED. Would you like to be consistent with that?

Contributor Author

ApplicationStatus should rely on the status of the StreamProcessor itself, and not on its individual components. Having said that, we have not defined states for StreamProcessors yet. We should, ideally, define states for StreamProcessor and that should be inspected by the ApplicationRunner for status.

I would prefer to keep SamzaContainer's initial status as NOT_STARTED because each time we rebalance, we are creating a "new" samza container instance, which is technically not started. Calling it NEW really doesn't make sense atm.

Are there any state transition details for ApplicationStatus? What does it mean in the standalone scenarios?

* @return An instance of IJobCoordinator
*/
JobCoordinator getJobCoordinator(String processorId, Config config, SamzaContainerController containerController);
JobCoordinator getJobCoordinator(String processorId, Config config);

Contributor

This change makes the interface a lot cleaner! Thx!

Contributor Author

Np :)

@Override
public void onJobModelExpired() {
if (container != null && container.getStatus().equals(SamzaContainerStatus.RUNNING)) {
container.shutdown(true);

Contributor

I feel passing this flag around and then using it in the callback makes the listener interface not very clean. Do you think we can set up a different listener before container.shutdown(), like the following:

container.setListener(new ContainerListener() {
  void onStart() { /* do nothing */ }
  void onStop() { /* do the logic here, trigger the countdown latch */ }
  void onFailure() { /* do the failure logic */ }
});
container.shutdown();
...

Then I think we don't need this flag anymore.

Contributor Author

I am not clear on what you are proposing here.
I agree that passing around a flag is not optimal. But we have bi-directional notifications among the components here. If there is a more elegant way, we should incorporate that. Let me sync up with you in-person.

Contributor Author

Spoke with @xinyuiscool and @prateekm offline. Summarizing it here:
Above, Xinyu was suggesting overwriting the container's listener depending on who invokes the shutdown, or registering more than one listener with the container so that the container invokes the correct listener depending on who is requesting the shutdown.
The problems with either suggestion are:

  1. we are blurring the contracts of the listener interface
  2. container needs to be aware of the components it is interacting with. This is against the whole idea behind listener model

The main issue under discussion is the "pauseByJm" boolean flag passed to onContainerStop. This is required to identify why the container stopped or completed. If the container itself can clearly expose its state, then we wouldn't need this. Problem is that when the container "completes" processing (as in bounded or end of stream), it will automatically stop. There is no way to distinguish this state of the container from the "STOPPED" state. If we can clearly identify a "PAUSED" state and "COMPLETED" state of the SamzaContainer, we can get rid of this flag. Until such a clear definition of state becomes apparent, we have to thread the flag through the components.

Conclusion: For the sake of clarity of the code, we decided to use an "enum" to identify the source requesting the shutdown, instead of using a boolean flag.
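
A rough sketch of what the enum-based variant might look like is below. The enum name, its two values and the returned action strings are all assumptions made for illustration; the thread does not fix any of them at this point.

```java
// Hypothetical enum; the summary above only says a named source replaces the boolean flag.
enum ShutdownSourceSketch { JOB_COORDINATOR, STREAM_PROCESSOR }

class ProcessorStopSketch {
  // Assumed reaction of the processor inside its onContainerStop callback.
  static String onContainerStop(ShutdownSourceSketch source) {
    if (source == ShutdownSourceSketch.JOB_COORDINATOR) {
      // The container was stopped because the JobModel expired, so the processor keeps going.
      return "wait for new JobModel";
    }
    // Any other source means the processor itself is going down.
    return "stop JobCoordinator";
  }
}
```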

}
@volatile private var status = SamzaContainerStatus.NOT_STARTED
private var exceptionSeen: Throwable = null
private var paused: Boolean = false

Contributor

As mentioned above, hopefully we can remove this flag.


@Override
public void onContainerFailed(Throwable t) {
log.info("Container Failed");

Contributor

This needs to be handled for the yarn case. Previously container.run() would throw an exception when there was a failure. Now it seems the logic has changed and the exception is passed to the callback. So it seems we need to store the exception in a variable and throw it after container.run(). Something like:
LocalContainerRunner {
  Throwable throwable;
  ...
  container.setContainerListener(new SamzaContainerListener() {
    ...
    onContainerFailed(Throwable t) {
      throwable = t;
    }
  });

  container.run();

  if (throwable != null) {
    throw throwable;
  }
  ...

Contributor Author

Oh you are right. I think I actually wanted to ask you about this runner and I forgot about it.

But why do we want to throw again, only to be caught by the uncaught exception handler?

I am ok with throwing it after it returns from run. The process will exit with an error status code, I guess. Thanks for pointing out that I changed the existing behavior :D
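
A compilable version of the capture-and-rethrow idea discussed here might look like the following. The interface and class names are placeholders rather than the real Samza types, and the container is reduced to a stub that reports a preset failure through its listener.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the failure half of the container listener.
interface FailureListenerSketch {
  void onContainerFailed(Throwable t);
}

// Stub container: like the refactored run(), it reports failures via the callback instead of throwing.
class FailingContainerSketch {
  private final Throwable failure;
  private FailureListenerSketch listener;

  FailingContainerSketch(Throwable failure) {
    this.failure = failure;
  }

  void setContainerListener(FailureListenerSketch listener) {
    this.listener = listener;
  }

  void run() {
    if (failure != null && listener != null) {
      listener.onContainerFailed(failure);
    }
  }
}

class RunnerSketch {
  // Stores the throwable seen by the callback and rethrows it once run() returns.
  static void runAndRethrow(FailingContainerSketch container) throws Throwable {
    AtomicReference<Throwable> seen = new AtomicReference<>();
    container.setContainerListener(seen::set);
    container.run();
    Throwable t = seen.get();
    if (t != null) {
      throw t;
    }
  }
}
```

An AtomicReference is used because the callback may fire on a different thread than the one calling run(); a plain local variable could not be assigned from the listener anyway.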

}
case SamzaContainerStatus.FAILED =>
if (containerListener != null) {
containerListener.onContainerFailed(exceptionSeen)

Contributor

We no longer throw the exception directly from container.run(); we pass it to the callback instead. Please do a usage search of container.run() and confirm that the throwable from the callback is handled properly. I found one case that needs to be fixed in LocalContainerRunner.

Contributor Author

I actually did a usage search and that is how I identified LocalContainerRunner. I don't see any other usage of container.run. The only callers are LocalContainerRunner, ThreadJob and StreamProcessor.
If there are custom usages of this interface through CommandBuilders (i.e. using task.execute), it should be handled by those custom scripts. Let me know if you find other places of usage. I couldn't find any.

@@ -28,39 +29,41 @@
* based on the underlying environment. In some cases, ID assignment is completely config driven, while in other
* cases, ID assignment may require coordination with JobCoordinators of other StreamProcessors.
*
* This interface contains methods required for the StreamProcessor to interact with JobCoordinator.
* This interface contains methods required for the StreamProcessor to interact with JobCoordinator. StreamProcessor

Contributor

Documentation: First sentence is redundant.
"to get notified about JobModel and Coordinator state changes."

Contributor Author

Ok.

@@ -55,7 +55,7 @@
private final String processorId;

private final ZkController zkController;
private final SamzaContainerController containerController;
private JobCoordinatorListener coordinatorListener = null;

Contributor

Codestyle: Group final and non-final fields separately.

Contributor Author

Ok

* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.processor;

Contributor

JobCoordinatorLifecycleListener. Should be in the coordinator package.

Contributor Author

Good point.

* Listener interface that can be registered with the {@link org.apache.samza.container.SamzaContainer} instance in
* order to receive notifications.
*/
public interface SamzaContainerListener {

Contributor

SamzaContainerLifecycleListener. Should be in the container package.

Contributor Author

Good point. Are you also suggesting we rename this SamzaContainerLifecycleListener?

Contributor

Yeah, it's verbose but would prefer XLifecycleListener naming for all of these.

Contributor Author

Just realized that the lifecycle listener that we already have internally is called SamzaContainerLifecycleListener. So, there will be a name collision if we change it now. Maybe we can change this later on?

public interface SamzaContainerListener {

/**
* <p>

Contributor

Documentation: <p> are not necessary (here and everywhere else).

Contributor Author

It's not necessary. But it looks better formatted in the javadoc. Is that not the case?

Contributor

You don't need it if there's only one paragraph, javadoc will format it appropriately. If there are multiple paragraphs, you can insert <p> between them. E.g., http://www.oracle.com/technetwork/java/javase/documentation/index-137868.html#format

Contributor Author

It is so odd that <p> is mostly used for line breaks. I removed it from this class. Will adjust other docs as and when I find them.

throw ie
}
@volatile private var status = SamzaContainerStatus.NOT_STARTED
private var exceptionSeen: Throwable = null

Contributor

Minor: Single space before =

Contributor Author

Ok

/**
* Indicates that the container started the {@link org.apache.samza.container.RunLoop}
*/
RUNNING,

Contributor

Minor: Slightly prefer STARTED instead of RUNNING. Matches better with the previous 2 states and the lifecycle hook name (onStart). Up to you though.

Contributor Author

Ah.. too many places to change now. Let me try to refactor in the IDE.

throw e
} finally {
if (status.equals(SamzaContainerStatus.RUNNING)) {
error("Caught exception/error in process loop.", e)

Contributor

Minor: "in run loop."

Contributor Author

Makes sense.

}
}

def shutdown() = {
/**
* Triggers shutdown of the run loop

Contributor

Same as the comment for StreamProcessor. Maybe:

Asynchronously shuts down this SamzaContainer.
Implementation: 

Contributor Author

Ok

@@ -54,18 +52,31 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
case _ => None
}

val containerListener = new SamzaContainerListener {
override def onContainerFailed(t: Throwable): Unit = {
throw t

Contributor

Will the thrown exception be handled correctly, or will it result in an uncaught exception and the JVM quitting?

Contributor Author

I think the behavior should be the same as before. ThreadJob does have a try-catch and it re-throws the Throwable. Not sure if the JobRunner will handle it though. There isn't any uncaught exception handler in ThreadJob. So maybe ApplicationRunner should deal with it?

Contributor

Can't ThreadJob be run w/o the ApplicationRunner (i.e. with the low-level APIs)?

Contributor Author

IIUC, ApplicationRunner is the user-facing API, even for thread jobs.

@xinyuiscool can you please clarify?

Contributor

Looks good to me. Log it please.

navina commented May 3, 2017

@prateekm @xinyuiscool I have addressed all the comments. Only exception is the one involving the boolean flag. Replacing it with the enum makes it easy to read. But I don't think we have clear values for the enum - except for JobCoordinator.
On second thought, would it be better to keep a separate method called "pause" in SamzaContainer that is used to set the paused flag? The benefits of this approach are:

  1. It will be more readable and we won't semantically overload the shutdown method
  2. In future, when we refactor container to have a well-defined paused status, it should be relatively simple to transition over.

What do you think?

@xinyuiscool

+1. I like adding the pause() much better than passing the flag or enum around. We can have some comments saying it's actually stopping the container right now, but the code is much cleaner.
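
A minimal sketch of the pause() idea, assuming the listener still receives a boolean telling it why the container stopped. The class and interface names are invented for the example, and shutdown is synchronous here whereas the real container stops asynchronously.

```java
// Hypothetical sketch of a separate pause() method; not the actual SamzaContainer.
class PausableContainerSketch {
  interface StopListener {
    void onContainerStop(boolean pausedByCoordinator);
  }

  private final StopListener listener;
  private boolean paused = false;

  PausableContainerSketch(StopListener listener) {
    this.listener = listener;
  }

  // For now pause() really stops the container; it only records the reason first.
  void pause() {
    paused = true;
    shutdown();
  }

  // Plain shutdown no longer needs a flag argument.
  void shutdown() {
    listener.onContainerStop(paused);
  }
}
```

Callers that react to a JobModel change would call pause(), and everyone else would call shutdown(), so the reason never has to be passed as a parameter.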

prateekm commented May 3, 2017 via email

navina commented May 3, 2017

My first preference would be to try to do the bookkeeping in
StreamProcessor for whether JC or SP requested shutdown, if possible.

That would be the most ideal case. But it is not as straightforward as we think. We have to synchronize access and ordering of calls between jobcoordinator and container. So, I would like to table this approach for now.

If we use the enum instead, how about the following values:
JobCoordinator, StreamProcessor, SamzaContainer (EOS), LocalContainerRunner
(for now)

I can add values for the enum. But in reality, the only enum value for which you will have a different behavior is JobCoordinator. The rest of them are unusable atm or don't have any functional value. I would prefer not to add enum values just for the sake of it. It only feels like more tech debt for the future, and it is unlikely that the debt gets resolved in the "near future".

I'd prefer either over adding pause before it's an actual SamzaContainer state.

The only problem we are actually facing right now is how to make the code more readable. I believe pause solves the problem reasonably.

Functionally, there is not a lot we can change. I tried out the Enum approach and didn't like it because it was ignoring all values other than JobCoordinator.

navina commented May 3, 2017

@prateekm another point that I missed above. It is actually the StreamProcessor that decides to stop the container when the JobModel changes. So, should the enum value used be JobCoordinator or StreamProcessor? For all shutdown requests from the listener, it should theoretically be StreamProcessor, because JobCoordinator doesn't care about the state of the container.
This aspect was confusing when using the enum approach.

@prateekm prateekm left a comment

Thanks, LGTM!



/**
* <p>

Contributor

Minor: Remove <p> in this class too.
