From de9f7112731e4053613252e4e7e2d924020f5b0a Mon Sep 17 00:00:00 2001 From: mhthanh Date: Tue, 7 Oct 2014 14:43:35 +0900 Subject: [PATCH 1/3] TAJO-1097 --- .../src/main/java/org/apache/tajo/storage/RawFile.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java index edcf686aa9..72ca549047 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -151,7 +151,11 @@ private boolean fillBuffer() throws IOException { long realRemaining = fragment.getEndKey() - numBytesRead; numBytesRead += bytesRead; if (realRemaining < bufferSize) { - buffer.limit(currentDataSize + (int) realRemaining); + int newLimit = currentDataSize + (int) realRemaining; + if(newLimit > bufferSize) { + newLimit = bufferSize; + } + buffer.limit(newLimit); } return true; } From 8c9a93d4809ce8873850d44bd2a9d1191779f124 Mon Sep 17 00:00:00 2001 From: mhthanh Date: Wed, 8 Oct 2014 10:14:43 +0900 Subject: [PATCH 2/3] Update the patch with advices from Jinho Kim. --- .../src/main/java/org/apache/tajo/storage/RawFile.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java index 72ca549047..c8ac3a2141 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -108,7 +108,9 @@ public void init() throws IOException { headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize // initial read - channel.position(fragment.getStartKey()); + if (fragment.getStartKey() > 0) { + channel.position(fragment.getStartKey()); + } numBytesRead = channel.read(buffer); buffer.flip(); @@ -386,7 +388,7 @@ public void reset() throws IOException { // clear the buffer buffer.clear(); // reload initial buffer - channel.position(0); + channel.position(fragment.getStartKey()); numBytesRead = channel.read(buffer); buffer.flip(); eof = false; From ce7beb16832c18887c2b13cca687cdd1ada30209 Mon Sep 17 00:00:00 2001 From: mhthanh Date: Wed, 8 Oct 2014 13:54:35 +0900 Subject: [PATCH 3/3] Add unit test created by Jinho Kim --- .../org/apache/tajo/storage/TestStorages.java | 96 +++++++++++++++++-- 1 file changed, 88 insertions(+), 8 deletions(-) diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java index 3bea740844..5d1b652f9c 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,9 +51,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.List; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) @@ -97,13 +98,15 @@ public class TestStorages { private StoreType storeType; private boolean splitable; private boolean statsable; + private boolean seekable; private Path testDir; private FileSystem fs; - public TestStorages(StoreType type, boolean splitable, boolean statsable) throws IOException { + public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException { this.storeType = type; this.splitable = splitable; this.statsable = statsable; + this.seekable = seekable; conf = new TajoConf(); @@ -118,12 +121,12 @@ public TestStorages(StoreType type, boolean splitable, boolean statsable) throws @Parameterized.Parameters public static Collection generateParameters() { return Arrays.asList(new Object[][] { - {StoreType.CSV, true, true}, - {StoreType.RAW, false, false}, - {StoreType.RCFILE, true, true}, - {StoreType.PARQUET, false, false}, - {StoreType.SEQUENCEFILE, true, true}, - {StoreType.AVRO, false, false}, + {StoreType.CSV, true, true, true}, + {StoreType.RAW, false, false, true}, + {StoreType.RCFILE, true, true, false}, + {StoreType.PARQUET, false, false, false}, + {StoreType.SEQUENCEFILE, true, true, false}, + {StoreType.AVRO, false, false, false}, }); } @@ -773,4 +776,81 @@ public void testTime() throws IOException { } } + @Test + public void testSeekableScanner() throws IOException { + if (!seekable) { + return; + } + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Seekable.data"); + FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, + tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 100000; + VTuple vTuple; + + List offsets = Lists.newArrayList(); + offsets.add(0L); + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("test")); + appender.addTuple(vTuple); + + // find a seek position + if (i % (tupleNum / 3) == 0) { + offsets.add(appender.getOffset()); + } + } + + // end of file + if (!offsets.contains(appender.getOffset())) { + offsets.add(appender.getOffset()); + } + + appender.close(); + if (statsable) { + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + } + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(status.getLen(), appender.getOffset()); + + Scanner scanner; + int tupleCnt = 0; + long prevOffset = 0; + long readBytes = 0; + long readRows = 0; + for (long offset : offsets) { + scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, + new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); + scanner.init(); + + while (scanner.next() != null) { + tupleCnt++; + } + + scanner.close(); + if (statsable) { + readBytes += scanner.getInputStats().getNumBytes(); + readRows += scanner.getInputStats().getNumRows(); + } + prevOffset = offset; + } + + assertEquals(tupleNum, tupleCnt); + if (statsable) { + assertEquals(appender.getStats().getNumBytes().longValue(), readBytes); + assertEquals(appender.getStats().getNumRows().longValue(), readRows); + } + } }