Commit 2262467

Full Fidelity Change Feed Processor with public surface changes (#30399)
* Changes for the FFCF (full fidelity change feed) pull model

* Update FullFidelityChangeFeedTest.java

* Spark changes

* Update CosmosRowConverter.scala

* Update SparkE2EChangeFeedITest.scala

* Initial changes for change feed processor in full fidelity mode

* Setting change feed policy on container for full fidelity in testing

* Added changes for type-casting the change feed processor response to a ChangeFeedProcessorResponse object

* Renamed ChangeFeedProcessorResponse to ChangeFeedProcessorItem

* Added change feed processor new lease changes

* Added java docs for ChangeFeedOperationType

* Added timeToLiveExpired for change feed metadata

* Added logic to create leases based on EPK feed range

* Added more logic for feed range based lease

* Added the lease token feed range EPK implementation code to the change feed processor (CFP)

* Added more TODOs

* Addressed code review comments; added lease structure changes and more

* Fixed SpotBugs issues related to the package rename

* Added todo for changelog once we finalize the public surface API

* Removed TODOs, finalized API naming

* Renamed to handleAllChanges() (see the usage sketch after this commit list)

* Updated Beta API annotation to 4.35.0

* Added comments to Change Feed mode

* Updated the changelog

* Updated the changelog

* Added toJsonNode() API to convert ChangeFeedProcessorItem to JsonNode

* Updated beta version to 4.36.0 since 4.35.0 is already released

* Added 4.35.0 beta version field back to fix the revapi check

* Renamed incremental and full fidelity modes to new names in the public surface area

* Fixed @deprecated annotation

* Fixed test cases

* Code review comments

* Simplifying full fidelity change feed tests

* Simplified more tests with less throughput for container creation

* Disabled emulator test checking that the previous image is present on replace

* Reverted ChangeFeedMode to implementation class. Reverted the name changes of enums

* Fixed unit tests for incremental and full_fidelity mode

* Disabled full fidelity new tests for testing

* Enabled the tests and reduced throughput on ThroughputControlTests

* Fixed compilation error

* API review comments

* API renames, commented out tests for testing

* Removed sample code

* Switching classes to test the emulator CI

* Disabled one test to debug the CI

* Disabled full fidelity change feed processor tests for emulator CI

* Fixed flaky tests

* Enabled basic tests for CFP in FFCF mode
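Taken together, these commits introduce a new public surface for consuming the change feed in full fidelity (all versions and deletes) mode. The sketch below is a hedged illustration of how that surface fits together, based only on the APIs named above (handleAllChanges(), ChangeFeedProcessorItem, toJsonNode()); the host name is illustrative and the builder wiring follows the standard ChangeFeedProcessorBuilder flow, so treat this as a sketch rather than code from this PR.

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.models.ChangeFeedProcessorItem;

public final class FullFidelityProcessorSketch {
    // Builds a processor using the handleAllChanges(...) handler named in this
    // commit; each ChangeFeedProcessorItem exposes toJsonNode() (also added here).
    public static ChangeFeedProcessor create(CosmosAsyncContainer feedContainer,
                                             CosmosAsyncContainer leaseContainer) {
        return new ChangeFeedProcessorBuilder()
            .hostName("sample-host")              // illustrative host name, not from this PR
            .feedContainer(feedContainer)
            .leaseContainer(leaseContainer)
            .handleAllChanges(changes -> {
                for (ChangeFeedProcessorItem item : changes) {
                    // Full fidelity items carry metadata (operation type, TTL expiry)
                    // alongside the document payload.
                    System.out.println(item.toJsonNode());
                }
            })
            .buildChangeFeedProcessor();
    }
}

In the async API, start() returns a Mono<Void>, so a caller would typically subscribe to it to begin lease acquisition and processing.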

Co-authored-by: simorenoh <simonmorenohe@gmail.com>
kushagraThapar and simorenoh committed Sep 29, 2022
1 parent 17ecaee commit 2262467
Showing 40 changed files with 2,045 additions and 643 deletions.
@@ -34,7 +34,7 @@ public class BasicOperationTest {
     private CosmosContainer createdTestContainer;
     private CosmosContainer createdResultsContainer;

-    @BeforeClass(groups = {"emulator"}, timeOut = TIMEOUT)
+    @BeforeClass(groups = {"simple"}, timeOut = TIMEOUT)
     public void before_BasicOperationTest() {
         assertThat(this.client).isNull();
         CosmosClientBuilder clientBuilder = new CosmosClientBuilder()
@@ -60,7 +60,7 @@ public void before_BasicOperationTest() {
         this.createdResultsContainer = this.createdDatabase.getContainer("results" + suffix);
     }

-    @AfterClass(groups = {"emulator"}, timeOut = TIMEOUT, alwaysRun = true)
+    @AfterClass(groups = {"simple"}, timeOut = TIMEOUT, alwaysRun = true)
     public void afterClass() {
         assertThat(this.client).isNotNull();
         assertThat(this.createdDatabase).isNotNull();
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### 4.14.0-beta.1 (Unreleased)

 #### Features Added
+* Added new config options for change feed modes - Incremental as `LatestVersion` and Full Fidelity as `AllVersionsAndDeletes` - See [PR 30399](https://github.com/Azure/azure-sdk-for-java/pull/30399)
 * Added option to emit client-side metrics via micrometer.io MeterRegistry. - See [PR 30065](https://github.com/Azure/azure-sdk-for-java/pull/30065)

 #### Breaking Changes
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### 4.14.0-beta.1 (Unreleased)

 #### Features Added
+* Added new config options for change feed modes - Incremental as `LatestVersion` and Full Fidelity as `AllVersionsAndDeletes` - See [PR 30399](https://github.com/Azure/azure-sdk-for-java/pull/30399)
 * Added option to emit client-side metrics via micrometer.io MeterRegistry. - See [PR 30065](https://github.com/Azure/azure-sdk-for-java/pull/30065)

 #### Breaking Changes
@@ -64,12 +64,12 @@ Used to influence the json serialization/deserialization behavior
 | `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`, `AlwaysEpochMillisecondsWithSystemDefaultTimezone`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch. The behavior for `AlwaysEpochMillisecondsWithSystemDefaultTimezone` is identical with `AlwaysEpochMilliseconds` except that it will assume System default time zone / Spark session time zone (specified via `spark.sql.session.timezone`) instead of UTC when the date/time to be parsed has no explicit time zone.|

 #### Change feed (only for Spark-Streaming using `cosmos.oltp.changeFeed` data source, which is read-only) configuration
-| Config Property Name | Default | Description |
-| :--- | :---- | :--- |
-| `spark.cosmos.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (`Now`, `Beginning` or a certain point in time (UTC) for example `2020-02-10T14:15:03`) - the default value is `Beginning`. If the write config contains a `checkpointLocation` and any checkpoints exist, the stream is always continued independent of the `spark.cosmos.changeFeed.startFrom` settings - you need to change `checkpointLocation` or delete checkpoints to restart the stream if that is the intention. |
-| `spark.cosmos.changeFeed.mode` | `Incremental` | ChangeFeed mode (`Incremental` or `FullFidelity`) - NOTE: `FullFidelity` is in experimental state right now. It requires that the subscription/account has been enabled for the private preview and there are known breaking changes that will happen for `FullFidelity` (schema of the returned documents). It is recommended to only use `FullFidelity` for non-production scenarios at this point. |
-| `spark.cosmos.changeFeed.itemCountPerTriggerHint` | None | Approximate maximum number of items read from change feed for each micro-batch/trigger |
-| `spark.cosmos.changeFeed.batchCheckpointLocation` | None | Can be used to generate checkpoints when using change feed queries in batch mode - and proceeding on the next iteration where the previous left off. |
+| Config Property Name | Default | Description |
+| :--- | :---- | :--- |
+| `spark.cosmos.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (`Now`, `Beginning` or a certain point in time (UTC) for example `2020-02-10T14:15:03`) - the default value is `Beginning`. If the write config contains a `checkpointLocation` and any checkpoints exist, the stream is always continued independent of the `spark.cosmos.changeFeed.startFrom` settings - you need to change `checkpointLocation` or delete checkpoints to restart the stream if that is the intention. |
+| `spark.cosmos.changeFeed.mode` | `Incremental/LatestVersion` | ChangeFeed mode (`Incremental/LatestVersion` or `FullFidelity/AllVersionsAndDeletes`) - NOTE: `FullFidelity/AllVersionsAndDeletes` is in experimental state right now. It requires that the subscription/account has been enabled for the private preview and there are known breaking changes that will happen for `FullFidelity/AllVersionsAndDeletes` (schema of the returned documents). It is recommended to only use `FullFidelity/AllVersionsAndDeletes` for non-production scenarios at this point. |
+| `spark.cosmos.changeFeed.itemCountPerTriggerHint` | None | Approximate maximum number of items read from change feed for each micro-batch/trigger |
+| `spark.cosmos.changeFeed.batchCheckpointLocation` | None | Can be used to generate checkpoints when using change feed queries in batch mode - and proceeding on the next iteration where the previous left off. |

 #### Json conversion configuration
 | Config Property Name | Default | Description |
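For orientation, here is a hedged sketch of how the change feed options above are typically wired into a Spark structured streaming read. The endpoint, key, database, and container values are placeholders; only the option names come from the table above.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class ChangeFeedStreamSketch {
    // Reads the change feed with the new AllVersionsAndDeletes mode name through
    // the read-only cosmos.oltp.changeFeed source.
    public static Dataset<Row> readChangeFeed(SparkSession spark) {
        return spark.readStream()
            .format("cosmos.oltp.changeFeed")
            .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/") // placeholder
            .option("spark.cosmos.accountKey", "<account-key>")                                   // placeholder
            .option("spark.cosmos.database", "<database>")                                        // placeholder
            .option("spark.cosmos.container", "<container>")                                      // placeholder
            .option("spark.cosmos.changeFeed.mode", "AllVersionsAndDeletes") // or "LatestVersion"
            .option("spark.cosmos.changeFeed.startFrom", "Now")
            .load();
    }
}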
@@ -81,9 +81,9 @@ private case class ChangeFeedPartitionReader

     var factoryMethod: java.util.function.Function[JsonNode, _] = (_: JsonNode) => {}
     cosmosChangeFeedConfig.changeFeedMode match {
-      case ChangeFeedModes.Incremental =>
+      case ChangeFeedModes.Incremental | ChangeFeedModes.LatestVersion =>
         factoryMethod = (jsonNode: JsonNode) => changeFeedItemFactoryMethod(jsonNode)
-      case ChangeFeedModes.FullFidelity =>
+      case ChangeFeedModes.FullFidelity | ChangeFeedModes.AllVersionsAndDeletes =>
         factoryMethod = (jsonNode: JsonNode) => changeFeedItemFactoryMethodV1(jsonNode)
     }
@@ -119,9 +119,9 @@ private class ChangeFeedTable(val session: SparkSession,
                               userConfig: Map[String, String]): StructType = {

     val defaultSchema: StructType = changeFeedConfig.changeFeedMode match {
-      case ChangeFeedModes.Incremental =>
+      case ChangeFeedModes.Incremental | ChangeFeedModes.LatestVersion =>
         ChangeFeedTable.defaultIncrementalChangeFeedSchemaForInferenceDisabled
-      case ChangeFeedModes.FullFidelity =>
+      case ChangeFeedModes.FullFidelity | ChangeFeedModes.AllVersionsAndDeletes =>
         ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled
     }
@@ -1078,6 +1078,8 @@ private object ChangeFeedModes extends Enumeration {

   val Incremental: ChangeFeedModes.Value = Value("Incremental")
   val FullFidelity: ChangeFeedModes.Value = Value("FullFidelity")
+  val LatestVersion: ChangeFeedModes.Value = Value("LatestVersion")
+  val AllVersionsAndDeletes: ChangeFeedModes.Value = Value("AllVersionsAndDeletes")
 }

 private object ChangeFeedStartFromModes extends Enumeration {
@@ -1108,8 +1110,8 @@ private case class CosmosChangeFeedConfig
     }

     this.changeFeedMode match {
-      case ChangeFeedModes.Incremental => options
-      case ChangeFeedModes.FullFidelity => options.fullFidelity()
+      case ChangeFeedModes.Incremental | ChangeFeedModes.LatestVersion => options
+      case ChangeFeedModes.FullFidelity | ChangeFeedModes.AllVersionsAndDeletes => options.allVersionsAndDeletes()
     }
   }

@@ -1148,7 +1150,7 @@ private object CosmosChangeFeedConfig {
     mandatory = false,
     defaultValue = Some(ChangeFeedModes.Incremental),
     parseFromStringFunction = changeFeedModeString => CosmosConfigEntry.parseEnumeration(changeFeedModeString, ChangeFeedModes),
-    helpMessage = "ChangeFeed mode (Incremental or FullFidelity)")
+    helpMessage = "ChangeFeed mode (Incremental/LatestVersion or FullFidelity/AllVersionsAndDeletes)")

   private val maxItemCountPerTriggerHint = CosmosConfigEntry[Long](
     key = CosmosConfigNames.ChangeFeedItemCountPerTriggerHint,
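The spec below feeds deliberately mixed casing ("AllVersionsANDDELETES", "latestversion") into parseCosmosChangeFeedConfig, which implies parseEnumeration matches mode names case-insensitively. Here is a hedged Java sketch of that matching technique, an illustration under that assumption rather than the connector's actual CosmosConfigEntry code:

import java.util.Arrays;

final class EnumParseSketch {
    enum ChangeFeedMode { Incremental, FullFidelity, LatestVersion, AllVersionsAndDeletes }

    // Matches a raw config value against the enum names, ignoring case, and
    // reports the same help message the spec asserts on when nothing matches.
    static ChangeFeedMode parseChangeFeedMode(String raw) {
        return Arrays.stream(ChangeFeedMode.values())
            .filter(v -> v.name().equalsIgnoreCase(raw))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException(
                "invalid configuration for spark.cosmos.changeFeed.mode:" + raw
                    + ". Config description: ChangeFeed mode (Incremental/LatestVersion or FullFidelity/AllVersionsAndDeletes)"));
    }
}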
@@ -538,7 +538,22 @@ class CosmosConfigSpec extends UnitSpec {
     config.maxItemCountPerTrigger.get shouldEqual 54
   }

-  it should "parse change feed config with PIT start mode" in {
+  it should "parse change feed config for all versions and deletes with incorrect casing" in {
+    val changeFeedConfig = Map(
+      "spark.cosmos.changeFeed.mode" -> "AllVersionsANDDELETES",
+      "spark.cosmos.changeFeed.STARTfrom" -> "NOW",
+      "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "54"
+    )
+
+    val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig)
+
+    config.changeFeedMode shouldEqual ChangeFeedModes.AllVersionsAndDeletes
+    config.startFrom shouldEqual ChangeFeedStartFromModes.Now
+    config.startFromPointInTime shouldEqual None
+    config.maxItemCountPerTrigger.get shouldEqual 54
+  }
+
+  it should "parse change feed config (incremental) with PIT start mode" in {
     val changeFeedConfig = Map(
       "spark.cosmos.changeFeed.mode" -> "incremental",
       "spark.cosmos.changeFeed.STARTfrom" -> "2019-12-31T10:45:10Z",
@@ -556,6 +571,24 @@ class CosmosConfigSpec extends UnitSpec {
     config.maxItemCountPerTrigger.get shouldEqual 54
   }

+  it should "parse change feed config (latestversion) with PIT start mode" in {
+    val changeFeedConfig = Map(
+      "spark.cosmos.changeFeed.mode" -> "latestversion",
+      "spark.cosmos.changeFeed.STARTfrom" -> "2019-12-31T10:45:10Z",
+      "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "54"
+    )
+
+    val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig)
+
+    config.changeFeedMode shouldEqual ChangeFeedModes.LatestVersion
+    config.startFrom shouldEqual ChangeFeedStartFromModes.PointInTime
+    Instant.from(config.startFromPointInTime.get) shouldEqual
+      new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")
+        .parse("2019-12-31T10:45:10Z")
+        .toInstant
+    config.maxItemCountPerTrigger.get shouldEqual 54
+  }
+
   it should "complain when parsing invalid change feed mode" in {
     val changeFeedConfig = Map(
       "spark.cosmos.changeFeed.mode" -> "Whatever",
@@ -569,7 +602,7 @@ class CosmosConfigSpec extends UnitSpec {
     } catch {
       case e: Exception => e.getMessage shouldEqual
         "invalid configuration for spark.cosmos.changeFeed.mode:Whatever. Config description: " +
-          "ChangeFeed mode (Incremental or FullFidelity)"
+          "ChangeFeed mode (Incremental/LatestVersion or FullFidelity/AllVersionsAndDeletes)"
     }
   }
@@ -42,7 +42,7 @@ class PartitionMetadataSpec extends UnitSpec {
     key shouldEqual s"$databaseName/$collectionName/${normalizedRange.min}-${normalizedRange.max}"
   }

-  it should "create instance with valid parameters via apply" in {
+  it should "create instance with valid parameters via apply in incremental mode" in {

     val clientConfig = CosmosClientConfiguration(
       UUID.randomUUID().toString,
@@ -92,7 +92,79 @@ class PartitionMetadataSpec extends UnitSpec {
       docCount,
       docSizeInKB,
       firstLsn,
-      createChangeFeedState(latestLsn))
+      createChangeFeedState(latestLsn, "INCREMENTAL"))
+
+    viaCtor.cosmosClientConfig should be theSameInstanceAs viaApply.cosmosClientConfig
+    viaCtor.cosmosClientConfig should be theSameInstanceAs clientConfig
+    viaCtor.cosmosContainerConfig should be theSameInstanceAs viaApply.cosmosContainerConfig
+    viaCtor.cosmosContainerConfig should be theSameInstanceAs containerConfig
+    viaCtor.feedRange shouldEqual viaApply.feedRange
+    viaCtor.feedRange shouldEqual normalizedRange
+    viaCtor.documentCount shouldEqual viaApply.documentCount
+    viaCtor.documentCount shouldEqual docCount
+    viaCtor.totalDocumentSizeInKB shouldEqual viaApply.totalDocumentSizeInKB
+    viaCtor.totalDocumentSizeInKB shouldEqual docSizeInKB
+    viaCtor.latestLsn shouldEqual viaApply.latestLsn
+    viaCtor.latestLsn shouldEqual latestLsn
+    viaCtor.firstLsn shouldEqual viaApply.firstLsn
+    viaCtor.firstLsn.get shouldEqual latestLsn - 10
+    viaCtor.lastUpdated.get should be >= nowEpochMs
+    viaCtor.lastUpdated.get shouldEqual viaCtor.lastRetrieved.get
+    viaApply.lastUpdated.get should be >= nowEpochMs
+    viaApply.lastUpdated.get shouldEqual viaApply.lastRetrieved.get
+  }
+
+  it should "create instance with valid parameters via apply in full fidelity mode" in {
+
+    val clientConfig = CosmosClientConfiguration(
+      UUID.randomUUID().toString,
+      UUID.randomUUID().toString,
+      None,
+      UUID.randomUUID().toString,
+      useGatewayMode = false,
+      useEventualConsistency = true,
+      enableClientTelemetry = false,
+      disableTcpConnectionEndpointRediscovery = false,
+      clientTelemetryEndpoint = None,
+      preferredRegionsList = Option.empty)
+
+    val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString)
+    val latestLsn = rnd.nextInt(10000000) + 1
+    val firstLsn = Some(latestLsn - 10L)
+
+    val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString)
+    val docCount = rnd.nextInt()
+    val docSizeInKB = rnd.nextInt()
+
+    val nowEpochMs = Instant.now.toEpochMilli
+    val createdAt = new AtomicLong(nowEpochMs)
+    val lastRetrievedAt = new AtomicLong(nowEpochMs)
+
+    val viaCtor = PartitionMetadata(
+      Map[String, String](),
+      clientConfig,
+      None,
+      containerConfig,
+      normalizedRange,
+      docCount,
+      docSizeInKB,
+      firstLsn,
+      latestLsn,
+      0,
+      None,
+      createdAt,
+      lastRetrievedAt)
+
+    val viaApply = PartitionMetadata(
+      Map[String, String](),
+      clientConfig,
+      None,
+      containerConfig,
+      normalizedRange,
+      docCount,
+      docSizeInKB,
+      firstLsn,
+      createChangeFeedState(latestLsn, "FULL_FIDELITY"))

     viaCtor.cosmosClientConfig should be theSameInstanceAs viaApply.cosmosClientConfig
     viaCtor.cosmosClientConfig should be theSameInstanceAs clientConfig
@@ -581,16 +653,17 @@ class PartitionMetadataSpec extends UnitSpec {
   //scalastyle:on null
   //scalastyle:on multiple.string.literals

-  private[this] def createChangeFeedState(latestLsn: Long) = {
+  private[this] def createChangeFeedState(latestLsn: Long, mode: String) = {
     val collectionRid = UUID.randomUUID().toString

     val json = String.format(
       "{\"V\":1," +
         "\"Rid\":\"%s\"," +
-        "\"Mode\":\"INCREMENTAL\"," +
+        "\"Mode\":\"%s\"," +
         "\"StartFrom\":{\"Type\":\"BEGINNING\"}," +
         "\"Continuation\":%s}",
       collectionRid,
+      mode,
       String.format(
         "{\"V\":1," +
           "\"Rid\":\"%s\"," +
