Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
TAJO-1405: Fix some illegal way of usages on connection pool. (Contri…
Browse files Browse the repository at this point in the history
…buted by navis, Committed by Keuntae Park)

Closes #425
  • Loading branch information
sirpkt committed Mar 17, 2015
1 parent 0dc7d68 commit 286b956
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 60 deletions.
3 changes: 3 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ Release 0.11.0 - unreleased

BUG FIXES

TAJO-1405: Fix incorrect usages of the connection pool.
(Contributed by navis, Committed by Keuntae Park)

TAJO-1384: Duplicated output file path problem. (jihoon)

TAJO-1386: CURRENT_DATE generates parsing errors sometimes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class SessionConnection implements Closeable {

final RpcConnectionPool connPool;

private final String baseDatabase;
private String baseDatabase;

private final UserRoleInfo userInfo;

Expand Down Expand Up @@ -260,7 +260,8 @@ public Map<String, String> call(NettyClientBase client) throws ServiceException
}

public Boolean selectDatabase(final String databaseName) throws ServiceException {
return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
Boolean selected = new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {

public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
Expand All @@ -269,6 +270,11 @@ public Boolean call(NettyClientBase client) throws ServiceException {
return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
}
}.withRetries();

if (selected == Boolean.TRUE) {
this.baseDatabase = databaseName;
}
return selected;
}

@Override
Expand All @@ -278,13 +284,15 @@ public void close() {
}

// remove session
NettyClientBase client = null;
try {

NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
tajoMaster.removeSession(null, sessionId);

} catch (Throwable e) {
// ignore
} finally {
connPool.releaseConnection(client);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.TaskRunnerStartEvent;

import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -139,7 +137,17 @@ public void init() throws Throwable {
try{
this.resource.initialize(queryContext, plan);
} catch (Throwable e) {
getQueryMasterStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
try {
NettyClientBase client = getQueryMasterConnection();
try {
QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
} finally {
connPool.releaseConnection(client);
}
} catch (Throwable t) {
//ignore
}
throw e;
}
}
Expand All @@ -148,15 +156,13 @@ public ExecutionBlockSharedResource getSharedResource() {
return resource;
}

public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub()
public NettyClientBase getQueryMasterConnection()
throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
NettyClientBase clientBase = null;
try {
clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
return clientBase.getStub();
} finally {
connPool.releaseConnection(clientBase);
}
return connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
}

/**
 * Hands a connection back to the connection pool by delegating to
 * {@code connPool.releaseConnection}.
 *
 * NOTE(review): some callers in this change pass a reference that may still be
 * {@code null} when acquiring the connection failed; confirm the pool tolerates
 * a null argument.
 *
 * @param connection the connection to release; forwarded unchanged to the pool
 */
public void releaseConnection(NettyClientBase connection) {
connPool.releaseConnection(connection);
}

public void stop(){
Expand Down Expand Up @@ -267,7 +273,13 @@ public TajoWorker.WorkerContext getWorkerContext(){
}

private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get());
NettyClientBase client = getQueryMasterConnection();
try {
QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
stub.doneExecutionBlock(null, reporter, NullCallback.get());
} finally {
connPool.releaseConnection(client);
}
}

protected void reportExecutionBlock(ExecutionBlockId ebId) {
Expand Down Expand Up @@ -361,12 +373,14 @@ Runnable createReporterThread() {

return new Runnable() {
int remainingRetries = MAX_RETRIES;
QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub;
@Override
public void run() {
while (!reporterStop.get() && !Thread.interrupted()) {

NettyClientBase client = null;
try {
masterStub = getQueryMasterStub();
client = getQueryMasterConnection();
QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub();

if(tasks.size() == 0){
masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());
Expand All @@ -391,6 +405,7 @@ public void run() {
throw new RuntimeException(t);
}
} finally {
releaseConnection(client);
if (remainingRetries > 0 && !reporterStop.get()) {
synchronized (reporterThread) {
try {
Expand Down
75 changes: 41 additions & 34 deletions tajo-core/src/main/java/org/apache/tajo/worker/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
Expand Down Expand Up @@ -424,46 +425,52 @@ public void run() throws Exception {

executionBlockContext.completedTasksNum.incrementAndGet();
context.getHashShuffleAppenderManager().finalizeTask(taskId);
QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub();
if (context.isStopped()) {
context.setExecutorProgress(0.0f);

if(context.getState() == TaskAttemptState.TA_KILLED) {
queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
executionBlockContext.killedTasksNum.incrementAndGet();
} else {
context.setState(TaskAttemptState.TA_FAILED);
TaskFatalErrorReport.Builder errorBuilder =
TaskFatalErrorReport.newBuilder()
.setId(getId().getProto());
if (error != null) {
if (error.getMessage() == null) {
errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
} else {
errorBuilder.setErrorMessage(error.getMessage());
NettyClientBase client = executionBlockContext.getQueryMasterConnection();
try {
QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
if (context.isStopped()) {
context.setExecutorProgress(0.0f);

if (context.getState() == TaskAttemptState.TA_KILLED) {
queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
executionBlockContext.killedTasksNum.incrementAndGet();
} else {
context.setState(TaskAttemptState.TA_FAILED);
TaskFatalErrorReport.Builder errorBuilder =
TaskFatalErrorReport.newBuilder()
.setId(getId().getProto());
if (error != null) {
if (error.getMessage() == null) {
errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
} else {
errorBuilder.setErrorMessage(error.getMessage());
}
errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
}
errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));

queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
executionBlockContext.failedTasksNum.incrementAndGet();
}
} else {
// if successful
context.setProgress(1.0f);
context.setState(TaskAttemptState.TA_SUCCEEDED);
executionBlockContext.succeededTasksNum.incrementAndGet();

queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
executionBlockContext.failedTasksNum.incrementAndGet();
TaskCompletionReport report = getTaskCompletionReport();
queryMasterStub.done(null, report, NullCallback.get());
}
} else {
// if successful
context.setProgress(1.0f);
context.setState(TaskAttemptState.TA_SUCCEEDED);
executionBlockContext.succeededTasksNum.incrementAndGet();

TaskCompletionReport report = getTaskCompletionReport();
queryMasterStub.done(null, report, NullCallback.get());
finishTime = System.currentTimeMillis();
LOG.info(context.getTaskId() + " completed. " +
"Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+ ", killed: " + executionBlockContext.killedTasksNum.intValue()
+ ", failed: " + executionBlockContext.failedTasksNum.intValue());
cleanupTask();
} finally {
executionBlockContext.releaseConnection(client);
}
finishTime = System.currentTimeMillis();
LOG.info(context.getTaskId() + " completed. " +
"Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+ ", killed: " + executionBlockContext.killedTasksNum.intValue()
+ ", failed: " + executionBlockContext.failedTasksNum.intValue());
cleanupTask();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.tajo.master.container.TajoContainerIdPBImpl;
import org.apache.tajo.master.container.TajoConverterUtils;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;

import io.netty.channel.ConnectTimeoutException;
Expand Down Expand Up @@ -196,9 +197,9 @@ public void run() {
TaskRequestProto taskRequest = null;

while(!stopped) {
QueryMasterProtocolService.Interface qmClientService;
NettyClientBase client;
try {
qmClientService = getContext().getQueryMasterStub();
client = executionBlockContext.getQueryMasterConnection();
} catch (ConnectTimeoutException ce) {
// NettyClientBase throws ConnectTimeoutException if connection was failed
stop();
Expand All @@ -212,6 +213,8 @@ public void run() {
break;
}

QueryMasterProtocolService.Interface qmClientService = client.getStub();

try {
if (callFuture == null) {
callFuture = new CallFuture<TaskRequestProto>();
Expand Down Expand Up @@ -296,6 +299,8 @@ public void run() {
}
} catch (Throwable t) {
LOG.fatal(t.getMessage(), t);
} finally {
executionBlockContext.releaseConnection(client);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AsyncRpcClient extends NettyClientBase {
private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);

private final Map<Integer, ResponseCallback> requests =
private final ConcurrentMap<Integer, ResponseCallback> requests =
new ConcurrentHashMap<Integer, ResponseCallback>();

private final Method stubMethod;
Expand Down Expand Up @@ -178,14 +179,12 @@ private String getErrorMessage(String message) {
@ChannelHandler.Sharable
private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {

synchronized void registerCallback(int seqId, ResponseCallback callback) {
void registerCallback(int seqId, ResponseCallback callback) {

if (requests.containsKey(seqId)) {
if (requests.putIfAbsent(seqId, callback) != null) {
throw new RemoteException(
getErrorMessage("Duplicate Sequence Id "+ seqId));
}

requests.put(seqId, callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ private boolean handleConnectionInternally(final InetSocketAddress addr, long ti
final CountDownLatch ticket = new CountDownLatch(1);
final CountDownLatch granted = connect.check(ticket);

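// Basically a double-checked lock: if check() handed back our own ticket and the
// client is already connected, count the latch down and return without reconnecting.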
if (ticket == granted && isConnected()) {
granted.countDown();
return true;
}

if (ticket == granted) {
connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
}
Expand Down

0 comments on commit 286b956

Please sign in to comment.