From 7412a633ecde23102dce6675afeb955dd7843860 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Tue, 14 Apr 2015 18:52:26 +0900 Subject: [PATCH] TAJO-1551 Reuse allocated resources in next execution block if possible --- .../apache/tajo/master/ContainerProxy.java | 4 + .../tajo/master/LaunchTaskRunnersEvent.java | 3 +- .../tajo/master/QueryCoordinatorService.java | 8 +- .../tajo/master/TaskRunnerGroupEvent.java | 7 +- .../event/ContainerAllocationEvent.java | 15 +- .../GrouppedContainerAllocatorEvent.java | 4 +- .../event/StageContainerAllocationEvent.java | 7 +- .../apache/tajo/master/rm/TajoRMContext.java | 6 +- .../master/rm/TajoWorkerResourceManager.java | 46 ++-- .../tajo/master/rm/WorkerResourceManager.java | 4 +- .../querymaster/DefaultTaskScheduler.java | 29 ++- .../tajo/querymaster/QueryMasterTask.java | 2 +- .../org/apache/tajo/querymaster/Stage.java | 24 +- .../worker/AbstractResourceAllocator.java | 55 +++- .../apache/tajo/worker/AllocatedResource.java | 74 ++++++ .../org/apache/tajo/worker/Resources.java | 89 +++++++ .../TajoContainerProxy.java | 61 +---- .../tajo/worker/TajoResourceAllocator.java | 244 ++++++++++-------- .../rm => worker}/TajoWorkerContainer.java | 22 +- .../main/proto/QueryCoordinatorProtocol.proto | 5 +- .../master/rm/TestTajoResourceManager.java | 33 +-- 21 files changed, 469 insertions(+), 273 deletions(-) create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/AllocatedResource.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/Resources.java rename tajo-core/src/main/java/org/apache/tajo/{master => worker}/TajoContainerProxy.java (68%) rename tajo-core/src/main/java/org/apache/tajo/{master/rm => worker}/TajoWorkerContainer.java (83%) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java index cad63a0d6d..da4104ed0d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java @@ -67,6 +67,10 @@ public synchronized boolean isCompletelyDone() { return state == ContainerState.DONE || state == ContainerState.FAILED; } + public TajoContainer getContainer() { + return container; + } + public String getTaskHostName() { return this.hostName; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java index e620afa161..c92b7508e6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java @@ -21,6 +21,7 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.worker.TajoWorkerContainer; import java.util.Collection; @@ -29,7 +30,7 @@ public class LaunchTaskRunnersEvent extends TaskRunnerGroupEvent { private final String planJson; public LaunchTaskRunnersEvent(ExecutionBlockId executionBlockId, - Collection containers, QueryContext queryContext, + Collection containers, QueryContext queryContext, String planJson) { super(EventType.CONTAINER_REMOTE_LAUNCH, executionBlockId, containers); this.queryContext = queryContext; diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java index 1b1d49e499..173b9cc633 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; @@ -37,7 +36,6 @@ import java.net.InetSocketAddress; import java.util.Collection; -import java.util.List; public class QueryCoordinatorService extends AbstractService { private final static Log LOG = LogFactory.getLog(QueryCoordinatorService.class); @@ -128,10 +126,8 @@ public void allocateWorkerResources( @Override public void releaseWorkerResource(RpcController controller, WorkerResourceReleaseRequest request, RpcCallback done) { - List containerIds = request.getContainerIdsList(); - - for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) { - context.getResourceManager().releaseWorkerResource(eachContainer); + for (Integer integer: request.getResourceIdsList()) { + context.getResourceManager().releaseWorkerResource(integer); } done.run(BOOL_TRUE); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java index c1c6522e68..4717e55f2a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java @@ -22,6 +22,7 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.worker.TajoWorkerContainer; import java.util.Collection; @@ -32,16 +33,16 @@ public enum EventType { } protected final ExecutionBlockId executionBlockId; - protected final Collection containers; + protected final Collection containers; public TaskRunnerGroupEvent(EventType eventType, ExecutionBlockId executionBlockId, - Collection containers) { + Collection containers) { super(eventType); this.executionBlockId = executionBlockId; this.containers = containers; } - public Collection getContainers() { + public Collection getContainers() { return containers; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java index c3a9a5981c..b9a06949ce 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java @@ -19,15 +19,15 @@ package org.apache.tajo.master.event; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.worker.Resources; public class ContainerAllocationEvent extends AbstractEvent { private final ExecutionBlockId executionBlockId; private final Priority priority; - private final Resource resource; + private final Resources resource; private final boolean isLeafQuery; private final int requiredNum; private final float progress; @@ -35,7 +35,7 @@ public class ContainerAllocationEvent extends AbstractEvent requestMap, boolean isLeafQuery, float progress) { super(eventType, executionBlockId, priority, diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java index 0d29e4467c..cb713d3a8d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java @@ -20,19 +20,20 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.worker.TajoWorkerContainer; import java.util.List; public class StageContainerAllocationEvent extends StageEvent { - private List allocatedContainer; + private List allocatedContainer; public StageContainerAllocationEvent(final ExecutionBlockId id, - List allocatedContainer) { + List allocatedContainer) { super(id, StageEventType.SQ_CONTAINER_ALLOCATED); this.allocatedContainer = allocatedContainer; } - public List getAllocatedContainer() { + public List getAllocatedContainer() { return this.allocatedContainer; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java index bb8cc126b7..320aae60b3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java @@ -21,7 +21,6 @@ import com.google.common.collect.Maps; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.tajo.QueryId; -import org.apache.tajo.ipc.ContainerProtocol; import java.util.Collections; import java.util.Set; @@ -42,8 +41,7 @@ public class TajoRMContext { private final ConcurrentMap inactiveWorkers = Maps.newConcurrentMap(); /** map between queryIds and query master ContainerId */ - private final ConcurrentMap qmContainerMap = Maps - .newConcurrentMap(); + private final ConcurrentMap qmContainerMap = Maps.newConcurrentMap(); private final Set liveQueryMasterWorkerResources = Collections.newSetFromMap(new ConcurrentHashMap()); @@ -77,7 +75,7 @@ public ConcurrentMap getInactiveWorkers() { * * @return The Map for query master containers */ - public ConcurrentMap getQueryMasterContainer() { + public ConcurrentMap getQueryMasterContainer() { return qmContainerMap; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index 90a4eb505a..99d682854b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -33,13 +33,11 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.rpc.CancelableRpcCallback; import org.apache.tajo.rpc.RpcUtils; -import org.apache.tajo.util.ApplicationIdUtils; import org.apache.tajo.util.BasicFuture; import java.io.IOException; @@ -56,7 +54,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke /** class logger */ private static final Log LOG = LogFactory.getLog(TajoWorkerResourceManager.class); - static AtomicInteger containerIdSeq = new AtomicInteger(0); + static AtomicInteger resourceIdSeq = new AtomicInteger(0); private TajoMaster.MasterContext masterContext; @@ -80,7 +78,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke private TajoConf systemConf; - private ConcurrentMap allocatedResourceMap = Maps + private ConcurrentMap allocatedResourceMap = Maps .newConcurrentMap(); /** It receives status messages from workers and their resources. */ @@ -220,7 +218,7 @@ protected void cancel(WorkerResourceAllocationResponse canceled) { if (canceled != null && !canceled.getWorkerAllocatedResourceList().isEmpty()) { LOG.info("Canceling resources allocated"); WorkerAllocatedResource resource = canceled.getWorkerAllocatedResource(0); - releaseWorkerResource(resource.getContainerId()); + releaseWorkerResource(resource.getResourceId()); } } }; @@ -243,12 +241,12 @@ protected void cancel(WorkerResourceAllocationResponse canceled) { } WorkerAllocatedResource resource = response.getWorkerAllocatedResource(0); - registerQueryMaster(queryInProgress.getQueryId(), resource.getContainerId()); + registerQueryMaster(queryInProgress.getQueryId(), resource.getResourceId()); return resource; } - private void registerQueryMaster(QueryId queryId, ContainerProtocol.TajoContainerIdProto containerId) { - rmContext.getQueryMasterContainer().putIfAbsent(queryId, containerId); + private void registerQueryMaster(QueryId queryId, int resourceId) { + rmContext.getQueryMasterContainer().putIfAbsent(queryId, resourceId); } @Override @@ -328,22 +326,15 @@ public void run() { NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(), allocatedResource.worker.getConnectionInfo().getPeerRpcPort()); - TajoWorkerContainerId containerId = new TajoWorkerContainerId(); - - containerId.setApplicationAttemptId( - ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId)); - containerId.setId(containerIdSeq.incrementAndGet()); - - ContainerProtocol.TajoContainerIdProto containerIdProto = containerId.getProto(); + int resourceId = resourceIdSeq.incrementAndGet(); allocatedResources.add(WorkerAllocatedResource.newBuilder() - .setContainerId(containerIdProto) + .setResourceId(resourceId) .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto()) .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB) .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots) - .build()); - + .build()); - allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource); + allocatedResourceMap.putIfAbsent(resourceId, allocatedResource); } resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder() @@ -567,17 +558,16 @@ private List chooseWorkers(WorkerResourceRequest resour /** * Release allocated resource. * - * @param containerId ContainerIdProto to be released + * @param resourceId ContainerIdProto to be released */ @Override - public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId) { - AllocatedWorkerResource allocated = allocatedResourceMap.remove(containerId); - if(allocated != null) { + public void releaseWorkerResource(int resourceId) { + AllocatedWorkerResource allocated = allocatedResourceMap.remove(resourceId); + if (allocated != null) { LOG.info("Release Resource: " + allocated.allocatedDiskSlots + "," + allocated.allocatedMemoryMB); allocated.worker.getResource().releaseResource( allocated.allocatedDiskSlots, allocated.allocatedMemoryMB); } else { - LOG.warn("No AllocatedWorkerResource data for [" + containerId + "]"); - return; + LOG.debug("No AllocatedWorkerResource data for [" + resourceId + "]"); } } @@ -592,8 +582,10 @@ public void releaseQueryMaster(QueryId queryId) { LOG.warn("No QueryMaster resource info for " + queryId); return; } else { - ContainerProtocol.TajoContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId); - releaseWorkerResource(containerId); + Integer resourceId = rmContext.getQueryMasterContainer().remove(queryId); + if (resourceId != null) { + releaseWorkerResource(resourceId); + } rmContext.getStoppedQueryIds().add(queryId); LOG.info(String.format("Released QueryMaster (%s) resource." , queryId.toString())); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java index 3d5e062bef..8b69ceb306 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java @@ -60,9 +60,9 @@ public void allocateWorkerResources(WorkerResourceAllocationRequest request, /** * Release a container * - * @param containerId ContainerIdProto to be released + * @param resourceId ContainerIdProto to be released */ - public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId); + public void releaseWorkerResource(int resourceId); public String getSeedQueryId() throws IOException; diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 351856fde9..1d2c68e042 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -46,6 +46,8 @@ import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.worker.AbstractResourceAllocator; +import org.apache.tajo.worker.AllocatedResource; import org.apache.tajo.worker.FetchImpl; import java.util.*; @@ -754,19 +756,21 @@ public void assignToLeafTasks(LinkedList taskRequests) { if(taskRequest == null) { // if there are only remote task requests taskRequest = remoteTaskRequests.pollFirst(); } + AbstractResourceAllocator allocator = context.getMasterContext().getResourceAllocator(); // checking if this container is still alive. // If not, ignore the task request and stop the task runner - ContainerProxy container = context.getMasterContext().getResourceAllocator() - .getContainer(taskRequest.getContainerId()); + ContainerProxy container = allocator.getContainer(taskRequest.getContainerId()); if(container == null) { taskRequest.getCallback().run(stopTaskRunnerReq); continue; } // getting the hostname of requested node - WorkerConnectionInfo connectionInfo = - context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId()); + AllocatedResource resource = (AllocatedResource)container.getContainer().getResource(); + TajoContainerId containerId = container.getContainer().getId(); + WorkerConnectionInfo connectionInfo = resource.getConnectionInfo(); + String host = connectionInfo.getHost(); // if there are no worker matched to the hostname a task request @@ -783,7 +787,6 @@ public void assignToLeafTasks(LinkedList taskRequests) { } } - TajoContainerId containerId = taskRequest.getContainerId(); LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," + "containerId=" + containerId); @@ -880,6 +883,16 @@ public void assignToNonLeafTasks(LinkedList taskRequests) { taskRequest = taskRequests.pollFirst(); LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId()); + TajoContainerId containerId = taskRequest.getContainerId(); + AbstractResourceAllocator allocator = context.getMasterContext().getResourceAllocator(); + + // checking if this container is still alive. + // If not, ignore the task request and stop the task runner + ContainerProxy container = allocator.getContainer(taskRequest.getContainerId()); + if (container == null) { + taskRequest.getCallback().run(stopTaskRunnerReq); + break; + } TaskAttemptId attemptId; // random allocation if (nonLeafTasks.size() > 0) { @@ -912,10 +925,10 @@ public void assignToNonLeafTasks(LinkedList taskRequests) { } } - WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator(). - getWorkerConnectionInfo(taskRequest.getWorkerId()); + AllocatedResource resource = (AllocatedResource)container.getContainer().getResource(); + context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, - taskRequest.getContainerId(), connectionInfo)); + containerId, resource.getConnectionInfo())); taskRequest.getCallback().run(taskAssign.getProto()); totalAssigned++; scheduledObjectNum--; diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 465fa8494f..289afb4c52 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -42,7 +42,7 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.TajoContainerProxy; +import org.apache.tajo.worker.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.plan.LogicalOptimizer; diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index d82d078fe6..5c714d5e32 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; @@ -53,7 +52,6 @@ import org.apache.tajo.master.TaskRunnerGroupEvent; import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; import org.apache.tajo.master.TaskState; -import org.apache.tajo.master.container.TajoContainer; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; @@ -67,7 +65,10 @@ import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.history.StageHistory; import org.apache.tajo.util.history.TaskHistory; +import org.apache.tajo.worker.AbstractResourceAllocator; import org.apache.tajo.worker.FetchImpl; +import org.apache.tajo.worker.Resources; +import org.apache.tajo.worker.TajoWorkerContainer; import java.io.IOException; import java.util.*; @@ -108,8 +109,8 @@ public class Stage implements EventHandler { private Thread timeoutChecker; volatile Map tasks = new ConcurrentHashMap(); - volatile Map containers = new ConcurrentHashMap(); + volatile Map containers = + new ConcurrentHashMap(); private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); @@ -1052,15 +1053,20 @@ public static void allocateContainers(Stage stage) { //TODO consider disk slot int requiredMemoryMBPerTask = 512; - int numRequest = stage.getContext().getResourceAllocator().calculateNumRequestContainers( - stage.getContext().getQueryMasterContext().getWorkerContext(), + AbstractResourceAllocator allocator = stage.getContext().getResourceAllocator(); + QueryMaster.QueryMasterContext masterContext = stage.getContext().getQueryMasterContext(); + int numRequest = allocator.calculateNumRequestContainers( + masterContext.getWorkerContext(), stage.schedulerContext.getEstimatedTaskNum(), requiredMemoryMBPerTask ); - final Resource resource = Records.newRecord(Resource.class); + //TODO consider task's resource usage pattern + TajoConf tajoConf = masterContext.getConf(); + int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY); + float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK); - resource.setMemory(requiredMemoryMBPerTask); + Resources resource = new Resources(1, requiredMemoryMB, requiredDiskSlots); LOG.info("Request Container for " + stage.getId() + " containers=" + numRequest); @@ -1167,7 +1173,7 @@ public void transition(Stage stage, StageEvent event) { try { StageContainerAllocationEvent allocationEvent = (StageContainerAllocationEvent) event; - for (TajoContainer container : allocationEvent.getAllocatedContainer()) { + for (TajoWorkerContainer container : allocationEvent.getAllocatedContainer()) { TajoContainerId cId = container.getId(); if (stage.containers.containsKey(cId)) { stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java index 68c57f2603..01d5f931c8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java @@ -18,27 +18,64 @@ package org.apache.tajo.worker; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import org.apache.hadoop.service.CompositeService; import org.apache.tajo.master.ContainerProxy; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.container.TajoContainerId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentMap; public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator { - /** - * A key is worker id, and a value is a worker connection information. - */ - protected ConcurrentMap workerInfoMap = Maps.newConcurrentMap(); - public WorkerConnectionInfo getWorkerConnectionInfo(int workerId) { - return workerInfoMap.get(workerId); + // resource-id to resource + private final Map allocated = new HashMap(); + + public synchronized void addAllocatedResource(AllocatedResource resource) { + allocated.put(resource.getResourceId(), resource); + } + + public synchronized AllocatedResource getAllocatedResource(int resourceId) { + return allocated.get(resourceId); + } + + public WorkerConnectionInfo getWorkerConnectionInfo(int resourceId) { + return getAllocatedResource(resourceId).getConnectionInfo(); + } + + public synchronized List removeFreeResources() { + List result = new ArrayList(); + for (AllocatedResource resource : allocated.values()) { + if (resource.acquire()) { + result.add(resource); + } + } + for (AllocatedResource free : result) { + allocated.remove(free.getConnectionInfo().getId()); + } + return result; + } + + public synchronized void removeResources(List resources) { + for (AllocatedResource resource : resources) { + allocated.remove(resource.getConnectionInfo().getId()); + } } - public void addWorkerConnectionInfo(WorkerConnectionInfo connectionInfo) { - workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo); + public synchronized List allocatedResources(Resources required, int count) { + Predicate predicate = AllocatedResource.getMinimum(required); + List result = new ArrayList(count); + for (AllocatedResource resource : Iterables.filter(allocated.values(), predicate)) { + if (resource.acquire()) { + result.add(resource); + } + } + return result; } private Map containers = Maps.newConcurrentMap(); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AllocatedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/AllocatedResource.java new file mode 100644 index 0000000000..271768f295 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/AllocatedResource.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.base.Predicate; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class AllocatedResource extends Resources { + + private final int resourceId; + + /** Worker connection information */ + private final WorkerConnectionInfo connectionInfo; + + private final AtomicBoolean occupied = new AtomicBoolean(false); + + public AllocatedResource(int resourceId, int cpuCoreSlots, int memoryMB, float diskSlots, WorkerConnectionInfo connectionInfo) { + super(cpuCoreSlots, memoryMB, diskSlots); + this.resourceId = resourceId; + this.connectionInfo = connectionInfo; + } + + public int getResourceId() { + return resourceId; + } + + public WorkerConnectionInfo getConnectionInfo() { + return connectionInfo; + } + + public synchronized boolean isOccupied() { + return occupied.get(); + } + + public synchronized boolean release() { + return occupied.compareAndSet(true, false); + } + + public synchronized boolean acquire() { + return occupied.compareAndSet(false, true); + } + + public String toString() { + return connectionInfo.getId() + "::" + super.toString(); + } + + public static Predicate getMinimum(final Resources required) { + final Predicate predicate = Resources.getMinimum(required); + return new Predicate() { + @Override + public boolean apply(AllocatedResource input) { + return !input.isOccupied() && predicate.apply(input); + } + }; + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Resources.java b/tajo-core/src/main/java/org/apache/tajo/worker/Resources.java new file mode 100644 index 0000000000..c688936cea --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Resources.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.base.Predicate; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import org.apache.hadoop.yarn.api.records.Resource; + +public class Resources extends Resource { + // todo: there should be min-max + protected final int cpuCoreSlots; + protected final int memoryMB; + protected final float diskSlots; + + public Resources(int cpuCoreSlots, int memoryMB, float diskSlots) { + this.memoryMB = memoryMB; + this.cpuCoreSlots = cpuCoreSlots; + this.diskSlots = diskSlots; + } + + public float getDiskSlots() { + return diskSlots; + } + + @Override + public int getMemory() { + return memoryMB; + } + + @Override + public void setMemory(int i) { + throw new UnsupportedOperationException("setMemory"); + } + + @Override + public int getVirtualCores() { + return cpuCoreSlots; + } + + @Override + public void setVirtualCores(int i) { + throw new UnsupportedOperationException("setVirtualCores"); + } + + @Override + public String toString() { + return "CPU=" + cpuCoreSlots + ", MEMORY=" + memoryMB + "MB, DISK=" + diskSlots; + } + + public static Predicate getMinimum(final T required) { + return new Predicate() { + @Override + public boolean apply(T input) { + return required.cpuCoreSlots >= input.cpuCoreSlots && + required.memoryMB >= input.memoryMB && + required.diskSlots >= input.diskSlots; + } + }; + } + + @Override + public int compareTo(Resource o) { + int compare = Ints.compare(cpuCoreSlots, o.getVirtualCores()); + if (compare == 0) { + compare = Ints.compare(memoryMB, o.getMemory()); + } + if (compare == 0 && o instanceof Resources) { + compare = Floats.compare(diskSlots, ((Resources)o).getDiskSlots()); + } + return compare; + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoContainerProxy.java similarity index 68% rename from tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java rename to tajo-core/src/main/java/org/apache/tajo/worker/TajoContainerProxy.java index 2aac00561b..323ff7a186 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoContainerProxy.java @@ -16,34 +16,24 @@ * limitations under the License. */ -package org.apache.tajo.master; +package org.apache.tajo.worker; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.ContainerProxy; import org.apache.tajo.master.container.TajoContainer; -import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.TaskFatalErrorEvent; -import org.apache.tajo.master.rm.TajoWorkerContainer; -import org.apache.tajo.master.rm.TajoWorkerContainerId; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.service.ServiceTracker; -import org.apache.tajo.worker.TajoWorker; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; public class TajoContainerProxy extends ContainerProxy { private final QueryContext queryContext; @@ -64,7 +54,7 @@ public synchronized void launch(ContainerLaunchContext containerLaunchContext) { context.getResourceAllocator().addContainer(containerId, this); this.hostName = container.getNodeId().getHost(); - this.port = ((TajoWorkerContainer)container).getWorkerResource().getConnectionInfo().getPullServerPort(); + this.port = ((TajoWorkerContainer)container).getResource().getConnectionInfo().getPullServerPort(); this.state = ContainerState.RUNNING; if (LOG.isDebugEnabled()) { @@ -131,47 +121,12 @@ public synchronized void stopContainer() { LOG.info("Container already stopped:" + containerId); return; } - if(this.state == ContainerState.PREP) { - this.state = ContainerState.KILLED_BEFORE_LAUNCH; + if (state == ContainerState.PREP) { + state = ContainerState.KILLED_BEFORE_LAUNCH; } else { - try { - releaseWorkerResource(context, executionBlockId, Arrays.asList(containerId)); - context.getResourceAllocator().removeContainer(containerId); - } catch (Throwable t) { - // ignore the cleanup failure - String message = "cleanup failed for container " - + this.containerId + " : " - + StringUtils.stringifyException(t); - LOG.warn(message); - } finally { - this.state = ContainerState.DONE; - } + ((TajoWorkerContainer)container).getResource().release(); + context.getResourceAllocator().removeContainer(containerId); + state = ContainerState.DONE; } } - - public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context, - ExecutionBlockId executionBlockId, - List containerIds) throws Exception { - List containerIdProtos = - new ArrayList(); - - for(TajoContainerId eachContainerId: containerIds) { - containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId)); - } - - RpcClientManager manager = RpcClientManager.getInstance(); - NettyClientBase tmClient = null; - - ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker(); - tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); - - QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - masterClientService.releaseWorkerResource(null, - QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder() - .setExecutionBlockId(executionBlockId.getProto()) - .addAllContainerIds(containerIdProtos) - .build(), - NullCallback.get()); - - } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 05dd1a940b..d3d3b61c18 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -30,7 +30,9 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryId; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.utils.ThreadUtil; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; @@ -42,10 +44,7 @@ import org.apache.tajo.master.event.ContainerAllocationEvent; import org.apache.tajo.master.event.ContainerAllocatorEventType; import org.apache.tajo.master.event.StageContainerAllocationEvent; -import org.apache.tajo.master.rm.TajoWorkerContainer; import org.apache.tajo.master.rm.TajoWorkerContainerId; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerResource; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; @@ -61,6 +60,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class TajoResourceAllocator extends AbstractResourceAllocator { private static final Log LOG = LogFactory.getLog(TajoResourceAllocator.class); @@ -70,7 +70,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { private final ExecutorService allocationExecutor; private final Deallocator deallocator; - private AtomicBoolean stopped = new AtomicBoolean(false); + private final AtomicBoolean stopped = new AtomicBoolean(false); public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) { this.queryTaskContext = queryTaskContext; @@ -102,7 +102,7 @@ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask; clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks + - ", Number of Cluster Slots=" + clusterSlots); + ", Number of Cluster Slots=" + clusterSlots); return Math.min(numTasks, clusterSlots); } @@ -142,7 +142,6 @@ public synchronized void stop() { } } - workerInfoMap.clear(); super.stop(); } @@ -177,14 +176,22 @@ private void launchTaskRunners(LaunchTaskRunnersEvent event) { } } + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + releaseResources(removeFreeResources()); + // todo do something + } + public void stopExecutionBlock(final ExecutionBlockId executionBlockId, - Collection containers) { - Set workers = Sets.newHashSet(); - for (TajoContainer container : containers){ - workers.add(container.getNodeId()); + Collection containers) { + List resources = Lists.newArrayList(); + for (TajoWorkerContainer container : containers) { + resources.add(container.getResource()); } + releaseResources(resources); - for (final NodeId worker : workers) { + for (final NodeId worker : getUniqueNodes(containers)) { allocationExecutor.submit(new Runnable() { @Override public void run() { @@ -194,6 +201,14 @@ public void run() { } } + private Set getUniqueNodes(Collection containers) { + Set nodes = Sets.newHashSet(); + for (TajoWorkerContainer container : containers) { + nodes.add(container.getNodeId()); + } + return nodes; + } + private void stopExecutionBlock(ExecutionBlockId executionBlockId, NodeId worker) { NettyClientBase tajoWorkerRpc = null; try { @@ -224,7 +239,7 @@ public void run() { } } - private void stopContainers(Collection containers) { + private void stopContainers(Collection containers) { deallocator.submit(Iterables.transform(containers, new Function() { public TajoContainerId apply(TajoContainer input) { return input.getId(); } })); @@ -280,10 +295,77 @@ public void run() { class TajoWorkerAllocationHandler implements EventHandler { @Override public void handle(ContainerAllocationEvent event) { - allocationExecutor.submit(new TajoWorkerAllocationThread(event)); + List allocated = allocatedResources(event.getCapability(), event.getRequiredNum()); + List containers = publish(event.getExecutionBlockId(), allocated); + if (allocated.size() < event.getRequiredNum()) { + ContainerAllocationEvent shortage = event.shortOf(event.getRequiredNum() - allocated.size()); + allocationExecutor.submit(new TajoWorkerAllocationThread(shortage)); + } + // todo consider parallelism + releaseResources(removeFreeResources()); + } + } + + private void releaseResources(List resources) { + Iterable resourceIds= Iterables.transform(resources, + new Function() { + public Integer apply(AllocatedResource input) { return input.getResourceId(); } + }); + + RpcClientManager manager = RpcClientManager.getInstance(); + try { + ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker(); + NettyClientBase tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + + QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); + masterClientService.releaseWorkerResource(null, + WorkerResourceReleaseRequest.newBuilder().addAllResourceIds(resourceIds).build(), + NullCallback.get()); + removeResources(resources); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + // todo: do something (delegate to master) } } + private final AtomicInteger containerIdSeq = new AtomicInteger(); + + List publish(ExecutionBlockId executionBlockId, List resources) { + StageState state = queryTaskContext.getStage(executionBlockId).getSynchronizedState(); + if (!Stage.isRunningState(state)) { + releaseResources(resources); + return null; + } + + List containers = new ArrayList(resources.size()); + for (AllocatedResource resource : resources) { + QueryId queryId = executionBlockId.getQueryId(); + TajoWorkerContainer container = new TajoWorkerContainer(); + TajoWorkerContainerId containerId = new TajoWorkerContainerId(); + containerId.setApplicationAttemptId(ApplicationIdUtils.createApplicationAttemptId(queryId)); + containerId.setId(containerIdSeq.incrementAndGet()); + container.setId(containerId); + + WorkerConnectionInfo connectionInfo = resource.getConnectionInfo(); + NodeId nodeId = NodeId.newInstance(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()); + container.setNodeId(nodeId); + + addAllocatedResource(resource); + + container.setResource(resource); + containers.add(container); + } + + if (!containers.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("StageContainerAllocationEvent fire:" + executionBlockId); + } + queryTaskContext.getEventHandler().handle(new StageContainerAllocationEvent(executionBlockId, containers)); + } + + return containers; + } + class TajoWorkerAllocationThread extends Thread { ContainerAllocationEvent event; TajoWorkerAllocationThread(ContainerAllocationEvent event) { @@ -296,120 +378,78 @@ public void run() { CallFuture callBack = new CallFuture(); - //TODO consider task's resource usage pattern - int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY); - float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK); - + Resources requirement = event.getCapability(); WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() - .setMinMemoryMBPerContainer(requiredMemoryMB) - .setMaxMemoryMBPerContainer(requiredMemoryMB) + .setMinMemoryMBPerContainer(requirement.getMemory()) + .setMaxMemoryMBPerContainer(requirement.getMemory()) .setNumContainers(event.getRequiredNum()) .setResourceRequestPriority(!event.isLeafQuery() ? ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK) - .setMinDiskSlotPerContainer(requiredDiskSlots) - .setMaxDiskSlotPerContainer(requiredDiskSlots) + .setMinDiskSlotPerContainer(requirement.getDiskSlots()) + .setMaxDiskSlotPerContainer(requirement.getDiskSlots()) .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) .build(); + LOG.info("Requesting resource for " + event.getRequiredNum() + " containers, " + requirement); - NettyClientBase tmClient = null; - try { - ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker(); - tmClient = RpcClientManager.getInstance(). - getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); - QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - masterClientService.allocateWorkerResources(callBack.getController(), request, callBack); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - } + ExecutionBlockId executionBlockId = event.getExecutionBlockId(); - WorkerResourceAllocationResponse response = null; - while(!stopped.get()) { - try { - response = callBack.get(3, TimeUnit.SECONDS); - break; - } catch (InterruptedException e) { - if(stopped.get()) { - return; - } - } catch (TimeoutException e) { - LOG.info("No available worker resource for " + event.getExecutionBlockId()); + for (int i = 0; i < 3; i++) { + WorkerResourceAllocationResponse response = allocate(request); + if (response == null) { + ThreadUtil.sleepWithoutInterrupt(1000 * (1 << i)); continue; - } catch (ExecutionException e) { - LOG.error(e.getMessage(), e); - break; } - } - int numAllocatedContainers = 0; + List resources = new ArrayList(); + for (WorkerAllocatedResource allocated : response.getWorkerAllocatedResourceList()) { - if(response != null) { - List allocatedResources = response.getWorkerAllocatedResourceList(); - ExecutionBlockId executionBlockId = event.getExecutionBlockId(); + AllocatedResource resource = new AllocatedResource(allocated.getResourceId(), + 1, allocated.getAllocatedMemoryMB(), allocated.getAllocatedDiskSlots(), + new WorkerConnectionInfo(allocated.getConnectionInfo())); + resource.acquire(); - List containers = new ArrayList(); - for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) { - TajoWorkerContainer container = new TajoWorkerContainer(); - NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(), - eachAllocatedResource.getConnectionInfo().getPeerRpcPort()); + resources.add(resource); + } - TajoWorkerContainerId containerId = new TajoWorkerContainerId(); + List containers = publish(event.getExecutionBlockId(), resources); + if (containers != null && (event.getRequiredNum() > containers.size())) { + ContainerAllocationEvent shortage = event.shortOf(event.getRequiredNum() - containers.size()); + queryTaskContext.getEventHandler().handle(shortage); + } + } + LOG.info("Stop TajoWorkerAllocationThread"); + } - containerId.setApplicationAttemptId( - ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(), - eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId())); - containerId.setId(eachAllocatedResource.getContainerId().getId()); + private WorkerResourceAllocationResponse allocate(WorkerResourceAllocationRequest request) { - container.setId(containerId); - container.setNodeId(nodeId); + RpcClientManager manager = RpcClientManager.getInstance(); + ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker(); + WorkerResourceAllocationResponse response = null; + try { + NettyClientBase tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), + QueryCoordinatorProtocol.class, true); + QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - WorkerResource workerResource = new WorkerResource(); - workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB()); - workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots()); + CallFuture callBack = + new CallFuture(); - Worker worker = new Worker(null, workerResource, - new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo())); - container.setWorkerResource(worker); - addWorkerConnectionInfo(worker.getConnectionInfo()); - containers.add(container); - } + masterClientService.allocateWorkerResources(callBack.getController(), request, callBack); - StageState state = queryTaskContext.getStage(executionBlockId).getSynchronizedState(); - if (!Stage.isRunningState(state)) { - List containerIds = new ArrayList(); - for(TajoContainer eachContainer: containers) { - containerIds.add(eachContainer.getId()); - } + while (!stopped.get() && response == null) { try { - TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds); - } catch (Throwable e) { - deallocator.submit(containerIds); - LOG.error(e.getMessage(), e); + response = callBack.get(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // loop again + } catch (TimeoutException e) { + LOG.info("No available worker resource for " + event.getExecutionBlockId()); } - return; } - - if (allocatedResources.size() > 0) { - if(LOG.isDebugEnabled()) { - LOG.debug("StageContainerAllocationEvent fire:" + executionBlockId); - } - queryTaskContext.getEventHandler().handle(new StageContainerAllocationEvent(executionBlockId, containers)); - } - numAllocatedContainers += allocatedResources.size(); - - } - if(event.getRequiredNum() > numAllocatedContainers) { - ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent( - event.getType(), event.getExecutionBlockId(), event.getPriority(), - event.getResource(), - event.getRequiredNum() - numAllocatedContainers, - event.isLeafQuery(), event.getProgress() - ); - queryTaskContext.getEventHandler().handle(shortRequestEvent); - + } catch (Throwable e) { + LOG.error(e.getMessage(), e); } - LOG.info("Stop TajoWorkerAllocationThread"); + return response; } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerContainer.java similarity index 83% rename from tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java rename to tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerContainer.java index 8c5b96c5c5..dcd93f2f01 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerContainer.java @@ -16,25 +16,17 @@ * limitations under the License. */ -package org.apache.tajo.master.rm; +package org.apache.tajo.worker; import org.apache.hadoop.yarn.api.records.*; import org.apache.tajo.master.container.TajoContainer; import org.apache.tajo.master.container.TajoContainerId; - +import org.apache.tajo.master.rm.Worker; public class TajoWorkerContainer extends TajoContainer { TajoContainerId id; NodeId nodeId; - Worker worker; - - public Worker getWorkerResource() { - return worker; - } - - public void setWorkerResource(Worker workerResource) { - this.worker = workerResource; - } + AllocatedResource resource; @Override public TajoContainerId getId() { @@ -67,13 +59,13 @@ public void setNodeHttpAddress(String nodeHttpAddress) { } @Override - public Resource getResource() { - return null; //To change body of implemented methods use File | Settings | File Templates. + public AllocatedResource getResource() { + return resource; } @Override public void setResource(Resource resource) { - //To change body of implemented methods use File | Settings | File Templates. + this.resource = (AllocatedResource)resource; } @Override @@ -110,7 +102,6 @@ public boolean equals(Object o) { if (id != null ? !id.equals(that.id) : that.id != null) return false; if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) return false; - if (worker != null ? !worker.equals(that.worker) : that.worker != null) return false; return true; } @@ -119,7 +110,6 @@ public boolean equals(Object o) { public int hashCode() { int result = id != null ? id.hashCode() : 0; result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); - result = 31 * result + (worker != null ? worker.hashCode() : 0); return result; } } diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto index 2440e2af5a..4fdacbb539 100644 --- a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto +++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto @@ -119,12 +119,11 @@ message WorkerResourcesRequest { } message WorkerResourceReleaseRequest { - required ExecutionBlockIdProto executionBlockId = 1; - repeated TajoContainerIdProto containerIds = 2; + repeated int32 resourceIds = 1; } message WorkerAllocatedResource { - required TajoContainerIdProto containerId = 1; + required int32 resourceId = 1; required WorkerConnectionInfoProto connectionInfo = 2; required int32 allocatedMemoryMB = 3; diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java index 2c997a3203..898b099600 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java @@ -22,7 +22,6 @@ import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.NullCallback; @@ -148,8 +147,7 @@ public void testMemoryResource() throws Exception { .build(); final CountDownLatch barrier = new CountDownLatch(1); - final List containerIds = new - ArrayList(); + final List resourceIds = new ArrayList(); RpcCallback callBack = new RpcCallback() { @@ -186,11 +184,11 @@ public void run(WorkerResourceAllocationResponse response) { for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) { assertTrue( eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= maxMemory); - containerIds.add(eachResource.getContainerId()); + resourceIds.add(eachResource.getResourceId()); } - for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) { - tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); + for (Integer eachResourceId: resourceIds) { + tajoWorkerResourceManager.releaseWorkerResource(eachResourceId); } for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { @@ -258,7 +256,7 @@ public void run(WorkerResourceAllocationResponse response) { TestTajoResourceManager.this.response.getWorkerAllocatedResourceList()) { assertTrue( eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= maxMemory); - tajoWorkerResourceManager.releaseWorkerResource(eachResource.getContainerId()); + tajoWorkerResourceManager.releaseWorkerResource(eachResource.getResourceId()); } for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { @@ -317,8 +315,7 @@ public void testDiskResource() throws Exception { .build(); final CountDownLatch barrier = new CountDownLatch(1); - final List containerIds = new - ArrayList(); + final List resourceIds = new ArrayList(); RpcCallback callBack = new RpcCallback() { @@ -337,7 +334,7 @@ public void run(WorkerResourceAllocationResponse response) { assertTrue("AllocatedDiskSlot:" + eachResource.getAllocatedDiskSlots(), eachResource.getAllocatedDiskSlots() >= minDiskSlots && eachResource.getAllocatedDiskSlots() <= maxDiskSlots); - containerIds.add(eachResource.getContainerId()); + resourceIds.add(eachResource.getResourceId()); } // assert after callback @@ -356,8 +353,8 @@ public void run(WorkerResourceAllocationResponse response) { assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size()); - for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) { - tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); + for (Integer eachResourceId : resourceIds) { + tajoWorkerResourceManager.releaseWorkerResource(eachResourceId); } for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { @@ -399,9 +396,6 @@ public void testDiskResourceWithStoppedQuery() throws Exception { .build(); final CountDownLatch barrier = new CountDownLatch(1); - final List containerIds = new - ArrayList(); - RpcCallback callBack = new RpcCallback() { @@ -416,6 +410,11 @@ public void run(WorkerResourceAllocationResponse response) { tajoWorkerResourceManager.allocateWorkerResources(request, callBack); assertFalse(barrier.await(3, TimeUnit.SECONDS)); + if (response != null) { + for (WorkerAllocatedResource resource : response.getWorkerAllocatedResourceList()) { + tajoWorkerResourceManager.releaseWorkerResource(resource.getResourceId()); + } + } assertNull(response); // assert after callback @@ -432,10 +431,6 @@ public void run(WorkerResourceAllocationResponse response) { assertEquals(0, totalUsedDisks, 0); - for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) { - tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); - } - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { WorkerResource resource = worker.getResource(); assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());