Skip to content

Commit

Permalink
SAMZA-2340: [Container Placements] Introduce ContainerManager for han…
Browse files Browse the repository at this point in the history
…dling validation for failures & starts of active & standby containers (#1211)
  • Loading branch information
Sanil15 authored and rmatharu-zz committed Nov 7, 2019
1 parent 6a9896b commit ab1512f
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.samza.clustermanager;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
Expand Down Expand Up @@ -115,13 +114,13 @@ public class ContainerAllocator implements Runnable {
*/
private final int requestExpiryTimeout;

private final Optional<StandbyContainerManager> standbyContainerManager;
private final ContainerManager containerManager;

public ContainerAllocator(ClusterResourceManager clusterResourceManager,
Config config,
SamzaApplicationState state,
boolean hostAffinityEnabled,
Optional<StandbyContainerManager> standbyContainerManager) {
ContainerManager containerManager) {
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
this.clusterResourceManager = clusterResourceManager;
this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime();
Expand All @@ -132,7 +131,7 @@ public ContainerAllocator(ClusterResourceManager clusterResourceManager,
this.state = state;
this.config = config;
this.hostAffinityEnabled = hostAffinityEnabled;
this.standbyContainerManager = standbyContainerManager;
this.containerManager = containerManager;
this.requestExpiryTimeout = clusterManagerConfig.getContainerRequestTimeout();
}

Expand Down Expand Up @@ -204,12 +203,7 @@ void assignResourceRequests() {
state.matchedResourceRequests.incrementAndGet();
}

// If hot-standby is enabled, check standby constraints are met before launching a processor
if (this.standbyContainerManager.isPresent()) {
checkStandByContrainsAndRunStreamProcessor(request, preferredHost);
} else {
runStreamProcessor(request, preferredHost);
}
containerManager.handleContainerLaunch(request, preferredHost, peekAllocatedResource(preferredHost), resourceRequestState, this);

} else {

Expand All @@ -220,7 +214,7 @@ void assignResourceRequests() {
if (expired) {
updateExpiryMetrics(request);
if (hostAffinityEnabled) {
handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
containerManager.handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request, this, resourceRequestState);
}
} else {
LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
Expand All @@ -232,30 +226,6 @@ void assignResourceRequests() {
}
}

/**
* Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
* this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
* issues an ANY_HOST request.
*/
@VisibleForTesting
void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
SamzaResourceRequest request) {
boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
if (standbyContainerManager.isPresent()) {
standbyContainerManager.get()
.handleExpiredResourceRequest(processorId, request,
Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)), this, resourceRequestState);
} else if (resourceAvailableOnAnyHost) {
LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
runStreamProcessor(request, ResourceRequestState.ANY_HOST);
} else {
LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
processorId, preferredHost);
resourceRequestState.cancelResourceRequest(request);
requestResource(processorId, ResourceRequestState.ANY_HOST);
}
}

/**
* Updates the request state and runs a processor on the specified host. Assumes a resource
* is available on the preferred host, so the caller must verify that before invoking this method.
Expand Down Expand Up @@ -290,16 +260,6 @@ protected void runStreamProcessor(SamzaResourceRequest request, String preferred
clusterResourceManager.launchStreamProcessor(resource, builder);
}

/**
* If {@code StandbyContainerManager} is present check standBy constraints are met before attempting to launch
* @param request outstanding request which has an allocated resource
* @param preferredHost to run the request
*/
private void checkStandByContrainsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost) {
standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost,
peekAllocatedResource(preferredHost), this, resourceRequestState);
}

/**
* Called during initial request for resources
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.clustermanager;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* ContainerManager is a centralized entity that manages control actions like start, stop for both active and standby containers
* ContainerManager acts as a brain for validating and issuing any actions on containers in the Job Coordinator.
*
* The requests to allocate resources resources made by {@link ContainerAllocator} can either expire or succeed.
* When the requests succeeds the ContainerManager validates those requests before starting the container
* When the requests expires the ContainerManager decides the next set of actions for the pending request.
*
* Callbacks issued from {@link ClusterResourceManager} aka {@link ContainerProcessManager} are intercepted
* by ContainerManager to handle container failure and completions for both active and standby containers
*/
public class ContainerManager {

private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class);

