SAMZA-2685: Add job coordinator which does not do resource management#1529
SAMZA-2685: Add job coordinator which does not do resource management#1529cameronlee314 merged 7 commits intoapache:masterfrom
Conversation
kw2542
left a comment
There was a problem hiding this comment.
This looks like a significant change, do we want a SEP for it?
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
Outdated
Show resolved
Hide resolved
.../org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
Outdated
Show resolved
Hide resolved
.../org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
Outdated
Show resolved
Hide resolved
samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
Outdated
Show resolved
Hide resolved
@kw2542 That's a good point about having a SEP. Do you think it would be reasonable to get some implementation checked in and tried out in Kubernetes a little more before posting a SEP? I do already have some context to put into a SEP now if you think that would be better. |
Synced with @mynameborat about this. We will update the existing SEP-20 for Samza on Kubernetes to include this new job coordination flow. |
|
LGTM |
mynameborat
left a comment
There was a problem hiding this comment.
Minor comment. Looks good to me!
samza-core/src/main/java/org/apache/samza/coordinator/NoProcessorJobCoordinatorListener.java
Show resolved
Hide resolved
There are some test failures. Doesn't seem related to your change although I'd just make sure if that is the case. |
This error has been happening on some other PRs as well, so it's very unlikely related to this change. It is being handled separately. |
Feature: Adding a job coordinator which does not do resource management. For Samza on YARN, the
ClusterBasedJobCoordinatordoes resource management. However, for Samza on Kubernetes, it is not necessary to have a job coordinator which does resource management, because Kubernetes controllers can take care of resource management.Changes:
StaticResourceJobCoordinatorwhich handles responsibilities like job model calculation, communicating job model to workers, and startpoint fanout.CoordinatorCommunicationfor communication between job coordinator and workers. Before, the only communication option was an HTTP endpoint. The new abstraction layer allows us to start decoupling the coordination from the HTTP endpoint. This PR doesn't expose an option to plug in a custom communication layer yet, but there is an interface to start working off of.Testing:
API changes (all changes are backwards compatible):
job.coordinator.factorytoorg.apache.samza.coordinator.staticresource.StaticResourceJobCoordinatorFactoryin order to use the new coordinator.job.coordinator.restart.signal.factoryto define how to restart the Samza job when an input stream changes which will change the job model. This plug-in is dependent on where the Samza job is running (e.g. Kubernetes). Currently, there is only a no-op implementation of this restart signal.Note: This PR reuses components like
JobModelHelper,JobCoordinatorMetadataManager, andStartpointManageracrossClusterBasedJobCoordinatorandStaticResourceJobCoordinator. I considered consolidatingClusterBasedJobCoordinatorandStaticResourceJobCoordinatoreven further to share code which encapsulates usage of multiple of the above components, but it's not yet clear how to fit the new and old flows together cleanly. There might be further divergence between the coordinators as we iterate on onStaticResourceJobCoordinator, so I felt it would be easier to leave them more decoupled for now. Also, keeping them decoupled reduces risk that a change will impact the existingClusterBasedJobCoordinator.