From d27abb93941e336ca5ed7ea65ebac889406e09da Mon Sep 17 00:00:00 2001 From: jinossy Date: Thu, 17 Jul 2014 16:21:27 +0900 Subject: [PATCH 1/2] TAJO-953: RawFile should release a DirectBuffer immediately --- .../java/org/apache/tajo/storage/RawFile.java | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java index 9677bca055..ba373fcb6a 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; @@ -32,6 +33,7 @@ import org.apache.tajo.datum.ProtobufDatumFactory; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.BitArray; +import sun.nio.ch.DirectBuffer; import java.io.File; import java.io.FileInputStream; @@ -92,7 +94,7 @@ public void init() throws IOException { LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size()); } - buffer = ByteBuffer.allocateDirect(128 * 1024); + buffer = ByteBuffer.allocateDirect(64 * 1024); columnTypes = new DataType[schema.size()]; for (int i = 0; i < schema.size(); i++) { @@ -378,9 +380,13 @@ public void close() throws IOException { tableStats.setReadBytes(fileSize); tableStats.setNumRows(recordCount); } - buffer.clear(); - channel.close(); - fis.close(); + + if(buffer.isDirect()){ + ((DirectBuffer) buffer).cleaner().clean(); + } else { + buffer.clear(); + } + IOUtils.cleanup(LOG, channel, fis); } @Override @@ -706,7 +712,9 @@ public void addTuple(Tuple t) throws IOException { @Override public void flush() throws IOException { - flushBuffer(); + if(buffer != null){ + flushBuffer(); + } } @Override @@ -718,8 +726,13 @@ public void close() throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path); } - channel.close(); - randomAccessFile.close(); + + if(buffer.isDirect()){ + ((DirectBuffer) buffer).cleaner().clean(); + } else { + buffer.clear(); + } + IOUtils.cleanup(LOG, channel, randomAccessFile); } @Override From c1c9d208d2ffdc385ad6236423b7145e033d831d Mon Sep 17 00:00:00 2001 From: jinossy Date: Thu, 17 Jul 2014 17:27:07 +0900 Subject: [PATCH 2/2] move to StorageUtil --- .../java/org/apache/tajo/storage/RawFile.java | 13 ++----------- .../org/apache/tajo/storage/StorageUtil.java | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java index ba373fcb6a..41d1e05451 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -33,7 +33,6 @@ import org.apache.tajo.datum.ProtobufDatumFactory; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.BitArray; -import sun.nio.ch.DirectBuffer; import java.io.File; import java.io.FileInputStream; @@ -381,11 +380,7 @@ public void close() throws IOException { tableStats.setNumRows(recordCount); } - if(buffer.isDirect()){ - ((DirectBuffer) buffer).cleaner().clean(); - } else { - buffer.clear(); - } + StorageUtil.closeBuffer(buffer); IOUtils.cleanup(LOG, channel, fis); } @@ -727,11 +722,7 @@ public void close() throws IOException { LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path); } - if(buffer.isDirect()){ - ((DirectBuffer) buffer).cleaner().clean(); - } else { - buffer.clear(); - } + StorageUtil.closeBuffer(buffer); IOUtils.cleanup(LOG, channel, randomAccessFile); } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java index d11dc097b4..95bb96f01a 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java @@ -23,17 +23,21 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.KeyValueSet; import parquet.hadoop.ParquetOutputFormat; +import sun.nio.ch.DirectBuffer; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -public class StorageUtil extends StorageConstants{ +public class StorageUtil extends StorageConstants { public static int getRowByteSize(Schema schema) { int sum = 0; for(Column col : schema.getColumns()) { @@ -185,4 +189,14 @@ public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive return -1; } } + + public static void closeBuffer(ByteBuffer buffer) { + if (buffer != null) { + if (buffer.isDirect()) { + ((DirectBuffer) buffer).cleaner().clean(); + } else { + buffer.clear(); + } + } + } }