Skip to content

Commit

Permalink
SAMZA-2319: [1/2] Simplify Container Allocation logic (#1152)
Browse files Browse the repository at this point in the history
* SAMZA-2319: Simplify Container Allocation logic

* Removing expiry check from ContainerAllocator for host affinity off case

* Addressing Review Adding more tests

* Removing whitespace

* Removing whitespace

* Addressing Review, updating docs
  • Loading branch information
Sanil15 authored and rmatharu-zz committed Sep 27, 2019
1 parent 6d91400 commit ba1d456
Show file tree
Hide file tree
Showing 10 changed files with 441 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
Expand All @@ -35,20 +36,51 @@

/**
* {@link AbstractContainerAllocator} makes requests for physical resources to the resource manager and also runs
* a processor on an allocated physical resource. Sub-classes should override the assignResourceRequests()
* method to assign resource requests according to some strategy.
* a processor on an allocated physical resource.
*
* See {@link ContainerAllocator} and {@link HostAwareContainerAllocator} for two such strategies
* <ul>
* <li>
* In case of host-affinity enabled, each request ({@link SamzaResourceRequest} contains a processorId which
* identifies the processor the request is for and a "preferredHost" which is determined by the locality mappings
* in the coordinator stream
* </li>
* <li>
* This thread periodically matches outstanding resource requests with allocated resources.
* Its period is controlled using the {@code allocatorSleepIntervalMs} parameter
* </li>
* <li>
* When host-affinity is enabled, the resource-request's preferredHost param is set to the host the processor
* was last seen on
* </li>
* <li>
* When host-affinity is disabled, the resource-request's preferredHost param is set to {@link ResourceRequestState#ANY_HOST}
* </li>
* <li>
* When host-affinity is enabled and a preferred resource has not been obtained after {@code requestExpiryTimeout}
* milliseconds of the request being made, the resource is declared expired. The expired request are handled by
* allocating them to *ANY* allocated resource if available. If no surplus resources are available the current preferred
* resource-request is cancelled and resource-request for ANY_HOST is issued
* </li>
* <li>
* When host-affinity is not enabled, this periodically wakes up to assign a processor to *ANY* allocated resource.
* If there aren't enough resources, it waits by sleeping for {@code allocatorSleepIntervalMs} milliseconds.
* </li>
* </ul>
*
* This class is not thread-safe. This class is used in the refactored code path as called by run-jc.sh
*/
public abstract class AbstractContainerAllocator implements Runnable {
public class AbstractContainerAllocator implements Runnable {

private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerAllocator.class);

/* State that controls the lifecycle of the allocator thread*/
/* State that controls the lifecycle of the allocator thread */
private volatile boolean isRunning = true;

/**
* Flag for affine host requests
*/
private final boolean hostAffinityEnabled;

/**
* Config and derived config objects
*/
Expand Down Expand Up @@ -83,22 +115,32 @@ public abstract class AbstractContainerAllocator implements Runnable {
* ResourceRequestState indicates the state of all unfulfilled and allocated container requests
*/
protected final ResourceRequestState resourceRequestState;
/**
* Tracks the expiration of a request for resources.
*/
private final int requestExpiryTimeout;

public AbstractContainerAllocator(ClusterResourceManager containerProcessManager,
ResourceRequestState resourceRequestState,
private final Optional<StandbyContainerManager> standbyContainerManager;

public AbstractContainerAllocator(ClusterResourceManager clusterResourceManager,
Config config,
SamzaApplicationState state,
ClassLoader pluginClassLoader) {
ClassLoader pluginClassLoader,
boolean hostAffinityEnabled,
Optional<StandbyContainerManager> standbyContainerManager) {
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
this.clusterResourceManager = containerProcessManager;
this.clusterResourceManager = clusterResourceManager;
this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime();
this.resourceRequestState = resourceRequestState;
this.resourceRequestState = new ResourceRequestState(hostAffinityEnabled, this.clusterResourceManager);
this.containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
this.containerNumCpuCores = clusterManagerConfig.getNumCores();
this.taskConfig = new TaskConfig(config);
this.state = state;
this.config = config;
this.pluginClassLoader = pluginClassLoader;
this.hostAffinityEnabled = hostAffinityEnabled;
this.standbyContainerManager = standbyContainerManager;
this.requestExpiryTimeout = clusterManagerConfig.getContainerRequestTimeout();
}

/**
Expand All @@ -121,18 +163,104 @@ public void run() {

Thread.sleep(allocatorSleepIntervalMs);
} catch (InterruptedException e) {
log.warn("Got InterruptedException in AllocatorThread.", e);
LOG.warn("Got InterruptedException in AllocatorThread.", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("Got unknown Exception in AllocatorThread.", e);
LOG.error("Got unknown Exception in AllocatorThread.", e);
}
}
}

/**
* Assigns resources received from the cluster manager to processors.
*
* During the run() method, the thread sleeps for allocatorSleepIntervalMs ms. It then invokes assignResourceRequests,
* and tries to allocate any unsatisfied request that is still in the request queue {@link ResourceRequestState})
* with allocated resources.
*
* When host-affinity is disabled, all allocated resources are buffered by the key "ANY_HOST".
* When host-affinity is enabled, all allocated resources are buffered by the hostName as key
*
* If the requested host is not available, the thread checks to see if the request has expired. If it has expired
* then two cases are handled separately
*
* Case 1: host-affinity is enabled, looks for allocated resouces on ANY_HOST and issues a container start if available,
* otherwise issues an ANY_HOST request
* Case 2: host-affinity is disabled, expired requests are not handled, allocator waits for cluster manager to issue
* resources
* TODO: SAMZA-2330 Hadle expired request for host affinity disabled case
*
* When host-affinity is enabled and a {@code StandbyContainerManager} is present, the allocator transfers the request
* to it for checking StandByConstraints before launcing a processor
*/
protected abstract void assignResourceRequests();
void assignResourceRequests() {
while (hasReadyPendingRequest()) {
SamzaResourceRequest request = peekReadyPendingRequest().get();
String processorId = request.getProcessorId();
String preferredHost = hostAffinityEnabled ? request.getPreferredHost() : ResourceRequestState.ANY_HOST;
Instant requestCreationTime = request.getRequestTimestamp();

LOG.info("Handling assignment request for Processor ID: {} on host: {}.", processorId, preferredHost);
if (hasAllocatedResource(preferredHost)) {

// Found allocated container on preferredHost
LOG.info("Found an available container for Processor ID: {} on the host: {}", processorId, preferredHost);

// Needs to be only updated when host affinity is enabled
if (hostAffinityEnabled) {
state.matchedResourceRequests.incrementAndGet();
}

// If hot-standby is enabled, check standby constraints are met before launching a processor
if (this.standbyContainerManager.isPresent()) {
checkStandByContrainsAndRunStreamProcessor(request, preferredHost);
} else {
runStreamProcessor(request, preferredHost);
}

} else {

LOG.info("Did not find any allocated containers for running Processor ID: {} on the host: {}.",
processorId, preferredHost);
boolean expired = isRequestExpired(request);

if (expired) {
updateExpiryMetrics(request);
if (hostAffinityEnabled) {
handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
}
} else {
LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
+ "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
requestCreationTime, System.currentTimeMillis(), requestExpiryTimeout);
break;
}
}
}
}

/**
* Handles an expired resource request when {@code hostAffinityEnabled} is true, in this case since the
* preferred host, we try to see if a surplus ANY_HOST is available in the request queue.
*/
@VisibleForTesting
void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
SamzaResourceRequest request) {
boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
if (standbyContainerManager.isPresent()) {
standbyContainerManager.get()
.handleExpiredResourceRequest(processorId, request,
Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)), this, resourceRequestState);
} else if (resourceAvailableOnAnyHost) {
LOG.info("Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
runStreamProcessor(request, ResourceRequestState.ANY_HOST);
} else {
LOG.info("Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
processorId, preferredHost);
resourceRequestState.cancelResourceRequest(request);
requestResource(processorId, ResourceRequestState.ANY_HOST);
}
}

/**
* Updates the request state and runs a processor on the specified host. Assumes a resource
Expand All @@ -156,7 +284,7 @@ protected void runStreamProcessor(SamzaResourceRequest request, String preferred
String processorId = request.getProcessorId();

// Run processor on resource
log.info("Found Container ID: {} for Processor ID: {} on host: {} for request creation time: {}.",
LOG.info("Found Container ID: {} for Processor ID: {} on host: {} for request creation time: {}.",
resource.getContainerId(), processorId, preferredHost, request.getRequestTimestamp());

// Update processor state as "pending" and then issue a request to launch it. It's important to perform the state-update
Expand All @@ -168,6 +296,16 @@ protected void runStreamProcessor(SamzaResourceRequest request, String preferred
clusterResourceManager.launchStreamProcessor(resource, builder);
}

/**
* If {@code StandbyContainerManager} is present check standBy constraints are met before attempting to launch
* @param request outstanding request which has an allocated resource
* @param preferredHost to run the request
*/
private void checkStandByContrainsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost) {
standbyContainerManager.get().checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost,
peekAllocatedResource(preferredHost), this, resourceRequestState);
}

