Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafakV2SourceConnector #39410

Merged
merged 49 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
f0c05ac
kafkaV2SourceConnector
Feb 7, 2024
7b5fc7f
Merge branch 'main' into KafkaV2SourceConnector
Feb 8, 2024
59f2cd4
changes
Feb 9, 2024
5ba9587
Merge branch 'main' into KafkaV2SourceConnector
Feb 9, 2024
e1a7378
add validator and refactor
Feb 9, 2024
bb3cec8
add split merge handling in source task
Feb 10, 2024
031ff0c
add tests
Feb 12, 2024
f78d910
cspell change
Feb 12, 2024
375365f
Merge branch 'main' into KafkaV2SourceConnector
Feb 12, 2024
7b42cca
fix
Feb 13, 2024
0d39f9d
add tests and resolve few comments
Feb 13, 2024
67b0671
add tests
Feb 13, 2024
47bcc62
fix compilation
Feb 13, 2024
b272054
fix tests
Feb 14, 2024
d54ba23
refactor
Feb 14, 2024
f574cdc
fix
Feb 14, 2024
be4c14a
add config reference config, update changelog and fix spotbugs
Feb 14, 2024
034473a
fix
Feb 14, 2024
4090abf
fix spell check errors
Feb 14, 2024
ceda53d
change
Feb 15, 2024
704ec9d
update test.yml
Feb 15, 2024
89d6208
Merge branch 'main' into UsingTestContainersForE2ETests
Feb 20, 2024
0e61917
add testContainer tests
Feb 22, 2024
4ab1e43
Merge branch 'main' into KafkaV2SourceConnector
Feb 22, 2024
37b797d
resolve comments
Feb 22, 2024
530d418
Merge branch 'KafkaV2SourceConnector' into UsingTestContainersForE2ET…
Feb 22, 2024
a53c01f
fix
Feb 22, 2024
1fbb8a2
Merge branch 'KafkaV2SourceConnector' into UsingTestContainersForE2ET…
Feb 22, 2024
02b3b83
fix compile
Feb 22, 2024
1cbb560
refactor
Feb 22, 2024
d71cb71
merge from main and resolve conflicts
Feb 22, 2024
908b5f2
update version
Feb 22, 2024
240d365
change
Mar 1, 2024
1b092ae
Merge branch 'main' into UsingTestContainersForE2ETests
Mar 1, 2024
283dabf
update external dependencies
Mar 1, 2024
ce5d82e
update external dependencies
Mar 1, 2024
417a6f3
KafkaV2SourceConnector
Mar 26, 2024
9dfe2a0
merge from main
Mar 26, 2024
dc1c3a7
remove azure cosmos implementation package dependency
Mar 26, 2024
fb5296d
Merge remote-tracking branch 'origin/UsingTestContainersForE2ETests' …
Mar 26, 2024
e5ae8a6
fix tests
Mar 26, 2024
75e5d8f
Merge branch 'main' into KafkaV2SourceConnector-2
Mar 26, 2024
d9d9a25
change to use ImplementationBridgeHelpers pattern
Mar 27, 2024
389e20b
fix tests
Mar 27, 2024
2ddff13
suppress style check for shaded classes from apache common and guava
Mar 27, 2024
9dcd348
remove guava/apachecommon package
Mar 27, 2024
10f8bca
update tests.yml file
Mar 28, 2024
3b97b2f
refactor
Mar 28, 2024
6a25454
fix build issue
Mar 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
"sdk/cosmos/azure-cosmos-encryption/**",
"sdk/cosmos/azure-cosmos-spark_3_2-12/**",
"sdk/spring/azure-spring-data-cosmos/**",
"sdk/cosmos/azure-cosmos-kafka-connect/**",
"sdk/deviceupdate/azure-iot-deviceupdate/**",
"sdk/e2e/src/**",
"sdk/eventgrid/azure-messaging-eventgrid-cloudnative-cloudevents/**",
Expand Down Expand Up @@ -723,7 +724,7 @@
"words": [
"Pfast",
"Pdirect",
"Pmulti",
"Pmulti",
"Psplit",
"Pquery",
"Pcfp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ the main ServiceBusClientBuilder. -->
files="com.azure.cosmos.ClientSideRequestStatistics"/> <!-- Need OperatingSystemMXBean from sun to obtain cpu info -->
<suppress checks="EnforceFinalFields" files="com.azure.spring.cloud.config.AppConfigurationPropertySourceLocator"/>
<suppress checks="ConstantName" files="com.azure.spring.cloud.config.AppConfigurationPropertySourceLocator"/>
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck"
files="[/\\]azure-cosmos-kafka-connect[/\\]"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSourceConnector"/>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disabling this one is bad (for the entire file) - would it be possible to use this pattern? Directly from the code - scoped to just the method violating the rule?

If not, this is fine...

// CHECKSTYLE: stop MultipleVariableDeclarations
// CHECKSTYLE: stop JavadocVariable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will check

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will track in the next PR


<!-- Checkstyle suppressions for resource manager package -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.resourcemanager.*"/>
Expand Down
4 changes: 4 additions & 0 deletions eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ cosmos_org.scalastyle:scalastyle-maven-plugin;1.0.0
# Cosmos Kafka connector runtime dependencies
cosmos_org.apache.kafka:connect-api;3.6.0
# Cosmos Kafka connector tests only
cosmos_org.apache.kafka:connect-runtime;3.6.0
cosmos_org.testcontainers:testcontainers;1.19.5
cosmos_org.testcontainers:kafka;1.19.5
cosmos_org.sourcelab:kafka-connect-client;4.0.4
# Maven Tools for Cosmos Kafka connector only
cosmos_io.confluent:kafka-connect-maven-plugin;0.12.0

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 1.0.0-beta.1 (Unreleased)

#### Features Added
* Added Source connector. See [PR 39410](https://github.com/Azure/azure-sdk-for-java/pull/39410)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
## Configuration Reference:

## Generic Configuration
| Config Property Name | Default | Description |
|:---------------------------------------------|:--------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `kafka.connect.cosmos.accountEndpoint` | None | Cosmos DB Account Endpoint Uri |
| `kafka.connect.cosmos.accountEndpoint` | None | Cosmos DB Account Key |
| `kafka.connect.cosmos.useGatewayMode` | `false` | Flag to indicate whether to use gateway mode. By default it is false. |
| `kafka.connect.cosmos.preferredRegionsList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). |
| `kafka.connect.cosmos.applicationName` | `""` | Application name. Will be added as the userAgent suffix. |

## Source Connector Configuration
| Config Property Name | Default | Description |
|:----------------------------------------------------------|:-------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `kafka.connect.cosmos.source.database.name` | None | Cosmos DB database name. |
| `kafka.connect.cosmos.source.containers.includeAll` | `false` | Flag to indicate whether reading from all containers. |
| `kafka.connect.cosmos.source.containers.includedList` | `[]` | Containers included. This config will be ignored if kafka.connect.cosmos.source.includeAllContainers is true. |
| `kafka.connect.cosmos.source.containers.topicMap` | `[]` | A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2. By default, container name is used as the name of the kafka topic to publish data to, can use this property to override the default config |
| `kafka.connect.cosmos.source.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (Now, Beginning or a certain point in time (UTC) for example 2020-02-10T14:15:03) - the default value is 'Beginning'. |
| `kafka.connect.cosmos.source.changeFeed.mode` | `LatestVersion` | ChangeFeed mode (LatestVersion or AllVersionsAndDeletes). |
| `kafka.connect.cosmos.source.changeFeed.maxItemCountHint` | `1000` | The maximum number of documents returned in a single change feed request. But the number of items received might be higher than the specified value if multiple items are changed by the same transaction. |
| `kafka.connect.cosmos.source.metadata.poll.delay.ms` | `300000` | Indicates how often to check the metadata changes (including container split/merge, adding/removing/recreated containers). When changes are detected, it will reconfigure the tasks. Default is 5 minutes. |
| `kafka.connect.cosmos.source.metadata.storage.topic` | `_cosmos.metadata.topic` | The name of the topic where the metadata are stored. The metadata topic will be created if it does not already exist, else it will use the pre-created topic. |
| `kafka.connect.cosmos.source.messageKey.enabled` | `true` | Whether to set the kafka record message key. |
| `kafka.connect.cosmos.source.messageKey.field` | `id` | The field to use as the message key. |
42 changes: 39 additions & 3 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ Licensed under the MIT License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jacoco.min.linecoverage>0.01</jacoco.min.linecoverage>
<jacoco.min.branchcoverage>0.02</jacoco.min.branchcoverage>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<shadingPrefix>azure_cosmos_kafka_connect</shadingPrefix>

<!-- CosmosSkip - This is not a module we want/expect external customers to consume. Skip breaking API checks. -->
Expand All @@ -48,7 +46,12 @@ Licensed under the MIT License.
<javaModulesSurefireArgLine>
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.models=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-exports com.azure.cosmos/com.azure.cosmos.implementation.changefeed.common=com.azure.cosmos.kafka.connect
--add-exports com.azure.cosmos/com.azure.cosmos.implementation.feedranges=com.azure.cosmos.kafka.connect
--add-exports com.azure.cosmos/com.azure.cosmos.implementation.query=com.azure.cosmos.kafka.connect
</javaModulesSurefireArgLine>
</properties>

Expand Down Expand Up @@ -94,6 +97,19 @@ Licensed under the MIT License.
<version>1.10.0</version> <!-- {x-version-update;org.apache.commons:commons-text;external_dependency} -->
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>3.6.0</version> <!-- {x-version-update;cosmos_org.apache.kafka:connect-runtime;external_dependency} -->
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down Expand Up @@ -160,6 +176,24 @@ Licensed under the MIT License.
<version>1.14.12</version> <!-- {x-version-update;testdep_net.bytebuddy:byte-buddy-agent;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.5</version> <!-- {x-version-update;cosmos_org.testcontainers:testcontainers;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.5</version> <!-- {x-version-update;cosmos_org.testcontainers:kafka;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.sourcelab</groupId>
<artifactId>kafka-connect-client</artifactId>
<version>4.0.4</version> <!-- {x-version-update;cosmos_org.sourcelab:kafka-connect-client;external_dependency} -->
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -204,6 +238,7 @@ Licensed under the MIT License.
<include>com.azure:*</include>
<include>org.apache.kafka:connect-api:[3.6.0]</include> <!-- {x-include-update;cosmos_org.apache.kafka:connect-api;external_dependency} -->
<include>io.confluent:kafka-connect-maven-plugin:[0.12.0]</include> <!-- {x-include-update;cosmos_io.confluent:kafka-connect-maven-plugin;external_dependency} -->
<include>org.sourcelab:kafka-connect-client:[4.0.4]</include> <!-- {x-include-update;cosmos_org.sourcelab:kafka-connect-client;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
Expand All @@ -221,6 +256,7 @@ Licensed under the MIT License.
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}-jar-with-dependencies</finalName>
<filters>
<filter>
<artifact>*:*:*:*</artifact>
Expand Down
8 changes: 8 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/src/docker/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
connectors/
log.txt

# Exclude all temporary files in resources
!resources/*example
resources/sink.properties
resources/source.properties
resources/standalone.properties
7 changes: 7 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/src/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Build the Cosmos DB Connectors on top of the Kafka Connect image
FROM confluentinc/cp-kafka-connect:7.5.0

# Install datagen connector
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest

COPY connectors/ /etc/kafka-connect/jars