From d2ed68673082995bee65a3c58be8b71642a60d57 Mon Sep 17 00:00:00 2001
From: "sergei.rubtcov"
Date: Wed, 3 Jan 2018 12:50:26 +0200
Subject: [PATCH 1/2] [SPARK-19228][SQL] Introduce a tryParseDate method to
 parse CSV dates, add a type-widening rule between DateType and TimestampType
 to findTightestCommonType, use java.time.format.DateTimeFormatter to infer
 time types more accurately, and add an end-to-end test case and a unit test

---
 .../datasources/csv/CSVInferSchema.scala      | 20 ++++++++--
 .../datasources/csv/CSVOptions.scala          | 11 ++++++
 .../test-data/dates-and-timestamps.csv        |  4 ++
 .../datasources/csv/CSVInferSchemaSuite.scala | 10 ++++-
 .../execution/datasources/csv/CSVSuite.scala  | 39 +++++++++++++++++++
 5 files changed, 80 insertions(+), 4 deletions(-)
 create mode 100644 sql/core/src/test/resources/test-data/dates-and-timestamps.csv

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index a585cbed2551b..b3ef9d6d5b84c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -90,7 +90,10 @@
           // DecimalTypes have different precisions and scales, so we try to find the common type.
           findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType)
         case DoubleType => tryParseDouble(field, options)
-        case TimestampType => tryParseTimestamp(field, options)
+        case DateType => tryParseDate(field, options)
+        case TimestampType =>
+          findTightestCommonType(typeSoFar, tryParseTimestamp(field, options)).getOrElse(
+            tryParseBoolean(field, options))
         case BooleanType => tryParseBoolean(field, options)
         case StringType => StringType
         case other: DataType =>
@@ -140,14 +143,23 @@
   private def tryParseDouble(field: String, options: CSVOptions): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) {
       DoubleType
+    } else {
+      tryParseDate(field, options)
+    }
+  }
+
+  private def tryParseDate(field: String, options: CSVOptions): DataType = {
+    // This case applies when a custom `dateFormat` is set.
+    if ((allCatch opt options.dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestamp(field, options)
     }
   }
 
   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+    // This case applies when a custom `timestampFormat` is set.
+    if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
       TimestampType
     } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
       // We keep this for backwards compatibility.
@@ -216,6 +228,8 @@
       } else {
         Some(DecimalType(range + scale, scale))
       }
 
+    // By design `TimestampType` (8 bytes) is larger than `DateType` (4 bytes).
+    case (DateType, TimestampType) => Some(TimestampType)
     case _ => None
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index c16790630ce17..df25be85b4922 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import java.nio.charset.StandardCharsets
+import java.time.format.{DateTimeFormatter, ResolverStyle}
 import java.util.{Locale, TimeZone}
 
 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
@@ -150,6 +151,16 @@ class CSVOptions(
 
   val isCommentSet = this.comment != '\u0000'
 
+  def dateFormatter: DateTimeFormatter = {
+    DateTimeFormatter.ofPattern(dateFormat.getPattern)
+      .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
+  }
+
+  def timestampFormatter: DateTimeFormatter = {
+    DateTimeFormatter.ofPattern(timestampFormat.getPattern)
+      .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
+  }
+
   def asWriterSettings: CsvWriterSettings = {
     val writerSettings = new CsvWriterSettings()
     val format = writerSettings.getFormat
diff --git a/sql/core/src/test/resources/test-data/dates-and-timestamps.csv b/sql/core/src/test/resources/test-data/dates-and-timestamps.csv
new file mode 100644
index 0000000000000..0a9a4c2f8566c
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/dates-and-timestamps.csv
@@ -0,0 +1,4 @@
+timestamp,date
+26/08/2015 22:31:46.913,27/09/2015
+27/10/2014 22:33:31.601,26/12/2016
+28/01/2016 22:33:52.888,28/01/2017
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index 661742087112f..d1a8822ca025e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -59,13 +59,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
     assert(CSVInferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne)
   }
 
-  test("Timestamp field types are inferred correctly via custom data format") {
+  test("Timestamp field types are inferred correctly via custom timestamp format") {
     var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
     options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
   }
 
+  test("Date field types are inferred correctly via custom date and timestamp format") {
+    val options = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy",
+      "timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS"), "GMT")
+    assert(CSVInferSchema.inferField(TimestampType,
+      "28/01/2017 22:31:46.913", options) == TimestampType)
+    assert(CSVInferSchema.inferField(DateType, "16/12/2012", options) == DateType)
+  }
+
   test("Timestamp field types are inferred correctly from other types") {
     val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14",
       options) == StringType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 4398e547d9217..b9e3c292f7926 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -54,6 +54,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   private val simpleSparseFile = "test-data/simple_sparse.csv"
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
+  private val datesAndTimestampsFile = "test-data/dates-and-timestamps.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
 
@@ -566,6 +567,44 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     assert(results.toSeq.map(_.toSeq) === expected)
   }
 
+  test("inferring timestamp types and date types via custom formats") {
+    val options = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS",
+      "dateFormat" -> "dd/MM/yyyy")
+    val results = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(datesAndTimestampsFile))
+    assert(results.schema(0).dataType === TimestampType)
+    assert(results.schema(1).dataType === DateType)
+    val timestamps = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(datesAndTimestampsFile))
+      .select("timestamp")
+      .collect()
+    val timestampFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS", Locale.US)
+    val timestampExpected =
+      Seq(Seq(new Timestamp(timestampFormat.parse("26/08/2015 22:31:46.913").getTime)),
+        Seq(new Timestamp(timestampFormat.parse("27/10/2014 22:33:31.601").getTime)),
+        Seq(new Timestamp(timestampFormat.parse("28/01/2016 22:33:52.888").getTime)))
+    assert(timestamps.toSeq.map(_.toSeq) === timestampExpected)
+    val dates = spark.read
+      .format("csv")
+      .options(options)
+      .load(testFile(datesAndTimestampsFile))
+      .select("date")
+      .collect()
+    val dateFormat = new SimpleDateFormat("dd/MM/yyyy", Locale.US)
+    val dateExpected =
+      Seq(Seq(new Date(dateFormat.parse("27/09/2015").getTime)),
+        Seq(new Date(dateFormat.parse("26/12/2016").getTime)),
+        Seq(new Date(dateFormat.parse("28/01/2017").getTime)))
+    assert(dates.toSeq.map(_.toSeq) === dateExpected)
+  }
+
   test("load date types via custom date format") {
     val customSchema = new StructType(Array(StructField("date", DateType, true)))
     val options = Map(

From 84b236a742e7f5a62ee2e6ce6d230c3e6628294b Mon Sep 17 00:00:00 2001
From: "sergei.rubtcov"
Date: Thu, 19 Apr 2018 14:46:44 +0300
Subject: [PATCH 2/2] [SPARK-19228][SQL] Refactor the tryParseDate method
 after code review; make the DateTimeFormatters lazy vals

---
 .../spark/sql/execution/datasources/csv/CSVInferSchema.scala | 4 +---
 .../spark/sql/execution/datasources/csv/CSVOptions.scala     | 4 ++--
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index b3ef9d6d5b84c..b36ab3238703f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -91,9 +91,7 @@ private[csv] object CSVInferSchema {
           findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType)
         case DoubleType => tryParseDouble(field, options)
         case DateType => tryParseDate(field, options)
-        case TimestampType =>
-          findTightestCommonType(typeSoFar, tryParseTimestamp(field, options)).getOrElse(
-            tryParseBoolean(field, options))
+        case TimestampType => tryParseTimestamp(field, options)
         case BooleanType => tryParseBoolean(field, options)
         case StringType => StringType
         case other: DataType =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index df25be85b4922..83103a0b8a834 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -151,12 +151,12 @@ class CSVOptions(
 
   val isCommentSet = this.comment != '\u0000'
 
-  def dateFormatter: DateTimeFormatter = {
+  lazy val dateFormatter: DateTimeFormatter = {
     DateTimeFormatter.ofPattern(dateFormat.getPattern)
       .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
   }
 
-  def timestampFormatter: DateTimeFormatter = {
+  lazy val timestampFormatter: DateTimeFormatter = {
     DateTimeFormatter.ofPattern(timestampFormat.getPattern)
       .withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
   }
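
Note for reviewers: a minimal end-to-end sketch of the behaviour this series enables (illustration only, not part of the diffs). Assuming a live SparkSession named `spark` and the test file added in PATCH 1/2, setting both formats should make schema inference resolve the two columns exactly as the new CSVSuite test asserts:

    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("timestampFormat", "dd/MM/yyyy HH:mm:ss.SSS")
      .option("dateFormat", "dd/MM/yyyy")
      .csv("sql/core/src/test/resources/test-data/dates-and-timestamps.csv")

    df.printSchema()
    // root
    //  |-- timestamp: timestamp (nullable = true)
    //  |-- date: date (nullable = true)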
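
The new widening rule also covers a single column that mixes both representations: when date-only rows are seen before rows that carry a time component, inference should settle on TimestampType instead of degrading to StringType. A hypothetical sketch (the inline dataset, the `spark` session, and `import spark.implicits._` are assumptions for illustration, not from the patch):

    import org.apache.spark.sql.types.TimestampType
    import spark.implicits._ // for toDS()

    // Hypothetical single-column input: row 1 parses only with dateFormat
    // (DateType), row 2 only with timestampFormat (TimestampType); the merge
    // should widen to TimestampType via the new findTightestCommonType case.
    val mixed = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("dateFormat", "dd/MM/yyyy")
      .option("timestampFormat", "dd/MM/yyyy HH:mm:ss.SSS")
      .csv(Seq("when", "26/12/2016", "26/12/2016 22:31:46.913").toDS())
    assert(mixed.schema(0).dataType == TimestampType)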