/**
* Called during initial request for resources
*
Expand All @@ -178,7 +316,19 @@ protected void runStreamProcessor(SamzaResourceRequest request, String preferred
* - when host-affinity is enabled and job is run for the first time
* - when the number of containers has been increased.
*/
public abstract void requestResources(Map<String, String> processorToHostMapping);
public void requestResources(Map<String, String> processorToHostMapping) {
for (Map.Entry<String, String> entry : processorToHostMapping.entrySet()) {
String processorId = entry.getKey();
String preferredHost = entry.getValue();
if (!hostAffinityEnabled) {
preferredHost = ResourceRequestState.ANY_HOST;
} else if (preferredHost == null) {
LOG.info("No preferred host mapping found for Processor ID: {}. Requesting resource on ANY_HOST", processorId);
preferredHost = ResourceRequestState.ANY_HOST;
}
requestResource(processorId, preferredHost);
}
}

/**
* Checks if this allocator has a pending resource request with a request timestamp equal to or earlier than the current
Expand Down Expand Up @@ -309,4 +459,29 @@ public final void releaseResource(String containerId) {
public void stop() {
isRunning = false;
}


/**
* Checks if a request has expired.
* @param request the request to check
* @return true if request has expired
*/
private boolean isRequestExpired(SamzaResourceRequest request) {
long currTime = Instant.now().toEpochMilli();
boolean requestExpired = currTime - request.getRequestTimestamp().toEpochMilli() > requestExpiryTimeout;
if (requestExpired) {
LOG.info("Request for Processor ID: {} on host: {} with creation time: {} has expired at current time: {} after timeout: {} ms.",
request.getProcessorId(), request.getPreferredHost(), request.getRequestTimestamp(), currTime, requestExpiryTimeout);
}
return requestExpired;
}

private void updateExpiryMetrics(SamzaResourceRequest request) {
String preferredHost = request.getPreferredHost();
if (ResourceRequestState.ANY_HOST.equals(preferredHost)) {
state.expiredAnyHostRequests.incrementAndGet();
} else {
state.expiredPreferredHostRequests.incrementAndGet();
}
}
}

This file was deleted.

0 comments on commit ba1d456

Please sign in to comment.