diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index f17ec801c6..83763e8a2b 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -39,6 +39,7 @@ import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.ShutdownHookManager; import java.io.*; import java.lang.reflect.Constructor; @@ -50,6 +51,7 @@ import java.util.TreeMap; public class TajoCli { + public static final int SHUTDOWN_HOOK_PRIORITY = 50; public static final String ERROR_PREFIX = "ERROR: "; public static final String KILL_PREFIX = "KILL: "; @@ -373,7 +375,7 @@ private void initCommands() { } private void addShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + ShutdownHookManager.get().addShutdownHook(new Runnable() { @Override public void run() { try { @@ -382,7 +384,7 @@ public void run() { } client.close(); } - })); + }, SHUTDOWN_HOOK_PRIORITY); } private String updatePrompt(ParsingState state) throws ServiceException { diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index b63d35be1f..ac0ff52704 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -19,7 +19,7 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; -import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.EventLoopGroup; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.SessionVars; @@ -38,16 +38,14 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol; import 
org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.rpc.NettyUtils; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse; import org.apache.tajo.service.ServiceTracker; -import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; import java.io.Closeable; @@ -57,9 +55,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE; import static org.apache.tajo.exception.ReturnStateUtil.*; @@ -70,8 +66,6 @@ public class SessionConnection implements Closeable { private final static Log LOG = LogFactory.getLog(SessionConnection.class); - private final static AtomicInteger connections = new AtomicInteger(); - final RpcClientManager manager; private String baseDatabase; @@ -87,6 +81,8 @@ public class SessionConnection implements Closeable { private final ServiceTracker serviceTracker; + private final EventLoopGroup eventLoopGroup; + private NettyClientBase client; private final KeyValueSet properties; @@ -110,7 +106,13 @@ public SessionConnection(@NotNull ServiceTracker tracker, @Nullable String baseD this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); this.userInfo = UserRoleInfo.getCurrentUser(); - this.client = getTajoMasterConnection(); + this.eventLoopGroup = 
NettyUtils.createEventLoopGroup(getClass().getSimpleName(), 4); + try { + this.client = getTajoMasterConnection(); + } catch (TajoRuntimeException e) { + NettyUtils.shutdown(eventLoopGroup); + throw e; + } } public Map getClientSideSessionVars() { @@ -127,16 +129,8 @@ public synchronized NettyClientBase getTajoMasterConnection() { RpcClientManager.cleanup(client); // Client do not closed on idle state for support high available - this.client = manager.newClient( - getTajoMasterAddr(), - TajoMasterClientProtocol.class, - false, - manager.getRetries(), - 0, - TimeUnit.SECONDS, - false); - connections.incrementAndGet(); - + this.client = manager.newBlockingClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, + manager.getRetries(), eventLoopGroup); } catch (Throwable t) { throw new TajoRuntimeException(new ClientConnectionException(t)); } @@ -346,14 +340,7 @@ public void close() { // ignore } finally { RpcClientManager.cleanup(client); - if(connections.decrementAndGet() == 0) { - if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equals(CommonTestingUtil.TAJO_TEST_TRUE)) { - RpcChannelFactory.shutdownGracefully(); - if (LOG.isDebugEnabled()) { - LOG.debug("RPC connection is closed"); - } - } - } + NettyUtils.shutdown(eventLoopGroup); } } @@ -457,5 +444,4 @@ ClientProtos.SessionedStringProto getSessionedString(String str) { } return builder.build(); } - } diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 909f2667f4..0f393f67ed 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -208,7 +208,7 @@ public static enum ConfVars implements ConfigKey { SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2, Validators.min("1")), SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192), - 
SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 10, Validators.min("1")), + SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 60, Validators.min("1")), SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 60, Validators.min("1")), SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 2, Validators.min("0")), SHUFFLE_HASH_APPENDER_BUFFER_SIZE("tajo.shuffle.hash.appender.buffer.size", 10000), diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java b/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java new file mode 100644 index 0000000000..3ec535fc90 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The ShutdownHookManager enables running shutdownHook + * in a deterministic order, higher priority first. + *

