From 7ef2784b3c3e2773cb625388000f80664ad0090e Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sat, 27 Sep 2014 00:03:40 -0700 Subject: [PATCH 1/3] TAJO-1042: Implement block iteration interfaces for executors and scanners. --- .../tajo/engine/planner/physical/MemTableScanner.java | 7 +++++++ .../tajo/engine/planner/physical/PairWiseMerger.java | 7 +++++++ .../tajo/engine/planner/physical/PhysicalExec.java | 7 +++++++ .../org/apache/tajo/engine/utils/TupleCacheScanner.java | 7 +++++++ .../main/java/org/apache/tajo/storage/FileScanner.java | 7 +++++++ .../main/java/org/apache/tajo/storage/MergeScanner.java | 7 +++++++ .../src/main/java/org/apache/tajo/storage/Scanner.java | 9 +++++++++ .../java/org/apache/tajo/storage/v2/FileScannerV2.java | 7 +++++++ 8 files changed, 58 insertions(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java index 7bd6a703ea..a2dc87e83d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java @@ -23,9 +23,11 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.Collection; @@ -73,6 +75,11 @@ public Tuple next() throws IOException { } } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void reset() throws IOException { init(); diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java index 2ac8662d6c..f801b35fc2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java @@ -24,9 +24,11 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.Comparator; @@ -158,6 +160,11 @@ public Tuple next() throws IOException { return outTuple; } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void reset() throws IOException { if (state == State.INITED) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java index 31cfc4d359..859c053ff9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java @@ -25,7 +25,10 @@ import org.apache.tajo.catalog.SchemaObject; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.codegen.CompilationError; +import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ 
-60,6 +63,10 @@ protected void compile() throws CompilationError { public abstract Tuple next() throws IOException; + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + public abstract void rescan() throws IOException; public abstract void close() throws IOException; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java index 743d70c695..10b3a22575 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java @@ -21,8 +21,10 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.Iterator; @@ -62,6 +64,11 @@ public Tuple next() throws IOException { } } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void reset() throws IOException { init(); diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java index f15c4c97a5..6aa59e6a45 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java @@ -29,7 +29,9 @@ import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.UnimplementedException; import 
org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; @@ -80,6 +82,11 @@ public Schema getSchema() { return schema; } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void setTarget(Column[] targets) { if (inited) { diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java index 8917f21ce3..890455ad90 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -26,7 +26,9 @@ import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.Iterator; @@ -113,6 +115,11 @@ public Tuple next() throws IOException { return tuple; } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void reset() throws IOException { this.iterator = fragments.iterator(); diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java index 16c4faa4a5..f532e0e646 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java @@ -21,6 +21,8 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SchemaObject; import org.apache.tajo.catalog.statistics.TableStats; +import 
org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.Closeable; import java.io.IOException; @@ -41,6 +43,13 @@ public interface Scanner extends SchemaObject, Closeable { * @throws IOException if internal I/O error occurs during next method */ Tuple next() throws IOException; + + /** + * + * @param rowBlock + * @return + */ + boolean nextFetch(OffHeapRowBlock rowBlock); /** * Reset the cursor. After executed, the scanner diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java index da7084c721..ebbe2f43e1 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java @@ -28,9 +28,11 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -222,6 +224,11 @@ public Tuple next() throws IOException { return nextTuple(); } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public float getProgress() { return progress; From a71a98147ced8021bc5a521666efdfeff4ae84c7 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sat, 27 Sep 2014 10:50:46 -0700 Subject: [PATCH 2/3] TAJO-1080: Cleanup BooleanDatum and Inet4Datum. 
--- .../java/org/apache/tajo/datum/BooleanDatum.java | 8 +++----- .../main/java/org/apache/tajo/datum/Inet4Datum.java | 7 ++----- .../main/java/org/apache/tajo/datum/Int2Datum.java | 13 ------------- .../main/java/org/apache/tajo/util/NetUtils.java | 9 +++++++++ .../apache/tajo/datum/TestArithmeticOperator.java | 10 +++++----- 5 files changed, 19 insertions(+), 28 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java index 93933a85a9..a8eeca0c2f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java @@ -167,12 +167,10 @@ public BooleanDatum equalsTo(Datum datum) { public int compareTo(Datum datum) { switch (datum.type()) { case BOOLEAN: - if (val && !datum.asBool()) { - return -1; - } else if (val && datum.asBool()) { - return 1; - } else { + if ((val ^ datum.asBool()) == false) { // if both values are the same, whichever value that is. return 0; + } else { + return val ? 
-1 : 1; } default: throw new InvalidOperationException(datum.type()); diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java index 1de81cd9f5..ed48a028e3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java @@ -22,6 +22,7 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.NetUtils; import static org.apache.tajo.common.TajoDataTypes.Type; @@ -36,11 +37,7 @@ public class Inet4Datum extends Datum { public Inet4Datum(String addr) { super(Type.INET4); - String [] elems = addr.split("\\."); - address = Integer.parseInt(elems[3]) & 0xFF - | ((Integer.parseInt(elems[2]) << 8) & 0xFF00) - | ((Integer.parseInt(elems[1]) << 16) & 0xFF0000) - | ((Integer.parseInt(elems[0]) << 24) & 0xFF000000); + address = NetUtils.convertIPStringToInt(addr); } public Inet4Datum(byte[] addr) { diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java index 2a6c691cbf..7bd24da791 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java @@ -195,7 +195,6 @@ public int compareTo(Datum datum) { public Datum plus(Datum datum) { switch (datum.type()) { case INT2: - return DatumFactory.createInt2((short) (val + datum.asInt2())); case INT4: return DatumFactory.createInt4(val + datum.asInt4()); case INT8: @@ -220,7 +219,6 @@ public Datum plus(Datum datum) { public Datum minus(Datum datum) { switch (datum.type()) { case INT2: - return DatumFactory.createInt2((short) (val - datum.asInt2())); case INT4: return DatumFactory.createInt4(val - datum.asInt4()); case INT8: @@ -245,7 +243,6 @@ public Datum minus(Datum datum) { public Datum 
multiply(Datum datum) { switch (datum.type()) { case INT2: - return DatumFactory.createInt4(val * datum.asInt2()); case INT4: return DatumFactory.createInt4(val * datum.asInt4()); case INT8: @@ -268,11 +265,6 @@ public Datum multiply(Datum datum) { public Datum divide(Datum datum) { switch (datum.type()) { case INT2: - short paramValueI2 = datum.asInt2(); - if (!validateDivideZero(paramValueI2)) { - return NullDatum.get(); - } - return DatumFactory.createInt2((short) (val / paramValueI2)); case INT4: int paramValueI4 = datum.asInt4(); if (!validateDivideZero(paramValueI4)) { @@ -308,11 +300,6 @@ public Datum divide(Datum datum) { public Datum modular(Datum datum) { switch (datum.type()) { case INT2: - short paramValueI2 = datum.asInt2(); - if (!validateDivideZero(paramValueI2)) { - return NullDatum.get(); - } - return DatumFactory.createInt2((short) (val % paramValueI2)); case INT4: int paramValueI4 = datum.asInt4(); if (!validateDivideZero(paramValueI4)) { diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java index 829829f9cc..fc24a5beb0 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java @@ -101,4 +101,13 @@ public static String normalizeHost(String host) { } return host; } + + public static int convertIPStringToInt(String ipAddr) { + String [] elems = ipAddr.split("\\."); + int address = Integer.parseInt(elems[3]) & 0xFF + | ((Integer.parseInt(elems[2]) << 8) & 0xFF00) + | ((Integer.parseInt(elems[1]) << 16) & 0xFF0000) + | ((Integer.parseInt(elems[0]) << 24) & 0xFF000000); + return address; + } } \ No newline at end of file diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java index 42623bd3d8..8915ea3dad 100644 --- 
a/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java +++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java @@ -62,20 +62,20 @@ public void setUp() { @Test public void testInt2Datum() throws Exception { //plus - runAndAssert("plus", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)15)); + runAndAssert("plus", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)15)); runAndAssert("plus", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(15)); runAndAssert("plus", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(15)); runAndAssert("plus", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(15.0f)); runAndAssert("plus", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(15.0)); //minus - runAndAssert("minus", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)5)); + runAndAssert("minus", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)5)); runAndAssert("minus", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(5)); runAndAssert("minus", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(5)); runAndAssert("minus", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(5.0f)); runAndAssert("minus", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(5.0)); - runAndAssert("minus", new Int2Datum((short)5), new Int2Datum((short)10), new Int2Datum((short)-5)); + runAndAssert("minus", new Int2Datum((short)5), new Int2Datum((short)10), new Int4Datum((short)-5)); runAndAssert("minus", new Int2Datum((short)5), new Int4Datum(10), new Int4Datum(-5)); runAndAssert("minus", new Int2Datum((short)5), new Int8Datum(10), new Int8Datum(-5)); runAndAssert("minus", new Int2Datum((short)5), new Float4Datum(10.0f), new Float4Datum(-5.0f)); @@ -89,7 +89,7 @@ public void testInt2Datum() throws Exception { runAndAssert("multiply", new Int2Datum((short)10), new Float8Datum(5.0), 
new Float8Datum(50.0)); //divide - runAndAssert("divide", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)2)); + runAndAssert("divide", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)2)); runAndAssert("divide", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(2)); runAndAssert("divide", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(2)); runAndAssert("divide", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(2.0f)); @@ -102,7 +102,7 @@ public void testInt2Datum() throws Exception { runAndAssert("divide", new Int2Datum((short)10), new Float8Datum(0.0), NullDatum.get()); //modular - runAndAssert("modular", new Int2Datum((short)10), new Int2Datum((short)3), new Int2Datum((short)1)); + runAndAssert("modular", new Int2Datum((short)10), new Int2Datum((short)3), new Int4Datum((short)1)); runAndAssert("modular", new Int2Datum((short)10), new Int4Datum(3), new Int4Datum(1)); runAndAssert("modular", new Int2Datum((short)10), new Int8Datum(3), new Int8Datum(1)); runAndAssert("modular", new Int2Datum((short)10), new Float4Datum(3.0f), new Float4Datum(1.0f)); From c128eca315fdb084815c4a9ac4fb9c00c87f5466 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sat, 27 Sep 2014 13:48:26 -0700 Subject: [PATCH 3/3] TAJO-1043: Implement nextFetch(RowBlock) of CSVScanner. 
(hyunsik) --- .../java/org/apache/tajo/storage/CSVFile.java | 51 ++ .../org/apache/tajo/storage/FileScanner.java | 2 +- .../java/org/apache/tajo/storage/Scanner.java | 2 +- .../storage/TextSerializerDeserializer.java | 94 +- .../tajo/tuple/offheap/UnSafeTuple.java | 4 + .../apache/tajo/storage/TestNextFetches.java | 854 ++++++++++++++++++ 6 files changed, 1004 insertions(+), 3 deletions(-) create mode 100644 tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java index 211379467e..06ff081271 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -35,11 +35,15 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.exception.AlreadyExistsStorageException; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.OffHeapRowBlockWriter; +import org.apache.tajo.tuple.offheap.RowWriter; import org.apache.tajo.util.BytesUtils; import java.io.*; @@ -480,6 +484,53 @@ public Tuple next() throws IOException { } } + TextSerializerDeserializer deserializer = new TextSerializerDeserializer(); + + boolean hasNext() throws IOException { + if (currentIdx == validIdx) { + if (eof) { + return false; + } else { + page(); + + if(currentIdx == validIdx){ + return false; + } + } + } + + return true; + } + + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException { + rowBlock.clear(); + OffHeapRowBlockWriter 
writer = (OffHeapRowBlockWriter) rowBlock.getWriter(); + + while(hasNext() && rowBlock.rows() < rowBlock.maxRowNum()) { + byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx), + rowLengthList.get(currentIdx), delimiter, targetColumnIndexes); + currentIdx++; + + int fieldIdx = 0; + writer.startRow(); + for (; fieldIdx < cells.length && fieldIdx < schema.size(); fieldIdx++) { + if (cells[fieldIdx] == null) { + writer.skipField(); + } else { + deserializer.write(writer, schema.getColumn(fieldIdx), cells[fieldIdx], 0, cells[fieldIdx].length, nullChars); + + } + } + for (; fieldIdx < schema.size(); fieldIdx++) { + writer.skipField(); + } + writer.endRow(); + } + + return rowBlock.rows() > 0; + } + private boolean isCompress() { return codec != null; } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java index 6aa59e6a45..d4357e33e2 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java @@ -83,7 +83,7 @@ public Schema getSchema() { } @Override - public boolean nextFetch(OffHeapRowBlock rowBlock) { + public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException { throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java index f532e0e646..3478e23cdb 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java @@ -49,7 +49,7 @@ public interface Scanner extends SchemaObject, Closeable { * @param rowBlock * @return */ - boolean nextFetch(OffHeapRowBlock rowBlock); + boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException; /** * Reset the cursor. 
After executed, the scanner diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java index b42c1b5142..6dfe6c1386 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java @@ -25,8 +25,11 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.*; import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.tuple.offheap.RowWriter; import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.NumberUtil; +import org.apache.tajo.util.datetime.DateTimeUtil; import java.io.IOException; import java.io.OutputStream; @@ -35,7 +38,7 @@ public class TextSerializerDeserializer implements SerializerDeserializer { public static final byte[] trueBytes = "true".getBytes(); public static final byte[] falseBytes = "false".getBytes(); - private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); @Override @@ -213,6 +216,95 @@ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[ return datum; } + public static void write(RowWriter writer, Column col, byte [] bytes, int offset, int length, byte [] nullChar) throws IOException { + TajoDataTypes.Type type = col.getDataType().getType(); + boolean nullField; + if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { + nullField = isNullText(bytes, offset, length, nullChar); + } else { + nullField = isNull(bytes, offset, length, nullChar); + } + + if (nullField) { + writer.skipField(); + return; + } else { + switch (col.getDataType().getType()) { + case BOOLEAN: + writer.putBool(bytes[offset] == 't' || bytes[offset] == 'T'); + break; + + case CHAR: + case TEXT: + 
writer.putText(bytes); + break; + + case INT1: + case INT2: + writer.putInt2((short) NumberUtil.parseInt(bytes, offset, length)); + break; + + case INT4: + writer.putInt4(NumberUtil.parseInt(bytes, offset, length)); + break; + + case INT8: + writer.putInt8(Long.parseLong(new String(bytes, offset, length))); + break; + + case FLOAT4: + writer.putFloat4(Float.parseFloat(new String(bytes, offset, length))); + break; + + case FLOAT8: + writer.putFloat8(Double.parseDouble(new String(bytes, offset, length))); + break; + + case DATE: + writer.putDate(DateTimeUtil.toJulianDate(new String(bytes, offset, length))); + break; + + case TIME: + writer.putInt8(DateTimeUtil.toJulianTime(new String(bytes, offset, length))); + break; + + case TIMESTAMP: + writer.putInt8(DateTimeUtil.toJulianTimestamp(new String(bytes, offset, length))); + break; + + case INTERVAL: + writer.putInterval(DatumFactory.createInterval(new String(bytes, offset, length))); + break; + + case PROTOBUF: + ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); + Message.Builder builder = factory.newBuilder(); + try { + byte[] protoBytes = new byte[length]; + System.arraycopy(bytes, offset, protoBytes, 0, length); + protobufJsonFormat.merge(protoBytes, builder); + writer.putProtoDatum(factory.createDatum(builder.build())); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + + break; + + case INET4: + writer.putInet4(NetUtils.convertIPStringToInt(new String(bytes, offset, length))); + break; + + case BLOB: + writer.putBlob(Base64.decodeBase64(bytes)); + break; + + default: + writer.skipField(); + } + } + } + private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) { return length == 0 || ((length == nullBytes.length) && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length)); diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java 
b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java index d8bafea539..6f4d385043 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java @@ -145,6 +145,8 @@ public Datum get(int fieldId) { switch (types[fieldId].getType()) { case BOOLEAN: return DatumFactory.createBool(getBool(fieldId)); + case CHAR: + return DatumFactory.createChar(getBytes(fieldId)); case INT1: case INT2: return DatumFactory.createInt2(getInt2(fieldId)); @@ -158,6 +160,8 @@ public Datum get(int fieldId) { return DatumFactory.createFloat8(getFloat8(fieldId)); case TEXT: return DatumFactory.createText(getText(fieldId)); + case BLOB: + return DatumFactory.createBlob(getBytes(fieldId)); case TIMESTAMP: return DatumFactory.createTimestamp(getInt8(fieldId)); case DATE: diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java new file mode 100644 index 0000000000..d1b3afd189 --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java @@ -0,0 +1,854 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.rcfile.RCFile; +import org.apache.tajo.storage.sequencefile.SequenceFileScanner; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.UnSafeTuple; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.UnsafeUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import sun.misc.Unsafe; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestNextFetches { + private TajoConf conf; + private static String TEST_PATH = 
"target/test-data/TestStorages"; + + private static String TEST_PROJECTION_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testProjection\",\n" + + " \"fields\": [\n" + + " { \"name\": \"id\", \"type\": \"int\" },\n" + + " { \"name\": \"age\", \"type\": \"long\" },\n" + + " { \"name\": \"score\", \"type\": \"float\" }\n" + + " ]\n" + + "}\n"; + + private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testNullHandlingTypes\",\n" + + " \"fields\": [\n" + + " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" + + " { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" + + " { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" + + " { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" + + " { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col12\", \"type\": \"null\" },\n" + + " { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" + + " ]\n" + + "}\n"; + + private StoreType storeType; + private boolean splitable; + private boolean statsable; + private Path testDir; + private FileSystem fs; + + public TestNextFetches(StoreType type, boolean splitable, boolean statsable) throws IOException { + this.storeType = type; + this.splitable = splitable; + this.statsable = statsable; + + conf = new TajoConf(); + + if (storeType == StoreType.RCFILE) { + conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100); + } + + testDir = 
CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + } + + @Parameterized.Parameters + public static Collection generateParameters() { + return Arrays.asList(new Object[][] { + {StoreType.CSV, true, true}, + // TODO - to be implemented +// {StoreType.RAW, false, false}, +// {StoreType.RCFILE, true, true}, +// {StoreType.PARQUET, false, false}, +// {StoreType.SEQUENCEFILE, true, true}, +// {StoreType.AVRO, false, false}, + }); + } + + @Test + public void testSplitable() throws IOException { + if (splitable) { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = (long) (Math.random() * fileLen) + 1; + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + while (scanner.nextFetch(rowBlock)) { + tupleCnt += rowBlock.rows(); + } + 
scanner.close(); + + scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.nextFetch(rowBlock)) { + tupleCnt += rowBlock.rows(); + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + + rowBlock.release(); + } + } + + @Test + public void testSplitableForRCFileBug() throws IOException { + if (storeType == StoreType.RCFILE) { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = 122; // header size + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + while (scanner.nextFetch(rowBlock)) { + tupleCnt += rowBlock.rows(); + } + scanner.close(); + + scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, 
tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.nextFetch(rowBlock)) { + tupleCnt += rowBlock.rows(); + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + + rowBlock.release(); + } + } + + @Test + public void testProjection() throws IOException { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("score", Type.FLOAT4); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.setOptions(StorageUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_PROJECTION_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testProjection.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(i + 2)); + vTuple.put(2, DatumFactory.createFloat4(i + 3)); + appender.addTuple(vTuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen()); + + Schema target = new Schema(); + target.addColumn("age", Type.INT8); + target.addColumn("score", Type.FLOAT4); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment, target); + scanner.init(); + int tupleCnt = 0; + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple tuple = new ZeroCopyTuple(); + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(tuple)) { + if (storeType == StoreType.RCFILE + || storeType == StoreType.TREVNI + || storeType == StoreType.CSV + || 
storeType == StoreType.PARQUET + || storeType == StoreType.SEQUENCEFILE + || storeType == StoreType.AVRO) { + assertTrue(tuple.isNull(0)); + } + assertTrue(tupleCnt + 2 == tuple.getInt8(1)); + assertTrue(tupleCnt + 3 == tuple.getFloat4(2)); + tupleCnt++; + } + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + + rowBlock.release(); + } + + @Test + public void testVariousTypes() throws IOException { + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(StorageUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { + String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString(); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); + } + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(12); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("hyunsik"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), 
+ DatumFactory.createText("hyunsik"), + DatumFactory.createBlob("hyunsik".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(zcTuple)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), zcTuple.get(i)); + } + } + } + scanner.close(); + + rowBlock.release(); + } + + @Test + public void testNullHandlingTypes() throws IOException { + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(StorageUtil.newPhysicalProperties(storeType)); + meta.putOption(StorageConstants.CSVFILE_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); + 
meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\"); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple seedTuple = new VTuple(12); + seedTuple.put(new Datum[]{ + DatumFactory.createBool(true), // 0 + DatumFactory.createChar("hyunsik"), // 1 + DatumFactory.createInt2((short) 17), // 2 + DatumFactory.createInt4(59), // 3 + DatumFactory.createInt8(23l), // 4 + DatumFactory.createFloat4(77.9f), // 5 + DatumFactory.createFloat8(271.9f), // 6 + DatumFactory.createText("hyunsik"), // 7 + DatumFactory.createBlob("hyunsik".getBytes()),// 8 + DatumFactory.createInet4("192.168.0.1"), // 9 + NullDatum.get(), // 10 + factory.createDatum(queryid.getProto()) // 11 + }); + + // Making tuples with different null column positions + Tuple tuple; + for (int i = 0; i < 12; i++) { + tuple = new VTuple(12); + for (int j = 0; j < 12; j++) { + if (i == j) { // i'th column will have NULL value + tuple.put(j, NullDatum.get()); + } else { + tuple.put(j, seedTuple.get(j)); + } + } + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + + int i = 0; + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + 
+ while(reader.next(retrieved)) { + assertEquals(12, retrieved.size()); + for (int j = 0; j < 12; j++) { + if (i == j) { + assertEquals(NullDatum.get(), retrieved.get(j)); + } else { + assertEquals(seedTuple.get(j), retrieved.get(j)); + } + } + + i++; + } + } + scanner.close(); + + rowBlock.release(); + } + + @Test + public void testRCFileTextSerializeDeserialize() throws IOException { + if(storeType != StoreType.RCFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(12); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + 
DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + + rowBlock.release(); + } + + @Test + public void testRCFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.RCFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, 
options); + meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(12); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), 
scanner.getInputStats().getNumRows().longValue()); + + rowBlock.release(); + } + + @Test + public void testSequenceFileTextSerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(12); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = 
fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + + rowBlock.release(); + } + + @Test + public void testSequenceFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + 
TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName()); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < 
tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testTime() throws IOException { + if (storeType == StoreType.CSV || storeType == StoreType.RAW) { + Schema schema = new Schema(); + schema.addColumn("col1", Type.DATE); + schema.addColumn("col2", Type.TIME); + schema.addColumn("col3", Type.TIMESTAMP); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + + Path tablePath = new Path(testDir, "testTime.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple = new VTuple(3); + tuple.put(new Datum[]{ + DatumFactory.createDate("1980-04-01"), + DatumFactory.createTime("12:34:56"), + DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + + rowBlock.release(); + } + } + +}