Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

TAJO-1399 TajoResourceAllocator might hang on network error #420

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -169,6 +169,7 @@ public static enum ConfVars implements ConfigKey {
WORKER_TEMPORAL_DIR_CLEANUP("tajo.worker.tmpdir.cleanup-at-startup", false, Validators.bool()),

// Tajo Worker Resources
WORKER_RESOURCE_ALLOCATION_TIMEOUT("tajo.worker.resource.allocation.timeout", "3 sec"),
WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1, Validators.min("1")),
WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024, Validators.min("64")),
WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f),
Expand Down
Expand Up @@ -44,7 +44,10 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -298,7 +301,8 @@ public void run() {
try {
WorkerResourceRequest resourceRequest = requestQueue.poll(
QUEUE_POLLING_TIME, TimeUnit.MILLISECONDS);
if (resourceRequest == null) {
if (resourceRequest == null || (resourceRequest.callBack instanceof CancelableRpcCallback &&
((CancelableRpcCallback)resourceRequest.callBack).isCancelled())) {
continue;
}

Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tajo.*;
Expand Down Expand Up @@ -561,7 +562,8 @@ public Path getStagingDir() {
return queryContext.getStagingDir();
}

public synchronized EventHandler getEventHandler() {
@SuppressWarnings("unchecked")
public synchronized <T extends Event> EventHandler<T> getEventHandler() {
if(eventHandler == null) {
eventHandler = dispatcher.getEventHandler();
}
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
Expand Down Expand Up @@ -260,8 +259,6 @@ class TajoWorkerAllocationThread extends Thread {
@Override
public void run() {
LOG.info("Start TajoWorkerAllocationThread");
CallFuture<WorkerResourceAllocationResponse> callBack =
new CallFuture<WorkerResourceAllocationResponse>();

//TODO consider task's resource usage pattern
int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
Expand All @@ -278,33 +275,8 @@ public void run() {
.setQueryId(event.getExecutionBlockId().getQueryId().getProto())
.build();

RpcConnectionPool connPool = RpcConnectionPool.getPool();
NettyClientBase tmClient = null;
try {
ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();
tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.allocateWorkerResources(null, request, callBack);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
}
WorkerResourceAllocationResponse response = allocate(request);

WorkerResourceAllocationResponse response = null;
while(!stopped.get()) {
try {
response = callBack.get(3, TimeUnit.SECONDS);
break;
} catch (InterruptedException e) {
if(stopped.get()) {
return;
}
} catch (TimeoutException e) {
LOG.info("No available worker resource for " + event.getExecutionBlockId());
continue;
}
}
int numAllocatedContainers = 0;

if(response != null) {
Expand Down Expand Up @@ -374,5 +346,43 @@ public void run() {
}
LOG.info("Stop TajoWorkerAllocationThread");
}

/**
 * Sends a worker resource allocation request through the
 * {@code QueryCoordinatorProtocol} stub and waits for the response.
 *
 * <p>The wait is performed in slices of
 * {@code WORKER_RESOURCE_ALLOCATION_TIMEOUT}; each elapsed slice is logged at
 * info level and the wait is retried until a response arrives or
 * {@code stopped} becomes true.
 *
 * <p>An interrupt received while waiting does not abort the wait by itself
 * (only {@code stopped} does), but the thread's interrupt status is restored
 * before returning so the caller can still observe it.
 *
 * @param request the allocation request to send
 * @return the response, or {@code null} if this allocator was stopped before
 *         a response arrived or if any error occurred (the error is logged)
 */
WorkerResourceAllocationResponse allocate(WorkerResourceAllocationRequest request) {

  // 3 seconds, by default
  long timeout = tajoConf.getTimeVar(
      TajoConf.ConfVars.WORKER_RESOURCE_ALLOCATION_TIMEOUT, TimeUnit.MILLISECONDS);

  RpcConnectionPool connPool = RpcConnectionPool.getPool();
  ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();

  WorkerResourceAllocationResponse response = null;
  NettyClientBase tmClient = null;
  // Remembers an interrupt seen while waiting; re-asserting it inside the loop
  // would make every subsequent get() throw immediately and spin.
  boolean interrupted = false;
  try {
    tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(),
        QueryCoordinatorProtocol.class, true);
    QueryCoordinatorProtocolService masterClientService = tmClient.getStub();

    CallFuture<WorkerResourceAllocationResponse> callBack =
        new CallFuture<WorkerResourceAllocationResponse>();

    masterClientService.allocateWorkerResources(null, request, callBack);

    while (!stopped.get() && response == null) {
      try {
        response = callBack.get(timeout, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // loop again; the stopped flag decides whether to keep waiting
        interrupted = true;
      } catch (TimeoutException e) {
        LOG.info("No available worker resource for " + event.getExecutionBlockId());
      }
    }
  } catch (Throwable e) {
    LOG.error(e.getMessage(), e);
  } finally {
    try {
      connPool.releaseConnection(tmClient);
    } finally {
      if (interrupted) {
        // Restore the interrupt status that the catch block above cleared.
        Thread.currentThread().interrupt();
      }
    }
  }
  return response;
}
}
}
Expand Up @@ -71,4 +71,8 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutEx

/**
 * Hook with an empty default body; the {@code canceled} argument is ignored here.
 *
 * NOTE(review): presumably called when a cancelled callback still holds a value,
 * so that subclasses can override it to clean up; confirm against the rest of the class.
 *
 * @param canceled the value handed to the hook
 */
protected void cancel(T canceled) {
  // intentionally empty
}

/**
 * Reports whether this callback's current state equals {@code CANCELED}.
 *
 * @return {@code true} if the state read at call time is {@code CANCELED}
 */
public boolean isCancelled() {
  final boolean cancelled = (state.get() == CANCELED);
  return cancelled;
}
}