From 403c183ce59c69cb50c8328db206db67dc59ccee Mon Sep 17 00:00:00 2001 From: jhkim Date: Wed, 8 Oct 2014 15:53:01 +0900 Subject: [PATCH] TAJO-1105: Add thread which detects JVM pauses like HADOOP's --- .../org/apache/tajo/master/TajoMaster.java | 16 +- .../org/apache/tajo/util/JvmPauseMonitor.java | 218 ++++++++++++++++++ .../org/apache/tajo/worker/TajoWorker.java | 16 +- 3 files changed, 240 insertions(+), 10 deletions(-) create mode 100644 tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index e393783730..7fec037cc6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -55,11 +55,7 @@ import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.AbstractStorageManager; import org.apache.tajo.storage.StorageManagerFactory; -import org.apache.tajo.util.ClassUtil; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.StringUtils; -import org.apache.tajo.util.VersionInfo; +import org.apache.tajo.util.*; import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.webapp.QueryExecutorServlet; import org.apache.tajo.webapp.StaticHttpServer; @@ -134,6 +130,8 @@ public class TajoMaster extends CompositeService { private HAService haService; + private JvmPauseMonitor pauseMonitor; + public TajoMaster() throws Exception { super(TajoMaster.class.getName()); } @@ -356,6 +354,11 @@ public static List initBuiltinFunctions() throws ServiceException return sqlFuncs; } + private void startJvmPauseMonitor(){ + pauseMonitor = new JvmPauseMonitor(systemConf); + pauseMonitor.start(); + } + public MasterContext getContext() { return this.context; } @@ -364,6 +367,8 @@ public MasterContext getContext() { public void serviceStart() throws Exception { LOG.info("TajoMaster is starting up"); + startJvmPauseMonitor(); + // check base tablespace and databases checkBaseTBSpaceAndDatabase(); @@ -450,6 +455,7 @@ public void stop() { RpcChannelFactory.shutdown(); + if(pauseMonitor != null) pauseMonitor.stop(); super.stop(); LOG.info("Tajo Master main thread exiting"); } diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java new file mode 100644 index 0000000000..e1e529c788 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Daemon; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Class which sets up a simple thread which runs in a loop sleeping + * for a short interval of time. If the sleep takes significantly longer + * than its target time, it implies that the JVM or host machine has + * paused processing, which may cause other problems. If such a pause is + * detected, the thread logs a message. + */ +@InterfaceAudience.Private +public class JvmPauseMonitor { + private static final Log LOG = LogFactory.getLog( + JvmPauseMonitor.class); + + /** The target sleep time */ + private static final long SLEEP_INTERVAL_MS = 500; + + /** log WARN if we detect a pause longer than this threshold */ + private final long warnThresholdMs; + private static final String WARN_THRESHOLD_KEY = + "jvm.pause.warn-threshold.ms"; + private static final long WARN_THRESHOLD_DEFAULT = 10000; + + /** log INFO if we detect a pause longer than this threshold */ + private final long infoThresholdMs; + private static final String INFO_THRESHOLD_KEY = + "jvm.pause.info-threshold.ms"; + private static final long INFO_THRESHOLD_DEFAULT = 1000; + + private long numGcWarnThresholdExceeded = 0; + private long numGcInfoThresholdExceeded = 0; + private long totalGcExtraSleepTime = 0; + + private Thread monitorThread; + private volatile boolean shouldRun = true; + + public JvmPauseMonitor(Configuration conf) { + this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT); + this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT); + } + + public void start() { + Preconditions.checkState(monitorThread == null, + "Already started"); + monitorThread = new Daemon(new Monitor()); + monitorThread.start(); + } + + public void stop() { + shouldRun = false; + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public boolean isStarted() { + return monitorThread != null; + } + + public long getNumGcWarnThreadholdExceeded() { + return numGcWarnThresholdExceeded; + } + + public long getNumGcInfoThresholdExceeded() { + return numGcInfoThresholdExceeded; + } + + public long getTotalGcExtraSleepTime() { + return totalGcExtraSleepTime; + } + + private String formatMessage(long extraSleepTime, + Map gcTimesAfterSleep, + Map gcTimesBeforeSleep) { + + Set gcBeanNames = Sets.intersection( + gcTimesAfterSleep.keySet(), + gcTimesBeforeSleep.keySet()); + List gcDiffs = Lists.newArrayList(); + for (String name : gcBeanNames) { + GcTimes diff = gcTimesAfterSleep.get(name).subtract( + gcTimesBeforeSleep.get(name)); + if (diff.gcCount != 0) { + gcDiffs.add("GC pool '" + name + "' had collection(s): " + + diff.toString()); + } + } + + String ret = "Detected pause in JVM or host machine (eg GC): " + + "pause of approximately " + extraSleepTime + "ms\n"; + if (gcDiffs.isEmpty()) { + ret += "No GCs detected"; + } else { + ret += Joiner.on("\n").join(gcDiffs); + } + return ret; + } + + private Map getGcTimes() { + Map map = Maps.newHashMap(); + List gcBeans = + ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + map.put(gcBean.getName(), new GcTimes(gcBean)); + } + return map; + } + + private static class GcTimes { + private GcTimes(GarbageCollectorMXBean gcBean) { + gcCount = gcBean.getCollectionCount(); + gcTimeMillis = gcBean.getCollectionTime(); + } + + private GcTimes(long count, long time) { + this.gcCount = count; + this.gcTimeMillis = time; + } + + private GcTimes subtract(GcTimes other) { + return new GcTimes(this.gcCount - other.gcCount, + this.gcTimeMillis - other.gcTimeMillis); + } + + @Override + public String toString() { + return "count=" + gcCount + " time=" + gcTimeMillis + "ms"; + } + + private long gcCount; + private long gcTimeMillis; + } + + private class Monitor implements Runnable { + @Override + public void run() { + Stopwatch sw = new Stopwatch(); + Map gcTimesBeforeSleep = getGcTimes(); + while (shouldRun) { + sw.reset().start(); + try { + Thread.sleep(SLEEP_INTERVAL_MS); + } catch (InterruptedException ie) { + return; + } + long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS; + Map gcTimesAfterSleep = getGcTimes(); + + if (extraSleepTime > warnThresholdMs) { + ++numGcWarnThresholdExceeded; + LOG.warn(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } else if (extraSleepTime > infoThresholdMs) { + ++numGcInfoThresholdExceeded; + LOG.info(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } + totalGcExtraSleepTime += extraSleepTime; + gcTimesBeforeSleep = gcTimesAfterSleep; + } + } + } + + /** + * Simple 'main' to facilitate manual testing of the pause monitor. + * + * This main function just leaks memory into a list. Running this class + * with a 1GB heap will very quickly go into "GC hell" and result in + * log messages about the GC pauses. + */ + public static void main(String []args) throws Exception { + new JvmPauseMonitor(new Configuration()).start(); + List list = Lists.newArrayList(); + int i = 0; + while (true) { + list.add(String.valueOf(i++)); + } + } +} \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 280fc2bd09..b3dae8b95e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -32,7 +32,6 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.common.exception.NotImplementedException; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; @@ -45,10 +44,7 @@ import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.HAServiceUtil; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.*; import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.webapp.StaticHttpServer; @@ -131,6 +127,8 @@ public class TajoWorker extends CompositeService { private LocalDirAllocator lDirAllocator; + private JvmPauseMonitor pauseMonitor; + public TajoWorker() throws Exception { super(TajoWorker.class.getName()); } @@ -309,12 +307,18 @@ private void initCleanupService() throws IOException { } } + private void startJvmPauseMonitor(){ + pauseMonitor = new JvmPauseMonitor(systemConf); + pauseMonitor.start(); + } + public WorkerContext getWorkerContext() { return workerContext; } @Override public void serviceStart() throws Exception { + startJvmPauseMonitor(); tajoMasterInfo = new TajoMasterInfo(); if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { @@ -372,6 +376,8 @@ public void serviceStop() throws Exception { } if(deletionService != null) deletionService.stop(); + + if(pauseMonitor != null) pauseMonitor.stop(); super.serviceStop(); LOG.info("TajoWorker main thread exiting"); }