From 7e9054c6f682fd9c716de36bc543cb9eb99abcc4 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Thu, 12 Mar 2015 10:57:53 +0900 Subject: [PATCH] TAJO-1394 Support reconnect on tsql --- .../org/apache/tajo/cli/tsql/TajoCli.java | 61 +++++++++++-------- .../apache/tajo/client/SessionConnection.java | 52 ++++++++++++++++ 2 files changed, 86 insertions(+), 27 deletions(-) diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index 7d7d0bdc5c..354f60dde3 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -41,10 +41,7 @@ import java.lang.reflect.Constructor; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; public class TajoCli { public static final String ERROR_PREFIX = "ERROR: "; @@ -60,6 +57,8 @@ public class TajoCli { private final PrintWriter sout; private TajoFileHistory history; + private final boolean reconnect; // reconnect on invalid session + // Current States private String currentDatabase; @@ -99,6 +98,7 @@ public class TajoCli { options.addOption("B", "background", false, "execute as background process"); options.addOption("conf", "conf", true, "configuration value"); options.addOption("param", "param", true, "parameter value in SQL file"); + options.addOption("reconnect", "reconnect", false, "reconnect on invalid session"); options.addOption("help", "help", false, "help"); } @@ -208,6 +208,8 @@ public TajoCli(TajoConf c, String [] args, InputStream in, OutputStream out) thr processConfVarCommand(cmd.getOptionValues("conf")); } + this.reconnect = cmd.hasOption("reconnect"); + // if there is no "-h" option, if(hostName == null) { if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) { @@ -467,13 +469,8 @@ public int executeMetaCommand(String line) throws Exception { try { invoked.invoke(arguments); - } catch (IllegalArgumentException ige) { - displayFormatter.printErrorMessage(sout, ige); - wasError = true; - return -1; } catch (Exception e) { - displayFormatter.printErrorMessage(sout, e); - wasError = true; + onError(null, e); return -1; } finally { context.getOutput().flush(); @@ -492,8 +489,7 @@ private void executeJsonQuery(String json) throws ServiceException, IOException long startTime = System.currentTimeMillis(); ClientProtos.SubmitQueryResponse response = client.executeQueryWithJson(json); if (response == null) { - displayFormatter.printErrorMessage(sout, "response is null"); - wasError = true; + onError("response is null", null); } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { if (response.getIsForwarded()) { QueryId queryId = new QueryId(response.getQueryId()); @@ -508,8 +504,7 @@ private void executeJsonQuery(String json) throws ServiceException, IOException } } else { if (response.hasErrorMessage()) { - displayFormatter.printErrorMessage(sout, response.getErrorMessage()); - wasError = true; + onError(response.getErrorMessage(), null); } } } @@ -520,17 +515,12 @@ private int executeQuery(String statement) throws ServiceException, IOException ClientProtos.SubmitQueryResponse response = null; try{ response = client.executeQuery(statement); - } catch (ServiceException e){ - displayFormatter.printErrorMessage(sout, e.getMessage()); - wasError = true; } catch(Throwable te){ - displayFormatter.printErrorMessage(sout, te); - wasError = true; + onError(null, te); } if (response == null) { - displayFormatter.printErrorMessage(sout, "response is null"); - wasError = true; + onError("response is null", null); } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { if (response.getIsForwarded()) { QueryId queryId = new QueryId(response.getQueryId()); @@ -544,8 +534,7 @@ private int executeQuery(String statement) throws ServiceException, IOException } } else { if (response.hasErrorMessage()) { - displayFormatter.printErrorMessage(sout, response.getErrorMessage()); - wasError = true; + onError(response.getErrorMessage(), null); } } @@ -569,13 +558,13 @@ private void localQueryCompleted(ClientProtos.SubmitQueryResponse response, long displayFormatter.printResult(sout, sin, desc, responseTime, res); } } catch (Throwable t) { - displayFormatter.printErrorMessage(sout, t); - wasError = true; + onError(null, t); } finally { if (res != null) { try { res.close(); } catch (SQLException e) { + // ignore } } } @@ -637,8 +626,7 @@ private void waitForQueryCompleted(QueryId queryId) { } } } catch (Throwable t) { - displayFormatter.printErrorMessage(sout, t); - wasError = true; + onError(null, t); } finally { if (res != null) { try { @@ -668,6 +656,25 @@ private void printInvalidCommand(String command) { sout.println("Invalid command " + command + ". Try \\? for help."); } + private void onError(String message, Throwable t) { + wasError = true; + if (t == null) { + displayFormatter.printErrorMessage(sout, message); + } else { + displayFormatter.printErrorMessage(sout, t); + } + if (reconnect && (t instanceof InvalidClientSessionException || + (message != null && message.startsWith("org.apache.tajo.session.InvalidSessionException")))) { + if (client instanceof SessionConnection) { + try { + ((SessionConnection)client).reconnect(); + } catch (Exception e) { + // ignore + } + } + } + } + @VisibleForTesting public void close() { //for testcase diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index bcf6d8b7c7..317bf54e25 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -21,6 +21,7 @@ import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.auth.UserRoleInfo; @@ -319,6 +320,57 @@ protected void checkSessionAndGet(NettyClientBase client) throws ServiceExceptio } } + public boolean reconnect() throws Exception { + return new ServerCallable(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); + builder.setUsername(userInfo.getUserName()).build(); + if (baseDatabase != null) { + builder.setBaseDatabaseName(baseDatabase); + } + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); + if (response.getResultCode() != ResultCode.OK) { + return false; + } + sessionId = response.getSessionId(); + Map sessionVars = ProtoUtil.convertToMap(response.getSessionVars()); + synchronized (sessionVarsCache) { + for (SessionVars var : UPDATE_ON_RECONNECT) { + String value = sessionVars.get(var.keyname()); + if (value != null) { + sessionVarsCache.put(var.keyname(), value); + } + } + } + + try { + KeyValueSet keyValueSet = new KeyValueSet(); + keyValueSet.putAll(sessionVarsCache); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .setSessionVars(keyValueSet.getProto()).build(); + + if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) { + tajoMasterService.removeSession(null, sessionId); + return false; + } + LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); + return true; + } catch (ServiceException e) { + tajoMasterService.removeSession(null, sessionId); + return false; + } + } + }.withRetries(); + } + + private static final SessionVars[] UPDATE_ON_RECONNECT = new SessionVars[] { + SessionVars.SESSION_ID, SessionVars.SESSION_LAST_ACCESS_TIME, SessionVars.CLIENT_HOST + }; + ClientProtos.SessionedStringProto convertSessionedString(String str) { ClientProtos.SessionedStringProto.Builder builder = ClientProtos.SessionedStringProto.newBuilder(); builder.setSessionId(sessionId);