/**
* Resource-manager, used to stop containers
*/
private final ClusterResourceManager clusterResourceManager;
private final SamzaApplicationState samzaApplicationState;

private final Optional<StandbyContainerManager> standbyContainerManager;

public ContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
Boolean standByEnabled) {
this.samzaApplicationState = samzaApplicationState;
this.clusterResourceManager = clusterResourceManager;
// Enable standby container manager if required
if (standByEnabled) {
this.standbyContainerManager =
Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager));
} else {
this.standbyContainerManager = Optional.empty();
}
}

/**
* Handles the container start action for both active & standby containers.
*
* @param request pending request for the preferred host
* @param preferredHost preferred host to start the container
* @param allocatedResource resource allocated from {@link ClusterResourceManager}
* @param resourceRequestState state of request in {@link ContainerAllocator}
* @param allocator to request resources from @{@link ClusterResourceManager}
*/
void handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
ResourceRequestState resourceRequestState, ContainerAllocator allocator) {
if (this.standbyContainerManager.isPresent()) {
standbyContainerManager.get()
.checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, allocatedResource, allocator,
resourceRequestState);
} else {
allocator.runStreamProcessor(request, preferredHost);
}
}

/**
* Handles the action to be taken after the container has been stopped.
* Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerStop} to check constraints.
* Case 2. When standby is disabled there are two cases according to host-affinity being enabled
* Case 2.1. When host-affinity is enabled resources are requested on host where container was last seen
* Case 2.2. When host-affinity is disabled resources are requested for ANY_HOST
*
* @param processorId logical id of the container
* @param containerId last known id of the container deployed
* @param preferredHost host on which container was last deployed
* @param exitStatus exit code returned by the container
* @param preferredHostRetryDelay delay to be incurred before requesting resources
* @param containerAllocator allocator for requesting resources
*/
void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {
if (standbyContainerManager.isPresent()) {
standbyContainerManager.get()
.handleContainerStop(processorId, containerId, preferredHost, exitStatus, containerAllocator,
preferredHostRetryDelay);
} else {
// If StandbyTasks are not enabled, we simply make a request for the preferredHost
containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay);
}
}

/**
* Handle the container launch failure for active containers and standby (if enabled).
* Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerLaunchFail} to check behavior
* Case 2. When standby is disabled the allocator issues a request for ANY_HOST resources
*
* @param processorId logical id of the container
* @param containerId last known id of the container deployed
* @param preferredHost host on which container is requested to be deployed
* @param containerAllocator allocator for requesting resources
*/
void handleContainerLaunchFail(String processorId, String containerId, String preferredHost,
ContainerAllocator containerAllocator) {
if (processorId != null && standbyContainerManager.isPresent()) {
standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator);
} else if (processorId != null) {
LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}",
processorId, containerId, preferredHost);
containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
} else {
LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. "
+ "Ignoring invalid/redundant notification.", containerId, preferredHost);
}
}

/**
* Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
* this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
* issues an ANY_HOST request. Only applies to HOST_AFFINITY enabled cases
*
* @param processorId logical id of the container
* @param preferredHost host on which container is requested to be deployed
* @param request pending request for the preferred host
* @param allocator allocator for requesting resources
* @param resourceRequestState state of request in {@link ContainerAllocator}
*/
@VisibleForTesting
void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) {
boolean resourceAvailableOnAnyHost = allocator.hasAllocatedResource(ResourceRequestState.ANY_HOST);
if (standbyContainerManager.isPresent()) {
standbyContainerManager.get()
.handleExpiredResourceRequest(processorId, request,
Optional.ofNullable(allocator.peekAllocatedResource(ResourceRequestState.ANY_HOST)), allocator,
resourceRequestState);
} else if (resourceAvailableOnAnyHost) {
LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
allocator.runStreamProcessor(request, ResourceRequestState.ANY_HOST);
} else {
LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
processorId, preferredHost);
resourceRequestState.cancelResourceRequest(request);
allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
private final ContainerAllocator containerAllocator;
private final Thread allocatorThread;

// The StandbyContainerManager manages standby-aware allocation and failover of containers
private final Optional<StandbyContainerManager> standbyContainerManager;
// The ContainerManager manages control actions for both active & standby containers
private final ContainerManager containerManager;

private final Option<DiagnosticsManager> diagnosticsManager;

Expand Down Expand Up @@ -158,14 +158,9 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
// Wire all metrics to all reporters
this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));

