From caddd5a947ec054c972e194bade4e50f48bf3497 Mon Sep 17 00:00:00 2001
From: Fabian Meiswinkel
Date: Fri, 5 Aug 2022 19:02:06 +0200
Subject: [PATCH] Fixing the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone` (#30266)

* Fixing the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone`
  The `CosmosRowConverter` had a bug assuming that the ZoneOffset between two time zones would be constant - in fact, the political rules defining summer/winter time changed multiple times during the 19xx years. To get the correct ZoneOffset, the right "local" Instant (vs. Instant.now) needs to be used.
* Adding changelog
* Adding UDF to convert a ChangeFeedOffset back to Map[Int, Long] to make it possible to save it in hdfs as Spark2 continuations
* Preparing azure-cosmos-spark 4.12.2 release
* Updating test for time conversion
---
 eng/versioning/version_client.txt | 4 +-
 .../azure-cosmos-spark_3-1_2-12/CHANGELOG.md | 7 +-
 .../azure-cosmos-spark_3-1_2-12/README.md | 8 +-
 .../azure-cosmos-spark_3-1_2-12/pom.xml | 2 +-
 .../azure-cosmos-spark_3-2_2-12/CHANGELOG.md | 7 +-
 .../azure-cosmos-spark_3-2_2-12/README.md | 8 +-
 .../azure-cosmos-spark_3-2_2-12/pom.xml | 2 +-
 .../docs/quick-start.md | 4 +-
 .../azure/cosmos/spark/ChangeFeedOffset.scala | 2 +-
 .../cosmos/spark/CosmosRowConverter.scala | 38 ++++--
 ...rk2ContinuationsFromChangeFeedOffset.scala | 120 ++++++++++++++++++
 .../cosmos/spark/CosmosRowConverterSpec.scala | 46 ++++++-
 .../spark/SparkE2EChangeFeedITest.scala | 17 ++-
 13 files changed, 224 insertions(+), 41 deletions(-)
 create mode 100644 sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/CreateSpark2ContinuationsFromChangeFeedOffset.scala

diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt
index 2ef40e166785b..0cc4d3a0c9df9 100644
--- a/eng/versioning/version_client.txt
+++ b/eng/versioning/version_client.txt
@@ -88,8 +88,8 @@ com.azure:azure-cosmos;4.33.1;4.34.0-beta.1
com.azure:azure-cosmos-benchmark;4.0.1-beta.1;4.0.1-beta.1
com.azure:azure-cosmos-dotnet-benchmark;4.0.1-beta.1;4.0.1-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3_2-12;1.0.0-beta.1;1.0.0-beta.1
-com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;4.12.1;4.13.0-beta.1
-com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;4.12.1;4.13.0-beta.1
+com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12;4.12.1;4.12.2
+com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12;4.12.1;4.12.2
com.azure:azure-cosmos-encryption;1.4.1;1.5.0-beta.1
com.azure:azure-data-appconfiguration;1.3.5;1.4.0-beta.1
com.azure:azure-data-appconfiguration-perf;1.0.0-beta.1;1.0.0-beta.1
diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
index 917079e0abd54..f07baf5a76762 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -1,12 +1,9 @@
## Release History
-### 4.13.0-beta.1 (Unreleased)
-
-#### Features Added
-
-#### Breaking Changes
+### 4.12.2 (2022-08-04)
#### Bugs Fixed
+* Fixed the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone` where the ZoneOffset calculation could be wrong, especially for dates in the 19xx years.
- See [PR 30266](https://github.com/Azure/azure-sdk-for-java/pull/30266)
#### Other Changes
* Added support to allow config different account for throughput control - See [PR 30127](https://github.com/Azure/azure-sdk-for-java/pull/30127)
diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/README.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/README.md
index 3c0fcc03dbc03..8f525929d8887 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/README.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/README.md
@@ -29,6 +29,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-1_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|--------------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
+| 4.12.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
+| 4.12.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.12.0 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
| 4.11.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* |
@@ -60,6 +62,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
#### azure-cosmos-spark_3-2_2-12
| Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes |
|-----------| ------------------------ | -------------------- | ----------------------- | ----------------------------- |
+| 4.12.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
+| 4.12.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.12.0 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
| 4.11.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* |
@@ -76,11 +80,11 @@ https://github.com/Azure/azure-sdk-for-java/issues/new
### Download
You can use the maven coordinate of the jar to auto install the Spark Connector to your Databricks Runtime 8 from Maven:
-`com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.0`
+`com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.2`
You can also integrate against Cosmos DB Spark Connector in your SBT project:
```scala
-libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-1_2-12" % "4.12.0"
+libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-1_2-12" % "4.12.2"
```
Cosmos DB Spark Connector is available on [Maven Central Repo](https://search.maven.org/search?q=g:com.azure.cosmos.spark).
diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/pom.xml b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/pom.xml
index 643c2437e0b96..afdcd9d1205f3 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/pom.xml
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/pom.xml
@@ -11,7 +11,7 @@
com.azure.cosmos.spark
azure-cosmos-spark_3-1_2-12
- 4.13.0-beta.1
+ 4.12.2
jar
https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12
OLTP Spark 3.1 Connector for Azure Cosmos DB SQL API
diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
index cc707d13646b8..3c105ef59638a 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -1,12 +1,9 @@
## Release History
-### 4.13.0-beta.1 (Unreleased)
-
-#### Features Added
-
-#### Breaking Changes
+### 4.12.2 (2022-08-04)
#### Bugs Fixed
+* Fixed the SerializationDateTimeConversionMode `AlwaysEpochMillisecondsWithSystemDefaultTimezone` where the ZoneOffset calculation could be wrong, especially for dates in the 19xx years.
- See [PR 30266](https://github.com/Azure/azure-sdk-for-java/pull/30266) #### Other Changes * Added support to allow config different account for throughput control - See [PR 30127](https://github.com/Azure/azure-sdk-for-java/pull/30127) diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/README.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/README.md index b53abeafc52a5..6bca8965d3748 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/README.md +++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/README.md @@ -28,6 +28,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new #### azure-cosmos-spark_3-2_2-12 | Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes | |-----------| ------------------------ | -------------------- | ----------------------- | ----------------------------- | +| 4.12.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* | +| 4.12.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* | | 4.12.0 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* | | 4.11.2 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* | | 4.11.1 | 3.2.0 - 3.2.1 | 8 | 2.12 | 10.\* | @@ -44,6 +46,8 @@ https://github.com/Azure/azure-sdk-for-java/issues/new #### azure-cosmos-spark_3-1_2-12 | Connector | Supported Spark Versions | Minimum Java Version | Supported Scala Versions | Supported Databricks Runtimes | |--------------| ------------------------ | -------------------- | ----------------------- | ----------------------------- | +| 4.12.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* | +| 4.12.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* | | 4.12.0 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* | | 4.11.2 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* | | 4.11.1 | 3.1.1 - 3.1.2 | 8 | 2.12 | 8.\*, 9.\* | @@ -75,11 +79,11 @@ https://github.com/Azure/azure-sdk-for-java/issues/new ### Download You can use the maven coordinate of the jar to auto install the Spark Connector to your Databricks Runtime 8 from Maven: -`com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.0` +`com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.2` You can also integrate against Cosmos DB Spark Connector in your SBT project: ```scala -libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-2_2-12" % "4.12.0" +libraryDependencies += "com.azure.cosmos.spark" % "azure-cosmos-spark_3-2_2-12" % "4.12.2" ``` Cosmos DB Spark Connector is available on [Maven Central Repo](https://search.maven.org/search?q=g:com.azure.cosmos.spark). diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/pom.xml b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/pom.xml index 987e316056a8a..20d38e03be627 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/pom.xml +++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/pom.xml @@ -11,7 +11,7 @@ com.azure.cosmos.spark azure-cosmos-spark_3-2_2-12 - 4.13.0-beta.1 + 4.12.2 jar https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/cosmos/azure-cosmos-spark_3-2_2-12 OLTP Spark 3.2 Connector for Azure Cosmos DB SQL API diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/quick-start.md b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/quick-start.md index 255fd1c520fe4..a3c2935433e77 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/quick-start.md +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/quick-start.md @@ -23,10 +23,10 @@ You can use any other Spark 3.1.1 spark offering as well, also you should be abl SLF4J is only needed if you plan to use logging, please also download an SLF4J binding which will link the SLF4J API with the logging implementation of your choice. 
See the [SLF4J user manual](https://www.slf4j.org/manual.html) for more information. For Spark 3.1: -- Install Cosmos DB Spark Connector, in your spark Cluster [com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.0](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-1_2-12/4.11.2/jar) +- Install Cosmos DB Spark Connector, in your spark Cluster [com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.12.2](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-1_2-12/4.12.2/jar) For Spark 3.2: -- Install Cosmos DB Spark Connector, in your spark Cluster [com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.0](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-2_2-12/4.11.2/jar) +- Install Cosmos DB Spark Connector, in your spark Cluster [com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.12.2](https://search.maven.org/artifact/com.azure.cosmos.spark/azure-cosmos-spark_3-2_2-12/4.12.2/jar) The getting started guide is based on PySpark however you can use the equivalent scala version as well, and you can run the following code snippet in an Azure Databricks PySpark notebook. diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedOffset.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedOffset.scala index 08ccc699e65f3..82e3b6254c55b 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedOffset.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedOffset.scala @@ -34,7 +34,7 @@ private[cosmos] case class ChangeFeedOffset override def json(): String = jsonPersisted } -private[spark] object ChangeFeedOffset { +private[cosmos] object ChangeFeedOffset { private val IdPropertyName: String = "id" private val StatePropertyName: String = "state" private val InputPartitionsPropertyName: String = "partitions" diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala index 9d5b39c986ff7..32c1ade502a44 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverter.scala @@ -317,10 +317,14 @@ private[cosmos] class CosmosRowConverter( .atStartOfDay() .toInstant(ZoneOffset.UTC).toEpochMilli) case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone => - convertToJsonNodeConditionally(LocalDate + val localDate = LocalDate .ofEpochDay(rowData.asInstanceOf[Long]) .atStartOfDay() - .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli) + val localTimestampInstant = Timestamp.valueOf(localDate).toInstant + + convertToJsonNodeConditionally( + localDate + .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli) } case DateType if rowData.isInstanceOf[java.lang.Integer] => serializationConfig.serializationDateTimeConversionMode match { @@ -332,10 +336,13 @@ private[cosmos] class CosmosRowConverter( .atStartOfDay() .toInstant(ZoneOffset.UTC).toEpochMilli) case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone => - convertToJsonNodeConditionally(LocalDate + val localDate = LocalDate 
.ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue()) .atStartOfDay() - .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli) + val localTimestampInstant = Timestamp.valueOf(localDate).toInstant + convertToJsonNodeConditionally( + localDate + .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli) } case DateType => convertToJsonNodeConditionally(rowData.asInstanceOf[Date].getTime) case TimestampType if rowData.isInstanceOf[java.lang.Long] => @@ -358,7 +365,7 @@ private[cosmos] class CosmosRowConverter( case SerializationDateTimeConversionModes.Default => convertToJsonNodeConditionally(rowData.asInstanceOf[java.lang.Integer]) case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone | - SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithUtcTimezone => + SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone => val microsSinceEpoch = rowData.asInstanceOf[java.lang.Integer].longValue() convertToJsonNodeConditionally( Instant.ofEpochSecond( @@ -444,13 +451,14 @@ private[cosmos] class CosmosRowConverter( .toInstant(ZoneOffset.UTC).toEpochMilli, classOf[JsonNode]) case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone => + val localDate = LocalDate + .ofEpochDay(rowData.asInstanceOf[java.lang.Long]) + .atStartOfDay() + val localTimestampInstant = Timestamp.valueOf(localDate).toInstant objectMapper.convertValue( - LocalDate - .ofEpochDay(rowData.asInstanceOf[java.lang.Long]) - .atStartOfDay() - .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli, + localDate + .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli, classOf[JsonNode]) - } case DateType if rowData.isInstanceOf[java.lang.Integer] => @@ -465,11 +473,13 @@ private[cosmos] class CosmosRowConverter( .toInstant(ZoneOffset.UTC).toEpochMilli, classOf[JsonNode]) case SerializationDateTimeConversionModes.AlwaysEpochMillisecondsWithSystemDefaultTimezone => + val localDate = LocalDate + .ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue()) + .atStartOfDay() + val localTimestampInstant = Timestamp.valueOf(localDate).toInstant objectMapper.convertValue( - LocalDate - .ofEpochDay(rowData.asInstanceOf[java.lang.Integer].longValue()) - .atStartOfDay() - .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(Instant.now)).toEpochMilli, + localDate + .toInstant(java.time.ZoneId.systemDefault.getRules().getOffset(localTimestampInstant)).toEpochMilli, classOf[JsonNode]) } case DateType => objectMapper.convertValue(rowData.asInstanceOf[Date].getTime, classOf[JsonNode]) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/CreateSpark2ContinuationsFromChangeFeedOffset.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/CreateSpark2ContinuationsFromChangeFeedOffset.scala new file mode 100644 index 0000000000000..60847556558fd --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/CreateSpark2ContinuationsFromChangeFeedOffset.scala @@ -0,0 +1,120 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+package com.azure.cosmos.spark.udf + +import com.azure.cosmos.{CosmosAsyncClient, SparkBridgeInternal} +import com.azure.cosmos.implementation.SparkBridgeImplementationInternal +import com.azure.cosmos.implementation.SparkBridgeImplementationInternal.rangeToNormalizedRange +import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState +import com.azure.cosmos.implementation.query.CompositeContinuationToken +import com.azure.cosmos.spark.{ChangeFeedOffset, CosmosClientCache, CosmosClientCacheItem, CosmosClientConfiguration, CosmosConfig, CosmosContainerConfig, Loan} +import org.apache.spark.sql.api.java.UDF2 + +import scala.collection.mutable + +@SerialVersionUID(1L) +class CreateSpark2ContinuationsFromChangeFeedOffset extends UDF2[Map[String, String], String, Map[Int, Long]] { + override def call + ( + userProvidedConfig: Map[String, String], + changeFeedOffset: String + ): Map[Int, Long] = { + + val effectiveUserConfig = CosmosConfig.getEffectiveConfig(None, None, userProvidedConfig) + val cosmosClientConfig = CosmosClientConfiguration( + effectiveUserConfig, + useEventualConsistency = false) + + val cosmosContainerConfig: CosmosContainerConfig = + CosmosContainerConfig.parseCosmosContainerConfig(effectiveUserConfig, None, None) + + Loan( + List[Option[CosmosClientCacheItem]]( + Some(CosmosClientCache( + cosmosClientConfig, + None, + s"UDF CreateSpark2ContinuationsFromChangeFeedOffset" + )) + )) + .to(cosmosClientCacheItems => { + createSpark2ContinuationsFromChangeFeedOffset( + cosmosClientCacheItems.head.get.client, + cosmosContainerConfig.database, + cosmosContainerConfig.container, + changeFeedOffset + ) + }) + } + + private[this] def createSpark2ContinuationsFromChangeFeedOffset + ( + client: CosmosAsyncClient, + databaseName: String, + containerName: String, + offsetJson: String + ): Map[Int, Long] = { + + val effectiveOffsetJson = if (offsetJson.indexOf("\n") == 2 && offsetJson.size > 2) { + offsetJson.substring(3) + } else { + offsetJson + } + val offset: ChangeFeedOffset = ChangeFeedOffset.fromJson(effectiveOffsetJson) + + val container = client + .getDatabase(databaseName) + .getContainer(containerName) + + val expectedContainerResourceId = container.read().block().getProperties.getResourceId + + val pkRanges = SparkBridgeInternal + .getPartitionKeyRanges(container) + + val lsnsByPkRangeId = mutable.Map[Int, Long]() + + pkRanges + .foreach(pkRange => { + val normalizedRange = rangeToNormalizedRange(pkRange.toRange) + + val effectiveChangeFeedState = ChangeFeedState + .fromString( + SparkBridgeImplementationInternal + .extractChangeFeedStateForRange(offset.changeFeedState, normalizedRange) + ) + + val containerResourceId = effectiveChangeFeedState.getContainerRid + + if (!expectedContainerResourceId.equalsIgnoreCase(containerResourceId)) { + throw new IllegalArgumentException( + s"The provided change feed offset is for a different container (either completely different container " + + s"or container with same name but after being deleted and recreated). 
Name:$containerName " + + s"Expected ResourceId: $expectedContainerResourceId, " + + s"Actual ResourceId: $containerResourceId" + ) + } + + var minLsn: Option[CompositeContinuationToken] = None + + effectiveChangeFeedState + .extractContinuationTokens() + .forEach(token => { + + if (minLsn.isEmpty) { + minLsn = Some(token) + } else if (SparkBridgeImplementationInternal.toLsn(token.getToken) < + SparkBridgeImplementationInternal.toLsn(minLsn.get.getToken)) { + minLsn = Some(token) + } + }) + + if (minLsn.isDefined) { + lsnsByPkRangeId.put( + pkRange.getId.toInt, + SparkBridgeImplementationInternal.toLsn(minLsn.get.getToken)) + } + }) + + lsnsByPkRangeId.toMap + } +} + diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala index 6cb65e61053dd..95931ba4da6a4 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/CosmosRowConverterSpec.scala @@ -488,6 +488,7 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait { val colName1 = "testCol1" val colName2 = "testCol2" + val colName3 = "testCol3" val testDate = LocalDate.of(1945, 12, 12) val testTimestamp = new java.sql.Timestamp( @@ -500,6 +501,9 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait { val colVal1= convertToCatalyst(colVal1Raw).asInstanceOf[Int] colVal1 shouldEqual -8786 colVal1 shouldEqual testDate.toEpochDay + val testDateLocalInstant = Timestamp.valueOf(new java.sql.Timestamp( + 45, 11, 12, 0, 0, 0, 0) + .toLocalDateTime).toInstant // Catalyst optimizer will convert java.sql.Timestamp into epoch Microseconds val colVal2Raw = Timestamp.from(testTimestamp) @@ -508,18 +512,36 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait { colVal2 shouldEqual -727530468000000L colVal2 shouldEqual ChronoUnit.MICROS.between(Instant.EPOCH, testTimestamp) + val testTimestampCol3 = new java.sql.Timestamp( + 100, 11, 12, 12, 12, 12, 0) + .toLocalDateTime.toInstant(ZoneOffset.UTC) + + val testDateCol3 = LocalDate.of(2000, 12, 12) + val colVal3Raw = new Date(100, 11, 12) + convertToCatalyst(colVal3Raw).isInstanceOf[Int] shouldEqual true + val colVal3= convertToCatalyst(colVal3Raw).asInstanceOf[Int] + colVal3 shouldEqual 11303 + colVal3 shouldEqual testDateCol3.toEpochDay + val testDateCol3Timestamp = Timestamp.valueOf(new java.sql.Timestamp( + 100, 11, 12, 0, 0, 0, 0) + .toLocalDateTime) + val testDateCol3LocalInstant = testDateCol3Timestamp.toInstant + val row = new GenericRowWithSchema( - Array(colVal1, colVal2), + Array(colVal1, colVal2, Seq(colVal3)), StructType(Seq(StructField(colName1, DateType), - StructField(colName2, TimestampType)))) + StructField(colName2, TimestampType), + StructField(colName3, ArrayType(DateType))))) var objectNode = defaultRowConverter.fromRowToObjectNode(row) objectNode.get(colName1).asLong() shouldEqual colVal1 objectNode.get(colName2).asLong() shouldEqual colVal2 + objectNode.get(colName3).asInstanceOf[ArrayNode].get(0).asLong() shouldEqual colVal3 objectNode = alwaysEpochMsRowConverter.fromRowToObjectNode(row) objectNode.get(colName1).asLong() shouldEqual testDate.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli + objectNode.get(colName3).asInstanceOf[ArrayNode].get(0).asLong() 
shouldEqual testDateCol3.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli val originalDefaultTimezone = java.time.ZoneId.systemDefault try { @@ -528,9 +550,15 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait { objectNode = alwaysEpochMsRowConverterWithSystemDefaultTimezone.fromRowToObjectNode(row) objectNode.get(colName1).asLong() shouldEqual testDate .atStartOfDay() - .toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(Instant.now)) + .toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(testDateLocalInstant)) .toEpochMilli + objectNode.get(colName1).asLong() shouldEqual -759081600000L objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli + objectNode.get(colName3).asInstanceOf[ArrayNode].get(0).asLong() shouldEqual testDateCol3 + .atStartOfDay() + .toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(testDateCol3LocalInstant)) + .toEpochMilli + objectNode.get(colName3).asInstanceOf[ArrayNode].get(0).asLong() shouldEqual 976608000000L } finally { TimeZone.setDefault(TimeZone.getTimeZone(originalDefaultTimezone.getId)) } @@ -538,6 +566,8 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait { objectNode = alwaysEpochMsRowConverterNonNull.fromRowToObjectNode(row) objectNode.get(colName1).asLong() shouldEqual testDate.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli + objectNode.get(colName3).asInstanceOf[ArrayNode].get(0).asLong() shouldEqual + testDateCol3.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli try { TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) @@ -545,9 +575,17 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait { objectNode = alwaysEpochMsRowConverterNonNullWithSystemDefaultTimezone.fromRowToObjectNode(row) objectNode.get(colName1).asLong() shouldEqual testDate .atStartOfDay() - .toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(Instant.now)) + .toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(testDateLocalInstant)) .toEpochMilli + + objectNode.get(colName1).asLong() shouldEqual -759081600000L objectNode.get(colName2).asLong() shouldEqual testTimestamp.toEpochMilli + + objectNode.get(colName3).asInstanceOf[ArrayNode].get(0).asLong() shouldEqual testDateCol3 + .atStartOfDay() + .toInstant(TimeZone.getTimeZone("America/Los_Angeles").toZoneId.getRules.getOffset(testDateCol3LocalInstant)) + .toEpochMilli + objectNode.get(colName3).asInstanceOf[ArrayNode].get(0).asLong() shouldEqual 976608000000L } finally { TimeZone.setDefault(TimeZone.getTimeZone(originalDefaultTimezone.getId)) } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala index ecdabfac3811f..f2a8959de8004 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala @@ -4,11 +4,10 @@ package com.azure.cosmos.spark import com.azure.cosmos.SparkBridgeInternal import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState - import com.azure.cosmos.implementation.{TestConfigurations, Utils} import com.azure.cosmos.models.PartitionKey import 
com.azure.cosmos.spark.diagnostics.BasicLoggingTrait -import com.azure.cosmos.spark.udf.{CreateChangeFeedOffsetFromSpark2, GetFeedRangeForPartitionKeyValue} +import com.azure.cosmos.spark.udf.{CreateChangeFeedOffsetFromSpark2, CreateSpark2ContinuationsFromChangeFeedOffset, GetFeedRangeForPartitionKeyValue} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.functions import org.apache.spark.sql.types._ @@ -513,6 +512,20 @@ class SparkE2EChangeFeedITest databaseResourceIdAndTokenMap._2 ) + val tokenMapAfterBackAndForthConversion: Map[Int, Long] = new CreateSpark2ContinuationsFromChangeFeedOffset() + .call( + cfg, + migratedOffset + ) + + tokenMapAfterBackAndForthConversion.size shouldBe databaseResourceIdAndTokenMap._2.size + databaseResourceIdAndTokenMap + ._2 + .foreach(pkRangeLsnPair => { + tokenMapAfterBackAndForthConversion.get(pkRangeLsnPair._1).isDefined shouldEqual true + tokenMapAfterBackAndForthConversion.get(pkRangeLsnPair._1).get shouldEqual pkRangeLsnPair._2 + }) + if (hdfs.exists(new Path(startOffsetFileLocation))) { hdfs.copyToLocalFile(true, new Path(startOffsetFileLocation), new Path(startOffsetFileLocation + ".bak")) }
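
For context on the time-zone fix in `CosmosRowConverter`, below is a minimal standalone Scala sketch (not part of the patch) contrasting the old and new `ZoneOffset` lookup. The object name and the explicit `America/Los_Angeles` default time zone are illustrative assumptions; the sample date (epoch day -8786, i.e. 1945-12-12) and the expected value -759081600000L come from the updated `CosmosRowConverterSpec`, and the `java.time` calls mirror the ones used in the patched code.

```scala
import java.sql.Timestamp
import java.time.{Instant, LocalDate, ZoneId}
import java.util.TimeZone

// Sketch of the ZoneOffset lookup fixed in CosmosRowConverter.
// Assumes the JVM default time zone is America/Los_Angeles, as in the test.
object ZoneOffsetFixSketch {
  def main(args: Array[String]): Unit = {
    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))

    // DateType values arrive as days since the epoch; 1945-12-12 is -8786.
    val localDateTime = LocalDate.ofEpochDay(-8786).atStartOfDay()

    // Old behavior: the offset valid *now* (-07:00 whenever DST is active on the
    // day the code runs) was applied to a 1945 date, although Los Angeles was
    // on -08:00 back then.
    val offsetOfToday = ZoneId.systemDefault.getRules.getOffset(Instant.now)
    val buggyEpochMillis = localDateTime.toInstant(offsetOfToday).toEpochMilli

    // Fixed behavior: resolve the offset for the instant that corresponds to the
    // date being converted (Timestamp.valueOf interprets the local date-time in
    // the default zone), as the patched code does.
    val localInstant = Timestamp.valueOf(localDateTime).toInstant
    val offsetOfThatDate = ZoneId.systemDefault.getRules.getOffset(localInstant)
    val fixedEpochMillis = localDateTime.toInstant(offsetOfThatDate).toEpochMilli

    // The test expects -759081600000L (1945-12-12T00:00 PST == 1945-12-12T08:00Z).
    println(s"buggy: $buggyEpochMillis, fixed: $fixedEpochMillis")
  }
}
```

Resolving the zone rules at the instant being converted is what makes historical transitions (for example, US "War Time" ending in late 1945) come out right, which is why the updated `CosmosRowConverterSpec` can assert fixed constants such as -759081600000L and 976608000000L instead of values that depend on the offset in effect when the test happens to run.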