
Adding option to fall back to Spark 2.4 Date/Time conversion assuming System default time zone instead of UTC when date has no explicit time zone #30001

Merged
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Added a new `spark.cosmos.serialization.dateTimeConversionMode` option, `AlwaysEpochMillisecondsWithSystemDefaultTimezone`, which assumes the system default time zone instead of UTC when a date/time to be parsed has no explicit time zone. - See [PR 30001](https://github.com/Azure/azure-sdk-for-java/pull/30001)

### 4.12.0 (2022-07-14)

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Added a new `spark.cosmos.serialization.dateTimeConversionMode` option, `AlwaysEpochMillisecondsWithSystemDefaultTimezone`, which assumes the system default time zone instead of UTC when a date/time to be parsed has no explicit time zone. - See [PR 30001](https://github.com/Azure/azure-sdk-for-java/pull/30001)

### 4.12.0 (2022-07-14)

@@ -60,7 +60,7 @@ Used to influence the json serialization/deserialization behavior
| Config Property Name | Default | Description |
| :--- | :---- | :--- |
| `spark.cosmos.serialization.inclusionMode` | `Always` | Determines whether null/default values will be serialized to json or whether properties with null/default value will be skipped. The behavior follows the same ideas as [Jackson's JsonInclude.Include](https://github.com/FasterXML/jackson-annotations/blob/d0820002721c76adad2cc87fcd88bf60f56b64de/src/main/java/com/fasterxml/jackson/annotation/JsonInclude.java#L98-L227). `Always` means json properties are created even for null and default values. `NonNull` means no json properties will be created for explicit null values. `NonEmpty` means json properties will not be created for empty string values or empty arrays/maps. `NonDefault` means json properties will be skipped not just for null/empty values but also when the value is identical to the default value (for example `0` for numeric properties). |
| `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch.|
| `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`, `AlwaysEpochMillisecondsWithSystemDefaultTimezone`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch. The behavior of `AlwaysEpochMillisecondsWithSystemDefaultTimezone` is identical to `AlwaysEpochMilliseconds` except that it assumes the system default time zone / Spark session time zone (specified via `spark.sql.session.timeZone`) instead of UTC when the date/time to be parsed has no explicit time zone.|

#### Change feed (only for Spark-Streaming using `cosmos.oltp.changeFeed` data source, which is read-only) configuration
| Config Property Name | Default | Description |
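For illustration, this is how a write job would opt into the new mode. The sketch below is not part of the diff; the endpoint, key, database, and container values are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical usage sketch - account endpoint/key, database, and container are placeholders.
val spark = SparkSession.builder().appName("cosmos-write-sketch").getOrCreate()
import spark.implicits._

val cosmosWriteConfig = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "<database>",
  "spark.cosmos.container" -> "<container>",
  // Interpret zoneless dates/timestamps in the system default time zone, as Spark 2.4 did:
  "spark.cosmos.serialization.dateTimeConversionMode" ->
    "AlwaysEpochMillisecondsWithSystemDefaultTimezone"
)

// A minimal DataFrame with a DateType column, just to exercise the conversion path.
val df = Seq(("id-1", java.sql.Date.valueOf("2021-01-01"))).toDF("id", "eventDate")

df.write
  .format("cosmos.oltp")
  .options(cosmosWriteConfig)
  .mode("Append")
  .save()
```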
@@ -812,7 +812,10 @@ private object SerializationDateTimeConversionModes extends Enumeration {
type SerializationDateTimeConversionMode = Value

val Default: SerializationDateTimeConversionModes.Value = Value("Default")
val AlwaysEpochMilliseconds: SerializationDateTimeConversionModes.Value = Value("AlwaysEpochMilliseconds")
val AlwaysEpochMillisecondsWithUtcTimezone:
SerializationDateTimeConversionModes.Value = Value("AlwaysEpochMilliseconds")
val AlwaysEpochMillisecondsWithSystemDefaultTimezone:
SerializationDateTimeConversionModes.Value = Value("AlwaysEpochMillisecondsWithSystemDefaultTimezone")
}

private case class CosmosSerializationConfig
@@ -836,12 +839,16 @@ private object CosmosSerializationConfig {
mandatory = false,
defaultValue = Some(SerializationDateTimeConversionModes.Default),
parseFromStringFunction = value => CosmosConfigEntry.parseEnumeration(value, SerializationDateTimeConversionModes),
helpMessage = "The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`). " +
helpMessage = "The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`, " +
"`AlwaysEpochMillisecondsWithSystemDefaultTimezone`). " +
"With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted " +
"to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With " +
"`AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - " +
"`java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted " +
"to MillisecondsFromEpoch.")
"to MillisecondsFromEpoch. The behavior for `AlwaysEpochMillisecondsWithSystemDefaultTimezone` is identical " +
"with `AlwaysEpochMilliseconds` except that it will assume System default time zone / Spark session time zone " +
"(specified via `spark.sql.session.time zone`) instead of UTC when the date/time to be parsed has no explicit " +
"time zone.")

def parseSerializationConfig(cfg: Map[String, String]): CosmosSerializationConfig = {
val inclusionModeOpt = CosmosConfigEntry.parse(cfg, inclusionMode)
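Note the deliberate asymmetry in the enumeration above: the renamed member `AlwaysEpochMillisecondsWithUtcTimezone` keeps the legacy string value `AlwaysEpochMilliseconds`, so existing configurations keep parsing unchanged. A minimal, self-contained sketch of that pattern (the names here are illustrative, not from the PR):

```scala
object EnumBackCompatSketch extends App {
  object ConversionModes extends Enumeration {
    // New Scala identifier, but Value(...) pins the externally visible string,
    // so configs written against the old name still resolve.
    val WithUtcTimezone: Value = Value("AlwaysEpochMilliseconds")
    val WithSystemDefaultTimezone: Value = Value("AlwaysEpochMillisecondsWithSystemDefaultTimezone")
  }

  // withName resolves by string, so the legacy config value maps to the renamed member.
  assert(ConversionModes.withName("AlwaysEpochMilliseconds") == ConversionModes.WithUtcTimezone)
}
```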
@@ -298,28 +298,39 @@ private[cosmos] class CosmosRowConverter(
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[Long])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
convertToJsonNodeConditionally(LocalDate
.ofEpochDay(rowData.asInstanceOf[Long])
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli)
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
convertToJsonNodeConditionally(LocalDate
.ofEpochDay(rowData.asInstanceOf[Long])
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli)
}
case DateType if rowData.isInstanceOf[java.lang.Integer] =>
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
convertToJsonNodeConditionally(LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli)
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
convertToJsonNodeConditionally(LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli)
}
case DateType => convertToJsonNodeConditionally(rowData.asInstanceOf[Date].getTime)
case TimestampType if rowData.isInstanceOf[java.lang.Long] =>
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Long])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Long]
convertToJsonNodeConditionally(
Instant.ofEpochSecond(
@@ -333,7 +344,8 @@
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue()
convertToJsonNodeConditionally(
Instant.ofEpochSecond(
@@ -411,33 +423,49 @@
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Long])
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli,
classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Long])
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli,
classOf[JsonNode])

}

case DateType if rowData.isInstanceOf[java.lang.Integer] =>
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli,
classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli,
classOf[JsonNode])
}
case DateType => objectMapper.convertValue(rowData.asInstanceOf[Date].getTime, classOf[JsonNode])
case TimestampType if rowData.isInstanceOf[java.lang.Long] =>
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
objectMapper.convertValue(rowData.asInstanceOf[java.lang.Long], classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Long]
objectMapper.convertValue(
Instant.ofEpochSecond(
@@ -452,7 +480,8 @@
serializationConfig.serializationDateTimeConversionMode match {
case SerializationDateTimeConversionModes.Default =>
objectMapper.convertValue(rowData.asInstanceOf[java.lang.Integer], classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMilliseconds =>
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue()
objectMapper.convertValue(
Instant.ofEpochSecond(
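To make the conversion semantics above concrete: Spark stores `DateType` as days since the Unix epoch and `TimestampType` as microseconds since the epoch. The two legacy modes differ only for dates, where a zone offset must be chosen to turn local midnight into an instant; timestamps are zone-independent in both. The standalone sketch below (not part of the diff) uses a fixed `-08:00` offset for determinism. Note that the converter itself derives the offset from the system default zone at `Instant.now`, so in zones with DST the applied offset depends on when the job runs, not on the date being converted.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

object DateTimeConversionSketch extends App {
  val epochDay = LocalDate.of(2021, 1, 1).toEpochDay // what Spark stores for a DateType value

  // AlwaysEpochMillisecondsWithUtcTimezone: local midnight interpreted as UTC midnight.
  val utcMillis = LocalDate.ofEpochDay(epochDay)
    .atStartOfDay()
    .toInstant(ZoneOffset.UTC)
    .toEpochMilli // 1609459200000L == 2021-01-01T00:00:00Z

  // AlwaysEpochMillisecondsWithSystemDefaultTimezone, assuming a UTC-8 default zone:
  // midnight in UTC-8 is eight hours later as an instant.
  val pstMillis = LocalDate.ofEpochDay(epochDay)
    .atStartOfDay()
    .toInstant(ZoneOffset.ofHours(-8))
    .toEpochMilli // 1609488000000L == 2021-01-01T08:00:00Z

  assert(pstMillis - utcMillis == 8L * 60 * 60 * 1000)

  // TimestampType is unaffected by either mode's time zone choice:
  // Spark's microseconds-since-epoch are merely reduced to milliseconds.
  val microsSinceEpoch = 1609459200123456L
  val millis = Instant.ofEpochSecond(
    microsSinceEpoch / 1000000,
    (microsSinceEpoch % 1000000) * 1000
  ).toEpochMilli
  assert(millis == 1609459200123L)
}
```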
@@ -16,7 +16,7 @@ import java.sql.{Date, Timestamp}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.time.{Instant, LocalDate, LocalDateTime, OffsetDateTime, ZoneOffset}
import java.util.UUID
import java.util.{TimeZone, UUID}
import scala.util.Random

// scalastyle:off underscore.import
@@ -40,15 +40,31 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait {
CosmosRowConverter.get(
new CosmosSerializationConfig(
SerializationInclusionModes.Always,
SerializationDateTimeConversionModes.AlwaysEpochMilliseconds
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone
)
)

private[this] val alwaysEpochMsRowConverterWithSystemDefaultTimezone =
CosmosRowConverter.get(
new CosmosSerializationConfig(
SerializationInclusionModes.Always,
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone
)
)

private[this] val alwaysEpochMsRowConverterNonNull =
CosmosRowConverter.get(
new CosmosSerializationConfig(
SerializationInclusionModes.NonNull,
SerializationDateTimeConversionModes.AlwaysEpochMilliseconds
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone
)
)

private[this] val alwaysEpochMsRowConverterNonNullWithSystemDefaultTimezone =
CosmosRowConverter.get(
new CosmosSerializationConfig(
SerializationInclusionModes.NonNull,
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone
)
)

@@ -505,9 +521,36 @@
objectNode.get(colName1).asLong() shouldEqual testDate.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli
objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli

val originalDefaultTimezone = java.time.ZoneId.systemDefault
try {
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
java.time.ZoneId.systemDefault().getId shouldEqual "America/Los_Angeles"
objectNode = alwaysEpochMsRowConverterWithSystemDefaultTimezone.fromRowToObjectNode(row)
objectNode.get(colName1).asLong() shouldEqual testDate
.atStartOfDay()
.toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(Instant.now))
.toEpochMilli
objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli
} finally {
TimeZone.setDefault(TimeZone.getTimeZone(originalDefaultTimezone.getId))
}

objectNode = alwaysEpochMsRowConverterNonNull.fromRowToObjectNode(row)
objectNode.get(colName1).asLong() shouldEqual testDate.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli
objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli

try {
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
java.time.ZoneId.systemDefault().getId shouldEqual "America/Los_Angeles"
objectNode = alwaysEpochMsRowConverterNonNullWithSystemDefaultTimezone.fromRowToObjectNode(row)
objectNode.get(colName1).asLong() shouldEqual testDate
.atStartOfDay()
.toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(Instant.now))
.toEpochMilli
objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli
} finally {
TimeZone.setDefault(TimeZone.getTimeZone(originalDefaultTimezone.getId))
}
}

"numeric types in spark row" should "translate to ObjectNode" in {
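One practical caveat the tests above hint at: the converter reads `java.time.ZoneId.systemDefault`, i.e., the JVM default time zone, which is why the tests override it with `TimeZone.setDefault`. If you rely on this mode, it is worth keeping the JVM default and the Spark session time zone aligned. A short sketch, assuming an existing `SparkSession` named `spark`:

```scala
import java.util.TimeZone

// Keep the JVM default zone (read by the connector's system-default mode)
// consistent with Spark's session time zone (used by Catalyst date/time functions).
val zoneId = "America/Los_Angeles" // example zone
TimeZone.setDefault(TimeZone.getTimeZone(zoneId))
spark.conf.set("spark.sql.session.timeZone", zoneId)
```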