From 985f61d6b1c71a7b261a5f91496fde104b61d708 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 May 2015 16:13:28 +0900 Subject: [PATCH 1/6] TAJO-1619: JDBC program is stuck after closing. (jihoon) --- CHANGES | 2 ++ .../org/apache/tajo/client/SessionConnection.java | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/CHANGES b/CHANGES index 50b735c296..5baaa53442 100644 --- a/CHANGES +++ b/CHANGES @@ -40,6 +40,8 @@ Release 0.10.1 - unreleased BUG FIXES + TAJO-1619: JDBC program is stuck after closing. (jihoon) + TAJO-1612: TestKillQuery occassionally fails. (hyunsik) TAJO-1440: Some tests fail in parallel test environment in TestKillQuery. diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index b0cc662225..187af339b8 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -30,9 +30,11 @@ import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse; import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.ProtoUtil; @@ -46,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest; import static org.apache.tajo.ipc.ClientProtos.CreateSessionResponse; @@ -55,6 +58,8 @@ public class SessionConnection implements Closeable { private final Log LOG = LogFactory.getLog(TajoClientImpl.class); + private final static AtomicInteger connections = new AtomicInteger(); + final RpcClientManager manager; private final String baseDatabase; @@ -91,6 +96,7 @@ public SessionConnection(ServiceTracker tracker, @Nullable String baseDatabase, this.baseDatabase = baseDatabase != null ? baseDatabase : null; this.serviceTracker = tracker; + connections.incrementAndGet(); } public Map getClientSideSessionVars() { @@ -287,6 +293,14 @@ public void close() { // ignore } finally { RpcClientManager.cleanup(client); + if(connections.decrementAndGet() == 0) { + if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equals(CommonTestingUtil.TAJO_TEST_TRUE)) { + RpcChannelFactory.shutdownGracefully(); + if (LOG.isDebugEnabled()) { + LOG.debug("RPC connection is closed"); + } + } + } } } From 170e3faf71f35f1f668abfd8f72b9cbf0f728bdf Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 26 May 2015 16:32:32 +0900 Subject: [PATCH 2/6] TAJO-1621 --- .../apache/tajo/engine/utils/ThreadUtil.java | 149 ------------------ 1 file changed, 149 deletions(-) delete mode 100644 tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java deleted file mode 100644 index 23b1e5d24d..0000000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.utils; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.util.ReflectionUtils; - -import java.io.PrintWriter; -import java.lang.Thread.UncaughtExceptionHandler; - -public class ThreadUtil { - protected static final Log LOG = LogFactory.getLog(ThreadUtil.class); - - /** - * Utility method that sets name, daemon status and starts passed thread. - * @param t thread to run - * @return Returns the passed Thread t. - */ - public static Thread setDaemonThreadRunning(final Thread t) { - return setDaemonThreadRunning(t, t.getName()); - } - - /** - * Utility method that sets name, daemon status and starts passed thread. - * @param t thread to frob - * @param name new name - * @return Returns the passed Thread t. - */ - public static Thread setDaemonThreadRunning(final Thread t, - final String name) { - return setDaemonThreadRunning(t, name, null); - } - - /** - * Utility method that sets name, daemon status and starts passed thread. - * @param t thread to frob - * @param name new name - * @param handler A handler to set on the thread. Pass null if want to - * use default handler. - * @return Returns the passed Thread t. - */ - public static Thread setDaemonThreadRunning(final Thread t, - final String name, final UncaughtExceptionHandler handler) { - t.setName(name); - if (handler != null) { - t.setUncaughtExceptionHandler(handler); - } - t.setDaemon(true); - t.start(); - return t; - } - - /** - * Shutdown passed thread using isAlive and join. - * @param t Thread to shutdown - */ - public static void shutdown(final Thread t) { - shutdown(t, 0); - } - - /** - * Shutdown passed thread using isAlive and join. - * @param joinwait Pass 0 if we're to wait forever. - * @param t Thread to shutdown - */ - public static void shutdown(final Thread t, final long joinwait) { - if (t == null) return; - while (t.isAlive()) { - try { - t.join(joinwait); - } catch (InterruptedException e) { - LOG.warn(t.getName() + "; joinwait=" + joinwait, e); - } - } - } - - - /** - * @param t Waits on the passed thread to die dumping a threaddump every - * minute while its up. - * @throws InterruptedException - */ - public static void threadDumpingIsAlive(final Thread t) - throws InterruptedException { - if (t == null) { - return; - } - - while (t.isAlive()) { - t.join(60 * 1000); - if (t.isAlive()) { - ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - "Automatic Stack Trace every 60 seconds waiting on " + - t.getName()); - } - } - } - - /** - * @param millis How long to sleep for in milliseconds. - */ - public static void sleep(int millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - /** - * Sleeps for the given amount of time even if interrupted. Preserves - * the interrupt status. - * @param msToWait the amount of time to sleep in milliseconds - */ - public static void sleepWithoutInterrupt(final long msToWait) { - long timeMillis = System.currentTimeMillis(); - long endTime = timeMillis + msToWait; - boolean interrupted = false; - while (timeMillis < endTime) { - try { - Thread.sleep(endTime - timeMillis); - } catch (InterruptedException ex) { - interrupted = true; - } - timeMillis = System.currentTimeMillis(); - } - - if (interrupted) { - Thread.currentThread().interrupt(); - } - } -} From 92926b40a8ee5e78be575ea095d5d2b5da23457a Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 26 May 2015 17:10:12 +0900 Subject: [PATCH 3/6] TAJO-1621 --- .../apache/tajo/engine/utils/ThreadUtil.java | 149 ------------------ 1 file changed, 149 deletions(-) delete mode 100644 tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java deleted file mode 100644 index 23b1e5d24d..0000000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/ThreadUtil.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.utils; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.util.ReflectionUtils; - -import java.io.PrintWriter; -import java.lang.Thread.UncaughtExceptionHandler; - -public class ThreadUtil { - protected static final Log LOG = LogFactory.getLog(ThreadUtil.class); - - /** - * Utility method that sets name, daemon status and starts passed thread. - * @param t thread to run - * @return Returns the passed Thread t. - */ - public static Thread setDaemonThreadRunning(final Thread t) { - return setDaemonThreadRunning(t, t.getName()); - } - - /** - * Utility method that sets name, daemon status and starts passed thread. - * @param t thread to frob - * @param name new name - * @return Returns the passed Thread t. - */ - public static Thread setDaemonThreadRunning(final Thread t, - final String name) { - return setDaemonThreadRunning(t, name, null); - } - - /** - * Utility method that sets name, daemon status and starts passed thread. - * @param t thread to frob - * @param name new name - * @param handler A handler to set on the thread. Pass null if want to - * use default handler. - * @return Returns the passed Thread t. - */ - public static Thread setDaemonThreadRunning(final Thread t, - final String name, final UncaughtExceptionHandler handler) { - t.setName(name); - if (handler != null) { - t.setUncaughtExceptionHandler(handler); - } - t.setDaemon(true); - t.start(); - return t; - } - - /** - * Shutdown passed thread using isAlive and join. - * @param t Thread to shutdown - */ - public static void shutdown(final Thread t) { - shutdown(t, 0); - } - - /** - * Shutdown passed thread using isAlive and join. - * @param joinwait Pass 0 if we're to wait forever. - * @param t Thread to shutdown - */ - public static void shutdown(final Thread t, final long joinwait) { - if (t == null) return; - while (t.isAlive()) { - try { - t.join(joinwait); - } catch (InterruptedException e) { - LOG.warn(t.getName() + "; joinwait=" + joinwait, e); - } - } - } - - - /** - * @param t Waits on the passed thread to die dumping a threaddump every - * minute while its up. - * @throws InterruptedException - */ - public static void threadDumpingIsAlive(final Thread t) - throws InterruptedException { - if (t == null) { - return; - } - - while (t.isAlive()) { - t.join(60 * 1000); - if (t.isAlive()) { - ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - "Automatic Stack Trace every 60 seconds waiting on " + - t.getName()); - } - } - } - - /** - * @param millis How long to sleep for in milliseconds. - */ - public static void sleep(int millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - /** - * Sleeps for the given amount of time even if interrupted. Preserves - * the interrupt status. - * @param msToWait the amount of time to sleep in milliseconds - */ - public static void sleepWithoutInterrupt(final long msToWait) { - long timeMillis = System.currentTimeMillis(); - long endTime = timeMillis + msToWait; - boolean interrupted = false; - while (timeMillis < endTime) { - try { - Thread.sleep(endTime - timeMillis); - } catch (InterruptedException ex) { - interrupted = true; - } - timeMillis = System.currentTimeMillis(); - } - - if (interrupted) { - Thread.currentThread().interrupt(); - } - } -} From 523e26c32735bae720b697d21520ddcc2e15b71f Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 26 May 2015 18:57:19 +0900 Subject: [PATCH 4/6] trimStackTrace to false --- tajo-core/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index bc11d1c0c9..15c3e86af6 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -733,6 +733,7 @@ ${maven.fork.count} true + false -Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8 true true From 3ad0a6b21ccefe5a1c6d1093861332e1b1e62834 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 26 May 2015 20:48:20 +0900 Subject: [PATCH 5/6] decrease the hbase batch pool for testing --- tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 17348e1b25..79a5944309 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -510,6 +510,7 @@ public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts); this.dfsCluster.waitClusterUp(); + conf.setInt("hbase.hconnection.threads.core", 50); hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir); if(!standbyWorkerMode) { From 22d492a4670cf6205051b0117e0d69254348bb79 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 26 May 2015 22:38:09 +0900 Subject: [PATCH 6/6] move Thread.sleep in PingChecker --- .../java/org/apache/tajo/ha/HdfsServiceTracker.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java index 5f1aff8b9d..d0eb9852f8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java @@ -350,6 +350,13 @@ private class PingChecker implements Runnable { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(monitorInterval); + } catch (InterruptedException e) { + LOG.info("PingChecker interrupted. - masterName:" + masterName); + break; + } + synchronized (HdfsServiceTracker.this) { try { if (!currentActiveMaster.equals(masterName)) { @@ -371,12 +378,6 @@ public void run() { e.printStackTrace(); } } - try { - Thread.sleep(monitorInterval); - } catch (InterruptedException e) { - LOG.info("PingChecker interrupted. - masterName:" + masterName); - break; - } } } }