From 92a123f8629afc74ee06879867736f483d02a58d Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 Nov 2017 12:44:10 -0600 Subject: [PATCH 01/15] wip --- .../resources/test-data/impala_timestamp.parq | Bin 0 -> 374 bytes .../parquet/ParquetInteroperabilitySuite.scala | 9 +++++++++ 2 files changed, 9 insertions(+) create mode 100644 sql/core/src/test/resources/test-data/impala_timestamp.parq diff --git a/sql/core/src/test/resources/test-data/impala_timestamp.parq b/sql/core/src/test/resources/test-data/impala_timestamp.parq new file mode 100644 index 0000000000000000000000000000000000000000..21e5318db98c9d0e328bd7c00999a2d1d4bce86d GIT binary patch literal 374 zcmWG=3^EjD5%m!D@eyScWno}Y>0vlDDB$vr3=ARJK(^Zj-M^d+ z4EJ(W8AKUGMMNcZKw5wpsDMj_iGhKEjgg62g+Xlvql_qs2UdB$a07 zq$pUJ8z!ctrJ1FerJ9;0nVP4h8XFs!TBf8X8>N`1r5anL8X23JC8cOe%E$n{avF#O HfFTY5EiXyU literal 0 HcmV?d00001 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 9dc56292c3720..ec5cb9652e5f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -87,4 +87,13 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS Row(Seq(2, 3)))) } } + + val ImpalaFile = "test-data/impala_timestamp.parq" + test("parquet timestamp conversion") { + // make a table with one parquet file written by impala, and one parquet file written by spark + // we should only adjust the timestamps in the impala file, and only if the conf is set + val impalaFile = Thread.currentThread().getContextClassLoader.getResource(ImpalaFile).toString() + spark.read.parquet(impalaFile).show() + pending + } } From 75cb1f4c35f9b15f810ac8d86fb72799b9394c90 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 Nov 2017 16:48:18 -0600 Subject: [PATCH 02/15] test and a bunch of plumbing in place --- .../apache/spark/sql/internal/SQLConf.scala | 7 ++ .../parquet/ParquetFileFormat.scala | 13 ++++ .../parquet/ParquetReadSupport.scala | 3 +- .../parquet/ParquetRecordMaterializer.scala | 6 +- .../parquet/ParquetRowConverter.scala | 5 +- .../ParquetInteroperabilitySuite.scala | 74 +++++++++++++++++-- 6 files changed, 99 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ce9cc562b220f..7a17d1fe72d6a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -291,6 +291,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val PARQUET_SKIP_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.skipTimestampConversion") + .doc("This controls whether timestamp adjustments should be applied to INT96 data when " + + "converting to timestamps, for data written by Impala. 
This is necessary because Impala" + + "stores INT96 data with a different offset than Hive & Spark.") + .booleanConf + .createWithDefault(false) + object ParquetOutputTimestampType extends Enumeration { val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2b1064955a777..b2f704b6bf4e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -345,6 +345,8 @@ class ParquetFileFormat resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) val enableRecordFilter: Boolean = sparkSession.sessionState.conf.parquetRecordFilterEnabled + val timestampConversion = + sparkSession.sessionState.conf.getConf(SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION) // Whole stage codegen (PhysicalRDD) is able to deal with batches directly val returningBatch = supportBatch(sparkSession, resultSchema) @@ -363,6 +365,17 @@ class ParquetFileFormat fileSplit.getLocations, null) + val convertTimestamp = + if (timestampConversion) { + val footer = + ParquetFileReader.readFooter(broadcastedHadoopConf.value.value, fileSplit.getPath) + val cb = footer.getFileMetaData().getCreatedBy() + logWarning(s"${fileSplit.getPath}.cb = $cb") + cb.startsWith("parquet-mr") + } else { + false + } + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 2854cb1bc0c25..2568b22e8fc8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -95,7 +95,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo new ParquetRecordMaterializer( parquetRequestedSchema, ParquetReadSupport.expandUDT(catalystRequestedSchema), - new ParquetToSparkSchemaConverter(conf)) + new ParquetToSparkSchemaConverter(conf), + conf) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index 793755e9aaeb5..fb5c009979643 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.hadoop.conf.Configuration import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer} import org.apache.parquet.schema.MessageType @@ -33,11 +34,12 @@ import org.apache.spark.sql.types.StructType private[parquet] class ParquetRecordMaterializer( parquetSchema: MessageType, catalystSchema: StructType, - schemaConverter: ParquetToSparkSchemaConverter) + schemaConverter: ParquetToSparkSchemaConverter, + hadoopConf: Configuration) extends 
RecordMaterializer[UnsafeRow] { private val rootConverter = - new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater) + new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, hadoopConf, NoopUpdater) override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 10f6c3b4f15e3..09518a1297e6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -23,6 +23,7 @@ import java.nio.ByteOrder import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import org.apache.hadoop.conf.Configuration import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type} @@ -117,12 +118,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param parquetType Parquet schema of Parquet records * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined * types should have been expanded. + * @param hadoopConf a hadoop Configuration for passing any extra parameters for parquet conversion * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class ParquetRowConverter( schemaConverter: ParquetToSparkSchemaConverter, parquetType: GroupType, catalystType: StructType, + hadoopConf: Configuration, updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -309,7 +312,7 @@ private[parquet] class ParquetRowConverter( case t: StructType => new ParquetRowConverter( - schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater { + schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater { override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index ec5cb9652e5f5..a2ea6d7c5cff0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -18,8 +18,13 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File +import java.util.TimeZone + +import org.apache.commons.io.FileUtils import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSQLContext { @@ -90,10 +95,69 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS val ImpalaFile = "test-data/impala_timestamp.parq" test("parquet timestamp conversion") { - // make a table with one parquet file written by impala, and one parquet file written by spark - // we should only 
adjust the timestamps in the impala file, and only if the conf is set - val impalaFile = Thread.currentThread().getContextClassLoader.getResource(ImpalaFile).toString() - spark.read.parquet(impalaFile).show() - pending + // Make a table with one parquet file written by impala, and one parquet file written by spark. + // We should only adjust the timestamps in the impala file, and only if the conf is set + + // here's the timestamps in the impala file, as they were saved by impala + val impalaFileData = + Seq( + "2001-01-01 01:01:01", + "2002-02-02 02:02:02", + "2003-03-03 03:03:03" + ).map { s => java.sql.Timestamp.valueOf(s) } + val impalaFile = Thread.currentThread().getContextClassLoader.getResource(ImpalaFile) + .toURI.getPath + withTempPath { tableDir => + val ts = Seq( + "2004-04-04 04:04:04", + "2005-05-05 05:05:05", + "2006-06-06 06:06:06" + ).map { s => java.sql.Timestamp.valueOf(s) } + val s = spark + import s.implicits._ + // match the column names of the file from impala + val df = spark.createDataset(ts).toDF().repartition(1).withColumnRenamed("value", "ts") + val schema = df.schema + df.write.parquet(tableDir.getAbsolutePath) + FileUtils.copyFile(new File(impalaFile), new File(tableDir, "part-00001.parq")) + +// Seq(false).foreach { applyConversion => + Seq(false, true).foreach { applyConversion => + Seq(false, true).foreach { vectorized => + withSQLConf( + (SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION.key, applyConversion.toString()), + (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString()) + ) { + val read = spark.read.parquet(tableDir.getAbsolutePath).collect() + assert(read.size === 6) + // if we apply the conversion, we'll get the "right" values, as saved by impala in the + // original file. Otherwise, they're off by the local timezone offset, set to + // America/Los_Angeles in tests + val impalaExpectations = if (applyConversion) { + impalaFileData + } else { + impalaFileData.map { ts => + DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz( + DateTimeUtils.fromJavaTimestamp(ts), + TimeZone.getTimeZone("UTC"), + TimeZone.getDefault())) + } + } + val fullExpectations = (ts ++ impalaExpectations).map { + _.toString() + }.sorted.toArray + val actual = read.map { + _.getTimestamp(0).toString() + }.sorted + withClue(s"applyConversion = $applyConversion; vectorized = $vectorized") { + assert(fullExpectations === actual) + + // TODO run query with a filter, make sure pushdown is OK + // https://github.com/apache/spark/pull/16781/files#diff-1e55698cc579cbae676f827a89c2dc2eR449 + } + } + } + } + } } } From 5cad681beb892cec1346beba9b4a3230c3664e16 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 16 Nov 2017 12:38:10 -0600 Subject: [PATCH 03/15] works, needs cleanup --- .../parquet/VectorizedColumnReader.java | 13 +++++++++++-- .../VectorizedParquetRecordReader.java | 15 +++++++++++++-- .../parquet/ParquetFileFormat.scala | 14 +++++++++----- .../parquet/ParquetReadSupport.scala | 1 + .../parquet/ParquetRowConverter.scala | 19 ++++++++++++++++++- .../ParquetInteroperabilitySuite.scala | 1 - 6 files changed, 52 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 71ca8b1b96a98..751c2341ba000 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet; import java.io.IOException; +import java.util.TimeZone; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; @@ -93,13 +94,18 @@ public class VectorizedColumnReader { private final PageReader pageReader; private final ColumnDescriptor descriptor; private final OriginalType originalType; + // The timezone conversion to apply to int96 timestamps. Null if no conversion. + private final TimeZone convertTz; + private final static TimeZone UTC = TimeZone.getTimeZone("UTC"); public VectorizedColumnReader( ColumnDescriptor descriptor, OriginalType originalType, - PageReader pageReader) throws IOException { + PageReader pageReader, + TimeZone convertTz) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; + this.convertTz = convertTz; this.originalType = originalType; this.maxDefLevel = descriptor.getMaxDefinitionLevel(); @@ -298,7 +304,10 @@ private void decodeDictionaryIds( // TODO: Convert dictionary of Binaries to dictionary of Longs if (!column.isNullAt(i)) { Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v); + long adjTime = + convertTz == null ? rawTime : DateTimeUtils.convertTz(rawTime, convertTz, UTC); + column.putLong(i, adjTime); } } } else { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 669d71e60779d..8d2932f251632 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.TimeZone; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ColumnDescriptor; @@ -34,6 +36,7 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -109,6 +112,8 @@ public VectorizedParquetRecordReader(boolean useOffHeap) { MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP; } + private TimeZone convertTz = null; + /** * Implementation of RecordReader API. 
*/ @@ -116,6 +121,12 @@ public VectorizedParquetRecordReader(boolean useOffHeap) { public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException, UnsupportedOperationException { super.initialize(inputSplit, taskAttemptContext); + Configuration hadoopConf = taskAttemptContext.getConfiguration(); + boolean doTsConversion = + Boolean.valueOf(hadoopConf.get(SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION().key())); + if (doTsConversion) { + convertTz = TimeZone.getTimeZone(hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key())); + } initializeInternal(); } @@ -291,8 +302,8 @@ private void checkEndOfRowGroup() throws IOException { columnReaders = new VectorizedColumnReader[columns.size()]; for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; - columnReaders[i] = new VectorizedColumnReader( - columns.get(i), types.get(i).getOriginalType(), pages.getPageReader(columns.get(i))); + columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(), + pages.getPageReader(columns.get(i)), convertTz); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index b2f704b6bf4e6..7159f5383fb1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -307,6 +307,10 @@ class ParquetFileFormat hadoopConf.set( ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json) + hadoopConf.set( + SQLConf.SESSION_LOCAL_TIMEZONE.key, + sparkSession.sessionState.conf.sessionLocalTimeZone + ) ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) @@ -365,20 +369,20 @@ class ParquetFileFormat fileSplit.getLocations, null) + val hadoopConf = broadcastedHadoopConf.value.value val convertTimestamp = if (timestampConversion) { - val footer = - ParquetFileReader.readFooter(broadcastedHadoopConf.value.value, fileSplit.getPath) + val footer = ParquetFileReader.readFooter(hadoopConf, fileSplit.getPath, SKIP_ROW_GROUPS) val cb = footer.getFileMetaData().getCreatedBy() - logWarning(s"${fileSplit.getPath}.cb = $cb") - cb.startsWith("parquet-mr") + !cb.startsWith("parquet-mr") } else { false } + hadoopConf.set(SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION.key, convertTimestamp.toString) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = - new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) + new TaskAttemptContextImpl(hadoopConf, attemptId) // Try to push down filters when filter push-down is enabled. // Notice: This push-down is RowGroups level, not individual records. 
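The hunk above is the heart of the per-file decision: whether INT96 values need the timezone shift is determined by the footer's createdBy string (Impala does not write through parquet-mr, while Spark and Hive do). A condensed sketch of that check, pulled out of the reader plumbing for illustration -- the helper name and arguments are placeholders, not part of the patch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
    import org.apache.parquet.hadoop.ParquetFileReader

    // Illustrative helper (not from the patch): true if the file was written through
    // parquet-mr, i.e. by Spark or Hive, in which case no INT96 shift is applied even
    // when the conf is enabled.
    def isCreatedByParquetMr(conf: Configuration, file: Path): Boolean = {
      // SKIP_ROW_GROUPS reads only the file-level metadata, which is all this check needs.
      val footer = ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS)
      footer.getFileMetaData.getCreatedBy.startsWith("parquet-mr")
    }

The patch applies the shift only when the session conf is enabled and this check comes back false.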
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 2568b22e8fc8c..00f44e74e8c65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 09518a1297e6f..aa2c389307e12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder +import java.util.TimeZone import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -35,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -154,6 +156,15 @@ private[parquet] class ParquetRowConverter( |${catalystType.prettyJson} """.stripMargin) + /** + * If true, we adjust parsed timestamps based on the delta between UTC and the current TZ. This + * is *not* the raw value in the sql conf -- we've already looked at the file metadata to decide + * if conversion applies to this file by this point. + */ + val convertTs = hadoopConf.get(SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION.key).toBoolean + val sessionTz = TimeZone.getTimeZone(hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)) + val UTC = TimeZone.getTimeZone("UTC") + /** * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates * converted filed values to the `ordinal`-th cell in `currentRow`. 
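The next hunk is where the row-at-a-time path actually decodes INT96: the 12-byte value is 8 bytes of little-endian nanoseconds-within-day followed by a 4-byte Julian day, turned into microseconds since the epoch and then optionally shifted for Impala-written files. A self-contained sketch of that decoding, using the same Spark and Parquet classes as the patch (the helper itself is illustrative, not patch code):

    import java.nio.ByteOrder
    import java.util.TimeZone
    import org.apache.parquet.io.api.Binary
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Illustrative helper: decode one INT96 value to Spark's microsecond timestamp,
    // shifting from `sessionTz` to UTC when `convert` is true (the Impala case).
    def int96ToMicros(value: Binary, convert: Boolean, sessionTz: TimeZone): Long = {
      val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
      val timeOfDayNanos = buf.getLong   // first 8 bytes: nanoseconds within the day
      val julianDay = buf.getInt         // last 4 bytes: Julian day number
      val rawMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
      if (convert) {
        DateTimeUtils.convertTz(rawMicros, sessionTz, TimeZone.getTimeZone("UTC"))
      } else {
        rawMicros
      }
    }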
@@ -282,7 +293,13 @@ private[parquet] class ParquetRowConverter( val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) val timeOfDayNanos = buf.getLong val julianDay = buf.getInt - updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)) + val rawTime = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) + val adjTime = if (convertTs) { + DateTimeUtils.convertTz(rawTime, sessionTz, UTC) + } else { + rawTime + } + updater.setLong(adjTime) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index a2ea6d7c5cff0..4b824c8d07006 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -121,7 +121,6 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS df.write.parquet(tableDir.getAbsolutePath) FileUtils.copyFile(new File(impalaFile), new File(tableDir, "part-00001.parq")) -// Seq(false).foreach { applyConversion => Seq(false, true).foreach { applyConversion => Seq(false, true).foreach { vectorized => withSQLConf( From 697a187621ef50969573ce7fc90e03e3f22e7979 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 16 Nov 2017 13:35:24 -0600 Subject: [PATCH 04/15] cleanup, test for predicate pushdown --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../VectorizedParquetRecordReader.java | 4 +- .../parquet/ParquetFileFormat.scala | 27 ++++++---- .../parquet/ParquetRowConverter.scala | 2 +- .../ParquetInteroperabilitySuite.scala | 51 ++++++++++++++++++- 5 files changed, 72 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7a17d1fe72d6a..01c557497c16a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -291,7 +291,7 @@ object SQLConf { .booleanConf .createWithDefault(true) - val PARQUET_SKIP_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.skipTimestampConversion") + val PARQUET_INT96_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.int96TimestampConversion") .doc("This controls whether timestamp adjustments should be applied to INT96 data when " + "converting to timestamps, for data written by Impala. 
This is necessary because Impala" + "stores INT96 data with a different offset than Hive & Spark.") diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 8d2932f251632..1e80397fe15bc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -122,8 +122,10 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont throws IOException, InterruptedException, UnsupportedOperationException { super.initialize(inputSplit, taskAttemptContext); Configuration hadoopConf = taskAttemptContext.getConfiguration(); + // this is *not* the raw value in the sql conf -- we've already looked at the createdBy field + // for this particular file and adjusted that value accordingly by this point. boolean doTsConversion = - Boolean.valueOf(hadoopConf.get(SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION().key())); + Boolean.valueOf(hadoopConf.get(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION().key())); if (doTsConversion) { convertTz = TimeZone.getTimeZone(hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key())); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 7159f5383fb1e..2db7a48480618 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -350,7 +350,7 @@ class ParquetFileFormat val enableRecordFilter: Boolean = sparkSession.sessionState.conf.parquetRecordFilterEnabled val timestampConversion = - sparkSession.sessionState.conf.getConf(SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION) + sparkSession.sessionState.conf.getConf(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION) // Whole stage codegen (PhysicalRDD) is able to deal with batches directly val returningBatch = supportBatch(sparkSession, resultSchema) @@ -369,20 +369,29 @@ class ParquetFileFormat fileSplit.getLocations, null) - val hadoopConf = broadcastedHadoopConf.value.value - val convertTimestamp = + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. Sadly, this also means we have to clone the hadoopConf, as + // different threads may want different values. 
We have to use the hadoopConf as its + // our only way to pass value to ParquetReadSupport.init + val localHadoopConf = if (timestampConversion) { - val footer = ParquetFileReader.readFooter(hadoopConf, fileSplit.getPath, SKIP_ROW_GROUPS) - val cb = footer.getFileMetaData().getCreatedBy() - !cb.startsWith("parquet-mr") + val confCopy = new Configuration(broadcastedHadoopConf.value.value) + val footer = + ParquetFileReader.readFooter(confCopy, fileSplit.getPath, SKIP_ROW_GROUPS) + val doConversion = !footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr") + confCopy.set(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, doConversion.toString()) + confCopy } else { - false + // Doesn't matter what the parquet metadata says, no thread is going to do any conversion, + // so we don't need to make a copy of the conf here. + broadcastedHadoopConf.value.value } - hadoopConf.set(SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION.key, convertTimestamp.toString) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = - new TaskAttemptContextImpl(hadoopConf, attemptId) + new TaskAttemptContextImpl(localHadoopConf, attemptId) // Try to push down filters when filter push-down is enabled. // Notice: This push-down is RowGroups level, not individual records. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index aa2c389307e12..4f3eb0a1bb72b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -161,7 +161,7 @@ private[parquet] class ParquetRowConverter( * is *not* the raw value in the sql conf -- we've already looked at the file metadata to decide * if conversion applies to this file by this point. 
*/ - val convertTs = hadoopConf.get(SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION.key).toBoolean + val convertTs = hadoopConf.get(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key).toBoolean val sessionTz = TimeZone.getTimeZone(hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)) val UTC = TimeZone.getTimeZone("UTC") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 4b824c8d07006..4a3e2e7a2a826 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -21,6 +21,10 @@ import java.io.File import java.util.TimeZone import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -124,7 +128,7 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS Seq(false, true).foreach { applyConversion => Seq(false, true).foreach { vectorized => withSQLConf( - (SQLConf.PARQUET_SKIP_TIMESTAMP_CONVERSION.key, applyConversion.toString()), + (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, applyConversion.toString()), (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString()) ) { val read = spark.read.parquet(tableDir.getAbsolutePath).collect() @@ -152,7 +156,50 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS assert(fullExpectations === actual) // TODO run query with a filter, make sure pushdown is OK - // https://github.com/apache/spark/pull/16781/files#diff-1e55698cc579cbae676f827a89c2dc2eR449 + // Now test that the behavior is still correct even with a filter which could get + // pushed down into parquet. We don't need extra handling for pushed down + // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet + // does not read statistics from int96 fields, as they are unsigned. See + // scalastyle:off line.size.limit + // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419 + // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348 + // scalastyle:on line.size.limit + // + // Just to be defensive in case anything ever changes in parquet, this test checks + // the assumption on column stats, and also the end-to-end behavior. + + val hadoopConf = sparkContext.hadoopConfiguration + val fs = FileSystem.get(hadoopConf) + val parts = fs.listStatus(new Path(tableDir.getAbsolutePath), new PathFilter { + override def accept(path: Path): Boolean = !path.getName.startsWith("_") + }) + // grab the meta data from the parquet file. The next section of asserts just make + // sure the test is configured correctly. 
+ assert(parts.size == 2) + parts.map { part => + val oneFooter = + ParquetFileReader.readFooter(hadoopConf, part.getPath, NO_FILTER) + assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 1) + assert(oneFooter.getFileMetaData.getSchema.getColumns.get(0).getType() === + PrimitiveTypeName.INT96) + val oneBlockMeta = oneFooter.getBlocks().get(0) + val oneBlockColumnMeta = oneBlockMeta.getColumns().get(0) + val columnStats = oneBlockColumnMeta.getStatistics + // This is the important assert. Column stats are written, but they are ignored + // when the data is read back as mentioned above, b/c int96 is unsigned. This + // assert makes sure this holds even if we change parquet versions (if eg. there + // were ever statistics even on unsigned columns). + assert(columnStats.isEmpty) + } + + // These queries should return the entire dataset with the conversion applied, + // but if the predicates were applied to the raw values in parquet, they would + // incorrectly filter data out. + val query = spark.read.parquet(tableDir.getAbsolutePath) + .where(s"ts > '2001-01-01 01:00:00'") + val countWithFilter = query.count() + val exp = if (applyConversion) 6 else 5 + assert(countWithFilter === exp, query) } } } From e2df4592d221db380b6e12386f2d99fede37d748 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 16 Nov 2017 15:59:49 -0600 Subject: [PATCH 05/15] fix when conf is unset --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 4 +++- .../datasources/parquet/ParquetInteroperabilitySuite.scala | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2db7a48480618..e32f32fa8f7ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -386,7 +386,9 @@ class ParquetFileFormat } else { // Doesn't matter what the parquet metadata says, no thread is going to do any conversion, // so we don't need to make a copy of the conf here. - broadcastedHadoopConf.value.value + val conf = broadcastedHadoopConf.value.value + conf.set(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, "false") + conf } val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 4a3e2e7a2a826..c1f0a46e9d9ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -155,7 +155,6 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS withClue(s"applyConversion = $applyConversion; vectorized = $vectorized") { assert(fullExpectations === actual) - // TODO run query with a filter, make sure pushdown is OK // Now test that the behavior is still correct even with a filter which could get // pushed down into parquet. 
We don't need extra handling for pushed down // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet From 08828e3ac35c8d04116f077d1edb585e67783b72 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 17 Nov 2017 00:27:22 -0600 Subject: [PATCH 06/15] review feedback --- .../sql/catalyst/util/DateTimeUtils.scala | 1 + .../apache/spark/sql/internal/SQLConf.scala | 6 ++- .../parquet/VectorizedColumnReader.java | 10 +++-- .../VectorizedParquetRecordReader.java | 25 ++++++----- .../parquet/ParquetFileFormat.scala | 41 ++++++++----------- .../parquet/ParquetReadSupport.scala | 14 +++++-- .../parquet/ParquetRecordMaterializer.scala | 7 ++-- .../parquet/ParquetRowConverter.scala | 23 +++-------- .../ParquetInteroperabilitySuite.scala | 40 ++++++++---------- 9 files changed, 78 insertions(+), 89 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 746c3e8950f7b..b1ed25645b36c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -61,6 +61,7 @@ object DateTimeUtils { final val YearZero = -17999 final val toYearZero = to2001 + 7304850 final val TimeZoneGMT = TimeZone.getTimeZone("GMT") + final val TimeZoneUTC = TimeZone.getTimeZone("UTC") final val MonthOf31Days = Set(1, 3, 5, 7, 8, 10, 12) val TIMEZONE_OPTION = "timeZone" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 01c557497c16a..1121444cc938a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -293,8 +293,8 @@ object SQLConf { val PARQUET_INT96_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.int96TimestampConversion") .doc("This controls whether timestamp adjustments should be applied to INT96 data when " + - "converting to timestamps, for data written by Impala. This is necessary because Impala" + - "stores INT96 data with a different offset than Hive & Spark.") + "converting to timestamps, for data written by Impala. This is necessary because Impala " + + "stores INT96 data with a different timezone offset than Hive & Spark.") .booleanConf .createWithDefault(false) @@ -1213,6 +1213,8 @@ class SQLConf extends Serializable with Logging { def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP) + def isParquetINT96TimestampConversion: Boolean = getConf(PARQUET_INT96_TIMESTAMP_CONVERSION) + def isParquetINT64AsTimestampMillis: Boolean = getConf(PARQUET_INT64_AS_TIMESTAMP_MILLIS) def parquetOutputTimestampType: ParquetOutputTimestampType.Value = { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 751c2341ba000..583fd15d1538f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -96,7 +96,7 @@ public class VectorizedColumnReader { private final OriginalType originalType; // The timezone conversion to apply to int96 timestamps. Null if no conversion. 
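  // When non-null it holds the session-local time zone, and raw INT96 values are shifted
  // from it to UTC via DateTimeUtils.convertTz (see the hunks below).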
private final TimeZone convertTz; - private final static TimeZone UTC = TimeZone.getTimeZone("UTC"); + private final static TimeZone UTC = DateTimeUtils.TimeZoneUTC(); public VectorizedColumnReader( ColumnDescriptor descriptor, @@ -439,9 +439,11 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) { } else if (column.dataType() == DataTypes.TimestampType) { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { - column.putLong(rowId + i, - // Read 12 bytes for INT96 - ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12))); + // Read 12 bytes for INT96 + long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); + long adjTime = + convertTz == null ? rawTime : DateTimeUtils.convertTz(rawTime, convertTz, UTC); + column.putLong(rowId + i, adjTime); } else { column.putNull(rowId + i); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 1e80397fe15bc..fc7ccacc7ee4d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.TimeZone; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ColumnDescriptor; @@ -36,7 +35,6 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; -import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -80,6 +78,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private boolean[] missingColumns; + private TimeZone convertTz = null; + /** * columnBatch object that is used for batch decoding. This is created on first use and triggers * batched decoding. It is not valid to interleave calls to the batched interface with the row @@ -108,11 +108,18 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private final MemoryMode MEMORY_MODE; - public VectorizedParquetRecordReader(boolean useOffHeap) { + public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap) { + this.convertTz = convertTz; MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP; } - private TimeZone convertTz = null; + public VectorizedParquetRecordReader(boolean useOffHeap) { + this(null, useOffHeap); + } + + VectorizedParquetRecordReader() { + this(null, false); + } /** * Implementation of RecordReader API. @@ -121,14 +128,6 @@ public VectorizedParquetRecordReader(boolean useOffHeap) { public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException, UnsupportedOperationException { super.initialize(inputSplit, taskAttemptContext); - Configuration hadoopConf = taskAttemptContext.getConfiguration(); - // this is *not* the raw value in the sql conf -- we've already looked at the createdBy field - // for this particular file and adjusted that value accordingly by this point. 
- boolean doTsConversion = - Boolean.valueOf(hadoopConf.get(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION().key())); - if (doTsConversion) { - convertTz = TimeZone.getTimeZone(hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key())); - } initializeInternal(); } @@ -305,7 +304,7 @@ private void checkEndOfRowGroup() throws IOException { for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(), - pages.getPageReader(columns.get(i)), convertTz); + pages.getPageReader(columns.get(i)), convertTz); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index e32f32fa8f7ea..2be81da0b98cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -45,6 +45,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector} import org.apache.spark.sql.internal.SQLConf @@ -349,8 +350,8 @@ class ParquetFileFormat resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) val enableRecordFilter: Boolean = sparkSession.sessionState.conf.parquetRecordFilterEnabled - val timestampConversion = - sparkSession.sessionState.conf.getConf(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION) + val timestampConversion: Boolean = + sparkSession.sessionState.conf.isParquetINT96TimestampConversion // Whole stage codegen (PhysicalRDD) is able to deal with batches directly val returningBatch = supportBatch(sparkSession, resultSchema) @@ -369,41 +370,35 @@ class ParquetFileFormat fileSplit.getLocations, null) + val sharedConf = broadcastedHadoopConf.value.value // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' // *only* if the file was created by something other than "parquet-mr", so check the actual // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. Sadly, this also means we have to clone the hadoopConf, as - // different threads may want different values. We have to use the hadoopConf as its - // our only way to pass value to ParquetReadSupport.init - val localHadoopConf = - if (timestampConversion) { - val confCopy = new Configuration(broadcastedHadoopConf.value.value) - val footer = - ParquetFileReader.readFooter(confCopy, fileSplit.getPath, SKIP_ROW_GROUPS) - val doConversion = !footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr") - confCopy.set(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, doConversion.toString()) - confCopy + // have different writers. 
+ def isCreatedByParquetMr(): Boolean = { + val footer = ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS) + footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr") + } + val convertTz = + if (timestampConversion && !isCreatedByParquetMr()) { + Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) } else { - // Doesn't matter what the parquet metadata says, no thread is going to do any conversion, - // so we don't need to make a copy of the conf here. - val conf = broadcastedHadoopConf.value.value - conf.set(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, "false") - conf + None } val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = - new TaskAttemptContextImpl(localHadoopConf, attemptId) + new TaskAttemptContextImpl(sharedConf, attemptId) // Try to push down filters when filter push-down is enabled. // Notice: This push-down is RowGroups level, not individual records. if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + ParquetInputFormat.setFilterPredicate(sharedConf, pushed.get) } val taskContext = Option(TaskContext.get()) val parquetReader = if (enableVectorizedReader) { val vectorizedReader = - new VectorizedParquetRecordReader(enableOffHeapColumnVector && taskContext.isDefined) + new VectorizedParquetRecordReader(convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined) vectorizedReader.initialize(split, hadoopAttemptContext) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) @@ -416,9 +411,9 @@ class ParquetFileFormat // ParquetRecordReader returns UnsafeRow val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[UnsafeRow](new ParquetReadSupport, parquetFilter) + new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter) } else { - new ParquetRecordReader[UnsafeRow](new ParquetReadSupport) + new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz)) } reader.initialize(split, hadoopAttemptContext) reader diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 00f44e74e8c65..bb5039efe713c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.util.{Map => JMap} +import java.util.{Map => JMap, TimeZone} import scala.collection.JavaConverters._ @@ -49,9 +49,17 @@ import org.apache.spark.sql.types._ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. */ -private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Logging { +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) + extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ + def this() { + // We need a zero-arg constructor for SpecificParquetRecordReaderBase. 
But that is only + // used in the vectorized reader, where we get the convertTz value directly, and the value here + // is ignored. + this(None) + } + /** * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record * readers. Responsible for figuring out Parquet requested schema used for column pruning. @@ -97,7 +105,7 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo parquetRequestedSchema, ParquetReadSupport.expandUDT(catalystRequestedSchema), new ParquetToSparkSchemaConverter(conf), - conf) + convertTz) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index fb5c009979643..b2459dd0e8bba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet -import org.apache.hadoop.conf.Configuration +import java.util.TimeZone + import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer} import org.apache.parquet.schema.MessageType @@ -35,11 +36,11 @@ private[parquet] class ParquetRecordMaterializer( parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetToSparkSchemaConverter, - hadoopConf: Configuration) + convertTz: Option[TimeZone]) extends RecordMaterializer[UnsafeRow] { private val rootConverter = - new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, hadoopConf, NoopUpdater) + new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater) override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 4f3eb0a1bb72b..e614052c93618 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -24,7 +24,6 @@ import java.util.TimeZone import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.hadoop.conf.Configuration import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type} @@ -36,7 +35,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -120,14 +118,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param parquetType Parquet schema of Parquet records * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined * types should have been expanded. 
- * @param hadoopConf a hadoop Configuration for passing any extra parameters for parquet conversion + * @param convertTz the optional time zone to convert to for int96 data * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class ParquetRowConverter( schemaConverter: ParquetToSparkSchemaConverter, parquetType: GroupType, catalystType: StructType, - hadoopConf: Configuration, + convertTz: Option[TimeZone], updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -156,14 +154,7 @@ private[parquet] class ParquetRowConverter( |${catalystType.prettyJson} """.stripMargin) - /** - * If true, we adjust parsed timestamps based on the delta between UTC and the current TZ. This - * is *not* the raw value in the sql conf -- we've already looked at the file metadata to decide - * if conversion applies to this file by this point. - */ - val convertTs = hadoopConf.get(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key).toBoolean - val sessionTz = TimeZone.getTimeZone(hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)) - val UTC = TimeZone.getTimeZone("UTC") + val UTC = DateTimeUtils.TimeZoneUTC /** * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates @@ -294,11 +285,7 @@ private[parquet] class ParquetRowConverter( val timeOfDayNanos = buf.getLong val julianDay = buf.getInt val rawTime = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) - val adjTime = if (convertTs) { - DateTimeUtils.convertTz(rawTime, sessionTz, UTC) - } else { - rawTime - } + val adjTime = convertTz.map(DateTimeUtils.convertTz(rawTime, _, UTC)).getOrElse(rawTime) updater.setLong(adjTime) } } @@ -329,7 +316,7 @@ private[parquet] class ParquetRowConverter( case t: StructType => new ParquetRowConverter( - schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater { + schemaConverter, parquetType.asGroupType(), t, convertTz, new ParentContainerUpdater { override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index c1f0a46e9d9ec..cf58db3b4b563 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -97,19 +97,19 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS } } - val ImpalaFile = "test-data/impala_timestamp.parq" test("parquet timestamp conversion") { // Make a table with one parquet file written by impala, and one parquet file written by spark. 
// We should only adjust the timestamps in the impala file, and only if the conf is set + val impalaFile = "test-data/impala_timestamp.parq" - // here's the timestamps in the impala file, as they were saved by impala + // here are the timestamps in the impala file, as they were saved by impala val impalaFileData = Seq( "2001-01-01 01:01:01", "2002-02-02 02:02:02", "2003-03-03 03:03:03" ).map { s => java.sql.Timestamp.valueOf(s) } - val impalaFile = Thread.currentThread().getContextClassLoader.getResource(ImpalaFile) + val impalaPath = Thread.currentThread().getContextClassLoader.getResource(impalaFile) .toURI.getPath withTempPath { tableDir => val ts = Seq( @@ -117,42 +117,36 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS "2005-05-05 05:05:05", "2006-06-06 06:06:06" ).map { s => java.sql.Timestamp.valueOf(s) } - val s = spark - import s.implicits._ + import testImplicits._ // match the column names of the file from impala val df = spark.createDataset(ts).toDF().repartition(1).withColumnRenamed("value", "ts") - val schema = df.schema df.write.parquet(tableDir.getAbsolutePath) - FileUtils.copyFile(new File(impalaFile), new File(tableDir, "part-00001.parq")) + FileUtils.copyFile(new File(impalaPath), new File(tableDir, "part-00001.parq")) - Seq(false, true).foreach { applyConversion => + Seq(false, true).foreach { int96TimestampConversion => Seq(false, true).foreach { vectorized => withSQLConf( - (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, applyConversion.toString()), + (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()), (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString()) ) { - val read = spark.read.parquet(tableDir.getAbsolutePath).collect() - assert(read.size === 6) + val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect() + assert(readBack.size === 6) // if we apply the conversion, we'll get the "right" values, as saved by impala in the // original file. Otherwise, they're off by the local timezone offset, set to // America/Los_Angeles in tests - val impalaExpectations = if (applyConversion) { + val impalaExpectations = if (int96TimestampConversion) { impalaFileData } else { impalaFileData.map { ts => DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz( DateTimeUtils.fromJavaTimestamp(ts), - TimeZone.getTimeZone("UTC"), - TimeZone.getDefault())) + DateTimeUtils.TimeZoneUTC, + DateTimeUtils.getTimeZone(conf.sessionLocalTimeZone))) } } - val fullExpectations = (ts ++ impalaExpectations).map { - _.toString() - }.sorted.toArray - val actual = read.map { - _.getTimestamp(0).toString() - }.sorted - withClue(s"applyConversion = $applyConversion; vectorized = $vectorized") { + val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray + val actual = readBack.map(_.getTimestamp(0).toString).sorted + withClue(s"applyConversion = $int96TimestampConversion; vectorized = $vectorized") { assert(fullExpectations === actual) // Now test that the behavior is still correct even with a filter which could get @@ -195,9 +189,9 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS // but if the predicates were applied to the raw values in parquet, they would // incorrectly filter data out. 
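The expected counts just below follow from the timezone math: with the conversion disabled, Impala's 2001-01-01 01:01:01 row reads back eight hours earlier (America/Los_Angeles is UTC-8 in January) and no longer passes the filter, hence 5 rows instead of 6. A tiny standalone illustration of that shift (not test code, values assumed from the test environment):

    import java.sql.Timestamp
    import java.util.TimeZone

    val la = TimeZone.getTimeZone("America/Los_Angeles")
    val stored = Timestamp.valueOf("2001-01-01 01:01:01")
    // getOffset is -8 hours at this instant; in the test's America/Los_Angeles default zone
    // the shifted value renders as 2000-12-31 17:01:01 and fails "ts > '2001-01-01 01:00:00'".
    val shifted = new Timestamp(stored.getTime + la.getOffset(stored.getTime))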
val query = spark.read.parquet(tableDir.getAbsolutePath) - .where(s"ts > '2001-01-01 01:00:00'") + .where("ts > '2001-01-01 01:00:00'") val countWithFilter = query.count() - val exp = if (applyConversion) 6 else 5 + val exp = if (int96TimestampConversion) 6 else 5 assert(countWithFilter === exp, query) } } From d890f967628eeff647fdc3e463785b5ea12a2896 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 17 Nov 2017 00:34:45 -0600 Subject: [PATCH 07/15] fix test --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2be81da0b98cb..c14bc45e5d32d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -393,7 +393,7 @@ class ParquetFileFormat // Try to push down filters when filter push-down is enabled. // Notice: This push-down is RowGroups level, not individual records. if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(sharedConf, pushed.get) + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } val taskContext = Option(TaskContext.get()) val parquetReader = if (enableVectorizedReader) { From 15dd36665483812803d360decd577265efdfd715 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 17 Nov 2017 00:40:59 -0600 Subject: [PATCH 08/15] cleanup imports --- .../sql/execution/datasources/parquet/ParquetReadSupport.scala | 1 - .../datasources/parquet/ParquetInteroperabilitySuite.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index bb5039efe713c..40ce5d5e0564e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -30,7 +30,6 @@ import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index cf58db3b4b563..392c0c7adf544 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File -import java.util.TimeZone import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} From 39fc765fd01453922d665b8ba220d2db57330da4 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 17 Nov 2017 08:53:19 -0600 Subject: [PATCH 09/15] feedback --- .../execution/datasources/parquet/ParquetRowConverter.scala | 2 +- 
.../datasources/parquet/ParquetInteroperabilitySuite.scala | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index e614052c93618..1199725941842 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -154,7 +154,7 @@ private[parquet] class ParquetRowConverter( |${catalystType.prettyJson} """.stripMargin) - val UTC = DateTimeUtils.TimeZoneUTC + private val UTC = DateTimeUtils.TimeZoneUTC /** * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 392c0c7adf544..cd17547fb7e4c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -107,7 +107,7 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS "2001-01-01 01:01:01", "2002-02-02 02:02:02", "2003-03-03 03:03:03" - ).map { s => java.sql.Timestamp.valueOf(s) } + ).map(java.sql.Timestamp.valueOf) val impalaPath = Thread.currentThread().getContextClassLoader.getResource(impalaFile) .toURI.getPath withTempPath { tableDir => @@ -125,6 +125,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS Seq(false, true).foreach { int96TimestampConversion => Seq(false, true).foreach { vectorized => withSQLConf( + (SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, + SQLConf.ParquetOutputTimestampType.INT96.toString), (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()), (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString()) ) { @@ -168,7 +170,7 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS // grab the meta data from the parquet file. The next section of asserts just make // sure the test is configured correctly. 
assert(parts.size == 2) - parts.map { part => + parts.foreach { part => val oneFooter = ParquetFileReader.readFooter(hadoopConf, part.getPath, NO_FILTER) assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 1) From 0e3d8cdc5de62a7fb16b98d3534c35c276573868 Mon Sep 17 00:00:00 2001 From: Henry Robinson Date: Wed, 6 Dec 2017 17:01:01 -0800 Subject: [PATCH 10/15] Review responses Change-Id: I1185116ad6c17908fba59b6e6a355a703af8ba91 --- .../parquet/VectorizedColumnReader.java | 59 +++++++++++++------ .../ParquetInteroperabilitySuite.scala | 2 +- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 583fd15d1538f..b5b691a810111 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -94,7 +94,7 @@ public class VectorizedColumnReader { private final PageReader pageReader; private final ColumnDescriptor descriptor; private final OriginalType originalType; - // The timezone conversion to apply to int96 timestamps. Null if no conversion. + // The timezone conversion to apply to int96 timestamps. Null if no conversion. private final TimeZone convertTz; private final static TimeZone UTC = DateTimeUtils.TimeZoneUTC(); @@ -228,6 +228,10 @@ void readBatch(int total, WritableColumnVector column) throws IOException { } } + private boolean shouldConvertTimestamps() { + return convertTz != null && !convertTz.equals(UTC); + } + /** * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. */ @@ -300,14 +304,22 @@ private void decodeDictionaryIds( break; case INT96: if (column.dataType() == DataTypes.TimestampType) { - for (int i = rowId; i < rowId + num; ++i) { - // TODO: Convert dictionary of Binaries to dictionary of Longs - if (!column.isNullAt(i)) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v); - long adjTime = - convertTz == null ? rawTime : DateTimeUtils.convertTz(rawTime, convertTz, UTC); - column.putLong(i, adjTime); + if (!shouldConvertTimestamps()) { + for (int i = rowId; i < rowId + num; ++i) { + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + } + } + } else { + for (int i = rowId; i < rowId + num; ++i) { + // TODO: Convert dictionary of Binaries to dictionary of Longs + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v); + long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC); + column.putLong(i, adjTime); + } } } } else { @@ -437,15 +449,26 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) { if (column.dataType() == DataTypes.StringType || column.dataType() == DataTypes.BinaryType) { defColumn.readBinarys(num, column, rowId, maxDefLevel, data); } else if (column.dataType() == DataTypes.TimestampType) { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - // Read 12 bytes for INT96 - long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); - long adjTime = - convertTz == null ? 
rawTime : DateTimeUtils.convertTz(rawTime, convertTz, UTC); - column.putLong(rowId + i, adjTime); - } else { - column.putNull(rowId + i); + if (!shouldConvertTimestamps()) { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + // Read 12 bytes for INT96 + long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); + column.putLong(rowId + i, rawTime); + } else { + column.putNull(rowId + i); + } + } + } else { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + // Read 12 bytes for INT96 + long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)); + long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC); + column.putLong(rowId + i, adjTime); + } else { + column.putNull(rowId + i); + } } } } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index cd17547fb7e4c..6e92dbbe023a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -147,7 +147,7 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS } val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray val actual = readBack.map(_.getTimestamp(0).toString).sorted - withClue(s"applyConversion = $int96TimestampConversion; vectorized = $vectorized") { + withClue(s"int96TimestampConversion = $int96TimestampConversion; vectorized = $vectorized") { assert(fullExpectations === actual) // Now test that the behavior is still correct even with a filter which could get From 0a450904d9c59a0b6daa8fd8309e33c8bcd66169 Mon Sep 17 00:00:00 2001 From: Henry Robinson Date: Wed, 6 Dec 2017 17:16:33 -0800 Subject: [PATCH 11/15] Fix line length Change-Id: I2a9f0e3efe7a4ed5e040beef1519db391ef9e603 --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 4 ++-- .../datasources/parquet/ParquetInteroperabilitySuite.scala | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index c14bc45e5d32d..0571168038f56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -397,8 +397,8 @@ class ParquetFileFormat } val taskContext = Option(TaskContext.get()) val parquetReader = if (enableVectorizedReader) { - val vectorizedReader = - new VectorizedParquetRecordReader(convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined) + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined) vectorizedReader.initialize(split, hadoopAttemptContext) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 6e92dbbe023a5..e3edafa9c84e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -147,7 +147,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS } val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray val actual = readBack.map(_.getTimestamp(0).toString).sorted - withClue(s"int96TimestampConversion = $int96TimestampConversion; vectorized = $vectorized") { + withClue( + s"int96TimestampConversion = $int96TimestampConversion; vectorized = $vectorized") { assert(fullExpectations === actual) // Now test that the behavior is still correct even with a filter which could get From 1616fd384f9258f29a382a186be2691c35b379d3 Mon Sep 17 00:00:00 2001 From: Henry Robinson Date: Wed, 6 Dec 2017 18:02:03 -0800 Subject: [PATCH 12/15] Fix accidentally reverted line Change-Id: I835ce14fd8fe08898d74144ab521a6b0b3878056 --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 0571168038f56..5e8806e1d0958 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -387,8 +387,7 @@ class ParquetFileFormat } val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = - new TaskAttemptContextImpl(sharedConf, attemptId) + val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId); // Try to push down filters when filter push-down is enabled. // Notice: This push-down is RowGroups level, not individual records. From b7289ad9b03286b01c715fa942836770a091f280 Mon Sep 17 00:00:00 2001 From: Henry Robinson Date: Wed, 6 Dec 2017 18:03:58 -0800 Subject: [PATCH 13/15] Add comment Change-Id: I714460d4c030368e419d4490b4a6d38971a1c3f1 --- .../datasources/parquet/VectorizedParquetRecordReader.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index fc7ccacc7ee4d..83f2816b81637 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -78,6 +78,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private boolean[] missingColumns; + /** + * The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to workaround + * incompatibilities between different engines when writing timestamp values. 
+ */ private TimeZone convertTz = null; /** From 11852808e9fab0efa0ca7a4548e8ea68c42b6266 Mon Sep 17 00:00:00 2001 From: Henry Robinson Date: Thu, 7 Dec 2017 18:10:59 -0800 Subject: [PATCH 14/15] Misc nits Change-Id: I4b74fa741b9ca4fd8d2b0ebcdf543b747888c174 --- .../execution/datasources/parquet/VectorizedColumnReader.java | 1 - .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index b5b691a810111..b7646969bcf3d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -313,7 +313,6 @@ private void decodeDictionaryIds( } } else { for (int i = rowId; i < rowId + num; ++i) { - // TODO: Convert dictionary of Binaries to dictionary of Longs if (!column.isNullAt(i)) { Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 5e8806e1d0958..45a7f44b480b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -387,7 +387,8 @@ class ParquetFileFormat } val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId); + val hadoopAttemptContext = + new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId); // Try to push down filters when filter push-down is enabled. // Notice: This push-down is RowGroups level, not individual records. From 1ea75c0a8f2c5fed33b2a6d6102ad1d8bdf73906 Mon Sep 17 00:00:00 2001 From: Henry Robinson Date: Fri, 8 Dec 2017 13:03:46 -0800 Subject: [PATCH 15/15] Review responses Change-Id: I3fcd623caa5ec41c1b079a4b72dd112d912155d2 --- .../datasources/parquet/VectorizedParquetRecordReader.java | 4 ---- .../execution/datasources/parquet/ParquetFileFormat.scala | 5 ++--- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 83f2816b81637..14f2a58d638f0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -121,10 +121,6 @@ public VectorizedParquetRecordReader(boolean useOffHeap) { this(null, useOffHeap); } - VectorizedParquetRecordReader() { - this(null, false); - } - /** * Implementation of RecordReader API. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 45a7f44b480b6..45bedf70f975c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -310,8 +310,7 @@ class ParquetFileFormat requiredSchema.json) hadoopConf.set( SQLConf.SESSION_LOCAL_TIMEZONE.key, - sparkSession.sessionState.conf.sessionLocalTimeZone - ) + sparkSession.sessionState.conf.sessionLocalTimeZone) ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) @@ -388,7 +387,7 @@ class ParquetFileFormat val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = - new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId); + new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) // Try to push down filters when filter push-down is enabled. // Notice: This push-down is RowGroups level, not individual records.
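A minimal usage sketch of the conversion flag exercised by the test in this series (illustrative only, not part of the patches): it assumes a directory containing one Spark-written and one Impala-written parquet file, like the tableDir the test builds; the object name and the /tmp path are hypothetical, and the conf is referenced through the same SQLConf constant the test uses. Per the test's intent, only the timestamps in the Impala-written file should be adjusted, and only when the flag is on.

// Illustrative sketch under the assumptions stated above.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object Int96ConversionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("int96-timestamp-conversion-sketch")
      .getOrCreate()

    // With the flag enabled, timestamps in the Impala-written file are adjusted
    // on read; the Spark-written file is returned as-is.
    spark.conf.set(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, "true")
    spark.read.parquet("/tmp/mixed_timestamp_table").show(truncate = false)

    // With the flag disabled, no adjustment is applied, and the Impala values
    // come back shifted by the session time zone's offset from UTC.
    spark.conf.set(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, "false")
    spark.read.parquet("/tmp/mixed_timestamp_table").show(truncate = false)

    spark.stop()
  }
}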