Full Fidelity Change Feed Processor with public surface changes #30399

Merged
Changes from all commits
59 commits
b07a55e
changes for FFCF pull model
simorenoh Jul 27, 2022
b530f2b
Update FullFidelityChangeFeedTest.java
simorenoh Jul 27, 2022
32becf2
spark changes
simorenoh Jul 27, 2022
f404260
Update CosmosRowConverter.scala
simorenoh Jul 27, 2022
5a606c8
Update SparkE2EChangeFeedITest.scala
simorenoh Jul 27, 2022
a3e77d9
Initial changes for change feed processor in full fidelity mode
kushagraThapar Jul 28, 2022
b3f5762
Merge branch 'pull-model-ffcf' into full_fidelity_cfp
kushagraThapar Jul 28, 2022
5fcf7c9
Setting change feed policy on container for full fidelity in testing
kushagraThapar Jul 28, 2022
5645ab3
Merge branch 'pull-model-ffcf' into full_fidelity_cfp
kushagraThapar Jul 28, 2022
f42d4a6
Added changes for type casting change feed processor response to Chan…
kushagraThapar Jul 28, 2022
452d43e
Renamed ChangeFeedProcessorResponse to ChangeFeedProcessorItem
kushagraThapar Jul 29, 2022
8c7cd80
Added change feed processor new lease changes
kushagraThapar Jul 29, 2022
f7628bc
Merged latest main and resolved conflicts
kushagraThapar Jul 29, 2022
c075a79
Merge branch 'full_fidelity_cfp' into full_fidelity_cfp_with_lease_ch…
kushagraThapar Jul 29, 2022
40664a6
Added java docs for ChangeFeedOperationType
kushagraThapar Jul 29, 2022
9ab4667
Added timeToLiveExpired for change feed meta data
kushagraThapar Jul 29, 2022
9ddcd3e
Merge branch 'full_fidelity_cfp' into full_fidelity_cfp_with_lease_ch…
kushagraThapar Jul 29, 2022
542b324
Added logic to create leases based on EPK feed range
kushagraThapar Jul 29, 2022
abbd786
Added more logic for feed range based lease
kushagraThapar Jul 29, 2022
8e54f71
Added the lease token feed range epk implementation code to CFP
kushagraThapar Aug 2, 2022
f6a39e7
Added more TODOs
kushagraThapar Aug 3, 2022
24b62b6
Code review comments, added lease structure changes and many more
kushagraThapar Aug 5, 2022
9908df4
Fixed spotbugs related to package rename and change
kushagraThapar Aug 5, 2022
7f5e3b5
Added todo for changelog once we finalize the public surface API
kushagraThapar Aug 5, 2022
6cc8da4
Removed TODOs, finalized API naming
kushagraThapar Aug 9, 2022
702d13d
Renamed to handleAllChanges()
kushagraThapar Aug 9, 2022
19a7482
Merged latest main and resolved conflicts
kushagraThapar Aug 9, 2022
585c3e3
Updated Beta API annotation to 4.35.0
kushagraThapar Aug 9, 2022
1156724
Added comments to Change Feed mode
kushagraThapar Aug 9, 2022
e70a5cc
Updated the changelog
kushagraThapar Aug 9, 2022
976321c
Merged latest main and resolved conflicts
kushagraThapar Aug 11, 2022
bdfecfe
Updated the changelog
kushagraThapar Aug 11, 2022
1aee86b
Added toJsonNode() API to convert ChangeFeedProcessorItem to JsonNode
kushagraThapar Aug 23, 2022
62233c2
Merge branch 'main' into full_fidelity_cfp_with_public_surface_changes
kushagraThapar Aug 23, 2022
cdb7ca4
Updated beta version to 4.36.0 since 4.35.0 is already released
kushagraThapar Aug 23, 2022
93ee557
Added 4.35.0 beta version field back to fix rev api check
kushagraThapar Aug 23, 2022
e50f7ea
Merge branch 'main' into full_fidelity_cfp_with_public_surface_changes
kushagraThapar Sep 20, 2022
0dd3d7a
Renamed incremental and full fidelity mode to new names on public sur…
kushagraThapar Sep 20, 2022
4e7c7ce
Fixed @deprecated annotation
kushagraThapar Sep 21, 2022
c20d646
Fixed test cases
kushagraThapar Sep 21, 2022
f13bb05
Code review comments
kushagraThapar Sep 22, 2022
f3f9caa
Simplifying full fidelity change feed tests
kushagraThapar Sep 22, 2022
4527599
Simplified more tests with less throughput for container creation
kushagraThapar Sep 22, 2022
53d9ff6
Disabled emulator test for checking previous present on replace
kushagraThapar Sep 22, 2022
de1af5b
Reverted ChangeFeedMode to implementation class. Reverted the name ch…
kushagraThapar Sep 23, 2022
f314c57
Fixed unit tests for incremental and full_fidelity mode
kushagraThapar Sep 23, 2022
f5a1062
Disabled full fidelity new tests for testing
kushagraThapar Sep 24, 2022
272776c
Enabled the tests and reduced throughput on ThroughputControlTests
kushagraThapar Sep 24, 2022
3ac81c2
Merged latest main and resolved conflicts
kushagraThapar Sep 26, 2022
6fd2f8a
Fixed compilation error
kushagraThapar Sep 26, 2022
cf12c16
API review comments
kushagraThapar Sep 26, 2022
d0e4ddf
API renames, commented out tests for testing
kushagraThapar Sep 27, 2022
68dc9f7
Removed sample code
kushagraThapar Sep 27, 2022
ec91b91
Switching classes to test the emulator CI
kushagraThapar Sep 27, 2022
0cde31a
Disabled one test to debug the CI
kushagraThapar Sep 28, 2022
ef649e2
Merged latest main and resolved conflicts
kushagraThapar Sep 28, 2022
9aded75
Disabled full fidelity change feed processor tests for emulator CI
kushagraThapar Sep 28, 2022
32ad4d9
Fixed flaky tests
kushagraThapar Sep 28, 2022
653e49d
Enabled basic tests for CFP in FFCF mode
kushagraThapar Sep 28, 2022
@@ -34,7 +34,7 @@ public class BasicOperationTest {
private CosmosContainer createdTestContainer;
private CosmosContainer createdResultsContainer;

-@BeforeClass(groups = {"emulator"}, timeOut = TIMEOUT)
+@BeforeClass(groups = {"simple"}, timeOut = TIMEOUT)
public void before_BasicOperationTest() {
assertThat(this.client).isNull();
CosmosClientBuilder clientBuilder = new CosmosClientBuilder()
@@ -60,7 +60,7 @@ public void before_BasicOperationTest() {
this.createdResultsContainer = this.createdDatabase.getContainer("results" + suffix);
}

-@AfterClass(groups = {"emulator"}, timeOut = TIMEOUT, alwaysRun = true)
+@AfterClass(groups = {"simple"}, timeOut = TIMEOUT, alwaysRun = true)
public void afterClass() {
assertThat(this.client).isNotNull();
assertThat(this.createdDatabase).isNotNull();
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.14.0-beta.1 (Unreleased)

#### Features Added
+* Added new config options for Change Feed modes - Incremental as `LatestVersion` and Full Fidelity as `AllVersionsAndDeletes` - See [PR 30399](https://github.com/Azure/azure-sdk-for-java/pull/30399)
* Added option to emit client-side metrics via micrometer.io MeterRegistry. - See [PR 30065](https://github.com/Azure/azure-sdk-for-java/pull/30065)

#### Breaking Changes
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.14.0-beta.1 (Unreleased)

#### Features Added
+* Added new config options for Change Feed modes - Incremental as `LatestVersion` and Full Fidelity as `AllVersionsAndDeletes` - See [PR 30399](https://github.com/Azure/azure-sdk-for-java/pull/30399)
* Added option to emit client-side metrics via micrometer.io MeterRegistry. - See [PR 30065](https://github.com/Azure/azure-sdk-for-java/pull/30065)

#### Breaking Changes
@@ -64,12 +64,12 @@ Used to influence the json serialization/deserialization behavior
| `spark.cosmos.serialization.dateTimeConversionMode` | `Default` | The date/time conversion mode (`Default`, `AlwaysEpochMilliseconds`, `AlwaysEpochMillisecondsWithSystemDefaultTimezone`). With `Default` the standard Spark 3.* behavior is used (`java.sql.Date`/`java.time.LocalDate` are converted to EpochDay, `java.sql.Timestamp`/`java.time.Instant` are converted to MicrosecondsFromEpoch). With `AlwaysEpochMilliseconds` the same behavior the Cosmos DB connector for Spark 2.4 used is applied - `java.sql.Date`, `java.time.LocalDate`, `java.sql.Timestamp` and `java.time.Instant` are converted to MillisecondsFromEpoch. The behavior for `AlwaysEpochMillisecondsWithSystemDefaultTimezone` is identical with `AlwaysEpochMilliseconds` except that it will assume System default time zone / Spark session time zone (specified via `spark.sql.session.timezone`) instead of UTC when the date/time to be parsed has no explicit time zone.|

#### Change feed (only for Spark-Streaming using `cosmos.oltp.changeFeed` data source, which is read-only) configuration
| Config Property Name | Default | Description |
| :--- | :---- | :--- |
| `spark.cosmos.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (`Now`, `Beginning` or a certain point in time (UTC) for example `2020-02-10T14:15:03`) - the default value is `Beginning`. If the write config contains a `checkpointLocation` and any checkpoints exist, the stream is always continued independent of the `spark.cosmos.changeFeed.startFrom` settings - you need to change `checkpointLocation` or delete checkpoints to restart the stream if that is the intention. |
| `spark.cosmos.changeFeed.mode` | `Incremental` | ChangeFeed mode (`Incremental` or `FullFidelity`) - NOTE: `FullFidelity` is in experimental state right now. It requires that the subscription/account has been enabled for the private preview and there are known breaking changes that will happen for `FullFidelity` (schema of the returned documents). It is recommended to only use `FullFidelity` for non-production scenarios at this point. |
| `spark.cosmos.changeFeed.itemCountPerTriggerHint` | None | Approximate maximum number of items read from change feed for each micro-batch/trigger |
| `spark.cosmos.changeFeed.batchCheckpointLocation` | None | Can be used to generate checkpoints when using change feed queries in batch mode - and proceeding on the next iteration where the previous left off. |
| Config Property Name | Default | Description |
| :--- |:----------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `spark.cosmos.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (`Now`, `Beginning` or a certain point in time (UTC) for example `2020-02-10T14:15:03`) - the default value is `Beginning`. If the write config contains a `checkpointLocation` and any checkpoints exist, the stream is always continued independent of the `spark.cosmos.changeFeed.startFrom` settings - you need to change `checkpointLocation` or delete checkpoints to restart the stream if that is the intention. |
| `spark.cosmos.changeFeed.mode` | `Incremental/LatestVersion` | ChangeFeed mode (`Incremental/LatestVersion` or `FullFidelity/AllVersionsAndDeletes`) - NOTE: `FullFidelity/AllVersionsAndDeletes` is in experimental state right now. It requires that the subscription/account has been enabled for the private preview and there are known breaking changes that will happen for `FullFidelity/AllVersionsAndDeletes` (schema of the returned documents). It is recommended to only use `FullFidelity/AllVersionsAndDeletes` for non-production scenarios at this point. |
| `spark.cosmos.changeFeed.itemCountPerTriggerHint` | None | Approximate maximum number of items read from change feed for each micro-batch/trigger |
| `spark.cosmos.changeFeed.batchCheckpointLocation` | None | Can be used to generate checkpoints when using change feed queries in batch mode - and proceeding on the next iteration where the previous left off. |
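The renamed mode can be exercised through the config keys in the table above. A minimal Java sketch of building such an option map - the key names come from the table, while the class, method, and chosen values are illustrative assumptions, not part of this PR:

```java
import java.util.Map;

public class ChangeFeedOptionsSketch {
    // Builds a read-option map for the cosmos.oltp.changeFeed data source.
    // The mode argument is one of the names documented in the table above
    // (e.g. "Incremental", "LatestVersion", "FullFidelity", "AllVersionsAndDeletes").
    static Map<String, String> changeFeedOptions(String mode) {
        return Map.of(
            "spark.cosmos.changeFeed.mode", mode,
            "spark.cosmos.changeFeed.startFrom", "Now",
            "spark.cosmos.changeFeed.itemCountPerTriggerHint", "100");
    }

    public static void main(String[] args) {
        // Prints the option map for the new AllVersionsAndDeletes mode name.
        System.out.println(changeFeedOptions("AllVersionsAndDeletes"));
    }
}
```

In a real Spark job this map would be merged with the account/container settings and passed to the `cosmos.oltp.changeFeed` data source.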

#### Json conversion configuration
| Config Property Name | Default | Description |
@@ -81,9 +81,9 @@ private case class ChangeFeedPartitionReader

var factoryMethod: java.util.function.Function[JsonNode, _] = (_: JsonNode) => {}
cosmosChangeFeedConfig.changeFeedMode match {
-case ChangeFeedModes.Incremental =>
+case ChangeFeedModes.Incremental | ChangeFeedModes.LatestVersion =>
factoryMethod = (jsonNode: JsonNode) => changeFeedItemFactoryMethod(jsonNode)
-case ChangeFeedModes.FullFidelity =>
+case ChangeFeedModes.FullFidelity | ChangeFeedModes.AllVersionsAndDeletes =>
factoryMethod = (jsonNode: JsonNode) => changeFeedItemFactoryMethodV1(jsonNode)
}

@@ -119,9 +119,9 @@ private class ChangeFeedTable(val session: SparkSession,
userConfig: Map[String, String]): StructType = {

val defaultSchema: StructType = changeFeedConfig.changeFeedMode match {
-case ChangeFeedModes.Incremental =>
+case ChangeFeedModes.Incremental | ChangeFeedModes.LatestVersion =>
ChangeFeedTable.defaultIncrementalChangeFeedSchemaForInferenceDisabled
-case ChangeFeedModes.FullFidelity =>
+case ChangeFeedModes.FullFidelity | ChangeFeedModes.AllVersionsAndDeletes =>
ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled
}
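The schema selection above collapses each legacy/new name pair to the same default schema. A simplified Java sketch of that mapping - the class name and the returned string identifiers are stand-ins for the `StructType` constants in `ChangeFeedTable`, not the actual implementation:

```java
public class DefaultSchemaByMode {
    // Maps a change feed mode name to its default inference-disabled schema.
    // Both members of each alias pair deliberately resolve to the same schema.
    static String defaultSchema(String mode) {
        switch (mode) {
            case "Incremental":
            case "LatestVersion":
                return "defaultIncrementalChangeFeedSchemaForInferenceDisabled";
            case "FullFidelity":
            case "AllVersionsAndDeletes":
                return "defaultFullFidelityChangeFeedSchemaForInferenceDisabled";
            default:
                throw new IllegalArgumentException("invalid change feed mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Both alias names resolve identically.
        System.out.println(defaultSchema("LatestVersion"));
        System.out.println(defaultSchema("AllVersionsAndDeletes"));
    }
}
```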

@@ -1078,6 +1078,8 @@ private object ChangeFeedModes extends Enumeration {

val Incremental: ChangeFeedModes.Value = Value("Incremental")
val FullFidelity: ChangeFeedModes.Value = Value("FullFidelity")
+val LatestVersion: ChangeFeedModes.Value = Value("LatestVersion")
+val AllVersionsAndDeletes: ChangeFeedModes.Value = Value("AllVersionsAndDeletes")
}
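The test below ("AllVersionsANDDELETES" with arbitrary casing) shows that these enumeration values are parsed case-insensitively. A self-contained Java sketch of that parsing behavior - the enum and parse method are hypothetical stand-ins for the Scala `ChangeFeedModes` enumeration and `CosmosConfigEntry.parseEnumeration`:

```java
public class ChangeFeedModeParser {
    // Stand-in for the Scala ChangeFeedModes enumeration.
    enum ChangeFeedMode { Incremental, FullFidelity, LatestVersion, AllVersionsAndDeletes }

    // Matches a raw config value against enum names, ignoring case,
    // mirroring the observed behavior of the config parser.
    static ChangeFeedMode parse(String raw) {
        for (ChangeFeedMode mode : ChangeFeedMode.values()) {
            if (mode.name().equalsIgnoreCase(raw)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("invalid change feed mode: " + raw);
    }

    public static void main(String[] args) {
        // Arbitrary casing still resolves to the canonical value.
        System.out.println(parse("AllVersionsANDDELETES"));
    }
}
```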

private object ChangeFeedStartFromModes extends Enumeration {
@@ -1108,8 +1110,8 @@ private case class CosmosChangeFeedConfig
}

this.changeFeedMode match {
-case ChangeFeedModes.Incremental => options
-case ChangeFeedModes.FullFidelity => options.fullFidelity()
+case ChangeFeedModes.Incremental | ChangeFeedModes.LatestVersion => options
+case ChangeFeedModes.FullFidelity | ChangeFeedModes.AllVersionsAndDeletes => options.allVersionsAndDeletes()
}
}

@@ -1148,7 +1150,7 @@ private object CosmosChangeFeedConfig {
mandatory = false,
defaultValue = Some(ChangeFeedModes.Incremental),
parseFromStringFunction = changeFeedModeString => CosmosConfigEntry.parseEnumeration(changeFeedModeString, ChangeFeedModes),
-helpMessage = "ChangeFeed mode (Incremental or FullFidelity)")
+helpMessage = "ChangeFeed mode (Incremental/LatestVersion or FullFidelity/AllVersionsAndDeletes)")

private val maxItemCountPerTriggerHint = CosmosConfigEntry[Long](
key = CosmosConfigNames.ChangeFeedItemCountPerTriggerHint,
@@ -538,7 +538,22 @@ class CosmosConfigSpec extends UnitSpec {
config.maxItemCountPerTrigger.get shouldEqual 54
}

it should "parse change feed config with PIT start mode" in {
it should "parse change feed config for all versions and deletes with incorrect casing" in {
val changeFeedConfig = Map(
"spark.cosmos.changeFeed.mode" -> "AllVersionsANDDELETES",
"spark.cosmos.changeFeed.STARTfrom" -> "NOW",
"spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "54"
)

val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig)

config.changeFeedMode shouldEqual ChangeFeedModes.AllVersionsAndDeletes
config.startFrom shouldEqual ChangeFeedStartFromModes.Now
config.startFromPointInTime shouldEqual None
config.maxItemCountPerTrigger.get shouldEqual 54
}

it should "parse change feed config (incremental) with PIT start mode" in {
val changeFeedConfig = Map(
"spark.cosmos.changeFeed.mode" -> "incremental",
"spark.cosmos.changeFeed.STARTfrom" -> "2019-12-31T10:45:10Z",
@@ -556,6 +571,24 @@
config.maxItemCountPerTrigger.get shouldEqual 54
}

it should "parse change feed config (latestversion) with PIT start mode" in {
val changeFeedConfig = Map(
"spark.cosmos.changeFeed.mode" -> "latestversion",
"spark.cosmos.changeFeed.STARTfrom" -> "2019-12-31T10:45:10Z",
"spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "54"
)

val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig)

config.changeFeedMode shouldEqual ChangeFeedModes.LatestVersion
config.startFrom shouldEqual ChangeFeedStartFromModes.PointInTime
Instant.from(config.startFromPointInTime.get) shouldEqual
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")
.parse("2019-12-31T10:45:10Z")
.toInstant
config.maxItemCountPerTrigger.get shouldEqual 54
}

it should "complain when parsing invalid change feed mode" in {
val changeFeedConfig = Map(
"spark.cosmos.changeFeed.mode" -> "Whatever",
@@ -569,7 +602,7 @@
} catch {
case e: Exception => e.getMessage shouldEqual
"invalid configuration for spark.cosmos.changeFeed.mode:Whatever. Config description: " +
-"ChangeFeed mode (Incremental or FullFidelity)"
+"ChangeFeed mode (Incremental/LatestVersion or FullFidelity/AllVersionsAndDeletes)"
}
}

@@ -42,7 +42,7 @@ class PartitionMetadataSpec extends UnitSpec {
key shouldEqual s"$databaseName/$collectionName/${normalizedRange.min}-${normalizedRange.max}"
}

-it should "create instance with valid parameters via apply" in {
+it should "create instance with valid parameters via apply in incremental mode" in {

val clientConfig = CosmosClientConfiguration(
UUID.randomUUID().toString,
@@ -92,7 +92,79 @@
docCount,
docSizeInKB,
firstLsn,
-createChangeFeedState(latestLsn))
+createChangeFeedState(latestLsn, "INCREMENTAL"))

viaCtor.cosmosClientConfig should be theSameInstanceAs viaApply.cosmosClientConfig
viaCtor.cosmosClientConfig should be theSameInstanceAs clientConfig
viaCtor.cosmosContainerConfig should be theSameInstanceAs viaApply.cosmosContainerConfig
viaCtor.cosmosContainerConfig should be theSameInstanceAs containerConfig
viaCtor.feedRange shouldEqual viaApply.feedRange
viaCtor.feedRange shouldEqual normalizedRange
viaCtor.documentCount shouldEqual viaApply.documentCount
viaCtor.documentCount shouldEqual docCount
viaCtor.totalDocumentSizeInKB shouldEqual viaApply.totalDocumentSizeInKB
viaCtor.totalDocumentSizeInKB shouldEqual docSizeInKB
viaCtor.latestLsn shouldEqual viaApply.latestLsn
viaCtor.latestLsn shouldEqual latestLsn
viaCtor.firstLsn shouldEqual viaApply.firstLsn
viaCtor.firstLsn.get shouldEqual latestLsn - 10
viaCtor.lastUpdated.get should be >= nowEpochMs
viaCtor.lastUpdated.get shouldEqual viaCtor.lastRetrieved.get
viaApply.lastUpdated.get should be >= nowEpochMs
viaApply.lastUpdated.get shouldEqual viaApply.lastRetrieved.get
}

it should "create instance with valid parameters via apply in full fidelity mode" in {

val clientConfig = CosmosClientConfiguration(
UUID.randomUUID().toString,
UUID.randomUUID().toString,
None,
UUID.randomUUID().toString,
useGatewayMode = false,
useEventualConsistency = true,
enableClientTelemetry = false,
disableTcpConnectionEndpointRediscovery = false,
clientTelemetryEndpoint = None,
preferredRegionsList = Option.empty)

val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString)
val latestLsn = rnd.nextInt(10000000) + 1
val firstLsn = Some(latestLsn - 10L)

val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString)
val docCount = rnd.nextInt()
val docSizeInKB = rnd.nextInt()

val nowEpochMs = Instant.now.toEpochMilli
val createdAt = new AtomicLong(nowEpochMs)
val lastRetrievedAt = new AtomicLong(nowEpochMs)

val viaCtor = PartitionMetadata(
Map[String, String](),
clientConfig,
None,
containerConfig,
normalizedRange,
docCount,
docSizeInKB,
firstLsn,
latestLsn,
0,
None,
createdAt,
lastRetrievedAt)

val viaApply = PartitionMetadata(
Map[String, String](),
clientConfig,
None,
containerConfig,
normalizedRange,
docCount,
docSizeInKB,
firstLsn,
createChangeFeedState(latestLsn, "FULL_FIDELITY"))

viaCtor.cosmosClientConfig should be theSameInstanceAs viaApply.cosmosClientConfig
viaCtor.cosmosClientConfig should be theSameInstanceAs clientConfig
@@ -581,16 +653,17 @@
//scalastyle:on null
//scalastyle:on multiple.string.literals

-private[this] def createChangeFeedState(latestLsn: Long) = {
+private[this] def createChangeFeedState(latestLsn: Long, mode: String) = {
val collectionRid = UUID.randomUUID().toString

val json = String.format(
"{\"V\":1," +
"\"Rid\":\"%s\"," +
-"\"Mode\":\"INCREMENTAL\"," +
+"\"Mode\":\"%s\"," +
"\"StartFrom\":{\"Type\":\"BEGINNING\"}," +
"\"Continuation\":%s}",
collectionRid,
+mode,
String.format(
"{\"V\":1," +
"\"Rid\":\"%s\"," +
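The `createChangeFeedState` helper above now takes the mode as a parameter. A simplified, self-contained Java sketch of the JSON it produces - the nested `Continuation` section shown (truncated) in the diff is omitted here, and the class and method names are illustrative:

```java
public class ChangeFeedStateJson {
    // Builds a minimal change-feed state JSON with a parameterized mode,
    // mirroring the parameterized test helper (Continuation omitted).
    static String create(String collectionRid, String mode) {
        return String.format(
            "{\"V\":1,\"Rid\":\"%s\",\"Mode\":\"%s\",\"StartFrom\":{\"Type\":\"BEGINNING\"}}",
            collectionRid, mode);
    }

    public static void main(String[] args) {
        // Same helper now serves both modes exercised by the tests.
        System.out.println(create("testRid", "INCREMENTAL"));
        System.out.println(create("testRid", "FULL_FIDELITY"));
    }
}
```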