From 50f1751b97dacec30c98b7c3c3cd74f9f3151a09 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 18 Mar 2015 15:41:51 +0900 Subject: [PATCH 1/4] TAJO-1403: Improve 'Simple Query' with only partition columns and constant values --- .../NonForwardQueryResultFileScanner.java | 37 ++++++++- .../tajo/master/exec/QueryExecutor.java | 5 ++ .../apache/tajo/master/TestGlobalPlanner.java | 3 +- .../apache/tajo/plan/util/PlannerUtil.java | 82 ++++++++++++++++++- .../tajo/storage/FileStorageManager.java | 29 ++++++- 5 files changed, 149 insertions(+), 7 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index dc0c44a374..ed4d1a8dbc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -24,6 +24,7 @@ import org.apache.tajo.QueryId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; @@ -31,12 +32,15 @@ import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.engine.planner.physical.SeqScanExec; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.StringUtils; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -74,10 +78,37 @@ public void init() throws IOException { initSeqScanExec(); } + /** + * Set partition path and depth if ScanNode's qualification exists + * + * @param storageManager target storage manager to be set with partition info + */ + private void setPartition(StorageManager storageManager) { + if (tableDesc.isExternal() && tableDesc.hasPartition() && scanNode.getQual() != null && + storageManager instanceof FileStorageManager) { + StringBuffer path = new StringBuffer(); + int depth = 0; + if (tableDesc.hasPartition()) { + for (Column c : tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) { + String partitionValue = PlannerUtil.getPartitionValue(scanNode.getQual(), c.getSimpleName()); + if (partitionValue == null) + break; + path.append(String.format("/%s=%s", c.getSimpleName(), StringUtils.escapePathName(partitionValue))); + depth++; + } + } + ((FileStorageManager)storageManager).setPartitionPath(path.toString()); + ((FileStorageManager)storageManager).setCurrentDepth(depth); + scanNode.setQual(null); + } + } + private void initSeqScanExec() throws IOException { - List fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()) - .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); - + StorageManager storageManager = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()); + List fragments = null; + setPartition(storageManager); + fragments = storageManager.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); + if (fragments != null && !fragments.isEmpty()) { FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {})); this.taskContext = new TaskAttemptContext( diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index db82fca603..aa8b228024 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -212,6 +212,11 @@ public void execSimpleQuery(QueryContext queryContext, Session session, String q scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN); } TableDesc desc = scanNode.getTableDesc(); + // Keep info for partition-column-only queries + SelectionNode selectionNode = plan.getRootBlock().getNode(NodeType.SELECTION); + if (desc.isExternal() && desc.hasPartition() && selectionNode != null) { + scanNode.setQual(selectionNode.getQual()); + } int maxRow = Integer.MAX_VALUE; if (plan.getRootBlock().hasNode(NodeType.LIMIT)) { LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT); diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java index d0f7cf44a8..45c94a37e6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java @@ -317,8 +317,9 @@ public void testCheckIfSimpleQuery() throws Exception { plan = buildPlan("select * from customer where c_nationkey = 1"); assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + // c_nationkey is partition column plan = buildPlan("select * from customer_parts where c_nationkey = 1"); - assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); // same column order plan = buildPlan("select c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" + diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 0fbd3593ad..f11ac1b21c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -100,6 +100,7 @@ public static boolean checkIfSimpleQuery(LogicalPlan plan) { PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1; boolean noComplexComputation = false; + boolean prefixPartitionWhere = false; if (singleRelation) { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); if (scanNode == null) { @@ -133,11 +134,90 @@ public static boolean checkIfSimpleQuery(LogicalPlan plan) { } } } + + if (!noWhere && scanNode.getTableDesc().isExternal() && scanNode.getTableDesc().getPartitionMethod() != null) { + EvalNode node = ((SelectionNode) plan.getRootBlock().getNode(NodeType.SELECTION)).getQual(); + Schema partSchema = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema(); + if (checkIfPartitionSelection(node, partSchema)) { + prefixPartitionWhere = true; + boolean isPrefix = true; + for (Column c : partSchema.getColumns()) { + String value = getPartitionValue(node, c.getSimpleName()); + if (isPrefix && value == null) + isPrefix = false; + else if (!isPrefix && value != null) { + prefixPartitionWhere = false; + break; + } + } + } + } } return !checkIfDDLPlan(rootNode) && (simpleOperator && noComplexComputation && isOneQueryBlock && - noOrderBy && noGroupBy && noWhere && noJoin && singleRelation); + noOrderBy && noGroupBy && (noWhere || prefixPartitionWhere) && noJoin && singleRelation); + } + + /** + * Checks whether EvalNode consists of only partition columns and const values. + * The partition based simple query can be defined as 'select * from tb_name where col_name1="X" and col_name2="Y" [LIMIT Z]', + * whose WHERE clause consists of only partition-columns with constant values. + * Partition columns must be able to form a prefix of HDFS path like '/tb_name1/col_name1=X/col_name2=Y'. + * + * @param node The qualification node of a SELECTION node + * @param partSchema Partition expression schema + * @return True if the query is partition-column based simple query. + */ + public static boolean checkIfPartitionSelection(EvalNode node, Schema partSchema) { + if (node != null && node instanceof BinaryEval) { + BinaryEval eval = (BinaryEval)node; + EvalNode left = eval.getLeftExpr(); + EvalNode right = eval.getRightExpr(); + EvalType type = eval.getType(); + + if (type == EvalType.EQUAL) { + if (left instanceof FieldEval && right instanceof ConstEval && partSchema.contains(((FieldEval) left).getColumnName())) { + return true; + } else if (left instanceof ConstEval && right instanceof FieldEval && partSchema.contains(((FieldEval) right).getColumnName())) { + return true; + } + } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { + return checkIfPartitionSelection(left, partSchema) && checkIfPartitionSelection(right, partSchema); + } + } + return false; + } + + /** + * Get partition constant value associated with `columnName`. + * + * @param node EvalNode having query predicates + * @param columnName Column name to be looked up + * @return String The value associated with `columnName` in the predicates + */ + public static String getPartitionValue(EvalNode node, String columnName) { + if (node != null && node instanceof BinaryEval) { + BinaryEval eval = (BinaryEval)node; + EvalNode left = eval.getLeftExpr(); + EvalNode right = eval.getRightExpr(); + EvalType type = eval.getType(); + + if (type == EvalType.EQUAL) { + if (left instanceof FieldEval && right instanceof ConstEval && columnName.equals(((FieldEval) left).getColumnName())) { + return ((ConstEval)right).getValue().toString(); + } else if (left instanceof ConstEval && right instanceof FieldEval && columnName.equals(((FieldEval) right).getColumnName())) { + return ((ConstEval)left).getValue().toString(); + } + } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { + String value = getPartitionValue(left, columnName); + if (value == null) { + value = getPartitionValue(right, columnName); + } + return value; + } + } + return null; } /** diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java index c427940c75..0dbf9b3e01 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java @@ -149,6 +149,21 @@ public Path getTablePath(String tableName) { return new Path(tableBaseDir, tableName); } + private String partitionPath = ""; + private int currentDepth = 0; + + /** + * Set a specific partition path for partition-column only queries + * @param path The partition prefix path + */ + public void setPartitionPath(String path) { partitionPath = path; } + + /** + * Set a depth of partition path for partition-column only queries + * @param depth Depth of partitions + */ + public void setCurrentDepth(int depth) { currentDepth = depth; } + @VisibleForTesting public Appender getAppender(TableMeta meta, Schema schema, Path filePath) throws IOException { @@ -722,8 +737,18 @@ public List getNonForwardSplit(TableDesc tableDesc, int currentPage, i List nonZeroLengthFiles = new ArrayList(); if (fs.exists(tablePath)) { - getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, - new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); + if (!partitionPath.isEmpty()) + { + Path partPath = new Path(tableDesc.getPath() + partitionPath); + if (fs.exists(partPath)) { + getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments, + new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth); + } + } + else { + getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, + new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); + } } List fragments = new ArrayList(); From a9ae3cab69526294475a771014e9c0e49c80462b Mon Sep 17 00:00:00 2001 From: Jihun Kang Date: Wed, 18 Mar 2015 23:57:06 +0900 Subject: [PATCH 2/4] TAJO-1337: Implements common modules to handle RESTful API Closes #399 --- CHANGES | 2 + tajo-catalog/tajo-catalog-client/pom.xml | 2 +- .../tajo-hcatalog/pom.xml | 2 +- tajo-catalog/tajo-catalog-server/pom.xml | 2 +- tajo-cli/pom.xml | 2 +- tajo-client/pom.xml | 2 +- .../java/org/apache/tajo/conf/TajoConf.java | 2 + tajo-core/pom.xml | 2 +- tajo-dist/pom.xml | 7 +- tajo-project/pom.xml | 31 ++ tajo-pullserver/pom.xml | 2 +- tajo-rpc/pom.xml | 180 ++-------- tajo-rpc/tajo-rpc-common/pom.xml | 216 ++++++++++++ .../org/apache/tajo/rpc/NettyServerBase.java | 38 +++ .../org/apache/tajo/rpc/RemoteException.java | 0 .../tajo/rpc/RetriesExhaustedException.java | 0 .../apache/tajo/rpc/RpcChannelFactory.java | 0 .../org/apache/tajo/rpc/RpcEventListener.java | 62 ++++ .../java/org/apache/tajo/rpc/RpcUtils.java | 0 tajo-rpc/tajo-rpc-protobuf/pom.xml | 274 +++++++++++++++ .../org/apache/tajo/rpc/AsyncRpcClient.java | 0 .../org/apache/tajo/rpc/AsyncRpcServer.java | 0 .../apache/tajo/rpc/BlockingRpcClient.java | 0 .../apache/tajo/rpc/BlockingRpcServer.java | 0 .../java/org/apache/tajo/rpc/CallFuture.java | 0 .../apache/tajo/rpc/DefaultRpcController.java | 0 .../org/apache/tajo/rpc/NettyClientBase.java | 0 .../apache/tajo/rpc/NettyRpcController.java | 0 .../org/apache/tajo/rpc/NullCallback.java | 0 .../tajo/rpc/ProtoChannelInitializer.java | 0 .../apache/tajo/rpc/RemoteCallException.java | 0 .../org/apache/tajo/rpc/RemoteException.java | 37 ++ .../tajo/rpc/RetriesExhaustedException.java | 104 ++++++ .../apache/tajo/rpc/RpcConnectionPool.java | 0 .../org/apache/tajo/rpc/ServerCallable.java | 0 .../apache/tajo/rpc/TajoServiceException.java | 0 .../src/main/proto/DummyProtos.proto | 0 .../src/main/proto/RpcProtos.proto | 0 .../src/main/proto/TestProtocol.proto | 0 .../src/main/proto/TestProtos.proto | 0 .../src/test/java/log4j.properties | 0 .../org/apache/tajo/rpc/TestAsyncRpc.java | 0 .../org/apache/tajo/rpc/TestBlockingRpc.java | 0 .../rpc/test/impl/DummyProtocolAsyncImpl.java | 0 .../test/impl/DummyProtocolBlockingImpl.java | 0 tajo-rpc/tajo-ws-rs/pom.xml | 218 ++++++++++++ .../rs/netty/NettyRestChannelInitializer.java | 50 +++ .../rs/netty/NettyRestHandlerContainer.java | 319 ++++++++++++++++++ .../NettyRestHandlerContainerProvider.java | 42 +++ .../tajo/ws/rs/netty/NettyRestServer.java | 67 ++++ .../ws/rs/netty/NettyRestServerFactory.java | 89 +++++ .../ws/rs/netty/NettyRestServerListener.java | 72 ++++ .../tajo/ws/rs/netty/gson/GsonFeature.java | 34 ++ .../tajo/ws/rs/netty/gson/GsonReader.java | 52 +++ .../tajo/ws/rs/netty/gson/GsonUtil.java | 32 ++ .../tajo/ws/rs/netty/gson/GsonWriter.java | 59 ++++ ...NettyRestHandlerContainerProviderTest.java | 66 ++++ .../tajo/ws/rs/netty/NettyRestServerTest.java | 137 ++++++++ .../rs/netty/testapp1/TestApplication1.java | 38 +++ .../ws/rs/netty/testapp1/TestResource1.java | 36 ++ .../ws/rs/netty/testapp2/DirectoriesDao.java | 39 +++ .../netty/testapp2/DirectoriesResource.java | 85 +++++ .../tajo/ws/rs/netty/testapp2/Directory.java | 52 +++ .../testapp2/FileManagementApplication.java | 35 ++ 64 files changed, 2331 insertions(+), 158 deletions(-) create mode 100644 tajo-rpc/tajo-rpc-common/pom.xml rename tajo-rpc/{ => tajo-rpc-common}/src/main/java/org/apache/tajo/rpc/NettyServerBase.java (86%) rename tajo-rpc/{ => tajo-rpc-common}/src/main/java/org/apache/tajo/rpc/RemoteException.java (100%) rename tajo-rpc/{ => tajo-rpc-common}/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java (100%) rename tajo-rpc/{ => tajo-rpc-common}/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java (100%) create mode 100644 tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java rename tajo-rpc/{ => tajo-rpc-common}/src/main/java/org/apache/tajo/rpc/RpcUtils.java (100%) create mode 100644 tajo-rpc/tajo-rpc-protobuf/pom.xml rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/CallFuture.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/NettyClientBase.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/NettyRpcController.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/NullCallback.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/RemoteCallException.java (100%) create mode 100644 tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java create mode 100644 tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/ServerCallable.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/java/org/apache/tajo/rpc/TajoServiceException.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/proto/DummyProtos.proto (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/proto/RpcProtos.proto (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/proto/TestProtocol.proto (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/main/proto/TestProtos.proto (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/test/java/log4j.properties (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java (100%) rename tajo-rpc/{ => tajo-rpc-protobuf}/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java (100%) create mode 100644 tajo-rpc/tajo-ws-rs/pom.xml create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java create mode 100644 tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java create mode 100644 tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProviderTest.java create mode 100644 tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/NettyRestServerTest.java create mode 100644 tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp1/TestApplication1.java create mode 100644 tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp1/TestResource1.java create mode 100644 tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/DirectoriesDao.java create mode 100644 tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/DirectoriesResource.java create mode 100644 tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/Directory.java create mode 100644 tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/FileManagementApplication.java diff --git a/CHANGES b/CHANGES index c3f2691d38..4875cab8e7 100644 --- a/CHANGES +++ b/CHANGES @@ -76,6 +76,8 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1337: Implements common modules to handle RESTful API. (jihun) + TAJO-1329: Improve Schema class to support nested struct support. (hyunsik) diff --git a/tajo-catalog/tajo-catalog-client/pom.xml b/tajo-catalog/tajo-catalog-client/pom.xml index 98b85a8320..84e2aa3431 100644 --- a/tajo-catalog/tajo-catalog-client/pom.xml +++ b/tajo-catalog/tajo-catalog-client/pom.xml @@ -135,7 +135,7 @@ org.apache.tajo - tajo-rpc + tajo-rpc-protobuf org.apache.hadoop diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml index fe8f34a436..7c3efdd1f6 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml @@ -109,7 +109,7 @@ org.apache.tajo - tajo-rpc + tajo-rpc-protobuf org.apache.tajo diff --git a/tajo-catalog/tajo-catalog-server/pom.xml b/tajo-catalog/tajo-catalog-server/pom.xml index 501f9af611..8efeecf042 100644 --- a/tajo-catalog/tajo-catalog-server/pom.xml +++ b/tajo-catalog/tajo-catalog-server/pom.xml @@ -141,7 +141,7 @@ org.apache.tajo - tajo-rpc + tajo-rpc-protobuf org.apache.hadoop diff --git a/tajo-cli/pom.xml b/tajo-cli/pom.xml index 684c298fb7..e8360ad160 100644 --- a/tajo-cli/pom.xml +++ b/tajo-cli/pom.xml @@ -140,7 +140,7 @@ org.apache.tajo - tajo-rpc + tajo-rpc-protobuf commons-cli diff --git a/tajo-client/pom.xml b/tajo-client/pom.xml index 692e1b5287..e6be476d4d 100644 --- a/tajo-client/pom.xml +++ b/tajo-client/pom.xml @@ -195,7 +195,7 @@ org.apache.tajo - tajo-rpc + tajo-rpc-protobuf diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 4ed8097b4f..5b569d585b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -249,6 +249,8 @@ public static enum ConfVars implements ConfigKey { Runtime.getRuntime().availableProcessors() * 1), WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.worker.service.rpc.server.worker-thread-num", Runtime.getRuntime().availableProcessors() * 1), + REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.rest.service.rpc.server.worker-thread-num", + Runtime.getRuntime().availableProcessors() * 1), // Task Configuration ----------------------------------------------------- TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512), diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index 38bddecd17..61a156b668 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -277,7 +277,7 @@ org.apache.tajo - tajo-rpc + tajo-rpc-protobuf org.apache.tajo diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index aed7b4be43..da5f48f2e9 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -60,7 +60,12 @@ org.apache.tajo - tajo-rpc + tajo-rpc-protobuf + provided + + + org.apache.tajo + tajo-ws-rs provided diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index 9f1b1abc75..37121e37c5 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -38,6 +38,7 @@ 0.11.0-SNAPSHOT 0.98.7-hadoop2 4.0.25.Final + 2.6 ${project.parent.relativePath}/.. src/main/hadoop-${hadoop.version} @@ -787,6 +788,21 @@ tajo-rpc ${tajo.version} + + org.apache.tajo + tajo-rpc-common + ${tajo.version} + + + org.apache.tajo + tajo-rpc-protobuf + ${tajo.version} + + + org.apache.tajo + tajo-ws-rs + ${tajo.version} + org.apache.tajo tajo-algebra @@ -1063,6 +1079,21 @@ jcip-annotations 1.0-1 + + org.glassfish.jersey.core + jersey-common + ${jersey.version} + + + org.glassfish.jersey.core + jersey-server + ${jersey.version} + + + javax.ws.rs + javax.ws.rs-api + 2.0.1 + diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml index 944cf3dd52..ba6e6b732c 100644 --- a/tajo-pullserver/pom.xml +++ b/tajo-pullserver/pom.xml @@ -56,7 +56,7 @@ org.apache.tajo - tajo-rpc + tajo-rpc-protobuf org.apache.tajo diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml index 8c626b4bc9..f069aca3f3 100644 --- a/tajo-rpc/pom.xml +++ b/tajo-rpc/pom.xml @@ -24,165 +24,39 @@ org.apache.tajo ../tajo-project - jar tajo-rpc - Tajo Rpc - RPC Server/Client Implementation based on Netty and Protocol Buffer + pom + Tajo RPC + + UTF-8 + UTF-8 + + + + tajo-rpc-common + tajo-rpc-protobuf + tajo-ws-rs + - - org.apache.maven.plugins - maven-compiler-plugin - - 1.6 - 1.6 - ${project.build.sourceEncoding} - - org.apache.rat apache-rat-plugin - - - verify - - check - - - org.apache.maven.plugins - maven-jar-plugin - 2.4 - - - - - create-jar - prepare-package - - - - - org.apache.maven.plugins - maven-antrun-plugin - - - create-protobuf-generated-sources-directory - initialize - - - - - - - run - - - - - - org.codehaus.mojo - exec-maven-plugin - 1.2 - - - generate-sources - generate-sources - - protoc - - -Isrc/main/proto/ - --java_out=target/generated-sources/proto - src/main/proto/DummyProtos.proto - src/main/proto/RpcProtos.proto - src/main/proto/TestProtos.proto - src/main/proto/TestProtocol.proto - - - - exec - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 1.5 - - - add-source - generate-sources - - add-source - - - - target/generated-sources/proto - - - - + maven-surefire-report-plugin - org.apache.maven.plugins - maven-surefire-report-plugin - 2.15 + maven-deploy-plugin + + true + - - - io.netty - netty-transport - - - io.netty - netty-codec - - - io.netty - netty-handler - - - commons-logging - commons-logging - - - commons-lang - commons-lang - - - com.google.guava - guava - - - com.google.protobuf - protobuf-java - - - junit - junit - test - - - - UTF-8 - - - - repository.jboss.org - https://repository.jboss.org/nexus/content/repositories/releases/ - - - false - - - @@ -216,6 +90,9 @@ dist false + + tar|rpm|deb + @@ -225,7 +102,7 @@ dist - package + prepare-package run @@ -248,12 +125,15 @@ echo echo "Current directory `pwd`" echo - run rm -rf ${project.artifactId}-${project.version} - run mkdir ${project.artifactId}-${project.version} - run cd ${project.artifactId}-${project.version} - run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar . + run rm -rf tajo-rpc-${project.version} + run mkdir tajo-rpc-${project.version} + run cd tajo-rpc-${project.version} + run cp -r ${basedir}/tajo-rpc-common/target/tajo-rpc-common-${project.version}*.jar . + run cp -r ${basedir}/tajo-rpc-protobuf/target/tajo-rpc-protobuf-${project.version}*.jar . + run cp -r ${basedir}/tajo-ws-rs/target/tajo-ws-rs-${project.version}*.jar . + echo - echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}" + echo "Tajo RPC dist layout available at: ${project.build.directory}/tajo-rpc-${project.version}" echo @@ -274,9 +154,9 @@ org.apache.maven.plugins maven-surefire-report-plugin - 2.15 + diff --git a/tajo-rpc/tajo-rpc-common/pom.xml b/tajo-rpc/tajo-rpc-common/pom.xml new file mode 100644 index 0000000000..2b1cd7a696 --- /dev/null +++ b/tajo-rpc/tajo-rpc-common/pom.xml @@ -0,0 +1,216 @@ + + + + 4.0.0 + + tajo-project + 0.11.0-SNAPSHOT + org.apache.tajo + ../../tajo-project + + jar + tajo-rpc-common + Tajo Rpc Common + Common Implementation for Netty + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + ${project.build.sourceEncoding} + + + + org.apache.rat + apache-rat-plugin + + + verify + + check + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + + create-jar + prepare-package + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.15 + + + + + + + io.netty + netty-transport + + + io.netty + netty-codec + + + io.netty + netty-handler + + + commons-logging + commons-logging + + + commons-lang + commons-lang + + + com.google.guava + guava + + + junit + junit + test + + + + UTF-8 + + + + repository.jboss.org + https://repository.jboss.org/nexus/content/repositories/releases/ + + + false + + + + + + + docs + + false + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + module-javadocs + package + + jar + + + ${project.build.directory} + + + + + + + + + dist + + false + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + dist + package + + run + + + + + run() { + echo "\$ ${@}" + "${@}" + res=$? + if [ $res != 0 ]; then + echo + echo "Failed!" + echo + exit $res + fi + } + + ROOT=`cd ${basedir}/..;pwd` + echo + echo "Current directory `pwd`" + echo + run rm -rf ${project.artifactId}-${project.version} + run mkdir ${project.artifactId}-${project.version} + run cd ${project.artifactId}-${project.version} + run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar . + echo + echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}" + echo + + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.15 + + + + + diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java similarity index 86% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java rename to tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java index 024108b244..ad443d7d8d 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java @@ -36,9 +36,15 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.Collections; +import java.util.HashSet; import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +/** + * Base class for netty implementation. + */ public class NettyServerBase { private static final Log LOG = LogFactory.getLog(NettyServerBase.class); private static final String DEFAULT_PREFIX = "RpcServer_"; @@ -53,6 +59,7 @@ public class NettyServerBase { protected ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private InetSocketAddress initIsa; + private Set listeners = Collections.synchronizedSet(new HashSet()); public NettyServerBase(InetSocketAddress address) { this.initIsa = address; @@ -68,6 +75,10 @@ public void setName(String name) { } public void init(ChannelInitializer initializer, int workerNum) { + for (RpcEventListener listener: listeners) { + listener.onBeforeInit(this); + } + bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum); this.initializer = initializer; @@ -80,6 +91,10 @@ public void init(ChannelInitializer initializer, int workerNum) { .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10); + + for (RpcEventListener listener: listeners) { + listener.onAfterInit(this); + } } public InetSocketAddress getListenAddress() { @@ -87,6 +102,10 @@ public InetSocketAddress getListenAddress() { } public void start() { + for (RpcEventListener listener: listeners) { + listener.onBeforeStart(this); + } + if (serviceName == null) { this.serviceName = getNextDefaultServiceName(); } @@ -105,6 +124,9 @@ public void start() { this.channelFuture = bootstrap.clone().bind(serverAddr).syncUninterruptibly(); this.bindAddress = (InetSocketAddress) channelFuture.channel().localAddress(); + for (RpcEventListener listener: listeners) { + listener.onAfterStart(this); + } LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress); } @@ -117,6 +139,10 @@ public void shutdown() { } public void shutdown(boolean waitUntilThreadsStop) { + for (RpcEventListener listener: listeners) { + listener.onBeforeShutdown(this); + } + try { accepted.close(); } catch (Throwable t) { @@ -138,6 +164,10 @@ public void shutdown(boolean waitUntilThreadsStop) { } } } + + for (RpcEventListener listener: listeners) { + listener.onAfterShutdown(this); + } if (bindAddress != null) { LOG.info("Rpc (" + serviceName + ") listened on " @@ -202,4 +232,12 @@ private static boolean available(int port) throws IOException { } } } + + public void addListener(RpcEventListener listener) { + listeners.add(listener); + } + + public void removeListener(RpcEventListener listener) { + listeners.remove(listener); + } } \ No newline at end of file diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java rename to tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java rename to tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java rename to tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java new file mode 100644 index 0000000000..4d72536696 --- /dev/null +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +/** + * Event listener for netty code. Users can subscribe events by using this interface. + */ +public interface RpcEventListener { + + /** + * Performs actions before start. + * @param obj Method caller + */ + public void onBeforeStart(Object obj); + + /** + * Performs actions after start. + * @param obj Method caller + */ + public void onAfterStart(Object obj); + + /** + * Performs actions before initialization. + * @param obj Method caller + */ + public void onBeforeInit(Object obj); + + /** + * Performs actions after initialization. + * @param obj Method caller + */ + public void onAfterInit(Object obj); + + /** + * Performs actions before shutdown. + * @param obj Method caller + */ + public void onBeforeShutdown(Object obj); + + /** + * Performs actions after shutdown. + * @param obj Method caller + */ + public void onAfterShutdown(Object obj); + +} diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java rename to tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java diff --git a/tajo-rpc/tajo-rpc-protobuf/pom.xml b/tajo-rpc/tajo-rpc-protobuf/pom.xml new file mode 100644 index 0000000000..1f67255084 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/pom.xml @@ -0,0 +1,274 @@ + + + + 4.0.0 + + tajo-project + 0.11.0-SNAPSHOT + org.apache.tajo + ../../tajo-project + + jar + tajo-rpc-protobuf + Tajo Protocol Buffer Rpc + RPC Server/Client Implementation based on Netty and Protocol Buffer + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + ${project.build.sourceEncoding} + + + + org.apache.rat + apache-rat-plugin + + + verify + + check + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + + create-jar + prepare-package + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + create-protobuf-generated-sources-directory + initialize + + + + + + + run + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2 + + + generate-sources + generate-sources + + protoc + + -Isrc/main/proto/ + --java_out=target/generated-sources/proto + src/main/proto/DummyProtos.proto + src/main/proto/RpcProtos.proto + src/main/proto/TestProtos.proto + src/main/proto/TestProtocol.proto + + + + exec + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.5 + + + add-source + generate-sources + + add-source + + + + target/generated-sources/proto + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.15 + + + + + + + commons-logging + commons-logging + + + commons-lang + commons-lang + + + com.google.guava + guava + + + com.google.protobuf + protobuf-java + + + org.apache.tajo + tajo-rpc-common + + + junit + junit + test + + + + UTF-8 + + + + repository.jboss.org + https://repository.jboss.org/nexus/content/repositories/releases/ + + + false + + + + + + + docs + + false + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + module-javadocs + package + + jar + + + ${project.build.directory} + + + + + + + + + dist + + false + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + dist + package + + run + + + + + run() { + echo "\$ ${@}" + "${@}" + res=$? + if [ $res != 0 ]; then + echo + echo "Failed!" + echo + exit $res + fi + } + + ROOT=`cd ${basedir}/..;pwd` + echo + echo "Current directory `pwd`" + echo + run rm -rf ${project.artifactId}-${project.version} + run mkdir ${project.artifactId}-${project.version} + run cd ${project.artifactId}-${project.version} + run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar . + echo + echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}" + echo + + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.15 + + + + + diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java new file mode 100644 index 0000000000..30c110d189 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +public class RemoteException extends RuntimeException { + public RemoteException() { + super(); + } + + public RemoteException(String message) { + super(message); + } + + public RemoteException(Throwable t) { + super(t); + } + + public RemoteException(String message, Throwable t) { + super(message, t); + } +} diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java new file mode 100644 index 0000000000..3c054adf3c --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import java.io.IOException; +import java.util.Date; +import java.util.List; + +public class RetriesExhaustedException extends RuntimeException { + private static final long serialVersionUID = 1876775844L; + + public RetriesExhaustedException(final String msg) { + super(msg); + } + + public RetriesExhaustedException(final String msg, final IOException e) { + super(msg, e); + } + + /** + * Datastructure that allows adding more info around Throwable incident. + */ + public static class ThrowableWithExtraContext { + private final Throwable t; + private final long when; + private final String extras; + + public ThrowableWithExtraContext(final Throwable t, final long when, + final String extras) { + this.t = t; + this.when = when; + this.extras = extras; + } + + @Override + public String toString() { + return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); + } + } + + /** + * Create a new RetriesExhaustedException from the list of prior failures. + * @param callableVitals Details from the {@link ServerCallable} we were using + * when we got this exception. + * @param numTries The number of tries we made + * @param exceptions List of exceptions that failed before giving up + */ + public RetriesExhaustedException(final String callableVitals, int numTries, + List exceptions) { + super(getMessage(callableVitals, numTries, exceptions)); + } + + /** + * Create a new RetriesExhaustedException from the list of prior failures. + * @param numTries + * @param exceptions List of exceptions that failed before giving up + */ + public RetriesExhaustedException(final int numTries, + final List exceptions) { + super(getMessage(numTries, exceptions)); + } + + private static String getMessage(String callableVitals, int numTries, + List exceptions) { + StringBuilder buffer = new StringBuilder("Failed contacting "); + buffer.append(callableVitals); + buffer.append(" after "); + buffer.append(numTries + 1); + buffer.append(" attempts.\nExceptions:\n"); + for (Throwable t : exceptions) { + buffer.append(t.toString()); + buffer.append("\n"); + } + return buffer.toString(); + } + + private static String getMessage(final int numTries, + final List exceptions) { + StringBuilder buffer = new StringBuilder("Failed after attempts="); + buffer.append(numTries + 1); + buffer.append(", exceptions:\n"); + for (Throwable t : exceptions) { + buffer.append(t.toString()); + buffer.append("\n"); + } + return buffer.toString(); + } +} \ No newline at end of file diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java similarity index 100% rename from tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java rename to tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java diff --git a/tajo-rpc/src/main/proto/DummyProtos.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto similarity index 100% rename from tajo-rpc/src/main/proto/DummyProtos.proto rename to tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto diff --git a/tajo-rpc/src/main/proto/RpcProtos.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto similarity index 100% rename from tajo-rpc/src/main/proto/RpcProtos.proto rename to tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto diff --git a/tajo-rpc/src/main/proto/TestProtocol.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto similarity index 100% rename from tajo-rpc/src/main/proto/TestProtocol.proto rename to tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto diff --git a/tajo-rpc/src/main/proto/TestProtos.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto similarity index 100% rename from tajo-rpc/src/main/proto/TestProtos.proto rename to tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto diff --git a/tajo-rpc/src/test/java/log4j.properties b/tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties similarity index 100% rename from tajo-rpc/src/test/java/log4j.properties rename to tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java similarity index 100% rename from tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java rename to tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java similarity index 100% rename from tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java rename to tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java similarity index 100% rename from tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java rename to tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java similarity index 100% rename from tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java rename to tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java diff --git a/tajo-rpc/tajo-ws-rs/pom.xml b/tajo-rpc/tajo-ws-rs/pom.xml new file mode 100644 index 0000000000..a87a67a7bd --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/pom.xml @@ -0,0 +1,218 @@ + + + + 4.0.0 + + tajo-project + 0.11.0-SNAPSHOT + org.apache.tajo + ../../tajo-project + + jar + tajo-ws-rs + Tajo RESTful Container + RESTful Container Implementation based on Netty + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + ${project.build.sourceEncoding} + + + + org.apache.rat + apache-rat-plugin + + + verify + + check + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + + create-jar + prepare-package + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.15 + + + + + + + io.netty + netty-transport + + + io.netty + netty-codec + + + io.netty + netty-codec-http + + + io.netty + netty-handler + + + org.apache.tajo + tajo-rpc-common + + + org.glassfish.jersey.core + jersey-common + + + org.glassfish.jersey.core + jersey-server + + + javax.ws.rs + javax.ws.rs-api + + + com.google.code.gson + gson + + + junit + junit + test + + + + UTF-8 + + + + + docs + + false + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + module-javadocs + package + + jar + + + ${project.build.directory} + + + + + + + + + dist + + false + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + dist + package + + run + + + + + run() { + echo "\$ ${@}" + "${@}" + res=$? + if [ $res != 0 ]; then + echo + echo "Failed!" + echo + exit $res + fi + } + + ROOT=`cd ${basedir}/..;pwd` + echo + echo "Current directory `pwd`" + echo + run rm -rf ${project.artifactId}-${project.version} + run mkdir ${project.artifactId}-${project.version} + run cd ${project.artifactId}-${project.version} + run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar . + echo + echo "Tajo RESTful Container dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}" + echo + + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.15 + + + + + diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java new file mode 100644 index 0000000000..a1ea72b814 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.stream.ChunkedWriteHandler; + +/** + * Default Channel Initializer for Netty Rest server. + */ +public class NettyRestChannelInitializer extends ChannelInitializer { + + private ChannelHandler handler; + + public NettyRestChannelInitializer(ChannelHandler handler) { + this.handler = handler; + } + + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + + pipeline.addLast(new HttpServerCodec()); + pipeline.addLast(new HttpObjectAggregator(1 << 16)); + pipeline.addLast(new ChunkedWriteHandler()); + pipeline.addLast(handler); + } + +} diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java new file mode 100644 index 0000000000..81d1eeb759 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.Principal; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.buffer.*; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.GenericFutureListener; + +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.SecurityContext; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.glassfish.hk2.api.ServiceLocator; +import org.glassfish.jersey.internal.MapPropertiesDelegate; +import org.glassfish.jersey.server.ApplicationHandler; +import org.glassfish.jersey.server.ContainerException; +import org.glassfish.jersey.server.ContainerRequest; +import org.glassfish.jersey.server.ContainerResponse; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.server.internal.ConfigHelper; +import org.glassfish.jersey.server.spi.Container; +import org.glassfish.jersey.server.spi.ContainerLifecycleListener; +import org.glassfish.jersey.server.spi.ContainerResponseWriter; + +/** + * Jersy Container implementation on Netty + */ +@Sharable +public class NettyRestHandlerContainer extends ChannelDuplexHandler implements Container { + + private static Log LOG = LogFactory.getLog(NettyRestHandlerContainer.class); + + private String rootPath; + + private ApplicationHandler applicationHandler; + private ContainerLifecycleListener lifecycleListener; + + NettyRestHandlerContainer(Application application) { + this(new ApplicationHandler(application)); + } + + NettyRestHandlerContainer(Application application, ServiceLocator parentLocator) { + this(new ApplicationHandler(application, null, parentLocator)); + } + + NettyRestHandlerContainer(ApplicationHandler appHandler) { + applicationHandler = appHandler; + lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler); + } + + @Override + public ResourceConfig getConfiguration() { + return applicationHandler.getConfiguration(); + } + + @Override + public void reload() { + reload(getConfiguration()); + } + + @Override + public void reload(ResourceConfig configuration) { + lifecycleListener.onShutdown(this); + applicationHandler = new ApplicationHandler(configuration); + lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler); + lifecycleListener.onReload(this); + lifecycleListener.onStartup(this); + + if (LOG.isDebugEnabled()) { + LOG.debug("NettyRestHandlerContainer reloaded."); + } + } + + public void setRootPath(String rootPath) { + String tempRootPath = rootPath; + if (tempRootPath == null || tempRootPath.isEmpty()) { + tempRootPath = "/"; + } else if (tempRootPath.charAt(tempRootPath.length() - 1) != '/') { + tempRootPath += "/"; + } + this.rootPath = tempRootPath; + } + + private URI getBaseUri(ChannelHandlerContext ctx, FullHttpRequest request) { + URI baseUri; + String scheme; + + if (ctx.pipeline().get(SslHandler.class) == null) { + scheme = "http"; + } else { + scheme = "https"; + } + + List hosts = request.headers().getAll(HttpHeaders.Names.HOST); + try { + if (hosts != null && hosts.size() > 0) { + baseUri = new URI(scheme + "://" + hosts.get(0) + rootPath); + } else { + InetSocketAddress localAddress = (InetSocketAddress) ctx.channel().localAddress(); + baseUri = new URI(scheme, null, localAddress.getHostName(), localAddress.getPort(), + rootPath, null, null); + } + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + + return baseUri; + } + + protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { + URI baseUri = getBaseUri(ctx, request); + URI requestUri = baseUri.resolve(request.getUri()); + ByteBuf responseContent = PooledByteBufAllocator.DEFAULT.buffer(); + FullHttpResponse response = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, responseContent); + + NettyRestResponseWriter responseWriter = new NettyRestResponseWriter(ctx, response); + ContainerRequest containerRequest = new ContainerRequest(baseUri, requestUri, + request.getMethod().name(), getSecurityContext(), new MapPropertiesDelegate()); + containerRequest.setEntityStream(new ByteBufInputStream(request.content())); + + HttpHeaders httpHeaders = request.headers(); + for (String headerName: httpHeaders.names()) { + List headerValues = httpHeaders.getAll(headerName); + containerRequest.headers(headerName, headerValues); + } + containerRequest.setWriter(responseWriter); + try { + applicationHandler.handle(containerRequest); + } finally { + responseWriter.releaseConnection(); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + boolean needRelease = true; + try { + if (msg instanceof FullHttpRequest) { + FullHttpRequest request = (FullHttpRequest) msg; + messageReceived(ctx, request); + } else { + needRelease = false; + ctx.fireChannelRead(msg); + } + } finally { + if (needRelease) { + ReferenceCountUtil.release(msg); + } + } + } + + private SecurityContext getSecurityContext() { + return new SecurityContext() { + + @Override + public boolean isUserInRole(String role) { + return false; + } + + @Override + public boolean isSecure() { + return false; + } + + @Override + public Principal getUserPrincipal() { + return null; + } + + @Override + public String getAuthenticationScheme() { + return null; + } + }; + } + + /** + * Internal class for writing content on REST service. + */ + static class NettyRestResponseWriter implements ContainerResponseWriter { + + private final ChannelHandlerContext ctx; + private final FullHttpResponse response; + private final AtomicBoolean closed; + + public NettyRestResponseWriter(ChannelHandlerContext ctx, FullHttpResponse response) { + this.ctx = ctx; + this.response = response; + this.closed = new AtomicBoolean(false); + } + + @Override + public void commit() { + if (closed.compareAndSet(false, true)) { + ctx.write(response); + sendLastHttpContent(); + } + } + + @Override + public boolean enableResponseBuffering() { + return false; + } + + @Override + public void failure(Throwable error) { + try { + sendError(HttpResponseStatus.INTERNAL_SERVER_ERROR, error); + } finally { + if (ctx.channel().isActive()) { + ctx.close(); + } + } + } + + private void sendError(HttpResponseStatus status, final Throwable error) { + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, + Unpooled.copiedBuffer(error.getMessage(), CharsetUtil.UTF_8)); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); + ChannelPromise promise = ctx.newPromise(); + promise.addListener(new GenericFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + throw new ContainerException(error); + } + } + }); + + ctx.writeAndFlush(response, promise); + } + + @Override + public void setSuspendTimeout(long timeOut, TimeUnit timeUnit) throws IllegalStateException { + throw new UnsupportedOperationException("setSuspendTimeout is not supported on this container."); + } + + @Override + public boolean suspend(long timeOut, TimeUnit timeUnit, TimeoutHandler timeoutHandler) { + throw new UnsupportedOperationException("suspend is not supported on this container."); + } + + @Override + public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse context) + throws ContainerException { + MultivaluedMap responseHeaders = context.getStringHeaders(); + HttpHeaders nettyHeaders = response.headers(); + + for (Entry> entry: responseHeaders.entrySet()) { + nettyHeaders.add(entry.getKey(), entry.getValue()); + } + + int status = context.getStatus(); + + response.setStatus(HttpResponseStatus.valueOf(status)); + return new ByteBufOutputStream(response.content()); + } + + private void sendLastHttpContent() { + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + .addListener(ChannelFutureListener.CLOSE); + } + + private void releaseConnection() { + if (closed.compareAndSet(false, true)) { + String warnMessage = "ResponseWriter did not be commited."; + LOG.warn(warnMessage); + failure(new IllegalStateException(warnMessage)); + } + } + + } + +} diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java new file mode 100644 index 0000000000..7481cfbddd --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import io.netty.channel.ChannelHandler; + +import javax.ws.rs.ProcessingException; + +import org.glassfish.jersey.server.ApplicationHandler; +import org.glassfish.jersey.server.spi.ContainerProvider; + +/** + * Container Provider for NettyRestHandlerContainer + */ +public final class NettyRestHandlerContainerProvider implements ContainerProvider { + + @Override + public T createContainer(Class type, ApplicationHandler application) throws ProcessingException { + if (type != NettyRestHandlerContainer.class && + (type == null || !ChannelHandler.class.isAssignableFrom(type))) { + return null; + } + return type.cast(new NettyRestHandlerContainer(application)); + } + +} diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java new file mode 100644 index 0000000000..f7fe148127 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import io.netty.channel.ChannelHandler; +import java.net.InetSocketAddress; + +import org.apache.tajo.rpc.NettyServerBase; + +/** + * JAX-RS Http Server on Netty implementation. + */ +public class NettyRestServer extends NettyServerBase { + + private ChannelHandler handler; + private int workerCount; + + public NettyRestServer(InetSocketAddress address, int workerCount) { + this("NettyRestService", address, workerCount); + } + + public NettyRestServer(String serviceName, InetSocketAddress address, int workerCount) { + super(serviceName, address); + + this.workerCount = workerCount; + } + + public ChannelHandler getHandler() { + return handler; + } + + public void setHandler(ChannelHandler handler) { + this.handler = handler; + } + + /** + * Bind desired port and start network service. Before starting network service, {@link NettyRestServer} + * will initialize its configuration. + * + */ + @Override + public void start() { + if (handler == null) { + throw new IllegalStateException("ChannelHandler is null."); + } + + super.init(new NettyRestChannelInitializer(handler), workerCount); + super.start(); + } + +} diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java new file mode 100644 index 0000000000..5d2eea10da --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import java.net.InetSocketAddress; +import java.net.URI; + +import org.glassfish.hk2.api.ServiceLocator; +import org.glassfish.jersey.server.ResourceConfig; + +/** + * Factory class for creating {@link NettyRestServer} instances + */ +public final class NettyRestServerFactory { + + public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, int workerCount) { + return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration), workerCount, true); + } + + public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, int workerCount, + boolean start) { + return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration), workerCount, start); + } + + public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, + ServiceLocator parentLocator, int workerCount) { + return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration, parentLocator), workerCount, true); + } + + public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, + ServiceLocator parentLocator, int workerCount, boolean start) { + return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration, parentLocator), workerCount, start); + } + + /** + * Creates {@link NettyRestServer} instances with JAX-RS application. + * + * @param uri + * @param handler + * @param start + * @return + */ + private static NettyRestServer createNettyRestServer(URI uri, NettyRestHandlerContainer handler, int workerCount, + boolean start) { + if (uri == null) { + throw new IllegalArgumentException("uri is null."); + } + + String schemeString = uri.getScheme(); + if (!schemeString.equalsIgnoreCase("http") && !schemeString.equalsIgnoreCase("https")) { + throw new IllegalArgumentException("scheme of this uri (" + uri.toString() + ") should be http or https."); + } + + int port = uri.getPort(); + if (port == -1) { + throw new IllegalArgumentException("Port number should be provided."); + } + + handler.setRootPath(uri.getPath()); + + InetSocketAddress bindAddress = new InetSocketAddress(port); + NettyRestServer nettyRestServer = new NettyRestServer("Tajo-REST", bindAddress, workerCount); + + nettyRestServer.setHandler(handler); + nettyRestServer.addListener(new NettyRestServerListener(handler)); + + if (start) { + nettyRestServer.start(); + } + + return nettyRestServer; + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java new file mode 100644 index 0000000000..ecd5bb0f92 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import org.apache.tajo.rpc.RpcEventListener; +import org.glassfish.jersey.server.ApplicationHandler; +import org.glassfish.jersey.server.internal.ConfigHelper; +import org.glassfish.jersey.server.spi.Container; +import org.glassfish.jersey.server.spi.ContainerLifecycleListener; + +/** + * Event subscriber for netty rest service. + */ +public class NettyRestServerListener implements RpcEventListener { + + private Container container; + + public NettyRestServerListener(Container container) { + this.container = container; + } + + @Override + public void onAfterInit(Object obj) { + + } + + @Override + public void onAfterShutdown(Object obj) { + ApplicationHandler applicationHandler = new ApplicationHandler(container.getConfiguration()); + ContainerLifecycleListener lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler); + lifecycleListener.onShutdown(container); + } + + @Override + public void onAfterStart(Object obj) { + ApplicationHandler applicationHandler = new ApplicationHandler(container.getConfiguration()); + ContainerLifecycleListener lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler); + lifecycleListener.onStartup(container); + } + + @Override + public void onBeforeInit(Object obj) { + + } + + @Override + public void onBeforeShutdown(Object obj) { + + } + + @Override + public void onBeforeStart(Object obj) { + + } + +} diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java new file mode 100644 index 0000000000..26086d4d62 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.gson; + +import javax.ws.rs.core.Feature; +import javax.ws.rs.core.FeatureContext; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.MessageBodyWriter; + +public class GsonFeature implements Feature { + + @Override + public boolean configure(FeatureContext featureContext) { + featureContext.register(GsonReader.class, MessageBodyReader.class); + featureContext.register(GsonWriter.class, MessageBodyWriter.class); + return true; + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java new file mode 100644 index 0000000000..4d6e44040a --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.gson; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import javax.ws.rs.Consumes; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import java.io.*; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +/** + * Custom message body reader with Gson feature. + */ +@Consumes(MediaType.APPLICATION_JSON) +public class GsonReader implements MessageBodyReader { + + @Override + public boolean isReadable(Class aClass, Type type, Annotation[] annotations, MediaType mediaType) { + return GsonUtil.isJsonType(mediaType); + } + + @Override + public T readFrom(Class aClass, Type type, Annotation[] annotations, MediaType mediaType, + MultivaluedMap multivaluedMap, InputStream inputStream) + throws IOException, WebApplicationException { + Gson gson = new GsonBuilder().create(); + Reader reader = new BufferedReader(new InputStreamReader(inputStream)); + return gson.fromJson(reader, type); + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java new file mode 100644 index 0000000000..f16cb9664d --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.gson; + +import javax.ws.rs.core.MediaType; + +public class GsonUtil { + + public static boolean isJsonType(MediaType mediaType) { + if (mediaType != null) { + String subType = mediaType.getSubtype(); + return "json".equalsIgnoreCase(subType) || subType.endsWith("+json"); + } + return false; + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java new file mode 100644 index 0000000000..d2156114bb --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.gson; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import java.io.*; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +/** + * custom message body writer with Gson feature. + */ +@Produces(MediaType.APPLICATION_JSON) +public class GsonWriter implements MessageBodyWriter { + + @Override + public boolean isWriteable(Class aClass, Type type, Annotation[] annotations, MediaType mediaType) { + return GsonUtil.isJsonType(mediaType); + } + + @Override + public long getSize(T t, Class aClass, Type type, Annotation[] annotations, MediaType mediaType) { + return 0; + } + + @Override + public void writeTo(T t, Class aClass, Type type, Annotation[] annotations, MediaType mediaType, + MultivaluedMap multivaluedMap, OutputStream outputStream) + throws IOException, WebApplicationException { + Gson gson = new GsonBuilder().create(); + Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream)); + + gson.toJson(t, type, writer); + writer.flush(); + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProviderTest.java b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProviderTest.java new file mode 100644 index 0000000000..1511068159 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProviderTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInboundHandler; +import org.apache.tajo.ws.rs.netty.testapp1.TestApplication1; +import org.glassfish.jersey.server.ApplicationHandler; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class NettyRestHandlerContainerProviderTest { + + private NettyRestHandlerContainerProvider provider; + private ApplicationHandler applicationHandler; + + @Before + public void setUp() throws Exception { + provider = new NettyRestHandlerContainerProvider(); + applicationHandler = new ApplicationHandler(new TestApplication1()); + } + + @Test + public void testCreation() throws Exception { + ChannelHandler handler = provider.createContainer(ChannelHandler.class, applicationHandler); + + assertNotNull(handler); + + ChannelInboundHandler inboundHandler = provider.createContainer(ChannelInboundHandler.class, applicationHandler); + + assertNotNull(inboundHandler); + + NettyRestHandlerContainer container = provider.createContainer(NettyRestHandlerContainer.class, applicationHandler); + + assertNotNull(container); + } + + @Test + public void testNullCreation() throws Exception { + String stringValue = provider.createContainer(String.class, applicationHandler); + + assertNull(stringValue); + + Object objectValue = provider.createContainer(Object.class, applicationHandler); + + assertNull(objectValue); + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/NettyRestServerTest.java b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/NettyRestServerTest.java new file mode 100644 index 0000000000..d8a57bf84b --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/NettyRestServerTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import org.apache.tajo.ws.rs.netty.gson.GsonFeature; +import org.apache.tajo.ws.rs.netty.testapp1.TestApplication1; +import org.apache.tajo.ws.rs.netty.testapp1.TestResource1; +import org.apache.tajo.ws.rs.netty.testapp2.Directory; +import org.apache.tajo.ws.rs.netty.testapp2.FileManagementApplication; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.Test; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.URI; +import java.util.Collection; + +import static org.junit.Assert.*; + +public class NettyRestServerTest { + + @Test + public void testNettyRestServerCreation() throws Exception { + ResourceConfig resourceConfig = ResourceConfig.forApplicationClass(TestApplication1.class); + ServerSocket serverSocket = new ServerSocket(0); + int availPort = serverSocket.getLocalPort(); + serverSocket.close(); + URI baseUri = new URI("http://localhost:"+availPort+"/rest"); + + NettyRestServer restServer = NettyRestServerFactory.createNettyRestServer(baseUri, resourceConfig, 3); + + assertNotNull(restServer); + assertNotNull(restServer.getHandler()); + assertNotNull(restServer.getChannel()); + assertNotNull(restServer.getListenAddress()); + + InetSocketAddress listeningAddress = restServer.getListenAddress(); + + assertEquals(availPort, listeningAddress.getPort()); + } + + @Test + public void testTextPlainApplication() throws Exception { + ResourceConfig resourceConfig = ResourceConfig.forApplicationClass(TestApplication1.class); + ServerSocket serverSocket = new ServerSocket(0); + int availPort = serverSocket.getLocalPort(); + serverSocket.close(); + URI baseUri = new URI("http://localhost:"+availPort+"/rest"); + + NettyRestServer restServer = NettyRestServerFactory.createNettyRestServer(baseUri, resourceConfig, 3); + + try { + WebTarget webTarget = ClientBuilder.newClient().target(baseUri + "/testapp1"); + + assertEquals(TestResource1.outputMessage, webTarget.request(MediaType.TEXT_PLAIN).get(String.class)); + } finally { + restServer.shutdown(); + } + } + + protected Directory createDirectory1() { + Directory newDirectory = new Directory(); + + newDirectory.setName("newdir1"); + newDirectory.setOwner("owner1"); + newDirectory.setGroup("group1"); + + return newDirectory; + } + + @Test + public void testFileMgmtApplication() throws Exception { + ResourceConfig resourceConfig = ResourceConfig.forApplicationClass(FileManagementApplication.class) + .register(GsonFeature.class); + ServerSocket serverSocket = new ServerSocket(0); + int availPort = serverSocket.getLocalPort(); + serverSocket.close(); + URI baseUri = new URI("http://localhost:"+availPort+"/rest"); + URI directoriesUri = new URI(baseUri + "/directories"); + Client restClient = ClientBuilder.newBuilder() + .register(GsonFeature.class).build(); + + NettyRestServer restServer = NettyRestServerFactory.createNettyRestServer(baseUri, resourceConfig, 3); + + try { + Directory directory1 = createDirectory1(); + Directory savedDirectory = restClient.target(directoriesUri) + .request().post(Entity.entity(directory1, MediaType.APPLICATION_JSON_TYPE), Directory.class); + + assertNotNull(savedDirectory); + assertNotNull(savedDirectory.getName()); + + Directory fetchedDirectory = restClient.target(directoriesUri).path("{name}") + .resolveTemplate("name", directory1.getName()).request().get(Directory.class); + + assertEquals(directory1.getName(), fetchedDirectory.getName()); + assertEquals(directory1.getOwner(), fetchedDirectory.getOwner()); + assertEquals(directory1.getGroup(), fetchedDirectory.getGroup()); + + GenericType> directoryType = new GenericType>(Collection.class); + Collection directories = restClient.target(directoriesUri).request().get(directoryType); + + assertEquals(1, directories.size()); + + restClient.target(directoriesUri).path("{name}").resolveTemplate("name", directory1.getName()) + .request().delete(); + + directories = restClient.target(directoriesUri).request().get(directoryType); + + assertTrue(directories.isEmpty()); + } finally { + restServer.shutdown(); + } + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp1/TestApplication1.java b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp1/TestApplication1.java new file mode 100644 index 0000000000..3531b3abc8 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp1/TestApplication1.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.testapp1; + +import javax.ws.rs.core.Application; +import java.util.HashSet; +import java.util.Set; + +public class TestApplication1 extends Application { + + private final Set> classes; + + public TestApplication1() { + classes = new HashSet>(); + classes.add(TestResource1.class); + } + + @Override + public Set> getClasses() { + return classes; + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp1/TestResource1.java b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp1/TestResource1.java new file mode 100644 index 0000000000..302e2177f9 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp1/TestResource1.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.testapp1; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("testapp1") +public class TestResource1 { + + public static String outputMessage = "TestApplication1"; + + @GET + @Produces(MediaType.TEXT_PLAIN) + public String getApplicationName() { + return outputMessage; + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/DirectoriesDao.java b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/DirectoriesDao.java new file mode 100644 index 0000000000..0e82e007ca --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/DirectoriesDao.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.testapp2; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class DirectoriesDao { + private static DirectoriesDao instance = new DirectoriesDao(); + + private final Map directoryMap = new ConcurrentHashMap(); + + private DirectoriesDao() { + } + + public static DirectoriesDao getInstance() { + return instance; + } + + public Map getDirectoryMap() { + return directoryMap; + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/DirectoriesResource.java b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/DirectoriesResource.java new file mode 100644 index 0000000000..40f1ceda27 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/DirectoriesResource.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.testapp2; + +import javax.ws.rs.*; +import javax.ws.rs.core.*; +import java.net.URI; +import java.util.Collection; + +@Path("/directories") +public class DirectoriesResource { + + @Context + UriInfo uriInfo; + + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getAllDirectories() { + Collection directories = DirectoriesDao.getInstance().getDirectoryMap().values(); + GenericEntity> dirEntities = + new GenericEntity>(directories, Collection.class); + return Response.ok(dirEntities).build(); + } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response createDirectory(Directory directory) { + String directoryName = directory.getName(); + + if (directoryName == null || directoryName.isEmpty()) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + + DirectoriesDao.getInstance().getDirectoryMap().put(directoryName, directory); + + UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); + URI directoryUri = uriBuilder.path(DirectoriesResource.class) + .path(DirectoriesResource.class, "getDirectoryByName") + .build(directoryName); + + return Response.created(directoryUri).entity(directory).build(); + } + + @GET + @Path("{name}") + @Produces(MediaType.APPLICATION_JSON) + public Response getDirectoryByName(@PathParam("name") String directoryName) { + Directory directory = DirectoriesDao.getInstance().getDirectoryMap().get(directoryName); + + if (directory == null) { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + return Response.ok(directory).build(); + } + + @DELETE + @Path("{name}") + public Response deleteDirectory(@PathParam("name") String directoryName) { + if (!DirectoriesDao.getInstance().getDirectoryMap().containsKey(directoryName)) { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + DirectoriesDao.getInstance().getDirectoryMap().remove(directoryName); + + return Response.ok().build(); + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/Directory.java b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/Directory.java new file mode 100644 index 0000000000..3851020814 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/Directory.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.testapp2; + +import java.io.Serializable; + +public class Directory implements Serializable { + + private String name; + private String owner; + private String group; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getOwner() { + return owner; + } + + public void setOwner(String owner) { + this.owner = owner; + } + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } +} diff --git a/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/FileManagementApplication.java b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/FileManagementApplication.java new file mode 100644 index 0000000000..49026a0dd9 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/test/java/org/apache/tajo/ws/rs/netty/testapp2/FileManagementApplication.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.testapp2; + +import javax.ws.rs.core.Application; +import java.util.HashSet; +import java.util.Set; + +public class FileManagementApplication extends Application { + + @Override + public Set> getClasses() { + Set> classes = new HashSet>(); + + classes.add(DirectoriesResource.class); + + return classes; + } +} From 16e5f27e459ea3341dee6e1827010465c2303d27 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 18 Mar 2015 15:41:51 +0900 Subject: [PATCH 3/4] TAJO-1403: Improve 'Simple Query' with only partition columns and constant values --- .../NonForwardQueryResultFileScanner.java | 37 ++++++++- .../tajo/master/exec/QueryExecutor.java | 5 ++ .../apache/tajo/master/TestGlobalPlanner.java | 3 +- .../apache/tajo/plan/util/PlannerUtil.java | 82 ++++++++++++++++++- .../tajo/storage/FileStorageManager.java | 29 ++++++- 5 files changed, 149 insertions(+), 7 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index dc0c44a374..ed4d1a8dbc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -24,6 +24,7 @@ import org.apache.tajo.QueryId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; @@ -31,12 +32,15 @@ import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.engine.planner.physical.SeqScanExec; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.StringUtils; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -74,10 +78,37 @@ public void init() throws IOException { initSeqScanExec(); } + /** + * Set partition path and depth if ScanNode's qualification exists + * + * @param storageManager target storage manager to be set with partition info + */ + private void setPartition(StorageManager storageManager) { + if (tableDesc.isExternal() && tableDesc.hasPartition() && scanNode.getQual() != null && + storageManager instanceof FileStorageManager) { + StringBuffer path = new StringBuffer(); + int depth = 0; + if (tableDesc.hasPartition()) { + for (Column c : tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) { + String partitionValue = PlannerUtil.getPartitionValue(scanNode.getQual(), c.getSimpleName()); + if (partitionValue == null) + break; + path.append(String.format("/%s=%s", c.getSimpleName(), StringUtils.escapePathName(partitionValue))); + depth++; + } + } + ((FileStorageManager)storageManager).setPartitionPath(path.toString()); + ((FileStorageManager)storageManager).setCurrentDepth(depth); + scanNode.setQual(null); + } + } + private void initSeqScanExec() throws IOException { - List fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()) - .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); - + StorageManager storageManager = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType()); + List fragments = null; + setPartition(storageManager); + fragments = storageManager.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); + if (fragments != null && !fragments.isEmpty()) { FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {})); this.taskContext = new TaskAttemptContext( diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index db82fca603..aa8b228024 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -212,6 +212,11 @@ public void execSimpleQuery(QueryContext queryContext, Session session, String q scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN); } TableDesc desc = scanNode.getTableDesc(); + // Keep info for partition-column-only queries + SelectionNode selectionNode = plan.getRootBlock().getNode(NodeType.SELECTION); + if (desc.isExternal() && desc.hasPartition() && selectionNode != null) { + scanNode.setQual(selectionNode.getQual()); + } int maxRow = Integer.MAX_VALUE; if (plan.getRootBlock().hasNode(NodeType.LIMIT)) { LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT); diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java index d0f7cf44a8..45c94a37e6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java @@ -317,8 +317,9 @@ public void testCheckIfSimpleQuery() throws Exception { plan = buildPlan("select * from customer where c_nationkey = 1"); assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + // c_nationkey is partition column plan = buildPlan("select * from customer_parts where c_nationkey = 1"); - assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); // same column order plan = buildPlan("select c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" + diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 0fbd3593ad..f11ac1b21c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -100,6 +100,7 @@ public static boolean checkIfSimpleQuery(LogicalPlan plan) { PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1; boolean noComplexComputation = false; + boolean prefixPartitionWhere = false; if (singleRelation) { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); if (scanNode == null) { @@ -133,11 +134,90 @@ public static boolean checkIfSimpleQuery(LogicalPlan plan) { } } } + + if (!noWhere && scanNode.getTableDesc().isExternal() && scanNode.getTableDesc().getPartitionMethod() != null) { + EvalNode node = ((SelectionNode) plan.getRootBlock().getNode(NodeType.SELECTION)).getQual(); + Schema partSchema = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema(); + if (checkIfPartitionSelection(node, partSchema)) { + prefixPartitionWhere = true; + boolean isPrefix = true; + for (Column c : partSchema.getColumns()) { + String value = getPartitionValue(node, c.getSimpleName()); + if (isPrefix && value == null) + isPrefix = false; + else if (!isPrefix && value != null) { + prefixPartitionWhere = false; + break; + } + } + } + } } return !checkIfDDLPlan(rootNode) && (simpleOperator && noComplexComputation && isOneQueryBlock && - noOrderBy && noGroupBy && noWhere && noJoin && singleRelation); + noOrderBy && noGroupBy && (noWhere || prefixPartitionWhere) && noJoin && singleRelation); + } + + /** + * Checks whether EvalNode consists of only partition columns and const values. + * The partition based simple query can be defined as 'select * from tb_name where col_name1="X" and col_name2="Y" [LIMIT Z]', + * whose WHERE clause consists of only partition-columns with constant values. + * Partition columns must be able to form a prefix of HDFS path like '/tb_name1/col_name1=X/col_name2=Y'. + * + * @param node The qualification node of a SELECTION node + * @param partSchema Partition expression schema + * @return True if the query is partition-column based simple query. + */ + public static boolean checkIfPartitionSelection(EvalNode node, Schema partSchema) { + if (node != null && node instanceof BinaryEval) { + BinaryEval eval = (BinaryEval)node; + EvalNode left = eval.getLeftExpr(); + EvalNode right = eval.getRightExpr(); + EvalType type = eval.getType(); + + if (type == EvalType.EQUAL) { + if (left instanceof FieldEval && right instanceof ConstEval && partSchema.contains(((FieldEval) left).getColumnName())) { + return true; + } else if (left instanceof ConstEval && right instanceof FieldEval && partSchema.contains(((FieldEval) right).getColumnName())) { + return true; + } + } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { + return checkIfPartitionSelection(left, partSchema) && checkIfPartitionSelection(right, partSchema); + } + } + return false; + } + + /** + * Get partition constant value associated with `columnName`. + * + * @param node EvalNode having query predicates + * @param columnName Column name to be looked up + * @return String The value associated with `columnName` in the predicates + */ + public static String getPartitionValue(EvalNode node, String columnName) { + if (node != null && node instanceof BinaryEval) { + BinaryEval eval = (BinaryEval)node; + EvalNode left = eval.getLeftExpr(); + EvalNode right = eval.getRightExpr(); + EvalType type = eval.getType(); + + if (type == EvalType.EQUAL) { + if (left instanceof FieldEval && right instanceof ConstEval && columnName.equals(((FieldEval) left).getColumnName())) { + return ((ConstEval)right).getValue().toString(); + } else if (left instanceof ConstEval && right instanceof FieldEval && columnName.equals(((FieldEval) right).getColumnName())) { + return ((ConstEval)left).getValue().toString(); + } + } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { + String value = getPartitionValue(left, columnName); + if (value == null) { + value = getPartitionValue(right, columnName); + } + return value; + } + } + return null; } /** diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java index c427940c75..0dbf9b3e01 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java @@ -149,6 +149,21 @@ public Path getTablePath(String tableName) { return new Path(tableBaseDir, tableName); } + private String partitionPath = ""; + private int currentDepth = 0; + + /** + * Set a specific partition path for partition-column only queries + * @param path The partition prefix path + */ + public void setPartitionPath(String path) { partitionPath = path; } + + /** + * Set a depth of partition path for partition-column only queries + * @param depth Depth of partitions + */ + public void setCurrentDepth(int depth) { currentDepth = depth; } + @VisibleForTesting public Appender getAppender(TableMeta meta, Schema schema, Path filePath) throws IOException { @@ -722,8 +737,18 @@ public List getNonForwardSplit(TableDesc tableDesc, int currentPage, i List nonZeroLengthFiles = new ArrayList(); if (fs.exists(tablePath)) { - getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, - new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); + if (!partitionPath.isEmpty()) + { + Path partPath = new Path(tableDesc.getPath() + partitionPath); + if (fs.exists(partPath)) { + getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments, + new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth); + } + } + else { + getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, + new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); + } } List fragments = new ArrayList(); From f03d758c5710e58b371e05baa3276efdd58bc77f Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 19 Mar 2015 07:22:49 +0900 Subject: [PATCH 4/4] removes tailing space --- .../tajo/master/exec/NonForwardQueryResultFileScanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index ed4d1a8dbc..fe3258c991 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -84,7 +84,7 @@ public void init() throws IOException { * @param storageManager target storage manager to be set with partition info */ private void setPartition(StorageManager storageManager) { - if (tableDesc.isExternal() && tableDesc.hasPartition() && scanNode.getQual() != null && + if (tableDesc.isExternal() && tableDesc.hasPartition() && scanNode.getQual() != null && storageManager instanceof FileStorageManager) { StringBuffer path = new StringBuffer(); int depth = 0;