From 1d7d3d687a48d43ee7f5976cff0efce786eeabca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=ED=98=95=EC=A4=80?= Date: Thu, 10 Jul 2014 21:22:31 +0900 Subject: [PATCH] TAJO-909: {SortBased, Col}PartitionStoreExec should not write partition keys to files. --- .../physical/ColPartitionStoreExec.java | 9 +++- .../org/apache/tajo/QueryTestCaseBase.java | 47 +++++++++++++------ .../engine/query/TestTablePartitions.java | 10 +++- .../TestQueryUnitStatusUpdate.java | 8 +++- 4 files changed, 56 insertions(+), 18 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index fe3690523a..12eb0e0493 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -23,9 +23,10 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.engine.planner.logical.InsertNode; import org.apache.tajo.engine.planner.logical.CreateTableNode; +import org.apache.tajo.engine.planner.logical.InsertNode; import org.apache.tajo.engine.planner.logical.NodeType; import org.apache.tajo.engine.planner.logical.StoreTableNode; import org.apache.tajo.storage.StorageUtil; @@ -60,6 +61,12 @@ public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, Ph // Find column index to name subpartition directory path keyNum = this.plan.getPartitionMethod().getExpressionSchema().size(); + if (plan.getType() == NodeType.INSERT && keyNum > 0) { + Column[] removedPartitionColumns = new Column[this.outSchema.size() - keyNum]; + System.arraycopy(this.outSchema.toArray(), 0, removedPartitionColumns, 0, removedPartitionColumns.length); + this.outSchema = new Schema(removedPartitionColumns); + } + keyIds = new int[keyNum]; keyNames = new String[keyNum]; for (int i = 0; i < keyNum; i++) { diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 70c73f9cda..5103a7f236 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -574,23 +574,14 @@ private String compileTemplate(String template, @Nullable String dataFileName, @ public String getTableFileContents(Path path) throws Exception { FileSystem fs = path.getFileSystem(conf); - FileStatus[] files = fs.listStatus(path); - - if (files == null || files.length == 0) { - return null; - } - StringBuilder sb = new StringBuilder(); - byte[] buf = new byte[1024]; - - for (FileStatus file: files) { - if (file.isDirectory()) { - continue; - } - InputStream in = fs.open(file.getPath()); + List paths = listFiles(fs, path); + for (Path eachPath: paths) { + InputStream in = fs.open(eachPath); try { while (true) { + byte[] buf = new byte[1024]; int readBytes = in.read(buf); if (readBytes <= 0) { break; @@ -602,7 +593,6 @@ public String getTableFileContents(Path path) throws Exception { in.close(); } } - return sb.toString(); } @@ -621,4 +611,33 @@ public String getTableFileContents(String tableName) throws Exception { Path path = tableDesc.getPath(); return getTableFileContents(path); } + + public List listTableFiles(String tableName) throws Exception { + TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(), tableName); + if (tableDesc == null) { + return null; + } + + Path path = tableDesc.getPath(); + FileSystem fs = path.getFileSystem(conf); + + return listFiles(fs, path); + } + + private List listFiles(FileSystem fs, Path path) throws Exception { + List result = new ArrayList(); + FileStatus[] files = fs.listStatus(path); + if (files == null || files.length == 0) { + return result; + } + + for (FileStatus eachFile: files) { + if (eachFile.isDirectory()) { + result.addAll(listFiles(fs, eachFile.getPath())); + } else { + result.add(eachFile.getPath()); + } + } + return result; + } } diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 8c989b5622..c34c3f416d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -240,13 +240,21 @@ public final void testColumnPartitionedTableByThreeColumns() throws Exception { assertEquals(5, desc.getStats().getNumRows().intValue()); } + String expected = "N\n" + + "N\n" + + "N\n" + + "R\n" + + "R\n"; + + String tableData = getTableFileContents(desc.getPath()); + assertEquals(expected, tableData); + res = executeString("select * from " + tableName + " where col2 = 2"); Map resultRows1 = Maps.newHashMap(); resultRows1.put(45.0d, new int[]{3, 2}); resultRows1.put(38.0d, new int[]{2, 2}); - for (int i = 0; i < 2; i++) { assertTrue(res.next()); assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2)); diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java index c52b2771c3..2026ffb1df 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java @@ -91,8 +91,12 @@ public final void case3() throws Exception { // first stage's num rows = (left: 1 , right: 2 (filtered)) * 5 (tasks) long[] expectedNumRows = new long[]{15, 2, 2, 2, 7, 2, 2, 2}; - long[] expectedNumBytes = new long[]{45, 34, 34, 18, 109, 34, 34, 18}; - long[] expectedReadBytes = new long[]{45, 0, 34, 0, 109, 0, 34, 0}; + long[] expectedNumBytes = new long[]{20, 34, 34, 18, 109, 34, 34, 18}; + long[] expectedReadBytes = new long[]{20, 0, 34, 0, 109, 0, 34, 0}; + +// long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2}; +// long[] expectedNumBytes = new long[]{8, 34, 20, 75, 109, 34, 34, 18}; +// long[] expectedReadBytes = new long[]{8, 0, 20, 0, 109, 0, 34, 0}; assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes); } finally {