Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.ShutdownHookManager;

import java.io.*;
import java.lang.reflect.Constructor;
Expand All @@ -50,6 +51,7 @@
import java.util.TreeMap;

public class TajoCli {
public static final int SHUTDOWN_HOOK_PRIORITY = 50;
public static final String ERROR_PREFIX = "ERROR: ";
public static final String KILL_PREFIX = "KILL: ";

Expand Down Expand Up @@ -373,7 +375,7 @@ private void initCommands() {
}

private void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override
public void run() {
try {
Expand All @@ -382,7 +384,7 @@ public void run() {
}
client.close();
}
}));
}, SHUTDOWN_HOOK_PRIORITY);
}

private String updatePrompt(ParsingState state) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.tajo.client;

import com.google.protobuf.ServiceException;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.SessionVars;
Expand All @@ -38,16 +38,14 @@
import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.NettyUtils;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;

import java.io.Closeable;
Expand All @@ -57,9 +55,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE;
import static org.apache.tajo.exception.ReturnStateUtil.*;
Expand All @@ -70,8 +66,6 @@ public class SessionConnection implements Closeable {

private final static Log LOG = LogFactory.getLog(SessionConnection.class);

private final static AtomicInteger connections = new AtomicInteger();

final RpcClientManager manager;

private String baseDatabase;
Expand All @@ -87,6 +81,8 @@ public class SessionConnection implements Closeable {

private final ServiceTracker serviceTracker;

private final EventLoopGroup eventLoopGroup;

private NettyClientBase client;

private final KeyValueSet properties;
Expand All @@ -110,7 +106,13 @@ public SessionConnection(@NotNull ServiceTracker tracker, @Nullable String baseD
this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
this.userInfo = UserRoleInfo.getCurrentUser();

this.client = getTajoMasterConnection();
this.eventLoopGroup = NettyUtils.createEventLoopGroup(getClass().getSimpleName(), 4);
try {
this.client = getTajoMasterConnection();
} catch (TajoRuntimeException e) {
NettyUtils.shutdown(eventLoopGroup);
throw e;
}
}

public Map<String, String> getClientSideSessionVars() {
Expand All @@ -127,16 +129,8 @@ public synchronized NettyClientBase getTajoMasterConnection() {
RpcClientManager.cleanup(client);

// Client do not closed on idle state for support high available
this.client = manager.newClient(
getTajoMasterAddr(),
TajoMasterClientProtocol.class,
false,
manager.getRetries(),
0,
TimeUnit.SECONDS,
false);
connections.incrementAndGet();

this.client = manager.newBlockingClient(getTajoMasterAddr(), TajoMasterClientProtocol.class,
manager.getRetries(), eventLoopGroup);
} catch (Throwable t) {
throw new TajoRuntimeException(new ClientConnectionException(t));
}
Expand Down Expand Up @@ -346,14 +340,7 @@ public void close() {
// ignore
} finally {
RpcClientManager.cleanup(client);
if(connections.decrementAndGet() == 0) {
if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equals(CommonTestingUtil.TAJO_TEST_TRUE)) {
RpcChannelFactory.shutdownGracefully();
if (LOG.isDebugEnabled()) {
LOG.debug("RPC connection is closed");
}
}
}
NettyUtils.shutdown(eventLoopGroup);
}
}

Expand Down Expand Up @@ -457,5 +444,4 @@ ClientProtos.SessionedStringProto getSessionedString(String str) {
}
return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public static enum ConfVars implements ConfigKey {
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
2, Validators.min("1")),
SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192),
SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 10, Validators.min("1")),
SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 60, Validators.min("1")),
SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 60, Validators.min("1")),
SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 2, Validators.min("0")),
SHUFFLE_HASH_APPENDER_BUFFER_SIZE("tajo.shuffle.hash.appender.buffer.size", 10000),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tajo.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The <code>ShutdownHookManager</code> enables running shutdownHook
* in a deterministic order, higher priority first.
* <p/>
* The JVM runs ShutdownHooks in a non-deterministic order or in parallel.
* This class registers a single JVM shutdownHook and run all the
* shutdownHooks registered to it (to this class) in order based on their
* priority.
*
* this is an implementation copied from hadoop-common
*/
public class ShutdownHookManager {

private static final ShutdownHookManager MGR = new ShutdownHookManager();

private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class);

