From 4b9b992fcdbfba0901145a6df494a12a96d0a974 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Tue, 12 Jan 2016 12:26:46 +0900 Subject: [PATCH 1/8] ORC version upgraded --- .../apache/tajo/storage/StorageConstants.java | 2 + tajo-storage/tajo-storage-hdfs/pom.xml | 2 +- .../apache/tajo/storage/orc/ORCScanner.java | 194 ++++++++---------- .../TajoStructObjectInspector.java | 5 + .../thirdparty/orc/FileOrcDataSource.java | 132 ------------ .../thirdparty/orc/HdfsOrcDataSource.java | 19 +- 6 files changed, 105 insertions(+), 249 deletions(-) delete mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index d7f1ec561a..097963cb25 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -82,6 +82,8 @@ public class StorageConstants { // ORC file properties ------------------------------------------------- public static final String ORC_MAX_MERGE_DISTANCE = "orc.max.merge.distance"; public static final String DEFAULT_ORC_MAX_MERGE_DISTANCE = "1048576"; // 1MB + public static final String ORC_MAX_READ_BUFFER_SIZE = "orc.max.read.buffer"; + public static final String DEFAULT_ORC_MAX_READ_BUFFER_SIZE = "8388608"; // 8MB public static final String ORC_STRIPE_SIZE = "orc.stripe.size"; public static final String DEFAULT_ORC_STRIPE_SIZE = "67108864"; // 64MB diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml index 3b89e1c145..6c10a88baf 100644 --- a/tajo-storage/tajo-storage-hdfs/pom.xml +++ b/tajo-storage/tajo-storage-hdfs/pom.xml @@ -358,7 +358,7 @@ com.facebook.presto presto-orc - 0.86 + 0.132 diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java index 9351c59926..054bb4b88e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java @@ -18,7 +18,16 @@ package org.apache.tajo.storage.orc; +import com.facebook.presto.orc.OrcDataSource; +import com.facebook.presto.orc.OrcPredicate; +import com.facebook.presto.orc.OrcReader; +import com.facebook.presto.orc.OrcRecordReader; +import com.facebook.presto.orc.memory.AggregatedMemoryContext; +import com.facebook.presto.orc.metadata.OrcMetadataReader; +import com.facebook.presto.spi.block.Block; +import com.facebook.presto.spi.type.*; import com.google.protobuf.InvalidProtocolBufferException; +import io.airlift.units.DataSize; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,15 +48,12 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; import org.apache.tajo.storage.fragment.Fragment; -import com.facebook.presto.orc.*; -import com.facebook.presto.orc.metadata.OrcMetadataReader; import org.apache.tajo.storage.thirdparty.orc.HdfsOrcDataSource; -import org.apache.tajo.util.datetime.DateTimeUtil; import org.joda.time.DateTimeZone; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; import java.util.TimeZone; /** @@ -56,42 +62,16 @@ public class ORCScanner extends FileScanner { private static final Log LOG = LogFactory.getLog(ORCScanner.class); private OrcRecordReader recordReader; - private Vector [] vectors; + private Block[] blocks; private int currentPosInBatch = 0; private int batchSize = 0; private Tuple outTuple; + private AggregatedMemoryContext aggrMemoryContext = new AggregatedMemoryContext(); public ORCScanner(Configuration conf, final Schema 
schema, final TableMeta meta, final Fragment fragment) { super(conf, schema, meta, fragment); } - private Vector createOrcVector(TajoDataTypes.DataType type) { - switch (type.getType()) { - case INT1: case INT2: case INT4: case INT8: - case INET4: - case TIMESTAMP: - case DATE: - return new LongVector(); - - case FLOAT4: - case FLOAT8: - return new DoubleVector(); - - case BOOLEAN: - case NULL_TYPE: - return new BooleanVector(); - - case BLOB: - case TEXT: - case CHAR: - case PROTOBUF: - return new SliceVector(); - - default: - throw new TajoRuntimeException(new NotImplementedException(type.getType().name() + " for orc")); - } - } - private FileSystem fs; private FSDataInputStream fis; @@ -108,6 +88,10 @@ private static class ColumnInfo { @Override public void init() throws IOException { OrcReader orcReader; + DataSize maxMergeDistance = new DataSize(Double.parseDouble(meta.getProperty(StorageConstants.ORC_MAX_MERGE_DISTANCE, + StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE)), DataSize.Unit.BYTE); + DataSize maxReadSize = new DataSize(Double.parseDouble(meta.getProperty(StorageConstants.ORC_MAX_READ_BUFFER_SIZE, + StorageConstants.DEFAULT_ORC_MAX_READ_BUFFER_SIZE)), DataSize.Unit.BYTE); if (targets == null) { targets = schema.toArray(); @@ -129,8 +113,8 @@ public void init() throws IOException { this.fragment.getPath().toString(), fis, fs.getFileStatus(path).getLen(), - Integer.parseInt(meta.getProperty(StorageConstants.ORC_MAX_MERGE_DISTANCE, - StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE))); + maxMergeDistance, + maxReadSize); targetColInfo = new ColumnInfo[targets.length]; for (int i=0; i columnSet = new HashSet<>(); + Map columnMap = new HashMap<>(); for (ColumnInfo colInfo: targetColInfo) { - columnSet.add(colInfo.id); + columnMap.put(colInfo.id, createFBtypeByTajoType(colInfo.type)); } - orcReader = new OrcReader(orcDataSource, new OrcMetadataReader()); + orcReader = new OrcReader(orcDataSource, new OrcMetadataReader(), maxMergeDistance, maxReadSize); 
TimeZone timezone = TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE, TajoConstants.DEFAULT_SYSTEM_TIMEZONE)); // TODO: make OrcPredicate useful // presto-orc uses joda timezone, so it needs to be converted. - recordReader = orcReader.createRecordReader(columnSet, OrcPredicate.TRUE, - fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone)); + recordReader = orcReader.createRecordReader(columnMap, OrcPredicate.TRUE, + fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone), aggrMemoryContext); super.init(); LOG.debug("file fragment { path: " + fragment.getPath() + @@ -179,7 +160,7 @@ public Tuple next() throws IOException { } for (int i=0; i Map readFully(Map diskRanges) - throws IOException - { - checkNotNull(diskRanges, "diskRanges is null"); - - if (diskRanges.isEmpty()) { - return ImmutableMap.of(); - } - - // TODO: benchmark alternatively strategies: - // 1) sort ranges and perform one read per range - // 2) single read with transferTo() using custom WritableByteChannel - - Iterable mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance); - - // read ranges - Map buffers = new LinkedHashMap<>(); - for (DiskRange mergedRange : mergedRanges) { - // read full range in one request - byte[] buffer = new byte[mergedRange.getLength()]; - readFully(mergedRange.getOffset(), buffer); - buffers.put(mergedRange, buffer); - } - - ImmutableMap.Builder slices = ImmutableMap.builder(); - for (Entry entry : diskRanges.entrySet()) { - slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers)); - } - return slices.build(); - } - - @Override - public String toString() - { - return path.getPath(); - } -} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java index da12461a53..3eb11c33c2 100644 --- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java @@ -17,7 +17,8 @@ import com.facebook.presto.orc.DiskRange; import com.facebook.presto.orc.OrcDataSource; import com.google.common.collect.ImmutableMap; -import io.airlift.slice.Slice; +import io.airlift.slice.BasicSliceInput; +import io.airlift.slice.FixedLengthSliceInput; import io.airlift.units.DataSize; import org.apache.hadoop.fs.FSDataInputStream; @@ -43,17 +44,19 @@ public class HdfsOrcDataSource private final String path; private final long size; private final DataSize maxMergeDistance; + private final DataSize maxReadSize; private long readTimeNanos; - public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size, double maxMergeDistance) + public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size, + DataSize maxMergeDistance, DataSize maxReadSize) { this.path = checkNotNull(path, "path is null"); this.inputStream = checkNotNull(inputStream, "inputStream is null"); this.size = size; checkArgument(size >= 0, "size is negative"); - DataSize mergeDistance = new DataSize(maxMergeDistance, DataSize.Unit.BYTE); - this.maxMergeDistance = checkNotNull(mergeDistance, "maxMergeDistance is null"); + this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null"); + this.maxReadSize = checkNotNull(maxReadSize, "maxReadSize is null"); } @Override @@ -94,7 +97,7 @@ public void readFully(long position, byte[] buffer, int bufferOffset, int buffer } @Override - public Map readFully(Map diskRanges) + public Map readFully(Map diskRanges) throws IOException { checkNotNull(diskRanges, "diskRanges is null"); @@ -103,7 +106,7 @@ public Map readFully(Map diskRanges) return ImmutableMap.of(); } - Iterable mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance); + Iterable
mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, maxReadSize); // read ranges Map buffers = new LinkedHashMap<>(); @@ -114,9 +117,9 @@ public Map readFully(Map diskRanges) buffers.put(mergedRange, buffer); } - ImmutableMap.Builder slices = ImmutableMap.builder(); + ImmutableMap.Builder slices = ImmutableMap.builder(); for (Entry entry : diskRanges.entrySet()) { - slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers)); + slices.put(entry.getKey(), new BasicSliceInput(getDiskRangeSlice(entry.getValue(), buffers))); } return slices.build(); } From 3c562d3eea0b421604167695e8151c9f1adc7085 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Tue, 12 Jan 2016 14:32:12 +0900 Subject: [PATCH 2/8] Types added --- .../java/org/apache/tajo/storage/orc/ORCScanner.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java index 054bb4b88e..afb68c3189 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java @@ -49,6 +49,7 @@ import org.apache.tajo.storage.VTuple; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.thirdparty.orc.HdfsOrcDataSource; +import org.apache.tajo.util.datetime.DateTimeUtil; import org.joda.time.DateTimeZone; import java.io.IOException; @@ -209,6 +210,7 @@ private Datum createValueDatum(Block block, TajoDataTypes.DataType type) { if (block.isNull(currentPosInBatch)) return NullDatum.get(); + // NOTE: block.get() methods are determined by the type size wich is in createFBtypeByTajoType() switch (type.getType()) { case INT1: return DatumFactory.createInt2((short)block.getLong(currentPosInBatch, 0)); @@ -252,6 +254,14 @@ private Datum createValueDatum(Block 
block, TajoDataTypes.DataType type) { return NullDatum.get(); } + case TIMESTAMP: + return DatumFactory.createTimestamp( + DateTimeUtil.javaTimeToJulianTime(block.getLong(currentPosInBatch, 0))); + + case DATE: + return DatumFactory.createDate( + block.getInt(currentPosInBatch, 0) + DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH); + case INET4: return DatumFactory.createInet4((int)block.getLong(currentPosInBatch, 0)); From 696a8162056314c7975c2961917f080ed9943cf9 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Tue, 12 Jan 2016 16:19:34 +0900 Subject: [PATCH 3/8] comment modified --- .../src/main/java/org/apache/tajo/storage/orc/ORCScanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java index afb68c3189..0a4ebc6948 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java @@ -210,7 +210,7 @@ private Datum createValueDatum(Block block, TajoDataTypes.DataType type) { if (block.isNull(currentPosInBatch)) return NullDatum.get(); - // NOTE: block.get() methods are determined by the type size wich is in createFBtypeByTajoType() + // NOTE: block.get***() methods are determined by the type size which is in createFBtypeByTajoType() switch (type.getType()) { case INT1: return DatumFactory.createInt2((short)block.getLong(currentPosInBatch, 0)); From 9a2b54b381f35317ad71b9f5102f48279878edc8 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Tue, 12 Jan 2016 16:28:13 +0900 Subject: [PATCH 4/8] The document is refined --- tajo-docs/src/main/sphinx/table_management/orc.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-docs/src/main/sphinx/table_management/orc.rst b/tajo-docs/src/main/sphinx/table_management/orc.rst index
2733afcd12..eb84b20301 100644 --- a/tajo-docs/src/main/sphinx/table_management/orc.rst +++ b/tajo-docs/src/main/sphinx/table_management/orc.rst @@ -34,6 +34,7 @@ The ``WITH`` clause in the CREATE TABLE statement allows users to set those para Now, ORC file provides the following physical properties. * ``orc.max.merge.distance``: When ORC file is read, if stripes are too closer and the distance is lower than this value, they are merged and read at once. Default is 1MB. +* ``orc.max.read.buffer``: When ORC file is read, it defines the maximum read buffer size; that is, the maximum size of a single read. Default is 8MB. * ``orc.stripe.size``: It decides size of each stripe. Default is 64MB. * ``orc.compression.kind``: It means the compression algorithm used to compress and write data. It should be one of ``none``, ``snappy``, ``zlib``. Default is ``none``. * ``orc.buffer.size``: It decides size of writing buffer. Default is 256KB. From 11bf05f9c76023bd10c3ca2f67023f3d1fb7e220 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Thu, 28 Jan 2016 16:13:44 +0900 Subject: [PATCH 5/8] Refined by using stream --- .../tajo/storage/thirdparty/orc/HdfsOrcDataSource.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java index 3eb11c33c2..0aef1e227c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java @@ -118,9 +118,9 @@ public Map readFully(Map diskRanges) } ImmutableMap.Builder slices = ImmutableMap.builder(); - for (Entry entry : diskRanges.entrySet()) { - slices.put(entry.getKey(), new BasicSliceInput(getDiskRangeSlice(entry.getValue(), buffers))); - } +
diskRanges.forEach((K key, DiskRange range) -> + slices.put(key, new BasicSliceInput(getDiskRangeSlice(range, buffers)))); + return slices.build(); } From ce40a7c5d59d7972df4949bdf0ac935f6621c3a4 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Thu, 28 Jan 2016 16:54:09 +0900 Subject: [PATCH 6/8] CI trigger --- .../apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java index 0aef1e227c..5357f51a09 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java @@ -92,7 +92,6 @@ public void readFully(long position, byte[] buffer, int bufferOffset, int buffer long start = System.nanoTime(); inputStream.readFully(position, buffer, bufferOffset, bufferLength); - readTimeNanos += System.nanoTime() - start; } From b289b799e25b1db05fc94b175c6b70c34573d6c2 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Thu, 28 Jan 2016 17:36:27 +0900 Subject: [PATCH 7/8] CI trigger --- .../apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java index 5357f51a09..0aef1e227c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java @@ -92,6 +92,7 @@ public void readFully(long position, byte[] buffer, int bufferOffset, 
int buffer long start = System.nanoTime(); inputStream.readFully(position, buffer, bufferOffset, bufferLength); + readTimeNanos += System.nanoTime() - start; } From 36aae19e5722a6cfc4311451f8721824b71c57ba Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Fri, 29 Jan 2016 14:50:02 +0900 Subject: [PATCH 8/8] Revert "CI trigger" This reverts commit b289b799e25b1db05fc94b175c6b70c34573d6c2. --- .../apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java index 0aef1e227c..5357f51a09 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java @@ -92,7 +92,6 @@ public void readFully(long position, byte[] buffer, int bufferOffset, int buffer long start = System.nanoTime(); inputStream.readFully(position, buffer, bufferOffset, bufferLength); - readTimeNanos += System.nanoTime() - start; }