
[SPARK-18937][SQL] Timezone support in CSV/JSON parsing #16750

Closed
wants to merge 11 commits

Conversation

@ueshin (Member) commented Jan 31, 2017

## What changes were proposed in this pull request?

This is a follow-up PR of #16308.

This PR enables timezone support in CSV/JSON parsing.

We should introduce a `timeZone` option for the CSV/JSON datasources (the default value of the option is the session local timezone).

The datasources should use the `timeZone` option to format/parse timestamp values when writing/reading.
Note that while reading, if the `timestampFormat` has timezone info, the `timeZone` option will not be used, because we should respect the timezone in the values.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> df.write.json("/path/to/gmtjson")
```

```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```

whereas setting the option to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```

```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```

We can properly read these files even if the timezone option is wrong because the timestamp values have timezone info:

```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

And even if the `timestampFormat` doesn't contain timezone info, we can properly read the values by setting the correct timezone option:

```scala
scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```

```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```

```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+

// correct result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

This PR also makes `JsonToStruct` and `StructToJson` `TimeZoneAwareExpression`s so that they can evaluate values with the `timeZone` option.
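
For context, the rough shape of that change, sketched below (simplified; the trait name follows the PR, but the body is illustrative rather than the exact Spark source):

```scala
import java.util.TimeZone

// A timezone-aware expression carries an optional timezone ID; the analyzer
// is expected to fill in the session local timezone when none is supplied.
trait TimeZoneAwareExpressionSketch {
  def timeZoneId: Option[String]

  // Resolved lazily, assuming the analyzer has set timeZoneId by this point.
  lazy val timeZone: TimeZone = TimeZone.getTimeZone(timeZoneId.get)
}
```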

## How was this patch tested?

Existing tests and added some tests.

@HyukjinKwon (Member) left a comment

Hi @ueshin, I left some opinions and questions.

```diff
@@ -297,7 +300,7 @@ def text(self, paths):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+            negativeInf=None, dateFormat=None, timestampFormat=None, timeZone=None, maxColumns=None,
```
@HyukjinKwon (Member) commented Jan 31, 2017

To my knowledge, this should be added at the end to avoid breaking existing code that passes these options as positional arguments.
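
To illustrate the hazard (a Scala sketch with hypothetical signatures; the actual diff is the Python `csv()` reader above, where the rebinding happens silently):

```scala
// Version 1: callers may already pass these options positionally.
def csvV1(path: String, dateFormat: String = "yyyy-MM-dd",
          timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ",
          maxColumns: String = "20480"): Unit = ()

csvV1("/data/in.csv", "yyyy/MM/dd", "yyyy/MM/dd HH:mm", "1000")

// Inserting timeZone before maxColumns shifts every later positional
// argument: "1000" would now bind to timeZone. Appending it at the end
// keeps existing call sites intact:
def csvV2(path: String, dateFormat: String = "yyyy-MM-dd",
          timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ",
          maxColumns: String = "20480", timeZone: String = "GMT"): Unit = ()

csvV2("/data/in.csv", "yyyy/MM/dd", "yyyy/MM/dd HH:mm", "1000")  // still valid
```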

@ueshin (Member, Author):

Ah, I see, I'll move them to the end.

```diff
@@ -329,7 +332,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
-    val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+    val optionsWithTimeZone = {
```
@HyukjinKwon (Member):

Could we just pass the timezone into `JSONOptions` as a default, or resemble `columnNameOfCorruptRecord` in `JSONOptions` below?

It seems the same logic is duplicated here several times, and the logic it introduces for setting default values in tests might be unnecessary or removable.

@HyukjinKwon (Member):

It seems the same comment also applies to CSVOptions.

@ueshin (Member, Author):

The `timeZone` option is used inside `JSONOptions`/`CSVOptions` themselves, so we can't handle it the same way as `columnNameOfCorruptRecord`.
I'll modify the code to pass the default timezone ID to `JSONOptions` and `CSVOptions`.
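
A minimal sketch of that approach, assuming a simplified options class (the real `JSONOptions`/`CSVOptions` take a `CaseInsensitiveMap` and many more options):

```scala
import java.util.TimeZone

// The session local default arrives through the constructor; an explicit
// per-datasource "timeZone" option, when present, overrides it.
class JsonOptionsSketch(parameters: Map[String, String], defaultTimeZoneId: String) {
  val timeZone: TimeZone =
    TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
}

// Caller side, roughly:
//   new JsonOptionsSketch(extraOptions.toMap,
//     sparkSession.sessionState.conf.sessionLocalTimeZone)
```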

```diff
@@ -161,12 +163,3 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
     settings
   }
 }
-
-object CSVOptions {
```
@HyukjinKwon (Member):

Do you mind if I ask the reason for removing this? It apparently required fixing many tests in CSV.

@ueshin (Member, Author):

`CSVOptions` (and also `JSONOptions`) will always have to take the `timeZone` option.
I don't want callers to be able to forget to specify it via these convenience methods.
Or should I add the default timezone ID to these methods?

@SparkQA commented Jan 31, 2017

Test build #72190 has finished for PR 16750 at commit 551cff9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 1, 2017

Test build #72236 has finished for PR 16750 at commit d5ab37c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: CaseInsensitiveMap)
+    @transient private val parameters: CaseInsensitiveMap, defaultTimeZoneId: String)
```
@cloud-fan (Contributor):

Shouldn't the `timeZoneId` just be an option in `parameters` with key `timeZoneId`?

@ueshin (Member, Author):

I originally put the `timeZone` option in every place that creates `JSONOptions` (or `CSVOptions`), but that repeated the same contains-key check many times, as @HyukjinKwon mentioned.
So I modified the code to pass the default timezone ID to `JSONOptions` and `CSVOptions`.

@HyukjinKwon (Member) commented Feb 8, 2017

Ah, yes, it required introducing logic like the following before creating `JSONOptions`/`CSVOptions`:

```scala
val options = extraOptions.toMap
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val optionsWithTimeZone = if (caseInsensitiveOptions.contains("timeZone")) {
  caseInsensitiveOptions
} else {
  new CaseInsensitiveMap(
    options + ("timeZone" -> sparkSession.sessionState.conf.sessionLocalTimeZone))
}

val parsedOptions: JSONOptions = new JSONOptions(optionsWithTimeZone)
```

So, I suggested this way because the default value of `timeZone` can vary. It seems `ParquetOptions.compressionCodecClassName` also takes another argument for the same reason.

Another way I suggested is to make this an `Option[TimeZone]` to decouple it from the varying default value (like `JSONOptions.columnNameOfCorruptRecord`), but `timestampFormat` in both options depends on `timeZone`. In that case, we would have to make it an `Option` too, which would introduce more complexity. So the way above seems better.

I am fine if we find a cleaner way.
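
For concreteness, a sketch of that rejected `Option[TimeZone]` alternative (assuming the same simplified options class as above; `FastDateFormat` is what the real options classes use):

```scala
import java.util.{Locale, TimeZone}
import org.apache.commons.lang3.time.FastDateFormat

// Decoupling the zone from any default keeps it an Option, but the timestamp
// format cannot be constructed until the zone is known, so the Option leaks
// into every dependent field.
class JsonOptionsAlt(parameters: Map[String, String]) {
  val timeZone: Option[TimeZone] =
    parameters.get("timeZone").map(TimeZone.getTimeZone)

  val timestampFormat: Option[FastDateFormat] = timeZone.map { tz =>
    FastDateFormat.getInstance(
      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"),
      tz, Locale.US)
  }
}
```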

@HyukjinKwon (Member):

To cut this short, I think we can resemble `JSONOptions.columnNameOfCorruptRecord` or `ParquetOptions.compressionCodecClassName` to deal with the varying default value.

It seems it now resembles the latter.
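
The `ParquetOptions` pattern being referenced looks roughly like this (a sketch, not the exact source):

```scala
// Same shape as the timezone case: the session-level default comes in as a
// constructor argument, and the per-write option overrides it.
class ParquetOptionsSketch(parameters: Map[String, String], sessionDefaultCodec: String) {
  val compressionCodecClassName: String =
    parameters.getOrElse("compression", sessionDefaultCodec).toLowerCase
}
```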

```diff
@@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
```
@cloud-fan (Contributor):

timeZoneId?

@ueshin (Member, Author):

I'd like to use `timeZone` for the option key, the same as the `spark.sql.session.timeZone` config key for the session local timezone.
What do you think?
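
In usage, the two keys then line up like this (paths and values taken from the examples in the PR description):

```scala
// Session-level default for timestamp handling:
spark.conf.set("spark.sql.session.timeZone", "GMT")

// Per-read override with the matching option key:
spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson")
```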

```scala
c.set(Calendar.MILLISECOND, 123)
checkEvaluation(
  JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
  InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil)
```
@cloud-fan (Contributor):

nit: `InternalRow(c.getTimeInMillis * 1000L)`

@ueshin (Member, Author):

Thanks, I'll use it.

```scala
)
checkEvaluation(
  JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
  InternalRow.fromSeq(c.getTimeInMillis * 1000L :: Nil)
```
@cloud-fan (Contributor):

Why doesn't the result change? Sorry, it's always hard for me to reason about time-related tests...

@ueshin (Member, Author):

I'm sorry, I should have added a comment.
I'll add one soon.

@ueshin (Member, Author):

FYI, it's because the input JSON string includes the timezone designator "Z", which means GMT.
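
A standalone way to see this (a sketch using `SimpleDateFormat`; Spark itself uses `FastDateFormat`, but the behavior is the same):

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
fmt.setTimeZone(TimeZone.getTimeZone("PST")) // default zone for zone-less input

// The explicit "Z" (UTC) in the input overrides the formatter's default zone,
// so the parsed instant is the same whatever default is set.
val t = fmt.parse("2016-01-01T00:00:00.000Z")
assert(t.getTime == 1451606400000L)
```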

@SparkQA commented Feb 9, 2017

Test build #72626 has finished for PR 16750 at commit ffc4912.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -58,13 +59,15 @@ private[sql] class JSONOptions(
   private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
   val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
 
+  val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
     FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
```
@cloud-fan (Contributor):

Why don't we need the timezone here?

@ueshin (Member, Author):

That works in combination with `DateTimeUtils.millisToDays()` (see JacksonParser.scala#L251 or UnivocityParser.scala#L137).

If the timezones of the `dateFormat` and of `DateTimeUtils.millisToDays()` are the same, the days are calculated correctly.
Here the `dateFormat` parses with the default timezone, and `DateTimeUtils.millisToDays()` also uses the default timezone to calculate days.
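
A sketch of that round trip (the day arithmetic is simplified from what `DateTimeUtils.millisToDays()` does; the point is only that the same offset is applied on both sides):

```scala
import java.util.TimeZone
import org.apache.commons.lang3.time.FastDateFormat

val defaultTz = TimeZone.getDefault
val millisPerDay = 24L * 60 * 60 * 1000

// 1. Parse the date with a format that uses the JVM default timezone.
val millis = FastDateFormat.getInstance("yyyy-MM-dd").parse("2016-01-01").getTime

// 2. Convert millis to days in the same timezone, millisToDays-style.
val days = Math.floor((millis + defaultTz.getOffset(millis)).toDouble / millisPerDay).toInt

// Because both steps use the same zone, the offset cancels out and `days`
// is the day number for local date 2016-01-01 in any default timezone.
```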

```scala
val stringTimestampsWithFormat = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "false")
```
@cloud-fan (Contributor) commented Feb 10, 2017

You turn off schema inference and don't give a schema; what will the schema be then?

@ueshin (Member, Author):

The schema will be `StringType` for all columns. (CSVInferSchema.scala#L68)
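
For reference, what that fallback means in practice (the path is illustrative):

```scala
// With inferSchema=false and no user-provided schema, every CSV column is
// loaded as StringType:
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "false")
  .load("/path/to/timestamps.csv")

// df.printSchema() then shows something like:
// root
//  |-- time: string (nullable = true)
```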

@cloud-fan (Contributor):

It would be good to add a comment saying so.

@cloud-fan (Contributor):

Actually, we should make it more explicit by specifying a schema, like https://github.com/apache/spark/pull/16750/files#diff-fde14032b0e6ef8086461edf79a27c5dR1771.

@ueshin (Member, Author):

I see, I'll specify the schema in the next PR.
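
Applied to the snippet above, the explicit version would look roughly like this (column name, type, and format are illustrative, not the actual test values):

```scala
import org.apache.spark.sql.types._

val schema = new StructType().add("time", TimestampType)

val stringTimestampsWithFormat = spark.read
  .format("csv")
  .option("header", "true")
  .option("timestampFormat", "dd/MM/yyyy HH:mm")  // hypothetical format
  .schema(schema)                                 // explicit, no fallback
  .load("/path/to/timestamps.csv")
```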

@SparkQA commented Feb 13, 2017

Test build #72803 has started for PR 16750 at commit a455f4f.

@ueshin (Member, Author) commented Feb 13, 2017

Jenkins, retest this please.

@SparkQA commented Feb 13, 2017

Test build #72811 has finished for PR 16750 at commit a455f4f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 15, 2017

Test build #72930 has finished for PR 16750 at commit ae6397d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class TransportChannelHandler extends ChannelInboundHandlerAdapter
  • class LinearSVCWrapperWriter(instance: LinearSVCWrapper) extends MLWriter
  • class LinearSVCWrapperReader extends MLReader[LinearSVCWrapper]
  • class SubstituteBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan]
  • case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode
  • class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging

```scala
DataSource(
  sparkSession = null,
  className = name,
  options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass
```
@cloud-fan (Contributor):

Why this change? I thought we would have a default session timezone?

@ueshin (Member, Author):

Unfortunately, we can't use the default session timezone because `sparkSession` is `null` here.

@cloud-fan (Contributor):

Thanks, merging to master!

Please address the remaining comments in your next PR.

@asfgit closed this in 865b2fd on Feb 15, 2017
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 16, 2017
ghost pushed a commit to dbtsai/spark that referenced this pull request Mar 4, 2017
## What changes were proposed in this pull request?

This is a follow-up pr of apache#16308 and apache#16750.

This pr enables timezone support in partition values.

We should use `timeZone` option introduced at apache#16750 to parse/format partition values of the `TimestampType`.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT` which will be used for partition values, the values written by the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq((1, new java.sql.Timestamp(1451606400000L))).toDF("i", "ts")
df: org.apache.spark.sql.DataFrame = [i: int, ts: timestamp]

scala> df.show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+

scala> df.write.partitionBy("ts").save("/path/to/gmtpartition")
```

```sh
$ ls /path/to/gmtpartition/
_SUCCESS			ts=2016-01-01 00%3A00%3A00
```

whereas setting the option to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").partitionBy("ts").save("/path/to/pstpartition")
```

```sh
$ ls /path/to/pstpartition/
_SUCCESS			ts=2015-12-31 16%3A00%3A00
```

We can properly read the partition values if the session local timezone and the timezone of the partition values are the same:

```scala
scala> spark.read.load("/path/to/gmtpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

And even if the timezones are different, we can properly read the values by setting the correct timezone option:

```scala
// wrong result
scala> spark.read.load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2015-12-31 16:00:00|
+---+-------------------+

// correct result
scala> spark.read.option("timeZone", "PST").load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

## How was this patch tested?

Existing tests and added some tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes apache#17053 from ueshin/issues/SPARK-18939.