Fixing the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone` (#30266)

* Fixing the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone`

The `CosmosRowConverter` had a bug: it assumed that the ZoneOffset of the system default time zone is constant over time. In fact, during the 19xx years the political rules defining summer/winter time changed multiple times. To get the correct ZoneOffset, the zone rules have to be evaluated at the "local" Instant of the value being converted (instead of at Instant.now).
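
A minimal sketch of the issue, assuming a JVM default time zone such as Europe/Berlin whose summer-/winter-time rules in the 1940s differ from today's (the date below is only an illustration, not taken from this commit):

```scala
import java.sql.Timestamp
import java.time.{Instant, LocalDate, ZoneId}

object ZoneOffsetSketch extends App {
  val zoneRules = ZoneId.systemDefault.getRules

  // A start-of-day value for a date in the 1940s, when different DST rules applied.
  val localDateTime = LocalDate.of(1946, 5, 5).atStartOfDay()

  // Buggy approach: the offset valid "now" is applied to a 1946 value.
  val offsetNow = zoneRules.getOffset(Instant.now)

  // Fixed approach: evaluate the zone rules at the instant of the value itself.
  val localInstant = Timestamp.valueOf(localDateTime).toInstant
  val offsetThen = zoneRules.getOffset(localInstant)

  // When the two offsets differ, so do the resulting epoch milliseconds.
  println(localDateTime.toInstant(offsetNow).toEpochMilli)
  println(localDateTime.toInstant(offsetThen).toEpochMilli)
}
```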

* Adding changelog

* Adding a UDF to convert a ChangeFeedOffset back into a Map[Int, Long], so it can be saved in HDFS as Spark 2 continuations
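
A hedged usage sketch of that UDF (the account settings and the offset JSON below are placeholders, not values taken from this commit):

```scala
import com.azure.cosmos.spark.udf.CreateSpark2ContinuationsFromChangeFeedOffset

object ConvertOffsetSketch extends App {
  // Placeholder connection settings for the target account/container.
  val cosmosConfig = Map(
    "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
    "spark.cosmos.accountKey"      -> "<account-key>",
    "spark.cosmos.database"        -> "<database>",
    "spark.cosmos.container"       -> "<container>"
  )

  // The JSON content of an offset file persisted by the Spark 3 connector's change feed stream.
  val changeFeedOffsetJson: String = "<offset-json-read-from-checkpoint-location>"

  // Returns the minimum LSN per physical partition key range id, i.e. the shape
  // the Spark 2 connector used for continuations, which can then be written to HDFS.
  val spark2Continuations: Map[Int, Long] =
    new CreateSpark2ContinuationsFromChangeFeedOffset()
      .call(cosmosConfig, changeFeedOffsetJson)

  println(spark2Continuations)
}
```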

* Preparing azure-cosmos-spark 4.12.2 release

* Updating test for time conversion
FabianMeiswinkel committed Aug 5, 2022
1 parent dac0c7a commit caddd5a
Showing 13 changed files with 224 additions and 41 deletions.
4 changes: 2 additions & 2 deletions eng/versioning/version_client.txt
@@ -88,8 +88,8 @@ com.azure:azure-cosmos;4.33.1;4.34.0-beta.1
com.azure:azure-cosmos-benchmark;4.0.1-beta.1;4.0.1-beta.1
com.azure:azure-cosmos-dotnet-benchmark;4.0.1-beta.1;4.0.1-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3_2-12;1.0.0-beta.1;1.0.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;4.12.1;4.13.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;4.12.1;4.13.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;4.12.1;4.12.2
com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;4.12.1;4.12.2
com.azure:azure-cosmos-encryption;1.4.1;1.5.0-beta.1
com.azure:azure-data-appconfiguration;1.3.5;1.4.0-beta.1
com.azure:azure-data-appconfiguration-perf;1.0.0-beta.1;1.0.0-beta.1
7 changes: 2 additions & 5 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -1,12 +1,9 @@
## Release History

### 4.13.0-beta.1 (Unreleased)

#### Features Added

#### Breaking Changes
### 4.12.2 (2022-08-04)

#### Bugs Fixed
* Fixed the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone`, where the ZoneOffset calculation could be wrong, especially for dates in the 19xx years. - See [PR 30266](https://github.com/Azure/azure-sdk-for-java/pull/30266)

#### Other Changes
* Added support to allow config different account for throughput control - See [PR 30127](https://github.com/Azure/azure-sdk-for-java/pull/30127)
8 changes: 6 additions & 2 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/README.md
@@ -29,6 +29,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-1_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|--------------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
| 4.12.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.12.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.12.0 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
@@ -60,6 +62,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-2_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|-----------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
| 4.12.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.12.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.12.0 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
@@ -76,11 +80,11 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
### Download

You can use the maven coordinate of the jar to auto install the Spark Connector to your Databricks Runtime 8 from Maven:
`com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.0`
`com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.2`

You can also integrate against Cosmos DB Spark Connector in your SBT project:
```scala
libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-1_2-12" % "4.12.0"
libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-1_2-12" % "4.12.2"
```

Cosmos DB Spark Connector is available on [Maven Central Repo](https://search.maven.org/search?q=g:com.azure.cosmos.spark).
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/pom.xml
@@ -11,7 +11,7 @@
</parent>
<groupId>com.azure.cosmos.spark</groupId>
<artifactId>azure-cosmos-spark_3-1_2-12</artifactId>
<version>4.13.0-beta.1</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;current} -->
<version>4.12.2</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;current} -->
<packaging>jar</packaging>
<url>https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12</url>
<name>OLTP Spark 3.1 Connector for Azure Cosmos DB SQL API</name>
7 changes: 2 additions & 5 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -1,12 +1,9 @@
## Release History

### 4.13.0-beta.1 (Unreleased)

#### Features Added

#### Breaking Changes
### 4.12.2 (2022-08-04)

#### Bugs Fixed
* Fixed the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone`, where the ZoneOffset calculation could be wrong, especially for dates in the 19xx years. - See [PR 30266](https://github.com/Azure/azure-sdk-for-java/pull/30266)

#### Other Changes
* Added support to allow config different account for throughput control - See [PR 30127](https://github.com/Azure/azure-sdk-for-java/pull/30127)
8 changes: 6 additions & 2 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/README.md
@@ -28,6 +28,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-2_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|-----------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
| 4.12.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.12.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.12.0 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
@@ -44,6 +46,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-1_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|--------------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
| 4.12.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.12.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.12.0 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
@@ -75,11 +79,11 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
### Download

You can use the maven coordinate of the jar to auto install the Spark Connector to your Databricks Runtime 8 from Maven:
`com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.0`
`com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.2`

You can also integrate against Cosmos DB Spark Connector in your SBT project:
```scala
libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-2_2-12" % "4.12.0"
libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-2_2-12" % "4.12.2"
```

Cosmos DB Spark Connector is available on [Maven Central Repo](https://search.maven.org/search?q=g:com.azure.cosmos.spark).
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos-spark_3-2_2-12/pom.xml
@@ -11,7 +11,7 @@
</parent>
<groupId>com.azure.cosmos.spark</groupId>
<artifactId>azure-cosmos-spark_3-2_2-12</artifactId>
<version>4.13.0-beta.1</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;current} -->
<version>4.12.2</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;current} -->
<packaging>jar</packaging>
<url>https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/cosmos/azure-cosmos-spark_3-2_2-12</url>
<name>OLTP Spark 3.2 Connector for Azure Cosmos DB SQL API</name>
4 changes: 2 additions & 2 deletions sdk/cosmos/azure-cosmos-spark_3_2-12/docs/quick-start.md
@@ -23,10 +23,10 @@ You can use any other Spark 3.1.1 spark offering as well, also you should be abl
SLF4J is only needed if you plan to use logging, please also download an SLF4J binding which will link the SLF4J API with the logging implementation of your choice. See the [SLF4J user manual](https://www.slf4j.org/manual.html) for more information.

For Spark 3.1:
- Install Cosmos DB Spark Connector, in your spark Cluster [com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.0](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-1_2-12/4.11.2/jar)
- Install Cosmos DB Spark Connector, in your spark Cluster [com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.2](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-1_2-12/4.12.2/jar)

For Spark 3.2:
- Install Cosmos DB Spark Connector, in your spark Cluster [com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.0](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-2_2-12/4.11.2/jar)
- Install Cosmos DB Spark Connector, in your spark Cluster [com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.2](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-2_2-12/4.12.2/jar)


The getting started guide is based on PySpark however you can use the equivalent scala version as well, and you can run the following code snippet in an Azure Databricks PySpark notebook.
@@ -34,7 +34,7 @@ private[cosmos] case class ChangeFeedOffset
override def json(): String = jsonPersisted
}

private[spark] object ChangeFeedOffset {
private[cosmos] object ChangeFeedOffset {
private val IdPropertyName: String = "id"
private val StatePropertyName: String = "state"
private val InputPartitionsPropertyName: String = "partitions"
@@ -317,10 +317,14 @@ private[cosmos] class CosmosRowConverter(
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli)
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
convertToJsonNodeConditionally(LocalDate
val localDate = LocalDate
.ofEpochDay(rowData.asInstanceOf[Long])
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli)
val localTimestampInstant = Timestamp.valueOf(localDate).toInstant

convertToJsonNodeConditionally(
localDate
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli)
}
case DateType if rowData.isInstanceOf[java.lang.Integer] =>
serializationConfig.serializationDateTimeConversionMode match {
@@ -332,10 +336,13 @@
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli)
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
convertToJsonNodeConditionally(LocalDate
val localDate = LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli)
val localTimestampInstant = Timestamp.valueOf(localDate).toInstant
convertToJsonNodeConditionally(
localDate
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli)
}
case DateType => convertToJsonNodeConditionally(rowData.asInstanceOf[Date].getTime)
case TimestampType if rowData.isInstanceOf[java.lang.Long] =>
@@ -358,7 +365,7 @@
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer])
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue()
convertToJsonNodeConditionally(
Instant.ofEpochSecond(
@@ -444,13 +451,14 @@
.toInstant(ZoneOffset.UTC).toEpochMilli,
classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val localDate = LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Long])
.atStartOfDay()
val localTimestampInstant = Timestamp.valueOf(localDate).toInstant
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Long])
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli,
localDate
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli,
classOf[JsonNode])

}

case DateType if rowData.isInstanceOf[java.lang.Integer] =>
Expand All @@ -465,11 +473,13 @@ private[cosmos] class CosmosRowConverter(
.toInstant(ZoneOffset.UTC).toEpochMilli,
classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val localDate = LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
val localTimestampInstant = Timestamp.valueOf(localDate).toInstant
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli,
localDate
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli,
classOf[JsonNode])
}
case DateType => objectMapper.convertValue(rowData.asInstanceOf[Date].getTime, classOf[JsonNode])
@@ -0,0 +1,120 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark.udf

import com.azure.cosmos.{CosmosAsyncClient, SparkBridgeInternal}
import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
import com.azure.cosmos.implementation.SparkBridgeImplementationInternal.rangeToNormalizedRange
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState
import com.azure.cosmos.implementation.query.CompositeContinuationToken
import com.azure.cosmos.spark.{ChangeFeedOffset, CosmosClientCache, CosmosClientCacheItem, CosmosClientConfiguration, CosmosConfig, CosmosContainerConfig, Loan}
import org.apache.spark.sql.api.java.UDF2

import scala.collection.mutable

@SerialVersionUID(1L)
class CreateSpark2ContinuationsFromChangeFeedOffset extends UDF2[Map[String, String], String, Map[Int, Long]] {
override def call
(
userProvidedConfig: Map[String, String],
changeFeedOffset: String
): Map[Int, Long] = {

val effectiveUserConfig = CosmosConfig.getEffectiveConfig(None, None, userProvidedConfig)
val cosmosClientConfig = CosmosClientConfiguration(
effectiveUserConfig,
useEventualConsistency = false)

val cosmosContainerConfig: CosmosContainerConfig =
CosmosContainerConfig.parseCosmosContainerConfig(effectiveUserConfig, None, None)

Loan(
List[Option[CosmosClientCacheItem]](
Some(CosmosClientCache(
cosmosClientConfig,
None,
s"UDF CreateSpark2ContinuationsFromChangeFeedOffset"
))
))
.to(cosmosClientCacheItems => {
createSpark2ContinuationsFromChangeFeedOffset(
cosmosClientCacheItems.head.get.client,
cosmosContainerConfig.database,
cosmosContainerConfig.container,
changeFeedOffset
)
})
}

private[this] def createSpark2ContinuationsFromChangeFeedOffset
(
client: CosmosAsyncClient,
databaseName: String,
containerName: String,
offsetJson: String
): Map[Int, Long] = {

val effectiveOffsetJson = if (offsetJson.indexOf("\n") == 2 && offsetJson.size > 2) {
offsetJson.substring(3)
} else {
offsetJson
}
val offset: ChangeFeedOffset = ChangeFeedOffset.fromJson(effectiveOffsetJson)

val container = client
.getDatabase(databaseName)
.getContainer(containerName)

val expectedContainerResourceId = container.read().block().getProperties.getResourceId

val pkRanges = SparkBridgeInternal
.getPartitionKeyRanges(container)

val lsnsByPkRangeId = mutable.Map[Int, Long]()

pkRanges
.foreach(pkRange => {
val normalizedRange = rangeToNormalizedRange(pkRange.toRange)

val effectiveChangeFeedState = ChangeFeedState
.fromString(
SparkBridgeImplementationInternal
.extractChangeFeedStateForRange(offset.changeFeedState, normalizedRange)
)

val containerResourceId = effectiveChangeFeedState.getContainerRid

if (!expectedContainerResourceId.equalsIgnoreCase(containerResourceId)) {
throw new IllegalArgumentException(
s"The provided change feed offset is for a different container (either completely different container " +
s"or container with same name but after being deleted and recreated). Name:$containerName " +
s"Expected ResourceId: $expectedContainerResourceId, " +
s"Actual ResourceId: $containerResourceId"
)
}

var minLsn: Option[CompositeContinuationToken] = None

effectiveChangeFeedState
.extractContinuationTokens()
.forEach(token => {

if (minLsn.isEmpty) {
minLsn = Some(token)
} else if (SparkBridgeImplementationInternal.toLsn(token.getToken) <
SparkBridgeImplementationInternal.toLsn(minLsn.get.getToken)) {
minLsn = Some(token)
}
})

if (minLsn.isDefined) {
lsnsByPkRangeId.put(
pkRange.getId.toInt,
SparkBridgeImplementationInternal.toLsn(minLsn.get.getToken))
}
})

lsnsByPkRangeId.toMap
}
}
