SAMZA-2378: Container Placements support for Standby containers enabled jobs #1281
@@ -48,8 +48,7 @@
 * ContainerManager encapsulates logic and state related to container placement actions like move, restarts for active container
 * if issued externally.
 *
 * TODO SAMZA-2378: Container Placements for Standby containers enabled jobs
 * SAMZA-2379: Container Placements for job running in degraded state
 * TODO SAMZA-2379: Container Placements for job running in degraded state
We should probably prioritize this sooner rather than later.
Yup, following up with an RB.
@@ -136,7 +135,22 @@ boolean handleContainerLaunch(SamzaResourceRequest request, String preferredHost
  LOG.info("Waiting for running container to shutdown due to existing ContainerPlacement action {}", actionMetaData);
  return false;
} else if (actionStatus == ContainerPlacementMetadata.ContainerStatus.STOPPED) {
  allocator.runStreamProcessor(request, preferredHost);
  // If the job has standby containers enabled, always check standby constraints before issuing a start on container
Nit:
Can the lines above
if (hasActiveContainerPlacementAction(request.getProcessorId())) {
String processorId = request.getProcessorId();
ContainerPlacementMetadata actionMetaData = getPlacementActionMetadata(processorId).get();
be rewritten to:
Optional<ContainerPlacementMetadata> actionMetaData = getPlacementActionMetadata(processorId);
if (actionMetaData.isPresent()) {
use actionMetaData.get() everywhere after...
hasActiveContainerPlacementAction checks whether the metadata of the action is in either the ACCEPTED or the IN_PROGRESS state; it is not a check for the presence of metadata.
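To make the distinction in this reply concrete, here is a minimal sketch of why `Optional.isPresent()` is not a drop-in replacement for the status check. The class, the `record` helper, and the `SUCCEEDED`/`FAILED` states are assumptions for illustration only; just the method names `hasActiveContainerPlacementAction` and `getPlacementActionMetadata` and the ACCEPTED/IN_PROGRESS states come from the thread above, and the real ContainerManager may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for the metadata bookkeeping discussed above.
class PlacementStatusSketch {
  // ACCEPTED and IN_PROGRESS are named in the thread; the other two are assumed terminal states.
  enum StatusCode { ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED }

  static final class Metadata {
    final StatusCode status;

    Metadata(StatusCode status) {
      this.status = status;
    }
  }

  private final Map<String, Metadata> actions = new HashMap<>();

  // Illustration-only helper to register or update an action for a processor.
  void record(String processorId, StatusCode status) {
    actions.put(processorId, new Metadata(status));
  }

  // Presence check: true even for actions that already finished.
  Optional<Metadata> getPlacementActionMetadata(String processorId) {
    return Optional.ofNullable(actions.get(processorId));
  }

  // Status check: true only while the action is still in flight.
  boolean hasActiveContainerPlacementAction(String processorId) {
    return getPlacementActionMetadata(processorId)
        .map(m -> m.status == StatusCode.ACCEPTED || m.status == StatusCode.IN_PROGRESS)
        .orElse(false);
  }

  public static void main(String[] args) {
    PlacementStatusSketch sketch = new PlacementStatusSketch();
    sketch.record("0", StatusCode.SUCCEEDED);
    System.out.println("metadata present: " + sketch.getPlacementActionMetadata("0").isPresent());
    System.out.println("action active: " + sketch.hasActiveContainerPlacementAction("0"));
  }
}
```

In this sketch a completed action leaves metadata behind, so the presence check and the status check disagree, which is the case the reply is pointing at.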
// Fallback to source host since the new allocated resource does not meet standby constraints
allocator.requestResource(processorId, actionMetaData.getSourceHost());
markContainerPlacementActionFailed(actionMetaData,
    String.format("allocated resource %s does not meet standby constraints now, falling back to source host", allocatedResource));
Update failedStandbyAllocations metric?
Also it may be possible to expose a method on standby-container-manager to do the
resourceRequestState.releaseUnstartableContainer();
resourceRequestState.cancelResourceRequest(request);
containerAllocator.requestResource();
because it is done in standby-container-manager as a part of
checkStandbyConstraintsAndRunStreamProcessor method
> Update failedStandbyAllocations metric?
No, because we did not initiate a standby failover here. The user initiated two individual requests: a move of the standby to x-host and a move of the active to the standby host. These are two independent requests, so we do not need a metric update.
> Also it may be possible to expose a method on standby-container-manager to do the
Sure, will do
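One possible shape for the helper suggested above, as a rough sketch only. The three steps (release the unstartable container, cancel the pending request, request a resource again) are taken from the review comment; the class, the method name `releaseAndRequestOnHost`, and every parameter are hypothetical, and the collaborators are replaced by a call recorder so the sequence can be inspected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these signatures are the real Samza APIs.
class StandbyFallbackSketch {
  // Records calls in order; a stand-in for resourceRequestState and containerAllocator.
  final List<String> calls = new ArrayList<>();

  void releaseUnstartableContainer(String resource) {
    calls.add("release:" + resource);
  }

  void cancelResourceRequest(String processorId) {
    calls.add("cancel:" + processorId);
  }

  void requestResource(String processorId, String host) {
    calls.add("request:" + processorId + "@" + host);
  }

  // Single entry point a caller could use instead of repeating the three steps itself.
  void releaseAndRequestOnHost(String processorId, String unusableResource, String fallbackHost) {
    releaseUnstartableContainer(unusableResource);
    cancelResourceRequest(processorId);
    requestResource(processorId, fallbackHost);
  }

  public static void main(String[] args) {
    StandbyFallbackSketch sketch = new StandbyFallbackSketch();
    sketch.releaseAndRequestOnHost("0", "resource-42", "source-host");
    System.out.println(sketch.calls);
  }
}
```

Keeping the sequence in one method would let ContainerManager fall back to the source host without knowing the order in which the request state has to be cleaned up.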
markContainerPlacementActionFailed(actionMetaData,
    String.format("allocated resource %s does not meet standby constraints now, falling back to source host", allocatedResource));
} else {
  LOG.info("Status updated for ContainerPlacement action: ", actionMetaData);
Status wasn't updated here?
This method is invoked by the Allocator thread only when the active container has been stopped successfully; the signal for that successful stop comes from the AMRMClientAsync thread. Hence the state change (made by the AMRMClientAsync thread) is already reflected in the metadata, which is what gets logged here.
if (hasActiveContainerPlacementAction(requestMessage.getProcessorId())
    || checkStandbyOrActiveContainerHasActivePlacementAction(requestMessage.getProcessorId())) {
  if (standbyContainerManager.isPresent()) {
This logic seems twisted and makes the code hard to read.
We check whether there is an active action on the container id, or on its active/standby counterparts, and only after that do we check whether a standby-container-manager is present?
Would it be possible to
a. first check whether a standby-container-manager is present?
Or
b. have hasActiveContainerPlacementAction encapsulate both the check in (a) and the check for an active action on the active/standby counterparts of the given processor-id?
I see your point, let me refactor it
 *
 * @param requestMessage container placement request message
 * @return Pair<ContainerPlacementMessage.StatusCode, String> which is status code & response suggesting if the request is valid
 */
private Pair<ContainerPlacementMessage.StatusCode, String> validatePlacementAction(ContainerPlacementRequestMessage requestMessage) {
  String errorMessagePrefix = String.format("ContainerPlacement request: %s is rejected due to", requestMessage);
  String errorMessagePrefix = ContainerPlacementMessage.StatusCode.BAD_REQUEST + " reason: ";
Instead can we define errorMessage here
as ContainerPlacementMessage.StatusCode.BAD_REQUEST + " reason: %s"
and later String.format(errorMessage, reason);
sure
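For reference, the suggestion above amounts to something like the following sketch. The enclosing class, the local `StatusCode` enum, and the `reject` helper are placeholders for illustration; only the `BAD_REQUEST + " reason: %s"` template and the later `String.format(errorMessage, reason)` call come from the comment.

```java
// Hypothetical sketch of the suggested error message template.
class ErrorMessageFormatSketch {
  // Local stand-in for ContainerPlacementMessage.StatusCode.
  enum StatusCode { BAD_REQUEST }

  // Template defined once, with a placeholder for the rejection reason.
  static final String ERROR_MESSAGE = StatusCode.BAD_REQUEST + " reason: %s";

  // Fills the reason into the template at the point where a request is rejected.
  static String reject(String reason) {
    return String.format(ERROR_MESSAGE, reason);
  }

  public static void main(String[] args) {
    // prints "BAD_REQUEST reason: invalid processor id"
    System.out.println(reject("invalid processor id"));
  }
}
```

One thing to watch with this approach: a literal `%` in the template itself would need to be escaped as `%%`, although a `%` inside the reason argument is safe.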
private boolean checkStandbyOrActiveContainerHasActivePlacementAction(String processorId) {
  if (standbyContainerManager.isPresent()) {
    // If requested placement action is on a standby container and its active container has a placement request,
    // this request shall not be de-queued until in-flight action on active container is complete
    if (StandbyTaskUtil.isStandbyContainer(processorId) && hasActiveContainerPlacementAction(
        StandbyTaskUtil.getActiveContainerId(processorId))) {
      return true;
    }
    // If requested placement action is on an active container and any of its standby containers has a placement request,
    // this request shall not be de-queued until in-flight action on the standby container is complete
    if (!StandbyTaskUtil.isStandbyContainer(processorId)) {
      for (String standby : standbyContainerManager.get().getStandbyList(processorId)) {
        if (hasActiveContainerPlacementAction(standby)) {
          return true;
        }
      }
    }
  }
  return false;
}
This method can be simplified/inlined with hasActiveAction?
done
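A rough sketch of what folding the counterpart check into a single method could look like. This is not the code that landed in the PR: the class, the `"<activeId>-<replica>"` naming convention used by `isStandbyContainer` and `getActiveContainerId`, and the map-based standby list are assumptions standing in for StandbyTaskUtil and StandbyContainerManager.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: one method answers "is anything in flight for this
// processor or for its active/standby counterparts?".
class ActiveActionCheckSketch {
  private final Set<String> inFlight = new HashSet<>();
  // Empty when the job has no standby containers enabled.
  private final Optional<Map<String, List<String>>> standbyLists;

  ActiveActionCheckSketch(Optional<Map<String, List<String>>> standbyLists) {
    this.standbyLists = standbyLists;
  }

  // Illustration-only helper to register an in-flight action.
  void markInFlight(String processorId) {
    inFlight.add(processorId);
  }

  // Assumed convention: a standby id looks like "<activeId>-<replica>".
  static boolean isStandbyContainer(String id) {
    return id.contains("-");
  }

  static String getActiveContainerId(String standbyId) {
    return standbyId.substring(0, standbyId.indexOf('-'));
  }

  boolean hasActiveContainerPlacementAction(String processorId) {
    if (inFlight.contains(processorId)) {
      return true;
    }
    if (!standbyLists.isPresent()) {
      return false; // standby not enabled, nothing else to check
    }
    if (isStandbyContainer(processorId)) {
      // A standby waits for any in-flight action on its active container.
      return inFlight.contains(getActiveContainerId(processorId));
    }
    // An active container waits for in-flight actions on any of its standbys.
    return standbyLists.get()
        .getOrDefault(processorId, Collections.emptyList())
        .stream()
        .anyMatch(inFlight::contains);
  }

  public static void main(String[] args) {
    Map<String, List<String>> standbys = Collections.singletonMap("0", Collections.singletonList("0-0"));
    ActiveActionCheckSketch sketch = new ActiveActionCheckSketch(Optional.of(standbys));
    sketch.markInFlight("0-0");
    System.out.println(sketch.hasActiveContainerPlacementAction("0"));
  }
}
```

With this shape the caller makes one call and never needs to look at the standby-container-manager directly.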
List<String> getStandbyList(String activeContainerId) {
  return this.standbyContainerConstraints.get(activeContainerId);
}
See the comment above on simplifying.
You can add additional methods here to simplify the caller's logic, so that the caller does not need to know the internals of the standby container manager.
So I need this exposed from StandbyContainerManager because I need to refer to metadata maintained in ContainerManager for checking active actions on each standby replica, the method here just exposes the list of standby replicas!
took a pass
Seems like a test is failing; check with @lakshmi-manasa-g whether it's relevant/related.
Yes, the failing test (at org.apache.samza.system.azureblob.avro.TestAzureBlobOutputStream.testClose) in the last build here is being fixed in #1298. For now, push an empty commit to trigger another build and it should most likely pass. This test is flaky and won't fail in every Travis build.
Design: SEP-22: Container Placements in Samza
Changes: Following abilities are added as a part of this PR:
Tests:
Integ test: End-to-end test for standby enabled job is added
Manual testing matrix: Container Placement Test Plan
API Changes:
Upgrade Instructions: None
Usage Instructions: Instantiate ContainerPlacementMetadataStore to write Container placement messages to Metastore, query it by the UUID generated