From ea8b40897715fd2defa77a98a539f6952c917719 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Tue, 2 Dec 2014 10:04:11 +0900 Subject: [PATCH] TAJO-1190: INSERT INTO to partition tables may cause NPE. --- .../planner/physical/PhysicalPlanUtil.java | 65 +++++++++++++++---- .../engine/planner/physical/SeqScanExec.java | 7 +- .../tajo/worker/TajoWorkerClientService.java | 16 +++-- .../engine/query/TestTablePartitions.java | 56 ++++++++++++++++ ...itionedTableWithSmallerExpressions5.result | 7 ++ ...itionedTableWithSmallerExpressions6.result | 4 ++ .../apache/tajo/rpc/RemoteCallException.java | 6 +- 7 files changed, 138 insertions(+), 23 deletions(-) create mode 100644 tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result create mode 100644 tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index fe1f795a71..a63b838e4e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -77,31 +77,33 @@ public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf t Path path = new Path(tableDesc.getPath()); FileSystem fs = path.getFileSystem(tajoConf); + //In the case of partitioned table, we should return same partition key data files. 
+ int partitionDepth = 0; + if (tableDesc.hasPartition()) { + partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size(); + } + List nonZeroLengthFiles = new ArrayList(); if (fs.exists(path)) { getNonZeroLengthDataFiles(fs, path, nonZeroLengthFiles, fileIndex, numResultFiles, - new AtomicInteger(0)); + new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth); } List fragments = new ArrayList(); - //In the case of partitioned table, return same partition key data files. - int numPartitionColumns = 0; - if (tableDesc.hasPartition()) { - numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size(); - } + String[] previousPartitionPathNames = null; for (FileStatus eachFile: nonZeroLengthFiles) { FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null); - if (numPartitionColumns > 0) { + if (partitionDepth > 0) { // finding partition key; Path filePath = fileFragment.getPath(); Path parentPath = filePath; - String[] parentPathNames = new String[numPartitionColumns]; - for (int i = 0; i < numPartitionColumns; i++) { + String[] parentPathNames = new String[partitionDepth]; + for (int i = 0; i < partitionDepth; i++) { parentPath = parentPath.getParent(); - parentPathNames[numPartitionColumns - i - 1] = parentPath.getName(); + parentPathNames[partitionDepth - i - 1] = parentPath.getName(); } // If current partitionKey == previousPartitionKey, add to result. 
@@ -120,20 +122,53 @@ public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf t return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[]{})); } + /** + * Recursively collects non-zero-length data files found under the given path. + * @param fs The file system used to inspect and list the path + * @param path The table path + * @param result The final result files to be used + * @param startFileIndex A file is added to the result only once currentFileIndex has reached this value + * @param numResultFiles The number of result files at which the traversal stops + * @param currentFileIndex The counter compared against startFileIndex; the same instance is passed to every recursive call + * @param partitioned A flag to indicate if this table is partitioned + * @param currentDepth Current visiting depth of partition directories + * @param maxDepth The partition depth of this table + * @throws IOException If accessing the file system fails + */ private static void getNonZeroLengthDataFiles(FileSystem fs, Path path, List result, int startFileIndex, int numResultFiles, - AtomicInteger currentFileIndex) throws IOException { + AtomicInteger currentFileIndex, boolean partitioned, + int currentDepth, int maxDepth) throws IOException { + // Intermediate directory if (fs.isDirectory(path)) { + FileStatus[] files = fs.listStatus(path, StorageManager.hiddenFileFilter); + if (files != null && files.length > 0) { + for (FileStatus eachFile : files) { + + // Check whether enough files have already been found. if (result.size() >= numResultFiles) { return; } + if (eachFile.isDirectory()) { - getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles, - currentFileIndex); - } else if (eachFile.isFile() && eachFile.getLen() > 0) { + getNonZeroLengthDataFiles( + fs, + eachFile.getPath(), + result, + startFileIndex, + numResultFiles, + currentFileIndex, + partitioned, + currentDepth + 1, // increment the visiting depth + maxDepth); + + + // For a partitioned table, we should ignore files located in intermediate directories. + // A file is guaranteed to be in a leaf directory if currentDepth == maxDepth. 
+ } else if (eachFile.isFile() && eachFile.getLen() > 0 && (!partitioned || currentDepth == maxDepth)) { if (currentFileIndex.get() >= startFileIndex) { result.add(eachFile); } @@ -141,6 +176,8 @@ private static void getNonZeroLengthDataFiles(FileSystem fs, Path path, List 0) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 3cbb7c962f..759b19c6ab 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -332,7 +332,12 @@ public TableStats getInputStats() { if (scanner != null) { return scanner.getInputStats(); } else { - return inputStats; + if (inputStats != null) { + return inputStats; + } else { + // If no fragment, there is no scanner. So, we need to create a dummy table stat. + return new TableStats(); + } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index a41ffcef06..0f4a60cb7e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -32,6 +32,7 @@ import org.apache.tajo.TajoIdProtos; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse; import org.apache.tajo.ipc.ClientProtos.QueryIdRequest; @@ -39,8 +40,6 @@ import org.apache.tajo.ipc.QueryMasterClientProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.querymaster.Query; -import org.apache.tajo.master.querymaster.QueryInProgress; -import org.apache.tajo.master.querymaster.QueryJobManager; import 
org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.rpc.BlockingRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; @@ -132,6 +131,10 @@ public PrimitiveProtos.BoolProto updateSessionVariables( return null; } + private boolean hasResultTableDesc(QueryContext queryContext) { + return !(queryContext.isCreateTable() || queryContext.isInsert()); + } + @Override public ClientProtos.GetQueryResultResponse getQueryResult( RpcController controller, @@ -151,7 +154,9 @@ public ClientProtos.GetQueryResultResponse getQueryResult( } else { switch (queryMasterTask.getState()) { case QUERY_SUCCEEDED: - builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto()); +// if (hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())) { + builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto()); + //} break; case QUERY_FAILED: case QUERY_ERROR: @@ -191,10 +196,7 @@ public ClientProtos.GetQueryStatusResponse getQueryStatus( return builder.build(); } - builder.setHasResult( - !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() || - queryMasterTask.getQueryTaskContext().getQueryContext().isInsert()) - ); + builder.setHasResult(hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())); queryMasterTask.touchSessionTime(); Query query = queryMasterTask.getQuery(); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 0e9ec7dca5..cff5bfb719 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.query; import com.google.common.collect.Maps; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; @@ -43,6 +44,7 @@ import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.worker.TajoWorker; import org.junit.Test; @@ -820,6 +822,39 @@ public final void testColumnPartitionedTableWithSmallerExpressions3() throws Exc } } + @Test + public final void testColumnPartitionedTableWithSmallerExpressions5() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions5"); + ResultSet res = executeString( + "create table " + tableName + " (col1 text) partition by column(col2 text) "); + res.close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + + res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem"); + res.close(); + res = executeString("select * from " + tableName); + assertResultSet(res); + res.close(); + } + + @Test + public final void testColumnPartitionedTableWithSmallerExpressions6() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions6"); + ResultSet res = executeString( + "create table " + tableName + " (col1 text) partition by column(col2 text) "); + res.close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + + res = executeString( + "insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem where l_orderkey = 1"); + res.close(); + res = executeString("select * from " + tableName); + assertResultSet(res); + res.close(); + } + private MasterPlan getQueryPlan(ResultSet res) { QueryId queryId = ((TajoResultSet)res).getQueryId(); for (TajoWorker eachWorker: testingCluster.getTajoWorkers()) { @@ -936,4 +971,25 @@ public final void TestSpecialCharPartitionKeys2() throws Exception { 
assertResultSet(res); cleanupQuery(res); } + + @Test + public final void testIgnoreFilesInIntermediateDir() throws Exception { + // See - TAJO-1219: Files located in intermediate directories of partitioned table should be ignored + // It verifies that Tajo ignores files located in intermediate directories of partitioned table. + + Path testDir = CommonTestingUtil.getTestDir(); + + executeString( + "CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) USING CSV PARTITION BY COLUMN (col2 text) " + + "LOCATION '" + testDir + "'"); + + FileSystem fs = testDir.getFileSystem(conf); + FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data")); + fos.write("a|b|c".getBytes()); + fos.close(); + + ResultSet res = executeString("select * from testIgnoreFilesInIntermediateDir;"); + assertFalse(res.next()); + res.close(); + } } diff --git a/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result new file mode 100644 index 0000000000..f972753048 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result @@ -0,0 +1,7 @@ +col1,col2 +------------------------------- +N,__TAJO_DEFAULT_PARTITION__ +N,__TAJO_DEFAULT_PARTITION__ +N,__TAJO_DEFAULT_PARTITION__ +R,__TAJO_DEFAULT_PARTITION__ +R,__TAJO_DEFAULT_PARTITION__ \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result new file mode 100644 index 0000000000..6b8e2f1bda --- /dev/null +++ b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result @@ -0,0 +1,4 @@ +col1,col2 
+------------------------------- +N,__TAJO_DEFAULT_PARTITION__ +N,__TAJO_DEFAULT_PARTITION__ \ No newline at end of file diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java index 90ee58a806..52ef31ab7b 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java @@ -49,7 +49,11 @@ public RemoteCallException(int seqId, Throwable t) { public RpcResponse getResponse() { RpcResponse.Builder builder = RpcResponse.newBuilder(); builder.setId(seqId); - builder.setErrorMessage(getCause().getMessage()); + if (getCause().getMessage() == null) { + builder.setErrorMessage(getCause().getClass().getName()); + } else { + builder.setErrorMessage(getCause().getMessage()); + } builder.setErrorTrace(getStackTraceString(getCause())); builder.setErrorClass(originExceptionClass);