+ * The JVM runs ShutdownHooks in a non-deterministic order or in parallel. + * This class registers a single JVM shutdownHook and runs all the + * shutdownHooks registered to it (to this class) in order based on their + * priority. + * + * This is an implementation copied from hadoop-common. + */ +public class ShutdownHookManager { + + private static final ShutdownHookManager MGR = new ShutdownHookManager(); + + private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class); + + static { + Runtime.getRuntime().addShutdownHook( + new Thread() { + @Override + public void run() { + MGR.shutdownInProgress.set(true); + for (Runnable hook: MGR.getShutdownHooksInOrder()) { + try { + hook.run(); + } catch (Throwable ex) { + LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() + + "' failed, " + ex.toString(), ex); + } + } + } + } + ); + } + + /** + * Return ShutdownHookManager singleton. + * + * @return ShutdownHookManager singleton. + */ + public static ShutdownHookManager get() { + return MGR; + } + + /** + * Private structure to store ShutdownHook and its priority. + */ + private static class HookEntry { + Runnable hook; + int priority; + + public HookEntry(Runnable hook, int priority) { + this.hook = hook; + this.priority = priority; + } + + @Override + public int hashCode() { + return hook.hashCode(); + } + + @Override + public boolean equals(Object obj) { + boolean eq = false; + if (obj != null) { + if (obj instanceof HookEntry) { + eq = (hook == ((HookEntry)obj).hook); + } + } + return eq; + } + + } + + private Set<HookEntry> hooks = + Collections.synchronizedSet(new HashSet<HookEntry>()); + + private AtomicBoolean shutdownInProgress = new AtomicBoolean(false); + + //private constructor to ensure singularity + private ShutdownHookManager() { + } + + /** + * Returns the list of shutdownHooks in order of execution, + * Highest priority first. + * + * @return the list of shutdownHooks in order of execution.
+ */ + List<Runnable> getShutdownHooksInOrder() { + List<HookEntry> list; + synchronized (MGR.hooks) { + list = new ArrayList<HookEntry>(MGR.hooks); + } + Collections.sort(list, new Comparator<HookEntry>() { + + //reversing comparison so highest priority hooks are first (no subtraction: it can overflow) + @Override + public int compare(HookEntry o1, HookEntry o2) { + return (o2.priority < o1.priority) ? -1 : ((o2.priority == o1.priority) ? 0 : 1); + } + }); + List<Runnable> ordered = new ArrayList<Runnable>(); + for (HookEntry entry: list) { + ordered.add(entry.hook); + } + return ordered; + } + + /** + * Adds a shutdownHook with a priority, the higher the priority + * the earlier it will run. ShutdownHooks with same priority run + * in a non-deterministic order. + * + * @param shutdownHook shutdownHook Runnable + * @param priority priority of the shutdownHook. + */ + public void addShutdownHook(Runnable shutdownHook, int priority) { + if (shutdownHook == null) { + throw new IllegalArgumentException("shutdownHook cannot be NULL"); + } + if (shutdownInProgress.get()) { + throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook"); + } + hooks.add(new HookEntry(shutdownHook, priority)); + } + + /** + * Removes a shutdownHook. + * + * @param shutdownHook shutdownHook to remove. + * @return TRUE if the shutdownHook was registered and removed, + * FALSE otherwise. + */ + public boolean removeShutdownHook(Runnable shutdownHook) { + if (shutdownInProgress.get()) { + throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook"); + } + return hooks.remove(new HookEntry(shutdownHook, 0)); + } + + /** + * Indicates if a shutdownHook is registered or not. + * + * @param shutdownHook shutdownHook to check if registered. + * @return TRUE/FALSE depending if the shutdownHook is registered. + */ + public boolean hasShutdownHook(Runnable shutdownHook) { + return hooks.contains(new HookEntry(shutdownHook, 0)); + } + + /** + * Indicates if shutdown is in progress or not. + * + * @return TRUE if the shutdown is in progress, otherwise FALSE.
+ */ + public boolean isShutdownInProgress() { + return shutdownInProgress.get(); + } + +} diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml index 8199f46b40..20b7378ef0 100644 --- a/tajo-core-tests/pom.xml +++ b/tajo-core-tests/pom.xml @@ -335,6 +335,21 @@ junit test + + org.mockito + mockito-core + test + + + org.powermock + powermock-module-junit4 + test + + + org.powermock + powermock-api-mockito + test + diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java index efadc7aee7..38819f1580 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -40,6 +40,7 @@ import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto; import org.apache.tajo.ipc.ClientProtos.QueryInfoProto; import org.apache.tajo.ipc.ClientProtos.StageHistoryProto; +import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.CommonTestingUtil; @@ -47,6 +48,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.powermock.reflect.Whitebox; import java.io.IOException; import java.io.InputStream; @@ -771,4 +773,28 @@ public int compare(ClientProtos.StageHistoryProto o1, StageHistoryProto o2) { assertEquals(1, taskHistories.get(1).getTotalReadRows()); assertEquals(1, taskHistories.get(1).getTotalWriteRows()); } + + @Test + public void testClientRPCInterference() throws Exception { + TajoClient client = cluster.newTajoClient(); + TajoClient client2 = cluster.newTajoClient(); + + + NettyClientBase rpcClient = Whitebox.getInternalState(client, NettyClientBase.class); + assertNotNull(rpcClient); + + NettyClientBase rpcClient2 = Whitebox.getInternalState(client2, NettyClientBase.class); + assertNotNull(rpcClient2);
+ + assertNotEquals(rpcClient.getChannel().eventLoop(), rpcClient2.getChannel().eventLoop()); + + client.close(); + client2.close(); + + rpcClient.getChannel().eventLoop().terminationFuture().sync(); + assertTrue(rpcClient.getChannel().eventLoop().isTerminated()); + + rpcClient2.getChannel().eventLoop().terminationFuture().sync(); + assertTrue(rpcClient2.getChannel().eventLoop().isTerminated()); + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 754df7f6ee..1197e988b1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -49,7 +49,6 @@ import org.apache.tajo.metrics.ClusterResourceMetricSet; import org.apache.tajo.metrics.Master; import org.apache.tajo.plan.function.python.PythonScriptEngine; -import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rule.EvaluationContext; @@ -86,6 +85,8 @@ public class TajoMaster extends CompositeService { /** Class Logger */ private static final Log LOG = LogFactory.getLog(TajoMaster.class); + public static final int SHUTDOWN_HOOK_PRIORITY = 10; + /** rw-r--r-- */ @SuppressWarnings("OctalInteger") final public static FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0755); @@ -162,7 +163,7 @@ public TajoMasterClientService getTajoMasterClientService() { public void serviceInit(Configuration conf) throws Exception { this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); - Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); + ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY); context = new MasterContext(systemConf); clock = new SystemClock(); @@ -547,12 +548,13 @@ public void run() { && AbstractDBStore.needShutdown(catalogServer.getStoreUri())) { 
DerbyStore.shutdown(); } - RpcChannelFactory.shutdownGracefully(); + RpcClientManager.shutdown(); } } } public static void main(String[] args) throws Exception { + Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG); try { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index ff85a4b2fe..762278b4da 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -33,7 +33,7 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.pullserver.retriever.FileChunk; -import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.rpc.NettyUtils; import java.io.File; import java.io.FileNotFoundException; @@ -89,7 +89,7 @@ public Fetcher(TajoConf conf, URI uri, FileChunk chunk) { if (!useLocalFile) { bootstrap = new Bootstrap() .group( - RpcChannelFactory.getSharedClientEventloopGroup(RpcChannelFactory.ClientChannelId.FETCHER, + NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER, conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM))) .channel(NioSocketChannel.class) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 65a95110ca..fbb8d5426c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -43,7 +43,6 @@ import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.querymaster.QueryMaster; import org.apache.tajo.querymaster.QueryMasterManagerService; -import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcClientManager; import 
org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; @@ -78,6 +77,7 @@ public class TajoWorker extends CompositeService { public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build(); public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build(); public static final PrimitiveProtos.NullProto NULL_PROTO = PrimitiveProtos.NullProto.newBuilder().build(); + public static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final Log LOG = LogFactory.getLog(TajoWorker.class); @@ -147,7 +147,7 @@ public void startWorker(TajoConf systemConf, String[] args) { @Override public void serviceInit(Configuration conf) throws Exception { - Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); + ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY); this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); RackResolver.init(systemConf); @@ -571,7 +571,7 @@ public void run() { LOG.info("TajoWorker received SIGINT Signal"); LOG.info("============================================"); stop(); - RpcChannelFactory.shutdownGracefully(); + RpcClientManager.shutdown(); } } } diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 29cf719bc5..59a758f8f1 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -58,7 +58,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.pullserver.retriever.FileChunk; -import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.rpc.NettyUtils; import org.apache.tajo.storage.HashShuffleAppenderManager; import 
org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; @@ -190,7 +190,7 @@ public void init(Configuration conf) { int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num", Runtime.getRuntime().availableProcessors() * 2); - selector = RpcChannelFactory.createServerChannelFactory("TajoPullServerService", workerNum) + selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum) .option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.TCP_NODELAY, true); diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java index ad443d7d8d..2c154bf518 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java @@ -79,7 +79,7 @@ public void init(ChannelInitializer initializer, int workerNum) { listener.onBeforeInit(this); } - bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum); + bootstrap = NettyUtils.createServerBootstrap(serviceName, workerNum); this.initializer = initializer; bootstrap diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java new file mode 100644 index 0000000000..01fd48bcd2 --- /dev/null +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public final class NettyUtils { + private static final Log LOG = LogFactory.getLog(NettyUtils.class); + + private static final int DEFAULT_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2; + + private static final Object lockObjectForLoopGroup = new Object(); + private static final AtomicInteger serverCount = new AtomicInteger(0); + + public enum GROUP { + DEFAULT, + FETCHER + } + + private static final Map<GROUP, EventLoopGroup> eventLoopGroupMap = + new ConcurrentHashMap<GROUP, EventLoopGroup>(); + + private NettyUtils(){ + } + + /** + * Get the default shared Netty EventLoopGroup. Servers and clients can share it. + */ + public static EventLoopGroup getDefaultEventLoopGroup() { + return getSharedEventLoopGroup(GROUP.DEFAULT, DEFAULT_THREAD_NUM); + } + + /** + * Get a shared Netty EventLoopGroup.
+ * + * @param clientId + * @param threads + * @return A EventLoopGroup by key + */ + public static EventLoopGroup getSharedEventLoopGroup(GROUP clientId, int threads) { + EventLoopGroup returnEventLoopGroup; + + synchronized (lockObjectForLoopGroup) { + if (!eventLoopGroupMap.containsKey(clientId)) { + eventLoopGroupMap.put(clientId, createEventLoopGroup(clientId.name(), threads)); + } + + returnEventLoopGroup = eventLoopGroupMap.get(clientId); + if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) { + returnEventLoopGroup = createEventLoopGroup(clientId.name(), threads); + eventLoopGroupMap.put(clientId, returnEventLoopGroup); + } + } + + return returnEventLoopGroup; + } + + public static EventLoopGroup createEventLoopGroup(String name, int threads) { + if (LOG.isDebugEnabled()) { + LOG.debug("Create " + name + " EventLoopGroup. threads:" + threads); + } + + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + ThreadFactory clientFactory = builder.setNameFormat(name + " #%d").build(); + + return createEventLoopGroup(threads, clientFactory); + } + + protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) { + return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown()); + } + + private static EventLoopGroup createEventLoopGroup(int threads, ThreadFactory factory) { + return new NioEventLoopGroup(threads, factory); + } + + /** + * Server must release the external resources + */ + public static ServerBootstrap createServerBootstrap(String name, int threads) { + name = name + "-" + serverCount.incrementAndGet(); + + EventLoopGroup eventLoopGroup = createEventLoopGroup(name, threads); + return new ServerBootstrap().group(eventLoopGroup, eventLoopGroup); + } + + public static void shutdownGracefully() { + if (LOG.isDebugEnabled()) { + LOG.debug("Shutdown Shared RPC Pool"); + } + synchronized (lockObjectForLoopGroup) { + for (EventLoopGroup eventLoopGroup : eventLoopGroupMap.values()) { + try { + 
shutdown(eventLoopGroup).sync(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // keep the interrupt status visible to the caller + } + } + eventLoopGroupMap.clear(); + } + } + + public static io.netty.util.concurrent.Future<?> shutdown(EventLoopGroup eventLoopGroup) { + if (eventLoopGroup != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Shutdown EventLoopGroup :" + eventLoopGroup.toString()); + } + + return eventLoopGroup.shutdownGracefully(); + } + return null; + } +} diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java deleted file mode 100644 index eb34ca2ffe..0000000000 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.tajo.rpc; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -public final class RpcChannelFactory { - private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class); - - private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2; - - private static final Object lockObjectForLoopGroup = new Object(); - private static AtomicInteger serverCount = new AtomicInteger(0); - - public enum ClientChannelId { - CLIENT_DEFAULT, - FETCHER - } - - private static final Map defaultMaxKeyPoolCount = - new ConcurrentHashMap(); - private static final Map> eventLoopGroupPool = - new ConcurrentHashMap>(); - - private RpcChannelFactory(){ - } - - static { - Runtime.getRuntime().addShutdownHook(new CleanUpHandler()); - - defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1); - defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1); - } - - /** - * make this factory static thus all clients can share its thread pool. - * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe - */ - public static EventLoopGroup getSharedClientEventloopGroup() { - return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM); - } - - /** - * make this factory static thus all clients can share its thread pool. 
- * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe - * - * @param workerNum The number of workers - */ - public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){ - //shared woker and boss pool - return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum); - } - - /** - * This function return eventloopgroup by key. Fetcher client will have one or more eventloopgroup for its throughput. - * - * @param clientId - * @param workerNum - * @return - */ - public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) { - Queue eventLoopGroupQueue; - EventLoopGroup returnEventLoopGroup; - - synchronized (lockObjectForLoopGroup) { - eventLoopGroupQueue = eventLoopGroupPool.get(clientId); - if (eventLoopGroupQueue == null) { - eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum); - } - - returnEventLoopGroup = eventLoopGroupQueue.poll(); - if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) { - returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum); - } - eventLoopGroupQueue.add(returnEventLoopGroup); - } - - return returnEventLoopGroup; - } - - protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) { - return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown()); - } - - // Client must release the external resources - protected static Queue createClientEventloopGroups(ClientChannelId clientId, int workerNum) { - int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId); - Queue loopGroupQueue = new ConcurrentLinkedQueue(); - eventLoopGroupPool.put(clientId, loopGroupQueue); - - for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) { - loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum)); - } - - return loopGroupQueue; - } - - protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) { - if 
(LOG.isDebugEnabled()) { - LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum); - } - - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build(); - - return new NioEventLoopGroup(workerNum, clientFactory); - } - - // Client must release the external resources - public static ServerBootstrap createServerChannelFactory(String name, int workerNum) { - name = name + "-" + serverCount.incrementAndGet(); - if(LOG.isInfoEnabled()){ - LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum); - } - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build(); - ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build(); - - EventLoopGroup bossGroup = - new NioEventLoopGroup(1, bossFactory); - EventLoopGroup workerGroup = - new NioEventLoopGroup(workerNum, workerFactory); - - return new ServerBootstrap().group(bossGroup, workerGroup); - } - - public static void shutdownGracefully(){ - if(LOG.isDebugEnabled()) { - LOG.debug("Shutdown Shared RPC Pool"); - } - - synchronized(lockObjectForLoopGroup) { - for (Queue eventLoopGroupQueue: eventLoopGroupPool.values()) { - for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) { - eventLoopGroup.shutdownGracefully(); - } - - eventLoopGroupQueue.clear(); - } - eventLoopGroupPool.clear(); - } - } - - static class CleanUpHandler extends Thread { - - @Override - public void run() { - RpcChannelFactory.shutdownGracefully(); - } - - } -} diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index dd7d495bb2..6fb62d4743 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -18,9 +18,11 @@ package org.apache.tajo.rpc; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.*; import io.netty.channel.ChannelHandler; +import io.netty.channel.EventLoopGroup; import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcResponse; @@ -34,9 +36,10 @@ public class AsyncRpcClient extends NettyClientBase protocol, final Object instance, final InetSocketAddress bindAddress, - final int workerNum) + final int threads) throws Exception { super(protocol.getSimpleName(), bindAddress); @@ -54,7 +54,7 @@ public AsyncRpcServer(final Class protocol, this.service = (Service) method.invoke(null, instance); this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); - super.init(this.initializer, workerNum); + super.init(this.initializer, threads); } @ChannelHandler.Sharable diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java index 349a0a081c..4327003b67 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -18,9 +18,11 @@ package org.apache.tajo.rpc; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.*; import com.google.protobuf.Descriptors.MethodDescriptor; import io.netty.channel.ChannelHandler; +import io.netty.channel.EventLoopGroup; import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcResponse; @@ -35,9 +37,10 @@ public class BlockingRpcClient extends NettyClientBase protocol, final Object instance, final InetSocketAddress bindAddress, - final int 
workerNum) + final int threads) throws Exception { super(protocol.getSimpleName(), bindAddress); @@ -55,7 +55,7 @@ public BlockingRpcServer(final Class protocol, this.service = (BlockingService) method.invoke(null, instance); this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); - super.init(this.initializer, workerNum); + super.init(this.initializer, threads); } @ChannelHandler.Sharable diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 5f76bfc5e0..c6d90ed53f 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -39,7 +39,6 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; @@ -66,10 +65,10 @@ public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries) } // should be called from sub class - protected void init(ChannelInitializer initializer) { + protected void init(ChannelInitializer initializer, EventLoopGroup eventLoopGroup) { this.bootstrap = new Bootstrap(); this.bootstrap - .group(RpcChannelFactory.getSharedClientEventloopGroup()) + .group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(initializer) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) @@ -117,6 +116,11 @@ protected static RpcProtos.RpcRequest buildRequest(int seqId, */ protected void invoke(final RpcProtos.RpcRequest rpcRequest, final T callback, final int retry) { + if(getChannel().eventLoop().isShuttingDown()) { + LOG.warn("RPC is shutting down"); + return; + } + ChannelPromise promise = getChannel().newPromise(); 
promise.addListener(new GenericFutureListener() { @@ -197,6 +201,11 @@ private void doReconnect(final InetSocketAddress address, ChannelFuture future, if (maxRetries > retries) { retries++; + if(getChannel().eventLoop().isShuttingDown()) { + LOG.warn("RPC is shutting down"); + return; + } + LOG.warn(getErrorMessage(ExceptionUtils.getMessage(future.cause())) + "\nTry to reconnect : " + getKey().addr); try { Thread.sleep(RpcConstants.DEFAULT_PAUSE); diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java index 111754eb4a..aa7ba67089 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java @@ -19,6 +19,7 @@ package org.apache.tajo.rpc; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.logging.CommonsLoggerFactory; @@ -64,12 +65,22 @@ private T makeClient(RpcConnectionKey rpcConnectionK long timeout, TimeUnit timeUnit, boolean enablePing) + throws NoSuchMethodException, ConnectException, ClassNotFoundException { + return makeClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, NettyUtils.getDefaultEventLoopGroup()); + } + + private T makeClient(RpcConnectionKey rpcConnectionKey, + int retries, + long timeout, + TimeUnit timeUnit, + boolean enablePing, + EventLoopGroup eventLoopGroup) throws NoSuchMethodException, ClassNotFoundException, ConnectException { NettyClientBase client; if (rpcConnectionKey.asyncMode) { - client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing); + client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup); } else { - client = new 
BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing); + client = new BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup); } return (T) client; } @@ -152,6 +163,19 @@ public synchronized T newClient(RpcConnectionKey key return client; } + public synchronized T newBlockingClient(InetSocketAddress addr, + Class protocolClass, + int retries, + EventLoopGroup eventLoopGroup) + throws NoSuchMethodException, ClassNotFoundException, ConnectException { + + T client = makeClient(new RpcConnectionKey(addr, protocolClass, false), + retries, 0, TimeUnit.SECONDS, false, eventLoopGroup); + client.connect(); + assert client.isConnected(); + return client; + } + /** * Request to close this clients * After it is closed, it is removed from clients map. @@ -174,7 +198,7 @@ public static void close() { */ public static void shutdown() { close(); - RpcChannelFactory.shutdownGracefully(); + NettyUtils.shutdownGracefully(); } protected static boolean contains(RpcConnectionKey key) { diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java index 18c7d80fdd..4f174763b0 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -139,7 +139,7 @@ public void setUpRpcClient() throws Exception { @AfterClass public static void tearDownClass() throws Exception { - RpcChannelFactory.shutdownGracefully(); + RpcClientManager.shutdown(); } public void tearDownRpcServer() throws Exception { diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 9f95f58875..0fae7ee093 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ 
b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -130,7 +130,7 @@ public void setUpRpcClient() throws Exception { @AfterClass public static void tearDownClass() throws Exception { - RpcChannelFactory.shutdownGracefully(); + RpcClientManager.shutdown(); } public void tearDownRpcServer() throws Exception {