From 8001bbc4312e792a86dcc03c7a7ced7723316c20 Mon Sep 17 00:00:00 2001 From: Jesus Camacho Rodriguez Date: Wed, 2 May 2018 14:26:46 -0700 Subject: [PATCH 1/3] [ORC-341] Support time zone as a parameter for Java reader and writer --- .../core/src/java/org/apache/orc/OrcFile.java | 25 ++++ .../java/org/apache/orc/impl/ReaderImpl.java | 2 + .../org/apache/orc/impl/RecordReaderImpl.java | 3 +- .../apache/orc/impl/TreeReaderFactory.java | 23 +++- .../java/org/apache/orc/impl/WriterImpl.java | 14 ++- .../orc/impl/writer/TimestampTreeWriter.java | 22 +++- .../apache/orc/impl/writer/WriterContext.java | 2 + .../apache/orc/impl/writer/WriterImplV2.java | 14 ++- .../test/org/apache/orc/TestOrcTimezone4.java | 113 ++++++++++++++++++ 9 files changed, 208 insertions(+), 10 deletions(-) create mode 100644 java/core/src/test/org/apache/orc/TestOrcTimezone4.java diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java index ac0beffcfc..b917faff4d 100644 --- a/java/core/src/java/org/apache/orc/OrcFile.java +++ b/java/core/src/java/org/apache/orc/OrcFile.java @@ -276,6 +276,7 @@ public static class ReaderOptions { // and remove this class altogether. Both footer caching and llap caching just needs OrcTail. 
// For now keeping this around to avoid complex surgery private FileMetadata fileMetadata; + private boolean useUTCTimestamp; public ReaderOptions(Configuration conf) { this.conf = conf; @@ -320,6 +321,16 @@ public ReaderOptions fileMetadata(final FileMetadata metadata) { public FileMetadata getFileMetadata() { return fileMetadata; } + + public ReaderOptions useUTCTimestamp(boolean value) { + useUTCTimestamp = value; + return this; + } + + public boolean isUseUTCTimestamp() { + return useUTCTimestamp; + } + } public static ReaderOptions readerOptions(Configuration conf) { @@ -392,6 +403,7 @@ public static class WriterOptions implements Cloneable { private BloomFilterVersion bloomFilterVersion; private PhysicalWriter physicalWriter; private WriterVersion writerVersion = CURRENT_WRITER; + private boolean useUTCTimestamp; private boolean overwrite; private boolean writeVariableLengthBlocks; private HadoopShims shims; @@ -666,6 +678,15 @@ protected WriterOptions writerVersion(WriterVersion version) { return this; } + /** + * Manually set the time zone for the writer to UTC. + * If not defined, system time zone is assumed. 
+ */ + public WriterOptions useUTCTimestamp(boolean value) { + useUTCTimestamp = value; + return this; + } + public boolean getBlockPadding() { return blockPaddingValue; } @@ -761,6 +782,10 @@ public boolean getWriteVariableLengthBlocks() { public HadoopShims getHadoopShims() { return shims; } + + public boolean isUseUTCTimestamp() { + return useUTCTimestamp; + } } /** diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java index e8d6be9729..bd789617f4 100644 --- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -77,6 +77,7 @@ public class ReaderImpl implements Reader { private long deserializedSize = -1; protected final Configuration conf; + protected final boolean useUTCTimestamp; private final List versionList; private final OrcFile.WriterVersion writerVersion; @@ -343,6 +344,7 @@ public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException { this.path = path; this.conf = options.getConfiguration(); this.maxLength = options.getMaxLength(); + this.useUTCTimestamp = options.isUseUTCTimestamp(); FileMetadata fileMetadata = options.getFileMetadata(); if (fileMetadata != null) { this.compressionKind = fileMetadata.getCompressionKind(); diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java index 53cc761644..bf8d2917e5 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java @@ -259,7 +259,8 @@ protected RecordReaderImpl(ReaderImpl fileReader, new TreeReaderFactory.ReaderContext() .setSchemaEvolution(evolution) .skipCorrupt(skipCorrupt) - .fileFormat(fileReader.getFileVersion()); + .fileFormat(fileReader.getFileVersion()) + .useUTCTimestamp(fileReader.useUTCTimestamp); reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(), 
readerContext); diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java index ccae522503..ed21318a72 100644 --- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -57,6 +57,8 @@ public interface Context { boolean isSkipCorrupt(); + boolean isUseUTCTimestamp(); + String getWriterTimezone(); OrcFile.Version getFileFormat(); @@ -65,6 +67,7 @@ public interface Context { public static class ReaderContext implements Context { private SchemaEvolution evolution; private boolean skipCorrupt = false; + private boolean useUTCTimestamp = false; private String writerTimezone; private OrcFile.Version fileFormat; @@ -78,6 +81,11 @@ public ReaderContext skipCorrupt(boolean skipCorrupt) { return this; } + public ReaderContext useUTCTimestamp(boolean useUTCTimestamp) { + this.useUTCTimestamp = useUTCTimestamp; + return this; + } + public ReaderContext writerTimeZone(String writerTimezone) { this.writerTimezone = writerTimezone; return this; @@ -98,6 +106,11 @@ public boolean isSkipCorrupt() { return skipCorrupt; } + @Override + public boolean isUseUTCTimestamp() { + return useUTCTimestamp; + } + @Override public String getWriterTimezone() { return writerTimezone; @@ -900,7 +913,11 @@ protected TimestampTreeReader(int columnId, InStream presentStream, InStream dat this.threadLocalDateFormat = new ThreadLocal<>(); this.threadLocalDateFormat.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); this.baseTimestampMap = new HashMap<>(); - this.readerTimeZone = TimeZone.getDefault(); + if (context.isUseUTCTimestamp()) { + this.readerTimeZone = TimeZone.getTimeZone("UTC"); + } else { + this.readerTimeZone = TimeZone.getDefault(); + } if (context.getWriterTimezone() == null || context.getWriterTimezone().isEmpty()) { this.writerTimeZone = readerTimeZone; } else { @@ -990,6 +1007,10 @@ public void nextVector(ColumnVector 
previousVector, TimestampColumnVector result = (TimestampColumnVector) previousVector; super.nextVector(previousVector, isNull, batchSize); + if (context.isUseUTCTimestamp()) { + result.setIsUTC(true); + } + for (int i = 0; i < batchSize; i++) { if (result.noNulls || !result.isNull[i]) { final int newNanos = parseNanos(nanos.next()); diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java index c3656d4a7d..fdb0467cf5 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -112,6 +112,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback { private final double bloomFilterFpp; private final OrcFile.BloomFilterVersion bloomFilterVersion; private final boolean writeTimeZone; + private final boolean useUTCTimeZone; public WriterImpl(FileSystem fs, Path path, @@ -133,7 +134,8 @@ public Writer getWriter() { } else { callbackContext = null; } - writeTimeZone = hasTimestamp(schema); + this.writeTimeZone = hasTimestamp(schema); + this.useUTCTimeZone = opts.isUseUTCTimestamp(); this.adjustedStripeSize = opts.getStripeSize(); this.version = opts.getVersion(); this.encodingStrategy = opts.getEncodingStrategy(); @@ -404,6 +406,10 @@ public void writeBloomFilter(StreamName name, physicalWriter.writeBloomFilter(name, bloom, getCustomizedCodec(name.getKind())); } + + public boolean isUseUTCTimestamp() { + return useUTCTimeZone; + } } @@ -431,7 +437,11 @@ private void flushStripe() throws IOException { OrcProto.StripeFooter.Builder builder = OrcProto.StripeFooter.newBuilder(); if (writeTimeZone) { - builder.setWriterTimezone(TimeZone.getDefault().getID()); + if (useUTCTimeZone) { + builder.setWriterTimezone(TimeZone.getTimeZone("UTC").getID()); + } else { + builder.setWriterTimezone(TimeZone.getDefault().getID()); + } } OrcProto.StripeStatistics.Builder stats = OrcProto.StripeStatistics.newBuilder(); diff --git 
a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java index a7bfc902d1..b5a28171c3 100644 --- a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java +++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java @@ -28,7 +28,9 @@ import org.apache.orc.impl.SerializationUtils; import java.io.IOException; -import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.TimeZone; public class TimestampTreeWriter extends TreeWriterBase { @@ -39,6 +41,7 @@ public class TimestampTreeWriter extends TreeWriterBase { private final IntegerWriter nanos; private final boolean isDirectV2; private final TimeZone localTimezone; + private final DateFormat localDateFormat; private final long baseEpochSecsLocalTz; public TimestampTreeWriter(int columnId, @@ -54,9 +57,20 @@ public TimestampTreeWriter(int columnId, if (rowIndexPosition != null) { recordPosition(rowIndexPosition); } - this.localTimezone = TimeZone.getDefault(); - // for unit tests to set different time zones - this.baseEpochSecsLocalTz = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND; + if (writer.isUseUTCTimestamp()) { + this.localTimezone = TimeZone.getTimeZone("UTC"); + } else { + this.localTimezone = TimeZone.getDefault(); + } + this.localDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + this.localDateFormat.setTimeZone(this.localTimezone); + try { + this.baseEpochSecsLocalTz = this.localDateFormat + .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() / + TimestampTreeWriter.MILLIS_PER_SECOND; + } catch (ParseException e) { + throw new IOException("Unable to create base timestamp tree writer", e); + } } @Override diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java index 
e32c6837d7..57c9917465 100644 --- a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java +++ b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java @@ -101,4 +101,6 @@ void writeIndex(StreamName name, void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom ) throws IOException; + + boolean isUseUTCTimestamp(); } diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java index ab4fc58d14..3581b2d9ff 100644 --- a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java +++ b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java @@ -111,6 +111,7 @@ public class WriterImplV2 implements WriterInternal, MemoryManager.Callback { private final double bloomFilterFpp; private final OrcFile.BloomFilterVersion bloomFilterVersion; private final boolean writeTimeZone; + private final boolean useUTCTimeZone; public WriterImplV2(FileSystem fs, Path path, @@ -132,7 +133,8 @@ public Writer getWriter() { } else { callbackContext = null; } - writeTimeZone = hasTimestamp(schema); + this.writeTimeZone = hasTimestamp(schema); + this.useUTCTimeZone = opts.isUseUTCTimestamp(); this.adjustedStripeSize = opts.getStripeSize(); this.version = opts.getVersion(); this.encodingStrategy = opts.getEncodingStrategy(); @@ -346,6 +348,10 @@ public void writeBloomFilter(StreamName name, physicalWriter.writeBloomFilter(name, bloom, getCustomizedCodec(name.getKind())); } + + public boolean isUseUTCTimestamp() { + return useUTCTimeZone; + } } @@ -373,7 +379,11 @@ private void flushStripe() throws IOException { OrcProto.StripeFooter.Builder builder = OrcProto.StripeFooter.newBuilder(); if (writeTimeZone) { - builder.setWriterTimezone(TimeZone.getDefault().getID()); + if (useUTCTimeZone) { + builder.setWriterTimezone(TimeZone.getTimeZone("UTC").getID()); + } else { + builder.setWriterTimezone(TimeZone.getDefault().getID()); + } } 
OrcProto.StripeStatistics.Builder stats = OrcProto.StripeStatistics.newBuilder(); diff --git a/java/core/src/test/org/apache/orc/TestOrcTimezone4.java b/java/core/src/test/org/apache/orc/TestOrcTimezone4.java new file mode 100644 index 0000000000..d23f804257 --- /dev/null +++ b/java/core/src/test/org/apache/orc/TestOrcTimezone4.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.orc; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.TimeZone; + +import static junit.framework.Assert.assertEquals; + +/** + * + */ +public class TestOrcTimezone4 { + Path workDir = new Path(System.getProperty("test.tmp.dir", + "target" + File.separator + "test" + File.separator + "tmp")); + Configuration conf; + FileSystem fs; + Path testFilePath; + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + static TimeZone defaultTimeZone = TimeZone.getDefault(); + + public TestOrcTimezone4() { + } + + @Rule + public TestName testCaseName = new TestName(); + + @Before + public void openFileSystem() throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + testFilePath = new Path(workDir, "TestOrcTimezone4." 
+ + testCaseName.getMethodName() + ".orc"); + fs.delete(testFilePath, false); + } + + @After + public void restoreTimeZone() { + TimeZone.setDefault(defaultTimeZone); + } + + @Test + public void testTimestampWriter() throws Exception { + TypeDescription schema = TypeDescription.createTimestamp(); + + TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")); + + Writer writer = OrcFile.createWriter(testFilePath, + OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000) + .bufferSize(10000)); + List ts = Lists.newArrayList(); + ts.add("1969-12-31 15:59:56.007"); + ts.add("1969-12-31 16:00:14.007"); + ts.add("1969-12-31 16:00:06.021"); + VectorizedRowBatch batch = schema.createRowBatch(); + TimestampColumnVector times = (TimestampColumnVector) batch.cols[0]; + for (String t : ts) { + long time = formatter.parse(t).getTime(); + times.set(batch.size++, new Timestamp(time)); + } + writer.addRowBatch(batch); + writer.close(); + + Reader reader = OrcFile.createReader(testFilePath, + OrcFile.readerOptions(conf).filesystem(fs).useUTCTimestamp(true)); + formatter.setTimeZone(TimeZone.getTimeZone("UTC")); + RecordReader rows = reader.rows(); + batch = reader.getSchema().createRowBatch(); + times = (TimestampColumnVector) batch.cols[0]; + int idx = 0; + while (rows.nextBatch(batch)) { + for(int r=0; r < batch.size; ++r) { + Timestamp timestamp = times.asScratchTimestamp(r); + assertEquals(ts.get(idx++), formatter.format(timestamp)); + } + } + rows.close(); + } +} From 31d68ac24d3ed59a883d47d6de8d897f3495cd11 Mon Sep 17 00:00:00 2001 From: Jesus Camacho Rodriguez Date: Fri, 4 May 2018 20:11:35 -0700 Subject: [PATCH 2/3] Addressing wgtmac and omalley comments --- .../core/src/java/org/apache/orc/OrcFile.java | 4 +- .../apache/orc/TimestampColumnStatistics.java | 12 ++ .../apache/orc/impl/ColumnStatisticsImpl.java | 14 ++ .../java/org/apache/orc/impl/ReaderImpl.java | 2 +- .../org/apache/orc/impl/RecordReaderImpl.java | 136 +++++++++++++++--- 
.../apache/orc/impl/TreeReaderFactory.java | 11 +- .../java/org/apache/orc/impl/WriterImpl.java | 6 +- .../orc/impl/writer/TimestampTreeWriter.java | 35 +++-- .../apache/orc/impl/writer/WriterContext.java | 2 +- .../apache/orc/impl/writer/WriterImplV2.java | 6 +- .../apache/orc/impl/TestRecordReaderImpl.java | 74 +++++++--- 11 files changed, 234 insertions(+), 68 deletions(-) diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java index b917faff4d..b07355a970 100644 --- a/java/core/src/java/org/apache/orc/OrcFile.java +++ b/java/core/src/java/org/apache/orc/OrcFile.java @@ -327,7 +327,7 @@ public ReaderOptions useUTCTimestamp(boolean value) { return this; } - public boolean isUseUTCTimestamp() { + public boolean getUseUTCTimestamp() { return useUTCTimestamp; } @@ -783,7 +783,7 @@ public HadoopShims getHadoopShims() { return shims; } - public boolean isUseUTCTimestamp() { + public boolean getUseUTCTimestamp() { return useUTCTimestamp; } } diff --git a/java/core/src/java/org/apache/orc/TimestampColumnStatistics.java b/java/core/src/java/org/apache/orc/TimestampColumnStatistics.java index 27dc49f6c3..b095a68642 100644 --- a/java/core/src/java/org/apache/orc/TimestampColumnStatistics.java +++ b/java/core/src/java/org/apache/orc/TimestampColumnStatistics.java @@ -35,4 +35,16 @@ public interface TimestampColumnStatistics extends ColumnStatistics { * @return maximum value */ Timestamp getMaximum(); + + /** + * Get the minimum value for the column in UTC. + * @return minimum value in UTC + */ + Timestamp getMinimumUTC(); + + /** + * Get the maximum value for the column in UTC. 
+ * @return maximum value in UTC + */ + Timestamp getMaximumUTC(); } diff --git a/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java b/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java index 0cd69f47fb..02bfdfaea9 100644 --- a/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java @@ -1213,6 +1213,16 @@ public Timestamp getMaximum() { maximum)); } + @Override + public Timestamp getMinimumUTC() { + return minimum == null ? null : new Timestamp(minimum); + } + + @Override + public Timestamp getMaximumUTC() { + return maximum == null ? null : new Timestamp(maximum); + } + @Override public String toString() { StringBuilder buf = new StringBuilder(super.toString()); @@ -1221,6 +1231,10 @@ public String toString() { buf.append(getMinimum()); buf.append(" max: "); buf.append(getMaximum()); + buf.append(" min UTC: "); + buf.append(getMinimumUTC()); + buf.append(" max UTC: "); + buf.append(getMaximumUTC()); } return buf.toString(); } diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java index bd789617f4..19916f3a35 100644 --- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -344,7 +344,7 @@ public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException { this.path = path; this.conf = options.getConfiguration(); this.maxLength = options.getMaxLength(); - this.useUTCTimestamp = options.isUseUTCTimestamp(); + this.useUTCTimestamp = options.getUseUTCTimestamp(); FileMetadata fileMetadata = options.getFileMetadata(); if (fileMetadata != null) { this.compressionKind = fileMetadata.getCompressionKind(); diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java index bf8d2917e5..d2ab67f448 100644 --- 
a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java @@ -210,12 +210,14 @@ protected RecordReaderImpl(ReaderImpl fileReader, SearchArgument sarg = options.getSearchArgument(); if (sarg != null && rowIndexStride != 0) { sargApp = new SargApplier(sarg, - rowIndexStride, - evolution, - writerVersion); + rowIndexStride, + evolution, + writerVersion, + fileReader.useUTCTimestamp); } else { sargApp = null; } + long rows = 0; long skippedRows = 0; long offset = options.getOffset(); @@ -331,6 +333,19 @@ static Location compareToRange(Comparable point, T min, T max) { * @return the object for the maximum value or null if there isn't one */ static Object getMax(ColumnStatistics index) { + return getMax(index, false); + } + + /** + * Get the maximum value out of an index entry. + * Includes option to specify if timestamp column stats values + * should be in UTC. + * @param index + * the index entry + * @param useUTCTimestamp + * @return the object for the maximum value or null if there isn't one + */ + static Object getMax(ColumnStatistics index, boolean useUTCTimestamp) { if (index instanceof IntegerColumnStatistics) { return ((IntegerColumnStatistics) index).getMaximum(); } else if (index instanceof DoubleColumnStatistics) { @@ -342,7 +357,11 @@ static Object getMax(ColumnStatistics index) { } else if (index instanceof DecimalColumnStatistics) { return ((DecimalColumnStatistics) index).getMaximum(); } else if (index instanceof TimestampColumnStatistics) { - return ((TimestampColumnStatistics) index).getMaximum(); + if (useUTCTimestamp) { + return ((TimestampColumnStatistics) index).getMaximumUTC(); + } else { + return ((TimestampColumnStatistics) index).getMaximum(); + } } else if (index instanceof BooleanColumnStatistics) { if (((BooleanColumnStatistics)index).getTrueCount()!=0) { return Boolean.TRUE; @@ -361,6 +380,19 @@ static Object getMax(ColumnStatistics index) { * @return the object for the 
minimum value or null if there isn't one */ static Object getMin(ColumnStatistics index) { + return getMin(index, false); + } + + /** + * Get the minimum value out of an index entry. + * Includes option to specify if timestamp column stats values + * should be in UTC. + * @param index + * the index entry + * @param useUTCTimestamp + * @return the object for the minimum value or null if there isn't one + */ + static Object getMin(ColumnStatistics index, boolean useUTCTimestamp) { if (index instanceof IntegerColumnStatistics) { return ((IntegerColumnStatistics) index).getMinimum(); } else if (index instanceof DoubleColumnStatistics) { @@ -372,7 +404,11 @@ static Object getMin(ColumnStatistics index) { } else if (index instanceof DecimalColumnStatistics) { return ((DecimalColumnStatistics) index).getMinimum(); } else if (index instanceof TimestampColumnStatistics) { - return ((TimestampColumnStatistics) index).getMinimum(); + if (useUTCTimestamp) { + return ((TimestampColumnStatistics) index).getMinimumUTC(); + } else { + return ((TimestampColumnStatistics) index).getMinimum(); + } } else if (index instanceof BooleanColumnStatistics) { if (((BooleanColumnStatistics)index).getFalseCount()!=0) { return Boolean.FALSE; @@ -402,9 +438,35 @@ static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto, OrcProto.BloomFilter bloomFilter, OrcFile.WriterVersion writerVersion, TypeDescription.Category type) { + return evaluatePredicateProto(statsProto, predicate, kind, encoding, bloomFilter, + writerVersion, type, false); + } + + /** + * Evaluate a predicate with respect to the statistics from the column + * that is referenced in the predicate. + * Includes option to specify if timestamp column stats values + * should be in UTC. 
+ * @param statsProto the statistics for the column mentioned in the predicate + * @param predicate the leaf predicate we need to evaluate + * @param bloomFilter the bloom filter + * @param writerVersion the version of software that wrote the file + * @param type what is the kind of this column + * @param useUTCTimestamp whether timestamp column stats values should be in UTC + * @return the set of truth values that may be returned for the given + * predicate. + */ + static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto, + PredicateLeaf predicate, + OrcProto.Stream.Kind kind, + OrcProto.ColumnEncoding encoding, + OrcProto.BloomFilter bloomFilter, + OrcFile.WriterVersion writerVersion, + TypeDescription.Category type, + boolean useUTCTimestamp) { ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto); - Object minValue = getMin(cs); - Object maxValue = getMax(cs); + Object minValue = getMin(cs, useUTCTimestamp); + Object maxValue = getMax(cs, useUTCTimestamp); // files written before ORC-135 stores timestamp wrt to local timezone causing issues with PPD. 
// disable PPD for timestamp for all old files if (type.equals(TypeDescription.Category.TIMESTAMP)) { @@ -421,7 +483,8 @@ static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto, } } return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), - BloomFilterIO.deserialize(kind, encoding, writerVersion, type, bloomFilter)); + BloomFilterIO.deserialize(kind, encoding, writerVersion, type, bloomFilter), + useUTCTimestamp); } /** @@ -435,13 +498,32 @@ static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto, public static TruthValue evaluatePredicate(ColumnStatistics stats, PredicateLeaf predicate, BloomFilter bloomFilter) { - Object minValue = getMin(stats); - Object maxValue = getMax(stats); - return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter); + return evaluatePredicate(stats, predicate, bloomFilter, false); + } + + /** + * Evaluate a predicate with respect to the statistics from the column + * that is referenced in the predicate. + * Includes option to specify if timestamp column stats values + * should be in UTC. + * @param stats the statistics for the column mentioned in the predicate + * @param predicate the leaf predicate we need to evaluate + * @param bloomFilter the bloom filter + * @param useUTCTimestamp whether timestamp column stats values should be in UTC + * @return the set of truth values that may be returned for the given + * predicate. 
+ */ + public static TruthValue evaluatePredicate(ColumnStatistics stats, + PredicateLeaf predicate, + BloomFilter bloomFilter, + boolean useUTCTimestamp) { + Object minValue = getMin(stats, useUTCTimestamp); + Object maxValue = getMax(stats, useUTCTimestamp); + return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter, useUTCTimestamp); } static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min, - Object max, boolean hasNull, BloomFilter bloomFilter) { + Object max, boolean hasNull, BloomFilter bloomFilter, boolean useUTCTimestamp) { // if we didn't have any values, everything must have been null if (min == null) { if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) { @@ -462,7 +544,7 @@ static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min, result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull); if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) { - return evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull); + return evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull, useUTCTimestamp); } else { return result; } @@ -581,18 +663,18 @@ private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Objec } private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate, - final Object predObj, BloomFilter bloomFilter, boolean hasNull) { + final Object predObj, BloomFilter bloomFilter, boolean hasNull, boolean useUTCTimestamp) { switch (predicate.getOperator()) { case NULL_SAFE_EQUALS: // null safe equals does not return *_NULL variant. 
So set hasNull to false - return checkInBloomFilter(bloomFilter, predObj, false); + return checkInBloomFilter(bloomFilter, predObj, false, useUTCTimestamp); case EQUALS: - return checkInBloomFilter(bloomFilter, predObj, hasNull); + return checkInBloomFilter(bloomFilter, predObj, hasNull, useUTCTimestamp); case IN: for (Object arg : predicate.getLiteralList()) { // if atleast one value in IN list exist in bloom filter, qualify the row group/stripe Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg); - TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull); + TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull, useUTCTimestamp); if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) { return result; } @@ -603,7 +685,7 @@ private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate, } } - private static TruthValue checkInBloomFilter(BloomFilter bf, Object predObj, boolean hasNull) { + private static TruthValue checkInBloomFilter(BloomFilter bf, Object predObj, boolean hasNull, boolean useUTCTimestamp) { TruthValue result = hasNull ? 
TruthValue.NO_NULL : TruthValue.NO; if (predObj instanceof Long) { @@ -621,8 +703,14 @@ private static TruthValue checkInBloomFilter(BloomFilter bf, Object predObj, boo result = TruthValue.YES_NO_NULL; } } else if (predObj instanceof Timestamp) { - if (bf.testLong(SerializationUtils.convertToUtc(TimeZone.getDefault(), ((Timestamp) predObj).getTime()))) { - result = TruthValue.YES_NO_NULL; + if (useUTCTimestamp) { + if (bf.testLong(((Timestamp) predObj).getTime())) { + result = TruthValue.YES_NO_NULL; + } + } else { + if (bf.testLong(SerializationUtils.convertToUtc(TimeZone.getDefault(), ((Timestamp) predObj).getTime()))) { + result = TruthValue.YES_NO_NULL; + } } } else if (predObj instanceof Date) { if (bf.testLong(DateWritable.dateToDays((Date) predObj))) { @@ -774,11 +862,13 @@ public static class SargApplier { private final boolean[] sargColumns; private SchemaEvolution evolution; private final long[] exceptionCount; + private final boolean useUTCTimestamp; public SargApplier(SearchArgument sarg, long rowIndexStride, SchemaEvolution evolution, - OrcFile.WriterVersion writerVersion) { + OrcFile.WriterVersion writerVersion, + boolean useUTCTimestamp) { this.writerVersion = writerVersion; this.sarg = sarg; sargLeaves = sarg.getLeaves(); @@ -797,6 +887,7 @@ public SargApplier(SearchArgument sarg, } this.evolution = evolution; exceptionCount = new long[sargLeaves.size()]; + this.useUTCTimestamp = useUTCTimestamp; } /** @@ -848,7 +939,8 @@ public boolean[] pickRowGroups(StripeInformation stripe, leafValues[pred] = evaluatePredicateProto(stats, predicate, bfk, encodings.get(columnIx), bf, writerVersion, evolution.getFileSchema(). 
- findSubtype(columnIx).getCategory()); + findSubtype(columnIx).getCategory(), + useUTCTimestamp); } catch (Exception e) { exceptionCount[pred] += 1; if (e instanceof SargCastException) { diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java index ed21318a72..b0fd5a7d98 100644 --- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -19,7 +19,6 @@ import java.io.EOFException; import java.io.IOException; -import java.sql.Timestamp; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -57,7 +56,7 @@ public interface Context { boolean isSkipCorrupt(); - boolean isUseUTCTimestamp(); + boolean getUseUTCTimestamp(); String getWriterTimezone(); @@ -107,7 +106,7 @@ public boolean isSkipCorrupt() { } @Override - public boolean isUseUTCTimestamp() { + public boolean getUseUTCTimestamp() { return useUTCTimestamp; } @@ -913,7 +912,7 @@ protected TimestampTreeReader(int columnId, InStream presentStream, InStream dat this.threadLocalDateFormat = new ThreadLocal<>(); this.threadLocalDateFormat.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); this.baseTimestampMap = new HashMap<>(); - if (context.isUseUTCTimestamp()) { + if (context.getUseUTCTimestamp()) { this.readerTimeZone = TimeZone.getTimeZone("UTC"); } else { this.readerTimeZone = TimeZone.getDefault(); @@ -1007,9 +1006,7 @@ public void nextVector(ColumnVector previousVector, TimestampColumnVector result = (TimestampColumnVector) previousVector; super.nextVector(previousVector, isNull, batchSize); - if (context.isUseUTCTimestamp()) { - result.setIsUTC(true); - } + result.setIsUTC(context.getUseUTCTimestamp()); for (int i = 0; i < batchSize; i++) { if (result.noNulls || !result.isNull[i]) { diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java 
index fdb0467cf5..ed3af1ff24 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -135,7 +135,7 @@ public Writer getWriter() { callbackContext = null; } this.writeTimeZone = hasTimestamp(schema); - this.useUTCTimeZone = opts.isUseUTCTimestamp(); + this.useUTCTimeZone = opts.getUseUTCTimestamp(); this.adjustedStripeSize = opts.getStripeSize(); this.version = opts.getVersion(); this.encodingStrategy = opts.getEncodingStrategy(); @@ -407,7 +407,7 @@ public void writeBloomFilter(StreamName name, getCustomizedCodec(name.getKind())); } - public boolean isUseUTCTimestamp() { + public boolean getUseUTCTimestamp() { return useUTCTimeZone; } } @@ -438,7 +438,7 @@ private void flushStripe() throws IOException { OrcProto.StripeFooter.newBuilder(); if (writeTimeZone) { if (useUTCTimeZone) { - builder.setWriterTimezone(TimeZone.getTimeZone("UTC").getID()); + builder.setWriterTimezone("UTC"); } else { builder.setWriterTimezone(TimeZone.getDefault().getID()); } diff --git a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java index b5a28171c3..d8714e7c32 100644 --- a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java +++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java @@ -41,8 +41,8 @@ public class TimestampTreeWriter extends TreeWriterBase { private final IntegerWriter nanos; private final boolean isDirectV2; private final TimeZone localTimezone; - private final DateFormat localDateFormat; private final long baseEpochSecsLocalTz; + private final long baseEpochSecsUTC; public TimestampTreeWriter(int columnId, TypeDescription schema, @@ -57,15 +57,19 @@ public TimestampTreeWriter(int columnId, if (rowIndexPosition != null) { recordPosition(rowIndexPosition); } - if (writer.isUseUTCTimestamp()) { - this.localTimezone = TimeZone.getTimeZone("UTC"); - } else { - 
this.localTimezone = TimeZone.getDefault(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + this.localTimezone = TimeZone.getDefault(); + dateFormat.setTimeZone(this.localTimezone); + try { + this.baseEpochSecsLocalTz = dateFormat + .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() / + TimestampTreeWriter.MILLIS_PER_SECOND; + } catch (ParseException e) { + throw new IOException("Unable to create base timestamp tree writer", e); } - this.localDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - this.localDateFormat.setTimeZone(this.localTimezone); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); try { - this.baseEpochSecsLocalTz = this.localDateFormat + this.baseEpochSecsUTC = dateFormat .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() / TimestampTreeWriter.MILLIS_PER_SECOND; } catch (ParseException e) { @@ -99,7 +103,8 @@ public void writeBatch(ColumnVector vector, int offset, if (millis < 0 && newNanos > 999_999) { millis -= MILLIS_PER_SECOND; } - long utc = SerializationUtils.convertToUtc(localTimezone, millis); + long utc = vec.isUTC() ? + millis : SerializationUtils.convertToUtc(localTimezone, millis); indexStatistics.updateTimestamp(utc); if (createBloomFilter) { if (bloomFilter != null) { @@ -109,7 +114,7 @@ public void writeBatch(ColumnVector vector, int offset, } final long nano = formatNanos(vec.nanos[0]); for (int i = 0; i < length; ++i) { - seconds.write(secs - baseEpochSecsLocalTz); + seconds.write(secs - (vec.isUTC() ? 
baseEpochSecsUTC : baseEpochSecsLocalTz)); nanos.write(nano); } } @@ -124,8 +129,14 @@ public void writeBatch(ColumnVector vector, int offset, if (millis < 0 && newNanos > 999_999) { millis -= MILLIS_PER_SECOND; } - long utc = SerializationUtils.convertToUtc(localTimezone, millis); - seconds.write(secs - baseEpochSecsLocalTz); + long utc; + if (vec.isUTC()) { + utc = millis; + seconds.write(secs - baseEpochSecsUTC); + } else { + utc = SerializationUtils.convertToUtc(localTimezone, millis); + seconds.write(secs - baseEpochSecsLocalTz); + } nanos.write(formatNanos(newNanos)); indexStatistics.updateTimestamp(utc); if (createBloomFilter) { diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java index 57c9917465..1c8ca1a5c2 100644 --- a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java +++ b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java @@ -102,5 +102,5 @@ void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom ) throws IOException; - boolean isUseUTCTimestamp(); + boolean getUseUTCTimestamp(); } diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java index 3581b2d9ff..8c7b26f868 100644 --- a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java +++ b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java @@ -134,7 +134,7 @@ public Writer getWriter() { callbackContext = null; } this.writeTimeZone = hasTimestamp(schema); - this.useUTCTimeZone = opts.isUseUTCTimestamp(); + this.useUTCTimeZone = opts.getUseUTCTimestamp(); this.adjustedStripeSize = opts.getStripeSize(); this.version = opts.getVersion(); this.encodingStrategy = opts.getEncodingStrategy(); @@ -349,7 +349,7 @@ public void writeBloomFilter(StreamName name, getCustomizedCodec(name.getKind())); } - public boolean isUseUTCTimestamp() { + public boolean 
getUseUTCTimestamp() { return useUTCTimeZone; } } @@ -380,7 +380,7 @@ private void flushStripe() throws IOException { OrcProto.StripeFooter.newBuilder(); if (writeTimeZone) { if (useUTCTimeZone) { - builder.setWriterTimezone(TimeZone.getTimeZone("UTC").getID()); + builder.setWriterTimezone("UTC"); } else { builder.setWriterTimezone(TimeZone.getDefault().getID()); } diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java index bc85724572..b34e627ae3 100644 --- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java +++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java @@ -38,6 +38,7 @@ import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -433,7 +434,8 @@ static TruthValue evaluateDouble(OrcProto.ColumnStatistics stats, static TruthValue evaluateTimestamp(OrcProto.ColumnStatistics stats, PredicateLeaf predicate, - boolean include135) { + boolean include135, + boolean useUTCTimestamp) { OrcProto.ColumnEncoding encoding = OrcProto.ColumnEncoding.newBuilder() .setKind(OrcProto.ColumnEncoding.Kind.DIRECT) @@ -441,13 +443,14 @@ static TruthValue evaluateTimestamp(OrcProto.ColumnStatistics stats, return RecordReaderImpl.evaluatePredicateProto(stats, predicate, null, encoding, null, include135 ? 
OrcFile.WriterVersion.ORC_135: OrcFile.WriterVersion.ORC_101, - TypeDescription.Category.TIMESTAMP); + TypeDescription.Category.TIMESTAMP, useUTCTimestamp); } static TruthValue evaluateTimestampBloomfilter(OrcProto.ColumnStatistics stats, PredicateLeaf predicate, BloomFilter bloom, - OrcFile.WriterVersion version) { + OrcFile.WriterVersion version, + boolean useUTCTimestamp) { OrcProto.ColumnEncoding.Builder encoding = OrcProto.ColumnEncoding.newBuilder() .setKind(OrcProto.ColumnEncoding.Kind.DIRECT); @@ -463,7 +466,7 @@ static TruthValue evaluateTimestampBloomfilter(OrcProto.ColumnStatistics stats, BloomFilterIO.serialize(builder, bloom); return RecordReaderImpl.evaluatePredicateProto(stats, predicate, kind, encoding.build(), builder.build(), version, - TypeDescription.Category.TIMESTAMP); + TypeDescription.Category.TIMESTAMP, useUTCTimestamp); } @Test @@ -749,44 +752,44 @@ public void testPredEvalWithTimestampStats() throws Exception { "x", Timestamp.valueOf("2017-01-01 00:00:00"), null); assertEquals(TruthValue.YES_NO, evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", - "2018-01-01 00:00:00"), pred, true)); + "2018-01-01 00:00:00"), pred, true, false)); pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.FLOAT, "x", 15.0, null); assertEquals(TruthValue.YES_NO_NULL, evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2018-01-01 00:00:00"), - pred, true)); + pred, true, false)); assertEquals(TruthValue.YES_NO_NULL, evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2018-01-01 00:00:00"), - pred, true)); + pred, true, false)); // pre orc-135 should always be yes_no_null. 
pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.TIMESTAMP, "x", Timestamp.valueOf("2017-01-01 00:00:00"), null); assertEquals(TruthValue.YES_NO_NULL, evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2017-01-01 00:00:00"), - pred, false)); + pred, false, false)); pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.STRING, "x", Timestamp.valueOf("2017-01-01 00:00:00").toString(), null); assertEquals(TruthValue.YES_NO, evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2018-01-01 00:00:00"), - pred, true)); + pred, true, false)); pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.DATE, "x", Date.valueOf("2016-01-01"), null); assertEquals(TruthValue.NO, evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2017-01-01 00:00:00"), - pred, true)); + pred, true, false)); assertEquals(TruthValue.YES_NO, evaluateTimestamp(createTimestampStats("2015-01-01 00:00:00", "2016-01-01 00:00:00"), - pred, true)); + pred, true, false)); pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS, PredicateLeaf.Type.DECIMAL, "x", new HiveDecimalWritable("15"), null); assertEquals(TruthValue.YES_NO_NULL, evaluateTimestamp(createTimestampStats("2015-01-01 00:00:00", "2016-01-01 00:00:00"), - pred, true)); + pred, true, false)); } @Test @@ -1068,13 +1071,50 @@ public void testTimestampStatsOldFiles() throws Exception { "x", Timestamp.valueOf("2000-01-01 00:00:00"), null); OrcProto.ColumnStatistics cs = createTimestampStats("2000-01-01 00:00:00", "2001-01-01 00:00:00"); assertEquals(TruthValue.YES_NO_NULL, - evaluateTimestampBloomfilter(cs, pred, new BloomFilterUtf8(10000, 0.01), OrcFile.WriterVersion.ORC_101)); + evaluateTimestampBloomfilter(cs, pred, new BloomFilterUtf8(10000, 0.01), OrcFile.WriterVersion.ORC_101, false)); BloomFilterUtf8 bf = new BloomFilterUtf8(10, 0.05); bf.addLong(getUtcTimestamp("2000-06-01 00:00:00")); 
assertEquals(TruthValue.NO_NULL, - evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_135)); + evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_135, false)); assertEquals(TruthValue.YES_NO_NULL, - evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_101)); + evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_101, false)); + } + + @Test + public void testTimestampUTC() throws Exception { + DateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + f.setTimeZone(TimeZone.getTimeZone("UTC")); + PredicateLeaf pred = createPredicateLeaf + (PredicateLeaf.Operator.EQUALS, PredicateLeaf.Type.TIMESTAMP, + "x", new Timestamp(f.parse("2015-01-01 00:00:00").getTime()), null); + PredicateLeaf pred2 = createPredicateLeaf + (PredicateLeaf.Operator.EQUALS, PredicateLeaf.Type.TIMESTAMP, + "x", new Timestamp(f.parse("2014-12-31 23:59:59").getTime()), null); + PredicateLeaf pred3 = createPredicateLeaf + (PredicateLeaf.Operator.EQUALS, PredicateLeaf.Type.TIMESTAMP, + "x", new Timestamp(f.parse("2016-01-01 00:00:01").getTime()), null); + OrcProto.ColumnStatistics cs = createTimestampStats("2015-01-01 00:00:00", "2016-01-01 00:00:00"); + + assertEquals(TruthValue.YES_NO_NULL, + evaluateTimestamp(cs, pred, true, true)); + assertEquals(TruthValue.NO_NULL, + evaluateTimestamp(cs, pred2, true, true)); + assertEquals(TruthValue.NO_NULL, + evaluateTimestamp(cs, pred3, true, true)); + + assertEquals(TruthValue.NO_NULL, + evaluateTimestampBloomfilter(cs, pred, new BloomFilterUtf8(10000, 0.01), OrcFile.WriterVersion.ORC_135, true)); + assertEquals(TruthValue.NO_NULL, + evaluateTimestampBloomfilter(cs, pred2, new BloomFilterUtf8(10000, 0.01), OrcFile.WriterVersion.ORC_135, true)); + + BloomFilterUtf8 bf = new BloomFilterUtf8(10, 0.05); + bf.addLong(getUtcTimestamp("2015-06-01 00:00:00")); + assertEquals(TruthValue.NO_NULL, + evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_135, true)); + + 
bf.addLong(getUtcTimestamp("2015-01-01 00:00:00")); + assertEquals(TruthValue.YES_NO_NULL, + evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_135, true)); } private static long getUtcTimestamp(String ts) { @@ -1971,7 +2011,7 @@ public void testPickRowGroups() throws Exception { .end().build(); RecordReaderImpl.SargApplier applier = new RecordReaderImpl.SargApplier(sarg, 1000, evolution, - OrcFile.WriterVersion.ORC_135); + OrcFile.WriterVersion.ORC_135, false); OrcProto.StripeInformation stripe = OrcProto.StripeInformation.newBuilder().setNumberOfRows(4000).build(); OrcProto.RowIndex[] indexes = new OrcProto.RowIndex[3]; @@ -2019,7 +2059,7 @@ public void testPickRowGroupsError() throws Exception { .end().build(); RecordReaderImpl.SargApplier applier = new RecordReaderImpl.SargApplier(sarg, 1000, evolution, - OrcFile.WriterVersion.ORC_135); + OrcFile.WriterVersion.ORC_135, false); OrcProto.StripeInformation stripe = OrcProto.StripeInformation.newBuilder().setNumberOfRows(3000).build(); OrcProto.RowIndex[] indexes = new OrcProto.RowIndex[3]; From 83993e8053990e613bb38a6679b0b31a29ff664c Mon Sep 17 00:00:00 2001 From: Jesus Camacho Rodriguez Date: Fri, 4 May 2018 20:23:12 -0700 Subject: [PATCH 3/3] fixup --- .../apache/orc/impl/writer/TimestampTreeWriter.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java index d8714e7c32..0f30d07757 100644 --- a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java +++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java @@ -40,6 +40,7 @@ public class TimestampTreeWriter extends TreeWriterBase { private final IntegerWriter seconds; private final IntegerWriter nanos; private final boolean isDirectV2; + private boolean useUTCTimestamp; private final TimeZone localTimezone; private final long 
baseEpochSecsLocalTz; private final long baseEpochSecsUTC; @@ -57,6 +58,7 @@ public TimestampTreeWriter(int columnId, if (rowIndexPosition != null) { recordPosition(rowIndexPosition); } + this.useUTCTimestamp = writer.getUseUTCTimestamp(); DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); this.localTimezone = TimeZone.getDefault(); dateFormat.setTimeZone(this.localTimezone); @@ -114,7 +116,7 @@ public void writeBatch(ColumnVector vector, int offset, } final long nano = formatNanos(vec.nanos[0]); for (int i = 0; i < length; ++i) { - seconds.write(secs - (vec.isUTC() ? baseEpochSecsUTC : baseEpochSecsLocalTz)); + seconds.write(secs - (useUTCTimestamp ? baseEpochSecsUTC : baseEpochSecsLocalTz)); nanos.write(nano); } } @@ -129,12 +131,11 @@ public void writeBatch(ColumnVector vector, int offset, if (millis < 0 && newNanos > 999_999) { millis -= MILLIS_PER_SECOND; } - long utc; - if (vec.isUTC()) { - utc = millis; + long utc = vec.isUTC() ? + millis : SerializationUtils.convertToUtc(localTimezone, millis); + if (useUTCTimestamp) { seconds.write(secs - baseEpochSecsUTC); } else { - utc = SerializationUtils.convertToUtc(localTimezone, millis); seconds.write(secs - baseEpochSecsLocalTz); } nanos.write(formatNanos(newNanos));