From 972f46242727d54c5c9937bc81ad1088ab847b94 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 3 Apr 2015 09:07:54 +0900 Subject: [PATCH] TAJO-1509 Use dedicated thread to release resource allocated to container --- .../planner/physical/ExternalSortExec.java | 2 +- .../apache/tajo/master/ContainerProxy.java | 12 ++- .../tajo/master/TajoContainerProxy.java | 40 +++------ .../tajo/worker/TajoResourceAllocator.java | 81 ++++++++++++++----- .../org/apache/tajo/rpc/NullCallback.java | 9 ++- 5 files changed, 90 insertions(+), 54 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 355f015ccc..3da296cdd8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -184,7 +184,7 @@ private Path sortAndStoreChunk(int chunkId, List tupleBlock) info(LOG, "Chunk #" + chunkId + " sort and written (" + FileUtil.humanReadableByteCount(appender.getOffset(), false) + " bytes, " + rowNum + " rows, " + - ", sort time: " + (sortEnd - sortStart) + " msec, " + + "sort time: " + (sortEnd - sortStart) + " msec, " + "write time: " + (chunkWriteEnd - chunkWriteStart) + " msec)"); return outputPath; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java index 562790d22e..cad63a0d6d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java @@ -46,7 +46,7 @@ protected static enum ContainerState { protected ContainerState state; // store enough information to be able to cleanup the container protected TajoContainer container; - protected TajoContainerId containerID; + protected TajoContainerId containerId; protected String hostName; protected int port = -1; @@ -60,7 +60,7 @@ public ContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configurat this.state = ContainerState.PREP; this.container = container; this.executionBlockId = executionBlockId; - this.containerID = container.getId(); + this.containerId = container.getId(); } public synchronized boolean isCompletelyDone() { @@ -75,7 +75,11 @@ public int getTaskPort() { return this.port; } - public String getId() { - return executionBlockId.toString(); + public TajoContainerId getContainerId() { + return containerId; + } + + public ExecutionBlockId getBlockId() { + return executionBlockId; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 7ed9fc5aa0..139359c2f5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -23,9 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; @@ -43,6 +41,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class TajoContainerProxy extends ContainerProxy { @@ -61,14 +60,14 @@ public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context, @Override public synchronized void launch(ContainerLaunchContext containerLaunchContext) { - context.getResourceAllocator().addContainer(containerID, this); + context.getResourceAllocator().addContainer(containerId, this); this.hostName = container.getNodeId().getHost(); this.port = ((TajoWorkerContainer)container).getWorkerResource().getConnectionInfo().getPullServerPort(); this.state = ContainerState.RUNNING; if (LOG.isDebugEnabled()) { - LOG.debug("Launch Container:" + executionBlockId + "," + containerID.getId() + "," + + LOG.debug("Launch Container:" + executionBlockId + "," + containerId.getId() + "," + container.getId() + "," + container.getNodeId() + ", pullServer=" + port); } @@ -127,41 +126,30 @@ private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContain @Override public synchronized void stopContainer() { if (LOG.isDebugEnabled()) { - LOG.debug("Release TajoWorker Resource: " + executionBlockId + "," + containerID + ", state:" + this.state); + LOG.debug("Release TajoWorker Resource: " + executionBlockId + "," + containerId + ", state:" + this.state); } if(isCompletelyDone()) { - LOG.info("Container already stopped:" + containerID); + LOG.info("Container already stopped:" + containerId); return; } if(this.state == ContainerState.PREP) { this.state = ContainerState.KILLED_BEFORE_LAUNCH; } else { try { - TajoWorkerContainer tajoWorkerContainer = ((TajoWorkerContainer)container); - releaseWorkerResource(context, executionBlockId, tajoWorkerContainer.getId()); - context.getResourceAllocator().removeContainer(containerID); - this.state = ContainerState.DONE; + releaseWorkerResource(context, executionBlockId, Arrays.asList(containerId)); + context.getResourceAllocator().removeContainer(containerId); } catch (Throwable t) { // ignore the cleanup failure String message = "cleanup failed for container " - + this.containerID + " : " + + this.containerId + " : " + StringUtils.stringifyException(t); LOG.warn(message); + } finally { this.state = ContainerState.DONE; - return; } } } - public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context, - ExecutionBlockId executionBlockId, - TajoContainerId containerId) throws Exception { - List containerIds = new ArrayList(); - containerIds.add(containerId); - - releaseWorkerResource(context, executionBlockId, containerIds); - } - public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlockId executionBlockId, List containerIds) throws Exception { @@ -181,12 +169,10 @@ public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.releaseWorkerResource(null, QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder() - .setExecutionBlockId(executionBlockId.getProto()) - .addAllContainerIds(containerIdProtos) - .build(), - NullCallback.get()); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); + .setExecutionBlockId(executionBlockId.getProto()) + .addAllContainerIds(containerIdProtos) + .build(), + NullCallback.get()); } finally { connPool.releaseConnection(tmClient); } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 47a9fda43e..e6c276511c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -18,6 +18,9 @@ package org.apache.tajo.worker; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,7 +31,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; @@ -51,13 +53,16 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.ApplicationIdUtils; import java.net.InetSocketAddress; import java.util.*; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,6 +73,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { private TajoConf tajoConf; private QueryMasterTask.QueryMasterTaskContext queryTaskContext; private final ExecutorService executorService; + private final Deallocator deallocator; private AtomicBoolean stopped = new AtomicBoolean(false); @@ -75,6 +81,7 @@ public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskCon this.queryTaskContext = queryTaskContext; executorService = Executors.newFixedThreadPool( queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM)); + deallocator = new Deallocator(); } @Override @@ -115,16 +122,19 @@ public void init(Configuration conf) { queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler()); + deallocator.start(); + super.init(conf); } @Override public synchronized void stop() { - if (stopped.getAndSet(true)) { + if (stopped.compareAndSet(false, true)) { return; } executorService.shutdownNow(); + deallocator.shutdown(); Map containers = queryTaskContext.getResourceAllocator() .getContainers(); @@ -196,7 +206,8 @@ private void stopExecutionBlock(ExecutionBlockId executionBlockId, NodeId worker tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); - tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), NullCallback.get()); + tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), + NullCallback.get(PrimitiveProtos.BoolProto.class)); } catch (Throwable e) { LOG.error(e.getMessage(), e); } finally { @@ -221,26 +232,55 @@ public void run() { } private void stopContainers(Collection containers) { - for (TajoContainer container : containers) { - final ContainerProxy proxy = queryTaskContext.getResourceAllocator().getContainer(container.getId()); - executorService.submit(new StopContainerRunner(container.getId(), proxy)); - } + deallocator.submit(Iterables.transform(containers, new Function() { + public TajoContainerId apply(TajoContainer input) { return input.getId(); } + })); } - private static class StopContainerRunner implements Runnable { - private final ContainerProxy proxy; - private final TajoContainerId id; - public StopContainerRunner(TajoContainerId id, ContainerProxy proxy) { - this.id = id; - this.proxy = proxy; + private static final TajoContainerId FIN = new TajoWorkerContainerId(); + + private class Deallocator extends Thread { + + private final BlockingDeque queue = new LinkedBlockingDeque(); + + public Deallocator() { + setName("Deallocator"); + setDaemon(true); + } + + private void submit(Iterable container) { + queue.addAll(Lists.newArrayList(container)); + } + + private void shutdown() { + queue.add(FIN); } @Override public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("ContainerProxy stopped:" + id + "," + proxy.getId()); + final AbstractResourceAllocator allocator = queryTaskContext.getResourceAllocator(); + while (!stopped.get() || !queue.isEmpty()) { + TajoContainerId containerId; + try { + containerId = queue.take(); + } catch (InterruptedException e) { + continue; + } + if (containerId == FIN) { + break; + } + ContainerProxy proxy = allocator.getContainer(containerId); + if (proxy == null) { + continue; + } + try { + LOG.info("Stopping ContainerProxy: " + proxy.getContainerId() + "," + proxy.getBlockId()); + proxy.stopContainer(); + } catch (Exception e) { + LOG.warn("Failed to stop container " + proxy.getContainerId() + "," + proxy.getBlockId(), e); + } } - proxy.stopContainer(); + LOG.info("Deallocator exiting"); } } @@ -341,13 +381,14 @@ public void run() { StageState state = queryTaskContext.getStage(executionBlockId).getSynchronizedState(); if (!Stage.isRunningState(state)) { + List containerIds = new ArrayList(); + for(TajoContainer eachContainer: containers) { + containerIds.add(eachContainer.getId()); + } try { - List containerIds = new ArrayList(); - for(TajoContainer eachContainer: containers) { - containerIds.add(eachContainer.getId()); - } TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds); } catch (Throwable e) { + deallocator.submit(containerIds); LOG.error(e.getMessage(), e); } return; diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java index 9b7f8acf9f..896a02e7b4 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java @@ -20,7 +20,7 @@ import com.google.protobuf.RpcCallback; -public class NullCallback implements RpcCallback { +public class NullCallback implements RpcCallback { private final static NullCallback instance; static { @@ -31,8 +31,13 @@ public static RpcCallback get() { return instance; } + @SuppressWarnings("unchecked") + public static RpcCallback get(Class clazz) { + return (RpcCallback)instance; + } + @Override - public void run(Object parameter) { + public void run(T parameter) { // do nothing } }