From 205331847c2a44381299ab1c9fed4c9c7370472e Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Fri, 19 Dec 2014 01:19:29 +0900 Subject: [PATCH 1/2] Close() for FileAppender is added and derived classes are modified --- .../java/org/apache/tajo/storage/CSVFile.java | 16 +++++++++++----- .../org/apache/tajo/storage/FileAppender.java | 7 +++++++ .../java/org/apache/tajo/storage/RawFile.java | 7 +++++++ .../java/org/apache/tajo/storage/RowFile.java | 8 +++++++- .../apache/tajo/storage/avro/AvroAppender.java | 12 +++++++++++- .../tajo/storage/parquet/ParquetAppender.java | 10 +++++++++- .../org/apache/tajo/storage/rcfile/RCFile.java | 7 +++++++ .../sequencefile/SequenceFileAppender.java | 16 ++++++++++------ .../tajo/storage/text/DelimitedTextFile.java | 7 +++++++ 9 files changed, 76 insertions(+), 14 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java index 28c263c7b8..7859dab83b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -212,6 +212,13 @@ public void flush() throws IOException { @Override public void close() throws IOException { + try { + super.close(); + } catch(IllegalStateException ex) { + LOG.error(ex.getMessage()); + return; + } + try { flush(); @@ -220,7 +227,7 @@ public void close() throws IOException { stats.setNumBytes(getOffset()); } - if(deflateFilter != null) { + if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { deflateFilter.finish(); deflateFilter.resetState(); deflateFilter = null; @@ -229,10 +236,9 @@ public void close() throws IOException { os.close(); } finally { IOUtils.cleanup(LOG, fos); - if (compressor != null) { - CodecPool.returnCompressor(compressor); - compressor = null; - } + + CodecPool.returnCompressor(compressor); + compressor = null; } } 
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java index b208a71d8a..883134e93c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -83,5 +83,12 @@ public long getEstimatedOutputSize() throws IOException { return getOffset(); } + @Override + public void close() throws IOException { + if (!inited) { + throw new IllegalStateException("Closing is tried for not initialized object"); + } + } + public abstract long getOffset() throws IOException; } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 45e07d3195..e4a2acddb9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -741,6 +741,13 @@ public void flush() throws IOException { @Override public void close() throws IOException { + try { + super.close(); + } catch (IllegalStateException ex) { + LOG.error(ex.getMessage()); + return; + } + flush(); if (enabledStats) { stats.setNumBytes(getOffset()); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java index 5510cbf0cf..12ff696ebc 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java @@ -356,6 +356,8 @@ public void init() throws IOException { if (enabledStats) { this.stats = new TableStatistics(this.schema); } + + super.init(); } private void writeHeader() throws IOException { @@ 
-462,13 +464,17 @@ public void flush() throws IOException { @Override public void close() throws IOException { - if (out != null) { + try { + super.close(); + if (enabledStats) { stats.setNumBytes(out.getPos()); } sync(); out.flush(); out.close(); + } catch (IllegalStateException ex) { + LOG.error(ex.getMessage()); } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java index 69399dc0e0..4c65c66f9d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java @@ -18,6 +18,9 @@ package org.apache.tajo.storage.avro; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -46,6 +49,8 @@ * FileAppender for writing to Avro files. 
*/ public class AvroAppender extends FileAppender { + private static final Log LOG = LogFactory.getLog(AvroAppender.class); + private TableStatistics stats; private Schema avroSchema; private List avroFields; @@ -201,7 +206,12 @@ public void flush() throws IOException { */ @Override public void close() throws IOException { - dataFileWriter.close(); + try { + super.close(); + dataFileWriter.close(); + } catch (IllegalStateException ex) { + LOG.error(ex.getMessage()); + } } /** diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java index ef5203c6d1..cea89cc64b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -18,6 +18,8 @@ package org.apache.tajo.storage.parquet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.storage.StorageConstants; import parquet.hadoop.ParquetOutputFormat; @@ -38,6 +40,7 @@ * FileAppender for writing to Parquet files. 
*/ public class ParquetAppender extends FileAppender { + private static final Log LOG = LogFactory.getLog(ParquetAppender.class); private TajoParquetWriter writer; private int blockSize; private int pageSize; @@ -128,7 +131,12 @@ public void flush() throws IOException { */ @Override public void close() throws IOException { - writer.close(); + try { + super.close(); + writer.close(); + } catch (IllegalStateException ex) { + LOG.error(ex.getMessage()); + } } public long getEstimatedOutputSize() throws IOException { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index 2c09100089..c69fe425c9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -1098,6 +1098,13 @@ public TableStats getStats() { @Override public void close() throws IOException { + try { + super.close(); + } catch (IllegalStateException ex) { + LOG.error(ex.getMessage()); + return; + } + if (bufferedRecords > 0) { flushRecords(); } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java index 8b5d677c44..c74cddaa85 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java @@ -253,13 +253,17 @@ public void flush() throws IOException { @Override public void close() throws IOException { - // Statistical section - if (enabledStats) { - stats.setNumBytes(getOffset()); - } + try { + // Statistical section + if (enabledStats) { + stats.setNumBytes(getOffset()); + } - os.close(); - 
writer.close(); + os.close(); + writer.close(); + } catch (IllegalStateException ex) { + LOG.error(ex.getMessage()); + } } @Override diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 59129d1008..34c411e617 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -220,6 +220,13 @@ public void flush() throws IOException { @Override public void close() throws IOException { + try { + super.close(); + } catch (IllegalStateException ex) { + LOG.error(ex.getMessage()); + return; + } + try { serializer.release(); From 7fb632de1860065f287b87498abe9a85eb95848c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 18 Dec 2014 16:50:09 +0900 Subject: [PATCH 2/2] TAJO-1232: Implicit groupby queries with LIMIT lead to wrong results. 
(jihoon) --- CHANGES | 3 +++ .../org/apache/tajo/master/GlobalEngine.java | 8 +++---- .../tajo/engine/query/TestGroupByQuery.java | 21 +++++++++++++++++++ .../testGroupbyWithLimit1.sql | 1 + .../testGroupbyWithLimit2.sql | 1 + .../testGroupbyWithLimit3.sql | 1 + .../testGroupbyWithLimit1.result | 3 +++ .../testGroupbyWithLimit2.result | 3 +++ .../testGroupbyWithLimit3.result | 3 +++ .../org/apache/tajo/plan/LogicalPlanner.java | 7 +++++++ 10 files changed, 47 insertions(+), 4 deletions(-) create mode 100644 tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit1.sql create mode 100644 tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit2.sql create mode 100644 tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql create mode 100644 tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit1.result create mode 100644 tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit2.result create mode 100644 tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result diff --git a/CHANGES b/CHANGES index 2230d0f30a..29b66d1d08 100644 --- a/CHANGES +++ b/CHANGES @@ -112,6 +112,9 @@ Release 0.9.1 - unreleased BUG FIXES + TAJO-1232: Implicit groupby queries with LIMIT lead to wrong results. + (jihoon) + TAJO-1254: Fix getProgress race conditions in Query. (jinho) TAJO-1252: PathValidator should allow hdfs paths which contain IP addresses. 
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index a9624f8feb..71b1f9b588 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -204,10 +204,10 @@ public Expr buildExpressionFromSql(QueryContext queryContext, String sql) } private SubmitQueryResponse executeQueryInternal(QueryContext queryContext, - Session session, - LogicalPlan plan, - String sql, - String jsonExpr) throws Exception { + Session session, + LogicalPlan plan, + String sql, + String jsonExpr) throws Exception { LogicalRootNode rootNode = plan.getRootBlock().getRoot(); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index 3dd1219bb1..edbe029071 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -749,4 +749,25 @@ public int compare(QueryMasterTask o1, QueryMasterTask o2) { ConfVars.$DIST_QUERY_GROUPBY_PARTITION_VOLUME.defaultVal); } } + + @Test + public final void testGroupbyWithLimit1() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testGroupbyWithLimit2() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testGroupbyWithLimit3() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } } diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit1.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit1.sql new file mode 100644 index 0000000000..06d96af802 --- /dev/null +++ 
b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit1.sql @@ -0,0 +1 @@ +select sum(l_linenumber) from lineitem limit 1; \ No newline at end of file diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit2.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit2.sql new file mode 100644 index 0000000000..1ddb171326 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit2.sql @@ -0,0 +1 @@ +select sum(l_linenumber) from lineitem limit 10; \ No newline at end of file diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql new file mode 100644 index 0000000000..1bb7c7f1ec --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql @@ -0,0 +1 @@ +select l_orderkey, sum(l_linenumber) from lineitem group by l_orderkey limit 1; \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit1.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit1.result new file mode 100644 index 0000000000..23bbaea591 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit1.result @@ -0,0 +1,3 @@ +?sum +------------------------------- +7 \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit2.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit2.result new file mode 100644 index 0000000000..23bbaea591 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit2.result @@ -0,0 +1,3 @@ +?sum +------------------------------- +7 \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result 
b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result new file mode 100644 index 0000000000..cb1e1413f9 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result @@ -0,0 +1,3 @@ +l_orderkey,?sum +------------------------------- +3,3 \ No newline at end of file diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index df45c3a28b..1a426e0893 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -716,6 +716,13 @@ private LogicalNode insertGroupbyNode(PlanContext context, LogicalNode child, St LogicalPlan plan = context.plan; QueryBlock block = context.queryBlock; + + // The limit operation must affect the number of results, not the number of input records. + // Thus, the aggregation must be carried out before the limit operation. + if (child.getType() == NodeType.LIMIT) { + child = ((LimitNode)child).getChild(); + } + GroupbyNode groupbyNode = context.plan.createNode(GroupbyNode.class); groupbyNode.setChild(child); groupbyNode.setInSchema(child.getOutSchema());