From 66fb3a5474aa46d082dcc2cddc13849fb50aa86f Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 20 Oct 2014 21:03:40 +0900 Subject: [PATCH] TAJO-1045: Implement nextFetch(RowBlock) of RCFile scanner. --- .../org/apache/tajo/datum/Inet4Datum.java | 12 +- .../org/apache/tajo/util/ReflectionUtil.java | 25 +++- .../storage/BinarySerializerDeserializer.java | 63 ++++++++++ .../tajo/storage/SerializerDeserializer.java | 4 + .../storage/TextSerializerDeserializer.java | 5 +- .../apache/tajo/storage/rcfile/RCFile.java | 82 +++++++++++-- .../tajo/tuple/offheap/OffHeapRowWriter.java | 10 +- .../apache/tajo/tuple/offheap/RowWriter.java | 2 + .../hadoop/ParquetRowBlockParquetReader.java | 73 +++++------ .../apache/tajo/storage/TestNextFetches.java | 116 +++++++++++++++--- 10 files changed, 315 insertions(+), 77 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java index ed48a028e3..b293642d2e 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java @@ -42,17 +42,17 @@ public Inet4Datum(String addr) { public Inet4Datum(byte[] addr) { super(Type.INET4); - Preconditions.checkArgument(addr.length == size); - address = addr[3] & 0xFF - | ((addr[2] << 8) & 0xFF00) - | ((addr[1] << 16) & 0xFF0000) - | ((addr[0] << 24) & 0xFF000000); + address = readAsInt(addr, 0, addr.length); } public Inet4Datum(byte[] addr, int offset, int length) { super(Type.INET4); + address = readAsInt(addr, offset, length); + } + + public static int readAsInt(byte[] addr, int offset, int length) { Preconditions.checkArgument(length == size); - address = addr[offset + 3] & 0xFF + return addr[offset + 3] & 0xFF | ((addr[offset + 2] << 8) & 0xFF00) | ((addr[offset + 1] << 16) & 0xFF0000) | ((addr[offset] << 24) & 0xFF000000); diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java index 410815f475..297115904d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java @@ -32,8 +32,25 @@ public class ReflectionUtil { private static final Map, Constructor> CONSTRUCTOR_CACHE = new ConcurrentHashMap, Constructor>(); - public static Object newInstance(Class clazz) - throws InstantiationException, IllegalAccessException { - return clazz.newInstance(); - } + /** Create an object for the given class and initialize it + * + * @param theClass class of which an object is created + * @return a new object + */ + @SuppressWarnings("unchecked") + public static T newInstance(Class theClass) { + T result; + try { + Constructor meth = (Constructor) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(EMPTY_ARRAY); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + return result; + } } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java index 609a3df0c7..543d891dd5 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java @@ -22,6 +22,7 @@ import com.google.protobuf.Message; import org.apache.tajo.catalog.Column; import org.apache.tajo.datum.*; +import org.apache.tajo.tuple.offheap.RowWriter; import org.apache.tajo.util.Bytes; import java.io.IOException; @@ -157,6 +158,68 @@ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[ return datum; } + public void write(RowWriter writer, Column col, byte [] bytes, int offset, int length, byte [] nullChar) + throws IOException { + + if (length == 0) { + writer.skipField(); + return; + } else { + switch (col.getDataType().getType()) { + case BOOLEAN: + writer.putBool(BooleanDatum.TRUE_INT == bytes[offset]); + break; + case BIT: + writer.putByte(bytes[offset]); + break; + case CHAR: + case TEXT: + if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, 0, INVALID_UTF__SINGLE_BYTE.length, bytes, offset, length)) { + writer.putText(new byte[0]); + } else { + writer.putText(bytes, offset, length); + } + break; + + case INT1: + case INT2: + writer.putInt2(Bytes.toShort(bytes, offset, length)); + break; + + case INT4: + writer.putInt4((int) Bytes.readVLong(bytes, offset)); + break; + + case INT8: + writer.putInt8(Bytes.readVLong(bytes, offset)); + break; + + case FLOAT4: + writer.putFloat4(toFloat(bytes, offset, length)); + break; + + case FLOAT8: + writer.putFloat8(toDouble(bytes, offset, length)); + break; + + case PROTOBUF: + writer.putBlob(bytes, offset, length); + break; + + case INET4: + writer.putInet4(Inet4Datum.readAsInt(bytes, offset, length)); + break; + + case BLOB: + writer.putBlob(bytes, offset, length); + break; + + default: + writer.skipField(); + } + } + } + private byte[] shortBytes = new byte[2]; public int writeShort(OutputStream out, short val) throws IOException { diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java index 333f205e82..2d0f8ba435 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java @@ -20,6 +20,7 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.datum.Datum; +import org.apache.tajo.tuple.offheap.RowWriter; import java.io.IOException; import java.io.OutputStream; @@ -31,4 +32,7 @@ public interface SerializerDeserializer { public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException; + public void write(RowWriter writer, Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) + throws IOException; + } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java index 6dfe6c1386..91a2d86a85 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java @@ -216,7 +216,8 @@ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[ return datum; } - public static void write(RowWriter writer, Column col, byte [] bytes, int offset, int length, byte [] nullChar) throws IOException { + public void write(RowWriter writer, Column col, byte [] bytes, int offset, int length, byte [] nullChar) + throws IOException { TajoDataTypes.Type type = col.getDataType().getType(); boolean nullField; if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { @@ -236,7 +237,7 @@ public static void write(RowWriter writer, Column col, byte [] bytes, int offset case CHAR: case TEXT: - writer.putText(bytes); + writer.putText(bytes, offset, length); break; case INT1: diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index e5507ad81f..36413982f0 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -39,6 +39,9 @@ import org.apache.tajo.datum.NullDatum; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.OffHeapRowBlockWriter; +import org.apache.tajo.util.ReflectionUtil; import java.io.Closeable; import java.io.*; @@ -738,7 +741,7 @@ public void init() throws IOException { try { Class codecClass = conf.getClassByName( codecClassname).asSubclass(CompressionCodec.class); - codec = ReflectionUtils.newInstance(codecClass, conf); + codec = ReflectionUtil.newInstance(codecClass); } catch (ClassNotFoundException cnfe) { throw new IllegalArgumentException( "Unknown codec: " + codecClassname, cnfe); @@ -762,7 +765,7 @@ public void init() throws IOException { String serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); try { - serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); + serde = (SerializerDeserializer) ReflectionUtil.newInstance(Class.forName(serdeClass)); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); @@ -1373,7 +1376,7 @@ private void initHeader() throws IOException { } else{ serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); } - serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); + serde = (SerializerDeserializer) ReflectionUtil.newInstance(Class.forName(serdeClass)); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); @@ -1620,24 +1623,38 @@ protected void currentValueBuffer() throws IOException { @Override public Tuple next() throws IOException { + if(!hasNext()) return null; + + Tuple tuple = new VTuple(schema.size()); + getCurrentRow(tuple); + return tuple; + } + + public boolean hasNext() throws IOException { if (!more) { - return null; + return false; } more = nextBuffer(rowId); long lastSeenSyncPos = lastSeenSyncPos(); if (lastSeenSyncPos >= endOffset) { more = false; - return null; + return more; } - if (!more) { - return null; + return more; + } + + @Override + public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException { + rowBlock.clear(); + + OffHeapRowBlockWriter writer = (OffHeapRowBlockWriter) rowBlock.getWriter(); + while(rowBlock.rows() < rowBlock.maxRowNum() && hasNext()) { + getCurrentRowBlock(writer); } - Tuple tuple = new VTuple(schema.size()); - getCurrentRow(tuple); - return tuple; + return rowBlock.rows() > 0; } @Override @@ -1721,6 +1738,51 @@ public void getCurrentRow(Tuple tuple) throws IOException { rowFetched = true; } + /** + * get the current row used,make sure called {@link #nextFetch(OffHeapRowBlock rowBlock)} + * first. + * + * @throws IOException + */ + public void getCurrentRowBlock(OffHeapRowBlockWriter writer) throws IOException { + if (!keyInit || rowFetched) { + return; + } + + if (!currentValue.inited) { + currentValueBuffer(); + } + writer.startRow(); + int currentSchemaIndex = 0; + for (int j = 0; j < selectedColumns.length; ++j) { + SelectedColumn col = selectedColumns[j]; + int i = col.colIndex; + + while(i > currentSchemaIndex){ + writer.skipField(); + currentSchemaIndex++; + } + + if (col.isNulled) { + writer.skipField(); + } else { + colAdvanceRow(j, col); + + serde.write(writer, schema.getColumn(i), + currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars); + col.rowReadIndex += col.prvLength; + } + currentSchemaIndex++; + } + + while(schema.size() > currentSchemaIndex){ + writer.skipField(); + currentSchemaIndex++; + } + writer.endRow(); + rowFetched = true; + } + /** * Advance column state to the next now: update offsets, run lengths etc * diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java index 3f98f8b4ff..7a7fc5e5e6 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java @@ -26,7 +26,6 @@ import org.apache.tajo.datum.TextDatum; import org.apache.tajo.util.SizeOf; import org.apache.tajo.util.UnsafeUtil; -import sun.misc.Unsafe; /** * @@ -146,12 +145,17 @@ public void putBool(boolean val) { @UsedByJIT public void putBool(byte val) { - ensureSize(SizeOf.SIZE_OF_BOOL); + putByte(val); + } + + @UsedByJIT + public void putByte(byte val) { + ensureSize(SizeOf.SIZE_OF_BYTE); forwardField(); OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_BOOL; + curOffset += SizeOf.SIZE_OF_BYTE; } @UsedByJIT diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java index cfa955ca0c..1ba9a73663 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java @@ -43,6 +43,8 @@ public interface RowWriter { public void skipField(int num); + public void putByte(byte val); + public void putBool(boolean val); public void putBool(byte val); diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java index 24920a6ff7..976719ba79 100644 --- a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java +++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import org.apache.tajo.tuple.offheap.RowWriter; import parquet.filter.UnboundRecordFilter; @@ -154,41 +155,43 @@ public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException { if (values[actualId] != null) { switch (tajoTypes[actualId]) { - case BOOLEAN: - writer.putBool((Boolean) values[actualId]); - break; - case CHAR: - writer.putText(((Binary) values[actualId]).getBytes()); - break; - case INT1: - case INT2: - writer.putInt2((Short) values[actualId]); - break; - case INT4: - case INET4: - case DATE: - writer.putInt4((Integer) values[actualId]); - break; - case INT8: - case TIMESTAMP: - case TIME: - writer.putInt8((Long) values[actualId]); - break; - case FLOAT4: - writer.putFloat4((Float) values[actualId]); - break; - case FLOAT8: - writer.putFloat8((Double) values[actualId]); - break; - case TEXT: - writer.putText(((Binary) values[actualId]).getBytes()); - break; - case BLOB: - writer.putBlob(((Binary) values[actualId]).getBytes()); - break; - - default: - throw new IOException("Not supported type: " + tajoTypes[actualId].name()); + case BOOLEAN: + writer.putBool((Boolean) values[actualId]); + break; + case CHAR: + writer.putText(((Binary) values[actualId]).getBytes()); + break; + case INT1: + case INT2: + writer.putInt2((Short) values[actualId]); + break; + case INT4: + case INET4: + case DATE: + writer.putInt4((Integer) values[actualId]); + break; + case INT8: + case TIMESTAMP: + case TIME: + writer.putInt8((Long) values[actualId]); + break; + case FLOAT4: + writer.putFloat4((Float) values[actualId]); + break; + case FLOAT8: + writer.putFloat8((Double) values[actualId]); + break; + case TEXT: + writer.putText(((Binary) values[actualId]).getBytes()); + break; + case BLOB: + writer.putBlob(((Binary) values[actualId]).getBytes()); + break; + case PROTOBUF: + writer.putProtoDatum((ProtobufDatum)values[actualId]); + break; + default: + throw new IOException("Not supported type: " + tajoTypes[actualId].name()); } } else { writer.skipField(); diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java index 11d0eb4b15..98074faef4 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -54,6 +55,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -102,14 +104,16 @@ public class TestNextFetches { private StoreType storeType; private boolean splitable; private boolean statsable; + private boolean seekable; private Path testDir; private FileSystem fs; private Tuple allTypedTuple; - public TestNextFetches(StoreType type, boolean splitable, boolean statsable) throws IOException { + public TestNextFetches(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException { this.storeType = type; this.splitable = splitable; this.statsable = statsable; + this.seekable = seekable; conf = new TajoConf(); @@ -132,42 +136,38 @@ public TestNextFetches(StoreType type, boolean splitable, boolean statsable) thr schema.addColumn("col9", Type.BLOB); schema.addColumn("col10", Type.INET4); schema.addColumn("col11", Type.NULL_TYPE); - if (storeType == StoreType.RAW) { - schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); - } + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); QueryId queryid = new QueryId("12345", 5); ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - int columnNum = 11 + (storeType == StoreType.RAW ? 1 : 0); + int columnNum = 12; allTypedTuple = new VTuple(columnNum); allTypedTuple.put(new Datum[]{ DatumFactory.createBool(true), - DatumFactory.createChar("jinho"), + DatumFactory.createChar("hyunsik"), DatumFactory.createInt2((short) 17), DatumFactory.createInt4(59), DatumFactory.createInt8(23l), DatumFactory.createFloat4(77.9f), DatumFactory.createFloat8(271.9f), - DatumFactory.createText("jinho"), - DatumFactory.createBlob("jinho babo".getBytes()), + DatumFactory.createText("hyunsik"), + DatumFactory.createBlob("emiya muljomdao".getBytes()), DatumFactory.createInet4("192.168.0.1"), NullDatum.get(), + factory.createDatum(queryid.getProto()) }); - if (storeType == StoreType.RAW) { - allTypedTuple.put(11, factory.createDatum(queryid.getProto())); - } } @Parameterized.Parameters public static Collection generateParameters() { return Arrays.asList(new Object[][] { - {StoreType.CSV, true, true}, + {StoreType.CSV, true, true, true}, // TODO - to be implemented -// {StoreType.RAW, false, false}, -// {StoreType.RCFILE, true, true}, - {StoreType.BLOCK_PARQUET, false, false}, -// {StoreType.SEQUENCEFILE, true, true}, -// {StoreType.AVRO, false, false}, +// {StoreType.RAW, false, true, true}, + {StoreType.RCFILE, true, true, false}, + {StoreType.BLOCK_PARQUET, false, false, false}, +// {StoreType.SEQUENCEFILE, true, true, false}, +// {StoreType.AVRO, false, false, false}, }); } @@ -346,6 +346,7 @@ public void testProjection() throws IOException { || storeType == StoreType.AVRO) { assertTrue(tuple.isNull(0)); } + assertTrue(tuple.toString(), tupleCnt + 2 == tuple.getInt8(1)); assertTrue(tuple.toString(), tupleCnt + 3 == tuple.getFloat4(2)); tupleCnt++; @@ -705,4 +706,85 @@ public void testTime() throws IOException { } } + @Test + public void testSeekableScanner() throws IOException { + if (!seekable) { + return; + } + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Seekable.data"); + FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, + tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 100000; + VTuple vTuple; + + List offsets = Lists.newArrayList(); + offsets.add(0L); + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("test")); + appender.addTuple(vTuple); + + // find a seek position + if (i % (tupleNum / 3) == 0) { + offsets.add(appender.getOffset()); + } + } + + // end of file + if (!offsets.contains(appender.getOffset())) { + offsets.add(appender.getOffset()); + } + + appender.close(); + if (statsable) { + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + } + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(status.getLen(), appender.getOffset()); + + Scanner scanner; + int tupleCnt = 0; + long prevOffset = 0; + long readBytes = 0; + long readRows = 0; + for (long offset : offsets) { + scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, + new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); + scanner.init(); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB); + rowBlock.setMaxRow(1024); + + while (scanner.nextFetch(rowBlock)) { + tupleCnt += rowBlock.rows(); + } + scanner.close(); + + if (statsable) { + readBytes += scanner.getInputStats().getNumBytes(); + readRows += scanner.getInputStats().getNumRows(); + } + prevOffset = offset; + rowBlock.release(); + } + + assertEquals(tupleNum, tupleCnt); + if (statsable) { + assertEquals(appender.getStats().getNumBytes().longValue(), readBytes); + assertEquals(appender.getStats().getNumRows().longValue(), readRows); + } + } }