-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KafkaV2SinkConnector #39434
KafkaV2SinkConnector #39434
Conversation
/azp run java - cosmos - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @xinlian12
All my suggestions can be implemented in a follow up PR if it makes it easier.
@@ -459,7 +492,7 @@ Licensed under the MIT License. | |||
</profile> | |||
<profile> | |||
<!-- integration tests, requires Cosmos DB Emulator Endpoint --> | |||
<id>kafka-integration</id> | |||
<id>kafka</id> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should name them as integration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will discuss offline for this
...e-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java
Outdated
Show resolved
Hide resolved
/** | ||
* A Sink connector that publishes topic messages to CosmosDB. | ||
*/ | ||
public class CosmosSinkConnector extends SinkConnector { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our CosmosSourceConnector
implements AutoCloseable
, how about sink connector? Do we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sink connector does not hold any resources like socket etc, so no real need to implement AutoCloseable
...a-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConfig.java
Show resolved
Hide resolved
new KafkaCosmosPointWriter(this.sinkTaskConfig.getWriteConfig(), context.errantRecordReporter()); | ||
} | ||
|
||
// TODO[public preview]: in V1, it will create the database if does not exists, but why? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are few comments I left with // TODO[], will revisit all together once. But I want to remove this as I do not think it make sense to create the database here
...connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java
Outdated
Show resolved
Hide resolved
...connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java
Outdated
Show resolved
Hide resolved
...connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java
Outdated
Show resolved
Hide resolved
...t/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriteException.java
Outdated
Show resolved
Hide resolved
/azp run java - cosmos - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
6decd72
to
8f819f9
Compare
/azp run java - cosmos - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
Adding back Kafka sink connector change : #38973
Extra changes: