From 689aa425f0361813925e70e59406e3c8558ec062 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Thu, 23 Jul 2015 15:57:45 +0900 Subject: [PATCH 1/2] TAJO-1700: Add better exception handling in TajoMasterClientService. --- .../main/proto/TajoMasterClientProtocol.proto | 2 +- .../tajo/master/TajoMasterClientService.java | 105 ++++++++++++++---- 2 files changed, 83 insertions(+), 24 deletions(-) diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto index 1dcf1ac08d..57bb2db40f 100644 --- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto @@ -37,7 +37,7 @@ service TajoMasterClientProtocolService { rpc removeSession(SessionIdProto) returns (ReturnState); rpc updateSessionVariables(UpdateSessionVariableRequest) returns (SessionUpdateResponse); rpc existSessionVariable(SessionedStringProto) returns (ReturnState); - rpc getSessionVariable(SessionedStringProto) returns (StringProto); + rpc getSessionVariable(SessionedStringProto) returns (StringResponse); rpc getAllSessionVariables(SessionIdProto) returns (KeyValueSetResponse); // Query Submission and Result APIs diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 6fbe9682c1..baf1320cdd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -41,6 +41,9 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.ErrorUtil; +import org.apache.tajo.exception.ExceptionUtil; +import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.TajoMasterClientProtocol; @@ -65,8 +68,12 @@ import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.exception.ExceptionUtil.printStackTraceIfError; import static org.apache.tajo.exception.ReturnStateUtil.*; +/** + * It provides Client Remote API service for TajoMaster. + */ public class TajoMasterClientService extends AbstractService { private final static Log LOG = LogFactory.getLog(TajoMasterClientService.class); private final MasterContext context; @@ -76,8 +83,6 @@ public class TajoMasterClientService extends AbstractService { private BlockingRpcServer server; private InetSocketAddress bindAddress; - private final BoolProto BOOL_TRUE = - BoolProto.newBuilder().setValue(true).build(); public TajoMasterClientService(MasterContext context) { super(TajoMasterClientService.class.getName()); @@ -140,6 +145,9 @@ public CreateSessionResponse createSession(RpcController controller, CreateSessi return builder.build(); } catch (Throwable t) { + + printStackTraceIfError(LOG, t); + CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder(); builder.setState(returnError(t)); return builder.build(); @@ -150,8 +158,13 @@ public CreateSessionResponse createSession(RpcController controller, CreateSessi public ReturnState removeSession(RpcController controller, TajoIdProtos.SessionIdProto request) throws ServiceException { - if (request != null) { - context.getSessionManager().removeSession(request.getId()); + try { + if (request != null) { + context.getSessionManager().removeSession(request.getId()); + } + } catch (Throwable t) { + printStackTraceIfError(LOG, t); + return ReturnStateUtil.returnError(t); } return OK; @@ -179,20 +192,29 @@ public SessionUpdateResponse updateSessionVariables(RpcController controller, return builder.build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); return builder.build(); } } @Override - public StringProto getSessionVariable(RpcController controller, SessionedStringProto request) + public StringResponse getSessionVariable(RpcController controller, SessionedStringProto request) throws ServiceException { try { - return ProtoUtil.convertString( - context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue())); + String value = context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue()); + + return StringResponse.newBuilder() + .setState(OK) + .setValue(value) + .build(); + } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + return StringResponse.newBuilder() + .setState(returnError(t)) + .build(); } } @@ -207,6 +229,7 @@ public ReturnState existSessionVariable(RpcController controller, SessionedStrin return errNoSessionVar(request.getValue()); } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -227,6 +250,7 @@ public KeyValueSetResponse getAllSessionVariables(RpcController controller, .build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return KeyValueSetResponse.newBuilder() .setState(returnError(t)) .build(); @@ -243,6 +267,7 @@ public StringResponse getCurrentDatabase(RpcController controller, TajoIdProtos. .build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return StringResponse.newBuilder() .setState(returnError(t)) .build(); @@ -262,6 +287,7 @@ public ReturnState selectDatabase(RpcController controller, SessionedStringProto return errUndefinedDatabase(databaseName); } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -277,10 +303,11 @@ public SubmitQueryResponse submitQuery(RpcController controller, QueryRequest re return context.getGlobalEngine().executeQuery(session, request.getQuery(), request.getIsJson()); - } catch (Exception e) { + } catch (Throwable t) { + printStackTraceIfError(LOG, t); return ClientProtos.SubmitQueryResponse.newBuilder() - .setState(returnError(e)) + .setState(returnError(t)) .setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()) .setIsForwarded(true) .setUserName(context.getConf().getVar(ConfVars.USERNAME)) @@ -300,6 +327,7 @@ public UpdateQueryResponse updateQuery(RpcController controller, QueryRequest re builder.setState(OK); } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); } return builder.build(); @@ -351,20 +379,22 @@ public GetQueryResultResponse getQueryResult(RpcController controller, builder.setState(errIncompleteQuery(queryId)); } - return builder.build(); } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + builder.setState(returnError(t)); } + + return builder.build(); } @Override public GetQueryListResponse getRunningQueryList(RpcController controller, TajoIdProtos.SessionIdProto request) - throws ServiceException { + GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder(); + try { context.getSessionManager().touch(request.getId()); - GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder(); Collection queries = new ArrayList(context.getQueryJobManager().getSubmittedQueries()); queries.addAll(context.getQueryJobManager().getRunningQueries()); @@ -387,11 +417,14 @@ public GetQueryListResponse getRunningQueryList(RpcController controller, TajoId builder.addQueryList(infoBuilder.build()); } - GetQueryListResponse result = builder.build(); - return result; + builder.setState(OK); + } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + builder.setState(returnError(t)); } + + return builder.build(); } @Override @@ -424,7 +457,9 @@ public GetQueryListResponse getFinishedQueryList(RpcController controller, TajoI } builder.setState(OK); + } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); } @@ -435,10 +470,11 @@ public GetQueryListResponse getFinishedQueryList(RpcController controller, TajoI public GetQueryStatusResponse getQueryStatus(RpcController controller, GetQueryStatusRequest request) throws ServiceException { + GetQueryStatusResponse.Builder builder = GetQueryStatusResponse.newBuilder(); + try { context.getSessionManager().touch(request.getSessionId().getId()); - GetQueryStatusResponse.Builder builder = GetQueryStatusResponse.newBuilder(); QueryId queryId = new QueryId(request.getQueryId()); builder.setQueryId(request.getQueryId()); @@ -485,11 +521,13 @@ public GetQueryStatusResponse getQueryStatus(RpcController controller, GetQueryS } } } - return builder.build(); } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + builder.setState(returnError(t)); } + + return builder.build(); } @Override @@ -539,8 +577,10 @@ public GetQueryResultDataResponse getQueryResultData(RpcController controller, G request.getSessionId().getId() + "," + queryId + ", " + rows.size() + " rows"); } catch (Throwable t) { - builder.setResultSet(resultSetBuilder.build()); // required field + printStackTraceIfError(LOG, t); + builder.setState(returnError(t)); + builder.setResultSet(resultSetBuilder.build()); // required field } return builder.build(); @@ -559,6 +599,7 @@ public ReturnState closeNonForwardQuery(RpcController controller, QueryIdRequest return OK; } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -587,6 +628,7 @@ public GetQueryInfoResponse getQueryInfo(RpcController controller, QueryIdReques builder.setState(OK); } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); } @@ -611,6 +653,7 @@ public ReturnState killQuery(RpcController controller, QueryIdRequest request) t return OK; } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -646,6 +689,7 @@ public GetClusterInfoResponse getClusterInfo(RpcController controller, builder.setState(OK); } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); } @@ -665,6 +709,7 @@ public ReturnState createDatabase(RpcController controller, SessionedStringProto } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -680,6 +725,7 @@ public ReturnState existDatabase(RpcController controller, SessionedStringProto } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -697,6 +743,7 @@ public ReturnState dropDatabase(RpcController controller, SessionedStringProto r } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -714,6 +761,7 @@ public StringListResponse getAllDatabases(RpcController controller, TajoIdProtos .build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return StringListResponse.newBuilder() .setState(returnError(t)) .build(); @@ -744,6 +792,7 @@ public ReturnState existTable(RpcController controller, SessionedStringProto req } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -767,6 +816,7 @@ public StringListResponse getTableList(RpcController controller, .build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return StringListResponse.newBuilder() .setState(returnError(t)) .build(); @@ -807,7 +857,10 @@ public TableResponse getTableDesc(RpcController controller, SessionedStringProto .build(); } } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + return TableResponse.newBuilder() + .setState(returnError(t)) + .build(); } } @@ -850,6 +903,7 @@ public TableResponse createExternalTable(RpcController controller, CreateTableRe .setTable(desc.getProto()).build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return TableResponse.newBuilder() .setState(returnError(t)) .build(); @@ -867,6 +921,7 @@ public ReturnState dropTable(RpcController controller, DropTableRequest dropTabl return OK; } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -898,7 +953,11 @@ public FunctionListResponse getFunctionList(RpcController controller, SessionedS .build(); } catch (Throwable t) { - return FunctionListResponse.newBuilder().setState(returnError(t)).build(); + printStackTraceIfError(LOG, t); + + return FunctionListResponse.newBuilder(). + setState(returnError(t)) + .build(); } } } From 80f6179dff4f08eed7c0dc42fb8714cbdad7b098 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 24 Jul 2015 01:22:15 +0900 Subject: [PATCH 2/2] Fixed existSessionVariable API. --- .../apache/tajo/client/SessionConnection.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 788d193ab4..f875335d8f 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -36,7 +36,9 @@ import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.CommonTestingUtil; @@ -54,8 +56,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE; import static org.apache.tajo.exception.ReturnStateUtil.isError; import static org.apache.tajo.exception.ReturnStateUtil.isSuccess; +import static org.apache.tajo.exception.ReturnStateUtil.isThisError; import static org.apache.tajo.exception.SQLExceptionUtil.toSQLException; import static org.apache.tajo.exception.SQLExceptionUtil.throwIfError; import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest; @@ -278,9 +282,18 @@ public String getSessionVariable(final String varname) throws SQLException { public Boolean existSessionVariable(final String varname) throws SQLException { - BlockingInterface stub = getTMStub(); try { - return isSuccess(stub.existSessionVariable(null, getSessionedString(varname))); + final BlockingInterface stub = getTMStub(); + ReturnState state = stub.existSessionVariable(null, getSessionedString(varname)); + + if (isThisError(state, NO_SUCH_SESSION_VARIABLE)) { + return false; + } else if (isError(state)){ + throw SQLExceptionUtil.toSQLException(state); + } + + return isSuccess(state); + } catch (ServiceException e) { throw new RuntimeException(e); }