From 80efb7710e84ad3a3bc2362a4d18654a35c181ee Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 18 Nov 2015 11:29:16 +0900 Subject: [PATCH] TAJO-1935: Some Tasks don't work after they become TA_ASSIGNED. --- .../org/apache/tajo/master/QueryInProgress.java | 6 ++---- .../tajo/querymaster/DefaultTaskScheduler.java | 14 +++++++------- .../org/apache/tajo/querymaster/QueryMaster.java | 11 ++++++----- .../apache/tajo/worker/ExecutionBlockContext.java | 12 +++++++----- .../org/apache/tajo/worker/NodeStatusUpdater.java | 6 +----- .../java/org/apache/tajo/worker/TaskManager.java | 5 +---- .../java/org/apache/tajo/rpc/RpcConstants.java | 1 - 7 files changed, 24 insertions(+), 31 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index f76e7f0e1b..b474d07df3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -35,7 +35,6 @@ import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; @@ -44,7 +43,6 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.Properties; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -105,7 +103,7 @@ public void kill() { if (queryMasterRpcClient != null) { CallFuture callFuture = new CallFuture<>(); queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); } } catch (Throwable e) { catchException("Failed to kill query " + queryId + " by exception " + e, e); @@ -222,7 +220,7 @@ public boolean submitToQueryMaster() { CallFuture callFuture = new CallFuture<>(); queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); querySubmitted = true; getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index f45f64d3cc..f1c0f6229e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -41,7 +41,10 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.plan.serder.LogicalNodeSerializer; import org.apache.tajo.resource.NodeResources; -import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.AsyncRpcClient; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; @@ -54,7 +57,6 @@ import java.net.InetSocketAddress; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -314,7 +316,7 @@ protected LinkedList createTaskRequest(final int incompleteTas .setQueue(context.getMasterContext().getQueryContext().get("queue", "default")); //TODO set queue masterClientService.reserveNodeResources(callBack.getController(), request.build(), callBack); - NodeResourceResponse response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + NodeResourceResponse response = callBack.get(); for (AllocationResourceProto resource : response.getResourceList()) { taskRequestEvents.add(new TaskRequestEvent(resource.getWorkerId(), resource, context.getBlockId())); @@ -896,8 +898,7 @@ public void assignToLeafTasks(LinkedList taskRequests) { TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); - BatchAllocationResponse responseProto = - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + BatchAllocationResponse responseProto = callFuture.get(); if (responseProto.getCancellationTaskCount() > 0) { for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { @@ -1015,8 +1016,7 @@ public void assignToNonLeafTasks(LinkedList taskRequests) { TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); - BatchAllocationResponse - responseProto = callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + BatchAllocationResponse responseProto = callFuture.get(); if(responseProto.getCancellationTaskCount() > 0) { for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 6b26ddf1bb..7104fb9423 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -42,7 +42,10 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; import org.apache.tajo.master.event.QueryStartEvent; import org.apache.tajo.master.event.QueryStopEvent; -import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.RpcParameterFactory; @@ -56,7 +59,6 @@ import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; public class QueryMaster extends CompositeService implements EventHandler { private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName()); @@ -180,8 +182,7 @@ public List getAllWorker() { masterService.getAllWorkers(callBack.getController(), PrimitiveProtos.NullProto.getDefaultInstance(), callBack); - WorkerConnectionsResponse connectionsProto = - callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + WorkerConnectionsResponse connectionsProto = callBack.get(); return connectionsProto.getWorkerList(); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -300,7 +301,7 @@ public void stopQuery(final QueryId queryId) { QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.heartbeat(future.getController(), queryHeartbeat, future); - future.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + future.get(); } catch (Exception e) { //this function will be closed in new thread. //When tajo do stop cluster, tajo master maybe throw closed connection exception diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index db28433cdf..9feae7ef3c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -36,7 +36,10 @@ import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.pullserver.TajoPullServerService; -import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.AsyncRpcClient; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.Pair; @@ -47,7 +50,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -253,7 +255,7 @@ public void fatalError(TaskAttemptId taskAttemptId, String message) { //If QueryMaster does not responding, current execution block should be stop CallFuture callFuture = new CallFuture<>(); getStub().fatalError(callFuture.getController(), builder.build(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); } catch (Exception e) { getWorkerContext().getTaskManager().getDispatcher().getEventHandler() .handle(new ExecutionBlockErrorEvent(taskAttemptId.getTaskId().getExecutionBlockId(), e)); @@ -300,7 +302,7 @@ private void sendHashShuffleReport(ExecutionBlockId ebId) throws Exception { CallFuture callFuture = new CallFuture<>(); stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); return; } @@ -355,7 +357,7 @@ private void sendHashShuffleReport(ExecutionBlockId ebId) throws Exception { try { CallFuture callFuture = new CallFuture<>(); stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); } catch (Throwable e) { // can't send report to query master LOG.fatal(e.getMessage(), e); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java index 3ca0bfb486..162e5e9b28 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java @@ -32,7 +32,6 @@ import org.apache.tajo.rpc.AsyncRpcClient; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.RpcParameterFactory; @@ -46,7 +45,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.apache.tajo.ResourceProtos.*; @@ -168,11 +166,9 @@ protected NodeHeartbeatResponse sendHeartbeat(NodeHeartbeatRequest requestProto) CallFuture callBack = new CallFuture<>(); resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack); - response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + response = callBack.get(); } catch (InterruptedException e) { LOG.warn(e.getMessage()); - } catch (TimeoutException te) { - LOG.warn("Heartbeat response is being delayed.", te); } catch (ExecutionException ee) { LOG.warn("TajoMaster failure: " + ee.getMessage()); resourceTracker = null; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java index 18e9762b68..9e2e9e80f2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java @@ -33,7 +33,6 @@ import org.apache.tajo.rpc.AsyncRpcClient; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; @@ -44,7 +43,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.TimeUnit; import static org.apache.tajo.ResourceProtos.*; @@ -125,8 +123,7 @@ protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId executionB CallFuture callback = new CallFuture<>(); stub.getExecutionBlockContext(callback.getController(), request.build(), callback); - ExecutionBlockContextResponse contextProto = - callback.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + ExecutionBlockContextResponse contextProto = callback.get(); ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client); context.init(); diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java index 95e5ae4783..fec0e6bfac 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java @@ -27,7 +27,6 @@ public class RpcConstants { public static final String PING_PACKET = "TAJO"; public static final int DEFAULT_PAUSE = 1000; // 1 sec - public static final int FUTURE_TIMEOUT_SECONDS_DEFAULT = 10; /** How many times the connect will retry */ public static final String CLIENT_RETRY_NUM = "tajo.rpc.client.retry-num";