From c6ed592cba5854e19670336c66e4090b4ddef2ed Mon Sep 17 00:00:00 2001
From: Fabian Meiswinkel
Date: Mon, 18 Jul 2022 17:02:07 +0000
Subject: [PATCH 1/2] Adding option to fall back to Spark 2.4 Date/Time
 conversion assuming System default time zone instead of UTC when date has no
 explicit time zone

---
 .../docs/configuration-reference.md           |  2 +-
 .../com/azure/cosmos/spark/CosmosConfig.scala | 13 +++--
 .../cosmos/spark/CosmosRowConverter.scala     | 45 ++++++++++++++---
 .../cosmos/spark/CosmosRowConverterSpec.scala | 49 +++++++++++++++++--
 4 files changed, 94 insertions(+), 15 deletions(-)

diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md
index 11bcd6347b99..14c9e162a3a6 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md
@@ -60,7 +60,7 @@ Used to influence the json serialization/deserialization behavior
 | Config Property Name | Default | Description |
 | :--- | :---- | :--- |
 | `spark.cosmos.serialization.inclusionMode` | `Always` | Determines whether null/default values will be serialized to json or whether properties with null/default value will be skipped. The behavior follows the same ideas as [Jackson's JsonInclude.Include](https://github.com/FasterXML/jackson-annotations/blob/d0820002721c76adad2cc87fcd88bf60f56b64de/src/main/java/com/fasterxml/jackson/annotation/JsonInclude.java#L98-L227). `Always` means json properties are created even for null and default values. `NonNull` means no json properties will be created for explicit null values. `NonEmpty` means json properties will not be created for empty string values or empty arrays/maps. `NonDefault` means json properties will be skipped not just for null/empty but also when the value is identical to the default value (`0` for numeric properties, for example). |
-| `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch.|
+| `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`, `AlwaysEpochMillisecondsWithSystemDefaultTimezone`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch. The behavior for `AlwaysEpochMillisecondsWithSystemDefaultTimezone` is identical to `AlwaysEpochMilliseconds`, except that it assumes the system default time zone / Spark session time zone (specified via `spark.sql.session.timeZone`) instead of UTC when the date/time to be parsed has no explicit time zone.|
 
 #### Change feed (only for Spark-Streaming using `cosmos.oltp.changeFeed` data source, which is read-only) configuration
 | Config Property Name | Default | Description |
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
index ef4ebacd866b..db74ee39c82f 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
@@ -812,7 +812,10 @@ private object SerializationDateTimeConversionModes extends Enumeration {
   type SerializationDateTimeConversionMode = Value
 
   val Default: SerializationDateTimeConversionModes.Value = Value("Default")
-  val AlwaysEpochMilliseconds: SerializationDateTimeConversionModes.Value = Value("AlwaysEpochMilliseconds")
+  val AlwaysEpochMillisecondsWithUtcTimezone:
+    SerializationDateTimeConversionModes.Value = Value("AlwaysEpochMilliseconds")
+  val AlwaysEpochMillisecondsWithSystemDefaultTimezone:
+    SerializationDateTimeConversionModes.Value = Value("AlwaysEpochMillisecondsWithSystemDefaultTimezone")
 }
 
 private case class CosmosSerializationConfig
@@ -836,12 +839,16 @@ private object CosmosSerializationConfig {
     mandatory = false,
     defaultValue = Some(SerializationDateTimeConversionModes.Default),
     parseFromStringFunction = value => CosmosConfigEntry.parseEnumeration(value, SerializationDateTimeConversionModes),
-    helpMessage = "The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`). " +
+    helpMessage = "The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`, " +
+      "`AlwaysEpochMillisecondsWithSystemDefaultTimezone`). " +
       "With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted " +
       "to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With " +
       "`AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - " +
       "`java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted " +
-      "to MillisecondsFromEpoch.")
+      "to MillisecondsFromEpoch. The behavior for `AlwaysEpochMillisecondsWithSystemDefaultTimezone` is identical " +
+      "to `AlwaysEpochMilliseconds` except that it will assume the system default time zone / Spark session time " +
+      "zone (specified via `spark.sql.session.timeZone`) instead of UTC when the date/time to be parsed has no " +
+      "explicit time zone.")
 
   def parseSerializationConfig(cfg: Map[String, String]): CosmosSerializationConfig = {
     val inclusionModeOpt = CosmosConfigEntry.parse(cfg, inclusionMode)
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala
index 6a87a8c5d22e..43e0273217a7 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala
@@ -298,28 +298,39 @@ private[cosmos] class CosmosRowConverter(
       serializationConfig.serializationDateTimeConversionMode match {
         case SerializationDateTimeConversionModes.Default =>
           convertToJsonNodeConditionally(rowData.asInstanceOf[Long])
-        case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
           convertToJsonNodeConditionally(LocalDate
             .ofEpochDay(rowData.asInstanceOf[Long])
             .atStartOfDay()
             .toInstant(ZoneOffset.UTC).toEpochMilli)
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
+          convertToJsonNodeConditionally(LocalDate
+            .ofEpochDay(rowData.asInstanceOf[Long])
+            .atStartOfDay()
+            .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli)
       }
     case DateType if rowData.isInstanceOf[java.lang.Integer] =>
       serializationConfig.serializationDateTimeConversionMode match {
         case SerializationDateTimeConversionModes.Default =>
           convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer])
-        case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
           convertToJsonNodeConditionally(LocalDate
             .ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
             .atStartOfDay()
             .toInstant(ZoneOffset.UTC).toEpochMilli)
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
+          convertToJsonNodeConditionally(LocalDate
+            .ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
+            .atStartOfDay()
+            .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli)
       }
     case DateType => convertToJsonNodeConditionally(rowData.asInstanceOf[Date].getTime)
     case TimestampType if rowData.isInstanceOf[java.lang.Long] =>
       serializationConfig.serializationDateTimeConversionMode match {
         case SerializationDateTimeConversionModes.Default =>
           convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Long])
-        case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
+             SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
           val microsSinceEpoch = rowData.asInstanceOf[java.lang.Long]
           convertToJsonNodeConditionally(
             Instant.ofEpochSecond(
@@ -333,7 +344,8 @@ private[cosmos] class CosmosRowConverter(
       serializationConfig.serializationDateTimeConversionMode match {
         case SerializationDateTimeConversionModes.Default =>
           convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer])
-        case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
+             SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
           val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue()
           convertToJsonNodeConditionally(
             Instant.ofEpochSecond(
@@ -411,33 +423,49 @@ private[cosmos] class CosmosRowConverter(
       serializationConfig.serializationDateTimeConversionMode match {
         case SerializationDateTimeConversionModes.Default =>
           objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode])
-        case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
           objectMapper.convertValue(
             LocalDate
               .ofEpochDay(rowData.asInstanceOf[java.lang.Long])
               .atStartOfDay()
               .toInstant(ZoneOffset.UTC).toEpochMilli,
             classOf[JsonNode])
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
+          objectMapper.convertValue(
+            LocalDate
+              .ofEpochDay(rowData.asInstanceOf[java.lang.Long])
+              .atStartOfDay()
+              .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli,
+            classOf[JsonNode])
+
       }
     case DateType if rowData.isInstanceOf[java.lang.Integer] =>
       serializationConfig.serializationDateTimeConversionMode match {
         case SerializationDateTimeConversionModes.Default =>
           objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode])
-        case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
           objectMapper.convertValue(
             LocalDate
               .ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
               .atStartOfDay()
               .toInstant(ZoneOffset.UTC).toEpochMilli,
             classOf[JsonNode])
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
+          objectMapper.convertValue(
+            LocalDate
+              .ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
+              .atStartOfDay()
+              .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli,
+            classOf[JsonNode])
       }
     case DateType => objectMapper.convertValue(rowData.asInstanceOf[Date].getTime, classOf[JsonNode])
     case TimestampType if rowData.isInstanceOf[java.lang.Long] =>
       serializationConfig.serializationDateTimeConversionMode match {
         case SerializationDateTimeConversionModes.Default =>
           objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode])
-        case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
+             SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
           val microsSinceEpoch = rowData.asInstanceOf[java.lang.Long]
           objectMapper.convertValue(
             Instant.ofEpochSecond(
@@ -452,7 +480,8 @@ private[cosmos] class CosmosRowConverter(
       serializationConfig.serializationDateTimeConversionMode match {
         case SerializationDateTimeConversionModes.Default =>
           objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode])
-        case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
+        case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
+             SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
           val microsSinceEpoch =
             rowData.asInstanceOf[java.lang.Integer].longValue()
           objectMapper.convertValue(
             Instant.ofEpochSecond(
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala
index f44d1c2d7f54..b72bbddd07cc 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala
@@ -16,7 +16,7 @@ import java.sql.{Date, Timestamp}
 import java.time.format.DateTimeFormatter
 import java.time.temporal.ChronoUnit
 import java.time.{Instant, LocalDate, LocalDateTime, OffsetDateTime, ZoneOffset}
-import java.util.UUID
+import java.util.{TimeZone, UUID}
 import scala.util.Random
 
 // scalastyle:off underscore.import
@@ -40,7 +40,15 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait {
     CosmosRowConverter.get(
       new CosmosSerializationConfig(
         SerializationInclusionModes.Always,
-        SerializationDateTimeConversionModes.AlwaysEpochMilliseconds
+        SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone
       )
     )
+
+  private[this] val alwaysEpochMsRowConverterWithSystemDefaultTimezone =
+    CosmosRowConverter.get(
+      new CosmosSerializationConfig(
+        SerializationInclusionModes.Always,
+        SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone
+      )
+    )
@@ -48,7 +56,15 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait {
     CosmosRowConverter.get(
       new CosmosSerializationConfig(
         SerializationInclusionModes.NonNull,
-        SerializationDateTimeConversionModes.AlwaysEpochMilliseconds
+        SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone
       )
     )
+
+  private[this] val alwaysEpochMsRowConverterNonNullWithSystemDefaultTimezone =
+    CosmosRowConverter.get(
+      new CosmosSerializationConfig(
+        SerializationInclusionModes.NonNull,
+        SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone
+      )
+    )
@@ -505,9 +521,36 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait {
     objectNode.get(colName1).asLong() shouldEqual testDate.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli
     objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli
 
+    val originalDefaultTimezone = java.time.ZoneId.systemDefault
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+      java.time.ZoneId.systemDefault().getId shouldEqual "America/Los_Angeles"
+      objectNode = alwaysEpochMsRowConverterWithSystemDefaultTimezone.fromRowToObjectNode(row)
+      objectNode.get(colName1).asLong() shouldEqual testDate
+        .atStartOfDay()
+        .toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(Instant.now))
+        .toEpochMilli
+      objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli
+    } finally {
+      TimeZone.setDefault(TimeZone.getTimeZone(originalDefaultTimezone.getId))
+    }
+
     objectNode = alwaysEpochMsRowConverterNonNull.fromRowToObjectNode(row)
     objectNode.get(colName1).asLong() shouldEqual testDate.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli
     objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli
+
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+      java.time.ZoneId.systemDefault().getId shouldEqual "America/Los_Angeles"
+      objectNode = alwaysEpochMsRowConverterNonNullWithSystemDefaultTimezone.fromRowToObjectNode(row)
+      objectNode.get(colName1).asLong() shouldEqual testDate
+        .atStartOfDay()
+        .toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(Instant.now))
+        .toEpochMilli
+      objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli
+    } finally {
+      TimeZone.setDefault(TimeZone.getTimeZone(originalDefaultTimezone.getId))
+    }
   }
 
   "numeric types in spark row" should "translate to ObjectNode" in {

From 2e7823ac7a7b57e2786ec4c15597ac29926bf529 Mon Sep 17 00:00:00 2001
From: Fabian Meiswinkel
Date: Mon, 18 Jul 2022 17:13:58 +0000
Subject: [PATCH 2/2] Updating changelog

---
 sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md | 1 +
 sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md | 1 +
 2 files changed, 2 insertions(+)

diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
index ddd269f40b7b..c4b3a382a6c5 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -9,6 +9,7 @@
 #### Bugs Fixed
 
 #### Other Changes
+* Added a new `spark.cosmos.serialization.dateTimeConversionMode` mode called `AlwaysEpochMillisecondsWithSystemDefaultTimezone` that will assume the system default time zone instead of UTC when a date/time to be parsed has no explicit time zone. - See [PR 30001](https://github.com/Azure/azure-sdk-for-java/pull/30001)
 
 ### 4.12.0 (2022-07-14)
diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
index c1b5afff8988..ec0b10634cee 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -9,6 +9,7 @@
 #### Bugs Fixed
 
 #### Other Changes
+* Added a new `spark.cosmos.serialization.dateTimeConversionMode` mode called `AlwaysEpochMillisecondsWithSystemDefaultTimezone` that will assume the system default time zone instead of UTC when a date/time to be parsed has no explicit time zone. - See [PR 30001](https://github.com/Azure/azure-sdk-for-java/pull/30001)
 
 ### 4.12.0 (2022-07-14)
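
Reviewer note: below is a minimal, self-contained sketch (not part of the patches above) of how the new mode would be used end to end, and of what it changes for a `DateType` value. It assumes the connector's documented configuration keys (`spark.cosmos.accountEndpoint`, `spark.cosmos.accountKey`, `spark.cosmos.database`, `spark.cosmos.container`) and the `cosmos.oltp` data source; the account endpoint/key and the database/container names are placeholders.

```scala
import java.time.{Instant, LocalDate, ZoneId, ZoneOffset}
import org.apache.spark.sql.SparkSession

object DateTimeConversionModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("datetime-conversion-mode-sketch")
      // Per the docs above, the new mode resolves zone-less date/time values
      // against the system default / Spark session time zone.
      .config("spark.sql.session.timeZone", "America/Los_Angeles")
      .getOrCreate()

    // Placeholder account/database/container values - replace with real ones.
    val cosmosCfg = Map(
      "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
      "spark.cosmos.accountKey" -> "<account-key>",
      "spark.cosmos.database" -> "<database>",
      "spark.cosmos.container" -> "<container>",
      // Spark 2.4-compatible conversion (milliseconds from epoch), interpreting
      // zone-less dates/timestamps in the system default time zone instead of UTC.
      "spark.cosmos.serialization.dateTimeConversionMode" ->
        "AlwaysEpochMillisecondsWithSystemDefaultTimezone"
    )

    // Cosmos DB requires a string `id` column on write.
    val df = spark.sql(
      "SELECT CAST(1 AS STRING) AS id, current_date() AS someDate, current_timestamp() AS someTs")
    df.write.format("cosmos.oltp").options(cosmosCfg).mode("Append").save()

    // What the mode changes for a DateType value, mirroring the converter logic:
    val day = LocalDate.of(2022, 7, 18)
    val utcMillis = day.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli
    val systemOffset = ZoneId.systemDefault.getRules.getOffset(Instant.now)
    val localMillis = day.atStartOfDay().toInstant(systemOffset).toEpochMilli
    // With a UTC-7 default zone (e.g. America/Los_Angeles in July),
    // localMillis - utcMillis == 7 * 3600 * 1000.
    println(s"UTC midnight: $utcMillis, system-default midnight: $localMillis")
  }
}
```

One detail worth confirming during review: the converter resolves the offset via `getRules.getOffset(Instant.now)`, i.e. the zone's offset at conversion time, so dates on the other side of a DST transition are converted with today's offset rather than the offset in effect on that date.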