From 74a76c2f78ad139993f3bbe0f2ff8f1c81c3bd84 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 12:37:55 +0100 Subject: [PATCH 01/29] New and legacy time parser --- .../spark/sql/catalyst/util/TimeParser.scala | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala new file mode 100644 index 000000000000..b28f159dd552 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.time.LocalDateTime +import java.time.format.{DateTimeFormatter, ResolverStyle} +import java.time.temporal.ChronoField +import java.util.{Locale, TimeZone} + +import org.apache.commons.lang3.time.FastDateFormat + +sealed trait TimeParser { + def toMicros(s: String): Long +} + +class Iso8601TimeParser(pattern: String, timeZone: TimeZone, locale: Locale) extends TimeParser { + val format = DateTimeFormatter.ofPattern(pattern) + .withLocale(Locale.US) + .withZone(timeZone.toZoneId) + .withResolverStyle(ResolverStyle.SMART) + + def toMicros(s: String): Long = { + val localDateTime = LocalDateTime.parse(s, format) + val microOfSecond = localDateTime.getLong(ChronoField.MICRO_OF_SECOND) + val epochSecond = localDateTime.atZone(timeZone.toZoneId).toInstant.getEpochSecond + + epochSecond * DateTimeUtils.MICROS_PER_SECOND + microOfSecond + } +} + +class LegacyTimeParser(pattern: String, timeZone: TimeZone, locale: Locale) extends TimeParser { + val format = FastDateFormat.getInstance(pattern, timeZone, locale) + + def toMicros(s: String): Long = { + format.parse(s).getTime * DateTimeUtils.MICROS_PER_MILLIS + } +} + +object TimeParser { + def apply(format: String, timeZone: TimeZone, locale: Locale): TimeParser = { + if (true) { + new Iso8601TimeParser(format, timeZone, locale) + } else { + new LegacyTimeParser(format, timeZone, locale) + } + } +} From 63cf6112085029c52e4aee6f9bb2e6b84ce18a96 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Nov 2018 13:00:10 +0100 Subject: [PATCH 02/29] Add config spark.sql.legacy.timeParser.enabled --- .../apache/spark/sql/catalyst/util/TimeParser.scala | 8 +++++--- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala index b28f159dd552..06e0b0a7091b 
100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala
@@ -24,6 +24,8 @@ import java.util.{Locale, TimeZone}
 
 import org.apache.commons.lang3.time.FastDateFormat
 
+import org.apache.spark.sql.internal.SQLConf
+
 sealed trait TimeParser {
   def toMicros(s: String): Long
 }
@@ -53,10 +55,10 @@ class LegacyTimeParser(pattern: String, timeZone: TimeZone, locale: Locale) exte
 
 object TimeParser {
   def apply(format: String, timeZone: TimeZone, locale: Locale): TimeParser = {
-    if (true) {
-      new Iso8601TimeParser(format, timeZone, locale)
-    } else {
+    if (SQLConf.get.legacyTimeParserEnabled) {
       new LegacyTimeParser(format, timeZone, locale)
+    } else {
+      new Iso8601TimeParser(format, timeZone, locale)
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7bcf21595ce5..feccc408aeb0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1610,6 +1610,14 @@ object SQLConf {
         """ "... N more fields" placeholder.""")
     .intConf
     .createWithDefault(25)
+
+  val LEGACY_TIME_PARSER_ENABLED =
+    buildConf("spark.sql.legacy.timeParser.enabled")
+      .doc("When set to true, java.text.SimpleDateFormat is used for formatting and parsing " +
+        "dates and timestamps in a locale-sensitive manner. When set to false, classes from " +
+        "java.time.* packages are used for the same purpose.")
+      .booleanConf
+      .createWithDefault(false)
 }
 
 /**
@@ -2030,6 +2038,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
+  def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties.
*/
From 2a2ab83a5ecb251ce81e7f12a8c0d3067f88b2d5 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 24 Nov 2018 14:24:07 +0100
Subject: [PATCH 03/29] Fallback legacy parser

---
 .../spark/sql/catalyst/util/TimeParser.scala  | 42 +++++++++++++++----
 1 file changed, 33 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala
index 06e0b0a7091b..f6b386996590 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala
@@ -17,46 +17,70 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import java.time.LocalDateTime
+import java.time.{LocalDateTime, ZoneOffset}
 import java.time.format.{DateTimeFormatter, ResolverStyle}
-import java.time.temporal.ChronoField
+import java.time.temporal.ChronoUnit
 import java.util.{Locale, TimeZone}
 
+import scala.util.Try
+
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.sql.internal.SQLConf
 
 sealed trait TimeParser {
+  def toMillis(s: String): Long
   def toMicros(s: String): Long
+  def toDays(s: String): Int
 }
 
 class Iso8601TimeParser(pattern: String, timeZone: TimeZone, locale: Locale) extends TimeParser {
   val format = DateTimeFormatter.ofPattern(pattern)
-    .withLocale(Locale.US)
+    .withLocale(locale)
     .withZone(timeZone.toZoneId)
     .withResolverStyle(ResolverStyle.SMART)
 
+  // Seconds since 1970-01-01T00:00:00Z
+  val epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
+
+  def toMillis(s: String): Long = {
+    val localDateTime = LocalDateTime.parse(s, format)
+    ChronoUnit.MILLIS.between(epoch, localDateTime)
+  }
 
   def toMicros(s: String): Long = {
     val localDateTime = LocalDateTime.parse(s, format)
-    val microOfSecond = localDateTime.getLong(ChronoField.MICRO_OF_SECOND)
-    val epochSecond = localDateTime.atZone(timeZone.toZoneId).toInstant.getEpochSecond
+    ChronoUnit.MICROS.between(epoch, localDateTime)
+  }
 
-    epochSecond * DateTimeUtils.MICROS_PER_SECOND + microOfSecond
+  def toDays(s: String): Int = {
+    val localDateTime = LocalDateTime.parse(s, format)
+    ChronoUnit.DAYS.between(epoch, localDateTime).toInt
   }
 }
 
 class LegacyTimeParser(pattern: String, timeZone: TimeZone, locale: Locale) extends TimeParser {
   val format = FastDateFormat.getInstance(pattern, timeZone, locale)
 
-  def toMicros(s: String): Long = {
-    format.parse(s).getTime * DateTimeUtils.MICROS_PER_MILLIS
+  def toMillis(s: String): Long = format.parse(s).getTime
+
+  def toMicros(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS
+
+  def toDays(s: String): Int = DateTimeUtils.millisToDays(toMillis(s))
+}
+
+class LegacyFallbackTimeParser(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends LegacyTimeParser(pattern, timeZone, locale) {
+  override def toMillis(s: String): Long = {
+    Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
   }
 }
 
 object TimeParser {
   def apply(format: String, timeZone: TimeZone, locale: Locale): TimeParser = {
     if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyTimeParser(format, timeZone, locale)
+      new LegacyFallbackTimeParser(format, timeZone, locale)
     } else {
       new Iso8601TimeParser(format, timeZone, locale)
     }

From 667bf9f65a90ac69b8cbbad77a17e21f9dd18733 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 24 Nov 2018 16:54:19 +0100
Subject: [PATCH 04/29] Use TimeParser for timestamps and dates in the CSV
 datasource

---
 .../sql/catalyst/csv/CSVInferSchema.scala     | 11 +-
.../spark/sql/catalyst/csv/CSVOptions.scala | 9 +- .../sql/catalyst/csv/UnivocityParser.scala | 28 +--- .../catalyst/expressions/csvExpressions.scala | 5 +- .../spark/sql/catalyst/util/TimeParser.scala | 17 ++- .../catalyst/csv/CSVInferSchemaSuite.scala | 130 +++++++++++------- .../catalyst/csv/UnivocityParserSuite.scala | 7 +- .../expressions/CsvExpressionsSuite.scala | 2 +- .../datasources/csv/CSVDataSource.scala | 4 +- 9 files changed, 111 insertions(+), 102 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 799e9994451b..14bff07be273 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -23,10 +23,12 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.TimeParser import org.apache.spark.sql.types._ -object CSVInferSchema { +class CSVInferSchema(val options: CSVOptions) { + + private val timeParser = TimeParser(options.timestampFormat, options.timeZone, options.locale) /** * Similar to the JSON schema inference @@ -154,10 +156,7 @@ object CSVInferSchema { private def tryParseTimestamp(field: String, options: CSVOptions): DataType = { // This case infers a custom `dataFormat` is set. - if ((allCatch opt options.timestampFormat.parse(field)).isDefined) { - TimestampType - } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) { - // We keep this for backwards compatibility. + if ((allCatch opt timeParser.toMicros(field)).isDefined) { TimestampType } else { tryParseBoolean(field, options) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 94bdb72d675d..918f1e707f13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -146,13 +146,10 @@ class CSVOptions( // A language tag in IETF BCP 47 format val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US) - // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. 
- val dateFormat: FastDateFormat = - FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale) + val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd") - val timestampFormat: FastDateFormat = - FastDateFormat.getInstance( - parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale) + val timestampFormat: String = + parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index ed196935e357..040d59cfc361 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.csv import java.io.InputStream import java.math.BigDecimal -import scala.util.Try import scala.util.control.NonFatal import com.univocity.parsers.csv.CsvParser @@ -28,7 +27,7 @@ import com.univocity.parsers.csv.CsvParser import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser} +import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser, TimeParser} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -76,6 +75,9 @@ class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) + private val timeParser = TimeParser(options.timestampFormat, options.timeZone, options.locale) + private val dateParser = TimeParser(options.dateFormat, options.timeZone, options.locale) + // Retrieve the raw record string. private def getCurrentInput: UTF8String = { UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd) @@ -154,28 +156,10 @@ class UnivocityParser( } case _: TimestampType => (d: String) => - nullSafeDatum(d, name, nullable, options) { datum => - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681. - Try(options.timestampFormat.parse(datum).getTime * 1000L) - .getOrElse { - // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. - DateTimeUtils.stringToTime(datum).getTime * 1000L - } - } + nullSafeDatum(d, name, nullable, options)(timeParser.toMicros) case _: DateType => (d: String) => - nullSafeDatum(d, name, nullable, options) { datum => - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681.x - Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)) - .getOrElse { - // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. 
- DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime) - } - } + nullSafeDatum(d, name, nullable, options)(dateParser.toDays) case _: StringType => (d: String) => nullSafeDatum(d, name, nullable, options)(UTF8String.fromString) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 1e4e1c663c90..00aa9096e1c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -180,8 +180,9 @@ case class SchemaOfCsv( val header = row.zipWithIndex.map { case (_, index) => s"_c$index" } val startType: Array[DataType] = Array.fill[DataType](header.length)(NullType) - val fieldTypes = CSVInferSchema.inferRowType(parsedOptions)(startType, row) - val st = StructType(CSVInferSchema.toStructFields(fieldTypes, header, parsedOptions)) + val inferSchema = new CSVInferSchema(parsedOptions) + val fieldTypes = inferSchema.inferRowType(parsedOptions)(startType, row) + val st = StructType(inferSchema.toStructFields(fieldTypes, header, parsedOptions)) UTF8String.fromString(st.catalogString) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala index f6b386996590..820d4867b837 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala @@ -18,14 +18,12 @@ package org.apache.spark.sql.catalyst.util import java.time.{LocalDateTime, ZoneOffset} -import java.time.format.{DateTimeFormatter, ResolverStyle} -import java.time.temporal.ChronoUnit +import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle} +import java.time.temporal.{ChronoField, ChronoUnit} import java.util.{Locale, TimeZone} import scala.util.Try - import org.apache.commons.lang3.time.FastDateFormat - import org.apache.spark.sql.internal.SQLConf sealed trait TimeParser { @@ -35,25 +33,26 @@ sealed trait TimeParser { } class Iso8601TimeParser(pattern: String, timeZone: TimeZone, locale: Locale) extends TimeParser { - val format = DateTimeFormatter.ofPattern(pattern) + val formatter = DateTimeFormatter.ofPattern(pattern) .withLocale(locale) .withZone(timeZone.toZoneId) .withResolverStyle(ResolverStyle.SMART) - // Seconds since 1970-01-01T00:00:00Z + + // Seconds since 1970-01-01T00:00:00 val epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC) def toMillis(s: String): Long = { - val localDateTime = LocalDateTime.parse(s, format) + val localDateTime = LocalDateTime.parse(s, formatter) ChronoUnit.MILLIS.between(epoch, localDateTime) } def toMicros(s: String): Long = { - val localDateTime = LocalDateTime.parse(s, format) + val localDateTime = LocalDateTime.parse(s, formatter) ChronoUnit.MICROS.between(epoch, localDateTime) } def toDays(s: String): Int = { - val localDateTime = LocalDateTime.parse(s, format) + val localDateTime = LocalDateTime.parse(s, formatter) ChronoUnit.DAYS.between(epoch, localDateTime).toInt } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index 651846d2ebcb..cdcb46a0ea34 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -18,125 +18,151 @@ package org.apache.spark.sql.catalyst.csv import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.types._ -class CSVInferSchemaSuite extends SparkFunSuite { +class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { test("String fields types are inferred correctly from null types") { - val options = new CSVOptions(Map.empty[String, String], false, "GMT") - assert(CSVInferSchema.inferField(NullType, "", options) == NullType) - assert(CSVInferSchema.inferField(NullType, null, options) == NullType) - assert(CSVInferSchema.inferField(NullType, "100000000000", options) == LongType) - assert(CSVInferSchema.inferField(NullType, "60", options) == IntegerType) - assert(CSVInferSchema.inferField(NullType, "3.5", options) == DoubleType) - assert(CSVInferSchema.inferField(NullType, "test", options) == StringType) - assert(CSVInferSchema.inferField(NullType, "2015-08-20 15:57:00", options) == TimestampType) - assert(CSVInferSchema.inferField(NullType, "True", options) == BooleanType) - assert(CSVInferSchema.inferField(NullType, "FAlSE", options) == BooleanType) + val options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"), false, "GMT") + val inferSchema = new CSVInferSchema(options) + + assert(inferSchema.inferField(NullType, "", options) == NullType) + assert(inferSchema.inferField(NullType, null, options) == NullType) + assert(inferSchema.inferField(NullType, "100000000000", options) == LongType) + assert(inferSchema.inferField(NullType, "60", options) == IntegerType) + assert(inferSchema.inferField(NullType, "3.5", options) == DoubleType) + assert(inferSchema.inferField(NullType, "test", options) == StringType) + assert(inferSchema.inferField(NullType, "2015-08-20 15:57:00", options) == TimestampType) + assert(inferSchema.inferField(NullType, "True", options) == BooleanType) + assert(inferSchema.inferField(NullType, "FAlSE", options) == BooleanType) val textValueOne = Long.MaxValue.toString + "0" val decimalValueOne = new java.math.BigDecimal(textValueOne) val expectedTypeOne = DecimalType(decimalValueOne.precision, decimalValueOne.scale) - assert(CSVInferSchema.inferField(NullType, textValueOne, options) == expectedTypeOne) + assert(inferSchema.inferField(NullType, textValueOne, options) == expectedTypeOne) } test("String fields types are inferred correctly from other types") { - val options = new CSVOptions(Map.empty[String, String], false, "GMT") - assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType) - assert(CSVInferSchema.inferField(LongType, "test", options) == StringType) - assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == DoubleType) - assert(CSVInferSchema.inferField(DoubleType, null, options) == DoubleType) - assert(CSVInferSchema.inferField(DoubleType, "test", options) == StringType) - assert(CSVInferSchema.inferField(LongType, "2015-08-20 14:57:00", options) == TimestampType) - assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 15:57:00", options) == TimestampType) - assert(CSVInferSchema.inferField(LongType, "True", options) == BooleanType) - assert(CSVInferSchema.inferField(IntegerType, "FALSE", options) == BooleanType) - assert(CSVInferSchema.inferField(TimestampType, "FALSE", options) == BooleanType) + val options = new CSVOptions(Map("timestampFormat" 
-> "yyyy-MM-dd HH:mm:ss"), false, "GMT") + val inferSchema = new CSVInferSchema(options) + + assert(inferSchema.inferField(LongType, "1.0", options) == DoubleType) + assert(inferSchema.inferField(LongType, "test", options) == StringType) + assert(inferSchema.inferField(IntegerType, "1.0", options) == DoubleType) + assert(inferSchema.inferField(DoubleType, null, options) == DoubleType) + assert(inferSchema.inferField(DoubleType, "test", options) == StringType) + assert(inferSchema.inferField(LongType, "2015-08-20 14:57:00", options) == TimestampType) + assert(inferSchema.inferField(DoubleType, "2015-08-20 15:57:00", options) == TimestampType) + assert(inferSchema.inferField(LongType, "True", options) == BooleanType) + assert(inferSchema.inferField(IntegerType, "FALSE", options) == BooleanType) + assert(inferSchema.inferField(TimestampType, "FALSE", options) == BooleanType) val textValueOne = Long.MaxValue.toString + "0" val decimalValueOne = new java.math.BigDecimal(textValueOne) val expectedTypeOne = DecimalType(decimalValueOne.precision, decimalValueOne.scale) - assert(CSVInferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne) + assert(inferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne) } test("Timestamp field types are inferred correctly via custom data format") { - var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), false, "GMT") - assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) + var options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM"), false, "GMT") + var inferSchema = new CSVInferSchema(options) + assert(inferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) + options = new CSVOptions(Map("timestampFormat" -> "yyyy"), false, "GMT") - assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType) + inferSchema = new CSVInferSchema(options) + assert(inferSchema.inferField(TimestampType, "2015", options) == TimestampType) } test("Timestamp field types are inferred correctly from other types") { val options = new CSVOptions(Map.empty[String, String], false, "GMT") - assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType) - assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType) - assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType) + val inferSchema = new CSVInferSchema(options) + + assert(inferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType) + assert(inferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType) + assert(inferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType) } test("Boolean fields types are inferred correctly from other types") { val options = new CSVOptions(Map.empty[String, String], false, "GMT") - assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType) - assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == StringType) + val inferSchema = new CSVInferSchema(options) + + assert(inferSchema.inferField(LongType, "Fale", options) == StringType) + assert(inferSchema.inferField(DoubleType, "TRUEe", options) == StringType) } test("Type arrays are merged to highest common type") { + val options = new CSVOptions(Map.empty[String, String], false, "GMT") + val inferSchema = new CSVInferSchema(options) + assert( - CSVInferSchema.mergeRowTypes(Array(StringType), + inferSchema.mergeRowTypes(Array(StringType), Array(DoubleType)).deep == 
Array(StringType).deep) assert( - CSVInferSchema.mergeRowTypes(Array(IntegerType), + inferSchema.mergeRowTypes(Array(IntegerType), Array(LongType)).deep == Array(LongType).deep) assert( - CSVInferSchema.mergeRowTypes(Array(DoubleType), + inferSchema.mergeRowTypes(Array(DoubleType), Array(LongType)).deep == Array(DoubleType).deep) } test("Null fields are handled properly when a nullValue is specified") { var options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT") - assert(CSVInferSchema.inferField(NullType, "null", options) == NullType) - assert(CSVInferSchema.inferField(StringType, "null", options) == StringType) - assert(CSVInferSchema.inferField(LongType, "null", options) == LongType) + var inferSchema = new CSVInferSchema(options) + + assert(inferSchema.inferField(NullType, "null", options) == NullType) + assert(inferSchema.inferField(StringType, "null", options) == StringType) + assert(inferSchema.inferField(LongType, "null", options) == LongType) options = new CSVOptions(Map("nullValue" -> "\\N"), false, "GMT") - assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType) - assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType) - assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType) - assert(CSVInferSchema.inferField(BooleanType, "\\N", options) == BooleanType) - assert(CSVInferSchema.inferField(DecimalType(1, 1), "\\N", options) == DecimalType(1, 1)) + inferSchema = new CSVInferSchema(options) + + assert(inferSchema.inferField(IntegerType, "\\N", options) == IntegerType) + assert(inferSchema.inferField(DoubleType, "\\N", options) == DoubleType) + assert(inferSchema.inferField(TimestampType, "\\N", options) == TimestampType) + assert(inferSchema.inferField(BooleanType, "\\N", options) == BooleanType) + assert(inferSchema.inferField(DecimalType(1, 1), "\\N", options) == DecimalType(1, 1)) } test("Merging Nulltypes should yield Nulltype.") { - val mergedNullTypes = CSVInferSchema.mergeRowTypes(Array(NullType), Array(NullType)) + val options = new CSVOptions(Map.empty[String, String], false, "GMT") + val inferSchema = new CSVInferSchema(options) + val mergedNullTypes = inferSchema.mergeRowTypes(Array(NullType), Array(NullType)) assert(mergedNullTypes.deep == Array(NullType).deep) } test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), false, "GMT") - assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) + val inferSchema = new CSVInferSchema(options) + + assert(inferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) } test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") { val options = new CSVOptions(Map.empty[String, String], false, "GMT") + val inferSchema = new CSVInferSchema(options) // 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9). - assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) == + assert(inferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) == DecimalType(4, -9)) // BigDecimal("12345678901234567890.01234567890123456789") is precision 40 and scale 20. 
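    // The precision/scale pairs quoted in these comments can be double-checked with
    // plain java.math.BigDecimal, independently of Spark's DecimalType:
    //   new java.math.BigDecimal("9.03E+12").precision == 3    // unscaled value 903
    //   new java.math.BigDecimal("9.03E+12").scale == -10
    //   new java.math.BigDecimal("1.19E+11").scale == -9
    //   new java.math.BigDecimal("12345678901234567890.01234567890123456789").precision == 40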
    val value = "12345678901234567890.01234567890123456789"
-    assert(CSVInferSchema.inferField(DecimalType(3, -10), value, options) == DoubleType)
+    assert(inferSchema.inferField(DecimalType(3, -10), value, options) == DoubleType)
 
     // Seq(s"${Long.MaxValue}1", "2015-12-01 00:00:00") should be StringType
-    assert(CSVInferSchema.inferField(NullType, s"${Long.MaxValue}1", options) == DecimalType(20, 0))
-    assert(CSVInferSchema.inferField(DecimalType(20, 0), "2015-12-01 00:00:00", options)
+    assert(inferSchema.inferField(NullType, s"${Long.MaxValue}1", options) == DecimalType(20, 0))
+    assert(inferSchema.inferField(DecimalType(20, 0), "2015-12-01 00:00:00", options)
       == StringType)
   }
 
   test("DoubleType should be inferred when user defined nan/inf are provided") {
     val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf",
       "positiveInf" -> "inf"), false, "GMT")
-    assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType)
-    assert(CSVInferSchema.inferField(NullType, "inf", options) == DoubleType)
-    assert(CSVInferSchema.inferField(NullType, "-inf", options) == DoubleType)
+    val inferSchema = new CSVInferSchema(options)
+
+    assert(inferSchema.inferField(NullType, "nan", options) == DoubleType)
+    assert(inferSchema.inferField(NullType, "inf", options) == DoubleType)
+    assert(inferSchema.inferField(NullType, "-inf", options) == DoubleType)
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index e4e7dc2e8c0e..64cf7f8fc7e2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.math.BigDecimal
 
+import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
@@ -109,7 +110,9 @@ class UnivocityParserSuite extends SparkFunSuite {
     val timestampsOptions =
       new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), false, "GMT")
     val customTimestamp = "31/01/2015 00:00"
-    val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
+    val format = FastDateFormat.getInstance(
+      timestampsOptions.timestampFormat, timestampsOptions.timeZone, timestampsOptions.locale)
+    val expectedTime = format.parse(customTimestamp).getTime
     val castedTimestamp =
       parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
         .apply(customTimestamp)
@@ -117,7 +120,9 @@ class UnivocityParserSuite extends SparkFunSuite {
 
     val customDate = "31/01/2015"
     val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "GMT")
-    val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
+    val dateFormat = FastDateFormat.getInstance(
+      dateOptions.dateFormat, dateOptions.timeZone, dateOptions.locale)
+    val expectedDate = dateFormat.parse(customDate).getTime
 
     val castedDate = parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
       .apply(customTimestamp)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
index 98c93a4946f4..6942564ec65e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -73,7 +73,7 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P test("from_csv with timestamp") { val schema = StructType(StructField("t", TimestampType) :: Nil) - val csvData1 = "2016-01-01T00:00:00.123Z" + val csvData1 = "2016-01-01T00:00:00.123" var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 123) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index b35b8851918b..b2c7ce47aeb2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -135,7 +135,7 @@ object TextInputCSVDataSource extends CSVDataSource { val parser = new CsvParser(parsedOptions.asParserSettings) linesWithoutHeader.map(parser.parseLine) } - CSVInferSchema.infer(tokenRDD, header, parsedOptions) + new CSVInferSchema(parsedOptions).infer(tokenRDD, header, parsedOptions) case _ => // If the first line could not be read, just return the empty schema. StructType(Nil) @@ -208,7 +208,7 @@ object MultiLineCSVDataSource extends CSVDataSource { encoding = parsedOptions.charset) } val sampled = CSVUtils.sample(tokenRDD, parsedOptions) - CSVInferSchema.infer(sampled, header, parsedOptions) + new CSVInferSchema(parsedOptions).infer(sampled, header, parsedOptions) case None => // If the first row could not be read, just return the empty schema. StructType(Nil) From 227a7bdc53bdd022e9c365b410810c58f56e8bea Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 25 Nov 2018 13:15:15 +0100 Subject: [PATCH 05/29] Using instances --- .../sql/catalyst/csv/CSVInferSchema.scala | 8 +++- .../spark/sql/catalyst/csv/CSVOptions.scala | 2 +- .../spark/sql/catalyst/util/TimeParser.scala | 43 +++++++++++-------- .../expressions/CsvExpressionsSuite.scala | 2 +- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 14bff07be273..d4c02ab8e410 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -26,9 +26,13 @@ import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.util.TimeParser import org.apache.spark.sql.types._ -class CSVInferSchema(val options: CSVOptions) { +class CSVInferSchema(val options: CSVOptions) extends Serializable { - private val timeParser = TimeParser(options.timestampFormat, options.timeZone, options.locale) + @transient + private lazy val timeParser = TimeParser( + options.timestampFormat, + options.timeZone, + options.locale) /** * Similar to the JSON schema inference diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 918f1e707f13..064a53cd29db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -149,7 +149,7 @@ class CSVOptions( val dateFormat: 
String = parameters.getOrElse("dateFormat", "yyyy-MM-dd") val timestampFormat: String = - parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS") + parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSX") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala index 820d4867b837..6d8d78f6076b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala @@ -17,13 +17,15 @@ package org.apache.spark.sql.catalyst.util -import java.time.{LocalDateTime, ZoneOffset} -import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle} -import java.time.temporal.{ChronoField, ChronoUnit} +import java.time._ +import java.time.format.DateTimeFormatter +import java.time.temporal.TemporalQueries import java.util.{Locale, TimeZone} import scala.util.Try + import org.apache.commons.lang3.time.FastDateFormat + import org.apache.spark.sql.internal.SQLConf sealed trait TimeParser { @@ -33,27 +35,32 @@ sealed trait TimeParser { } class Iso8601TimeParser(pattern: String, timeZone: TimeZone, locale: Locale) extends TimeParser { - val formatter = DateTimeFormatter.ofPattern(pattern) - .withLocale(locale) - .withZone(timeZone.toZoneId) - .withResolverStyle(ResolverStyle.SMART) + val formatter = DateTimeFormatter.ofPattern(pattern, locale) - // Seconds since 1970-01-01T00:00:00 - val epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC) - - def toMillis(s: String): Long = { - val localDateTime = LocalDateTime.parse(s, formatter) - ChronoUnit.MILLIS.between(epoch, localDateTime) + def toInstant(s: String): Instant = { + val temporalAccessor = formatter.parse(s) + if (temporalAccessor.query(TemporalQueries.offset()) == null) { + val localDateTime = LocalDateTime.from(temporalAccessor) + val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId) + Instant.from(zonedDateTime) + } else { + Instant.from(temporalAccessor) + } } - def toMicros(s: String): Long = { - val localDateTime = LocalDateTime.parse(s, formatter) - ChronoUnit.MICROS.between(epoch, localDateTime) + def conv(instant: Instant, secMul: Long, nanoDiv: Long): Long = { + val sec = Math.multiplyExact(instant.getEpochSecond, secMul) + val result = Math.addExact(sec, instant.getNano / nanoDiv) + result } + def toMillis(s: String): Long = conv(toInstant(s), 1000, 1000000) + + def toMicros(s: String): Long = conv(toInstant(s), 1000000, 1000) + def toDays(s: String): Int = { - val localDateTime = LocalDateTime.parse(s, formatter) - ChronoUnit.DAYS.between(epoch, localDateTime).toInt + val instant = toInstant(s) + (instant.getEpochSecond / DateTimeUtils.SECONDS_PER_DAY).toInt } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index 6942564ec65e..98c93a4946f4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -73,7 +73,7 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P test("from_csv with timestamp") { val schema = StructType(StructField("t", 
TimestampType) :: Nil) - val csvData1 = "2016-01-01T00:00:00.123" + val csvData1 = "2016-01-01T00:00:00.123Z" var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 123) From 73ee56088bf4d2856c454a7bbd4171b61cfe4614 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 25 Nov 2018 14:52:02 +0100 Subject: [PATCH 06/29] Added generator --- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 2 +- .../sql/catalyst/csv/UnivocityGenerator.scala | 8 +++++--- .../spark/sql/catalyst/util/TimeParser.scala | 14 ++++++++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 064a53cd29db..b98560edd4ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -149,7 +149,7 @@ class CSVOptions( val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd") val timestampFormat: String = - parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSX") + parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 1218f9242afe..ea00d43a46ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -22,7 +22,7 @@ import java.io.Writer import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimeParser} import org.apache.spark.sql.types._ class UnivocityGenerator( @@ -42,14 +42,16 @@ class UnivocityGenerator( private val valueConverters: Array[ValueConverter] = schema.map(_.dataType).map(makeConverter).toArray + private val timeFormatter = TimeParser(options.timestampFormat, options.timeZone, options.locale) + private val dateFormatter = TimeParser(options.dateFormat, options.timeZone, options.locale) + private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => (row: InternalRow, ordinal: Int) => options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) case TimestampType => - (row: InternalRow, ordinal: Int) => - options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + (row: InternalRow, ordinal: Int) => timeFormatter.fromMicros(row.getLong(ordinal)) case udt: UserDefinedType[_] => makeConverter(udt.sqlType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala index 6d8d78f6076b..3de28b4cffb9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala @@ -32,6 +32,8 @@ sealed trait TimeParser { def toMillis(s: String): Long def toMicros(s: String): Long def toDays(s: String): Int + + def fromMicros(us: Long): String } class Iso8601TimeParser(pattern: String, 
timeZone: TimeZone, locale: Locale) extends TimeParser { @@ -62,6 +64,14 @@ class Iso8601TimeParser(pattern: String, timeZone: TimeZone, locale: Locale) ext val instant = toInstant(s) (instant.getEpochSecond / DateTimeUtils.SECONDS_PER_DAY).toInt } + + def fromMicros(us: Long): String = { + val secs = Math.floorDiv(us, 1000000) + val mos = Math.floorMod(us, 1000000) + val instant = Instant.ofEpochSecond(secs, mos * 1000) + + formatter.withZone(timeZone.toZoneId).format(instant) + } } class LegacyTimeParser(pattern: String, timeZone: TimeZone, locale: Locale) extends TimeParser { @@ -72,6 +82,10 @@ class LegacyTimeParser(pattern: String, timeZone: TimeZone, locale: Locale) exte def toMicros(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS def toDays(s: String): Int = DateTimeUtils.millisToDays(toMillis(s)) + + def fromMicros(us: Long): String = { + format.format(DateTimeUtils.toJavaTimestamp(us)) + } } class LegacyFallbackTimeParser( From f35f6e13270eb994ac97627da79497673b4fe686 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 25 Nov 2018 19:03:17 +0100 Subject: [PATCH 07/29] Refactoring of TimeFormatter --- .../sql/catalyst/csv/CSVInferSchema.scala | 6 +-- .../sql/catalyst/csv/UnivocityGenerator.scala | 8 +-- .../sql/catalyst/csv/UnivocityParser.scala | 8 +-- .../{TimeParser.scala => TimeFormatter.scala} | 50 ++++++++----------- 4 files changed, 33 insertions(+), 39 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/{TimeParser.scala => TimeFormatter.scala} (68%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index d4c02ab8e410..429cb095a0a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -23,13 +23,13 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion -import org.apache.spark.sql.catalyst.util.TimeParser +import org.apache.spark.sql.catalyst.util.TimeFormatter import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @transient - private lazy val timeParser = TimeParser( + private lazy val timeParser = TimeFormatter( options.timestampFormat, options.timeZone, options.locale) @@ -160,7 +160,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private def tryParseTimestamp(field: String, options: CSVOptions): DataType = { // This case infers a custom `dataFormat` is set. 
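      // A note on the `allCatch opt` idiom used here: it evaluates its argument and
      // wraps the result in Some(...), collapsing any thrown exception into None, so a
      // field the formatter rejects simply falls through to the next type guess, e.g.
      //   import scala.util.control.Exception.allCatch
      //   (allCatch opt "not a timestamp".toLong).isDefined  // false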
- if ((allCatch opt timeParser.toMicros(field)).isDefined) { + if ((allCatch opt timeParser.parse(field)).isDefined) { TimestampType } else { tryParseBoolean(field, options) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index ea00d43a46ac..ceaa0f98384a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -22,7 +22,7 @@ import java.io.Writer import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimeParser} +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimeFormatter} import org.apache.spark.sql.types._ class UnivocityGenerator( @@ -42,8 +42,8 @@ class UnivocityGenerator( private val valueConverters: Array[ValueConverter] = schema.map(_.dataType).map(makeConverter).toArray - private val timeFormatter = TimeParser(options.timestampFormat, options.timeZone, options.locale) - private val dateFormatter = TimeParser(options.dateFormat, options.timeZone, options.locale) + private val timeFormatter = TimeFormatter(options.timestampFormat, options.timeZone, options.locale) + private val dateFormatter = TimeFormatter(options.dateFormat, options.timeZone, options.locale) private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => @@ -51,7 +51,7 @@ class UnivocityGenerator( options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) case TimestampType => - (row: InternalRow, ordinal: Int) => timeFormatter.fromMicros(row.getLong(ordinal)) + (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal)) case udt: UserDefinedType[_] => makeConverter(udt.sqlType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 040d59cfc361..b76e20608e1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -27,7 +27,7 @@ import com.univocity.parsers.csv.CsvParser import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser, TimeParser} +import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser, TimeFormatter} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -75,8 +75,8 @@ class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val timeParser = TimeParser(options.timestampFormat, options.timeZone, options.locale) - private val dateParser = TimeParser(options.dateFormat, options.timeZone, options.locale) + private val timeParser = TimeFormatter(options.timestampFormat, options.timeZone, options.locale) + private val dateParser = TimeFormatter(options.dateFormat, options.timeZone, options.locale) // Retrieve the raw record string. 
private def getCurrentInput: UTF8String = { @@ -156,7 +156,7 @@ class UnivocityParser( } case _: TimestampType => (d: String) => - nullSafeDatum(d, name, nullable, options)(timeParser.toMicros) + nullSafeDatum(d, name, nullable, options)(timeParser.parse) case _: DateType => (d: String) => nullSafeDatum(d, name, nullable, options)(dateParser.toDays) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala similarity index 68% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala index 3de28b4cffb9..3fb668ee6d9f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala @@ -28,15 +28,15 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.internal.SQLConf -sealed trait TimeParser { - def toMillis(s: String): Long - def toMicros(s: String): Long - def toDays(s: String): Int - - def fromMicros(us: Long): String +sealed trait TimeFormatter { + def parse(s: String): Long + def format(us: Long): String } -class Iso8601TimeParser(pattern: String, timeZone: TimeZone, locale: Locale) extends TimeParser { +class Iso8601TimeFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends TimeFormatter { val formatter = DateTimeFormatter.ofPattern(pattern, locale) def toInstant(s: String): Instant = { @@ -56,16 +56,9 @@ class Iso8601TimeParser(pattern: String, timeZone: TimeZone, locale: Locale) ext result } - def toMillis(s: String): Long = conv(toInstant(s), 1000, 1000000) + def parse(s: String): Long = conv(toInstant(s), 1000000, 1000) - def toMicros(s: String): Long = conv(toInstant(s), 1000000, 1000) - - def toDays(s: String): Int = { - val instant = toInstant(s) - (instant.getEpochSecond / DateTimeUtils.SECONDS_PER_DAY).toInt - } - - def fromMicros(us: Long): String = { + def format(us: Long): String = { val secs = Math.floorDiv(us, 1000000) val mos = Math.floorMod(us, 1000000) val instant = Instant.ofEpochSecond(secs, mos * 1000) @@ -74,35 +67,36 @@ class Iso8601TimeParser(pattern: String, timeZone: TimeZone, locale: Locale) ext } } -class LegacyTimeParser(pattern: String, timeZone: TimeZone, locale: Locale) extends TimeParser { +class LegacyTimeFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends TimeFormatter { val format = FastDateFormat.getInstance(pattern, timeZone, locale) - def toMillis(s: String): Long = format.parse(s).getTime - - def toMicros(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS + protected def toMillis(s: String): Long = format.parse(s).getTime - def toDays(s: String): Int = DateTimeUtils.millisToDays(toMillis(s)) + def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS - def fromMicros(us: Long): String = { + def format(us: Long): String = { format.format(DateTimeUtils.toJavaTimestamp(us)) } } -class LegacyFallbackTimeParser( +class LegacyFallbackTimeFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends LegacyTimeParser(pattern, timeZone, locale) { + locale: Locale) extends LegacyTimeFormatter(pattern, timeZone, locale) { override def toMillis(s: String): Long = { Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime) } } -object TimeParser { - def apply(format: 
String, timeZone: TimeZone, locale: Locale): TimeParser = { +object TimeFormatter { + def apply(format: String, timeZone: TimeZone, locale: Locale): TimeFormatter = { if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyFallbackTimeParser(format, timeZone, locale) + new LegacyFallbackTimeFormatter(format, timeZone, locale) } else { - new Iso8601TimeParser(format, timeZone, locale) + new Iso8601TimeFormatter(format, timeZone, locale) } } } From 1c09b58e6fe3e0fd565c852dcb73dc012fa56819 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 25 Nov 2018 19:06:22 +0100 Subject: [PATCH 08/29] Renaming to DateTimeFormatter --- .../sql/catalyst/csv/CSVInferSchema.scala | 4 +-- .../sql/catalyst/csv/UnivocityGenerator.scala | 6 ++--- .../sql/catalyst/csv/UnivocityParser.scala | 6 ++--- ...ormatter.scala => DateTimeFormatter.scala} | 26 +++++++++---------- 4 files changed, 21 insertions(+), 21 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/{TimeFormatter.scala => DateTimeFormatter.scala} (81%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 429cb095a0a3..89855f853e01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -23,13 +23,13 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion -import org.apache.spark.sql.catalyst.util.TimeFormatter +import org.apache.spark.sql.catalyst.util.DateTimeFormatter import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @transient - private lazy val timeParser = TimeFormatter( + private lazy val timeParser = DateTimeFormatter( options.timestampFormat, options.timeZone, options.locale) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index ceaa0f98384a..563e8796cedb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -22,7 +22,7 @@ import java.io.Writer import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimeFormatter} +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, DateTimeFormatter} import org.apache.spark.sql.types._ class UnivocityGenerator( @@ -42,8 +42,8 @@ class UnivocityGenerator( private val valueConverters: Array[ValueConverter] = schema.map(_.dataType).map(makeConverter).toArray - private val timeFormatter = TimeFormatter(options.timestampFormat, options.timeZone, options.locale) - private val dateFormatter = TimeFormatter(options.dateFormat, options.timeZone, options.locale) + private val timeFormatter = DateTimeFormatter(options.timestampFormat, options.timeZone, options.locale) + private val dateFormatter = DateTimeFormatter(options.dateFormat, options.timeZone, options.locale) private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index b76e20608e1d..7f84d2efe864 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -27,7 +27,7 @@ import com.univocity.parsers.csv.CsvParser import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser, TimeFormatter} +import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser, DateTimeFormatter} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -75,8 +75,8 @@ class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val timeParser = TimeFormatter(options.timestampFormat, options.timeZone, options.locale) - private val dateParser = TimeFormatter(options.dateFormat, options.timeZone, options.locale) + private val timeParser = DateTimeFormatter(options.timestampFormat, options.timeZone, options.locale) + private val dateParser = DateTimeFormatter(options.dateFormat, options.timeZone, options.locale) // Retrieve the raw record string. private def getCurrentInput: UTF8String = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala similarity index 81% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index 3fb668ee6d9f..2638ee048b64 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.util import java.time._ -import java.time.format.DateTimeFormatter +import java.time.format.{DateTimeFormatter => JavaDateTimeFormatter} import java.time.temporal.TemporalQueries import java.util.{Locale, TimeZone} @@ -28,16 +28,16 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.internal.SQLConf -sealed trait TimeFormatter { +sealed trait DateTimeFormatter { def parse(s: String): Long def format(us: Long): String } -class Iso8601TimeFormatter( +class Iso8601DateTimeFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends TimeFormatter { - val formatter = DateTimeFormatter.ofPattern(pattern, locale) + locale: Locale) extends DateTimeFormatter { + val formatter = JavaDateTimeFormatter.ofPattern(pattern, locale) def toInstant(s: String): Instant = { val temporalAccessor = formatter.parse(s) @@ -67,10 +67,10 @@ class Iso8601TimeFormatter( } } -class LegacyTimeFormatter( +class LegacyDateTimeFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends TimeFormatter { + locale: Locale) extends DateTimeFormatter { val format = FastDateFormat.getInstance(pattern, timeZone, locale) protected def toMillis(s: String): Long = format.parse(s).getTime @@ -82,21 +82,21 @@ class LegacyTimeFormatter( } } -class LegacyFallbackTimeFormatter( +class LegacyFallbackDateTimeFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends LegacyTimeFormatter(pattern, timeZone, locale) 
{ + locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) { override def toMillis(s: String): Long = { Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime) } } -object TimeFormatter { - def apply(format: String, timeZone: TimeZone, locale: Locale): TimeFormatter = { +object DateTimeFormatter { + def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = { if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyFallbackTimeFormatter(format, timeZone, locale) + new LegacyFallbackDateTimeFormatter(format, timeZone, locale) } else { - new Iso8601TimeFormatter(format, timeZone, locale) + new Iso8601DateTimeFormatter(format, timeZone, locale) } } } From 7b213d5b2ae404c87f090da622a78d3d19fee6a9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 25 Nov 2018 19:32:54 +0100 Subject: [PATCH 09/29] Added DateFormatter --- .../sql/catalyst/csv/UnivocityGenerator.scala | 13 +-- .../sql/catalyst/csv/UnivocityParser.scala | 15 ++-- .../sql/catalyst/util/DateTimeFormatter.scala | 79 ++++++++++++++++++- 3 files changed, 93 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 563e8796cedb..ea35ffb6253b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -20,9 +20,8 @@ package org.apache.spark.sql.catalyst.csv import java.io.Writer import com.univocity.parsers.csv.CsvWriter - import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, DateTimeFormatter} +import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeUtils} import org.apache.spark.sql.types._ class UnivocityGenerator( @@ -42,13 +41,15 @@ class UnivocityGenerator( private val valueConverters: Array[ValueConverter] = schema.map(_.dataType).map(makeConverter).toArray - private val timeFormatter = DateTimeFormatter(options.timestampFormat, options.timeZone, options.locale) - private val dateFormatter = DateTimeFormatter(options.dateFormat, options.timeZone, options.locale) + private val timeFormatter = DateTimeFormatter( + options.timestampFormat, + options.timeZone, + options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => - (row: InternalRow, ordinal: Int) => - options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) + (row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal)) case TimestampType => (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 7f84d2efe864..bb5ef40f8ca8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -21,13 +21,11 @@ import java.io.InputStream import java.math.BigDecimal import scala.util.control.NonFatal - import com.univocity.parsers.csv.CsvParser - import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow 
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser, DateTimeFormatter} +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -75,8 +73,11 @@ class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val timeParser = DateTimeFormatter(options.timestampFormat, options.timeZone, options.locale) - private val dateParser = DateTimeFormatter(options.dateFormat, options.timeZone, options.locale) + private val timeFormatter = DateTimeFormatter( + options.timestampFormat, + options.timeZone, + options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) // Retrieve the raw record string. private def getCurrentInput: UTF8String = { @@ -156,10 +157,10 @@ class UnivocityParser( } case _: TimestampType => (d: String) => - nullSafeDatum(d, name, nullable, options)(timeParser.parse) + nullSafeDatum(d, name, nullable, options)(timeFormatter.parse) case _: DateType => (d: String) => - nullSafeDatum(d, name, nullable, options)(dateParser.toDays) + nullSafeDatum(d, name, nullable, options)(dateFormatter.parse) case _: StringType => (d: String) => nullSafeDatum(d, name, nullable, options)(UTF8String.fromString) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index 2638ee048b64..d7710184e852 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -29,7 +29,7 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.internal.SQLConf sealed trait DateTimeFormatter { - def parse(s: String): Long + def parse(s: String): Long // returns microseconds since epoch def format(us: Long): String } @@ -100,3 +100,80 @@ object DateTimeFormatter { } } } + +sealed trait DateFormatter { + def parse(s: String): Int // returns days since epoch + def format(days: Int): String +} + +class Iso8601DateFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends DateFormatter { + val formatter = JavaDateTimeFormatter.ofPattern(pattern, locale) + + protected def toInstant(s: String): Instant = { + val temporalAccessor = formatter.parse(s) + if (temporalAccessor.query(TemporalQueries.offset()) == null) { + val localDate = LocalDate.from(temporalAccessor) + val zonedDate = localDate.atStartOfDay(timeZone.toZoneId) + Instant.from(zonedDate) + } else { + Instant.from(temporalAccessor) + } + } + + protected def conv(instant: Instant, secMul: Long, nanoDiv: Long): Long = { + val sec = Math.multiplyExact(instant.getEpochSecond, secMul) + val result = Math.addExact(sec, instant.getNano / nanoDiv) + result + } + + override def parse(s: String): Int = { + val seconds = toInstant(s).getEpochSecond + (seconds / DateTimeUtils.SECONDS_PER_DAY).toInt + } + + override def format(days: Int): String = { + val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY) + formatter.withZone(timeZone.toZoneId).format(instant) + } +} + +class LegacyDateFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends DateFormatter { + val format = FastDateFormat.getInstance(pattern, timeZone, locale) + + def parse(s: String): Int = { + 
val milliseconds = format.parse(s).getTime + DateTimeUtils.millisToDays(milliseconds) + } + + def format(days: Int): String = { + val date = DateTimeUtils.toJavaDate(days) + format.format(date) + } +} + +class LegacyFallbackDateFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) { + override def parse(s: String): Int = { + Try(super.parse(s)).getOrElse { + DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime) + } + } +} + +object DateFormatter { + def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = { + if (SQLConf.get.legacyTimeParserEnabled) { + new LegacyFallbackDateFormatter(format, timeZone, locale) + } else { + new Iso8601DateFormatter(format, timeZone, locale) + } + } +} From 242ba474dcf112b48bd286811daed86a66366c39 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 25 Nov 2018 20:58:08 +0100 Subject: [PATCH 10/29] Default values in parsing --- .../sql/catalyst/util/DateTimeFormatter.scala | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index d7710184e852..c361f3d2a334 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.catalyst.util import java.time._ -import java.time.format.{DateTimeFormatter => JavaDateTimeFormatter} -import java.time.temporal.TemporalQueries +import java.time.format.DateTimeFormatterBuilder +import java.time.temporal.{ChronoField, TemporalQueries} import java.util.{Locale, TimeZone} import scala.util.Try @@ -37,7 +37,16 @@ class Iso8601DateTimeFormatter( pattern: String, timeZone: TimeZone, locale: Locale) extends DateTimeFormatter { - val formatter = JavaDateTimeFormatter.ofPattern(pattern, locale) + val formatter = new DateTimeFormatterBuilder() + .appendPattern(pattern) + .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970) + .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1) + .parseDefaulting(ChronoField.DAY_OF_MONTH, 1) + .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) + .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) + .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) + .parseDefaulting(ChronoField.NANO_OF_SECOND, 0) + .toFormatter(locale) def toInstant(s: String): Instant = { val temporalAccessor = formatter.parse(s) @@ -110,33 +119,17 @@ class Iso8601DateFormatter( pattern: String, timeZone: TimeZone, locale: Locale) extends DateFormatter { - val formatter = JavaDateTimeFormatter.ofPattern(pattern, locale) - protected def toInstant(s: String): Instant = { - val temporalAccessor = formatter.parse(s) - if (temporalAccessor.query(TemporalQueries.offset()) == null) { - val localDate = LocalDate.from(temporalAccessor) - val zonedDate = localDate.atStartOfDay(timeZone.toZoneId) - Instant.from(zonedDate) - } else { - Instant.from(temporalAccessor) - } - } - - protected def conv(instant: Instant, secMul: Long, nanoDiv: Long): Long = { - val sec = Math.multiplyExact(instant.getEpochSecond, secMul) - val result = Math.addExact(sec, instant.getNano / nanoDiv) - result - } + val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale) override def parse(s: String): Int = { - val seconds = toInstant(s).getEpochSecond + val seconds = 
dateTimeFormatter.toInstant(s).getEpochSecond (seconds / DateTimeUtils.SECONDS_PER_DAY).toInt } override def format(days: Int): String = { val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY) - formatter.withZone(timeZone.toZoneId).format(instant) + dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant) } } From db48ee6918eef06e19c3bdf64e3c44f4541cc294 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 25 Nov 2018 22:09:08 +0100 Subject: [PATCH 11/29] Parse as date type because the timestamp format does not match the values --- .../src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 1c359ce1d201..63a3c1d883f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -53,7 +53,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { test("checking the columnNameOfCorruptRecord option") { val columnNameOfCorruptRecord = "_unparsed" val df = Seq("0,2013-111-11 12:13:14", "1,1983-08-04").toDS() - val schema = new StructType().add("a", IntegerType).add("b", TimestampType) + val schema = new StructType().add("a", IntegerType).add("b", DateType) val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType) val df2 = df .select(from_csv($"value", schemaWithCorrField1, Map( From e18841b38050ac411a507a2a2643584f2c8739ce Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 25 Nov 2018 22:53:11 +0100 Subject: [PATCH 12/29] Fix tests --- .../sql/catalyst/csv/UnivocityParser.scala | 9 +- .../sql/catalyst/util/DateTimeFormatter.scala | 2 +- .../catalyst/csv/UnivocityParserSuite.scala | 101 ++++++++++-------- .../execution/datasources/csv/CSVSuite.scala | 2 +- 4 files changed, 62 insertions(+), 52 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index bb5ef40f8ca8..2ca1cc42c59b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -21,7 +21,9 @@ import java.io.InputStream import java.math.BigDecimal import scala.util.control.NonFatal + import com.univocity.parsers.csv.CsvParser + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow @@ -104,7 +106,7 @@ class UnivocityParser( // // output row - ["A", 2] private val valueConverters: Array[ValueConverter] = { - requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray + requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable)).toArray } /** @@ -117,8 +119,7 @@ class UnivocityParser( def makeConverter( name: String, dataType: DataType, - nullable: Boolean = true, - options: CSVOptions): ValueConverter = dataType match { + nullable: Boolean = true): ValueConverter = dataType match { case _: ByteType => (d: String) => nullSafeDatum(d, name, nullable, options)(_.toByte) @@ -156,10 +157,10 @@ class UnivocityParser( } case _: TimestampType => (d: String) => - nullSafeDatum(d, name, nullable, options)(timeParser.parse) + nullSafeDatum(d, name, nullable, options)(timeFormatter.parse) case _: DateType => (d: String) => - nullSafeDatum(d, name, nullable, options)(dateParser.toDays) + nullSafeDatum(d, name, nullable, options)(dateFormatter.parse) case _: StringType => (d: String) => nullSafeDatum(d, name, nullable, options)(UTF8String.fromString) case udt: UserDefinedType[_] => (datum: String) => - makeConverter(name,
udt.sqlType, nullable, options) + makeConverter(name, udt.sqlType, nullable) // We don't actually hit this exception though, we keep it for understandability case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index c361f3d2a334..bcbd29402eb0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -45,7 +45,7 @@ class Iso8601DateTimeFormatter( .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) - .parseDefaulting(ChronoField.NANO_OF_SECOND, 0) + .parseDefaulting(ChronoField.MICRO_OF_SECOND, 0) .toFormatter(locale) def toInstant(s: String): Instant = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 64cf7f8fc7e2..73c14dce5c88 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -20,16 +20,13 @@ package org.apache.spark.sql.catalyst.csv import java.math.BigDecimal import org.apache.commons.lang3.time.FastDateFormat + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String class UnivocityParserSuite extends SparkFunSuite { - private val parser = new UnivocityParser( - StructType(Seq.empty), - new CSVOptions(Map.empty[String, String], false, "GMT")) - private def assertNull(v: Any) = assert(v == null) test("Can parse decimal type values") { @@ -40,7 +37,8 @@ class UnivocityParserSuite extends SparkFunSuite { stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) => val decimalValue = new BigDecimal(decimalVal.toString) val options = new CSVOptions(Map.empty[String, String], false, "GMT") - assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) === + val parser = new UnivocityParser(StructType(Seq.empty), options) + assert(parser.makeConverter("_1", decimalType).apply(strVal) === Decimal(decimalValue, decimalType.precision, decimalType.scale)) } } @@ -53,22 +51,23 @@ class UnivocityParserSuite extends SparkFunSuite { types.foreach { t => // Tests that a custom nullValue. val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), false, "GMT") - val converter = - parser.makeConverter("_1", t, nullable = true, options = nullValueOptions) + var parser = new UnivocityParser(StructType(Seq.empty), nullValueOptions) + val converter = parser.makeConverter("_1", t, nullable = true) assertNull(converter.apply("-")) assertNull(converter.apply(null)) // Tests that the default nullValue is empty string. val options = new CSVOptions(Map.empty[String, String], false, "GMT") - assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply("")) + parser = new UnivocityParser(StructType(Seq.empty), options) + assertNull(parser.makeConverter("_1", t, nullable = true).apply("")) } // Not nullable field with nullValue option. 
types.foreach { t => // Casts a null to not nullable field should throw an exception. val options = new CSVOptions(Map("nullValue" -> "-"), false, "GMT") - val converter = - parser.makeConverter("_1", t, nullable = false, options = options) + val parser = new UnivocityParser(StructType(Seq.empty), options) + val converter = parser.makeConverter("_1", t, nullable = false) var message = intercept[RuntimeException] { converter.apply("-") }.getMessage @@ -83,64 +82,74 @@ class UnivocityParserSuite extends SparkFunSuite { // null. Seq(true, false).foreach { b => val options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT") - val converter = - parser.makeConverter("_1", StringType, nullable = b, options = options) + val parser = new UnivocityParser(StructType(Seq.empty), options) + val converter = parser.makeConverter("_1", StringType, nullable = b) assert(converter.apply("") == UTF8String.fromString("")) } } test("Throws exception for empty string with non null type") { - val options = new CSVOptions(Map.empty[String, String], false, "GMT") + val options = new CSVOptions(Map.empty[String, String], false, "GMT") + val parser = new UnivocityParser(StructType(Seq.empty), options) val exception = intercept[RuntimeException]{ - parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("") + parser.makeConverter("_1", IntegerType, nullable = false).apply("") } assert(exception.getMessage.contains("null value found but field _1 is not nullable.")) } test("Types are cast correctly") { val options = new CSVOptions(Map.empty[String, String], false, "GMT") - assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10) - assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10) - assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10) - assert(parser.makeConverter("_1", LongType, options = options).apply("10") == 10) - assert(parser.makeConverter("_1", FloatType, options = options).apply("1.00") == 1.0) - assert(parser.makeConverter("_1", DoubleType, options = options).apply("1.00") == 1.0) - assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true) - - val timestampsOptions = + var parser = new UnivocityParser(StructType(Seq.empty), options) + assert(parser.makeConverter("_1", ByteType).apply("10") == 10) + assert(parser.makeConverter("_1", ShortType).apply("10") == 10) + assert(parser.makeConverter("_1", IntegerType).apply("10") == 10) + assert(parser.makeConverter("_1", LongType).apply("10") == 10) + assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0) + assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0) + assert(parser.makeConverter("_1", BooleanType).apply("true") == true) + + var timestampsOptions = new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), false, "GMT") + parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions) val customTimestamp = "31/01/2015 00:00" - val format = FastDateFormat.getInstance( + var format = FastDateFormat.getInstance( timestampsOptions.timestampFormat, timestampsOptions.timeZone, timestampsOptions.locale) val expectedTime = format.parse(customTimestamp).getTime - val castedTimestamp = - parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions) + val castedTimestamp = parser.makeConverter("_1", TimestampType, nullable = true) .apply(customTimestamp) assert(castedTimestamp == expectedTime * 1000L) val customDate = "31/01/2015" val dateOptions = new 
CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "GMT") + parser = new UnivocityParser(StructType(Seq.empty), dateOptions) + format = FastDateFormat.getInstance( + dateOptions.dateFormat, dateOptions.timeZone, dateOptions.locale) val expectedDate = format.parse(customDate).getTime - val castedDate = - parser.makeConverter("_1", DateType, nullable = true, options = dateOptions) - .apply(customTimestamp) + val castedDate = parser.makeConverter("_1", DateType, nullable = true) + .apply(customDate) assert(castedDate == DateTimeUtils.millisToDays(expectedDate)) val timestamp = "2015-01-01 00:00:00" - assert(parser.makeConverter("_1", TimestampType, options = options).apply(timestamp) == - DateTimeUtils.stringToTime(timestamp).getTime * 1000L) - assert(parser.makeConverter("_1", DateType, options = options).apply("2015-01-01") == - DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime)) + timestampsOptions = new CSVOptions(Map( + "timestampFormat" -> "yyyy-MM-dd HH:mm:ss", + "dateFormat" -> "yyyy-MM-dd"), false, "UTC") + parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions) + val expected = 1420070400 * DateTimeUtils.MICROS_PER_SECOND + assert(parser.makeConverter("_1", TimestampType).apply(timestamp) == + expected) + assert(parser.makeConverter("_1", DateType).apply("2015-01-01") == + expected / DateTimeUtils.MICROS_PER_DAY) } test("Throws exception for casting an invalid string to Float and Double Types") { val options = new CSVOptions(Map.empty[String, String], false, "GMT") + val parser = new UnivocityParser(StructType(Seq.empty), options) val types = Seq(DoubleType, FloatType) val input = Seq("10u000", "abc", "1 2/3") types.foreach { dt => input.foreach { v => val message = intercept[NumberFormatException] { - parser.makeConverter("_1", dt, options = options).apply(v) + parser.makeConverter("_1", dt).apply(v) }.getMessage assert(message.contains(v)) } @@ -149,9 +158,9 @@ class UnivocityParserSuite extends SparkFunSuite { test("Float NaN values are parsed correctly") { val options = new CSVOptions(Map("nanValue" -> "nn"), false, "GMT") + val parser = new UnivocityParser(StructType(Seq.empty), options) val floatVal: Float = parser.makeConverter( - "_1", FloatType, nullable = true, options = options - ).apply("nn").asInstanceOf[Float] + "_1", FloatType, nullable = true).apply("nn").asInstanceOf[Float] // Java implements the IEEE-754 floating point standard which guarantees that any comparison // against NaN will return false (except != which returns true) @@ -160,41 +169,41 @@ class UnivocityParserSuite extends SparkFunSuite { test("Double NaN values are parsed correctly") { val options = new CSVOptions(Map("nanValue" -> "-"), false, "GMT") + val parser = new UnivocityParser(StructType(Seq.empty), options) val doubleVal: Double = parser.makeConverter( - "_1", DoubleType, nullable = true, options = options - ).apply("-").asInstanceOf[Double] + "_1", DoubleType, nullable = true).apply("-").asInstanceOf[Double] assert(doubleVal.isNaN) } test("Float infinite values can be parsed") { val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT") + var parser = new UnivocityParser(StructType(Seq.empty), negativeInfOptions) val floatVal1 = parser.makeConverter( - "_1", FloatType, nullable = true, options = negativeInfOptions - ).apply("max").asInstanceOf[Float] + "_1", FloatType, nullable = true).apply("max").asInstanceOf[Float] assert(floatVal1 == Float.NegativeInfinity) val positiveInfOptions = new CSVOptions(Map("positiveInf" -> 
"max"), false, "GMT") + parser = new UnivocityParser(StructType(Seq.empty), positiveInfOptions) val floatVal2 = parser.makeConverter( - "_1", FloatType, nullable = true, options = positiveInfOptions - ).apply("max").asInstanceOf[Float] + "_1", FloatType, nullable = true).apply("max").asInstanceOf[Float] assert(floatVal2 == Float.PositiveInfinity) } test("Double infinite values can be parsed") { val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT") + var parser = new UnivocityParser(StructType(Seq.empty), negativeInfOptions) val doubleVal1 = parser.makeConverter( - "_1", DoubleType, nullable = true, options = negativeInfOptions - ).apply("max").asInstanceOf[Double] + "_1", DoubleType, nullable = true).apply("max").asInstanceOf[Double] assert(doubleVal1 == Double.NegativeInfinity) val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT") + parser = new UnivocityParser(StructType(Seq.empty), positiveInfOptions) val doubleVal2 = parser.makeConverter( - "_1", DoubleType, nullable = true, options = positiveInfOptions - ).apply("max").asInstanceOf[Double] + "_1", DoubleType, nullable = true).apply("max").asInstanceOf[Double] assert(doubleVal2 == Double.PositiveInfinity) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index c275d63d32cc..f96dd294aa0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -622,7 +622,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val options = Map( "header" -> "true", "inferSchema" -> "false", - "dateFormat" -> "dd/MM/yyyy hh:mm") + "dateFormat" -> "dd/MM/yyyy HH:mm") val results = spark.read .format("csv") .options(options) From 8db023834b680f336ff5a0e08253ba2cb3b6e3b7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 26 Nov 2018 00:09:20 +0100 Subject: [PATCH 13/29] CSVSuite passed --- .../execution/datasources/csv/CSVSuite.scala | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index f96dd294aa0d..01c627f63582 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -37,7 +37,7 @@ import org.apache.spark.{SparkException, TestUtils} import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} +import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} import org.apache.spark.sql.types._ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with TestCsvData { @@ -586,6 +586,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val results = spark.read .format("csv") .options(Map("comment" -> "~", "header" -> "false", "inferSchema" -> "true")) + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") .load(testFile(commentsFile)) .collect() @@ -626,22 +627,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with 
SQLTestUtils with Te val results = spark.read .format("csv") .options(options) + .option("timeZone", "UTC") .schema(customSchema) .load(testFile(datesFile)) .select("date") .collect() - val dateFormat = new SimpleDateFormat("dd/MM/yyyy hh:mm", Locale.US) - val expected = Seq( - new Date(dateFormat.parse("26/08/2015 18:00").getTime), - new Date(dateFormat.parse("27/10/2014 18:30").getTime), - new Date(dateFormat.parse("28/01/2016 20:00").getTime)) + val expected = Seq("2015-08-26", "2014-10-27", "2016-01-28") val dates = results.toSeq.map(_.toSeq.head) expected.zip(dates).foreach { case (expectedDate, date) => // As it truncates the hours, minutes and etc., we only check // if the dates (days, months and years) are the same via `toString()`. - assert(expectedDate.toString === date.toString) + assert(expectedDate == date.toString) } } @@ -916,11 +914,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .option("header", "true") .load(iso8601datesPath) - val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US) - val expectedDates = dates.collect().map { r => - // This should be ISO8601 formatted string. - Row(iso8501.format(r.toSeq.head)) - } + val expectedDates = Seq(Row("2015-08-26"), Row("2014-10-27"), Row("2016-01-28")) checkAnswer(iso8601dates, expectedDates) } @@ -1107,7 +1101,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") { Seq(false, true).foreach { multiLine => - val schema = new StructType().add("a", IntegerType).add("b", TimestampType) + val schema = new StructType().add("a", IntegerType).add("b", DateType) // We use `PERMISSIVE` mode by default if invalid string is given. val df1 = spark .read @@ -1139,7 +1133,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val schemaWithCorrField2 = new StructType() .add("a", IntegerType) .add(columnNameOfCorruptRecord, StringType) - .add("b", TimestampType) + .add("b", DateType) val df3 = spark .read .option("mode", "permissive") @@ -1325,7 +1319,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val columnNameOfCorruptRecord = "_corrupt_record" val schema = new StructType() .add("a", IntegerType) - .add("b", TimestampType) + .add("b", DateType) .add(columnNameOfCorruptRecord, StringType) // negative cases val msg = intercept[AnalysisException] { From 0b9ed92a456d60db0934340f37e0bd428b2f6a42 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 26 Nov 2018 23:00:10 +0100 Subject: [PATCH 14/29] Fix imports --- .../org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index ea35ffb6253b..9b82d96f2734 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -20,8 +20,9 @@ package org.apache.spark.sql.catalyst.csv import java.io.Writer import com.univocity.parsers.csv.CsvWriter + import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter} import org.apache.spark.sql.types._ class 
UnivocityGenerator( From 799ebb3432dec7fe1e1099d68a3f1c09e714aa8e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 26 Nov 2018 23:03:19 +0100 Subject: [PATCH 15/29] Revert the test --- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 01c627f63582..2e32565c864d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -633,13 +633,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .select("date") .collect() - val expected = Seq("2015-08-26", "2014-10-27", "2016-01-28") + val dateFormat = new SimpleDateFormat("dd/MM/yyyy hh:mm", Locale.US) + val expected = Seq( + new Date(dateFormat.parse("26/08/2015 18:00").getTime), + new Date(dateFormat.parse("27/10/2014 18:30").getTime), + new Date(dateFormat.parse("28/01/2016 20:00").getTime)) val dates = results.toSeq.map(_.toSeq.head) expected.zip(dates).foreach { case (expectedDate, date) => // As it truncates the hours, minutes and etc., we only check // if the dates (days, months and years) are the same via `toString()`. - assert(expectedDate == date.toString) + assert(expectedDate.toString === date.toString) } } From 5a223919439e2d22814b92c0e1e572b3c318566f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 26 Nov 2018 23:17:11 +0100 Subject: [PATCH 16/29] Set timeZone --- .../execution/datasources/csv/CSVSuite.scala | 56 ++++++++++--------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 2e32565c864d..e27bb0696916 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -37,7 +37,7 @@ import org.apache.spark.{SparkException, TestUtils} import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with TestCsvData { @@ -895,32 +895,38 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } test("Write dates correctly in ISO8601 format by default") { - withTempDir { dir => - val customSchema = new StructType(Array(StructField("date", DateType, true))) - val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv" - val dates = spark.read - .format("csv") - .schema(customSchema) - .option("header", "true") - .option("inferSchema", "false") - .option("dateFormat", "dd/MM/yyyy HH:mm") - .load(testFile(datesFile)) - dates.write - .format("csv") - .option("header", "true") - .save(iso8601datesPath) - - // This will load back the dates as string.
- val stringSchema = StructType(StructField("date", StringType, true) :: Nil) - val iso8601dates = spark.read - .format("csv") - .schema(stringSchema) - .option("header", "true") - .load(iso8601datesPath) + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + withTempDir { dir => + val customSchema = new StructType(Array(StructField("date", DateType, true))) + val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv" + val dates = spark.read + .format("csv") + .schema(customSchema) + .option("header", "true") + .option("inferSchema", "false") + .option("dateFormat", "dd/MM/yyyy HH:mm") + .load(testFile(datesFile)) + dates.write + .format("csv") + .option("header", "true") + .save(iso8601datesPath) - val expectedDates = Seq(Row("2015-08-26"), Row("2014-10-27"), Row("2016-01-28")) + // This will load back the dates as string. + val stringSchema = StructType(StructField("date", StringType, true) :: Nil) + val iso8601dates = spark.read + .format("csv") + .schema(stringSchema) + .option("header", "true") + .load(iso8601datesPath) + + val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US) + val expectedDates = dates.collect().map { r => + // This should be ISO8601 formatted string. + Row(iso8501.format(r.toSeq.head)) + } - checkAnswer(iso8601dates, expectedDates) + checkAnswer(iso8601dates, expectedDates) + } } } From f287b77d94de9e9f466c0ff2c2370f22a46b48f7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 26 Nov 2018 23:44:42 +0100 Subject: [PATCH 17/29] Removing default for micros because it causes conflicts in parsing --- .../org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index bcbd29402eb0..25cb7bef143c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -45,7 +45,6 @@ class Iso8601DateTimeFormatter( .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) - .parseDefaulting(ChronoField.MICRO_OF_SECOND, 0) .toFormatter(locale) def toInstant(s: String): Instant = { From 52074f77a7ffbf26444e67077a353c0ebd07d832 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 27 Nov 2018 09:51:41 +0100 Subject: [PATCH 18/29] Set timezone, otherwise the default is used --- .../apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 73c14dce5c88..d0908767ffcc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.csv import java.math.BigDecimal +import java.util.TimeZone import org.apache.commons.lang3.time.FastDateFormat @@ -127,7 +128,7 @@ class UnivocityParserSuite extends SparkFunSuite { val expectedDate = format.parse(customDate).getTime val castedDate = parser.makeConverter("_1", DateType, nullable = true) .apply(customDate) - assert(castedDate ==
DateTimeUtils.millisToDays(expectedDate)) + assert(castedDate == DateTimeUtils.millisToDays(expectedDate, TimeZone.getTimeZone("GMT"))) val timestamp = "2015-01-01 00:00:00" timestampsOptions = new CSVOptions(Map( From 647b09c10be7067abb50ef305f932895cf5e14e3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 29 Nov 2018 13:10:32 +0100 Subject: [PATCH 19/29] Removing CSVOptions param from CsvInferSchema methods --- .../sql/catalyst/csv/CSVInferSchema.scala | 59 ++++++------- .../spark/sql/catalyst/csv/CSVOptions.scala | 1 - .../catalyst/expressions/csvExpressions.scala | 4 +- .../catalyst/csv/CSVInferSchemaSuite.scala | 88 +++++++++---------- .../datasources/csv/CSVDataSource.scala | 4 +- 5 files changed, 76 insertions(+), 80 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 89855f853e01..c9d9c10d6697 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -42,14 +42,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { */ def infer( tokenRDD: RDD[Array[String]], - header: Array[String], - options: CSVOptions): StructType = { + header: Array[String]): StructType = { val fields = if (options.inferSchemaFlag) { val startType: Array[DataType] = Array.fill[DataType](header.length)(NullType) val rootTypes: Array[DataType] = - tokenRDD.aggregate(startType)(inferRowType(options), mergeRowTypes) + tokenRDD.aggregate(startType)(inferRowType, mergeRowTypes) - toStructFields(rootTypes, header, options) + toStructFields(rootTypes, header) } else { // By default fields are assumed to be StringType header.map(fieldName => StructField(fieldName, StringType, nullable = true)) @@ -60,8 +59,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { def toStructFields( fieldTypes: Array[DataType], - header: Array[String], - options: CSVOptions): Array[StructField] = { + header: Array[String]): Array[StructField] = { header.zip(fieldTypes).map { case (thisHeader, rootType) => val dType = rootType match { case _: NullType => StringType @@ -71,11 +69,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { } } - def inferRowType(options: CSVOptions) - (rowSoFar: Array[DataType], next: Array[String]): Array[DataType] = { + def inferRowType(rowSoFar: Array[DataType], next: Array[String]): Array[DataType] = { var i = 0 while (i < math.min(rowSoFar.length, next.length)) { // May have columns on right missing. - rowSoFar(i) = inferField(rowSoFar(i), next(i), options) + rowSoFar(i) = inferField(rowSoFar(i), next(i)) i+=1 } rowSoFar @@ -91,20 +88,20 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { * Infer type of string field. Given known type Double, and a string "1", there is no * point checking if it is an Int, as the final type must be Double or higher. 
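 * For example (an editor's illustration drawn from CSVInferSchemaSuite below, not part
 * of the patch): inferField(NullType, "60") yields IntegerType, inferField(IntegerType,
 * "1.0") widens to DoubleType, and inferField(DoubleType, "test") falls back to StringType.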
*/ - def inferField(typeSoFar: DataType, field: String, options: CSVOptions): DataType = { + def inferField(typeSoFar: DataType, field: String): DataType = { if (field == null || field.isEmpty || field == options.nullValue) { typeSoFar } else { typeSoFar match { - case NullType => tryParseInteger(field, options) - case IntegerType => tryParseInteger(field, options) - case LongType => tryParseLong(field, options) + case NullType => tryParseInteger(field) + case IntegerType => tryParseInteger(field) + case LongType => tryParseLong(field) case _: DecimalType => // DecimalTypes have different precisions and scales, so we try to find the common type. - compatibleType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType) - case DoubleType => tryParseDouble(field, options) - case TimestampType => tryParseTimestamp(field, options) - case BooleanType => tryParseBoolean(field, options) + compatibleType(typeSoFar, tryParseDecimal(field)).getOrElse(StringType) + case DoubleType => tryParseDouble(field) + case TimestampType => tryParseTimestamp(field) + case BooleanType => tryParseBoolean(field) case StringType => StringType case other: DataType => throw new UnsupportedOperationException(s"Unexpected data type $other") @@ -112,27 +109,27 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { } } - private def isInfOrNan(field: String, options: CSVOptions): Boolean = { + private def isInfOrNan(field: String): Boolean = { field == options.nanValue || field == options.negativeInf || field == options.positiveInf } - private def tryParseInteger(field: String, options: CSVOptions): DataType = { + private def tryParseInteger(field: String): DataType = { if ((allCatch opt field.toInt).isDefined) { IntegerType } else { - tryParseLong(field, options) + tryParseLong(field) } } - private def tryParseLong(field: String, options: CSVOptions): DataType = { + private def tryParseLong(field: String): DataType = { if ((allCatch opt field.toLong).isDefined) { LongType } else { - tryParseDecimal(field, options) + tryParseDecimal(field) } } - private def tryParseDecimal(field: String, options: CSVOptions): DataType = { + private def tryParseDecimal(field: String): DataType = { val decimalTry = allCatch opt { // `BigDecimal` conversion can fail when the `field` is not a form of number. val bigDecimal = new BigDecimal(field) @@ -144,30 +141,30 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { // 2. scale is bigger than precision. DecimalType(bigDecimal.precision, bigDecimal.scale) } else { - tryParseDouble(field, options) + tryParseDouble(field) } } - decimalTry.getOrElse(tryParseDouble(field, options)) + decimalTry.getOrElse(tryParseDouble(field)) } - private def tryParseDouble(field: String, options: CSVOptions): DataType = { - if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) { + private def tryParseDouble(field: String): DataType = { + if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) { DoubleType } else { - tryParseTimestamp(field, options) + tryParseTimestamp(field) } } - private def tryParseTimestamp(field: String, options: CSVOptions): DataType = { + private def tryParseTimestamp(field: String): DataType = { // This case infers a custom `dataFormat` is set. 
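 // Editor's note, not part of the patch: a value that parses with the user-supplied
 // timestamp pattern settles on TimestampType; otherwise inference keeps falling
 // through the chain to tryParseBoolean and, ultimately, StringType.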
if ((allCatch opt timeParser.parse(field)).isDefined) { TimestampType } else { - tryParseBoolean(field, options) + tryParseBoolean(field) } } - private def tryParseBoolean(field: String, options: CSVOptions): DataType = { + private def tryParseBoolean(field: String): DataType = { if ((allCatch opt field.toBoolean).isDefined) { BooleanType } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index b98560edd4ae..90c96d1f55c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets import java.util.{Locale, TimeZone} import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling} -import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util._ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 00aa9096e1c3..83b0299bac44 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -181,8 +181,8 @@ case class SchemaOfCsv( val header = row.zipWithIndex.map { case (_, index) => s"_c$index" } val startType: Array[DataType] = Array.fill[DataType](header.length)(NullType) val inferSchema = new CSVInferSchema(parsedOptions) - val fieldTypes = inferSchema.inferRowType(parsedOptions)(startType, row) - val st = StructType(inferSchema.toStructFields(fieldTypes, header, parsedOptions)) + val fieldTypes = inferSchema.inferRowType(startType, row) + val st = StructType(inferSchema.toStructFields(fieldTypes, header)) UTF8String.fromString(st.catalogString) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index cdcb46a0ea34..42b4b0968b42 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -27,68 +27,68 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { val options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"), false, "GMT") val inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(NullType, "", options) == NullType) - assert(inferSchema.inferField(NullType, null, options) == NullType) - assert(inferSchema.inferField(NullType, "100000000000", options) == LongType) - assert(inferSchema.inferField(NullType, "60", options) == IntegerType) - assert(inferSchema.inferField(NullType, "3.5", options) == DoubleType) - assert(inferSchema.inferField(NullType, "test", options) == StringType) - assert(inferSchema.inferField(NullType, "2015-08-20 15:57:00", options) == TimestampType) - assert(inferSchema.inferField(NullType, "True", options) == BooleanType) - assert(inferSchema.inferField(NullType, "FAlSE", options) == BooleanType) + assert(inferSchema.inferField(NullType, "") == NullType) + assert(inferSchema.inferField(NullType, null) == NullType) + assert(inferSchema.inferField(NullType, 
"100000000000") == LongType) + assert(inferSchema.inferField(NullType, "60") == IntegerType) + assert(inferSchema.inferField(NullType, "3.5") == DoubleType) + assert(inferSchema.inferField(NullType, "test") == StringType) + assert(inferSchema.inferField(NullType, "2015-08-20 15:57:00") == TimestampType) + assert(inferSchema.inferField(NullType, "True") == BooleanType) + assert(inferSchema.inferField(NullType, "FAlSE") == BooleanType) val textValueOne = Long.MaxValue.toString + "0" val decimalValueOne = new java.math.BigDecimal(textValueOne) val expectedTypeOne = DecimalType(decimalValueOne.precision, decimalValueOne.scale) - assert(inferSchema.inferField(NullType, textValueOne, options) == expectedTypeOne) + assert(inferSchema.inferField(NullType, textValueOne) == expectedTypeOne) } test("String fields types are inferred correctly from other types") { val options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"), false, "GMT") val inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(LongType, "1.0", options) == DoubleType) - assert(inferSchema.inferField(LongType, "test", options) == StringType) - assert(inferSchema.inferField(IntegerType, "1.0", options) == DoubleType) - assert(inferSchema.inferField(DoubleType, null, options) == DoubleType) - assert(inferSchema.inferField(DoubleType, "test", options) == StringType) - assert(inferSchema.inferField(LongType, "2015-08-20 14:57:00", options) == TimestampType) - assert(inferSchema.inferField(DoubleType, "2015-08-20 15:57:00", options) == TimestampType) - assert(inferSchema.inferField(LongType, "True", options) == BooleanType) - assert(inferSchema.inferField(IntegerType, "FALSE", options) == BooleanType) - assert(inferSchema.inferField(TimestampType, "FALSE", options) == BooleanType) + assert(inferSchema.inferField(LongType, "1.0") == DoubleType) + assert(inferSchema.inferField(LongType, "test") == StringType) + assert(inferSchema.inferField(IntegerType, "1.0") == DoubleType) + assert(inferSchema.inferField(DoubleType, null) == DoubleType) + assert(inferSchema.inferField(DoubleType, "test") == StringType) + assert(inferSchema.inferField(LongType, "2015-08-20 14:57:00") == TimestampType) + assert(inferSchema.inferField(DoubleType, "2015-08-20 15:57:00") == TimestampType) + assert(inferSchema.inferField(LongType, "True") == BooleanType) + assert(inferSchema.inferField(IntegerType, "FALSE") == BooleanType) + assert(inferSchema.inferField(TimestampType, "FALSE") == BooleanType) val textValueOne = Long.MaxValue.toString + "0" val decimalValueOne = new java.math.BigDecimal(textValueOne) val expectedTypeOne = DecimalType(decimalValueOne.precision, decimalValueOne.scale) - assert(inferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne) + assert(inferSchema.inferField(IntegerType, textValueOne) == expectedTypeOne) } test("Timestamp field types are inferred correctly via custom data format") { var options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM"), false, "GMT") var inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) + assert(inferSchema.inferField(TimestampType, "2015-08") == TimestampType) options = new CSVOptions(Map("timestampFormat" -> "yyyy"), false, "GMT") inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(TimestampType, "2015", options) == TimestampType) + assert(inferSchema.inferField(TimestampType, "2015") == TimestampType) } test("Timestamp field types are inferred correctly from 
other types") { val options = new CSVOptions(Map.empty[String, String], false, "GMT") val inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType) - assert(inferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType) - assert(inferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType) + assert(inferSchema.inferField(IntegerType, "2015-08-20 14") == StringType) + assert(inferSchema.inferField(DoubleType, "2015-08-20 14:10") == StringType) + assert(inferSchema.inferField(LongType, "2015-08 14:49:00") == StringType) } test("Boolean fields types are inferred correctly from other types") { val options = new CSVOptions(Map.empty[String, String], false, "GMT") val inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(LongType, "Fale", options) == StringType) - assert(inferSchema.inferField(DoubleType, "TRUEe", options) == StringType) + assert(inferSchema.inferField(LongType, "Fale") == StringType) + assert(inferSchema.inferField(DoubleType, "TRUEe") == StringType) } test("Type arrays are merged to highest common type") { @@ -110,18 +110,18 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { var options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT") var inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(NullType, "null", options) == NullType) - assert(inferSchema.inferField(StringType, "null", options) == StringType) - assert(inferSchema.inferField(LongType, "null", options) == LongType) + assert(inferSchema.inferField(NullType, "null") == NullType) + assert(inferSchema.inferField(StringType, "null") == StringType) + assert(inferSchema.inferField(LongType, "null") == LongType) options = new CSVOptions(Map("nullValue" -> "\\N"), false, "GMT") inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(IntegerType, "\\N", options) == IntegerType) - assert(inferSchema.inferField(DoubleType, "\\N", options) == DoubleType) - assert(inferSchema.inferField(TimestampType, "\\N", options) == TimestampType) - assert(inferSchema.inferField(BooleanType, "\\N", options) == BooleanType) - assert(inferSchema.inferField(DecimalType(1, 1), "\\N", options) == DecimalType(1, 1)) + assert(inferSchema.inferField(IntegerType, "\\N") == IntegerType) + assert(inferSchema.inferField(DoubleType, "\\N") == DoubleType) + assert(inferSchema.inferField(TimestampType, "\\N") == TimestampType) + assert(inferSchema.inferField(BooleanType, "\\N") == BooleanType) + assert(inferSchema.inferField(DecimalType(1, 1), "\\N") == DecimalType(1, 1)) } test("Merging Nulltypes should yield Nulltype.") { @@ -135,7 +135,7 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), false, "GMT") val inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) + assert(inferSchema.inferField(TimestampType, "2015-08") == TimestampType) } test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") { @@ -143,16 +143,16 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { val inferSchema = new CSVInferSchema(options) // 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9). 
- assert(inferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) == + assert(inferSchema.inferField(DecimalType(3, -10), "1.19E+11") == DecimalType(4, -9)) // BigDecimal("12345678901234567890.01234567890123456789") is precision 40 and scale 20. val value = "12345678901234567890.01234567890123456789" - assert(inferSchema.inferField(DecimalType(3, -10), value, options) == DoubleType) + assert(inferSchema.inferField(DecimalType(3, -10), value) == DoubleType) // Seq(s"${Long.MaxValue}1", "2015-12-01 00:00:00") should be StringType - assert(inferSchema.inferField(NullType, s"${Long.MaxValue}1", options) == DecimalType(20, 0)) - assert(inferSchema.inferField(DecimalType(20, 0), "2015-12-01 00:00:00", options) + assert(inferSchema.inferField(NullType, s"${Long.MaxValue}1") == DecimalType(20, 0)) + assert(inferSchema.inferField(DecimalType(20, 0), "2015-12-01 00:00:00") == StringType) } @@ -161,8 +161,8 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { "positiveInf" -> "inf"), false, "GMT") val inferSchema = new CSVInferSchema(options) - assert(inferSchema.inferField(NullType, "nan", options) == DoubleType) - assert(inferSchema.inferField(NullType, "inf", options) == DoubleType) - assert(inferSchema.inferField(NullType, "-inf", options) == DoubleType) + assert(inferSchema.inferField(NullType, "nan") == DoubleType) + assert(inferSchema.inferField(NullType, "inf") == DoubleType) + assert(inferSchema.inferField(NullType, "-inf") == DoubleType) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index b2c7ce47aeb2..b46dfb94c133 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -135,7 +135,7 @@ object TextInputCSVDataSource extends CSVDataSource { val parser = new CsvParser(parsedOptions.asParserSettings) linesWithoutHeader.map(parser.parseLine) } - new CSVInferSchema(parsedOptions).infer(tokenRDD, header, parsedOptions) + new CSVInferSchema(parsedOptions).infer(tokenRDD, header) case _ => // If the first line could not be read, just return the empty schema. StructType(Nil) @@ -208,7 +208,7 @@ object MultiLineCSVDataSource extends CSVDataSource { encoding = parsedOptions.charset) } val sampled = CSVUtils.sample(tokenRDD, parsedOptions) - new CSVInferSchema(parsedOptions).infer(sampled, header, parsedOptions) + new CSVInferSchema(parsedOptions).infer(sampled, header) case None => // If the first row could not be read, just return the empty schema. 
StructType(Nil) From 4d6c86b4d82743b6ca9f7389579646324bca2727 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 29 Nov 2018 13:19:18 +0100 Subject: [PATCH 20/29] Use constants --- .../spark/sql/catalyst/util/DateTimeFormatter.scala | 12 +++++++----- .../spark/sql/catalyst/util/DateTimeUtils.scala | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index 25cb7bef143c..d708eec050ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -58,18 +58,20 @@ class Iso8601DateTimeFormatter( } } - def conv(instant: Instant, secMul: Long, nanoDiv: Long): Long = { + private def instantToMicros(instant: Instant, secMul: Long, nanoDiv: Long): Long = { val sec = Math.multiplyExact(instant.getEpochSecond, secMul) val result = Math.addExact(sec, instant.getNano / nanoDiv) result } - def parse(s: String): Long = conv(toInstant(s), 1000000, 1000) + def parse(s: String): Long = { + instantToMicros(toInstant(s), DateTimeUtils.MICROS_PER_SECOND, DateTimeUtils.NANOS_PER_MICROS) + } def format(us: Long): String = { - val secs = Math.floorDiv(us, 1000000) - val mos = Math.floorMod(us, 1000000) - val instant = Instant.ofEpochSecond(secs, mos * 1000) + val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND) + val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND) + val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS) formatter.withZone(timeZone.toZoneId).format(instant) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 5ae75dc93930..c6dfdbf2505b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -50,7 +50,7 @@ object DateTimeUtils { final val MILLIS_PER_SECOND = 1000L final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY - + final val NANOS_PER_MICROS = 1000L final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L // number of days in 400 years From f3f46c75a225021f7b7c743dd977d8d195a508fc Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 30 Nov 2018 10:46:00 +0100 Subject: [PATCH 21/29] Merging followup --- .../org/apache/spark/sql/internal/SQLConf.scala | 13 ++++++------- .../sql/catalyst/csv/UnivocityParserSuite.scala | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 37a8c0ffb944..b2b277364f4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1619,13 +1619,12 @@ object SQLConf { .booleanConf .createWithDefault(true) - val LEGACY_TIME_PARSER_ENABLED = - buildConf("spark.sql.legacy.timeParser.enabled") - .doc("When set to true, java.text.SimpleDateFormat is using for formatting and parsing dates " + - " and timestamps in a locale-sensitive manner. 
When set to false, classes from " +
+      "java.time.* packages are using for the same purpose.")
+    .booleanConf
+    .createWithDefault(false)
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index e3f9ff21a55b..2d0b0d3033a9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -222,7 +222,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       val options = new CSVOptions(Map("locale" -> langTag), false, "GMT")
       val parser = new UnivocityParser(new StructType().add("d", decimalType), options)
-      assert(parser.makeConverter("_1", decimalType, options = options).apply(input) === expected)
+      assert(parser.makeConverter("_1", decimalType).apply(input) === expected)
     }
 
     Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach(checkDecimalParsing)

From 3f3ca707b19f7ee038ce72fe043b3481bb02a7ce Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 1 Dec 2018 20:51:14 +0100
Subject: [PATCH 22/29] Updating the migration guide

---
 docs/sql-migration-guide-upgrade.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index e48125a0972b..d14068bc6c7d 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -31,6 +31,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.legacy.execution.setCommandRejectsSparkConfs` to `false`.
 
+  - Since Spark 3.0, the CSV datasource uses the java.time API for parsing and generating CSV content. The new implementation supports date/timestamp patterns conforming to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
   - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
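For reference, the migration note above translates into a minimal usage sketch (assumptions, not part of the patch: a running SparkSession named `spark`, a hypothetical input path, and the timestamp pattern exercised by the test suites in this series):

    // Parse CSV timestamps with the java.time-based parser, the default after this change.
    val df = spark.read
      .schema("ts TIMESTAMP")
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
      .csv("/tmp/events.csv")  // hypothetical path

    // Opt back into the Spark 2.4 behavior backed by java.text.SimpleDateFormat:
    spark.conf.set("spark.sql.legacy.timeParser.enabled", "true")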
From 1dd9ed106e25a9381dd482c53c5969ef0733d9bc Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 1 Dec 2018 20:53:38 +0100
Subject: [PATCH 23/29] Inlining method's arguments

---
 .../spark/sql/catalyst/util/DateTimeFormatter.scala | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
index d708eec050ea..a2055845a586 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
@@ -58,15 +58,13 @@ class Iso8601DateTimeFormatter(
     }
   }
 
-  private def instantToMicros(instant: Instant, secMul: Long, nanoDiv: Long): Long = {
-    val sec = Math.multiplyExact(instant.getEpochSecond, secMul)
-    val result = Math.addExact(sec, instant.getNano / nanoDiv)
+  private def instantToMicros(instant: Instant): Long = {
+    val sec = Math.multiplyExact(instant.getEpochSecond, DateTimeUtils.MICROS_PER_SECOND)
+    val result = Math.addExact(sec, instant.getNano / DateTimeUtils.NANOS_PER_MICROS)
     result
   }
 
-  def parse(s: String): Long = {
-    instantToMicros(toInstant(s), DateTimeUtils.MICROS_PER_SECOND, DateTimeUtils.NANOS_PER_MICROS)
-  }
+  def parse(s: String): Long = instantToMicros(toInstant(s))
 
   def format(us: Long): String = {
     val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND)

From 83bf58bc59c3c6bebfe4acfd6afec4e35ea67c38 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 1 Dec 2018 20:55:31 +0100
Subject: [PATCH 24/29] Additional fallback

---
 .../spark/sql/catalyst/util/DateTimeFormatter.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
index a2055845a586..edd5f9ec12fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
@@ -154,8 +154,14 @@ class LegacyFallbackDateFormatter(
     timeZone: TimeZone,
     locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
   override def parse(s: String): Int = {
-    Try(super.parse(s)).getOrElse {
-      DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime)
+    Try(super.parse(s)).orElse {
+      // If parsing with the given pattern fails, fall back to the way used in 2.0 and 1.x
+      // for backwards compatibility.
+      Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime))
+    }.getOrElse {
+      // In Spark 1.5.0, we stored the data as the number of days since the epoch in a string.
+      // So, we just convert it to Int.
+ s.toInt } } } From 00509d3a94e0679505cd9fde78e38a3a15d11bde Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 1 Dec 2018 21:03:17 +0100 Subject: [PATCH 25/29] Removing unrelated changes --- .../apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index bd7271c353f2..c2b525ad1a9f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -68,8 +68,9 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("Timestamp field types are inferred correctly via custom data format") { - var options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM"), false, "GMT") + var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), false, "GMT") var inferSchema = new CSVInferSchema(options) + assert(inferSchema.inferField(TimestampType, "2015-08") == TimestampType) options = new CSVOptions(Map("timestampFormat" -> "yyyy"), false, "GMT") @@ -130,6 +131,7 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { test("Merging Nulltypes should yield Nulltype.") { val options = new CSVOptions(Map.empty[String, String], false, "GMT") val inferSchema = new CSVInferSchema(options) + val mergedNullTypes = inferSchema.mergeRowTypes(Array(NullType), Array(NullType)) assert(mergedNullTypes.deep == Array(NullType).deep) } From e9d6bb03bd27448784761a244fac09bdd809724c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 13:43:24 +0100 Subject: [PATCH 26/29] Using floorDiv to take days from seconds --- .../apache/spark/sql/catalyst/util/DateTimeFormatter.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala index edd5f9ec12fa..ad1f4131de2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala @@ -123,7 +123,9 @@ class Iso8601DateFormatter( override def parse(s: String): Int = { val seconds = dateTimeFormatter.toInstant(s).getEpochSecond - (seconds / DateTimeUtils.SECONDS_PER_DAY).toInt + val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY) + + days.toInt } override def format(days: Int): String = { From 1ad118414406d8e563c84a89e3e305b3fda4654e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 11:24:32 +0100 Subject: [PATCH 27/29] A test for roundtrip timestamp parsing --- .../sql/util/DateTimeFormatterSuite.scala | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala new file mode 100644 index 000000000000..23a5830ff4d2 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import java.util.Locale + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils} + +class DateTimeFormatterSuite extends SparkFunSuite { + + test("roundtrip parsing timestamps using timezones") { + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + val timestamp = "2018-12-02T11:22:33.123456" + val formatter = DateTimeFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) + val micros = formatter.parse(timestamp) + val formatted = formatter.format(micros) + assert(timestamp === formatted) + } + } +} From f8097b49acf75661de79a06ec0a55f8a66516c3a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 14:49:16 +0100 Subject: [PATCH 28/29] Tests for DateTimeFormatter --- .../sql/catalyst/util/DateTimeTestUtils.scala | 5 +- .../sql/util/DateTimeFormatterSuite.scala | 83 +++++++++++++++++-- 2 files changed, 78 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala index dfa0fe93a2f9..66d8d28988f8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala @@ -26,7 +26,7 @@ object DateTimeTestUtils { val ALL_TIMEZONES: Seq[TimeZone] = TimeZone.getAvailableIDs.toSeq.map(TimeZone.getTimeZone) - val outstandingTimezones: Seq[TimeZone] = Seq( + val outstandingTimezonesIds: Seq[String] = Seq( "UTC", "PST", "CET", @@ -34,7 +34,8 @@ object DateTimeTestUtils { "America/Los_Angeles", "Antarctica/Vostok", "Asia/Hong_Kong", - "Europe/Amsterdam").map(TimeZone.getTimeZone) + "Europe/Amsterdam") + val outstandingTimezones: Seq[TimeZone] = outstandingTimezonesIds.map(TimeZone.getTimeZone) def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = { val originalDefaultTimeZone = TimeZone.getDefault diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala index 23a5830ff4d2..02d4ee049060 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala @@ -17,20 +17,87 @@ package org.apache.spark.sql.util -import java.util.Locale +import java.util.{Locale, TimeZone} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils} class DateTimeFormatterSuite extends SparkFunSuite { + test("parsing dates using time zones") { + val localDate = "2018-12-02" + val 
expectedDays = Map(
+      "UTC" -> 17867,
+      "PST" -> 17867,
+      "CET" -> 17866,
+      "Africa/Dakar" -> 17867,
+      "America/Los_Angeles" -> 17867,
+      "Antarctica/Vostok" -> 17866,
+      "Asia/Hong_Kong" -> 17866,
+      "Europe/Amsterdam" -> 17866)
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
+      val daysSinceEpoch = formatter.parse(localDate)
+      assert(daysSinceEpoch === expectedDays(timeZone))
+    }
+  }
+
+  test("parsing timestamps using time zones") {
+    val localTimestamp = "2018-12-02T10:11:12.001234"
+    val expectedMicros = Map(
+      "UTC" -> 1543745472001234L,
+      "PST" -> 1543774272001234L,
+      "CET" -> 1543741872001234L,
+      "Africa/Dakar" -> 1543745472001234L,
+      "America/Los_Angeles" -> 1543774272001234L,
+      "Antarctica/Vostok" -> 1543723872001234L,
+      "Asia/Hong_Kong" -> 1543716672001234L,
+      "Europe/Amsterdam" -> 1543741872001234L)
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = DateTimeFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone),
+        Locale.US)
+      val microsSinceEpoch = formatter.parse(localTimestamp)
+      assert(microsSinceEpoch === expectedMicros(timeZone))
+    }
+  }
+
+  test("format dates using time zones") {
+    val daysSinceEpoch = 17867
+    val expectedDate = Map(
+      "UTC" -> "2018-12-02",
+      "PST" -> "2018-12-01",
+      "CET" -> "2018-12-02",
+      "Africa/Dakar" -> "2018-12-02",
+      "America/Los_Angeles" -> "2018-12-01",
+      "Antarctica/Vostok" -> "2018-12-02",
+      "Asia/Hong_Kong" -> "2018-12-02",
+      "Europe/Amsterdam" -> "2018-12-02")
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
+      val date = formatter.format(daysSinceEpoch)
+      assert(date === expectedDate(timeZone))
+    }
+  }
 
-  test("roundtrip parsing timestamps using timezones") {
-    DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
-      val timestamp = "2018-12-02T11:22:33.123456"
-      val formatter = DateTimeFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
-      val micros = formatter.parse(timestamp)
-      val formatted = formatter.format(micros)
-      assert(timestamp === formatted)
+  test("format timestamps using time zones") {
+    val microsSinceEpoch = 1543745472001234L
+    val expectedTimestamp = Map(
+      "UTC" -> "2018-12-02T10:11:12.001234",
+      "PST" -> "2018-12-02T02:11:12.001234",
+      "CET" -> "2018-12-02T11:11:12.001234",
+      "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
+      "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
+      "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
+      "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
+      "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = DateTimeFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone),
+        Locale.US)
+      val timestamp = formatter.format(microsSinceEpoch)
+      assert(timestamp === expectedTimestamp(timeZone))
     }
   }
 }

From 384879531a67508e51097a4e16d550d0c50d8939 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 3 Dec 2018 11:15:52 +0100
Subject: [PATCH 29/29] Fix typo

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b2b277364f4e..9a1cadceec05 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1620,9 +1620,9 @@ object SQLConf { .createWithDefault(true) val LEGACY_TIME_PARSER_ENABLED = buildConf("spark.sql.legacy.timeParser.enabled") - .doc("When set to true, java.text.SimpleDateFormat is using for formatting and parsing " + + .doc("When set to true, java.text.SimpleDateFormat is used for formatting and parsing " + " dates/timestamps in a locale-sensitive manner. When set to false, classes from " + - "java.time.* packages are using for the same purpose.") + "java.time.* packages are used for the same purpose.") .booleanConf .createWithDefault(false) }
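As a cross-check of the expected values in DateTimeFormatterSuite, a standalone java.time sketch (an illustration only; the zone id and the 86400 seconds-per-day constant are written inline rather than taken from DateTimeUtils) shows why the same local date maps to different epoch days depending on the time zone:

    import java.time.{LocalDate, ZoneId}

    // Midnight of 2018-12-02 in Europe/Amsterdam is 2018-12-01T23:00:00Z, so flooring
    // its epoch seconds to whole days lands on the previous UTC day.
    val instant = LocalDate.parse("2018-12-02")
      .atStartOfDay(ZoneId.of("Europe/Amsterdam"))
      .toInstant
    val amsterdamDays = Math.floorDiv(instant.getEpochSecond, 86400L)  // 17866
    val utcDays = LocalDate.parse("2018-12-02").toEpochDay             // 17867

Both values agree with the "Europe/Amsterdam" and "UTC" entries of expectedDays in the "parsing dates using time zones" test.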