// Enable standby container manager if required
if (jobConfig.getStandbyTasksEnabled()) {
this.standbyContainerManager = Optional.of(new StandbyContainerManager(state, clusterResourceManager));
} else {
this.standbyContainerManager = Optional.empty();
}
this.containerManager = new ContainerManager(state, clusterResourceManager, jobConfig.getStandbyTasksEnabled());

this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager);
this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
LOG.info("Finished container process manager initialization.");
}
Expand All @@ -175,19 +170,20 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
SamzaApplicationState state,
MetricsRegistryMap registry,
ClusterResourceManager resourceManager,
Optional<ContainerAllocator> allocator) {
Optional<ContainerAllocator> allocator,
ContainerManager containerManager) {
this.state = state;
this.clusterManagerConfig = clusterManagerConfig;
this.jobConfig = new JobConfig(clusterManagerConfig);

this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();

this.clusterResourceManager = resourceManager;
this.standbyContainerManager = Optional.empty();
this.containerManager = containerManager;
this.diagnosticsManager = Option.empty();
this.containerAllocator = allocator.orElseGet(
() -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
hostAffinityEnabled, this.standbyContainerManager));
hostAffinityEnabled, this.containerManager));
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
LOG.info("Finished container process manager initialization");
}
Expand Down Expand Up @@ -427,16 +423,7 @@ public void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t)

// 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host, if standby are not enabled
// otherwise calling standbyContainerManager
if (processorId != null && standbyContainerManager.isPresent()) {
standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator);
} else if (processorId != null) {
LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}",
processorId, containerId, containerHost);
containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
} else {
LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. " +
"Ignoring invalid/redundant notification.", containerId, containerHost);
}
containerManager.handleContainerLaunchFail(processorId, containerId, containerHost, containerAllocator);
}

/**
Expand Down Expand Up @@ -604,25 +591,25 @@ private ResourceManagerFactory getContainerProcessManagerFactory(final ClusterMa
/**
* Obtains the ID of the Samza processor pending launch on the provided resource (container).
*
* @param resourceId the ID of the resource (container)
* @return the ID of the Samza processor on this resource
* ContainerProcessManager [INFO] Container ID: container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on host: ltx1-app0772.stg.linkedin.com
*
* @param containerId last known id of the container deployed
* @return the logical processorId of the processor (e.g., 0, 1, 2 ...)
*/
private String getPendingProcessorId(String resourceId) {
private String getPendingProcessorId(String containerId) {
for (Map.Entry<String, SamzaResource> entry: state.pendingProcessors.entrySet()) {
if (entry.getValue().getContainerId().equals(resourceId)) {
LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", resourceId, entry.getKey(), entry.getValue().getHost());
if (entry.getValue().getContainerId().equals(containerId)) {
LOG.info("Container ID: {} matched pending Processor ID: {} on host: {}", containerId, entry.getKey(), entry.getValue().getHost());
return entry.getKey();
}
}
return null;
}

private void handleContainerStop(String processorId, String resourceID, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
if (standbyContainerManager.isPresent()) {
standbyContainerManager.get().handleContainerStop(processorId, resourceID, preferredHost, exitStatus, containerAllocator, preferredHostRetryDelay);
} else {
// If StandbyTasks are not enabled, we simply make a request for the preferredHost
containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay);
}
/**
* Request {@link ContainerManager#handleContainerStop} to determine next step of actions for the stopped container
*/
private void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
containerManager.handleContainerStop(processorId, containerId, preferredHost, exitStatus, preferredHostRetryDelay, containerAllocator);
}
}

0 comments on commit ab1512f

Please sign in to comment.