Add ability to use Kafka's sinks as Pulsar sinks #9927
Conversation
This replaces #9825, reworked according to @sijie's feedback.
Cool! That will be really useful. Just throwing some ideas...
@yuvalgut thank you!
Awesome work.
I am not fully convinced about the final packaging: if you want to run an existing Kafka Sink, you have to build this module and add that Sink as a dependency (is this the same approach we are taking with the Debezium source?).
I wonder if we can (as follow-up work) add the ability to pass an additional file (the jar for the Sink implementation) as configuration and load it dynamically.
Probably the new package management facility (still under development in the current master branch) will help?
Final packaging is tricky.
```java
final int partition = 0;
final Object key;
final Object value;
if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof KeyValue) {
```
You might also need another branch to check if the `Record` is a `KVRecord`.
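A rough sketch of the extra branch being suggested (hypothetical code; `buildSinkRecord` is an illustrative helper name, not part of the PR):

```java
import org.apache.pulsar.functions.api.KVRecord;

// Hypothetical extra branch: a KVRecord carries explicit key/value schemas,
// so they can be used directly instead of inspecting the payload.
if (sourceRecord instanceof KVRecord) {
    KVRecord<?, ?> kvRecord = (KVRecord<?, ?>) sourceRecord;
    return buildSinkRecord(kvRecord.getKeySchema(), kvRecord.getValueSchema(), kvRecord);
}
```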
```java
}

@Override
public void offset(TopicPartition topicPartition, long l) {
```
Same comment as above.
```java
    sourceRecord.fail();
    return;
}
pendingFlush.whenComplete((ignore, ex) -> {
```
I also don't think you need to register the callback here. Instead, track the Pulsar source records in the context to make sure the offsets are correctly recorded. Once you call the Kafka sink to flush, call the sink context to flush; if the flush succeeds, ack the source records, otherwise fail them.
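A minimal sketch of the suggested flow (an illustration of the review suggestion, not the PR's actual code; `task`, `toSinkRecord`, and `currentOffsets` are assumed helpers around the Kafka `SinkTask`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.pulsar.functions.api.Record;

// Track each incoming Pulsar record until its offset has been flushed.
private final ConcurrentLinkedDeque<Record<byte[]>> pendingRecords = new ConcurrentLinkedDeque<>();

public void write(Record<byte[]> record) {
    pendingRecords.add(record);
    task.put(List.of(toSinkRecord(record))); // hand the record to the Kafka sink task
}

private void flush() {
    List<Record<byte[]>> batch = new ArrayList<>(pendingRecords);
    try {
        task.flush(currentOffsets());        // ask the Kafka SinkTask to flush its offsets
        batch.forEach(Record::ack);          // flush succeeded: ack the Pulsar records
        pendingRecords.removeAll(batch);
    } catch (Exception e) {
        batch.forEach(Record::fail);         // flush failed: fail the Pulsar records
    }
}
```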
Force-pushed from 5046934 to 0f64b14
Updated to reflect CR comments (except some that affect pulsar-io/context and can be done separately/later).
### Motivation
Looking at #9927 (comment), there is no way to reliably get the partition number. There is `Optional<String> getPartitionId()`, which returns:
- "{topic}-{partition}" in some `Record` implementations
- `String(partition)` in others (parsing the integer back is a waste)
- an empty Optional (ok) in others

### Modifications
Added `default Optional<Integer> getPartitionIndex()` to the `Record` interface. Return the partition number where appropriate.
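Based on this commit message, the interface addition would look roughly like this (the javadoc wording is assumed):

```java
import java.util.Optional;

public interface Record<T> {
    // ... existing methods, e.g. Optional<String> getPartitionId() ...

    /**
     * The numeric partition index of the record, where the underlying
     * implementation can supply one; empty otherwise.
     */
    default Optional<Integer> getPartitionIndex() {
        return Optional.empty();
    }
}
```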
Force-pushed from b5ca187 to 0b44bbd
Force-pushed from 4beb265 to 7f08a23
Rebased on latest master.
I did my final pass of review.
Overall it is good; I left some final comments.
I hope we can merge this patch soon, so we can continue to build more features on top of this bridge to Kafka Connect.
Force-pushed from 7f08a23 to 60aa808
@sijie can you please review? This patch has been hanging for many days. @merlimat @rdhabalia @codelipenghui can you help with merging this patch?
Force-pushed from 94d18f6 to f7232b4
…10446) Fixes #10445

### Motivation
SinkContext should expose the subscription type to the Sink. More context: #9927 (comment). Needed for #9927.

### Modifications
Added `getSubscriptionType()` to the `SinkContext` interface and `ContextImpl`.
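The interface change described above would look roughly like this (a sketch; the `SubscriptionType` return type from the Pulsar client API is assumed from context):

```java
import org.apache.pulsar.client.api.SubscriptionType;

public interface SinkContext {
    // ... existing context methods ...

    /**
     * The subscription type used by the sink's input consumers,
     * so a sink can verify it runs under a compatible subscription.
     */
    SubscriptionType getSubscriptionType();
}
```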
Force-pushed from f7232b4 to c080b7f
Force-pushed from c080b7f to c1e7858
@sijie please take another look.
```java
}

private void flushIfNeeded(boolean force) {
    if (force || currentBatchSize.get() >= maxBatchSize) {
```
@dlg99 This will cause submitting multiple flush tasks, no? Because `currentBatchSize` will always be larger than `maxBatchSize` before the flush action is taken.
This could enqueue a lot of tasks in the flush queue, which will burn CPU.
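One way to avoid that (a sketch only, assuming a `flush()` method, a `maxBatchSize` field, and an `executor`; the later commit note "only one flush in progress is allowed" suggests the PR went this direction) is an atomic in-progress flag:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard: submit a new flush task only when none is in flight.
private final AtomicBoolean flushInProgress = new AtomicBoolean(false);

private void flushIfNeeded(boolean force) {
    if ((force || currentBatchSize.get() >= maxBatchSize)
            && flushInProgress.compareAndSet(false, true)) {
        executor.submit(() -> {
            try {
                flush();                     // drain the current batch
            } finally {
                flushInProgress.set(false);  // allow the next flush
                flushIfNeeded(false);        // re-check for records that arrived meanwhile
            }
        });
    }
}
```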
```java
        required = true,
        defaultValue = "",
        help = "Pulsar service URL to use for the offset store.")
private String pulsarServiceUrl;
```
This field will potentially be removed once PulsarClient is exposed to the context, so you might want to document that.
* Add ability to use Kafka's sinks as Pulsar sinks
* Get topic/partition from the record
* Making use of GenericObject
* Added SubscriptionType check and tests
* Support for topic/partition pause/resume/seek
* Added comment; only one flush in progress is allowed
### Motivation
Provide a way to use a Kafka Connect Sink as a Pulsar Sink, in cases like:
etc.
### Modifications
Added `KafkaConnectSink`. The Kafka `Schema` is auto-detected from the key/value itself for primitive values, with an option to unwrap `KeyValue`; a sketch of this detection follows below. `GenericRecord` support is TBD; it depends on the changes Enrico is working on.
Added a profile to `pulsar-io/kafka-connect-adaptor` to build a nar with the Kafka Connect connector included.
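A rough illustration of how primitive schema auto-detection could work (hypothetical code, not the PR's exact implementation; `detectSchema` is an illustrative name):

```java
import org.apache.kafka.connect.data.Schema;

// Hypothetical mapping from a primitive Java value to a Kafka Connect schema.
private static Schema detectSchema(Object value) {
    if (value instanceof String)  return Schema.STRING_SCHEMA;
    if (value instanceof Integer) return Schema.INT32_SCHEMA;
    if (value instanceof Long)    return Schema.INT64_SCHEMA;
    if (value instanceof Double)  return Schema.FLOAT64_SCHEMA;
    if (value instanceof Boolean) return Schema.BOOLEAN_SCHEMA;
    if (value instanceof byte[])  return Schema.BYTES_SCHEMA;
    return null; // non-primitive (e.g. GenericRecord): handled separately / TBD
}
```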
### Verifying this change
This change added tests and can be verified as follows:
Added unit tests.
Tested locally:
- Ran Pulsar standalone: `bin/pulsar standalone`
- Added a dependency on Kafka's connect-file sink into `pulsar-io/kafka-connect-adaptor-nar/pom.xml`
- Built the nar with `mvn -f pulsar-io/kafka-connect-adaptor-nar/pom.xml clean package -DskipTests` to include Kafka's connect-file sink in the nar
- Ran the test nar, set the topic schema, produced a message, and got the expected output (a sketch of the sink-create step follows below)
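The sink-create step would look roughly like this (a sketch; the archive path, sink name, input topic, and config file are placeholders, not the values actually used in the test):

```shell
bin/pulsar-admin sinks create \
  --archive pulsar-io/kafka-connect-adaptor-nar/target/pulsar-io-kafka-connect-adaptor-nar.nar \
  --name kafka-connect-file-sink \
  --inputs my-input-topic \
  --sink-config-file file-sink-config.yaml
```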
### Does this pull request potentially affect one of the following parts:
If `yes` was chosen, please highlight the changes

### Documentation
TBD following review