From c4900de5461e5ce6e3fccb89e6b36790e08fc439 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 20 Jul 2016 15:42:51 +0900 Subject: [PATCH 1/6] Adds the support for dateFormat option for CSV and JSON for reading and writing --- python/pyspark/sql/readwriter.py | 32 +++- python/pyspark/sql/streaming.py | 17 +- .../sql/catalyst/util/DateTimeUtils.scala | 7 + .../apache/spark/sql/DataFrameReader.scala | 9 +- .../apache/spark/sql/DataFrameWriter.scala | 8 + .../datasources/csv/CSVRelation.scala | 53 +++++- .../datasources/json/JSONOptions.scala | 8 + .../datasources/json/JacksonGenerator.scala | 23 ++- .../datasources/json/JacksonParser.scala | 27 +++ .../datasources/json/JsonFileFormat.scala | 5 +- .../sql/streaming/DataStreamReader.scala | 11 +- .../execution/datasources/csv/CSVSuite.scala | 154 +++++++++++++++++- .../datasources/json/JsonSuite.scala | 57 +++++++ .../datasources/json/TestJsonData.scala | 6 + .../sources/JsonHadoopFsRelationSuite.scala | 4 + 15 files changed, 390 insertions(+), 31 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 64de33e8ec0a8..fcb2a5263d477 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -156,7 +156,7 @@ def load(self, path=None, format=None, schema=None, **options): def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, - mode=None, columnNameOfCorruptRecord=None): + mode=None, columnNameOfCorruptRecord=None, dateFormat=None): """ Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects (one object per record) and returns the result as a :class`DataFrame`. @@ -198,6 +198,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, ``spark.sql.columnNameOfCorruptRecord``. If None is set, it uses the value specified in ``spark.sql.columnNameOfCorruptRecord``. + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to both date type and timestamp type. By default, it is None + which means trying to read times and date by + ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()`` or + ISO 8601 format. >>> df1 = spark.read.json('python/test_support/sql/people.json') >>> df1.dtypes @@ -213,7 +219,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames, allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, - mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord) + mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat) if isinstance(path, basestring): path = [path] if type(path) == list: @@ -328,8 +334,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to both date type and timestamp type. By default, it is None - which means trying to parse times and date by - ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``. 
+ which means trying to read times and date by + ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()`` or + ISO 8601 format. :param maxColumns: defines a hard limit of how many columns a record can have. If None is set, it uses the default value, ``20480``. :param maxCharsPerColumn: defines the maximum number of characters allowed for any given @@ -571,7 +578,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options) self._jwrite.saveAsTable(name) @since(1.4) - def json(self, path, mode=None, compression=None): + def json(self, path, mode=None, compression=None, dateFormat=None): """Saves the content of the :class:`DataFrame` in JSON format at the specified path. :param path: the path in any Hadoop supported file system @@ -584,11 +591,15 @@ def json(self, path, mode=None, compression=None): :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to both date type and timestamp type. By default, it is None + which means trying to write times and date by ISO 8601 format. >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) - self._set_opts(compression=compression) + self._set_opts(compression=compression, dateFormat=dateFormat) self._jwrite.json(path) @since(1.4) @@ -634,7 +645,7 @@ def text(self, path, compression=None): @since(2.0) def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, - header=None, nullValue=None, escapeQuotes=None, quoteAll=None): + header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None): """Saves the content of the :class:`DataFrame` in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -666,12 +677,17 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No the default value, ``false``. :param nullValue: sets the string representation of a null value. If None is set, it uses the default value, empty string. + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to both date type and timestamp type. By default, it is None + which means trying to write times and date by ISO 8601 format. 
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header, - nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll) + nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll, + dateFormat= dateFormat) self._jwrite.csv(path) @since(1.5) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index a364555003027..e65a49af3117d 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -338,7 +338,7 @@ def load(self, path=None, format=None, schema=None, **options): def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, - mode=None, columnNameOfCorruptRecord=None): + mode=None, columnNameOfCorruptRecord=None, dateFormat=None): """ Loads a JSON file stream (one object per line) and returns a :class`DataFrame`. @@ -381,6 +381,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, ``spark.sql.columnNameOfCorruptRecord``. If None is set, it uses the value specified in ``spark.sql.columnNameOfCorruptRecord``. + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to both date type and timestamp type. By default, it is None + which means trying to read times and date by + ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()`` or + ISO 8601 format. >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema) >>> json_sdf.isStreaming @@ -393,7 +399,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames, allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, - mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord) + mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat) if isinstance(path, basestring): return self._df(self._jreader.json(path)) else: @@ -451,7 +457,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, - maxMalformedLogPerPartition=None, mode=None): + maxMalformedLogPerPartition=None, mode=None, dateFormat=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -495,8 +501,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This applies to both date type and timestamp type. By default, it is None - which means trying to parse times and date by - ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``. + which means trying to read times and date by + ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()`` or + ISO 8601 format. 
:param maxColumns: defines a hard limit of how many columns a record can have. If None is set, it uses the default value, ``20480``. :param maxCharsPerColumn: defines the maximum number of characters allowed for any given diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 0b643a5b84268..cd9767b6133f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -24,6 +24,8 @@ import javax.xml.bind.DatatypeConverter import scala.annotation.tailrec +import org.apache.commons.lang3.time.FastDateFormat + import org.apache.spark.unsafe.types.UTF8String /** @@ -62,6 +64,11 @@ object DateTimeUtils { @transient lazy val defaultTimeZone = TimeZone.getDefault + // These are ISO 8601 compliant `FastDateFormat`s. These are thread-safe and compatible + // with `SimpleDateFormat`. + lazy val iso8601TimeFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ") + lazy val iso8601DateFormat = FastDateFormat.getInstance("yyyy-MM-dd") + // Reuse the Calendar object in each thread as it is expensive to create in each method call. private val threadLocalGmtCalendar = new ThreadLocal[Calendar] { override protected def initialValue: Calendar = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index e23dacc7a1c05..245941cfc82f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -280,7 +280,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
   * <li>`columnNameOfCorruptRecord` (default is the value specified in
   * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
   * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
+   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
+   * and timestamp type. By default, it is `null` which means trying to read times and date by
+   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
   *
+
   * @since 2.0.0
   */
  @scala.annotation.varargs
@@ -376,8 +381,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * value.
   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-   * and timestamp type. By default, it is `null` which means trying to parse times and date by
-   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+   * and timestamp type. By default, it is `null` which means trying to read times and date by
+   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
   * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
   * a record can have.</li>
   * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 44a9f312bd76c..b41ee420e059b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -483,6 +483,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`).</li>
+   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
+   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
+   * and timestamp type. By default, it is `null` which means trying to write times and date by
+   * ISO 8601 format.</li>
   *
   * @since 1.4.0
   */
@@ -575,6 +579,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`).</li>
+   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
+   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
+   * and timestamp type. By default, it is `null` which means trying to write times and date by
+   * ISO 8601 format.</li>
  • * * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 6b2f9fc61e677..47867ea1781db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile} import org.apache.spark.sql.types._ @@ -188,6 +189,14 @@ private[csv] class CsvOutputWriter( // create the Generator without separator inserted between 2 records private[this] val text = new Text() + // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`. + // When the value is null, this converter should not be called. + private type ValueConverter = (InternalRow, Int) => String + + // `ValueConverter`s for all values in the fields of the schema + private val valueConverters: Array[ValueConverter] = + dataSchema.map(_.dataType).map(makeConverter).toArray + private val recordWriter: RecordWriter[NullWritable, Text] = { new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { @@ -204,18 +213,50 @@ private[csv] class CsvOutputWriter( private var records: Long = 0L private val csvWriter = new LineCsvWriter(params, dataSchema.fieldNames.toSeq) - private def rowToString(row: Seq[Any]): Seq[String] = row.map { field => - if (field != null) { - field.toString - } else { - params.nullValue + private def rowToString(row: InternalRow): Seq[String] = { + var i = 0 + val values = new Array[String](row.numFields) + while (i < row.numFields) { + if (!row.isNullAt(i)) { + values(i) = valueConverters(i).apply(row, i) + } else { + values(i) = params.nullValue + } + i += 1 } + values + } + + private def makeConverter(dataType: DataType): ValueConverter = dataType match { + case DateType if params.dateFormat != null => + (row: InternalRow, ordinal: Int) => + params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) + + case DateType => + (row: InternalRow, ordinal: Int) => + val date = DateTimeUtils.toJavaDate(row.getInt(ordinal)) + DateTimeUtils.iso8601DateFormat.format(date) + + case TimestampType if params.dateFormat != null => + (row: InternalRow, ordinal: Int) => + params.dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + + case TimestampType => + (row: InternalRow, ordinal: Int) => + val timestamp = DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)) + DateTimeUtils.iso8601TimeFormat.format(timestamp) + + case udt: UserDefinedType[_] => makeConverter(udt.sqlType) + + case dt: DataType => + (row: InternalRow, ordinal: Int) => + row.get(ordinal, dt).toString } override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") override protected[sql] def writeInternal(row: InternalRow): Unit = { - csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), records == 0L && params.headerFlag) + csvWriter.writeRow(rowToString(row), records == 0L && params.headerFlag) 
records += 1 if (records % FLUSH_BATCH_SIZE == 0) { flush() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala index 66f1126fb9ae6..0e0fd511c8177 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.json +import java.text.SimpleDateFormat + import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.spark.internal.Logging @@ -53,6 +55,12 @@ private[sql] class JSONOptions( private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord") + // Share date format object as it is expensive to parse date pattern. + val dateFormat: SimpleDateFormat = { + val dateFormat = parameters.get("dateFormat") + dateFormat.map(new SimpleDateFormat(_)).orNull + } + // Parse mode flags if (!ParseModes.isValidMode(parseMode)) { logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala index 23f4a55491d28..a11682c2857a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala @@ -26,7 +26,10 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} import org.apache.spark.sql.types._ -private[sql] class JacksonGenerator(schema: StructType, writer: Writer) { +private[sql] class JacksonGenerator( + schema: StructType, + writer: Writer, + options: JSONOptions = new JSONOptions(Map.empty[String, String])) { // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`. 
@@ -74,13 +77,27 @@ private[sql] class JacksonGenerator(schema: StructType, writer: Writer) { (row: SpecializedGetters, ordinal: Int) => gen.writeString(row.getUTF8String(ordinal).toString) + case TimestampType if options.dateFormat != null => + (row: SpecializedGetters, ordinal: Int) => + val timestampString = + options.dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + gen.writeString(timestampString) + case TimestampType => (row: SpecializedGetters, ordinal: Int) => - gen.writeString(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)).toString) + val timestamp = DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)) + gen.writeString(DateTimeUtils.iso8601TimeFormat.format(timestamp)) + + case DateType if options.dateFormat != null => + (row: SpecializedGetters, ordinal: Int) => + val dateString = + options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) + gen.writeString(dateString) case DateType => (row: SpecializedGetters, ordinal: Int) => - gen.writeString(DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString) + val date = DateTimeUtils.toJavaDate(row.getInt(ordinal)) + gen.writeString(DateTimeUtils.iso8601DateFormat.format(date)) case BinaryType => (row: SpecializedGetters, ordinal: Int) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 4ae9376b5a504..1586309d329aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.execution.datasources.json import java.io.ByteArrayOutputStream +import java.text.ParseException import scala.collection.mutable.ArrayBuffer +import scala.util.Try import com.fasterxml.jackson.core._ @@ -199,6 +201,17 @@ class JacksonParser( UTF8String.fromBytes(writer.toByteArray) } + case TimestampType if options.dateFormat != null => + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => + // This one will lose microseconds parts. + // See https://issues.apache.org/jira/browse/SPARK-10681. + options.dateFormat.parse(parser.getText).getTime * 1000L + + case VALUE_NUMBER_INT => + parser.getLongValue * 1000000L + } + case TimestampType => (parser: JsonParser) => parseJsonToken(parser, dataType) { case VALUE_STRING => @@ -210,6 +223,20 @@ class JacksonParser( parser.getLongValue * 1000000L } + case DateType if options.dateFormat != null => + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => + val stringValue = parser.getText + // This one will lose microseconds parts. + // See https://issues.apache.org/jira/browse/SPARK-10681. + Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime)) + .getOrElse { + // In Spark 1.5.0, we store the data as number of days since epoch in string. + // So, we just convert it to Int. 
+ stringValue.toInt + } + } + case DateType => (parser: JsonParser) => parseJsonToken(parser, dataType) { case VALUE_STRING => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 27910e2cddad8..4ded0354017bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -84,7 +84,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { bucketId: Option[Int], dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new JsonOutputWriter(path, bucketId, dataSchema, context) + new JsonOutputWriter(path, parsedOptions, bucketId, dataSchema, context) } } } @@ -150,6 +150,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { private[json] class JsonOutputWriter( path: String, + options: JSONOptions, bucketId: Option[Int], dataSchema: StructType, context: TaskAttemptContext) @@ -157,7 +158,7 @@ private[json] class JsonOutputWriter( private[this] val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records - private[this] val gen = new JacksonGenerator(dataSchema, writer) + private[this] val gen = new JacksonGenerator(dataSchema, writer, options) private[this] val result = new Text() private val recordWriter: RecordWriter[NullWritable, Text] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 2e606b21bdf30..533b69315e7a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -186,6 +186,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
   * <li>`columnNameOfCorruptRecord` (default is the value specified in
   * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
   * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
+   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
+   * and timestamp type. By default, it is `null` which means trying to read times and date by
+   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
   *
   * @since 2.0.0
   */
@@ -230,8 +234,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   * value.
   * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
   * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-   * and timestamp type. By default, it is `null` which means trying to parse times and date by
-   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+   * and timestamp type. By default, it is `null` which means trying to read times and date by
+   * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
   * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
   * a record can have.</li>
   * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
@@ -258,7 +262,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
   * considered in every trigger.</li>
   * <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
-   * whether we should merge schemas collected from all Parquet part-files. This will override
+   * whether we should merge schemas collected from all
+   * Parquet part-files. This will override
   * `spark.sql.parquet.mergeSchema`.</li>
  • * * @since 2.0.0 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 8cd76ddf20f04..f8c6cce10520f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.SparkException import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ @@ -477,7 +478,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { val options = Map( "header" -> "true", "inferSchema" -> "true", - "dateFormat" -> "dd/MM/yyyy hh:mm") + "dateFormat" -> "dd/MM/yyyy HH:mm") val results = spark.read .format("csv") .options(options) @@ -485,7 +486,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .select("date") .collect() - val dateFormat = new SimpleDateFormat("dd/MM/yyyy hh:mm") + val dateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm") val expected = Seq(Seq(new Timestamp(dateFormat.parse("26/08/2015 18:00").getTime)), Seq(new Timestamp(dateFormat.parse("27/10/2014 18:30").getTime)), @@ -691,4 +692,153 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { verifyCars(cars, withHeader = true, checkValues = false) } + + test("Write timestamps correctly in ISO8601 format by default") { + withTempDir { dir => + val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv" + val timestamps = spark.read + .format("csv") + .option("inferSchema", "true") + .option("header", "true") + .option("dateFormat", "dd/MM/yyyy HH:mm") + .load(testFile(datesFile)) + timestamps.write + .format("csv") + .option("header", "true") + .save(iso8601timestampsPath) + + // This will load back the timestamps as string. + val iso8601Timestamps = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "false") + .load(iso8601timestampsPath) + + val expectedTimestamps = timestamps.collect().map { r => + // This should be ISO8601 formatted string. + Row(DateTimeUtils.iso8601TimeFormat.format(r.toSeq.head)) + } + + checkAnswer(iso8601Timestamps, expectedTimestamps) + } + } + + test("Write dates correctly in ISO8601 format by default") { + withTempDir { dir => + val customSchema = new StructType(Array(StructField("date", DateType, true))) + val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv" + val dates = spark.read + .format("csv") + .schema(customSchema) + .option("header", "true") + .option("inferSchema", "false") + .option("dateFormat", "dd/MM/yyyy HH:mm") + .load(testFile(datesFile)) + dates.write + .format("csv") + .option("header", "true") + .save(iso8601datesPath) + + // This will load back the dates as string. + val iso8601dates = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "false") + .load(iso8601datesPath) + + val expectedDates = dates.collect().map { r => + // This should be ISO8601 formatted string. 
+ Row(DateTimeUtils.iso8601DateFormat.format(r.toSeq.head)) + } + + checkAnswer(iso8601dates, expectedDates) + } + } + + test("Roundtrip in reading and writing timestamps") { + withTempDir { dir => + val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv" + val timestamps = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "true") + .load(testFile(datesFile)) + + timestamps.write + .format("csv") + .option("header", "true") + .save(iso8601timestampsPath) + + val iso8601timestamps = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "true") + .load(iso8601timestampsPath) + + checkAnswer(iso8601timestamps, timestamps) + } + } + + test("Write dates correctly with dateFormat option") { + val customSchema = new StructType(Array(StructField("date", DateType, true))) + withTempDir { dir => + // With dateFormat option. + val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv" + val datesWithFormat = spark.read + .format("csv") + .schema(customSchema) + .option("header", "true") + .option("dateFormat", "dd/MM/yyyy HH:mm") + .load(testFile(datesFile)) + datesWithFormat.write + .format("csv") + .option("header", "true") + .option("dateFormat", "yyyy/MM/dd") + .save(datesWithFormatPath) + + // This will load back the dates as string. + val stringDatesWithFormat = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "false") + .load(datesWithFormatPath) + val expectedStringDatesWithFormat = Seq( + Row("2015/08/26"), + Row("2014/10/27"), + Row("2016/01/28")) + + checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat) + } + } + + test("Write timestamps correctly with dateFormat option") { + withTempDir { dir => + // With dateFormat option. + val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv" + val datesWithFormat = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "true") + .option("dateFormat", "dd/MM/yyyy HH:mm") + .load(testFile(datesFile)) + datesWithFormat.write + .format("csv") + .option("header", "true") + .option("dateFormat", "yyyy/MM/dd HH:mm") + .save(datesWithFormatPath) + + // This will load back the dates as string. + val stringDatesWithFormat = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "false") + .load(datesWithFormatPath) + val expectedStringDatesWithFormat = Seq( + Row("2015/08/26 18:00"), + Row("2014/10/27 18:30"), + Row("2016/01/28 20:00")) + + checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 342fd3e82ee06..b22444bad688f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1664,4 +1664,61 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema.size === 2) df.collect() } + + test("Write dates correctly with dateFormat option") { + val customSchema = new StructType(Array(StructField("date", DateType, true))) + withTempDir { dir => + // With dateFormat option. 
+ val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.json" + val datesWithFormat = spark.read + .schema(customSchema) + .option("dateFormat", "dd/MM/yyyy HH:mm") + .json(datesRecord) + + datesWithFormat.write + .format("json") + .option("dateFormat", "yyyy/MM/dd") + .save(datesWithFormatPath) + + // This will load back the dates as string. + val stringSchema = StructType(StructField("date", StringType, true) :: Nil) + val stringDatesWithFormat = spark.read + .schema(stringSchema) + .json(datesWithFormatPath) + val expectedStringDatesWithFormat = Seq( + Row("2015/08/26"), + Row("2014/10/27"), + Row("2016/01/28")) + + checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat) + } + } + + test("Write timestamps correctly with dateFormat option") { + val customSchema = new StructType(Array(StructField("date", TimestampType, true))) + withTempDir { dir => + // With dateFormat option. + val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" + val timestampsWithFormat = spark.read + .schema(customSchema) + .option("dateFormat", "dd/MM/yyyy HH:mm") + .json(datesRecord) + timestampsWithFormat.write + .format("json") + .option("dateFormat", "yyyy/MM/dd HH:mm") + .save(timestampsWithFormatPath) + + // This will load back the timestamps as string. + val stringSchema = StructType(StructField("date", StringType, true) :: Nil) + val stringTimestampsWithFormat = spark.read + .schema(stringSchema) + .json(timestampsWithFormatPath) + val expectedStringDatesWithFormat = Seq( + Row("2015/08/26 18:00"), + Row("2014/10/27 18:30"), + Row("2016/01/28 20:00")) + + checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index f4a3336643869..7db792650f6cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -222,6 +222,12 @@ private[json] trait TestJsonData { spark.sparkContext.parallelize( s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil) + def datesRecord: RDD[String] = + spark.sparkContext.parallelize( + """{"date": "26/08/2015 18:00"}""" :: + """{"date": "27/10/2014 18:30"}""" :: + """{"date": "28/01/2016 20:00"}""" :: Nil) + lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil) def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]()) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala index d79edee5b1a4c..52486b122a93f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala @@ -32,6 +32,10 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { override protected def supportsDataType(dataType: DataType): Boolean = dataType match { case _: NullType => false case _: BinaryType => false + // `TimestampType` is disabled because `DatatypeConverter.parseDateTime()` + // in `DateTimeUtils` parses the formatted string wrongly when the date is + // too early. (e.g. "1600-07-13T08:36:32.847"). 
+ case _: TimestampType => false case _: CalendarIntervalType => false case _ => true } From 4171a3c53fc78e46646f49f29c77decb32968bf9 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 17 Aug 2016 20:31:30 +0900 Subject: [PATCH 2/6] Fix python style --- python/pyspark/sql/readwriter.py | 2 +- python/pyspark/sql/streaming.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index fcb2a5263d477..dab485c40fdb7 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -687,7 +687,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No self.mode(mode) self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header, nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll, - dateFormat= dateFormat) + dateFormat=dateFormat) self._jwrite.csv(path) @since(1.5) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index e65a49af3117d..a2e52d65ab407 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -457,7 +457,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, - maxMalformedLogPerPartition=None, mode=None, dateFormat=None): + maxMalformedLogPerPartition=None, mode=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if From e8c63ad071ff5900ec2bbc23cbc4ea3138a98d91 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 18 Aug 2016 09:25:37 +0900 Subject: [PATCH 3/6] Address comments --- .../sql/catalyst/util/DateTimeUtils.scala | 4 +- .../datasources/csv/CSVOptions.scala | 11 ++- .../datasources/csv/CSVRelation.scala | 30 ++++--- .../datasources/json/JSONOptions.scala | 11 +-- .../datasources/json/JacksonParser.scala | 85 ++++++++++--------- 5 files changed, 70 insertions(+), 71 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index cd9767b6133f2..5a303b4d36953 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -66,8 +66,8 @@ object DateTimeUtils { // These are ISO 8601 compliant `FastDateFormat`s. These are thread-safe and compatible // with `SimpleDateFormat`. - lazy val iso8601TimeFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ") - lazy val iso8601DateFormat = FastDateFormat.getInstance("yyyy-MM-dd") + val iso8601TimeFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ") + val iso8601DateFormat = FastDateFormat.getInstance("yyyy-MM-dd") // Reuse the Calendar object in each thread as it is expensive to create in each method call. 
private val threadLocalGmtCalendar = new ThreadLocal[Calendar] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 10fe541a2c575..9c499ab60625a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.datasources.csv import java.nio.charset.StandardCharsets -import java.text.SimpleDateFormat + +import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes} @@ -101,11 +102,9 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str name.map(CompressionCodecs.getCodecClassName) } - // Share date format object as it is expensive to parse date pattern. - val dateFormat: SimpleDateFormat = { - val dateFormat = parameters.get("dateFormat") - dateFormat.map(new SimpleDateFormat(_)).orNull - } + // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. + val dateFormat: FastDateFormat = + parameters.get("dateFormat").map(FastDateFormat.getInstance).orNull val maxColumns = getInt("maxColumns", 20480) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 47867ea1781db..35bff23546fdb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -228,23 +228,25 @@ private[csv] class CsvOutputWriter( } private def makeConverter(dataType: DataType): ValueConverter = dataType match { - case DateType if params.dateFormat != null => - (row: InternalRow, ordinal: Int) => - params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) - case DateType => - (row: InternalRow, ordinal: Int) => - val date = DateTimeUtils.toJavaDate(row.getInt(ordinal)) - DateTimeUtils.iso8601DateFormat.format(date) - - case TimestampType if params.dateFormat != null => - (row: InternalRow, ordinal: Int) => - params.dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + if (params.dateFormat != null) { + (row: InternalRow, ordinal: Int) => + params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) + } else { + (row: InternalRow, ordinal: Int) => + val date = DateTimeUtils.toJavaDate(row.getInt(ordinal)) + DateTimeUtils.iso8601DateFormat.format(date) + } case TimestampType => - (row: InternalRow, ordinal: Int) => - val timestamp = DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)) - DateTimeUtils.iso8601TimeFormat.format(timestamp) + if (params.dateFormat != null) { + (row: InternalRow, ordinal: Int) => + params.dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + } else { + (row: InternalRow, ordinal: Int) => + val timestamp = DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)) + DateTimeUtils.iso8601TimeFormat.format(timestamp) + } case udt: UserDefinedType[_] => makeConverter(udt.sqlType) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala index 0e0fd511c8177..870fe5de8b283 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala @@ -17,9 +17,8 @@ package org.apache.spark.sql.execution.datasources.json -import java.text.SimpleDateFormat - import com.fasterxml.jackson.core.{JsonFactory, JsonParser} +import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes} @@ -55,11 +54,9 @@ private[sql] class JSONOptions( private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord") - // Share date format object as it is expensive to parse date pattern. - val dateFormat: SimpleDateFormat = { - val dateFormat = parameters.get("dateFormat") - dateFormat.map(new SimpleDateFormat(_)).orNull - } + // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. + val dateFormat: FastDateFormat = + parameters.get("dateFormat").map(FastDateFormat.getInstance).orNull // Parse mode flags if (!ParseModes.isValidMode(parseMode)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 1586309d329aa..e5cb354f2fadc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources.json import java.io.ByteArrayOutputStream -import java.text.ParseException import scala.collection.mutable.ArrayBuffer import scala.util.Try @@ -201,54 +200,56 @@ class JacksonParser( UTF8String.fromBytes(writer.toByteArray) } - case TimestampType if options.dateFormat != null => - (parser: JsonParser) => parseJsonToken(parser, dataType) { - case VALUE_STRING => - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681. - options.dateFormat.parse(parser.getText).getTime * 1000L - - case VALUE_NUMBER_INT => - parser.getLongValue * 1000000L - } - case TimestampType => - (parser: JsonParser) => parseJsonToken(parser, dataType) { - case VALUE_STRING => - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681. - DateTimeUtils.stringToTime(parser.getText).getTime * 1000L - - case VALUE_NUMBER_INT => - parser.getLongValue * 1000000L + if (options.dateFormat != null) { + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => + // This one will lose microseconds parts. + // See https://issues.apache.org/jira/browse/SPARK-10681. + options.dateFormat.parse(parser.getText).getTime * 1000L + + case VALUE_NUMBER_INT => + parser.getLongValue * 1000000L + } + } else { + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => + // This one will lose microseconds parts. + // See https://issues.apache.org/jira/browse/SPARK-10681. 
+ DateTimeUtils.stringToTime(parser.getText).getTime * 1000L + + case VALUE_NUMBER_INT => + parser.getLongValue * 1000000L + } } - case DateType if options.dateFormat != null => - (parser: JsonParser) => parseJsonToken(parser, dataType) { - case VALUE_STRING => - val stringValue = parser.getText - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681. - Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime)) - .getOrElse { + case DateType => + if (options.dateFormat != null) { + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => + val stringValue = parser.getText + // This one will lose microseconds parts. + // See https://issues.apache.org/jira/browse/SPARK-10681. + Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime)) + .getOrElse { // In Spark 1.5.0, we store the data as number of days since epoch in string. // So, we just convert it to Int. stringValue.toInt } - } - - case DateType => - (parser: JsonParser) => parseJsonToken(parser, dataType) { - case VALUE_STRING => - val stringValue = parser.getText - if (stringValue.contains("-")) { - // The format of this string will probably be "yyyy-mm-dd". - DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime) - } else { - // In Spark 1.5.0, we store the data as number of days since epoch in string. - // So, we just convert it to Int. - stringValue.toInt - } + } + } else { + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => + val stringValue = parser.getText + if (stringValue.contains("-")) { + // The format of this string will probably be "yyyy-mm-dd". + DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime) + } else { + // In Spark 1.5.0, we store the data as number of days since epoch in string. + // So, we just convert it to Int. + stringValue.toInt + } + } } case BinaryType => From 2bd9cbf601ea2ec9f95cd82e888dcafa54421715 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 18 Aug 2016 09:32:31 +0900 Subject: [PATCH 4/6] Fix nits --- .../org/apache/spark/sql/DataFrameReader.scala | 2 +- .../execution/datasources/csv/CSVSuite.scala | 18 +++++++++--------- .../execution/datasources/json/JsonSuite.scala | 4 ++-- .../datasources/json/TestJsonData.scala | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 245941cfc82f0..d027001c0d46f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -285,7 +285,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * and timestamp type. By default, it is `null` which means trying to read times and date by * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format. 
* - + * * @since 2.0.0 */ @scala.annotation.varargs diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index f8c6cce10520f..2b7ac13a5215f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -814,31 +814,31 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { test("Write timestamps correctly with dateFormat option") { withTempDir { dir => // With dateFormat option. - val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv" - val datesWithFormat = spark.read + val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv" + val timestampsWithFormat = spark.read .format("csv") .option("header", "true") .option("inferSchema", "true") .option("dateFormat", "dd/MM/yyyy HH:mm") .load(testFile(datesFile)) - datesWithFormat.write + timestampsWithFormat.write .format("csv") .option("header", "true") .option("dateFormat", "yyyy/MM/dd HH:mm") - .save(datesWithFormatPath) + .save(timestampsWithFormatPath) - // This will load back the dates as string. - val stringDatesWithFormat = spark.read + // This will load back the timestamps as string. + val stringTimestampsWithFormat = spark.read .format("csv") .option("header", "true") .option("inferSchema", "false") - .load(datesWithFormatPath) - val expectedStringDatesWithFormat = Seq( + .load(timestampsWithFormatPath) + val expectedStringTimestampsWithFormat = Seq( Row("2015/08/26 18:00"), Row("2014/10/27 18:30"), Row("2016/01/28 20:00")) - checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat) + checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index b22444bad688f..fc877cea8e892 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1673,7 +1673,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val datesWithFormat = spark.read .schema(customSchema) .option("dateFormat", "dd/MM/yyyy HH:mm") - .json(datesRecord) + .json(datesRecords) datesWithFormat.write .format("json") @@ -1702,7 +1702,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val timestampsWithFormat = spark.read .schema(customSchema) .option("dateFormat", "dd/MM/yyyy HH:mm") - .json(datesRecord) + .json(datesRecords) timestampsWithFormat.write .format("json") .option("dateFormat", "yyyy/MM/dd HH:mm") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index 7db792650f6cb..a400940db924a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -222,7 +222,7 @@ private[json] trait TestJsonData { spark.sparkContext.parallelize( s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil) - def datesRecord: RDD[String] = + def datesRecords: RDD[String] 
= spark.sparkContext.parallelize(
       """{"date": "26/08/2015 18:00"}""" ::
       """{"date": "27/10/2014 18:30"}""" ::
       """{"date": "28/01/2016 20:00"}""" :: Nil)

From 7e6167403a743cc0facb9ad7dd43ca5d912c5f25 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 19 Aug 2016 19:48:57 +0900
Subject: [PATCH 5/6] Separate timestampFormat option and do not use null as
 default option.

---
 python/pyspark/sql/readwriter.py              | 58 +++++++++++------
 python/pyspark/sql/streaming.py               | 33 ++++++----
 .../sql/catalyst/util/DateTimeUtils.scala     |  7 --
 .../apache/spark/sql/DataFrameReader.scala    | 19 ++++--
 .../apache/spark/sql/DataFrameWriter.scala    | 20 +++---
 .../datasources/csv/CSVInferSchema.scala      | 42 ++++++------
 .../datasources/csv/CSVOptions.scala          |  6 +-
 .../datasources/csv/CSVRelation.scala         | 20 ++----
 .../datasources/json/JSONOptions.scala        |  6 +-
 .../datasources/json/JacksonGenerator.scala   | 16 +----
 .../datasources/json/JacksonParser.scala      | 65 +++++++------------
 .../sql/streaming/DataStreamReader.scala      | 20 +++---
 .../execution/datasources/csv/CSVSuite.scala  | 15 +++--
 .../datasources/json/JsonSuite.scala          | 14 ++--
 14 files changed, 171 insertions(+), 170 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index dab485c40fdb7..3da6f497e9f12 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -156,7 +156,7 @@ def load(self, path=None, format=None, schema=None, **options):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None, dateFormat=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
         """
         Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
         (one object per record) and returns the result as a :class`DataFrame`.
@@ -200,10 +200,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                                         ``spark.sql.columnNameOfCorruptRecord``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to read times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()`` or
-                           ISO 8601 format.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -219,7 +221,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
-            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat)
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+            timestampFormat=timestampFormat)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -291,8 +294,8 @@ def text(self, paths):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
-            maxMalformedLogPerPartition=None, mode=None):
+            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file and returns the result as a :class:`DataFrame`.

         This function will go through the input once to determine the input schema if
@@ -333,10 +336,12 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
                             is set, it uses the default value, ``Inf``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to read times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()`` or
-                           ISO 8601 format.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -363,7 +368,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
-            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+            maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
             path = [path]
@@ -578,7 +584,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options):
             self._jwrite.saveAsTable(name)

     @since(1.4)
-    def json(self, path, mode=None, compression=None, dateFormat=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
         :param path: the path in any Hadoop supported file system
@@ -593,13 +599,18 @@ def json(self, path, mode=None, compression=None, dateFormat=None):
                             snappy and deflate).
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to write times and date by ISO 8601 format.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.

         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
-        self._set_opts(compression=compression, dateFormat=dateFormat)
+        self._set_opts(
+            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
         self._jwrite.json(path)

     @since(1.4)
@@ -645,7 +656,8 @@ def text(self, path, compression=None):

     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
-            header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None):
+            header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
+            timestampFormat=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.

         :param path: the path in any Hadoop supported file system
@@ -679,15 +691,19 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
                           the default value, empty string.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to write times and date by ISO 8601 format.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.

         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
                        nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
-                       dateFormat=dateFormat)
+                       dateFormat=dateFormat, timestampFormat=timestampFormat)
         self._jwrite.csv(path)

     @since(1.5)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a2e52d65ab407..3761d2b1994f0 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -338,7 +338,8 @@ def load(self, path=None, format=None, schema=None, **options):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None, dateFormat=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
+             timestampFormat=None):
         """
         Loads a JSON file stream (one object per line) and returns a :class`DataFrame`.
@@ -383,10 +384,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                                           ``spark.sql.columnNameOfCorruptRecord``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to read times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()`` or
-                           ISO 8601 format.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.

         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -399,7 +402,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
-            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat)
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+            timestampFormat=timestampFormat)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -456,8 +460,8 @@ def text(self, path):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
-            maxMalformedLogPerPartition=None, mode=None):
+            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file stream and returns the result as a :class:`DataFrame`.

         This function will go through the input once to determine the input schema if
@@ -500,10 +504,12 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
                             is set, it uses the default value, ``Inf``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to read times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()`` or
-                           ISO 8601 format.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -528,7 +534,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
-            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+            maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 5a303b4d36953..0b643a5b84268 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -24,8 +24,6 @@ import javax.xml.bind.DatatypeConverter
 
 import scala.annotation.tailrec
 
-import org.apache.commons.lang3.time.FastDateFormat
-
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -64,11 +62,6 @@ object DateTimeUtils {
 
   @transient lazy val defaultTimeZone = TimeZone.getDefault
 
-  // These are ISO 8601 compliant `FastDateFormat`s. These are thread-safe and compatible
-  // with `SimpleDateFormat`.
-  val iso8601TimeFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
-  val iso8601DateFormat = FastDateFormat.getInstance("yyyy-MM-dd")
-
   // Reuse the Calendar object in each thread as it is expensive to create in each method call.
   private val threadLocalGmtCalendar = new ThreadLocal[Calendar] {
     override protected def initialValue: Calendar = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index d027001c0d46f..c060091c7fc38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -280,10 +282,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * <li>`columnNameOfCorruptRecord` (default is the value specified in
   * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
   * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
-  * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-  * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-  * and timestamp type. By default, it is `null` which means trying to read times and date by
-  * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   *
   * @since 2.0.0
   */
@@ -379,9 +381,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * value.</li>
   * <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
   * value.</li>
-  * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-  * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-  * and timestamp type. By default, it is `null` which means trying to read times and date by
-  * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
   * a record can have.</li>
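Reviewer note: to make the split documented above concrete, a minimal usage sketch of the two reader options (the input path and the format strings here are invented for illustration; with the new defaults both .option(...) calls can simply be omitted):

    // Hypothetical read with the separated options.
    val df = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("dateFormat", "dd/MM/yyyy")            // used for DateType columns only
      .option("timestampFormat", "dd/MM/yyyy HH:mm") // used for TimestampType columns only
      .load("/tmp/events.csv")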
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b41ee420e059b..a9049a60f25e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -483,10 +483,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`).</li>
-  * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-  * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-  * and timestamp type. By default, it is `null` which means trying to write times and date by
-  * ISO 8601 format.</li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   *
   * @since 1.4.0
   */
@@ -579,10 +581,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`).</li>
-  * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-  * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-  * and timestamp type. By default, it is `null` which means trying to write times and date by
-  * ISO 8601 format.</li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   *
   * @since 2.0.0
   */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index de3d889621b7d..f1b4c11878a94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -139,20 +139,14 @@ private[csv] object CSVInferSchema {
   }
 
   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    if (options.dateFormat != null) {
-      // This case infers a custom `dateFormat` is set.
-      if ((allCatch opt options.dateFormat.parse(field)).isDefined) {
-        TimestampType
-      } else {
-        tryParseBoolean(field, options)
-      }
-    } else {
+    // This case infers a custom `timestampFormat` is set.
+    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+      TimestampType
+    } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
       // We keep this for backwards compatibility.
-      if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
-        TimestampType
-      } else {
-        tryParseBoolean(field, options)
-      }
+      TimestampType
+    } else {
+      tryParseBoolean(field, options)
     }
   }
 
@@ -277,18 +271,24 @@
         val value = new BigDecimal(datum.replaceAll(",", ""))
         Decimal(value, dt.precision, dt.scale)
       }
-      case _: TimestampType if options.dateFormat != null =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        options.dateFormat.parse(datum).getTime * 1000L
       case _: TimestampType =>
         // This one will lose microseconds parts.
         // See https://issues.apache.org/jira/browse/SPARK-10681.
-        DateTimeUtils.stringToTime(datum).getTime * 1000L
-      case _: DateType if options.dateFormat != null =>
-        DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)
+        Try(options.timestampFormat.parse(datum).getTime * 1000L)
+          .getOrElse {
+            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            DateTimeUtils.stringToTime(datum).getTime * 1000L
+          }
       case _: DateType =>
-        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+        // This one will lose microseconds parts.
+        // See https://issues.apache.org/jira/browse/SPARK-10681.
+        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
+          .getOrElse {
+            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+          }
       case _: StringType => UTF8String.fromString(datum)
       case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9c499ab60625a..364d7c831eb44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -104,7 +104,11 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
-    parameters.get("dateFormat").map(FastDateFormat.getInstance).orNull
+    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+  val timestampFormat: FastDateFormat =
+    FastDateFormat.getInstance(
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
 
   val maxColumns = getInt("maxColumns", 20480)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 35bff23546fdb..840e243229362 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -229,24 +229,12 @@ private[csv] class CsvOutputWriter(
 
   private def makeConverter(dataType: DataType): ValueConverter = dataType match {
     case DateType =>
-      if (params.dateFormat != null) {
-        (row: InternalRow, ordinal: Int) =>
-          params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
-      } else {
-        (row: InternalRow, ordinal: Int) =>
-          val date = DateTimeUtils.toJavaDate(row.getInt(ordinal))
-          DateTimeUtils.iso8601DateFormat.format(date)
-      }
+      (row: InternalRow, ordinal: Int) =>
+        params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
 
     case TimestampType =>
-      if (params.dateFormat != null) {
-        (row: InternalRow, ordinal: Int) =>
-          params.dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
-      } else {
-        (row: InternalRow, ordinal: Int) =>
-          val timestamp = DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))
-          DateTimeUtils.iso8601TimeFormat.format(timestamp)
-      }
+      (row: InternalRow, ordinal: Int) =>
+        params.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
 
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 870fe5de8b283..02d211d04265e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -56,7 +56,11 @@ private[sql] class JSONOptions(
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
-    parameters.get("dateFormat").map(FastDateFormat.getInstance).orNull
+    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+  val timestampFormat: FastDateFormat =
+    FastDateFormat.getInstance(
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
 
   // Parse mode flags
   if (!ParseModes.isValidMode(parseMode)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index a11682c2857a6..270e7fbd3c137 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -77,28 +77,18 @@ private[sql] class JacksonGenerator(
       (row: SpecializedGetters, ordinal: Int) =>
         gen.writeString(row.getUTF8String(ordinal).toString)
 
-    case TimestampType if options.dateFormat != null =>
+    case TimestampType =>
       (row: SpecializedGetters, ordinal: Int) =>
         val timestampString =
-          options.dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
         gen.writeString(timestampString)
 
-    case TimestampType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        val timestamp = DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))
-        gen.writeString(DateTimeUtils.iso8601TimeFormat.format(timestamp))
-
-    case DateType if options.dateFormat != null =>
+    case DateType =>
       (row: SpecializedGetters, ordinal: Int) =>
         val dateString =
           options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
         gen.writeString(dateString)
 
-    case DateType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        val date = DateTimeUtils.toJavaDate(row.getInt(ordinal))
-        gen.writeString(DateTimeUtils.iso8601DateFormat.format(date))
-
     case BinaryType =>
       (row: SpecializedGetters, ordinal: Int) =>
         gen.writeBinary(row.getBinary(ordinal))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index e5cb354f2fadc..359a3e2aa8ad2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -201,55 +201,38 @@ class JacksonParser(
       }
 
     case TimestampType =>
-      if (options.dateFormat != null) {
-        (parser: JsonParser) => parseJsonToken(parser, dataType) {
-          case VALUE_STRING =>
-            // This one will lose microseconds parts.
-            // See https://issues.apache.org/jira/browse/SPARK-10681.
-            options.dateFormat.parse(parser.getText).getTime * 1000L
-
-          case VALUE_NUMBER_INT =>
-            parser.getLongValue * 1000000L
-        }
-      } else {
-        (parser: JsonParser) => parseJsonToken(parser, dataType) {
-          case VALUE_STRING =>
-            // This one will lose microseconds parts.
-            // See https://issues.apache.org/jira/browse/SPARK-10681.
-            DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
-
-          case VALUE_NUMBER_INT =>
-            parser.getLongValue * 1000000L
-        }
-      }
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_STRING =>
+          // This one will lose microseconds parts.
+          // See https://issues.apache.org/jira/browse/SPARK-10681.
+          Try(options.timestampFormat.parse(parser.getText).getTime * 1000L)
+            .getOrElse {
+              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+            }
+
+        case VALUE_NUMBER_INT =>
+          parser.getLongValue * 1000000L
+      }
 
     case DateType =>
-      if (options.dateFormat != null) {
-        (parser: JsonParser) => parseJsonToken(parser, dataType) {
-          case VALUE_STRING =>
-            val stringValue = parser.getText
-            // This one will lose microseconds parts.
-            // See https://issues.apache.org/jira/browse/SPARK-10681.
-            Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
+      (parser: JsonParser) => parseJsonToken(parser, dataType) {
+        case VALUE_STRING =>
+          val stringValue = parser.getText
+          // This one will lose microseconds parts.
+          // See https://issues.apache.org/jira/browse/SPARK-10681.
+          Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
+            .getOrElse {
+              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
                 .getOrElse {
                   // In Spark 1.5.0, we store the data as number of days since epoch in string.
                   // So, we just convert it to Int.
                   stringValue.toInt
                 }
-        }
-      } else {
-        (parser: JsonParser) => parseJsonToken(parser, dataType) {
-          case VALUE_STRING =>
-            val stringValue = parser.getText
-            if (stringValue.contains("-")) {
-              // The format of this string will probably be "yyyy-mm-dd".
-              DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
-            } else {
-              // In Spark 1.5.0, we store the data as number of days since epoch in string.
-              // So, we just convert it to Int.
-              stringValue.toInt
-            }
-        }
+            }
       }
 
     case BinaryType =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 533b69315e7a1..3ad1125229c97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -186,10 +188,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   * <li>`columnNameOfCorruptRecord` (default is the value specified in
   * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
   * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
-  * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-  * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-  * and timestamp type. By default, it is `null` which means trying to read times and date by
-  * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   *
   * @since 2.0.0
   */
@@ -232,10 +234,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   * value.</li>
   * <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
   * value.</li>
-  * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-  * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-  * and timestamp type. By default, it is `null` which means trying to read times and date by
-  * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
   * a record can have.</li>
   * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2b7ac13a5215f..2befad6d72eca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -22,6 +22,7 @@ import java.nio.charset.UnsupportedCharsetException
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 
+import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
@@ -478,7 +479,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     val options = Map(
       "header" -> "true",
       "inferSchema" -> "true",
-      "dateFormat" -> "dd/MM/yyyy HH:mm")
+      "timestampFormat" -> "dd/MM/yyyy HH:mm")
     val results = spark.read
       .format("csv")
       .options(options)
@@ -700,7 +701,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .format("csv")
       .option("inferSchema", "true")
       .option("header", "true")
-      .option("dateFormat", "dd/MM/yyyy HH:mm")
+      .option("timestampFormat", "dd/MM/yyyy HH:mm")
       .load(testFile(datesFile))
     timestamps.write
       .format("csv")
@@ -714,9 +715,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .option("inferSchema", "false")
       .load(iso8601timestampsPath)
 
+    val iso8601 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
     val expectedTimestamps = timestamps.collect().map { r =>
       // This should be ISO8601 formatted string.
-      Row(DateTimeUtils.iso8601TimeFormat.format(r.toSeq.head))
+      Row(iso8601.format(r.toSeq.head))
     }
 
     checkAnswer(iso8601Timestamps, expectedTimestamps)
@@ -746,9 +748,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
      .option("inferSchema", "false")
      .load(iso8601datesPath)
 
+    val iso8601 = FastDateFormat.getInstance("yyyy-MM-dd")
     val expectedDates = dates.collect().map { r =>
       // This should be ISO8601 formatted string.
-      Row(DateTimeUtils.iso8601DateFormat.format(r.toSeq.head))
+      Row(iso8601.format(r.toSeq.head))
     }
 
     checkAnswer(iso8601dates, expectedDates)
@@ -819,12 +822,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .format("csv")
       .option("header", "true")
       .option("inferSchema", "true")
-      .option("dateFormat", "dd/MM/yyyy HH:mm")
+      .option("timestampFormat", "dd/MM/yyyy HH:mm")
       .load(testFile(datesFile))
     timestampsWithFormat.write
       .format("csv")
       .option("header", "true")
-      .option("dateFormat", "yyyy/MM/dd HH:mm")
+      .option("timestampFormat", "yyyy/MM/dd HH:mm")
       .save(timestampsWithFormatPath)
 
     // This will load back the timestamps as string.
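Reviewer note: the CSV and JSON read paths changed above share the same two-step parse order, which is easiest to see in isolation. A standalone sketch of that order, assuming only commons-lang3 and the catalyst `DateTimeUtils.stringToTime` helper used in the patch (the function name here is invented for illustration):

    import scala.util.Try
    import org.apache.commons.lang3.time.FastDateFormat
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // 1. Try the user-supplied (or default) timestampFormat.
    // 2. Fall back to the lenient 1.x/2.0 parsing for backwards compatibility.
    def parseTimestampMicros(datum: String, timestampFormat: FastDateFormat): Long =
      Try(timestampFormat.parse(datum).getTime * 1000L)
        .getOrElse(DateTimeUtils.stringToTime(datum).getTime * 1000L)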
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index fc877cea8e892..63a9061210ca6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -101,15 +101,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
 
     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
+    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
         enforceCorrectType(ISO8601Time1, TimestampType))
-    checkTypePromotion(DateTimeUtils.millisToDays(3601000),
-        enforceCorrectType(ISO8601Time1, DateType))
-    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
         enforceCorrectType(ISO8601Time2, TimestampType))
-    checkTypePromotion(DateTimeUtils.millisToDays(10801000),
-        enforceCorrectType(ISO8601Time2, DateType))
+
+    val ISO8601Date = "1970-01-01"
+    checkTypePromotion(DateTimeUtils.millisToDays(32400000),
+      enforceCorrectType(ISO8601Date, DateType))
   }
 
   test("Get compatible type") {
@@ -1701,11 +1701,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
       val timestampsWithFormat = spark.read
         .schema(customSchema)
-        .option("dateFormat", "dd/MM/yyyy HH:mm")
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
         .json(datesRecords)
       timestampsWithFormat.write
         .format("json")
-        .option("dateFormat", "yyyy/MM/dd HH:mm")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
         .save(timestampsWithFormatPath)
 
       // This will load back the timestamps as string.
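Reviewer note: for anyone checking the new defaults by hand, a small self-contained snippet of what they format to. The printed zone offset depends on the JVM default time zone; the +09:00 in the comment assumes KST, matching the +0900 commits in this series:

    import java.sql.{Date, Timestamp}
    import org.apache.commons.lang3.time.FastDateFormat

    val defaultDateFormat = FastDateFormat.getInstance("yyyy-MM-dd")
    val defaultTimestampFormat = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")

    println(defaultDateFormat.format(Date.valueOf("2015-08-26")))
    // 2015-08-26
    println(defaultTimestampFormat.format(Timestamp.valueOf("2015-08-26 18:00:00")))
    // e.g. 2015-08-26T18:00:00.000+09:00 on a KST machine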
From 405e723215213586eecb676268e00aa2263b6c71 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 19 Aug 2016 21:59:51 +0900
Subject: [PATCH 6/6] Use timestampFormat for timestamps in CSV tests

---
 .../datasources/csv/CSVInferSchemaSuite.scala |  4 ++--
 .../datasources/csv/CSVTypeCastSuite.scala    | 17 +++++++++++------
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index dbe3af49c90c3..5e00f669b8593 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -60,9 +60,9 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("Timestamp field types are inferred correctly via custom data format") {
-    var options = new CSVOptions(Map("dateFormat" -> "yyyy-mm"))
+    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
-    options = new CSVOptions(Map("dateFormat" -> "yyyy"))
+    options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 26b33b24efc3d..3ce643e667ce4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -96,13 +96,18 @@ class CSVTypeCastSuite extends SparkFunSuite {
     assert(CSVTypeCast.castTo("1.00", DoubleType) == 1.0)
     assert(CSVTypeCast.castTo("true", BooleanType) == true)
 
-    val options = CSVOptions("dateFormat", "dd/MM/yyyy hh:mm")
+    val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
     val customTimestamp = "31/01/2015 00:00"
-    val expectedTime = options.dateFormat.parse("31/01/2015 00:00").getTime
-    assert(CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, options) ==
-      expectedTime * 1000L)
-    assert(CSVTypeCast.castTo(customTimestamp, DateType, nullable = true, options) ==
-      DateTimeUtils.millisToDays(expectedTime))
+    val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
+    val castedTimestamp =
+      CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, timestampsOptions)
+    assert(castedTimestamp == expectedTime * 1000L)
+
+    val customDate = "31/01/2015"
+    val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+    val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
+    val castedDate = CSVTypeCast.castTo(customDate, DateType, nullable = true, dateOptions)
+    assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
 
     val timestamp = "2015-01-01 00:00:00"
     assert(CSVTypeCast.castTo(timestamp, TimestampType) ==