TAJO-1399 TajoResourceAllocator might hang on network error
navis committed Mar 13, 2015
1 parent f48d4bd commit 65c5c4e
Showing 1 changed file with 37 additions and 29 deletions.
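
What changed, in short: the old run() issued the allocateWorkerResources RPC exactly once and then polled its CallFuture in a loop. If that single send was lost to a network error, the Throwable was only logged and the code went on to poll a future no one would ever complete; the TimeoutException branch likewise logged and continued, so the thread could spin on 3-second timeouts forever. The commit moves connection setup, the RPC call, and the bounded wait into a new allocate() method that repeats the whole attempt until a response arrives or the allocator is stopped.

A minimal, self-contained sketch of the pre-patch hang (hypothetical names, not Tajo code; CompletableFuture stands in for Tajo's CallFuture):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class OneShotRpcHang {
      public static void main(String[] args) throws Exception {
        // Stand-in for CallFuture<WorkerResourceAllocationResponse>.
        CompletableFuture<String> callBack = new CompletableFuture<>();

        // Suppose the one-shot RPC send was lost to a network error:
        // nothing ever completes callBack, and run() never re-sent it.
        int attempts = 0;
        while (attempts < 3) {          // the real loop had no such bound
          try {
            String response = callBack.get(3, TimeUnit.SECONDS);
            System.out.println("allocated: " + response);
            break;
          } catch (TimeoutException e) {
            attempts++;
            // The old code logged "No available worker resource" and
            // continued, so after a lost send this branch repeated forever.
            System.out.println("timed out, polling the same dead future again");
          }
        }
      }
    }
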
@@ -28,7 +28,6 @@
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
@@ -260,8 +259,6 @@ class TajoWorkerAllocationThread extends Thread {
     @Override
     public void run() {
       LOG.info("Start TajoWorkerAllocationThread");
-      CallFuture<WorkerResourceAllocationResponse> callBack =
-          new CallFuture<WorkerResourceAllocationResponse>();
 
       //TODO consider task's resource usage pattern
       int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
@@ -278,33 +275,8 @@ public void run() {
           .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
           .build();
 
-      RpcConnectionPool connPool = RpcConnectionPool.getPool();
-      NettyClientBase tmClient = null;
-      try {
-        ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();
-        tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
-        QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
-        masterClientService.allocateWorkerResources(null, request, callBack);
-      } catch (Throwable e) {
-        LOG.error(e.getMessage(), e);
-      } finally {
-        connPool.releaseConnection(tmClient);
-      }
+      WorkerResourceAllocationResponse response = allocate(request);
 
-      WorkerResourceAllocationResponse response = null;
-      while(!stopped.get()) {
-        try {
-          response = callBack.get(3, TimeUnit.SECONDS);
-          break;
-        } catch (InterruptedException e) {
-          if(stopped.get()) {
-            return;
-          }
-        } catch (TimeoutException e) {
-          LOG.info("No available worker resource for " + event.getExecutionBlockId());
-          continue;
-        }
-      }
       int numAllocatedContainers = 0;
 
       if(response != null) {
@@ -374,5 +346,41 @@ public void run() {
       }
       LOG.info("Stop TajoWorkerAllocationThread");
     }
+
+    WorkerResourceAllocationResponse allocate(WorkerResourceAllocationRequest request) {
+
+      RpcConnectionPool connPool = RpcConnectionPool.getPool();
+      ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();
+
+      WorkerResourceAllocationResponse response = null;
+      while (!stopped.get() && response == null) {
+        NettyClientBase tmClient = null;
+        try {
+          tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(),
+              QueryCoordinatorProtocol.class, true);
+          QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
+
+          CallFuture<WorkerResourceAllocationResponse> callBack =
+              new CallFuture<WorkerResourceAllocationResponse>();
+
+          masterClientService.allocateWorkerResources(null, request, callBack);
+
+          while (!stopped.get() && response == null) {
+            try {
+              response = callBack.get(3, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+              // loop again
+            } catch (TimeoutException e) {
+              LOG.info("No available worker resource for " + event.getExecutionBlockId());
+            }
+          }
+        } catch (Throwable e) {
+          LOG.error(e.getMessage(), e);
+        } finally {
+          connPool.releaseConnection(tmClient);
+        }
+      }
+      return response;
+    }
   }
 }
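
Reduced to its essentials, the fix makes each retry a complete attempt: re-acquire the connection, re-send the request, wait with a timeout, and release the connection in finally so a half-broken channel is never reused. A generic sketch of that shape (hypothetical helper, not part of Tajo's API):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Supplier;

    // Hypothetical helper, not Tajo API: the retry shape allocate() adopts.
    public final class RetryUntilStopped {

      // Repeat the whole attempt (connect + send + bounded wait) until it
      // yields a value or the caller flips the stop flag; a failure abandons
      // the current attempt instead of waiting on a dead future.
      public static <T> T call(Supplier<T> attempt, AtomicBoolean stopped) {
        T result = null;
        while (!stopped.get() && result == null) {
          try {
            result = attempt.get();
          } catch (RuntimeException e) {
            System.err.println("attempt failed, retrying: " + e.getMessage());
          }
        }
        return result;
      }

      public static void main(String[] args) {
        AtomicBoolean stopped = new AtomicBoolean(false);
        int[] calls = {0};
        // First attempt simulates a network error; the retry succeeds.
        Supplier<String> attempt = () -> {
          if (calls[0]++ == 0) {
            throw new RuntimeException("connection reset by peer");
          }
          return "2 workers allocated";
        };
        System.out.println(call(attempt, stopped));
      }
    }

As in the patched allocate(), the stop flag gives the caller a way out, and a failed attempt is logged and simply retried rather than left hanging on a future that can never complete.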
