
SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API #44

Closed · wants to merge 8 commits from the Noop-JC branch

Conversation

navina (Contributor) commented Jan 24, 2017:

This patch contains changes associated with the Standalone StreamProcessor, where there is no coordination. This will work for load-balanced consumers like the new Kafka consumer, as well as statically partitioned cases.

Additionally, we have introduced TaskFactory for StreamTask and AsyncStreamTask.
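As context, a minimal sketch of what such task factories look like, assuming a no-arg createInstance() method (the exact signatures in the patch may differ):

import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;

// Factory for user-defined StreamTask instances (sketch; each interface
// would live in its own file).
public interface StreamTaskFactory {
  StreamTask createInstance();
}

// Factory for user-defined AsyncStreamTask instances (sketch).
public interface AsyncStreamTaskFactory {
  AsyncStreamTask createInstance();
}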

@navina navina changed the title Initial Standalone JobCoordinator and StreamProcessor API SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API Jan 24, 2017
@navina navina force-pushed the Noop-JC branch 2 times, most recently from 9c9bcdd to 606bb84 on January 25, 2017 21:27
* </pre>
*/
@InterfaceStability.Evolving
public class StreamProcessor {
Contributor:

Can one JVM process create multiple StreamProcessors? Can StreamProcessors be created in multiple threads? Is it thread-safe?

navina (Contributor Author):

Yes. One JVM process can create multiple StreamProcessors. StreamProcessors can be created in multiple threads. It should be thread-safe.

Contributor:

Cool! Please add this in the javadoc so it's clear.

navina (Contributor Author):

Ok
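A sketch of the kind of javadoc note being requested (wording assumed, not the committed text):

/**
 * ...
 * <p>
 * One JVM process may create multiple StreamProcessor instances, including
 * from different threads; the class is intended to be thread-safe.
 */
@InterfaceStability.Evolving
public class StreamProcessor { /* ... */ }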

xinyuiscool (Contributor) left a comment:

The patch looks good to me. I have a few general questions. Please see the comments.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SamzaContainerController {
Contributor:

Seems this class can be package-private.

navina (Contributor Author):

The SamzaContainerController object is referenced by the JobCoordinatorFactory interface, so it cannot be package-private.


Map<String, String> updatedConfigMap = new HashMap<>();
updatedConfigMap.putAll(config);
updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId));
Contributor:

Any particular reason we need to augment the config with processorId? It seems the processorId is passed down to both containerController and jobCoordinator.

navina (Contributor Author):

Yeah. It is a little tricky to understand why this is needed. "processorId" is equivalent to "containerId". However, containerId (in today's Yarn world) can only be between 0 and N-1, where N is defined by job.container.count. This is not the case for processorId, which is defined directly by the user application through the StreamProcessor API. So, the groupers (SspGrouper and TaskNameGrouper) need to know the processorId in order to use it as a key in the JobModel definition.
If you take a look at AllSspToSingleTaskGrouperFactory and SingleContainerGrouperFactory, you will get an idea of how this processorId is used in these groupers.
Imo, the correct long-term solution is to change the grouper interface to take specific parameters instead of an arbitrary bag of key/value pairs. Since that is orthogonal to the scope of this work, the interface change has not yet been made.
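A sketch of that usage, with interface shapes and the config key assumed rather than quoted from the patch:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.TaskNameGrouper;
import org.apache.samza.container.grouper.task.TaskNameGrouperFactory;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;

// The factory only receives a Config, so the processorId must travel inside
// it; the grouper then keys the lone ContainerModel of the JobModel with it.
public class SingleContainerGrouperFactory implements TaskNameGrouperFactory {
  @Override
  public TaskNameGrouper build(Config config) {
    final int processorId = Integer.parseInt(config.get("processor.id")); // key name assumed
    return new TaskNameGrouper() {
      @Override
      public Set<ContainerModel> group(Set<TaskModel> tasks) {
        Map<TaskName, TaskModel> taskModels = new HashMap<>();
        for (TaskModel task : tasks) {
          taskModels.put(task.getTaskName(), task);
        }
        // All tasks land in one container, keyed by the user-supplied processorId.
        return Collections.singleton(new ContainerModel(processorId, taskModels));
      }
    };
  }
}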

Contributor:

Thanks a lot for the details. It's much clearer to me now after seeing the two grouper factory classes. Do we want to make the config public (something like adding it to the container config)? I am not sure whether there is a use case where customers use standalone with their own groupers and need to access the processorId there too.

navina (Contributor Author):

Well.. the groupers have always been customizable by users. Are you suggesting that we make these groupers the only options for StandaloneJobCoordinator users?

Contributor:

Oh, sorry, that's not what I meant. I was wondering whether we want to put this PROCESSOR_ID config name as a public config string inside JobConfig so the groupers can use it. Right now it's defined in both StreamProcessor and the groupers.
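A sketch of what that suggestion could look like (the constant name and key string are assumptions):

// One shared, public definition in JobConfig that both StreamProcessor and
// the groupers reference instead of each redefining the string.
public class JobConfig extends MapConfig {
  public static final String PROCESSOR_ID = "processor.id";
}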

navina (Contributor Author):

Oh.. Let me explore that idea. Thanks for the suggestion!

* @throws InterruptedException if the current thread is interrupted while waiting for container to start-up
*/
public boolean awaitStart(long timeoutMs) throws InterruptedException {
return containerController.awaitStart(timeoutMs); // TODO: Should awaitStart be part of the JC interface, instead of directly using container controller
Contributor:

In this impl, the JobCoordinator is responsible for the life cycle of the containerController. If we add awaitStart() inside the JC, then there is no reason for StreamProcessor to know about the containerController anymore. Can we make this cleaner?

navina (Contributor Author):

awaitStart is a user-facing API. For applications using Samza as a library, we need an interface the application can use to ensure that the processor has actually started. This way the user can track the actual status of the processor and take appropriate action.
An alternative could be to add lifecycle listeners. We are planning on adding those, although it's not very clear yet what kind of hooks will be required.
Let me know what you think.

Contributor:

Oh, I mean this: instead of containerController.awaitStart(timeoutMs), we do jobCoordinator.awaitStart(), as the comment suggested (it makes a lot of sense to me, since the controller is started and stopped in the coordinator). Anyway, it's a small refactoring and we don't need to do it now.
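A sketch of the suggested refactor (method signatures assumed):

// In StreamProcessor: talk only to the JobCoordinator...
public boolean awaitStart(long timeoutMs) throws InterruptedException {
  return jobCoordinator.awaitStart(timeoutMs);
}

// ...and in the JobCoordinator impl, which owns the controller it started:
public boolean awaitStart(long timeoutMs) throws InterruptedException {
  return containerController.awaitStart(timeoutMs);
}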

navina (Contributor Author):

Ah.. I see what you mean now. Let me give that a try. That does make sense.

streamPartitionCountMonitor: StreamPartitionCountMonitor) = {
val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache)
streamPartitionCountMonitor: StreamPartitionCountMonitor,
containerIds: java.util.List[Integer]) = {
Contributor:

Where is this containerIds used?

navina (Contributor Author):

Ah.. This seems to have trickled in from a future patch :)
When we are operating using ZK for coordination, we will have an elastic set of processing nodes. So, containerIds (aka processorIds) are needed for JobModel generation (since they are no longer between 0 and N-1).

Since this change is not directly related to the current JIRA, I will remove it. Thanks for noticing it!

navina (Contributor Author):

Fyi, I am going to rename this file: JobCoordinator.scala -> JobModelManager.scala. There shouldn't be any other changes in it.

config,
jobModel.maxChangeLogStreamPartitions,
null,
new JmxServer))
Contributor:

Seems we don't stop the JmxServer anymore.

navina (Contributor Author):

Yup. You are right. I will fix this.
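A sketch of the shape of the fix (the surrounding code is Scala; shown Java-style here, and the run call is a hypothetical stand-in):

JmxServer jmxServer = new JmxServer();
try {
  container.run(); // hypothetical: whatever runs the SamzaContainer today
} finally {
  jmxServer.stop(); // stop JMX along with the container instead of leaking it
}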

*
* It generates the JobModel using the Config passed into the constructor.
* Since the standalone JobCoordinator does not perform partition management, it allows two kinds of partition
* distribution mechanism - consumer-managed partition distribution and user-defined fixed partition distribution.
Contributor:

Let me understand the partition distribution more. When we use the high-level Kafka consumer, do we use a certain task-to-container grouper and partition-to-task grouper? I remember we had a discussion before where tasks would be created for every partition in the topic so we can handle dynamic partition re-balancing. Is that still the impl?

navina (Contributor Author):

Yes. With the high-level Kafka consumer, you have to use a partition-to-task grouper that maps all SSPs to a single task, and a task-to-container grouper that maps all tasks to the same container. I have added them in the tests as AllSspToSingleTaskGrouper and SingleContainerGrouper. Perhaps I should move them into samza-core itself and update the documentation here?
Let me know what you think.
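A sketch of the first grouper's behavior (the committed test class may differ; the task name is illustrative):

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.system.SystemStreamPartition;

// Every SSP maps to a single task, leaving the high-level consumer free to
// rebalance partitions underneath without changing the JobModel.
public class AllSspToSingleTaskGrouper implements SystemStreamPartitionGrouper {
  @Override
  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
    return Collections.singletonMap(new TaskName("Task-0"), ssps);
  }
}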

Contributor:

Any reason why these two groupers are initially added in tests? It seems to me they are required by this standalone model with the high-level Kafka consumer, so they should be in samza-core.

navina (Contributor Author):

We don't have an implementation of the high-level Kafka system consumer, so I wasn't sure if they should be in samza-core or samza-test. Based on this discussion, I think it is clearer to move them to samza-core and add proper documentation in the StandaloneJobCoordinator. Let me make that change.

fredji97 left a comment:

I thought I submitted the review earlier, but it seems it wasn't successful. Redoing it now.

}

public String getJobCoordinatorFactoryClassName() {
String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);

fredji97:

Since we have a TODO to change this to job-coordinator.factory, why not use job-coordinator.factory in this newly added code?

navina (Contributor Author):

I actually thought we were already using this config name at LinkedIn and that changing it would be backward-incompatible for those clients. But I just checked: this is a newly introduced config. So, I will change it now. Thanks for noticing it :)

* @return value indicating how long to wait for the tasks to shutdown
*/
public long getShutdownMs() {
if (get(TASK_SHUTDOWN_MS) == null) return DEFAULT_TASK_SHUTDOWN_MS;

fredji97:

If we are not using Java 8's Optional here, maybe we can consider doing this:

String taskShutDownVal = get(TASK_SHUTDOWN_MS);
if (null == taskShutDownVal) {
  ...
} else {
  ...
}

It is a minor thing (so optional to change :)), but the old pattern calls the get() function twice in most cases. I believe this get() is light, but it may be heavy in other cases, so I would suggest always making just one call as a best practice.

navina (Contributor Author):

Good suggestion!

*/
public long getShutdownMs() {
if (get(TASK_SHUTDOWN_MS) == null) return DEFAULT_TASK_SHUTDOWN_MS;
return Long.valueOf(get(TASK_SHUTDOWN_MS));

fredji97:

It could throw an exception. We may want to error out or use the default value if there is an exception.

navina (Contributor Author):

We are using a default value if it is not set. Am I missing something?

fredji97:

Let's say get(TASK_SHUTDOWN_MS) returns a value which cannot be parsed as a long; then it will throw a NumberFormatException.


/**
* Returns the logical ID assigned to the processor
* It is upto the user to ensure that different instances of StreamProcessor within a job have unique processor ID.

fredji97:

typo: upto -> up to

navina (Contributor Author):

Fixed

* execute
*/
public void start() {
jobCoordinator.start();

fredji97:

Does it match the comment? Should it have two steps in start()?

navina (Contributor Author):

I think I didn't update the documentation after moving the container-control code out of start() and into containerController. I will update the documentation.

xinyuiscool (Contributor) left a comment:

Looks great! Ship it!

fredji97:
Thanks for addressing my comments. It looks good overall. I just added one more minor thing, following up on your reply to one of my comments.

public long getShutdownMs() {
String shutdownMs = get(TASK_SHUTDOWN_MS);
if (shutdownMs == null) return DEFAULT_TASK_SHUTDOWN_MS;
return Long.parseLong(shutdownMs);

fredji97:

It can still throw NumberFormatException if the configured value is not null but not parseable.

navina (Contributor Author):

I didn't get that from your first comment; I was trying to keep the behavior the same as before. Fixing it now. Your suggestion is better :)
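A sketch of the resulting fix, falling back to the default when the configured value is present but unparseable:

public long getShutdownMs() {
  String shutdownMs = get(TASK_SHUTDOWN_MS);
  if (shutdownMs == null) {
    return DEFAULT_TASK_SHUTDOWN_MS;
  }
  try {
    return Long.parseLong(shutdownMs);
  } catch (NumberFormatException nfe) {
    // Unparseable value: use the default rather than failing the container.
    return DEFAULT_TASK_SHUTDOWN_MS;
  }
}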

/**
* This method should be defined as @BeforeMethod.
*/
override def setUp() {

fredji97:

What is the point of defining an overridden function which just calls the parent's function?

navina (Contributor Author):

I removed this override from TestStreamProcessor. When the parent class carries test annotations (like @Before and @After), the annotated methods are automatically invoked for derived classes as well, so a pass-through override is unnecessary.
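An illustration of that JUnit behavior (the base-class name and fixture are hypothetical):

import org.junit.Before;
import org.junit.Test;

abstract class BaseTestHarness {
  @Before
  public void setUp() {
    // start shared fixtures (embedded broker, ZooKeeper, ...)
  }
}

public class TestStreamProcessor extends BaseTestHarness {
  @Test
  public void testProcessor() {
    // the inherited @Before setUp() has already run; no override needed here
  }
}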

fredji97 left a comment:

Thanks for addressing my comments! ship-it!

@asfgit asfgit closed this in 2a3a5ac Jan 31, 2017
jmakes pushed a commit to jmakes/samza that referenced this pull request Nov 2, 2017
This patch contains changes associated with the Standalone StreamProcessor, where there is no coordination. This will work for load-balanced consumers like the new Kafka consumer, as well as statically partitioned cases.

Additionally, we have introduced TaskFactory for StreamTask and AsyncStreamTask.

Author: navina <navina@apache.org>

Reviewers: xinyuiscool,fredji97

Closes apache#44 from navina/Noop-JC