
Full Fidelity Change Feed Processor with public surface changes #30399

Merged

Commits (59)
b07a55e
changes for FFCF pull model
simorenoh Jul 27, 2022
b530f2b
Update FullFidelityChangeFeedTest.java
simorenoh Jul 27, 2022
32becf2
spark changes
simorenoh Jul 27, 2022
f404260
Update CosmosRowConverter.scala
simorenoh Jul 27, 2022
5a606c8
Update SparkE2EChangeFeedITest.scala
simorenoh Jul 27, 2022
a3e77d9
Initial changes for change feed processor in full fidelity mode
kushagraThapar Jul 28, 2022
b3f5762
Merge branch 'pull-model-ffcf' into full_fidelity_cfp
kushagraThapar Jul 28, 2022
5fcf7c9
Setting change feed policy on container for full fidelity in testing
kushagraThapar Jul 28, 2022
5645ab3
Merge branch 'pull-model-ffcf' into full_fidelity_cfp
kushagraThapar Jul 28, 2022
f42d4a6
Added changes for type casting change feed processor response to Chan…
kushagraThapar Jul 28, 2022
452d43e
Renamed ChangeFeedProcessorResponse to ChangeFeedProcessorItem
kushagraThapar Jul 29, 2022
8c7cd80
Added change feed processor new lease changes
kushagraThapar Jul 29, 2022
f7628bc
Merged latest main and resolved conflicts
kushagraThapar Jul 29, 2022
c075a79
Merge branch 'full_fidelity_cfp' into full_fidelity_cfp_with_lease_ch…
kushagraThapar Jul 29, 2022
40664a6
Added java docs for ChangeFeedOperationType
kushagraThapar Jul 29, 2022
9ab4667
Added timeToLiveExpired for change feed meta data
kushagraThapar Jul 29, 2022
9ddcd3e
Merge branch 'full_fidelity_cfp' into full_fidelity_cfp_with_lease_ch…
kushagraThapar Jul 29, 2022
542b324
Added logic to create leases based on EPK feed range
kushagraThapar Jul 29, 2022
abbd786
Added more logic for feed range based lease
kushagraThapar Jul 29, 2022
8e54f71
Added the lease token feed range epk implementation code to CFP
kushagraThapar Aug 2, 2022
f6a39e7
Added more TODOs
kushagraThapar Aug 3, 2022
24b62b6
Code review comments, added lease structure changes and many more
kushagraThapar Aug 5, 2022
9908df4
Fixed spotbugs related to package rename and change
kushagraThapar Aug 5, 2022
7f5e3b5
Added todo for changelog once we finalize the public surface API
kushagraThapar Aug 5, 2022
6cc8da4
Removed TODOs, finalized API naming
kushagraThapar Aug 9, 2022
702d13d
Renamed to handleAllChanges()
kushagraThapar Aug 9, 2022
19a7482
Merged latest main and resolved conflicts
kushagraThapar Aug 9, 2022
585c3e3
Updated Beta API annotation to 4.35.0
kushagraThapar Aug 9, 2022
1156724
Added comments to Change Feed mode
kushagraThapar Aug 9, 2022
e70a5cc
Updated the changelog
kushagraThapar Aug 9, 2022
976321c
Merged latest main and resolved conflicts
kushagraThapar Aug 11, 2022
bdfecfe
Updated the changelog
kushagraThapar Aug 11, 2022
1aee86b
Added toJsonNode() API to convert ChangeFeedProcessorItem to JsonNode
kushagraThapar Aug 23, 2022
62233c2
Merge branch 'main' into full_fidelity_cfp_with_public_surface_changes
kushagraThapar Aug 23, 2022
cdb7ca4
Updated beta version to 4.36.0 since 4.35.0 is already released
kushagraThapar Aug 23, 2022
93ee557
Added 4.35.0 beta version field back to fix rev api check
kushagraThapar Aug 23, 2022
e50f7ea
Merge branch 'main' into full_fidelity_cfp_with_public_surface_changes
kushagraThapar Sep 20, 2022
0dd3d7a
Renamed incremental and full fidelity mode to new names on public sur…
kushagraThapar Sep 20, 2022
4e7c7ce
Fixed @deprecated annotation
kushagraThapar Sep 21, 2022
c20d646
Fixed test cases
kushagraThapar Sep 21, 2022
f13bb05
Code review comments
kushagraThapar Sep 22, 2022
f3f9caa
Simplifying full fidelity change feed tests
kushagraThapar Sep 22, 2022
4527599
Simplified more tests with less throughput for container creation
kushagraThapar Sep 22, 2022
53d9ff6
Disabled emulator test for checking previous present on replace
kushagraThapar Sep 22, 2022
de1af5b
Reverted ChangeFeedMode to implementation class. Reverted the name ch…
kushagraThapar Sep 23, 2022
f314c57
Fixed unit tests for incremental and full_fidelity mode
kushagraThapar Sep 23, 2022
f5a1062
Disabled full fidelity new tests for testing
kushagraThapar Sep 24, 2022
272776c
Enabled the tests and reduced throughput on ThroughputControlTests
kushagraThapar Sep 24, 2022
3ac81c2
Merged latest main and resolved conflicts
kushagraThapar Sep 26, 2022
6fd2f8a
Fixed compilation error
kushagraThapar Sep 26, 2022
cf12c16
API review comments
kushagraThapar Sep 26, 2022
d0e4ddf
API renames, commented out tests for testing
kushagraThapar Sep 27, 2022
68dc9f7
Removed sample code
kushagraThapar Sep 27, 2022
ec91b91
Switching classes to test the emulator CI
kushagraThapar Sep 27, 2022
0cde31a
Disabled one test to debug the CI
kushagraThapar Sep 28, 2022
ef649e2
Merged latest main and resolved conflicts
kushagraThapar Sep 28, 2022
9aded75
Disabled full fidelity change feed processor tests for emulator CI
kushagraThapar Sep 28, 2022
32ad4d9
Fixed flaky tests
kushagraThapar Sep 28, 2022
653e49d
Enabled basic tests for CFP in FFCF mode
kushagraThapar Sep 28, 2022
@@ -64,12 +64,12 @@ Used to influence the json serialization/deserialization behavior
| `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`, `AlwaysEpochMillisecondsWithSystemDefaultTimezone`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch. The behavior for `AlwaysEpochMillisecondsWithSystemDefaultTimezone` is identical with `AlwaysEpochMilliseconds` except that it will assume System default time zone / Spark session time zone (specified via `spark.sql.session.timezone`) instead of UTC when the date/time to be parsed has no explicit time zone.|

#### Change feed (only for Spark-Streaming using `cosmos.oltp.changeFeed` data source, which is read-only) configuration
| Config Property Name | Default | Description |
| :--- | :---- | :--- |
| `spark.cosmos.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (`Now`, `Beginning` or a certain point in time (UTC) for example `2020-02-10T14:15:03`) - the default value is `Beginning`. If the write config contains a `checkpointLocation` and any checkpoints exist, the stream is always continued independent of the `spark.cosmos.changeFeed.startFrom` settings - you need to change `checkpointLocation` or delete checkpoints to restart the stream if that is the intention. |
| `spark.cosmos.changeFeed.mode` | `Incremental` | ChangeFeed mode (`Incremental` or `FullFidelity`) - NOTE: `FullFidelity` is in experimental state right now. It requires that the subscription/account has been enabled for the private preview and there are known breaking changes that will happen for `FullFidelity` (schema of the returned documents). It is recommended to only use `FullFidelity` for non-production scenarios at this point. |
| `spark.cosmos.changeFeed.itemCountPerTriggerHint` | None | Approximate maximum number of items read from change feed for each micro-batch/trigger |
| `spark.cosmos.changeFeed.batchCheckpointLocation` | None | Can be used to generate checkpoints when using change feed queries in batch mode - and proceeding on the next iteration where the previous left off. |
| Config Property Name | Default | Description |
| :--- |:----------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `spark.cosmos.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (`Now`, `Beginning` or a certain point in time (UTC) for example `2020-02-10T14:15:03`) - the default value is `Beginning`. If the write config contains a `checkpointLocation` and any checkpoints exist, the stream is always continued independent of the `spark.cosmos.changeFeed.startFrom` settings - you need to change `checkpointLocation` or delete checkpoints to restart the stream if that is the intention. |
| `spark.cosmos.changeFeed.mode` | `Incremental/LatestVersion` | ChangeFeed mode (`Incremental/LatestVersion` or `FullFidelity/AllVersionsAndDeletes`) - NOTE: `FullFidelity/AllVersionsAndDeletes` is in experimental state right now. It requires that the subscription/account has been enabled for the private preview and there are known breaking changes that will happen for `FullFidelity/AllVersionsAndDeletes` (schema of the returned documents). It is recommended to only use `FullFidelity/AllVersionsAndDeletes` for non-production scenarios at this point. |
| `spark.cosmos.changeFeed.itemCountPerTriggerHint` | None | Approximate maximum number of items read from change feed for each micro-batch/trigger |
| `spark.cosmos.changeFeed.batchCheckpointLocation` | None | Can be used to generate checkpoints when using change feed queries in batch mode - and proceeding on the next iteration where the previous left off. |

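As an illustration of the renamed modes, a minimal Structured Streaming read could be configured as in the sketch below. The account endpoint, key, database, and container values are placeholders, `spark` is assumed to be an existing `SparkSession`, and only config keys documented in the table above are used.

```scala
// Minimal sketch: streaming change feed read using the renamed mode.
// All account/database/container values below are placeholders.
val changeFeedConfig = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "testDb",
  "spark.cosmos.container" -> "testContainer",
  // "Incremental" remains valid; "LatestVersion" is the new alias.
  "spark.cosmos.changeFeed.mode" -> "LatestVersion",
  "spark.cosmos.changeFeed.startFrom" -> "Beginning"
)

val changeFeedDf = spark.readStream
  .format("cosmos.oltp.changeFeed")
  .options(changeFeedConfig)
  .load()
```

Setting `spark.cosmos.changeFeed.mode` to `AllVersionsAndDeletes` selects the full-fidelity schema instead; for batch reads via `spark.read`, `spark.cosmos.changeFeed.batchCheckpointLocation` can be set so a subsequent run resumes where the previous one left off.
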
#### Json conversion configuration
| Config Property Name | Default | Description |
@@ -5,10 +5,10 @@ package com.azure.cosmos.implementation

import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder, DirectConnectionConfig, SparkBridgeInternal}
import com.azure.cosmos.implementation.ImplementationBridgeHelpers.CosmosClientBuilderHelper
import com.azure.cosmos.implementation.changefeed.common.{ChangeFeedMode, ChangeFeedStartFromInternal, ChangeFeedState, ChangeFeedStateV1}
import com.azure.cosmos.implementation.changefeed.common.{ChangeFeedStartFromInternal, ChangeFeedState, ChangeFeedStateV1}
import com.azure.cosmos.implementation.query.CompositeContinuationToken
import com.azure.cosmos.implementation.routing.Range
import com.azure.cosmos.models.{FeedRange, PartitionKey, SparkModelBridgeInternal}
import com.azure.cosmos.models.{ChangeFeedMode, FeedRange, PartitionKey, SparkModelBridgeInternal}
import com.azure.cosmos.spark.{ChangeFeedOffset, NormalizedRange}
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait

@@ -283,7 +283,7 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTrait
val changeFeedState: ChangeFeedState = new ChangeFeedStateV1(
containerResourceId,
FeedRangeEpkImpl.forFullRange,
ChangeFeedMode.INCREMENTAL,
ChangeFeedMode.LATEST_VERSION,
ChangeFeedStartFromInternal.createFromLegacyContinuation(),
feedRangeContinuation
)
@@ -83,8 +83,12 @@ private case class ChangeFeedPartitionReader
cosmosChangeFeedConfig.changeFeedMode match {
case ChangeFeedModes.Incremental =>
factoryMethod = (jsonNode: JsonNode) => changeFeedItemFactoryMethod(jsonNode)
case ChangeFeedModes.LatestVersion =>
factoryMethod = (jsonNode: JsonNode) => changeFeedItemFactoryMethod(jsonNode)
case ChangeFeedModes.FullFidelity =>
factoryMethod = (jsonNode: JsonNode) => changeFeedItemFactoryMethodV1(jsonNode)
case ChangeFeedModes.AllVersionsAndDeletes =>
factoryMethod = (jsonNode: JsonNode) => changeFeedItemFactoryMethodV1(jsonNode)
}

ImplementationBridgeHelpers
@@ -121,8 +121,12 @@ private class ChangeFeedTable(val session: SparkSession,
val defaultSchema: StructType = changeFeedConfig.changeFeedMode match {
case ChangeFeedModes.Incremental =>
ChangeFeedTable.defaultIncrementalChangeFeedSchemaForInferenceDisabled
case ChangeFeedModes.LatestVersion =>
ChangeFeedTable.defaultIncrementalChangeFeedSchemaForInferenceDisabled
case ChangeFeedModes.FullFidelity =>
ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled
case ChangeFeedModes.AllVersionsAndDeletes =>
ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled
}

CosmosTableSchemaInferrer.inferSchema(
@@ -1072,6 +1072,8 @@ private object ChangeFeedModes extends Enumeration {

val Incremental: ChangeFeedModes.Value = Value("Incremental")
val FullFidelity: ChangeFeedModes.Value = Value("FullFidelity")
val LatestVersion: ChangeFeedModes.Value = Value("LatestVersion")
val AllVersionsAndDeletes: ChangeFeedModes.Value = Value("AllVersionsAndDeletes")
}

private object ChangeFeedStartFromModes extends Enumeration {
@@ -1103,7 +1105,9 @@ private case class CosmosChangeFeedConfig

this.changeFeedMode match {
case ChangeFeedModes.Incremental => options
case ChangeFeedModes.FullFidelity => options.fullFidelity()
case ChangeFeedModes.LatestVersion => options
case ChangeFeedModes.FullFidelity => options.allVersionsAndDeletes()
case ChangeFeedModes.AllVersionsAndDeletes => options.allVersionsAndDeletes()
}
}

@@ -1116,7 +1120,7 @@ private case class CosmosChangeFeedConfig
}

private object CosmosChangeFeedConfig {
private val DefaultChangeFeedMode: ChangeFeedMode = ChangeFeedModes.Incremental
private val DefaultChangeFeedMode: ChangeFeedMode = ChangeFeedModes.LatestVersion
private val DefaultStartFromMode: ChangeFeedStartFromMode = ChangeFeedStartFromModes.Beginning

private val startFrom = CosmosConfigEntry[ChangeFeedStartFromMode](
@@ -1140,9 +1144,9 @@ private object CosmosChangeFeedConfig {
private val changeFeedMode = CosmosConfigEntry[ChangeFeedMode](
key = CosmosConfigNames.ChangeFeedMode,
mandatory = false,
defaultValue = Some(ChangeFeedModes.Incremental),
defaultValue = Some(ChangeFeedModes.LatestVersion),
parseFromStringFunction = changeFeedModeString => CosmosConfigEntry.parseEnumeration(changeFeedModeString, ChangeFeedModes),
helpMessage = "ChangeFeed mode (Incremental or FullFidelity)")
helpMessage = "ChangeFeed mode (Incremental/LatestVersion or FullFidelity/AllVersionsAndDeletes)")

private val maxItemCountPerTriggerHint = CosmosConfigEntry[Long](
key = CosmosConfigNames.ChangeFeedItemCountPerTriggerHint,
@@ -517,7 +517,7 @@ class CosmosConfigSpec extends UnitSpec {

val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig)

config.changeFeedMode shouldEqual ChangeFeedModes.Incremental
config.changeFeedMode shouldEqual ChangeFeedModes.LatestVersion
config.startFrom shouldEqual ChangeFeedStartFromModes.Beginning
config.startFromPointInTime shouldEqual None
config.maxItemCountPerTrigger shouldEqual None
@@ -538,7 +538,22 @@ class CosmosConfigSpec extends UnitSpec {
config.maxItemCountPerTrigger.get shouldEqual 54
}

it should "parse change feed config with PIT start mode" in {
it should "parse change feed config for all versions and deletes with incorrect casing" in {
val changeFeedConfig = Map(
"spark.cosmos.changeFeed.mode" -> "AllVersionsANDDELETES",
"spark.cosmos.changeFeed.STARTfrom" -> "NOW",
"spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "54"
)

val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig)

config.changeFeedMode shouldEqual ChangeFeedModes.AllVersionsAndDeletes
config.startFrom shouldEqual ChangeFeedStartFromModes.Now
config.startFromPointInTime shouldEqual None
config.maxItemCountPerTrigger.get shouldEqual 54
}

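// Hypothetical sketch of case-insensitive enumeration parsing, consistent with
// the "incorrect casing" test above; the actual CosmosConfigEntry.parseEnumeration
// implementation is not shown in this diff, so this is an assumption.
def parseEnumerationIgnoreCase[T <: Enumeration](raw: String, e: T): Option[e.Value] =
  e.values.find(_.toString.equalsIgnoreCase(raw))

// parseEnumerationIgnoreCase("AllVersionsANDDELETES", ChangeFeedModes) would
// yield Some(ChangeFeedModes.AllVersionsAndDeletes).
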
it should "parse change feed config (incremental) with PIT start mode" in {
val changeFeedConfig = Map(
"spark.cosmos.changeFeed.mode" -> "incremental",
"spark.cosmos.changeFeed.STARTfrom" -> "2019-12-31T10:45:10Z",
@@ -556,6 +571,24 @@
config.maxItemCountPerTrigger.get shouldEqual 54
}

it should "parse change feed config (latestversion) with PIT start mode" in {
val changeFeedConfig = Map(
"spark.cosmos.changeFeed.mode" -> "latestversion",
"spark.cosmos.changeFeed.STARTfrom" -> "2019-12-31T10:45:10Z",
"spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "54"
)

val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig)

config.changeFeedMode shouldEqual ChangeFeedModes.LatestVersion
config.startFrom shouldEqual ChangeFeedStartFromModes.PointInTime
Instant.from(config.startFromPointInTime.get) shouldEqual
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")
.parse("2019-12-31T10:45:10Z")
.toInstant
config.maxItemCountPerTrigger.get shouldEqual 54
}

it should "complain when parsing invalid change feed mode" in {
val changeFeedConfig = Map(
"spark.cosmos.changeFeed.mode" -> "Whatever",
Expand All @@ -569,7 +602,7 @@ class CosmosConfigSpec extends UnitSpec {
} catch {
case e: Exception => e.getMessage shouldEqual
"invalid configuration for spark.cosmos.changeFeed.mode:Whatever. Config description: " +
"ChangeFeed mode (Incremental or FullFidelity)"
"ChangeFeed mode (Incremental/LatestVersion or FullFidelity/AllVersionsAndDeletes)"
}
}

@@ -576,7 +576,7 @@ class PartitionMetadataSpec extends UnitSpec {
val json = String.format(
"{\"V\":1," +
"\"Rid\":\"%s\"," +
"\"Mode\":\"INCREMENTAL\"," +
"\"Mode\":\"LATEST_VERSION\"," +
"\"StartFrom\":{\"Type\":\"BEGINNING\"}," +
"\"Continuation\":%s}",
collectionRid,
@@ -82,6 +82,52 @@ class SparkE2EChangeFeedITest
ChangeFeedTable.defaultIncrementalChangeFeedSchemaForInferenceDisabled) shouldEqual true
}

"spark change feed query (LatestVersion)" can "use default schema" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
for (state <- Array(true, false)) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
objectNode.put("name", "Shrodigner's cat")
objectNode.put("type", "cat")
objectNode.put("age", 20)
objectNode.put("isAlive", state)
objectNode.put("id", UUID.randomUUID().toString)
container.createItem(objectNode).block()
}
val cfg = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.maxItemCount" -> "2",
"spark.cosmos.read.inferSchema.enabled" -> "false"
)

val df = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
val rowsArray = df.collect()
rowsArray should have size 2
df.schema.equals(
ChangeFeedTable.defaultIncrementalChangeFeedSchemaForInferenceDisabled) shouldEqual true

val cfgExplicit = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.read.maxItemCount" -> "1",
"spark.cosmos.changeFeed.mode" -> "LatestVersion"
)

val dfExplicit = spark.read.format("cosmos.oltp.changeFeed").options(cfgExplicit).load()
val rowsArrayExplicit = dfExplicit.collect()
rowsArrayExplicit should have size 2
dfExplicit.schema.equals(
ChangeFeedTable.defaultIncrementalChangeFeedSchemaForInferenceDisabled) shouldEqual true
}

"spark change feed query (incremental)" can "use user provided schema" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
@@ -151,6 +197,38 @@
ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true
}

"spark change feed query (all versions and deletes)" can "use default schema" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
for (state <- Array(true, false)) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
objectNode.put("name", "Shrodigner's cat")
objectNode.put("type", "cat")
objectNode.put("age", 20)
objectNode.put("isAlive", state)
objectNode.put("id", UUID.randomUUID().toString)
container.createItem(objectNode).block()
}
val cfg = Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.changeFeed.mode" -> "AllVersionsAndDeletes",
"spark.cosmos.read.maxItemCount" -> "1",
"spark.cosmos.changeFeed.startFrom" -> "NOW"
)

val df = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
val rowsArray = df.collect()
rowsArray should have size 0
df.schema.equals(
ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true
}

"spark change feed micro batch (incremental)" can "use default schema" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
@@ -211,7 +211,7 @@ trait CosmosContainerWithRetention extends CosmosContainer {
val properties: CosmosContainerProperties =
new CosmosContainerProperties(cosmosContainer, partitionKeyPath)
properties.setChangeFeedPolicy(
ChangeFeedPolicy.createFullFidelityPolicy(Duration.ofMinutes(10)))
ChangeFeedPolicy.createAllVersionsAndDeletesPolicy(Duration.ofMinutes(10)))

val throughputProperties = ThroughputProperties.createManualThroughput(Defaults.DefaultContainerThroughput)

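// Sketch under assumptions: a self-contained version of the container-creation
// flow above. `client` is assumed to be an existing CosmosAsyncClient; database,
// container, partition key path, and throughput values are placeholders. The
// 10-minute retention window bounds how far back previous versions and deletes
// can be read in all-versions-and-deletes mode.
val containerProperties = new CosmosContainerProperties("testContainer", "/id")
containerProperties.setChangeFeedPolicy(
  ChangeFeedPolicy.createAllVersionsAndDeletesPolicy(Duration.ofMinutes(10)))
client
  .getDatabase("testDb")
  .createContainerIfNotExists(
    containerProperties,
    ThroughputProperties.createManualThroughput(400))
  .block()
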
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.37.0-beta.1 (Unreleased)

#### Features Added
* Added new preview APIs to `ChangeFeedProcessor` for handling full fidelity / all changes - See [PR 30399](https://github.com/Azure/azure-sdk-for-java/pull/30399)

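As a rough sketch only: based on commit messages in this PR (`handleAllChanges()`, `ChangeFeedProcessorItem`, `toJsonNode()`), the new preview surface might be consumed as follows. The builder wiring mirrors the existing `ChangeFeedProcessor` API; `feedContainer` and `leaseContainer` are assumed to exist, and the exact method signatures are assumptions rather than the released surface.

```scala
import com.azure.cosmos.ChangeFeedProcessorBuilder

// Hedged sketch of the preview API; handleAllChanges and ChangeFeedProcessorItem
// names come from this PR's commit messages, not from released documentation.
val processor = new ChangeFeedProcessorBuilder()
  .hostName("sample-host")
  .feedContainer(feedContainer)   // monitored container (assumed to exist)
  .leaseContainer(leaseContainer) // lease container (assumed to exist)
  .handleAllChanges(changes =>
    changes.forEach(item => println(item.toJsonNode())))
  .buildChangeFeedProcessor()

processor.start().subscribe()
```
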
#### Breaking Changes
