Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

TAJO-1633: Cleanup TajoMasterClientService. #594

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -27,8 +27,10 @@
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.SessionedStringProto;
import org.apache.tajo.jdbc.SQLStates;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;

import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -145,13 +147,13 @@ public List<String> getTableList(@Nullable final String databaseName) throws Ser
connection.checkSessionAndGet(client);
BlockingInterface tajoMasterService = client.getStub();

ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
SessionedStringProto.Builder builder = SessionedStringProto.newBuilder();
builder.setSessionId(connection.sessionId);
if (databaseName != null) {
builder.setDatabaseName(databaseName);
builder.setValue(databaseName);
}
ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
return res.getTablesList();
PrimitiveProtos.StringListProto res = tajoMasterService.getTableList(null, builder.build());
return res.getValuesList();
}

@Override
Expand All @@ -161,9 +163,9 @@ public TableDesc getTableDesc(final String tableName) throws ServiceException {
connection.checkSessionAndGet(client);
BlockingInterface tajoMasterService = client.getStub();

ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
SessionedStringProto.Builder builder = SessionedStringProto.newBuilder();
builder.setSessionId(connection.sessionId);
builder.setTableName(tableName);
builder.setValue(tableName);
ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
if (res.getResultCode() == ClientProtos.ResultCode.OK) {
return CatalogUtil.newTableDesc(res.getTableDesc());
Expand Down
Expand Up @@ -437,8 +437,8 @@ public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceExc
connection.checkSessionAndGet(client);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();

ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
builder.setSessionId(connection.sessionId);
TajoIdProtos.SessionIdProto.Builder builder = TajoIdProtos.SessionIdProto.newBuilder();
builder.setId(connection.sessionId.getId());
ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
return res.getQueryListList();
}
Expand All @@ -450,8 +450,8 @@ public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceEx
connection.checkSessionAndGet(client);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();

ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
builder.setSessionId(connection.sessionId);
TajoIdProtos.SessionIdProto.Builder builder = TajoIdProtos.SessionIdProto.newBuilder();
builder.setId(connection.sessionId.getId());
ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
return res.getQueryListList();
}
Expand Down
16 changes: 1 addition & 15 deletions tajo-client/src/main/proto/ClientProtos.proto
Expand Up @@ -57,7 +57,7 @@ message SessionUpdateResponse {

message SessionedStringProto {
optional SessionIdProto sessionId = 1;
required string value = 2;
optional string value = 2;
}

message ExplainQueryResponse {
Expand Down Expand Up @@ -196,20 +196,6 @@ message GetClusterInfoResponse {
repeated WorkerResourceInfo workerList = 1;
}

message GetTableListRequest {
optional SessionIdProto sessionId = 1;
optional string databaseName = 2;
}

message GetTableListResponse {
repeated string tables = 1;
}

message GetTableDescRequest {
optional SessionIdProto sessionId = 1;
required string tableName = 2;
}

message CreateTableRequest {
optional SessionIdProto sessionId = 1;
required string name = 2;
Expand Down
8 changes: 4 additions & 4 deletions tajo-client/src/main/proto/TajoMasterClientProtocol.proto
Expand Up @@ -46,8 +46,8 @@ service TajoMasterClientProtocolService {

// Query And Resource Management APIs
rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
rpc getRunningQueryList(GetQueryListRequest) returns (GetQueryListResponse);
rpc getFinishedQueryList(GetQueryListRequest) returns (GetQueryListResponse);
rpc getRunningQueryList(SessionIdProto) returns (GetQueryListResponse);
rpc getFinishedQueryList(SessionIdProto) returns (GetQueryListResponse);
rpc killQuery(QueryIdRequest) returns (BoolProto);
rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc closeNonForwardQuery(QueryIdRequest) returns (BoolProto);
Expand All @@ -65,7 +65,7 @@ service TajoMasterClientProtocolService {
rpc createExternalTable(CreateTableRequest) returns (TableResponse);
rpc existTable(SessionedStringProto) returns (BoolProto);
rpc dropTable(DropTableRequest) returns (BoolProto);
rpc getTableList(GetTableListRequest) returns (GetTableListResponse);
rpc getTableDesc(GetTableDescRequest) returns (TableResponse);
rpc getTableList(SessionedStringProto) returns (StringListProto);
rpc getTableDesc(SessionedStringProto) returns (TableResponse);
rpc getFunctionList(SessionedStringProto) returns (FunctionResponse);
}
Expand Up @@ -47,19 +47,20 @@
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.PartitionedTableScanNode;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.querymaster.QueryJobEvent;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.session.InvalidSessionException;
import org.apache.tajo.session.NoSuchSessionVariableException;
import org.apache.tajo.session.Session;
import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
import org.apache.tajo.session.InvalidSessionException;
import org.apache.tajo.session.NoSuchSessionVariableException;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
Expand Down Expand Up @@ -307,11 +308,6 @@ public UpdateQueryResponse updateQuery(RpcController controller, QueryRequest re
try {
Session session = context.getSessionManager().getSession(request.getSessionId().getId());
QueryContext queryContext = new QueryContext(conf, session);
if (queryContext.getCurrentDatabase() == null) {
for (Map.Entry<String,String> e : queryContext.getAllKeyValus().entrySet()) {
System.out.println(e.getKey() + "=" + e.getValue());
}
}

UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
try {
Expand Down Expand Up @@ -379,12 +375,12 @@ public GetQueryResultResponse getQueryResult(RpcController controller,
}

@Override
public GetQueryListResponse getRunningQueryList(RpcController controller, GetQueryListRequest request)
public GetQueryListResponse getRunningQueryList(RpcController controller, TajoIdProtos.SessionIdProto request)

throws ServiceException {

try {
context.getSessionManager().touch(request.getSessionId().getId());
context.getSessionManager().touch(request.getId());
GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder();

Collection<QueryInProgress> queries = new ArrayList<QueryInProgress>(context.getQueryJobManager().getSubmittedQueries());
Expand Down Expand Up @@ -416,11 +412,11 @@ public GetQueryListResponse getRunningQueryList(RpcController controller, GetQue
}

@Override
public GetQueryListResponse getFinishedQueryList(RpcController controller, GetQueryListRequest request)
public GetQueryListResponse getFinishedQueryList(RpcController controller, TajoIdProtos.SessionIdProto request)
throws ServiceException {

try {
context.getSessionManager().touch(request.getSessionId().getId());
context.getSessionManager().touch(request.getId());
GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder();

Collection<QueryInfo> queries
Expand Down Expand Up @@ -723,7 +719,7 @@ public BoolProto dropDatabase(RpcController controller, SessionedStringProto req
}

@Override
public PrimitiveProtos.StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto
public StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto
request) throws ServiceException {
try {
context.getSessionManager().touch(request.getId());
Expand All @@ -749,10 +745,6 @@ public BoolProto existTable(RpcController controller, SessionedStringProto reque
tableName = request.getValue();
}

if (databaseName == null) {
System.out.println("A");
}

if (catalog.existsTable(databaseName, tableName)) {
return BOOL_TRUE;
} else {
Expand All @@ -764,39 +756,47 @@ public BoolProto existTable(RpcController controller, SessionedStringProto reque
}

@Override
public GetTableListResponse getTableList(RpcController controller,
GetTableListRequest request) throws ServiceException {
public StringListProto getTableList(RpcController controller,
SessionedStringProto request) throws ServiceException {
try {
Session session = context.getSessionManager().getSession(request.getSessionId().getId());
String databaseName;
if (request.hasDatabaseName()) {
databaseName = request.getDatabaseName();
if (request.hasValue()) {
databaseName = request.getValue();
} else {
databaseName = session.getCurrentDatabase();
}
Collection<String> tableNames = catalog.getAllTableNames(databaseName);
GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
builder.addAllTables(tableNames);
StringListProto.Builder builder = StringListProto.newBuilder();
builder.addAllValues(tableNames);
return builder.build();
} catch (Throwable t) {
throw new ServiceException(t);
}
}

@Override
public TableResponse getTableDesc(RpcController controller, GetTableDescRequest request) throws ServiceException {
public TableResponse getTableDesc(RpcController controller, SessionedStringProto request) throws ServiceException {
try {

if (!request.hasValue()) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
.setErrorMessage("table name is required.")
.build();
}

Session session = context.getSessionManager().getSession(request.getSessionId().getId());

String databaseName;
String tableName;
if (CatalogUtil.isFQTableName(request.getTableName())) {
String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
if (CatalogUtil.isFQTableName(request.getValue())) {
String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
databaseName = splitted[0];
tableName = splitted[1];
} else {
databaseName = session.getCurrentDatabase();
tableName = request.getTableName();
tableName = request.getValue();
}

if (catalog.existsTable(databaseName, tableName)) {
Expand All @@ -807,7 +807,7 @@ public TableResponse getTableDesc(RpcController controller, GetTableDescRequest
} else {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
.setErrorMessage("ERROR: no such a table: " + request.getTableName())
.setErrorMessage("ERROR: no such a table: " + request.getValue())
.build();
}
} catch (Throwable t) {
Expand Down