static {
Runtime.getRuntime().addShutdownHook(
new Thread() {
@Override
public void run() {
MGR.shutdownInProgress.set(true);
for (Runnable hook: MGR.getShutdownHooksInOrder()) {
try {
hook.run();
} catch (Throwable ex) {
LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
"' failed, " + ex.toString(), ex);
}
}
}
}
);
}

/**
* Return <code>ShutdownHookManager</code> singleton.
*
* @return <code>ShutdownHookManager</code> singleton.
*/
public static ShutdownHookManager get() {
return MGR;
}

/**
* Private structure to store ShutdownHook and its priority.
*/
private static class HookEntry {
Runnable hook;
int priority;

public HookEntry(Runnable hook, int priority) {
this.hook = hook;
this.priority = priority;
}

@Override
public int hashCode() {
return hook.hashCode();
}

@Override
public boolean equals(Object obj) {
boolean eq = false;
if (obj != null) {
if (obj instanceof HookEntry) {
eq = (hook == ((HookEntry)obj).hook);
}
}
return eq;
}

}

private Set<HookEntry> hooks =
Collections.synchronizedSet(new HashSet<HookEntry>());

private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);

//private to constructor to ensure singularity
private ShutdownHookManager() {
}

/**
* Returns the list of shutdownHooks in order of execution,
* Highest priority first.
*
* @return the list of shutdownHooks in order of execution.
*/
List<Runnable> getShutdownHooksInOrder() {
List<HookEntry> list;
synchronized (MGR.hooks) {
list = new ArrayList<HookEntry>(MGR.hooks);
}
Collections.sort(list, new Comparator<HookEntry>() {

//reversing comparison so highest priority hooks are first
@Override
public int compare(HookEntry o1, HookEntry o2) {
return o2.priority - o1.priority;
}
});
List<Runnable> ordered = new ArrayList<Runnable>();
for (HookEntry entry: list) {
ordered.add(entry.hook);
}
return ordered;
}

/**
* Adds a shutdownHook with a priority, the higher the priority
* the earlier will run. ShutdownHooks with same priority run
* in a non-deterministic order.
*
* @param shutdownHook shutdownHook <code>Runnable</code>
* @param priority priority of the shutdownHook.
*/
public void addShutdownHook(Runnable shutdownHook, int priority) {
if (shutdownHook == null) {
throw new IllegalArgumentException("shutdownHook cannot be NULL");
}
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
}
hooks.add(new HookEntry(shutdownHook, priority));
}

/**
* Removes a shutdownHook.
*
* @param shutdownHook shutdownHook to remove.
* @return TRUE if the shutdownHook was registered and removed,
* FALSE otherwise.
*/
public boolean removeShutdownHook(Runnable shutdownHook) {
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook");
}
return hooks.remove(new HookEntry(shutdownHook, 0));
}

/**
* Indicates if a shutdownHook is registered or not.
*
* @param shutdownHook shutdownHook to check if registered.
* @return TRUE/FALSE depending if the shutdownHook is is registered.
*/
public boolean hasShutdownHook(Runnable shutdownHook) {
return hooks.contains(new HookEntry(shutdownHook, 0));
}

/**
* Indicates if shutdown is in progress or not.
*
* @return TRUE if the shutdown is in progress, otherwise FALSE.
*/
public boolean isShutdownInProgress() {
return shutdownInProgress.get();
}

}
15 changes: 15 additions & 0 deletions tajo-core-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,21 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Loading