From a3ecd74a534f7d3bea83174b8fbfecad853e29fb Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Mon, 14 Dec 2015 16:16:25 +0900 Subject: [PATCH 1/6] refactored --- .../org/apache/tajo/worker/TajoWorker.java | 75 +++---------------- 1 file changed, 11 insertions(+), 64 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index c7cac4f2e3..f1b812e1f8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -18,7 +18,6 @@ package org.apache.tajo.worker; -import com.codahale.metrics.Gauge; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,10 +32,8 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.function.FunctionLoader; -import org.apache.tajo.function.FunctionSignature; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.metrics.Node; import org.apache.tajo.plan.function.python.PythonScriptEngine; @@ -81,55 +78,26 @@ public class TajoWorker extends CompositeService { private static final Log LOG = LogFactory.getLog(TajoWorker.class); private TajoConf systemConf; - private StaticHttpServer webServer; - - private TajoWorkerClientService tajoWorkerClientService; - private QueryMasterManagerService queryMasterManagerService; - - private TajoWorkerManagerService tajoWorkerManagerService; - - private TajoMasterInfo tajoMasterInfo; - private CatalogClient catalogClient; - private WorkerContext workerContext; - private TaskManager taskManager; - private TaskExecutor taskExecutor; - private TajoPullServerService pullService; - private ServiceTracker serviceTracker; - private NodeResourceManager nodeResourceManager; - - private NodeStatusUpdater nodeStatusUpdater; - private AtomicBoolean stopped = new AtomicBoolean(false); - private WorkerConnectionInfo connectionInfo; - private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - private String[] cmdArgs; - private DeletionService deletionService; - private TajoSystemMetrics workerSystemMetrics; - private HashShuffleAppenderManager hashShuffleAppenderManager; - - private AsyncDispatcher dispatcher; - private LocalDirAllocator lDirAllocator; - private JvmPauseMonitor pauseMonitor; private HistoryWriter taskHistoryWriter; - private HistoryReader historyReader; public TajoWorker() throws Exception { @@ -143,9 +111,12 @@ public void startWorker(TajoConf systemConf, String[] args) { start(); } - @Override public void serviceInit(Configuration conf) throws Exception { + AsyncDispatcher dispatcher; + TajoWorkerClientService tajoWorkerClientService; + TajoWorkerManagerService tajoWorkerManagerService; + ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY); this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); @@ -156,13 +127,11 @@ public void serviceInit(Configuration conf) throws Exception { this.workerContext = new TajoWorkerContext(); this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); - int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort(); int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort(); int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort(); - - this.dispatcher = new AsyncDispatcher(); + dispatcher = new AsyncDispatcher(); addIfService(dispatcher); tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort); @@ -186,8 +155,7 @@ public void serviceInit(Configuration conf) throws Exception { this.nodeResourceManager = new NodeResourceManager(rmDispatcher, workerContext); addService(nodeResourceManager); - this.nodeStatusUpdater = new NodeStatusUpdater(workerContext); - addService(nodeStatusUpdater); + addService(new NodeStatusUpdater(workerContext)); int httpPort = 0; if(!TajoPullServerService.isStandalone()) { @@ -242,27 +210,10 @@ private void initWorkerMetrics() { workerSystemMetrics = new TajoSystemMetrics(systemConf, Node.class, workerContext.getWorkerName()); workerSystemMetrics.start(); - workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM, new Gauge() { - @Override - public Integer getValue() { - if(queryMasterManagerService != null) { - return queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size(); - } else { - return 0; - } - } - }); - - workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS, new Gauge() { - @Override - public Integer getValue() { - if(taskExecutor != null) { - return taskExecutor.getRunningTasks(); - } else { - return 0; - } - } - }); + workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM, () -> + queryMasterManagerService != null ? queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size() : 0); + + workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS, () -> taskExecutor != null ? taskExecutor.getRunningTasks() : 0); } private int initWebServer() { @@ -310,7 +261,7 @@ public WorkerContext getWorkerContext() { public void serviceStart() throws Exception { startJvmPauseMonitor(); - tajoMasterInfo = new TajoMasterInfo(); + TajoMasterInfo tajoMasterInfo = new TajoMasterInfo(); if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress()); @@ -453,10 +404,6 @@ public CatalogService getCatalog() { return catalogClient; } - public TajoPullServerService getPullService() { - return pullService; - } - public WorkerConnectionInfo getConnectionInfo() { return connectionInfo; } From 526265b2d61a9ca514bc03873714b55a603befd7 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Mon, 14 Dec 2015 16:48:11 +0900 Subject: [PATCH 2/6] CI trigger --- tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index f1b812e1f8..b65b357b4c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -96,7 +96,6 @@ public class TajoWorker extends CompositeService { private HashShuffleAppenderManager hashShuffleAppenderManager; private LocalDirAllocator lDirAllocator; private JvmPauseMonitor pauseMonitor; - private HistoryWriter taskHistoryWriter; private HistoryReader historyReader; From e82bb4e704ca2b7a6581109623a80bd25f5086e2 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Mon, 14 Dec 2015 22:05:21 +0900 Subject: [PATCH 3/6] trigger --- tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index b65b357b4c..fc283cd464 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -96,6 +96,7 @@ public class TajoWorker extends CompositeService { private HashShuffleAppenderManager hashShuffleAppenderManager; private LocalDirAllocator lDirAllocator; private JvmPauseMonitor pauseMonitor; + private HistoryWriter taskHistoryWriter; private HistoryReader historyReader; From 06b8755d2eeb0ae1f8ad2a8cfc1c2cb629b70044 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Tue, 15 Dec 2015 10:09:47 +0900 Subject: [PATCH 4/6] trigger --- tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index fc283cd464..f1b812e1f8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -96,7 +96,7 @@ public class TajoWorker extends CompositeService { private HashShuffleAppenderManager hashShuffleAppenderManager; private LocalDirAllocator lDirAllocator; private JvmPauseMonitor pauseMonitor; - + private HistoryWriter taskHistoryWriter; private HistoryReader historyReader; From 6b20f2cded5af779359c75cf63b806ff33f444c3 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Tue, 15 Dec 2015 13:49:07 +0900 Subject: [PATCH 5/6] indent too long lines --- .../src/main/java/org/apache/tajo/worker/TajoWorker.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index f1b812e1f8..fb0bdca7ff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -210,10 +210,12 @@ private void initWorkerMetrics() { workerSystemMetrics = new TajoSystemMetrics(systemConf, Node.class, workerContext.getWorkerName()); workerSystemMetrics.start(); - workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM, () -> - queryMasterManagerService != null ? queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size() : 0); + workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM, + () -> queryMasterManagerService != null ? + queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size() : 0); - workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS, () -> taskExecutor != null ? taskExecutor.getRunningTasks() : 0); + workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS, + () -> taskExecutor != null ? taskExecutor.getRunningTasks() : 0); } private int initWebServer() { From 3a574b930274018e8db98e354c22b4ac07306ad9 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Tue, 15 Dec 2015 14:38:27 +0900 Subject: [PATCH 6/6] trigger --- tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index fb0bdca7ff..8315c1ca80 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -208,6 +208,7 @@ public void serviceInit(Configuration conf) throws Exception { private void initWorkerMetrics() { workerSystemMetrics = new TajoSystemMetrics(systemConf, Node.class, workerContext.getWorkerName()); + workerSystemMetrics.start(); workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM,