Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ Release 0.11.0 - unreleased

IMPROVEMENT

TAJO-1636: query rest api uri should change
from /databases/{database_name}/queies to /queries.
(Contributed by DaeMyung Kang, Committed by jaehwa)

TAJO-1624: Add managed table or external description in Table management
section. (hyunsik)

Expand Down Expand Up @@ -150,6 +154,9 @@ Release 0.11.0 - unreleased

BUG FIXES

TAJO-1634: REST API: fix error when offset is zero.
(Contributed by DaeMyung Kang, Committed by jaehwa)

TAJO-1630: Test failure after TAJO-1130. (jihoon)

TAJO-1623: INSERT INTO with wrong target columns causes NPE. (hyunsik)
Expand Down Expand Up @@ -333,6 +340,8 @@ Release 0.11.0 - unreleased

SUB TASKS

TAJO-1615: Implement TaskManager. (jinho)

TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)

TAJO-1613: Rename StorageManager to Tablespace. (hyunsik)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.SessionedStringProto;
import org.apache.tajo.jdbc.SQLStates;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;

import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -145,13 +147,13 @@ public List<String> getTableList(@Nullable final String databaseName) throws Ser
connection.checkSessionAndGet(client);
BlockingInterface tajoMasterService = client.getStub();

ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
SessionedStringProto.Builder builder = SessionedStringProto.newBuilder();
builder.setSessionId(connection.sessionId);
if (databaseName != null) {
builder.setDatabaseName(databaseName);
builder.setValue(databaseName);
}
ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
return res.getTablesList();
PrimitiveProtos.StringListProto res = tajoMasterService.getTableList(null, builder.build());
return res.getValuesList();
}

@Override
Expand All @@ -161,9 +163,9 @@ public TableDesc getTableDesc(final String tableName) throws ServiceException {
connection.checkSessionAndGet(client);
BlockingInterface tajoMasterService = client.getStub();

ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
SessionedStringProto.Builder builder = SessionedStringProto.newBuilder();
builder.setSessionId(connection.sessionId);
builder.setTableName(tableName);
builder.setValue(tableName);
ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
if (res.getResultCode() == ClientProtos.ResultCode.OK) {
return CatalogUtil.newTableDesc(res.getTableDesc());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceExc
connection.checkSessionAndGet(client);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();

ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
builder.setSessionId(connection.sessionId);
TajoIdProtos.SessionIdProto.Builder builder = TajoIdProtos.SessionIdProto.newBuilder();
builder.setId(connection.sessionId.getId());
ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
return res.getQueryListList();
}
Expand All @@ -450,8 +450,8 @@ public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceEx
connection.checkSessionAndGet(client);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();

ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
builder.setSessionId(connection.sessionId);
TajoIdProtos.SessionIdProto.Builder builder = TajoIdProtos.SessionIdProto.newBuilder();
builder.setId(connection.sessionId.getId());
ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
return res.getQueryListList();
}
Expand Down
16 changes: 1 addition & 15 deletions tajo-client/src/main/proto/ClientProtos.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ message SessionUpdateResponse {

message SessionedStringProto {
optional SessionIdProto sessionId = 1;
required string value = 2;
optional string value = 2;
}

message ExplainQueryResponse {
Expand Down Expand Up @@ -196,20 +196,6 @@ message GetClusterInfoResponse {
repeated WorkerResourceInfo workerList = 1;
}

message GetTableListRequest {
optional SessionIdProto sessionId = 1;
optional string databaseName = 2;
}

message GetTableListResponse {
repeated string tables = 1;
}

message GetTableDescRequest {
optional SessionIdProto sessionId = 1;
required string tableName = 2;
}

message CreateTableRequest {
optional SessionIdProto sessionId = 1;
required string name = 2;
Expand Down
8 changes: 4 additions & 4 deletions tajo-client/src/main/proto/TajoMasterClientProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ service TajoMasterClientProtocolService {

// Query And Resource Management APIs
rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
rpc getRunningQueryList(GetQueryListRequest) returns (GetQueryListResponse);
rpc getFinishedQueryList(GetQueryListRequest) returns (GetQueryListResponse);
rpc getRunningQueryList(SessionIdProto) returns (GetQueryListResponse);
rpc getFinishedQueryList(SessionIdProto) returns (GetQueryListResponse);
rpc killQuery(QueryIdRequest) returns (BoolProto);
rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc closeNonForwardQuery(QueryIdRequest) returns (BoolProto);
Expand All @@ -65,7 +65,7 @@ service TajoMasterClientProtocolService {
rpc createExternalTable(CreateTableRequest) returns (TableResponse);
rpc existTable(SessionedStringProto) returns (BoolProto);
rpc dropTable(DropTableRequest) returns (BoolProto);
rpc getTableList(GetTableListRequest) returns (GetTableListResponse);
rpc getTableDesc(GetTableDescRequest) returns (TableResponse);
rpc getTableList(SessionedStringProto) returns (StringListProto);
rpc getTableDesc(SessionedStringProto) returns (TableResponse);
rpc getFunctionList(SessionedStringProto) returns (FunctionResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private HashShuffleAppender getAppender(int partId) throws IOException {
HashShuffleAppender appender = appenderMap.get(partId);
if (appender == null) {
appender = hashShuffleAppenderManager.getAppender(context.getConf(),
context.getQueryId().getTaskId().getExecutionBlockId(), partId, meta, outSchema);
context.getTaskId().getTaskId().getExecutionBlockId(), partId, meta, outSchema);
appenderMap.put(partId, appender);
}
return appender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected void fatal(Log log, String message) {
}

protected Path getExecutorTmpDir() {
return new Path(context.getQueryId().getTaskId().getExecutionBlockId().getQueryId().toString(),
return new Path(context.getTaskId().getTaskId().getExecutionBlockId().getQueryId().toString(),
UUID.randomUUID().toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,20 @@
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.PartitionedTableScanNode;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.querymaster.QueryJobEvent;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.session.InvalidSessionException;
import org.apache.tajo.session.NoSuchSessionVariableException;
import org.apache.tajo.session.Session;
import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
import org.apache.tajo.session.InvalidSessionException;
import org.apache.tajo.session.NoSuchSessionVariableException;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
Expand Down Expand Up @@ -307,11 +308,6 @@ public UpdateQueryResponse updateQuery(RpcController controller, QueryRequest re
try {
Session session = context.getSessionManager().getSession(request.getSessionId().getId());
QueryContext queryContext = new QueryContext(conf, session);
if (queryContext.getCurrentDatabase() == null) {
for (Map.Entry<String,String> e : queryContext.getAllKeyValus().entrySet()) {
System.out.println(e.getKey() + "=" + e.getValue());
}
}

UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
try {
Expand Down Expand Up @@ -379,12 +375,12 @@ public GetQueryResultResponse getQueryResult(RpcController controller,
}

@Override
public GetQueryListResponse getRunningQueryList(RpcController controller, GetQueryListRequest request)
public GetQueryListResponse getRunningQueryList(RpcController controller, TajoIdProtos.SessionIdProto request)

throws ServiceException {

try {
context.getSessionManager().touch(request.getSessionId().getId());
context.getSessionManager().touch(request.getId());
GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder();

Collection<QueryInProgress> queries = new ArrayList<QueryInProgress>(context.getQueryJobManager().getSubmittedQueries());
Expand Down Expand Up @@ -416,11 +412,11 @@ public GetQueryListResponse getRunningQueryList(RpcController controller, GetQue
}

@Override
public GetQueryListResponse getFinishedQueryList(RpcController controller, GetQueryListRequest request)
public GetQueryListResponse getFinishedQueryList(RpcController controller, TajoIdProtos.SessionIdProto request)
throws ServiceException {

try {
context.getSessionManager().touch(request.getSessionId().getId());
context.getSessionManager().touch(request.getId());
GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder();

Collection<QueryInfo> queries
Expand Down Expand Up @@ -723,7 +719,7 @@ public BoolProto dropDatabase(RpcController controller, SessionedStringProto req
}

@Override
public PrimitiveProtos.StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto
public StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto
request) throws ServiceException {
try {
context.getSessionManager().touch(request.getId());
Expand All @@ -749,10 +745,6 @@ public BoolProto existTable(RpcController controller, SessionedStringProto reque
tableName = request.getValue();
}

if (databaseName == null) {
System.out.println("A");
}

if (catalog.existsTable(databaseName, tableName)) {
return BOOL_TRUE;
} else {
Expand All @@ -764,39 +756,47 @@ public BoolProto existTable(RpcController controller, SessionedStringProto reque
}

@Override
public GetTableListResponse getTableList(RpcController controller,
GetTableListRequest request) throws ServiceException {
public StringListProto getTableList(RpcController controller,
SessionedStringProto request) throws ServiceException {
try {
Session session = context.getSessionManager().getSession(request.getSessionId().getId());
String databaseName;
if (request.hasDatabaseName()) {
databaseName = request.getDatabaseName();
if (request.hasValue()) {
databaseName = request.getValue();
} else {
databaseName = session.getCurrentDatabase();
}
Collection<String> tableNames = catalog.getAllTableNames(databaseName);
GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
builder.addAllTables(tableNames);
StringListProto.Builder builder = StringListProto.newBuilder();
builder.addAllValues(tableNames);
return builder.build();
} catch (Throwable t) {
throw new ServiceException(t);
}
}

@Override
public TableResponse getTableDesc(RpcController controller, GetTableDescRequest request) throws ServiceException {
public TableResponse getTableDesc(RpcController controller, SessionedStringProto request) throws ServiceException {
try {

if (!request.hasValue()) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
.setErrorMessage("table name is required.")
.build();
}

Session session = context.getSessionManager().getSession(request.getSessionId().getId());

String databaseName;
String tableName;
if (CatalogUtil.isFQTableName(request.getTableName())) {
String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
if (CatalogUtil.isFQTableName(request.getValue())) {
String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
databaseName = splitted[0];
tableName = splitted[1];
} else {
databaseName = session.getCurrentDatabase();
tableName = request.getTableName();
tableName = request.getValue();
}

if (catalog.existsTable(databaseName, tableName)) {
Expand All @@ -807,7 +807,7 @@ public TableResponse getTableDesc(RpcController controller, GetTableDescRequest
} else {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
.setErrorMessage("ERROR: no such a table: " + request.getTableName())
.setErrorMessage("ERROR: no such a table: " + request.getValue())
.build();
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ public void heartbeat(
public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request,
RpcCallback<TajoResourceTrackerProtocol.NodeHeartbeatResponseProto> done) {
//TODO implement with ResourceManager for scheduler
throw new RuntimeException(new ServiceException(new NotImplementedException().getMessage()));
TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.Builder
response = TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.newBuilder();
done.run(response.setCommand(TajoResourceTrackerProtocol.ResponseCommand.NORMAL).build());
}

private Worker createWorkerResource(NodeHeartbeat request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tajo.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;

import java.lang.Thread.UncaughtExceptionHandler;

/**
* This class is intended to be installed by calling
* {@link Thread#setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)}
* In the main entry point. It is intended to try and cleanly shut down
* programs using the Yarn Event framework.
*
* Note: Right now it only will shut down the program if a Error is caught, but
* not any other exception. Anything else is just logged.
*
* this is an implementation copied from YarnUncaughtExceptionHandler
*/
public class TajoUncaughtExceptionHandler implements UncaughtExceptionHandler {
private static final Log LOG = LogFactory.getLog(TajoUncaughtExceptionHandler.class);

@Override
public void uncaughtException(Thread t, Throwable e) {
if(ShutdownHookManager.get().isShutdownInProgress()) {
LOG.error("Thread " + t + " threw an Throwable, but we are shutting " +
"down, so ignoring this", e);
} else if(e instanceof Error) {
try {
LOG.fatal("Thread " + t + " threw an Error.", e);
} catch (Throwable err) {
//We don't want to not exit because of an issue with logging
}

if(e instanceof OutOfMemoryError) {
//After catching an OOM java says it is undefined behavior, so don't
//even try to clean up or we can get stuck on shutdown.
try {
System.err.println("Halting due to Out Of Memory Error...");
} catch (Throwable err) {
//Again we done want to exit because of logging issues.
}
ExitUtil.halt(-1);
} else {
//ExitUtil.terminate(-1);
}
} else {
LOG.error("Thread " + t + " threw an Exception.", e);
}
}
}
Loading