From 7ef2784b3c3e2773cb625388000f80664ad0090e Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sat, 27 Sep 2014 00:03:40 -0700 Subject: [PATCH 1/4] TAJO-1042: Implement block iteration interfaces for executors and scanners. --- .../tajo/engine/planner/physical/MemTableScanner.java | 7 +++++++ .../tajo/engine/planner/physical/PairWiseMerger.java | 7 +++++++ .../tajo/engine/planner/physical/PhysicalExec.java | 7 +++++++ .../org/apache/tajo/engine/utils/TupleCacheScanner.java | 7 +++++++ .../main/java/org/apache/tajo/storage/FileScanner.java | 7 +++++++ .../main/java/org/apache/tajo/storage/MergeScanner.java | 7 +++++++ .../src/main/java/org/apache/tajo/storage/Scanner.java | 9 +++++++++ .../java/org/apache/tajo/storage/v2/FileScannerV2.java | 7 +++++++ 8 files changed, 58 insertions(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java index 7bd6a703ea..a2dc87e83d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java @@ -23,9 +23,11 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.Collection; @@ -73,6 +75,11 @@ public Tuple next() throws IOException { } } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void reset() throws IOException { init(); diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java index 2ac8662d6c..f801b35fc2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java @@ -24,9 +24,11 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.Comparator; @@ -158,6 +160,11 @@ public Tuple next() throws IOException { return outTuple; } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void reset() throws IOException { if (state == State.INITED) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java index 31cfc4d359..859c053ff9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java @@ -25,7 +25,10 @@ import org.apache.tajo.catalog.SchemaObject; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.codegen.CompilationError; +import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ 
-60,6 +63,10 @@ protected void compile() throws CompilationError { public abstract Tuple next() throws IOException; + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + public abstract void rescan() throws IOException; public abstract void close() throws IOException; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java index 743d70c695..10b3a22575 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java @@ -21,8 +21,10 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.Iterator; @@ -62,6 +64,11 @@ public Tuple next() throws IOException { } } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void reset() throws IOException { init(); diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java index f15c4c97a5..6aa59e6a45 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java @@ -29,7 +29,9 @@ import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.UnimplementedException; import 
org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; @@ -80,6 +82,11 @@ public Schema getSchema() { return schema; } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void setTarget(Column[] targets) { if (inited) { diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java index 8917f21ce3..890455ad90 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -26,7 +26,9 @@ import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.Iterator; @@ -113,6 +115,11 @@ public Tuple next() throws IOException { return tuple; } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public void reset() throws IOException { this.iterator = fragments.iterator(); diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java index 16c4faa4a5..f532e0e646 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java @@ -21,6 +21,8 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SchemaObject; import org.apache.tajo.catalog.statistics.TableStats; +import 
org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.Closeable; import java.io.IOException; @@ -41,6 +43,13 @@ public interface Scanner extends SchemaObject, Closeable { * @throws IOException if internal I/O error occurs during next method */ Tuple next() throws IOException; + + /** + * + * @param rowBlock + * @return + */ + boolean nextFetch(OffHeapRowBlock rowBlock); /** * Reset the cursor. After executed, the scanner diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java index da7084c721..ebbe2f43e1 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java @@ -28,9 +28,11 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -222,6 +224,11 @@ public Tuple next() throws IOException { return nextTuple(); } + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) { + throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); + } + @Override public float getProgress() { return progress; From a71a98147ced8021bc5a521666efdfeff4ae84c7 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sat, 27 Sep 2014 10:50:46 -0700 Subject: [PATCH 2/4] TAJO-1080: Cleanup BooleanDatum and Inet4Datum. 
--- .../java/org/apache/tajo/datum/BooleanDatum.java | 8 +++----- .../main/java/org/apache/tajo/datum/Inet4Datum.java | 7 ++----- .../main/java/org/apache/tajo/datum/Int2Datum.java | 13 ------------- .../main/java/org/apache/tajo/util/NetUtils.java | 9 +++++++++ .../apache/tajo/datum/TestArithmeticOperator.java | 10 +++++----- 5 files changed, 19 insertions(+), 28 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java index 93933a85a9..a8eeca0c2f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java @@ -167,12 +167,10 @@ public BooleanDatum equalsTo(Datum datum) { public int compareTo(Datum datum) { switch (datum.type()) { case BOOLEAN: - if (val && !datum.asBool()) { - return -1; - } else if (val && datum.asBool()) { - return 1; - } else { + if ((val ^ datum.asBool()) == false) { // if both are the same regardless of its value. return 0; + } else { + return val ? 
-1 : 1; } default: throw new InvalidOperationException(datum.type()); diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java index 1de81cd9f5..ed48a028e3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java @@ -22,6 +22,7 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.NetUtils; import static org.apache.tajo.common.TajoDataTypes.Type; @@ -36,11 +37,7 @@ public class Inet4Datum extends Datum { public Inet4Datum(String addr) { super(Type.INET4); - String [] elems = addr.split("\\."); - address = Integer.parseInt(elems[3]) & 0xFF - | ((Integer.parseInt(elems[2]) << 8) & 0xFF00) - | ((Integer.parseInt(elems[1]) << 16) & 0xFF0000) - | ((Integer.parseInt(elems[0]) << 24) & 0xFF000000); + address = NetUtils.convertIPStringToInt(addr); } public Inet4Datum(byte[] addr) { diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java index 2a6c691cbf..7bd24da791 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java @@ -195,7 +195,6 @@ public int compareTo(Datum datum) { public Datum plus(Datum datum) { switch (datum.type()) { case INT2: - return DatumFactory.createInt2((short) (val + datum.asInt2())); case INT4: return DatumFactory.createInt4(val + datum.asInt4()); case INT8: @@ -220,7 +219,6 @@ public Datum plus(Datum datum) { public Datum minus(Datum datum) { switch (datum.type()) { case INT2: - return DatumFactory.createInt2((short) (val - datum.asInt2())); case INT4: return DatumFactory.createInt4(val - datum.asInt4()); case INT8: @@ -245,7 +243,6 @@ public Datum minus(Datum datum) { public Datum 
multiply(Datum datum) { switch (datum.type()) { case INT2: - return DatumFactory.createInt4(val * datum.asInt2()); case INT4: return DatumFactory.createInt4(val * datum.asInt4()); case INT8: @@ -268,11 +265,6 @@ public Datum multiply(Datum datum) { public Datum divide(Datum datum) { switch (datum.type()) { case INT2: - short paramValueI2 = datum.asInt2(); - if (!validateDivideZero(paramValueI2)) { - return NullDatum.get(); - } - return DatumFactory.createInt2((short) (val / paramValueI2)); case INT4: int paramValueI4 = datum.asInt4(); if (!validateDivideZero(paramValueI4)) { @@ -308,11 +300,6 @@ public Datum divide(Datum datum) { public Datum modular(Datum datum) { switch (datum.type()) { case INT2: - short paramValueI2 = datum.asInt2(); - if (!validateDivideZero(paramValueI2)) { - return NullDatum.get(); - } - return DatumFactory.createInt2((short) (val % paramValueI2)); case INT4: int paramValueI4 = datum.asInt4(); if (!validateDivideZero(paramValueI4)) { diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java index 829829f9cc..fc24a5beb0 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java @@ -101,4 +101,13 @@ public static String normalizeHost(String host) { } return host; } + + public static int convertIPStringToInt(String ipAddr) { + String [] elems = ipAddr.split("\\."); + int address = Integer.parseInt(elems[3]) & 0xFF + | ((Integer.parseInt(elems[2]) << 8) & 0xFF00) + | ((Integer.parseInt(elems[1]) << 16) & 0xFF0000) + | ((Integer.parseInt(elems[0]) << 24) & 0xFF000000); + return address; + } } \ No newline at end of file diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java index 42623bd3d8..8915ea3dad 100644 --- 
a/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java +++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java @@ -62,20 +62,20 @@ public void setUp() { @Test public void testInt2Datum() throws Exception { //plus - runAndAssert("plus", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)15)); + runAndAssert("plus", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)15)); runAndAssert("plus", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(15)); runAndAssert("plus", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(15)); runAndAssert("plus", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(15.0f)); runAndAssert("plus", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(15.0)); //minus - runAndAssert("minus", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)5)); + runAndAssert("minus", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)5)); runAndAssert("minus", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(5)); runAndAssert("minus", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(5)); runAndAssert("minus", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(5.0f)); runAndAssert("minus", new Int2Datum((short)10), new Float8Datum(5.0), new Float8Datum(5.0)); - runAndAssert("minus", new Int2Datum((short)5), new Int2Datum((short)10), new Int2Datum((short)-5)); + runAndAssert("minus", new Int2Datum((short)5), new Int2Datum((short)10), new Int4Datum((short)-5)); runAndAssert("minus", new Int2Datum((short)5), new Int4Datum(10), new Int4Datum(-5)); runAndAssert("minus", new Int2Datum((short)5), new Int8Datum(10), new Int8Datum(-5)); runAndAssert("minus", new Int2Datum((short)5), new Float4Datum(10.0f), new Float4Datum(-5.0f)); @@ -89,7 +89,7 @@ public void testInt2Datum() throws Exception { runAndAssert("multiply", new Int2Datum((short)10), new Float8Datum(5.0), 
new Float8Datum(50.0)); //divide - runAndAssert("divide", new Int2Datum((short)10), new Int2Datum((short)5), new Int2Datum((short)2)); + runAndAssert("divide", new Int2Datum((short)10), new Int2Datum((short)5), new Int4Datum((short)2)); runAndAssert("divide", new Int2Datum((short)10), new Int4Datum(5), new Int4Datum(2)); runAndAssert("divide", new Int2Datum((short)10), new Int8Datum(5), new Int8Datum(2)); runAndAssert("divide", new Int2Datum((short)10), new Float4Datum(5.0f), new Float4Datum(2.0f)); @@ -102,7 +102,7 @@ public void testInt2Datum() throws Exception { runAndAssert("divide", new Int2Datum((short)10), new Float8Datum(0.0), NullDatum.get()); //modular - runAndAssert("modular", new Int2Datum((short)10), new Int2Datum((short)3), new Int2Datum((short)1)); + runAndAssert("modular", new Int2Datum((short)10), new Int2Datum((short)3), new Int4Datum((short)1)); runAndAssert("modular", new Int2Datum((short)10), new Int4Datum(3), new Int4Datum(1)); runAndAssert("modular", new Int2Datum((short)10), new Int8Datum(3), new Int8Datum(1)); runAndAssert("modular", new Int2Datum((short)10), new Float4Datum(3.0f), new Float4Datum(1.0f)); From c128eca315fdb084815c4a9ac4fb9c00c87f5466 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sat, 27 Sep 2014 13:48:26 -0700 Subject: [PATCH 3/4] TAJO-1043: Implement nextFetch(RowBlock) of CSVScanner. 
(hyunsik) --- .../java/org/apache/tajo/storage/CSVFile.java | 51 ++ .../org/apache/tajo/storage/FileScanner.java | 2 +- .../java/org/apache/tajo/storage/Scanner.java | 2 +- .../storage/TextSerializerDeserializer.java | 94 +- .../tajo/tuple/offheap/UnSafeTuple.java | 4 + .../apache/tajo/storage/TestNextFetches.java | 854 ++++++++++++++++++ 6 files changed, 1004 insertions(+), 3 deletions(-) create mode 100644 tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java index 211379467e..06ff081271 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -35,11 +35,15 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.exception.AlreadyExistsStorageException; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.OffHeapRowBlockWriter; +import org.apache.tajo.tuple.offheap.RowWriter; import org.apache.tajo.util.BytesUtils; import java.io.*; @@ -480,6 +484,53 @@ public Tuple next() throws IOException { } } + TextSerializerDeserializer deserializer = new TextSerializerDeserializer(); + + boolean hasNext() throws IOException { + if (currentIdx == validIdx) { + if (eof) { + return false; + } else { + page(); + + if(currentIdx == validIdx){ + return false; + } + } + } + + return true; + } + + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException { + rowBlock.clear(); + OffHeapRowBlockWriter 
writer = (OffHeapRowBlockWriter) rowBlock.getWriter(); + + while(hasNext() && rowBlock.rows() < rowBlock.maxRowNum()) { + byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx), + rowLengthList.get(currentIdx), delimiter, targetColumnIndexes); + currentIdx++; + + int fieldIdx = 0; + writer.startRow(); + for (; fieldIdx < cells.length && fieldIdx < schema.size(); fieldIdx++) { + if (cells[fieldIdx] == null) { + writer.skipField(); + } else { + deserializer.write(writer, schema.getColumn(fieldIdx), cells[fieldIdx], 0, cells[fieldIdx].length, nullChars); + + } + } + for (; fieldIdx < schema.size(); fieldIdx++) { + writer.skipField(); + } + writer.endRow(); + } + + return rowBlock.rows() > 0; + } + private boolean isCompress() { return codec != null; } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java index 6aa59e6a45..d4357e33e2 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java @@ -83,7 +83,7 @@ public Schema getSchema() { } @Override - public boolean nextFetch(OffHeapRowBlock rowBlock) { + public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException { throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java index f532e0e646..3478e23cdb 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java @@ -49,7 +49,7 @@ public interface Scanner extends SchemaObject, Closeable { * @param rowBlock * @return */ - boolean nextFetch(OffHeapRowBlock rowBlock); + boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException; /** * Reset the cursor. 
After executed, the scanner diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java index b42c1b5142..6dfe6c1386 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java @@ -25,8 +25,11 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.*; import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.tuple.offheap.RowWriter; import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.NumberUtil; +import org.apache.tajo.util.datetime.DateTimeUtil; import java.io.IOException; import java.io.OutputStream; @@ -35,7 +38,7 @@ public class TextSerializerDeserializer implements SerializerDeserializer { public static final byte[] trueBytes = "true".getBytes(); public static final byte[] falseBytes = "false".getBytes(); - private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); @Override @@ -213,6 +216,95 @@ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[ return datum; } + public static void write(RowWriter writer, Column col, byte [] bytes, int offset, int length, byte [] nullChar) throws IOException { + TajoDataTypes.Type type = col.getDataType().getType(); + boolean nullField; + if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { + nullField = isNullText(bytes, offset, length, nullChar); + } else { + nullField = isNull(bytes, offset, length, nullChar); + } + + if (nullField) { + writer.skipField(); + return; + } else { + switch (col.getDataType().getType()) { + case BOOLEAN: + writer.putBool(bytes[offset] == 't' || bytes[offset] == 'T'); + break; + + case CHAR: + case TEXT: + 
writer.putText(bytes); + break; + + case INT1: + case INT2: + writer.putInt2((short) NumberUtil.parseInt(bytes, offset, length)); + break; + + case INT4: + writer.putInt4(NumberUtil.parseInt(bytes, offset, length)); + break; + + case INT8: + writer.putInt8(Long.parseLong(new String(bytes, offset, length))); + break; + + case FLOAT4: + writer.putFloat4(Float.parseFloat(new String(bytes, offset, length))); + break; + + case FLOAT8: + writer.putFloat8(Double.parseDouble(new String(bytes, offset, length))); + break; + + case DATE: + writer.putDate(DateTimeUtil.toJulianDate(new String(bytes, offset, length))); + break; + + case TIME: + writer.putInt8(DateTimeUtil.toJulianTime(new String(bytes, offset, length))); + break; + + case TIMESTAMP: + writer.putInt8(DateTimeUtil.toJulianTimestamp(new String(bytes, offset, length))); + break; + + case INTERVAL: + writer.putInterval(DatumFactory.createInterval(new String(bytes, offset, length))); + break; + + case PROTOBUF: + ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); + Message.Builder builder = factory.newBuilder(); + try { + byte[] protoBytes = new byte[length]; + System.arraycopy(bytes, offset, protoBytes, 0, length); + protobufJsonFormat.merge(protoBytes, builder); + writer.putProtoDatum(factory.createDatum(builder.build())); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + + break; + + case INET4: + writer.putInet4(NetUtils.convertIPStringToInt(new String(bytes, offset, length))); + break; + + case BLOB: + writer.putBlob(Base64.decodeBase64(bytes)); + break; + + default: + writer.skipField(); + } + } + } + private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) { return length == 0 || ((length == nullBytes.length) && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length)); diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java 
b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java index d8bafea539..6f4d385043 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java @@ -145,6 +145,8 @@ public Datum get(int fieldId) { switch (types[fieldId].getType()) { case BOOLEAN: return DatumFactory.createBool(getBool(fieldId)); + case CHAR: + return DatumFactory.createChar(getBytes(fieldId)); case INT1: case INT2: return DatumFactory.createInt2(getInt2(fieldId)); @@ -158,6 +160,8 @@ public Datum get(int fieldId) { return DatumFactory.createFloat8(getFloat8(fieldId)); case TEXT: return DatumFactory.createText(getText(fieldId)); + case BLOB: + return DatumFactory.createBlob(getBytes(fieldId)); case TIMESTAMP: return DatumFactory.createTimestamp(getInt8(fieldId)); case DATE: diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java new file mode 100644 index 0000000000..d1b3afd189 --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java @@ -0,0 +1,854 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.rcfile.RCFile; +import org.apache.tajo.storage.sequencefile.SequenceFileScanner; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.UnSafeTuple; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.UnsafeUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import sun.misc.Unsafe; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestNextFetches { + private TajoConf conf; + private static String TEST_PATH = 
"target/test-data/TestStorages"; + + private static String TEST_PROJECTION_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testProjection\",\n" + + " \"fields\": [\n" + + " { \"name\": \"id\", \"type\": \"int\" },\n" + + " { \"name\": \"age\", \"type\": \"long\" },\n" + + " { \"name\": \"score\", \"type\": \"float\" }\n" + + " ]\n" + + "}\n"; + + private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testNullHandlingTypes\",\n" + + " \"fields\": [\n" + + " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" + + " { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" + + " { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" + + " { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" + + " { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col12\", \"type\": \"null\" },\n" + + " { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" + + " ]\n" + + "}\n"; + + private StoreType storeType; + private boolean splitable; + private boolean statsable; + private Path testDir; + private FileSystem fs; + + public TestNextFetches(StoreType type, boolean splitable, boolean statsable) throws IOException { + this.storeType = type; + this.splitable = splitable; + this.statsable = statsable; + + conf = new TajoConf(); + + if (storeType == StoreType.RCFILE) { + conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100); + } + + testDir = 
CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + } + + @Parameterized.Parameters + public static Collection generateParameters() { + return Arrays.asList(new Object[][] { + {StoreType.CSV, true, true}, + // TODO - to be implemented +// {StoreType.RAW, false, false}, +// {StoreType.RCFILE, true, true}, +// {StoreType.PARQUET, false, false}, +// {StoreType.SEQUENCEFILE, true, true}, +// {StoreType.AVRO, false, false}, + }); + } + + @Test + public void testSplitable() throws IOException { + if (splitable) { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = (long) (Math.random() * fileLen) + 1; + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + while (scanner.nextFetch(rowBlock)) { + tupleCnt += rowBlock.rows(); + } + 
scanner.close(); + + scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.nextFetch(rowBlock)) { + tupleCnt += rowBlock.rows(); + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + + rowBlock.release(); + } + } + + @Test + public void testSplitableForRCFileBug() throws IOException { + if (storeType == StoreType.RCFILE) { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = 122; // header size + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + while (scanner.nextFetch(rowBlock)) { + tupleCnt += rowBlock.rows(); + } + scanner.close(); + + scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, 
tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.nextFetch(rowBlock)) { + tupleCnt += rowBlock.rows(); + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + + rowBlock.release(); + } + } + + @Test + public void testProjection() throws IOException { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("score", Type.FLOAT4); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.setOptions(StorageUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_PROJECTION_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testProjection.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(i + 2)); + vTuple.put(2, DatumFactory.createFloat4(i + 3)); + appender.addTuple(vTuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen()); + + Schema target = new Schema(); + target.addColumn("age", Type.INT8); + target.addColumn("score", Type.FLOAT4); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment, target); + scanner.init(); + int tupleCnt = 0; + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple tuple = new ZeroCopyTuple(); + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(tuple)) { + if (storeType == StoreType.RCFILE + || storeType == StoreType.TREVNI + || storeType == StoreType.CSV + || 
storeType == StoreType.PARQUET + || storeType == StoreType.SEQUENCEFILE + || storeType == StoreType.AVRO) { + assertTrue(tuple.isNull(0)); + } + assertTrue(tupleCnt + 2 == tuple.getInt8(1)); + assertTrue(tupleCnt + 3 == tuple.getFloat4(2)); + tupleCnt++; + } + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + + rowBlock.release(); + } + + @Test + public void testVariousTypes() throws IOException { + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(StorageUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { + String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString(); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); + } + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(12); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("hyunsik"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), 
+ DatumFactory.createText("hyunsik"), + DatumFactory.createBlob("hyunsik".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(zcTuple)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), zcTuple.get(i)); + } + } + } + scanner.close(); + + rowBlock.release(); + } + + @Test + public void testNullHandlingTypes() throws IOException { + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(StorageUtil.newPhysicalProperties(storeType)); + meta.putOption(StorageConstants.CSVFILE_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); + 
meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\"); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple seedTuple = new VTuple(12); + seedTuple.put(new Datum[]{ + DatumFactory.createBool(true), // 0 + DatumFactory.createChar("hyunsik"), // 1 + DatumFactory.createInt2((short) 17), // 2 + DatumFactory.createInt4(59), // 3 + DatumFactory.createInt8(23l), // 4 + DatumFactory.createFloat4(77.9f), // 5 + DatumFactory.createFloat8(271.9f), // 6 + DatumFactory.createText("hyunsik"), // 7 + DatumFactory.createBlob("hyunsik".getBytes()),// 8 + DatumFactory.createInet4("192.168.0.1"), // 9 + NullDatum.get(), // 10 + factory.createDatum(queryid.getProto()) // 11 + }); + + // Making tuples with different null column positions + Tuple tuple; + for (int i = 0; i < 12; i++) { + tuple = new VTuple(12); + for (int j = 0; j < 12; j++) { + if (i == j) { // i'th column will have NULL value + tuple.put(j, NullDatum.get()); + } else { + tuple.put(j, seedTuple.get(j)); + } + } + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + + int i = 0; + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + 
+ while(reader.next(retrieved)) { + assertEquals(12, retrieved.size()); + for (int j = 0; j < 12; j++) { + if (i == j) { + assertEquals(NullDatum.get(), retrieved.get(j)); + } else { + assertEquals(seedTuple.get(j), retrieved.get(j)); + } + } + + i++; + } + } + scanner.close(); + + rowBlock.release(); + } + + @Test + public void testRCFileTextSerializeDeserialize() throws IOException { + if(storeType != StoreType.RCFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(12); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + 
DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + + rowBlock.release(); + } + + @Test + public void testRCFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.RCFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, 
options); + meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(12); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), 
scanner.getInputStats().getNumRows().longValue()); + + rowBlock.release(); + } + + @Test + public void testSequenceFileTextSerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(12); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = 
fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + + rowBlock.release(); + } + + @Test + public void testSequenceFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + 
TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName()); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < 
tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testTime() throws IOException { + if (storeType == StoreType.CSV || storeType == StoreType.RAW) { + Schema schema = new Schema(); + schema.addColumn("col1", Type.DATE); + schema.addColumn("col2", Type.TIME); + schema.addColumn("col3", Type.TIMESTAMP); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + + Path tablePath = new Path(testDir, "testTime.data"); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple = new VTuple(3); + tuple.put(new Datum[]{ + DatumFactory.createDate("1980-04-01"), + DatumFactory.createTime("12:34:56"), + DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setRows(1024); + + ZeroCopyTuple retrieved = new ZeroCopyTuple(); + + while (scanner.nextFetch(rowBlock)) { + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(retrieved)) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + } + scanner.close(); + + rowBlock.release(); + } + } + +} From 292f6743845139dfc4ae5d7d672386b25202beed Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sun, 
28 Sep 2014 10:42:20 -0700 Subject: [PATCH 4/4] TAJO-1083: StoreTableExec should be block iterative. --- .../java/org/apache/tajo/SessionVars.java | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 1 + .../org/apache/tajo/engine/eval/EvalNode.java | 7 + .../apache/tajo/engine/planner/Projector.java | 23 +- .../engine/planner/physical/PhysicalExec.java | 2 +- .../engine/planner/physical/SeqScanExec.java | 24 ++ .../planner/physical/StoreTableExec.java | 28 +++ .../tajo/engine/utils/TupleBuilderUtil.java | 102 +++++++++ .../org/apache/tajo/master/GlobalEngine.java | 20 ++ .../java/org/apache/tajo/worker/Task.java | 14 +- .../physical/block/TestBlockIteratorExec.java | 211 ++++++++++++++++++ .../TestTajoCli/testHelpSessionVars.result | 1 + .../tajo/tuple/offheap/OffHeapRowBlock.java | 6 +- .../tuple/offheap/OffHeapRowBlockWriter.java | 2 +- .../tuple/offheap/ResizableLimitSpec.java | 2 +- .../apache/tajo/storage/TestNextFetches.java | 23 +- 16 files changed, 449 insertions(+), 19 deletions(-) create mode 100644 tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java create mode 100644 tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index cc875b2c1a..b63d4f4536 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -99,6 +99,8 @@ public enum SessionVars implements ConfigKey { "shuffle output size for partition table write (mb)", DEFAULT), // for physical Executors + EXEC_ENGINE(ConfVars.$EXECUTOR_ENGINE, + "executor engine types that queries will use. 
Types: volcano and block (default is volcano)", DEFAULT), EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT), HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT), INNER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD, diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index b5a9b506cb..b1229ebc93 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -317,6 +317,7 @@ public static enum ConfVars implements ConfigKey { $DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256), // for physical Executors + $EXECUTOR_ENGINE("tajo.executor.engine", "volcano"), // volcano, and block $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L), $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes", (long)256 * 1048576), diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java index 754f8885ce..b48700180f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java @@ -23,8 +23,10 @@ import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.engine.utils.TupleBuilderUtil; import org.apache.tajo.json.GsonObject; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.RowWriter; /** * An annotated expression which includes actual data domains. 
@@ -59,6 +61,11 @@ public String toJson() { public abstract T eval(Schema schema, Tuple tuple); + public void eval(Schema schema, Tuple tuple, RowWriter builder) { + Datum result = eval(schema, tuple); + TupleBuilderUtil.writeEvalResult(builder, result.type(), result); + } + @Deprecated public abstract void preOrder(EvalNodeVisitor visitor); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java index d8499d0696..0d8bd5fa3f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java @@ -21,7 +21,10 @@ import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Schema; import org.apache.tajo.engine.eval.EvalNode; +import org.apache.tajo.engine.utils.TupleBuilderUtil; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.TupleBuilder; +import org.apache.tajo.tuple.offheap.RowWriter; import org.apache.tajo.worker.TaskAttemptContext; public class Projector { @@ -33,7 +36,14 @@ public class Projector { private final int targetNum; private final EvalNode[] evals; + private final boolean useJITInSession; + private final boolean useJITInOperator; + public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets) { + this(context, inSchema, outSchema, targets, true); + } + + public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets, boolean useJIT) { this.context = context; this.inSchema = inSchema; if (targets == null) { @@ -45,7 +55,10 @@ public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, this.targetNum = this.targets.length; evals = new EvalNode[targetNum]; - if (context.getQueryContext().getBool(SessionVars.CODEGEN)) { + useJITInOperator = useJIT; + useJITInSession = context.getQueryContext().getBool(SessionVars.CODEGEN); + + if (useJITInOperator && 
useJITInSession) { EvalNode eval; for (int i = 0; i < targetNum; i++) { eval = this.targets[i].getEvalTree(); @@ -63,4 +76,12 @@ public void eval(Tuple in, Tuple out) { out.put(i, evals[i].eval(inSchema, in)); } } + + public void eval(Tuple in, RowWriter builder) { + if (useJITInOperator && useJITInSession) { + TupleBuilderUtil.evaluateNative(inSchema, in, builder, evals); + } else { + TupleBuilderUtil.evaluate(inSchema, in, builder, evals); + } + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java index 859c053ff9..99cf61015c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java @@ -63,7 +63,7 @@ protected void compile() throws CompilationError { public abstract Tuple next() throws IOException; - public boolean nextFetch(OffHeapRowBlock rowBlock) { + public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException { throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented"); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 122d4f3908..fd11c7b0bc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -41,6 +41,11 @@ import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.tuple.TupleBuilder; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.unit.StorageUnit; import 
org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -67,6 +72,8 @@ public class SeqScanExec extends PhysicalExec { private boolean cacheRead = false; + private OffHeapRowBlock inRowBlock; + public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema()); @@ -94,6 +101,8 @@ public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNo && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) { rewriteColumnPartitionedTableSchema(); } + + inRowBlock = new OffHeapRowBlock(inSchema, 64 * StorageUnit.KB); } /** @@ -289,6 +298,21 @@ public Tuple next() throws IOException { } } + public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException { + boolean noMoreTuple = scanner.nextFetch(inRowBlock); + if (!noMoreTuple) { + return false; + } + + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + RowBlockReader reader = inRowBlock.getReader(); + while (reader.next(zcTuple)) { + projector.eval(zcTuple, rowBlock.getWriter()); + } + + return true; + } + @Override public void rescan() throws IOException { scanner.reset(); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 3199b56c07..f88af150ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -28,9 +28,13 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.logical.InsertNode; import org.apache.tajo.engine.planner.logical.PersistentStoreNode; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Appender; import 
org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.worker.TaskAttemptContext; @@ -121,6 +125,30 @@ public Tuple next() throws IOException { return null; } + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + RowBlockReader reader; + + public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException { + if (child.nextFetch(rowBlock)) { + reader = rowBlock.getReader(); + while (reader.next(zcTuple)) { + appender.addTuple(zcTuple);; + + if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) { + appender.close(); + + writtenFileNum++; + StatisticsUtil.aggregateTableStat(sumStats, appender.getStats()); + openNewFile(writtenFileNum); + } + } + + return true; + } else { + return false; + } + } + @Override public void rescan() throws IOException { // nothing to do diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java new file mode 100644 index 0000000000..dc0f0582af --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.utils; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.engine.eval.EvalNode; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.TupleBuilder; +import org.apache.tajo.tuple.offheap.RowWriter; + +public class TupleBuilderUtil { + + public static void evaluate(Schema inSchema, Tuple input, RowWriter builder, EvalNode[] evals) { + builder.startRow(); + for (int i = 0; i < evals.length; i++) { + Datum result = evals[i].eval(inSchema, input); + writeEvalResult(builder, result.type(), result); + } + builder.endRow(); + } + + public static void evaluateNative(Schema inSchema, Tuple input, RowWriter builder, EvalNode[] evals) { + builder.startRow(); + for (int i = 0; i < evals.length; i++) { + evals[i].eval(inSchema, input, builder); + } + builder.endRow(); + } + + public static void writeEvalResult(RowWriter builder, TajoDataTypes.Type type, Datum datum) { + switch (type) { + case NULL_TYPE: + builder.skipField(); + break; + case BOOLEAN: + builder.putBool(datum.asBool()); + break; + case INT1: + case INT2: + builder.putInt2(datum.asInt2()); + break; + case INT4: + builder.putInt4(datum.asInt4()); + break; + case INT8: + builder.putInt8(datum.asInt8()); + break; + case FLOAT4: + builder.putFloat4(datum.asFloat4()); + break; + case FLOAT8: + builder.putFloat8(datum.asFloat8()); + break; + case TIMESTAMP: + builder.putTimestamp(datum.asInt8()); + 
break; + case TIME: + builder.putTime(datum.asInt8()); + break; + case DATE: + builder.putDate(datum.asInt4()); + break; + case INTERVAL: + builder.putInterval((org.apache.tajo.datum.IntervalDatum) datum); + break; + case CHAR: + case TEXT: + builder.putText(datum.asTextBytes()); + break; + case BLOB: + builder.putBlob(datum.asByteArray()); + break; + case INET4: + builder.putInet4(datum.asInt4()); + break; + case PROTOBUF: + builder.putProtoDatum((org.apache.tajo.datum.ProtobufDatum) datum); + break; + default: + throw new UnsupportedException("Unknown Type: " + type.name()); + } + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 504a7929d7..23c494901b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -115,6 +115,26 @@ public void stop() { super.stop(); } + public SQLAnalyzer getSQLAnalyzer() { + return analyzer; + } + + public PreLogicalPlanVerifier getPreLogicalPlanVerifier() { + return preVerifier; + } + + public LogicalPlanner getLogicalPlanner() { + return planner; + } + + public LogicalOptimizer getLogicalOptimizer() { + return optimizer; + } + + public LogicalPlanVerifier getLogicalPlanVerifier() { + return annotatedPlanVerifier; + } + private QueryContext createQueryContext(Session session) { QueryContext newQueryContext = new QueryContext(context.getConf(), session); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 5127e90696..67210eaed3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.SessionVars; import 
org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.TaskAttemptState; @@ -57,6 +58,8 @@ import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.NetUtils; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.handler.codec.http.QueryStringDecoder; @@ -446,7 +449,16 @@ public void run() throws Exception { createPlan(context, plan); this.executor.init(); - while(!killed && !aborted && executor.next() != null) { + String engineType = context.getQueryContext().get(SessionVars.EXEC_ENGINE); + LOG.info(engineType.toUpperCase() + " Executor Engine is chosen."); + if (engineType.equalsIgnoreCase("volcano")) { + while (!killed && !aborted && executor.next() != null) { + } + } else if (engineType.equalsIgnoreCase("block")) { + OffHeapRowBlock rowBlock = new OffHeapRowBlock(executor.getSchema(), 64 * StorageUnit.KB); + while (!killed && !aborted && executor.nextFetch(rowBlock)) { + } + rowBlock.release(); + } } catch (Throwable e) { error = e ; diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java new file mode 100644 index 0000000000..f6a1e0546a --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical.block; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.*; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.*; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.planner.logical.*; +import org.apache.tajo.engine.planner.physical.*; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.GlobalEngine; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.List; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; + +public class TestBlockIteratorExec extends QueryTestCaseBase { + + private static SQLAnalyzer 
analyzer; + private static LogicalPlanner planner; + private static LogicalOptimizer optimizer; + private static PhysicalPlanner physicalPlanner; + private static AbstractStorageManager sm; + + @BeforeClass + public static void setUp() throws IOException { + GlobalEngine engine = testingCluster.getMaster().getContext().getGlobalEngine(); + analyzer = engine.getSQLAnalyzer(); + planner = engine.getLogicalPlanner(); + optimizer = engine.getLogicalOptimizer(); + + Path path = CommonTestingUtil.getTestDir("target/test-data/TestBlockExecutor"); + sm = StorageManagerFactory.getStorageManager(conf, path); + + physicalPlanner = new PhysicalPlannerImpl(conf, sm); + } + + private static int i = 0; + static Path outputPath; + + /** + * Build a physical execution plan, which is a tree consisting of a number of physical executors. + * + * @param sql a SQL statement + * @return Physical Execution Plan + * @throws PlanningException + * @throws IOException + */ + public static PhysicalExec buildPhysicalPlan(String sql) throws PlanningException, IOException { + Expr expr = analyzer.parse(sql); + + QueryContext context = LocalTajoTestingUtility.createDummyContext(conf); + LogicalPlan plan = planner.createPlan(context, expr); + optimizer.optimize(context, plan); + + LogicalNode [] founds = PlannerUtil.findAllNodes(plan.getRootBlock().getRoot(), NodeType.SCAN); + + List mergedFragments = Lists.newArrayList(); + + for (LogicalNode node : founds) { + ScanNode scan = (ScanNode) node; + TableDesc table = scan.getTableDesc(); + FileFragment[] frags = StorageManager.splitNG(conf, scan.getCanonicalName(), table.getMeta(), table.getPath(), + Integer.MAX_VALUE); + + for (FileFragment f : frags) { + mergedFragments.add(f); + } + } + + Path workDir = CommonTestingUtil.getTestDir("target/test-data/testdir_" + (i++)); + + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newQueryUnitAttemptId(), mergedFragments.toArray(new 
FileFragment[mergedFragments.size()]), workDir); + + outputPath = new Path(workDir, "output"); + ctx.setOutputPath(outputPath); + ctx.setEnforcer(new Enforcer()); + + return physicalPlanner.createPlan(ctx, plan.getRootBlock().getRoot()); + } + + @Test + public void testSeqScan() throws IOException, PlanningException { + PhysicalExec exec = buildPhysicalPlan("select * from lineitem"); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(exec.getSchema(), 64 * StorageUnit.KB); + rowBlock.setMaxRow(1024); + + exec.init(); + + int countForTuple = 0; + int countForRowBlock = 0; + while(exec.nextFetch(rowBlock)) { + ZeroCopyTuple tuple = new ZeroCopyTuple(); + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(tuple)) { + countForTuple++; + } + countForRowBlock += rowBlock.rows(); + } + exec.close(); + rowBlock.release(); + + assertEquals(5, countForTuple); + assertEquals(5, countForRowBlock); + } + + @Test + public void testScanWithProjector() throws IOException, PlanningException { + PhysicalExec exec = buildPhysicalPlan("select l_orderkey, l_partkey from lineitem"); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(exec.getSchema(), 64 * StorageUnit.KB); + rowBlock.setMaxRow(1024); + + exec.init(); + + int countForTuple = 0; + int countForRowBlock = 0; + while(exec.nextFetch(rowBlock)) { + ZeroCopyTuple tuple = new ZeroCopyTuple(); + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(tuple)) { + countForTuple++; + } + countForRowBlock += rowBlock.rows(); + } + exec.close(); + rowBlock.release(); + + assertEquals(5, countForTuple); + assertEquals(5, countForRowBlock); + } + + @Test + public void testStoreTableExec() throws IOException, PlanningException { + PhysicalExec exec = buildPhysicalPlan("create table t1 using CSV as select * from lineitem"); + + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(exec.getSchema(), 64 * StorageUnit.KB); + rowBlock.setMaxRow(1024); + + exec.init(); + + int countForTuple = 0; + int countForRowBlock = 0; 
+ while(exec.nextFetch(rowBlock)) { + ZeroCopyTuple tuple = new ZeroCopyTuple(); + RowBlockReader reader = rowBlock.getReader(); + while (reader.next(tuple)) { + countForTuple++; + } + countForRowBlock += rowBlock.rows(); + } + exec.close(); + + assertEquals(5, countForTuple); + assertEquals(5, countForRowBlock); + + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(meta, exec.getSchema(), outputPath); + scanner.init(); + + int readTupleCount = 0; + while (scanner.nextFetch(rowBlock)) { + readTupleCount += rowBlock.rows(); + } + scanner.close(); + + assertEquals(5, readTupleCount); + + rowBlock.release(); + } +} \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index e6b12b1fcb..f6edb3d2de 100644 --- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -25,6 +25,7 @@ Available Session Variables: \set JOIN_PER_SHUFFLE_SIZE [int value] - shuffle output size for join (mb) \set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb) \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb) +\set EXEC_ENGINE [text value] - executor engine types that queries will use. 
Types: volcano and block (default is volcano) \set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb) \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb) \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb) diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java index 689efb7419..ea86a5a7b3 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java @@ -127,7 +127,11 @@ public int rows() { return rowNum; } - public void setRows(int rowNum) { + public void setMaxRow(int rowNum) { + this.maxRowNum = rowNum; + } + + void setRow(int rowNum) { this.rowNum = rowNum; } diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java index d177e0caff..ba59b1654d 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java @@ -48,7 +48,7 @@ public void ensureSize(int size) { @Override public void endRow() { super.endRow(); - rowBlock.setRows(rowBlock.rows() + 1); + rowBlock.setRow(rowBlock.rows() + 1); } @Override diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java index 14e67b25eb..8d782ebdfd 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java @@ -105,7 +105,7 @@ public boolean canIncrease(long currentSize) { } public long remain(long currentSize) { - 
Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); + Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes. But, its size is " + currentSize); return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize; } diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java index d1b3afd189..e81964ba8b 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java @@ -42,17 +42,14 @@ import org.apache.tajo.storage.sequencefile.SequenceFileScanner; import org.apache.tajo.tuple.RowBlockReader; import org.apache.tajo.tuple.offheap.OffHeapRowBlock; -import org.apache.tajo.tuple.offheap.UnSafeTuple; import org.apache.tajo.tuple.offheap.ZeroCopyTuple; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.UnsafeUtil; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import sun.misc.Unsafe; import java.io.IOException; import java.util.Arrays; @@ -173,7 +170,7 @@ public void testSplitable() throws IOException { int tupleCnt = 0; OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); while (scanner.nextFetch(rowBlock)) { tupleCnt += rowBlock.rows(); @@ -233,7 +230,7 @@ public void testSplitableForRCFileBug() throws IOException { int tupleCnt = 0; OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); while (scanner.nextFetch(rowBlock)) { tupleCnt += rowBlock.rows(); @@ -294,7 +291,7 @@ public void testProjection() throws IOException { int tupleCnt = 0; 
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); ZeroCopyTuple tuple = new ZeroCopyTuple(); while (scanner.nextFetch(rowBlock)) { @@ -376,7 +373,7 @@ public void testVariousTypes() throws IOException { scanner.init(); OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); ZeroCopyTuple zcTuple = new ZeroCopyTuple(); while (scanner.nextFetch(rowBlock)) { @@ -465,7 +462,7 @@ public void testNullHandlingTypes() throws IOException { scanner.init(); OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); ZeroCopyTuple retrieved = new ZeroCopyTuple(); @@ -548,7 +545,7 @@ public void testRCFileTextSerializeDeserialize() throws IOException { scanner.init(); OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); ZeroCopyTuple retrieved = new ZeroCopyTuple(); while (scanner.nextFetch(rowBlock)) { @@ -624,7 +621,7 @@ public void testRCFileBinarySerializeDeserialize() throws IOException { scanner.init(); OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); ZeroCopyTuple retrieved = new ZeroCopyTuple(); while (scanner.nextFetch(rowBlock)) { @@ -703,7 +700,7 @@ public void testSequenceFileTextSerializeDeserialize() throws IOException { assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); ZeroCopyTuple retrieved = new ZeroCopyTuple(); @@ -785,7 +782,7 @@ public void testSequenceFileBinarySerializeDeserialize() throws IOException { assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName()); OffHeapRowBlock 
rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); ZeroCopyTuple retrieved = new ZeroCopyTuple(); @@ -833,7 +830,7 @@ public void testTime() throws IOException { scanner.init(); OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); - rowBlock.setRows(1024); + rowBlock.setMaxRow(1024); ZeroCopyTuple retrieved = new ZeroCopyTuple();