From 356a07dcab9d8205c04b9774e6137432c1c3dbc7 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 2 Nov 2015 16:44:51 +0900 Subject: [PATCH 1/3] TAJO-1954: Fix memory leak in physical operator. --- .../java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java | 7 ++++--- .../physical/DistinctGroupbySortAggregationExec.java | 9 +++++---- .../planner/physical/SortBasedColPartitionStoreExec.java | 2 ++ .../master/exec/NonForwardQueryResultFileScanner.java | 1 + 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java index f0f01bfb54..279fce7329 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java @@ -31,6 +31,7 @@ import org.apache.tajo.master.TajoMaster; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; +import org.junit.Assert; import org.junit.Test; import static junit.framework.Assert.assertTrue; @@ -150,9 +151,9 @@ private void verifyDataBaseAndTable(ServiceTracker tracker) throws Exception { TajoClient client = null; try { client = new TajoClientImpl(tracker); - client.existDatabase("default"); - client.existTable("default.ha_test1"); - client.existTable("default.ha_test2"); + Assert.assertTrue(client.existDatabase("default")); + Assert.assertTrue(client.existTable("default.ha_test1")); + Assert.assertTrue(client.existTable("default.ha_test2")); } finally { IOUtils.cleanup(null, client); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java index 58cfca4a7c..03a3ad13ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java @@ -68,10 +68,6 @@ public DistinctGroupbySortAggregationExec(final TaskAttemptContext context, Dist for(int i = 0; i < resultColumnIds.length; i++) { resultColumnIdIndexes[resultColumnIds[i]] = i; } - - for (SortAggregateExec eachExec: aggregateExecs) { - eachExec.init(); - } } boolean first = true; @@ -172,6 +168,11 @@ public void close() throws IOException { @Override public void init() throws IOException { + if (aggregateExecs != null) { + for (SortAggregateExec eachExec: aggregateExecs) { + eachExec.init(); + } + } } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index 607dff763e..176b6fb501 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -102,6 +102,8 @@ public void close() throws IOException { StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); context.setResultStats(aggregatedStats); } + + super.close(); } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index ce692704a3..1706318435 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -252,6 +252,7 @@ public SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException { } return resultSetBuilder.build(); } catch (Throwable t) { + close(); throw new TajoInternalError(t.getCause()); } } From 9ddb370f65ba928586be73f3316fcb32da2d2232 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 2 Nov 2015 17:14:31 +0900 Subject: [PATCH 2/3] add missing super::init --- .../planner/physical/DistinctGroupbySortAggregationExec.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java index 03a3ad13ac..cb7478a3f2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java @@ -168,6 +168,8 @@ public void close() throws IOException { @Override public void init() throws IOException { + super.init(); + if (aggregateExecs != null) { for (SortAggregateExec eachExec: aggregateExecs) { eachExec.init(); From c3f5677f83f148818bb99e38f878a79b36163be0 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 2 Nov 2015 17:27:18 +0900 Subject: [PATCH 3/3] TAJO-1954 --- .../src/main/java/org/apache/tajo/storage/RawFile.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index fba12ddadf..a7b33faea4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -78,6 +78,7 @@ public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragmen super(conf, schema, meta, fragment); } + @Override public void init() throws IOException { File file; try { @@ -99,8 +100,10 @@ public void init() throws IOException { + ", fragment length :" + fragment.getLength()); } - buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)); - buffer = buf.nioBuffer(0, buf.capacity()); + if(buf == null) { + buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)); + buffer = buf.nioBuffer(0, buf.capacity()); + } columnTypes = new DataType[schema.size()]; for (int i = 0; i < schema.size(); i++) {