From aecace6ffab6de0604070740df1064aa0814c52a Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 19 Aug 2015 23:39:00 +0900 Subject: [PATCH 1/9] TAJO-1783: Query result is not returned by invalid output path. --- .../tajo/engine/query/TestHBaseTable.java | 14 +- .../NonForwardQueryResultFileScanner.java | 91 ++++------- .../exec/NonForwardQueryResultScanner.java | 10 +- .../tajo/master/exec/QueryExecutor.java | 15 +- .../ws/rs/resources/QueryResultResource.java | 3 +- .../apache/tajo/plan/expr/EvalTreeUtil.java | 34 +--- .../rewrite/rules/AccessPathRewriter.java | 1 + .../rules/PartitionedTableRewriter.java | 1 + .../apache/tajo/plan/util/PlannerUtil.java | 24 +-- .../org/apache/tajo/storage/Tablespace.java | 12 -- .../tajo/storage/hbase/HBaseTablespace.java | 83 ---------- .../apache/tajo/storage/FileTablespace.java | 145 ------------------ 12 files changed, 54 insertions(+), 379 deletions(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java index 4ff5e3e3ee..8642331aac 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -572,16 +572,16 @@ private void assertIndexPredication(boolean isCompositeRowKey) throws Exception @Test public void testNonForwardQuery() throws Exception { - executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int) " + - "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " + + executeString("CREATE TABLE hbase_mapped_table1 (rk text, col1 text, col2 text, col3 int) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table1', 'columns'=':key,col1:a,col2:,col3:#b', " + "'hbase.split.rowkeys'='010,040,060,080')").close(); - assertTableExists("hbase_mapped_table"); + assertTableExists("hbase_mapped_table1"); HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); HTable htable = null; try { - hAdmin.tableExists("hbase_table"); - htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + hAdmin.tableExists("hbase_table1"); + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table1"); org.apache.hadoop.hbase.util.Pair keys = htable.getStartEndKeys(); assertEquals(5, keys.getFirst().length); @@ -596,11 +596,11 @@ public void testNonForwardQuery() throws Exception { htable.put(put); } - ResultSet res = executeString("select * from hbase_mapped_table"); + ResultSet res = executeString("select * from hbase_mapped_table1"); assertResultSet(res); res.close(); } finally { - executeString("DROP TABLE hbase_mapped_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table1 PURGE").close(); hAdmin.close(); if (htable == null) { htable.close(); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index ec8760f1be..f345029dcb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -18,26 +18,27 @@ package org.apache.tajo.master.exec; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; - import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; -import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.plan.expr.EvalTreeUtil; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.engine.planner.physical.SeqScanExec; +import org.apache.tajo.engine.planner.physical.PartitionMergeScanExec; +import org.apache.tajo.engine.planner.physical.ScanExec; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -45,11 +46,10 @@ import java.util.List; public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner { - private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100; - + private QueryId queryId; private String sessionId; - private SeqScanExec scanExec; + private ScanExec scanExec; private TableDesc tableDesc; private RowStoreEncoder rowEncoder; private int maxRow; @@ -57,8 +57,7 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc private TaskAttemptContext taskContext; private TajoConf tajoConf; private ScanNode scanNode; - - private int currentFragmentIndex = 0; + private List fragments; public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, TableDesc tableDesc, int maxRow) throws IOException { @@ -69,58 +68,35 @@ public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, Que this.tableDesc = tableDesc; this.maxRow = maxRow; this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema()); + this.fragments = Lists.newArrayList(); } - public void init() throws IOException { + public void init() throws IOException, TajoException { initSeqScanExec(); } - /** - * Set partition path and depth if ScanNode's qualification exists - * - * @param tablespace target storage manager to be set with partition info - */ - private void setPartition(Tablespace tablespace) { - if (tableDesc.isExternal() && tableDesc.hasPartition() && scanNode.getQual() != null && - tablespace instanceof FileTablespace) { - StringBuffer path = new StringBuffer(); - int depth = 0; - if (tableDesc.hasPartition()) { - for (Column c : tableDesc.getPartitionMethod().getExpressionSchema().getRootColumns()) { - String partitionValue = EvalTreeUtil.getPartitionValue(scanNode.getQual(), c.getSimpleName()); - if (partitionValue == null) - break; - path.append(String.format("/%s=%s", c.getSimpleName(), StringUtils.escapePathName(partitionValue))); - depth++; - } - } - ((FileTablespace) tablespace).setPartitionPath(path.toString()); - ((FileTablespace) tablespace).setCurrentDepth(depth); - scanNode.setQual(null); - } - } - - private void initSeqScanExec() throws IOException { + private void initSeqScanExec() throws IOException, TajoException { Tablespace tablespace = TablespaceManager.get(tableDesc.getUri()).get(); - List fragments = null; - setPartition(tablespace); - fragments = tablespace.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); - if (fragments != null && !fragments.isEmpty()) { - FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {})); + if (tableDesc.hasPartition()) { + FileTablespace fileTablespace = TUtil.checkTypeAndGet(tablespace, FileTablespace.class); + fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc)); + } else { + fragments.addAll(tablespace.getSplits(tableDesc.getName(), tableDesc, scanNode)); + } + + if (!fragments.isEmpty()) { + FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{})); this.taskContext = new TaskAttemptContext( - new QueryContext(tajoConf), null, - new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), + new QueryContext(tajoConf), null, + new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), fragmentProtos, null); try { - // scanNode must be clone cause SeqScanExec change target in the case of - // a partitioned table. - scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos); + scanExec = new PartitionMergeScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos); } catch (CloneNotSupportedException e) { throw new IOException(e.getMessage(), e); } scanExec.init(); - currentFragmentIndex += fragments.size(); } } @@ -132,10 +108,6 @@ public String getSessionId() { return sessionId; } - public void setScanExec(SeqScanExec scanExec) { - this.scanExec = scanExec; - } - public TableDesc getTableDesc() { return tableDesc; } @@ -163,18 +135,9 @@ public List getNextRows(int fetchRowNum) throws IOException { if (tuple == null) { scanExec.close(); scanExec = null; - initSeqScanExec(); - if (scanExec != null) { - tuple = scanExec.next(); - } - if (tuple == null) { - if (scanExec != null) { - scanExec.close(); - scanExec = null; - } - break; - } + break; } + rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple)))); rowCount++; currentNumRows++; diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java index 75608df7a5..a104c9997d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java @@ -18,14 +18,14 @@ package org.apache.tajo.master.exec; -import java.io.IOException; -import java.util.List; - +import com.google.protobuf.ByteString; import org.apache.tajo.QueryId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.exception.TajoException; -import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.List; public interface NonForwardQueryResultScanner { @@ -41,7 +41,7 @@ public interface NonForwardQueryResultScanner { public TableDesc getTableDesc(); - public void init() throws IOException; + public void init() throws IOException, TajoException; public int getCurrentRowNumber(); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index bd5d69651a..1219292f3e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -49,9 +49,7 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.Target; -import org.apache.tajo.plan.expr.EvalContext; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.expr.GeneralFunctionEval; +import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.plan.function.python.TajoScriptEngine; import org.apache.tajo.plan.logical.*; @@ -246,15 +244,12 @@ public void execQueryOnVirtualTable(QueryContext queryContext, Session session, public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan, SubmitQueryResponse.Builder response) throws Exception { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); - if (scanNode == null) { - scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN); - } TableDesc desc = scanNode.getTableDesc(); - // Keep info for partition-column-only queries - SelectionNode selectionNode = plan.getRootBlock().getNode(NodeType.SELECTION); - if (desc.isExternal() && desc.hasPartition() && selectionNode != null) { - scanNode.setQual(selectionNode.getQual()); + + if (desc.hasPartition()) { + scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN); } + int maxRow = Integer.MAX_VALUE; if (plan.getRootBlock().hasNode(NodeType.LIMIT)) { LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT); diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java index 40e3f25355..2efbfdda59 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java @@ -25,6 +25,7 @@ import org.apache.tajo.QueryId; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.error.Errors.ResultCode; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.master.QueryInfo; import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner; @@ -110,7 +111,7 @@ private void initializeContext() { private static NonForwardQueryResultScanner getNonForwardQueryResultScanner( MasterContext masterContext, Session session, - QueryId queryId) throws IOException { + QueryId queryId) throws IOException, TajoException { NonForwardQueryResultScanner resultScanner = session.getNonForwardQueryResultScanner(queryId); if (resultScanner == null) { QueryInfo queryInfo = masterContext.getQueryJobManager().getFinishedQuery(queryId); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java index 89fd81b947..cf005340ae 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalTreeUtil.java @@ -599,41 +599,11 @@ public static boolean checkIfPartitionSelection(EvalNode node, Schema partSchema } else if (left instanceof ConstEval && right instanceof FieldEval && partSchema.contains(((FieldEval) right).getColumnName())) { return true; } - } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { + } else if ((type == EvalType.AND || type == EvalType.OR) + && left instanceof BinaryEval && right instanceof BinaryEval) { return checkIfPartitionSelection(left, partSchema) && checkIfPartitionSelection(right, partSchema); } } return false; } - - /** - * Get partition constant value associated with `columnName`. - * - * @param node EvalNode having query predicates - * @param columnName Column name to be looked up - * @return String The value associated with `columnName` in the predicates - */ - public static String getPartitionValue(EvalNode node, String columnName) { - if (node != null && node instanceof BinaryEval) { - BinaryEval eval = (BinaryEval)node; - EvalNode left = eval.getLeftExpr(); - EvalNode right = eval.getRightExpr(); - EvalType type = eval.getType(); - - if (type == EvalType.EQUAL) { - if (left instanceof FieldEval && right instanceof ConstEval && columnName.equals(((FieldEval) left).getColumnName())) { - return ((ConstEval)right).getValue().toString(); - } else if (left instanceof ConstEval && right instanceof FieldEval && columnName.equals(((FieldEval) right).getColumnName())) { - return ((ConstEval)left).getValue().toString(); - } - } else if (type == EvalType.AND && left instanceof BinaryEval && right instanceof BinaryEval) { - String value = getPartitionValue(left, columnName); - if (value == null) { - value = getPartitionValue(right, columnName); - } - return value; - } - } - return null; - } } \ No newline at end of file diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java index afabe7a088..33ce4f4633 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java @@ -122,6 +122,7 @@ public Object visitScan(Object object, LogicalPlan plan, LogicalPlan.QueryBlock } else { PlannerUtil.replaceNode(plan, stack.peek(), scanNode, indexScanNode); } + block.registerNode(indexScanNode); } return null; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 244d385d6c..b5cd42b137 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -419,6 +419,7 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP } else { PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode); } + block.registerNode(rewrittenScanNode); } catch (IOException e) { throw new TajoInternalError("Partitioned Table Rewrite Failed: \n" + e.getMessage()); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 99e95be93f..22f8aee03c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -41,7 +41,6 @@ import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.TUtil; -import java.io.IOException; import java.util.*; public class PlannerUtil { @@ -121,7 +120,7 @@ public static boolean checkIfSimpleQuery(LogicalPlan plan) { PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1; boolean noComplexComputation = false; - boolean prefixPartitionWhere = false; + boolean partitionWhere = false; if (singleRelation) { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); if (scanNode == null) { @@ -156,33 +155,18 @@ public static boolean checkIfSimpleQuery(LogicalPlan plan) { } } - /** - * TODO: Remove isExternal check after resolving the following issues - * - TAJO-1416: INSERT INTO EXTERNAL PARTITIONED TABLE - * - TAJO-1441: INSERT INTO MANAGED PARTITIONED TABLE - */ - if (!noWhere && scanNode.getTableDesc().isExternal() && scanNode.getTableDesc().getPartitionMethod() != null) { + if (!noWhere && scanNode.getTableDesc().hasPartition()) { EvalNode node = ((SelectionNode) plan.getRootBlock().getNode(NodeType.SELECTION)).getQual(); Schema partSchema = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema(); if (EvalTreeUtil.checkIfPartitionSelection(node, partSchema)) { - prefixPartitionWhere = true; - boolean isPrefix = true; - for (Column c : partSchema.getRootColumns()) { - String value = EvalTreeUtil.getPartitionValue(node, c.getSimpleName()); - if (isPrefix && value == null) - isPrefix = false; - else if (!isPrefix && value != null) { - prefixPartitionWhere = false; - break; - } - } + partitionWhere = true; } } } return !checkIfDDLPlan(rootNode) && (simpleOperator && noComplexComputation && isOneQueryBlock && - noOrderBy && noGroupBy && (noWhere || prefixPartitionWhere) && noJoin && singleRelation); + noOrderBy && noGroupBy && (noWhere || partitionWhere) && noJoin && singleRelation); } /** diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 0094310998..98e4b32ca5 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -130,18 +130,6 @@ public URI getRootUri() { public abstract List getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException, TajoException; - /** - * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'. - * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation. - * @param tableDesc The table description for the target data. - * @param currentPage The current page number within the entire list. - * @param numFragments The number of fragments in the result. - * @return The list of input fragments. - * @throws java.io.IOException - */ - public abstract List getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) - throws IOException; - /** * It returns the storage property. * @return The storage property diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index f613b883e4..220492304b 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -599,89 +599,6 @@ public Appender getAppender(OverridableConf queryContext, } } - @Override - public List getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) - throws IOException { - HTable htable = null; - HBaseAdmin hAdmin = null; - try { - htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); - - org.apache.hadoop.hbase.util.Pair keys = htable.getStartEndKeys(); - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - return new ArrayList(1); - } - hAdmin = new HBaseAdmin(hbaseConf); - Map serverLoadMap = new HashMap(); - - List fragments = new ArrayList(keys.getFirst().length); - - int start = currentPage * numFragments; - if (start >= keys.getFirst().length) { - return new ArrayList(1); - } - int end = (currentPage + 1) * numFragments; - if (end > keys.getFirst().length) { - end = keys.getFirst().length; - } - for (int i = start; i < end; i++) { - HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); - - String regionName = location.getRegionInfo().getRegionNameAsString(); - ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); - if (serverLoad == null) { - serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); - serverLoadMap.put(location.getServerName(), serverLoad); - } - - HBaseFragment fragment = new HBaseFragment( - tableDesc.getUri(), - tableDesc.getName(), - htable.getName().getNameAsString(), - location.getRegionInfo().getStartKey(), - location.getRegionInfo().getEndKey(), - location.getHostname()); - - // get region size - boolean foundLength = false; - for (Map.Entry entry : serverLoad.getRegionsLoad().entrySet()) { - if (regionName.equals(Bytes.toString(entry.getKey()))) { - RegionLoad regionLoad = entry.getValue(); - long storeLength = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; - if (storeLength == 0) { - // If store size is smaller than 1 MB, storeLength is zero - storeLength = 1 * 1024 * 1024; //default 1MB - } - fragment.setLength(storeLength); - foundLength = true; - break; - } - } - - if (!foundLength) { - fragment.setLength(TajoConstants.UNKNOWN_LENGTH); - } - - fragments.add(fragment); - if (LOG.isDebugEnabled()) { - LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); - } - } - - if (!fragments.isEmpty()) { - ((HBaseFragment) fragments.get(fragments.size() - 1)).setLast(true); - } - return fragments; - } finally { - if (htable != null) { - htable.close(); - } - if (hAdmin != null) { - hAdmin.close(); - } - } - } - public HConnection getConnection() throws IOException { synchronized(connMap) { HConnectionKey key = new HConnectionKey(hbaseConf); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 678675d874..f79471aa8b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -47,7 +47,6 @@ import java.net.URI; import java.text.NumberFormat; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT; @@ -178,21 +177,6 @@ public URI getTableUri(String databaseName, String tableName) { return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri(); } - private String partitionPath = ""; - private int currentDepth = 0; - - /** - * Set a specific partition path for partition-column only queries - * @param path The partition prefix path - */ - public void setPartitionPath(String path) { partitionPath = path; } - - /** - * Set a depth of partition path for partition-column only queries - * @param depth Depth of partitions - */ - public void setCurrentDepth(int depth) { currentDepth = depth; } - @VisibleForTesting public Appender getAppender(TableMeta meta, Schema schema, Path filePath) throws IOException { @@ -706,135 +690,6 @@ public void purgeTable(TableDesc tableDesc) throws IOException { } } - @Override - public List getNonForwardSplit(TableDesc tableDesc, int currentPage, int numResultFragments) throws IOException { - // Listing table data file which is not empty. - // If the table is a partitioned table, return file list which has same partition key. - Path tablePath = new Path(tableDesc.getUri()); - FileSystem fs = tablePath.getFileSystem(conf); - - //In the case of partitioned table, we should return same partition key data files. - int partitionDepth = 0; - if (tableDesc.hasPartition()) { - partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getRootColumns().size(); - } - - List nonZeroLengthFiles = new ArrayList(); - if (fs.exists(tablePath)) { - if (!partitionPath.isEmpty()) { - Path partPath = new Path(tableDesc.getUri() + partitionPath); - if (fs.exists(partPath)) { - getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments, - new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth); - } - } else { - getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments, - new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); - } - } - - List fragments = new ArrayList(); - - String[] previousPartitionPathNames = null; - for (FileStatus eachFile: nonZeroLengthFiles) { - FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null); - - if (partitionDepth > 0) { - // finding partition key; - Path filePath = fileFragment.getPath(); - Path parentPath = filePath; - String[] parentPathNames = new String[partitionDepth]; - for (int i = 0; i < partitionDepth; i++) { - parentPath = parentPath.getParent(); - parentPathNames[partitionDepth - i - 1] = parentPath.getName(); - } - - // If current partitionKey == previousPartitionKey, add to result. - if (previousPartitionPathNames == null) { - fragments.add(fileFragment); - } else if (previousPartitionPathNames != null && Arrays.equals(previousPartitionPathNames, parentPathNames)) { - fragments.add(fileFragment); - } else { - break; - } - previousPartitionPathNames = parentPathNames; - } else { - fragments.add(fileFragment); - } - } - - return fragments; - } - - /** - * - * @param fs - * @param path The table path - * @param result The final result files to be used - * @param startFileIndex - * @param numResultFiles - * @param currentFileIndex - * @param partitioned A flag to indicate if this table is partitioned - * @param currentDepth Current visiting depth of partition directories - * @param maxDepth The partition depth of this table - * @throws IOException - */ - private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List result, - int startFileIndex, int numResultFiles, - AtomicInteger currentFileIndex, boolean partitioned, - int currentDepth, int maxDepth) throws IOException { - // Intermediate directory - if (fs.isDirectory(path)) { - - FileStatus[] files = fs.listStatus(path, hiddenFileFilter); - - if (files != null && files.length > 0) { - - for (FileStatus eachFile : files) { - - // checking if the enough number of files are found - if (result.size() >= numResultFiles) { - return; - } - if (eachFile.isDirectory()) { - - getNonZeroLengthDataFiles( - fs, - eachFile.getPath(), - result, - startFileIndex, - numResultFiles, - currentFileIndex, - partitioned, - currentDepth + 1, // increment a visiting depth - maxDepth); - - // if partitioned table, we should ignore files located in the intermediate directory. - // we can ensure that this file is in leaf directory if currentDepth == maxDepth. - } else if (eachFile.isFile() && eachFile.getLen() > 0 && (!partitioned || currentDepth == maxDepth)) { - if (currentFileIndex.get() >= startFileIndex) { - result.add(eachFile); - } - currentFileIndex.incrementAndGet(); - } - } - } - - // Files located in leaf directory - } else { - FileStatus fileStatus = fs.getFileStatus(path); - if (fileStatus != null && fileStatus.getLen() > 0) { - if (currentFileIndex.get() >= startFileIndex) { - result.add(fileStatus); - } - currentFileIndex.incrementAndGet(); - if (result.size() >= numResultFiles) { - return; - } - } - } - } - @Override public StorageProperty getProperty() { return FileStorageProperties; From db75fddd277bfb41d09fe6420e28a9760ae4c22e Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 11:12:18 +0900 Subject: [PATCH 2/9] remove usless clone --- .../master/exec/NonForwardQueryResultFileScanner.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index f345029dcb..877e32bd77 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -57,7 +57,6 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc private TaskAttemptContext taskContext; private TajoConf tajoConf; private ScanNode scanNode; - private List fragments; public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, TableDesc tableDesc, int maxRow) throws IOException { @@ -68,7 +67,6 @@ public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, Que this.tableDesc = tableDesc; this.maxRow = maxRow; this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema()); - this.fragments = Lists.newArrayList(); } public void init() throws IOException, TajoException { @@ -78,6 +76,7 @@ public void init() throws IOException, TajoException { private void initSeqScanExec() throws IOException, TajoException { Tablespace tablespace = TablespaceManager.get(tableDesc.getUri()).get(); + List fragments = Lists.newArrayList(); if (tableDesc.hasPartition()) { FileTablespace fileTablespace = TUtil.checkTypeAndGet(tablespace, FileTablespace.class); fragments.addAll(Repartitioner.getFragmentsFromPartitionedTable(fileTablespace, scanNode, tableDesc)); @@ -91,11 +90,7 @@ private void initSeqScanExec() throws IOException, TajoException { new QueryContext(tajoConf), null, new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), fragmentProtos, null); - try { - scanExec = new PartitionMergeScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos); - } catch (CloneNotSupportedException e) { - throw new IOException(e.getMessage(), e); - } + scanExec = new PartitionMergeScanExec(taskContext, scanNode, fragmentProtos); scanExec.init(); } } From 5fb02bd39923496f2701bedfdb3d1c78da11df19 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 13:55:15 +0900 Subject: [PATCH 3/9] test travis failure --- .../apache/tajo/plan/function/python/PythonScriptEngine.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 74c0e5aef3..668441ab1d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -33,6 +33,7 @@ import org.apache.tajo.plan.function.stream.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -323,6 +324,10 @@ public void shutdown() { private void startUdfController() throws IOException { ProcessBuilder processBuilder = StreamingUtil.createProcess(buildCommand()); + if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + processBuilder.redirectErrorStream(true); + } + process = processBuilder.start(); } From 40bc9fee4beaef9183397f225e3f83be83e9c6ea Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 14:15:44 +0900 Subject: [PATCH 4/9] test python --- .../test/java/org/apache/tajo/engine/eval/ExprTestBase.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index f2b6477452..108c9a49a3 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -284,7 +284,10 @@ public void testEval(OverridableConf context, Schema schema, String tableName, S codegen = new EvalCodeGenerator(classLoader); } - QueryExecutor.startScriptExecutors(queryContext, evalContext, targets); + if (evalContext.getAllScriptEngines().size() > 0) { + QueryExecutor.startScriptExecutors(queryContext, evalContext, targets); + Thread.sleep(100); + } Tuple outTuple = new VTuple(targets.length); for (int i = 0; i < targets.length; i++) { EvalNode eval = targets[i].getEvalTree(); @@ -326,6 +329,7 @@ public void testEval(OverridableConf context, Schema schema, String tableName, S } else { throw e; } + } catch (InterruptedException e) { } finally { if (schema != null) { cat.dropTable(qualifiedTableName); From 74afffdc72ee1c5ce44903e62c036bdfc2ab7a9d Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 14:39:09 +0900 Subject: [PATCH 5/9] remove test codes --- .../apache/tajo/plan/function/python/PythonScriptEngine.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 668441ab1d..74c0e5aef3 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -33,7 +33,6 @@ import org.apache.tajo.plan.function.stream.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; -import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -324,10 +323,6 @@ public void shutdown() { private void startUdfController() throws IOException { ProcessBuilder processBuilder = StreamingUtil.createProcess(buildCommand()); - if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { - processBuilder.redirectErrorStream(true); - } - process = processBuilder.start(); } From 403c8419e0950ed76d52c82e550bca5f9aa94272 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 16:16:43 +0900 Subject: [PATCH 6/9] test exit code --- .../function/python/PythonScriptEngine.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 74c0e5aef3..1c49da2673 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -27,12 +27,16 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; -import org.apache.tajo.function.*; +import org.apache.tajo.function.FunctionInvocation; +import org.apache.tajo.function.FunctionSignature; +import org.apache.tajo.function.FunctionSupplement; +import org.apache.tajo.function.PythonInvocationDesc; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext; import org.apache.tajo.plan.function.stream.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -311,6 +315,19 @@ public void start(Configuration systemConf) throws IOException { @Override public void shutdown() { process.destroy(); + try { + int exitCode = process.waitFor(); + if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + LOG.warn("Process exit code: " + exitCode); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Process exit code: " + exitCode); + } + } + } catch (InterruptedException e) { + LOG.warn(e.getMessage(), e); + } + FileUtil.cleanup(LOG, stdin, stdout, stderr, inputHandler, outputHandler); stdin = null; stdout = stderr = null; From 279c8a7dd4a21eed3918ac422ed6435335aff8fd Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 16:25:31 +0900 Subject: [PATCH 7/9] remove test codes --- .../test/java/org/apache/tajo/engine/eval/ExprTestBase.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index 108c9a49a3..f2b6477452 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -284,10 +284,7 @@ public void testEval(OverridableConf context, Schema schema, String tableName, S codegen = new EvalCodeGenerator(classLoader); } - if (evalContext.getAllScriptEngines().size() > 0) { - QueryExecutor.startScriptExecutors(queryContext, evalContext, targets); - Thread.sleep(100); - } + QueryExecutor.startScriptExecutors(queryContext, evalContext, targets); Tuple outTuple = new VTuple(targets.length); for (int i = 0; i < targets.length; i++) { EvalNode eval = targets[i].getEvalTree(); @@ -329,7 +326,6 @@ public void testEval(OverridableConf context, Schema schema, String tableName, S } else { throw e; } - } catch (InterruptedException e) { } finally { if (schema != null) { cat.dropTable(qualifiedTableName); From daa1fd3626bebf571b8782ca9cc989e2a1623844 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 19:56:05 +0900 Subject: [PATCH 8/9] remove python test codes --- .../function/python/PythonScriptEngine.java | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 1c49da2673..74c0e5aef3 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -27,16 +27,12 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; -import org.apache.tajo.function.FunctionInvocation; -import org.apache.tajo.function.FunctionSignature; -import org.apache.tajo.function.FunctionSupplement; -import org.apache.tajo.function.PythonInvocationDesc; +import org.apache.tajo.function.*; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext; import org.apache.tajo.plan.function.stream.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; -import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -315,19 +311,6 @@ public void start(Configuration systemConf) throws IOException { @Override public void shutdown() { process.destroy(); - try { - int exitCode = process.waitFor(); - if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { - LOG.warn("Process exit code: " + exitCode); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Process exit code: " + exitCode); - } - } - } catch (InterruptedException e) { - LOG.warn(e.getMessage(), e); - } - FileUtil.cleanup(LOG, stdin, stdout, stderr, inputHandler, outputHandler); stdin = null; stdout = stderr = null; From 491297c3060810bf9681ccefabc3d2ea108ef9d9 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Fri, 21 Aug 2015 18:09:56 +0900 Subject: [PATCH 9/9] add simple query tests --- .../tajo/engine/query/TestSimpleQuery.java | 179 ++++++++++++++++++ .../results/TestSimpleQuery/testLimit.result | 3 + .../TestSimpleQuery/testNoWhere.result | 7 + .../testPartitionColumnWhere.result | 4 + .../results/TestSimpleQuery/testWhere.result | 4 + .../function/python/PythonScriptEngine.java | 19 +- 6 files changed, 198 insertions(+), 18 deletions(-) create mode 100644 tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSimpleQuery.java create mode 100644 tajo-core-tests/src/test/resources/results/TestSimpleQuery/testLimit.result create mode 100644 tajo-core-tests/src/test/resources/results/TestSimpleQuery/testNoWhere.result create mode 100644 tajo-core-tests/src/test/resources/results/TestSimpleQuery/testPartitionColumnWhere.result create mode 100644 tajo-core-tests/src/test/resources/results/TestSimpleQuery/testWhere.result diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSimpleQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSimpleQuery.java new file mode 100644 index 0000000000..4c180973b1 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSimpleQuery.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.query; + +import org.apache.tajo.*; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.client.TajoClientUtil; +import org.apache.tajo.exception.QueryNotFoundException; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.master.GlobalEngine; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.master.QueryManager; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.session.Session; +import org.apache.tajo.util.StringUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestSimpleQuery extends QueryTestCaseBase { + private static String table; + private static String partitionedTable; + private NodeType nodeType; + private String testTable; + + public TestSimpleQuery(NodeType nodeType) throws IOException { + super(TajoConstants.DEFAULT_DATABASE_NAME); + this.nodeType = nodeType; + if (nodeType == NodeType.SCAN) { + testTable = table; + } else if (nodeType == NodeType.PARTITIONS_SCAN) { + testTable = partitionedTable; + } + } + + @Parameterized.Parameters + public static Collection generateParameters() { + return Arrays.asList(new Object[][]{ + //type + {NodeType.SCAN}, + {NodeType.PARTITIONS_SCAN}, + }); + } + + @BeforeClass + public static void setupClass() throws Exception { + createTestTable(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + client.dropTable(table, true); + client.dropTable(partitionedTable, true); + } + + private static void createTestTable() throws Exception { + partitionedTable = CatalogUtil.normalizeIdentifier("TestSimpleQuery_Partitioned"); + client.executeQueryAndGetResult("create table " + partitionedTable + + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) " + + "as select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + + table = CatalogUtil.normalizeIdentifier("TestSimpleQuery"); + client.executeQueryAndGetResult("create table " + table + + " (col4 text, col1 int4, col2 int4, col3 float8) " + + "as select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + } + + @Test + public final void testNoWhere() throws Exception { + String query = "select * from " + testTable; + + isSimpleQuery(query, true); + hasQueryMaster(query, false); + ResultSet res = executeString(query); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testLimit() throws Exception { + String query = "select * from " + testTable + " limit 1"; + + isSimpleQuery(query, true); + hasQueryMaster(query, false); + ResultSet res = executeString(query); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testWhere() throws Exception { + String query = "select * from " + testTable + " where col4 = 'R' and col1 = 3"; + + isSimpleQuery(query, false); + hasQueryMaster(query, true); + ResultSet res = executeString(query); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testPartitionColumnWhere() throws Exception { + String query = "select * from " + testTable + " where col1 = 1 and (col3 = 36.0 or col3 = 17.0) "; + + if (nodeType == NodeType.SCAN) { + isSimpleQuery(query, false); + hasQueryMaster(query, true); + } else { + isSimpleQuery(query, true); + hasQueryMaster(query, false); + } + + ResultSet res = executeString(query); + assertResultSet(res); + cleanupQuery(res); + } + + private void isSimpleQuery(String queryStr, boolean expected) throws Exception { + GlobalEngine globalEngine = testingCluster.getMaster().getContext().getGlobalEngine(); + + QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); + Session session = testingCluster.getMaster().getContext().getSessionManager().getSession(client.getSessionId()); + LogicalPlan plan = globalEngine.getLogicalPlanner(). + createPlan(queryContext, globalEngine.buildExpressionFromSql(queryStr, session)); + + globalEngine.getLogicalOptimizer().optimize(plan); + assertEquals(expected, PlannerUtil.checkIfSimpleQuery(plan)); + } + + private void hasQueryMaster(String queryStr, boolean expected) throws QueryNotFoundException { + ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); + QueryId queryId = new QueryId(res.getQueryId()); + + QueryManager queryManager = testingCluster.getMaster().getContext().getQueryJobManager(); + if (expected) { + assertEquals(ClientProtos.SubmitQueryResponse.ResultType.FETCH, res.getResultType()); + QueryStatus status = TajoClientUtil.waitCompletion(client, queryId); + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, status.getState()); + client.closeQuery(queryId); + } else { + assertEquals(ClientProtos.SubmitQueryResponse.ResultType.ENCLOSED, res.getResultType()); + QueryInfo queryInfo = queryManager.getFinishedQuery(queryId); + assertNotNull(queryInfo); + assertTrue(StringUtils.isEmpty(queryInfo.getQueryMasterHost())); + client.closeQuery(queryId); + } + } +} diff --git a/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testLimit.result b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testLimit.result new file mode 100644 index 0000000000..15e3a92d54 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testLimit.result @@ -0,0 +1,3 @@ +col4,col1,col2,col3 +------------------------------- +N,1,1,17.0 \ No newline at end of file diff --git a/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testNoWhere.result b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testNoWhere.result new file mode 100644 index 0000000000..ba5c0eaf87 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testNoWhere.result @@ -0,0 +1,7 @@ +col4,col1,col2,col3 +------------------------------- +N,1,1,17.0 +N,1,1,36.0 +N,2,2,38.0 +R,3,2,45.0 +R,3,3,49.0 \ No newline at end of file diff --git a/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testPartitionColumnWhere.result b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testPartitionColumnWhere.result new file mode 100644 index 0000000000..c38334d4f7 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testPartitionColumnWhere.result @@ -0,0 +1,4 @@ +col4,col1,col2,col3 +------------------------------- +N,1,1,17.0 +N,1,1,36.0 \ No newline at end of file diff --git a/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testWhere.result b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testWhere.result new file mode 100644 index 0000000000..9f337a1c75 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSimpleQuery/testWhere.result @@ -0,0 +1,4 @@ +col4,col1,col2,col3 +------------------------------- +R,3,2,45.0 +R,3,3,49.0 \ No newline at end of file diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 3c4a25dda2..02dff1986e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -27,16 +27,12 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; -import org.apache.tajo.function.FunctionInvocation; -import org.apache.tajo.function.FunctionSignature; -import org.apache.tajo.function.FunctionSupplement; -import org.apache.tajo.function.PythonInvocationDesc; +import org.apache.tajo.function.*; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext; import org.apache.tajo.plan.function.stream.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; -import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -328,19 +324,6 @@ public void start(Configuration systemConf) throws IOException { @Override public void shutdown() { process.destroy(); - try { - int exitCode = process.waitFor(); - if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { - LOG.warn("Process exit code: " + exitCode); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Process exit code: " + exitCode); - } - } - } catch (InterruptedException e) { - LOG.warn(e.getMessage(), e); - } - FileUtil.cleanup(LOG, stdin, stdout, stderr, inputHandler, outputHandler); stdin = null; stdout = stderr = null;