Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
TAJO-1399 TajoResourceAllocator might hang on network error
Browse files Browse the repository at this point in the history
  • Loading branch information
navis committed Mar 17, 2015
1 parent 286b956 commit 2673190
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 30 deletions.
Expand Up @@ -44,6 +44,7 @@
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -213,14 +214,33 @@ public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgre
WorkerResourceAllocationRequest qmResourceRequest = createQMResourceRequest(queryInProgress.getQueryId());

// call future for async call
CallFuture<WorkerResourceAllocationResponse> callFuture = new CallFuture<WorkerResourceAllocationResponse>();
CallFuture<WorkerResourceAllocationResponse> callFuture = new CallFuture<WorkerResourceAllocationResponse>() {
@Override
public synchronized void run(WorkerResourceAllocationResponse response) {
if (isCancelled()) {
for (WorkerAllocatedResource resource : response.getWorkerAllocatedResourceList()) {
releaseWorkerResource(resource.getContainerId());
}
} else {
super.run(response);
}
}
@Override
public synchronized boolean isDone() {
return super.isDone();
}
};
allocateWorkerResources(qmResourceRequest, callFuture);

// Wait for 3 seconds
WorkerResourceAllocationResponse response = null;
try {
response = callFuture.get(3, TimeUnit.SECONDS);
} catch (Throwable t) {
callFuture.cancel(true);
if (callFuture.isDone()) {
callFuture.run(callFuture.peek());
}
LOG.error(t, t);
return null;
}
Expand Down Expand Up @@ -278,6 +298,10 @@ public void run() {
while(!stopped.get()) {
try {
WorkerResourceRequest resourceRequest = requestQueue.take();
if (resourceRequest.callBack instanceof Future &&
((Future)resourceRequest.callBack).isCancelled()) {
continue;
}

if (LOG.isDebugEnabled()) {
LOG.debug("allocateWorkerResources:" +
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
Expand Down Expand Up @@ -260,8 +259,6 @@ class TajoWorkerAllocationThread extends Thread {
@Override
public void run() {
LOG.info("Start TajoWorkerAllocationThread");
CallFuture<WorkerResourceAllocationResponse> callBack =
new CallFuture<WorkerResourceAllocationResponse>();

//TODO consider task's resource usage pattern
int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
Expand All @@ -278,33 +275,8 @@ public void run() {
.setQueryId(event.getExecutionBlockId().getQueryId().getProto())
.build();

RpcConnectionPool connPool = RpcConnectionPool.getPool();
NettyClientBase tmClient = null;
try {
ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();
tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.allocateWorkerResources(null, request, callBack);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
}
WorkerResourceAllocationResponse response = allocate(request);

WorkerResourceAllocationResponse response = null;
while(!stopped.get()) {
try {
response = callBack.get(3, TimeUnit.SECONDS);
break;
} catch (InterruptedException e) {
if(stopped.get()) {
return;
}
} catch (TimeoutException e) {
LOG.info("No available worker resource for " + event.getExecutionBlockId());
continue;
}
}
int numAllocatedContainers = 0;

if(response != null) {
Expand Down Expand Up @@ -374,5 +346,39 @@ public void run() {
}
LOG.info("Stop TajoWorkerAllocationThread");
}

/**
 * Blocking request for worker resources from the query coordinator.
 *
 * Borrows a connection from the shared RPC pool, issues the asynchronous
 * allocateWorkerResources call, then polls the callback future in 3-second
 * slices until a response arrives or this allocation thread is stopped.
 *
 * @param request the worker resource allocation request to send
 * @return the coordinator's response, or {@code null} if the thread was
 *         stopped first or the connection / RPC call failed
 */
WorkerResourceAllocationResponse allocate(WorkerResourceAllocationRequest request) {

  RpcConnectionPool connPool = RpcConnectionPool.getPool();
  ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();

  WorkerResourceAllocationResponse response = null;
  NettyClientBase tmClient = null;
  try {
    tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(),
        QueryCoordinatorProtocol.class, true);
    QueryCoordinatorProtocolService masterClientService = tmClient.getStub();

    CallFuture<WorkerResourceAllocationResponse> callBack =
        new CallFuture<WorkerResourceAllocationResponse>();

    // Asynchronous protobuf-style RPC: the result is delivered to callBack.
    masterClientService.allocateWorkerResources(null, request, callBack);

    // Poll with a 3-second timeout so the stopped flag is re-checked
    // periodically instead of blocking indefinitely on a single get() —
    // this is the TAJO-1399 hang fix: a network error must not leave
    // this thread waiting forever.
    while (!stopped.get() && response == null) {
      try {
        response = callBack.get(3, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        // loop again
        // NOTE(review): interrupt status is deliberately not restored here;
        // interruption appears to be used only as a nudge to re-check
        // stopped, and re-interrupting would make the next get() fail
        // immediately — confirm that assumption before changing.
      } catch (TimeoutException e) {
        // No response yet; log and keep waiting (or exit if stopped).
        LOG.info("No available worker resource for " + event.getExecutionBlockId());
      }
    }
  } catch (Throwable e) {
    // Best-effort: any connection/RPC failure is logged and surfaced to the
    // caller as a null response rather than propagated.
    LOG.error(e.getMessage(), e);
  } finally {
    // Always return the connection to the pool; presumably
    // releaseConnection tolerates a null client when getConnection
    // itself threw — TODO confirm.
    connPool.releaseConnection(tmClient);
  }
  return response;
}
}
}
4 changes: 4 additions & 0 deletions tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
Expand Up @@ -81,4 +81,8 @@ public T get(long timeout, TimeUnit unit)
throw new TimeoutException();
}
}

/**
 * Returns the response received so far without blocking, or {@code null}
 * if no response has been delivered yet.
 *
 * NOTE(review): reads the {@code response} field directly; whether this
 * read is safely published across threads depends on how the field is
 * declared/guarded elsewhere in CallFuture (not visible here) — confirm
 * before calling peek() from a thread other than the callback's.
 */
public T peek() {
return response;
}
}

0 comments on commit 2673190

Please sign in to comment.