Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

TAJO-1405 Fix some illegal way of usages on connection pool #425

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class SessionConnection implements Closeable {

final RpcConnectionPool connPool;

private final String baseDatabase;
private String baseDatabase;

private final UserRoleInfo userInfo;

Expand Down Expand Up @@ -260,7 +260,8 @@ public Map<String, String> call(NettyClientBase client) throws ServiceException
}

public Boolean selectDatabase(final String databaseName) throws ServiceException {
return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
Boolean selected = new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false, true) {

public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
Expand All @@ -269,6 +270,11 @@ public Boolean call(NettyClientBase client) throws ServiceException {
return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
}
}.withRetries();

if (selected == Boolean.TRUE) {
this.baseDatabase = databaseName;
}
return selected;
}

@Override
Expand All @@ -278,13 +284,15 @@ public void close() {
}

// remove session
NettyClientBase client = null;
try {

NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
tajoMaster.removeSession(null, sessionId);

} catch (Throwable e) {
// ignore
} finally {
connPool.releaseConnection(client);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.TaskRunnerStartEvent;

import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -139,7 +137,17 @@ public void init() throws Throwable {
try{
this.resource.initialize(queryContext, plan);
} catch (Throwable e) {
getQueryMasterStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
try {
NettyClientBase client = getQueryMasterConnection();
try {
QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
} finally {
connPool.releaseConnection(client);
}
} catch (Throwable t) {
//ignore
}
throw e;
}
}
Expand All @@ -148,15 +156,13 @@ public ExecutionBlockSharedResource getSharedResource() {
return resource;
}

public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub()
public NettyClientBase getQueryMasterConnection()
throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
NettyClientBase clientBase = null;
try {
clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
return clientBase.getStub();
} finally {
connPool.releaseConnection(clientBase);
}
return connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
}

public void releaseConnection(NettyClientBase connection) {
connPool.releaseConnection(connection);
}

public void stop(){
Expand Down Expand Up @@ -267,7 +273,13 @@ public TajoWorker.WorkerContext getWorkerContext(){
}

private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get());
NettyClientBase client = getQueryMasterConnection();
try {
QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
stub.doneExecutionBlock(null, reporter, NullCallback.get());
} finally {
connPool.releaseConnection(client);
}
}

