-
Notifications
You must be signed in to change notification settings - Fork 329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SEP-19: Improved standby-aware container allocation for active-containers on job redeploys #952
Conversation
This reverts commit 719126a.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initial feedback;
// If we find a standbyContainer, we initiate a failover | ||
if (standbyContainer.isPresent()) { | ||
// Check if there is a running standby-container on that host that needs to be stopped | ||
List<String> standbyContainers = this.standbyContainerConstraints.get(activeContainerID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/standbyContainers/standbySamzaContainerIds
if (standbyContainer.isPresent()) { | ||
// Check if there is a running standby-container on that host that needs to be stopped | ||
List<String> standbyContainers = this.standbyContainerConstraints.get(activeContainerID); | ||
Map<String, SamzaResource> runningStandbyContainersOnHost = this.samzaApplicationState.runningContainers.entrySet().stream().filter(x -> standbyContainers.contains(x.getKey())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I think it's cleaner to restructure this as follows:
runningContainers.forEach((samzaContainerId, samzaResource) -> {
if (samzaContainerId in standbySamzaContainerIds and samzaResource.host == standByHost) {
// update the map
}
}
Alternately, if you still prefer using Java 8 streams, I'd suggest to consolidating the two filter operators
// record the resource request, before issuing it to avoid race with allocation-thread | ||
SamzaResourceRequest resourceRequestForActive = containerAllocator.getResourceRequest(activeContainerID, standbyHost); | ||
failoverMetadata.recordResourceRequest(resourceRequestForActive); | ||
containerAllocator.issueResourceRequest(resourceRequestForActive); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the difference between issueResourceRequest
and requestResource
? can we consolidate the 2 methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i need to recordResource requests, to handle expired requests, so i create one and then use
issueResourceRequest to issue it.
SamzaResource standbyResource = standbyContainer.get().getValue(); | ||
String standbyResourceID = standbyResource.getResourceID(); | ||
String standbyHost = standbyResource.getHost(); | ||
// if the standbyHost returned is anyhost, we proceed with the request directly |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a readability guideline, If...Else-if..Else
blocks should check predicates which are related. For eg: it may not be obvious that standbyHost == ANY_HOST and runningStandbyContainersOnHost.isEmpty are related..
containerAllocator.requestResource(activeContainerID, ResourceRequestState.ANY_HOST); | ||
|
||
} else if (runningStandbyContainersOnHost.isEmpty()) { | ||
// if there are no running standby-containers on the standbyHost, we proceed to directly to make a resource request |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move the computation of runningStandbyContainersOnHost to the else
block?
this.clusterResourceManager.stopStreamProcessor(standbyResource); | ||
}); | ||
|
||
if (runningStandbyContainersOnHost.size() > 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how bad is this error? should we terminate the process?
* 2. If we dont any such host, we iterate over last-known standbyHosts, if we haven't already selected it for failover. | ||
* 3. If still dont find a host, we fall back to AnyHost. | ||
* | ||
* TODO: In Step 2 & 3, enrich the functionality to select standbyHost intelligently based on lag, timestamp, load-balancing, etc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of these ideas like load aware-standby allocation warrant their own design. Instead of documenting them here, I would nuke this TODO and track them in a JIRA for better discoverability
Map<String, String> containerToHostMapping = this.samzaApplicationState.jobModelManager.jobModel().getAllContainerLocality(); | ||
|
||
for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) { | ||
String standbyHost = containerToHostMapping.get(standbyContainerID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We appear to be re-implementing some of querying logic to find the host for a standby-container. Instead, leverage the existing APIs in jobModel to query the host for a specific containerId
|
||
// if there is no last-known host for the standby, continue | ||
if (standbyHost == null) { | ||
continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should log here
|
||
// if there is no last-known host for the standby, continue | ||
if (standbyHost == null) { | ||
continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the flow is a bit circuitous. avoid the continue
and consider returning any-host
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i agree, simplified it a bit.
but we cant return any-host only once we're done iterating over all standbyContainerIDs.
Addressed all comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, one final comment. ship it!
if (standbyHost == null) { | ||
log.info("No last known standbyHost for container {}", standbyContainerID); | ||
|
||
} else if (standbyHost != null && failoverMetadata.isPresent() && failoverMetadata.get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by definition, the condition standbyHost != null
is true at L:280. Can we remove it
containerAllocator.issueResourceRequest(resourceRequestForActive); | ||
samzaApplicationState.failoversToStandby.incrementAndGet(); | ||
} else { | ||
// if there is a running standby-containers on the standbyHost, we issue a stop (the stopComplete callback completes the remainder of the flow) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice explanations;
…job redeploys Author: Ray Matharu <rmatharu@linkedin.com> Reviewers: Jagadish<jagadish@apache.org> Closes apache#952 from rmatharu/test-standbyimprovements
No description provided.