Fixing the SerializationDateTimeConversionMode AlwaysEpochMillisecondsWithSystemDefaultTimezone #30266

Merged
4 changes: 2 additions & 2 deletions eng/versioning/version_client.txt
@@ -88,8 +88,8 @@ com.azure:azure-cosmos;4.33.1;4.34.0-beta.1
com.azure:azure-cosmos-benchmark;4.0.1-beta.1;4.0.1-beta.1
com.azure:azure-cosmos-dotnet-benchmark;4.0.1-beta.1;4.0.1-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3_2-12;1.0.0-beta.1;1.0.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;4.12.1;4.13.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;4.12.1;4.13.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;4.12.1;4.12.2
com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;4.12.1;4.12.2
com.azure:azure-cosmos-encryption;1.4.1;1.5.0-beta.1
com.azure:azure-data-appconfiguration;1.3.5;1.4.0-beta.1
com.azure:azure-data-appconfiguration-perf;1.0.0-beta.1;1.0.0-beta.1
7 changes: 2 additions & 5 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -1,12 +1,9 @@
## Release History

### 4.13.0-beta.1 (Unreleased)

#### Features Added

#### Breaking Changes
### 4.12.2 (2022-08-04)

#### Bugs Fixed
* Fixed the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone`, where the ZoneOffset calculation could be wrong, especially for dates in the 19xx years. - See [PR 30266](https://github.com/Azure/azure-sdk-for-java/pull/30266)

#### Other Changes
* Added support for configuring a different account for throughput control - See [PR 30127](https://github.com/Azure/azure-sdk-for-java/pull/30127)
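The ZoneOffset bug described in the changelog entry above comes from resolving the system-default offset at `Instant.now` rather than at the date being converted. A minimal, self-contained sketch of the effect (the zone, date, and object name are illustrative assumptions; any zone whose UTC offset changed over the years shows the same drift):

```scala
import java.time.{Instant, LocalDate, ZoneId}

object ZoneOffsetDriftSketch extends App {
  // Illustrative zone/date: São Paulo observed daylight saving in January 1990 (-02:00),
  // but has used -03:00 year-round since 2019, so "the offset now" differs from
  // "the offset at that date" (per current IANA tzdata).
  val zone = ZoneId.of("America/Sao_Paulo")
  val startOfDay = LocalDate.of(1990, 1, 15).atStartOfDay()

  // Buggy conversion: offset resolved at the moment the job runs
  val buggyMillis = startOfDay.toInstant(zone.getRules.getOffset(Instant.now)).toEpochMilli

  // Fixed conversion: offset resolved at the date itself (conceptually what PR 30266 does
  // via Timestamp.valueOf(localDate).toInstant)
  val fixedMillis = startOfDay.toInstant(zone.getRules.getOffset(startOfDay)).toEpochMilli

  println(s"buggy=$buggyMillis fixed=$fixedMillis skew=${(buggyMillis - fixedMillis) / 3600000.0}h")
}
```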
8 changes: 6 additions & 2 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/README.md
@@ -29,6 +29,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-1_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|--------------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
| 4.12.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.12.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.12.0 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
@@ -60,6 +62,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-2_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|-----------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
| 4.12.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.12.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.12.0 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
@@ -76,11 +80,11 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
### Download

You can use the Maven coordinates of the jar to auto-install the Spark Connector into your Databricks Runtime 8 from Maven:
`com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.0`
`com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.2`

You can also integrate the Cosmos DB Spark Connector into your SBT project:
```scala
libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-1_2-12" % "4.12.0"
libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-1_2-12" % "4.12.2"
```

Cosmos DB Spark Connector is available on [Maven Central Repo](https://search.maven.org/search?q=g:com.azure.cosmos.spark).
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/pom.xml
@@ -11,7 +11,7 @@
</parent>
<groupId>com.azure.cosmos.spark</groupId>
<artifactId>azure-cosmos-spark_3-1_2-12</artifactId>
<version>4.13.0-beta.1</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;current} -->
<version>4.12.2</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;current} -->
<packaging>jar</packaging>
<url>https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12</url>
<name>OLTP Spark 3.1 Connector for Azure Cosmos DB SQL API</name>
7 changes: 2 additions & 5 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -1,12 +1,9 @@
## Release History

### 4.13.0-beta.1 (Unreleased)

#### Features Added

#### Breaking Changes
### 4.12.2 (2022-08-04)

#### Bugs Fixed
* Fixed the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone`, where the ZoneOffset calculation could be wrong, especially for dates in the 19xx years. - See [PR 30266](https://github.com/Azure/azure-sdk-for-java/pull/30266)

#### Other Changes
* Added support for configuring a different account for throughput control - See [PR 30127](https://github.com/Azure/azure-sdk-for-java/pull/30127)
8 changes: 6 additions & 2 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/README.md
@@ -28,6 +28,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-2_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|-----------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
| 4.12.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.12.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.12.0 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
@@ -44,6 +46,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-1_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|--------------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
| 4.12.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.12.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.12.0 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
@@ -75,11 +79,11 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
### Download

You can use the Maven coordinates of the jar to auto-install the Spark Connector into your Databricks Runtime 8 from Maven:
`com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.0`
`com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.2`

You can also integrate the Cosmos DB Spark Connector into your SBT project:
```scala
libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-2_2-12" % "4.12.0"
libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-2_2-12" % "4.12.2"
```

Cosmos DB Spark Connector is available on [Maven Central Repo](https://search.maven.org/search?q=g:com.azure.cosmos.spark).
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos-spark_3-2_2-12/pom.xml
@@ -11,7 +11,7 @@
</parent>
<groupId>com.azure.cosmos.spark</groupId>
<artifactId>azure-cosmos-spark_3-2_2-12</artifactId>
<version>4.13.0-beta.1</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;current} -->
<version>4.12.2</version> <!-- {x-version-update;com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;current} -->
<packaging>jar</packaging>
<url>https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/cosmos/azure-cosmos-spark_3-2_2-12</url>
<name>OLTP Spark 3.2 Connector for Azure Cosmos DB SQL API</name>
4 changes: 2 additions & 2 deletions sdk/cosmos/azure-cosmos-spark_3_2-12/docs/quick-start.md
@@ -23,10 +23,10 @@ You can use any other Spark 3.1.1 spark offering as well, also you should be able to
SLF4J is only needed if you plan to use logging; please also download an SLF4J binding, which will link the SLF4J API with the logging implementation of your choice. See the [SLF4J user manual](https://www.slf4j.org/manual.html) for more information.

For Spark 3.1:
- Install the Cosmos DB Spark Connector in your Spark cluster: [com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.0](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-1_2-12/4.11.2/jar)
- Install the Cosmos DB Spark Connector in your Spark cluster: [com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.2](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-1_2-12/4.12.2/jar)

For Spark 3.2:
- Install the Cosmos DB Spark Connector in your Spark cluster: [com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.0](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-2_2-12/4.11.2/jar)
- Install the Cosmos DB Spark Connector in your Spark cluster: [com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.2](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-2_2-12/4.12.2/jar)


The getting started guide is based on PySpark; however, you can use the equivalent Scala version as well. You can run the following code snippet in an Azure Databricks PySpark notebook.
@@ -34,7 +34,7 @@ private[cosmos] case class ChangeFeedOffset
override def json(): String = jsonPersisted
}

private[spark] object ChangeFeedOffset {
private[cosmos] object ChangeFeedOffset {
private val IdPropertyName: String = "id"
private val StatePropertyName: String = "state"
private val InputPartitionsPropertyName: String = "partitions"
@@ -317,10 +317,14 @@ private[cosmos] class CosmosRowConverter(
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli)
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
convertToJsonNodeConditionally(LocalDate
val localDate = LocalDate
.ofEpochDay(rowData.asInstanceOf[Long])
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli)
val localTimestampInstant = Timestamp.valueOf(localDate).toInstant

convertToJsonNodeConditionally(
localDate
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli)
}
case DateType if rowData.isInstanceOf[java.lang.Integer] =>
serializationConfig.serializationDateTimeConversionMode match {
@@ -332,10 +336,13 @@
.atStartOfDay()
.toInstant(ZoneOffset.UTC).toEpochMilli)
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
convertToJsonNodeConditionally(LocalDate
val localDate = LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli)
val localTimestampInstant = Timestamp.valueOf(localDate).toInstant
convertToJsonNodeConditionally(
localDate
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli)
}
case DateType => convertToJsonNodeConditionally(rowData.asInstanceOf[Date].getTime)
case TimestampType if rowData.isInstanceOf[java.lang.Long] =>
@@ -358,7 +365,7 @@
case SerializationDateTimeConversionModes.Default =>
convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer])
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone |
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone =>
SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue()
convertToJsonNodeConditionally(
Instant.ofEpochSecond(
@@ -444,13 +451,14 @@
.toInstant(ZoneOffset.UTC).toEpochMilli,
classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val localDate = LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Long])
.atStartOfDay()
val localTimestampInstant = Timestamp.valueOf(localDate).toInstant
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Long])
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli,
localDate
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli,
classOf[JsonNode])

}

case DateType if rowData.isInstanceOf[java.lang.Integer] =>
@@ -465,11 +473,13 @@
.toInstant(ZoneOffset.UTC).toEpochMilli,
classOf[JsonNode])
case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone =>
val localDate = LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
val localTimestampInstant = Timestamp.valueOf(localDate).toInstant
objectMapper.convertValue(
LocalDate
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue())
.atStartOfDay()
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli,
localDate
.toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli,
classOf[JsonNode])
}
case DateType => objectMapper.convertValue(rowData.asInstanceOf[Date].getTime, classOf[JsonNode])
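Each hunk above makes the same change: the ZoneOffset is taken at the instant derived from the date being serialized (via `Timestamp.valueOf(localDate).toInstant`) instead of at `Instant.now`. A self-contained sketch of that corrected path follows; the object and method names are illustrative assumptions, not the connector's internal API (the real code lives inside `CosmosRowConverter`):

```scala
import java.sql.Timestamp
import java.time.{LocalDate, ZoneId}

object EpochDayToMillisSketch {
  // Spark stores DateType values as days since 1970-01-01; this mirrors the fixed
  // AlwaysEpochMillisecondsWithSystemDefaultTimezone path.
  def toEpochMillisWithSystemDefaultTimezone(epochDay: Long): Long = {
    val startOfDay = LocalDate.ofEpochDay(epochDay).atStartOfDay()
    // Offset valid at the date itself, not at the moment the job runs
    val instantAtDate = Timestamp.valueOf(startOfDay).toInstant
    startOfDay
      .toInstant(ZoneId.systemDefault.getRules.getOffset(instantAtDate))
      .toEpochMilli
  }

  def main(args: Array[String]): Unit = {
    // Epoch day 7305 is 1990-01-01; the result is local midnight of that day in epoch millis.
    println(toEpochMillisWithSystemDefaultTimezone(7305L))
  }
}
```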
@@ -0,0 +1,120 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark.udf

import com.azure.cosmos.{CosmosAsyncClient, SparkBridgeInternal}
import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
import com.azure.cosmos.implementation.SparkBridgeImplementationInternal.rangeToNormalizedRange
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState
import com.azure.cosmos.implementation.query.CompositeContinuationToken
import com.azure.cosmos.spark.{ChangeFeedOffset, CosmosClientCache, CosmosClientCacheItem, CosmosClientConfiguration, CosmosConfig, CosmosContainerConfig, Loan}
import org.apache.spark.sql.api.java.UDF2

import scala.collection.mutable

@SerialVersionUID(1L)
class CreateSpark2ContinuationsFromChangeFeedOffset extends UDF2[Map[String, String], String, Map[Int, Long]] {
override def call
(
userProvidedConfig: Map[String, String],
changeFeedOffset: String
): Map[Int, Long] = {

val effectiveUserConfig = CosmosConfig.getEffectiveConfig(None, None, userProvidedConfig)
val cosmosClientConfig = CosmosClientConfiguration(
effectiveUserConfig,
useEventualConsistency = false)

val cosmosContainerConfig: CosmosContainerConfig =
CosmosContainerConfig.parseCosmosContainerConfig(effectiveUserConfig, None, None)

Loan(
List[Option[CosmosClientCacheItem]](
Some(CosmosClientCache(
cosmosClientConfig,
None,
s"UDF CreateSpark2ContinuationsFromChangeFeedOffset"
))
))
.to(cosmosClientCacheItems => {
createSpark2ContinuationsFromChangeFeedOffset(
cosmosClientCacheItems.head.get.client,
cosmosContainerConfig.database,
cosmosContainerConfig.container,
changeFeedOffset
)
})
}

private[this] def createSpark2ContinuationsFromChangeFeedOffset
(
client: CosmosAsyncClient,
databaseName: String,
containerName: String,
offsetJson: String
): Map[Int, Long] = {

val effectiveOffsetJson = if (offsetJson.indexOf("\n") == 2 && offsetJson.size > 2) {
offsetJson.substring(3)
} else {
offsetJson
}
val offset: ChangeFeedOffset = ChangeFeedOffset.fromJson(effectiveOffsetJson)

val container = client
.getDatabase(databaseName)
.getContainer(containerName)

val expectedContainerResourceId = container.read().block().getProperties.getResourceId

val pkRanges = SparkBridgeInternal
.getPartitionKeyRanges(container)

val lsnsByPkRangeId = mutable.Map[Int, Long]()

pkRanges
.foreach(pkRange => {
val normalizedRange = rangeToNormalizedRange(pkRange.toRange)

val effectiveChangeFeedState = ChangeFeedState
.fromString(
SparkBridgeImplementationInternal
.extractChangeFeedStateForRange(offset.changeFeedState, normalizedRange)
)

val containerResourceId = effectiveChangeFeedState.getContainerRid

if (!expectedContainerResourceId.equalsIgnoreCase(containerResourceId)) {
throw new IllegalArgumentException(
s"The provided change feed offset is for a different container (either completely different container " +
s"or container with same name but after being deleted and recreated). Name:$containerName " +
s"Expected ResourceId: $expectedContainerResourceId, " +
s"Actual ResourceId: $containerResourceId"
)
}

var minLsn: Option[CompositeContinuationToken] = None

effectiveChangeFeedState
.extractContinuationTokens()
.forEach(token => {

if (minLsn.isEmpty) {
minLsn = Some(token)
} else if (SparkBridgeImplementationInternal.toLsn(token.getToken) <
SparkBridgeImplementationInternal.toLsn(minLsn.get.getToken)) {
minLsn = Some(token)
}
})

if (minLsn.isDefined) {
lsnsByPkRangeId.put(
pkRange.getId.toInt,
SparkBridgeImplementationInternal.toLsn(minLsn.get.getToken))
}
})

lsnsByPkRangeId.toMap
}
}
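A hypothetical usage sketch for the new UDF follows; the config keys, file handling, and names are assumptions for illustration, not part of this PR. It feeds in a Spark 3 structured-streaming change feed offset (the content of an offset file under the checkpoint location; the UDF strips a leading two-character version marker such as `v1`) and returns the minimum LSN per partition key range:

```scala
import com.azure.cosmos.spark.udf.CreateSpark2ContinuationsFromChangeFeedOffset

object CreateSpark2ContinuationsSketch {
  def main(args: Array[String]): Unit = {
    // Assumed connector config keys; replace the placeholders with real values.
    val userConfig = Map(
      "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
      "spark.cosmos.accountKey"      -> "<account-key>",
      "spark.cosmos.database"        -> "<database>",
      "spark.cosmos.container"       -> "<container>"
    )

    // Offset JSON taken from the Spark 3 checkpoint, e.g. <checkpointLocation>/offsets/<batchId>
    val changeFeedOffsetJson = scala.io.Source.fromFile(args(0)).mkString

    val lsnsByPkRangeId: Map[Int, Long] =
      new CreateSpark2ContinuationsFromChangeFeedOffset().call(userConfig, changeFeedOffsetJson)

    lsnsByPkRangeId.foreach { case (pkRangeId, lsn) => println(s"pkRangeId=$pkRangeId lsn=$lsn") }
  }
}
```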