From 65c5c4e5ca079bddc238b0eeacae34726f696957 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 13 Mar 2015 13:47:27 +0900 Subject: [PATCH] TAJO-1399 TajoResourceAllocator might hang on network error --- .../tajo/worker/TajoResourceAllocator.java | 66 +++++++++++-------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 47a9fda43e..323ab20cdc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; @@ -260,8 +259,6 @@ class TajoWorkerAllocationThread extends Thread { @Override public void run() { LOG.info("Start TajoWorkerAllocationThread"); - CallFuture callBack = - new CallFuture(); //TODO consider task's resource usage pattern int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY); @@ -278,33 +275,8 @@ public void run() { .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) .build(); - RpcConnectionPool connPool = RpcConnectionPool.getPool(); - NettyClientBase tmClient = null; - try { - ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker(); - tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); - QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - masterClientService.allocateWorkerResources(null, request, callBack); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } + WorkerResourceAllocationResponse response = allocate(request); - WorkerResourceAllocationResponse response = null; - while(!stopped.get()) { - try { - response = callBack.get(3, TimeUnit.SECONDS); - break; - } catch (InterruptedException e) { - if(stopped.get()) { - return; - } - } catch (TimeoutException e) { - LOG.info("No available worker resource for " + event.getExecutionBlockId()); - continue; - } - } int numAllocatedContainers = 0; if(response != null) { @@ -374,5 +346,41 @@ public void run() { } LOG.info("Stop TajoWorkerAllocationThread"); } + + WorkerResourceAllocationResponse allocate(WorkerResourceAllocationRequest request) { + + RpcConnectionPool connPool = RpcConnectionPool.getPool(); + ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker(); + + WorkerResourceAllocationResponse response = null; + while (!stopped.get()) { + NettyClientBase tmClient = null; + try { + tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), + QueryCoordinatorProtocol.class, true); + QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); + + CallFuture callBack = + new CallFuture(); + + masterClientService.allocateWorkerResources(null, request, callBack); + + while (!stopped.get() && response == null) { + try { + response = callBack.get(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // loop again + } catch (TimeoutException e) { + LOG.info("No available worker resource for " + event.getExecutionBlockId()); + } + } + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } finally { + connPool.releaseConnection(tmClient); + } + } + return response; + } } }