protected void reportExecutionBlock(ExecutionBlockId ebId) {
Expand Down Expand Up @@ -361,12 +373,14 @@ Runnable createReporterThread() {

return new Runnable() {
int remainingRetries = MAX_RETRIES;
QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub;
@Override
public void run() {
while (!reporterStop.get() && !Thread.interrupted()) {

NettyClientBase client = null;
try {
masterStub = getQueryMasterStub();
client = getQueryMasterConnection();
QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub();

if(tasks.size() == 0){
masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());
Expand All @@ -391,6 +405,7 @@ public void run() {
throw new RuntimeException(t);
}
} finally {
releaseConnection(client);
if (remainingRetries > 0 && !reporterStop.get()) {
synchronized (reporterThread) {
try {
Expand Down
75 changes: 41 additions & 34 deletions tajo-core/src/main/java/org/apache/tajo/worker/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
Expand Down Expand Up @@ -424,46 +425,52 @@ public void run() throws Exception {

executionBlockContext.completedTasksNum.incrementAndGet();
context.getHashShuffleAppenderManager().finalizeTask(taskId);
QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub();
if (context.isStopped()) {
context.setExecutorProgress(0.0f);

if(context.getState() == TaskAttemptState.TA_KILLED) {
queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
executionBlockContext.killedTasksNum.incrementAndGet();
} else {
context.setState(TaskAttemptState.TA_FAILED);
TaskFatalErrorReport.Builder errorBuilder =
TaskFatalErrorReport.newBuilder()
.setId(getId().getProto());
if (error != null) {
if (error.getMessage() == null) {
errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
} else {
errorBuilder.setErrorMessage(error.getMessage());
NettyClientBase client = executionBlockContext.getQueryMasterConnection();
try {
QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
if (context.isStopped()) {
context.setExecutorProgress(0.0f);

if (context.getState() == TaskAttemptState.TA_KILLED) {
queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
executionBlockContext.killedTasksNum.incrementAndGet();
} else {
context.setState(TaskAttemptState.TA_FAILED);
TaskFatalErrorReport.Builder errorBuilder =
TaskFatalErrorReport.newBuilder()
.setId(getId().getProto());
if (error != null) {
if (error.getMessage() == null) {
errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
} else {
errorBuilder.setErrorMessage(error.getMessage());
}
errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
}
errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));

queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
executionBlockContext.failedTasksNum.incrementAndGet();
}
} else {
// if successful
context.setProgress(1.0f);
context.setState(TaskAttemptState.TA_SUCCEEDED);
executionBlockContext.succeededTasksNum.incrementAndGet();

queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
executionBlockContext.failedTasksNum.incrementAndGet();
TaskCompletionReport report = getTaskCompletionReport();
queryMasterStub.done(null, report, NullCallback.get());
}
} else {
// if successful
context.setProgress(1.0f);
context.setState(TaskAttemptState.TA_SUCCEEDED);
executionBlockContext.succeededTasksNum.incrementAndGet();

TaskCompletionReport report = getTaskCompletionReport();
queryMasterStub.done(null, report, NullCallback.get());
finishTime = System.currentTimeMillis();
LOG.info(context.getTaskId() + " completed. " +
"Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+ ", killed: " + executionBlockContext.killedTasksNum.intValue()
+ ", failed: " + executionBlockContext.failedTasksNum.intValue());
cleanupTask();
} finally {
executionBlockContext.releaseConnection(client);
}
finishTime = System.currentTimeMillis();
LOG.info(context.getTaskId() + " completed. " +
"Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+ ", killed: " + executionBlockContext.killedTasksNum.intValue()
+ ", failed: " + executionBlockContext.failedTasksNum.intValue());
cleanupTask();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.tajo.master.container.TajoContainerIdPBImpl;
import org.apache.tajo.master.container.TajoConverterUtils;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;

import io.netty.channel.ConnectTimeoutException;
Expand Down Expand Up @@ -196,9 +197,9 @@ public void run() {
TaskRequestProto taskRequest = null;

while(!stopped) {
QueryMasterProtocolService.Interface qmClientService;
NettyClientBase client;
try {
qmClientService = getContext().getQueryMasterStub();
client = executionBlockContext.getQueryMasterConnection();
} catch (ConnectTimeoutException ce) {
// NettyClientBase throws ConnectTimeoutException if connection was failed
stop();
Expand All @@ -212,6 +213,8 @@ public void run() {
break;
}

QueryMasterProtocolService.Interface qmClientService = client.getStub();

try {
if (callFuture == null) {
callFuture = new CallFuture<TaskRequestProto>();
Expand Down Expand Up @@ -296,6 +299,8 @@ public void run() {
}
} catch (Throwable t) {
LOG.fatal(t.getMessage(), t);
} finally {
executionBlockContext.releaseConnection(client);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AsyncRpcClient extends NettyClientBase {
private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);

private final Map<Integer, ResponseCallback> requests =
private final ConcurrentMap<Integer, ResponseCallback> requests =
new ConcurrentHashMap<Integer, ResponseCallback>();

private final Method stubMethod;
Expand Down Expand Up @@ -178,14 +179,12 @@ private String getErrorMessage(String message) {
@ChannelHandler.Sharable
private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {

synchronized void registerCallback(int seqId, ResponseCallback callback) {
void registerCallback(int seqId, ResponseCallback callback) {

if (requests.containsKey(seqId)) {
if (requests.putIfAbsent(seqId, callback) != null) {
throw new RemoteException(
getErrorMessage("Duplicate Sequence Id "+ seqId));
}

requests.put(seqId, callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ private boolean handleConnectionInternally(final InetSocketAddress addr, long ti
final CountDownLatch ticket = new CountDownLatch(1);
final CountDownLatch granted = connect.check(ticket);

// basically, it's double checked lock
if (ticket == granted && isConnected()) {
granted.countDown();
return true;
}

if (ticket == granted) {
connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
}
Expand Down