Pulsar IO - KafkaSource - Implement KeyValue support for KafkaBytesSource #10002

Merged
merged 15 commits into from
Mar 25, 2021

Conversation

eolivelli
Contributor

@eolivelli eolivelli commented Mar 22, 2021

Motivation

Add support for different key and value Deserializers for the Kafka Source.

With this change the Kafka Connector supports non-String keys and also applies the correct Schema to the Pulsar Topic.

For primitive data types we do not decode the Kafka key and value into Java objects; we simply pass a reference to the internal ByteBuffer (a wrapper around a byte[]).
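
As an illustration of the pass-through described above, here is a minimal sketch that uses only the JDK. The class and method names are hypothetical and are not taken from the patch. It only shows that ByteBuffer.wrap shares the underlying byte[] instead of copying it, and that decoding can be left to whoever reads the buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration, not part of the patch.
class ByteBufferPassThrough {

    // Wraps the raw bytes without copying: the buffer is backed by the same array.
    static ByteBuffer wrapWithoutCopy(byte[] raw) {
        return ByteBuffer.wrap(raw);
    }

    // Decodes a 4-byte big-endian int only when a reader actually needs the value.
    // duplicate() leaves the caller's buffer position untouched.
    static int decodeInt(ByteBuffer buffer) {
        return buffer.duplicate().getInt();
    }
}
```

No Java object is created for the value until decodeInt is called, which is the behaviour the paragraph above relies on.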

The Schema type is decided using the keyDeserializationClass and valueDeserializationClass parameters that you pass to the Kafka Source configuration.

This is the mapping:

  • ByteArrayDeserializer, ByteBufferDeserializer, BytesDeserializer: Schema.BYTEBUFFER
  • StringDeserializer: Schema.STRING
  • DoubleDeserializer: Schema.DOUBLE
  • FloatDeserializer: Schema.FLOAT
  • IntegerDeserializer: Schema.INT32
  • LongDeserializer: Schema.INT64
  • ShortDeserializer: Schema.INT16
  • KafkaAvroDeserializer: Schema.AVRO (schema is downloaded from the SchemaRegistry)
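
The mapping above can be sketched as a plain lookup table. This is a self-contained illustration, not the code in the patch: the SchemaKind enum stands in for the Pulsar Schema constants, the deserializers are referenced by their fully qualified class names as strings so that the example compiles without Kafka on the classpath, and rejecting unknown class names with an exception is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the deserializer-to-schema mapping listed above.
class DeserializerSchemaMapping {

    // Stand-in for the Pulsar Schema constants (Schema.BYTEBUFFER, Schema.STRING, ...).
    enum SchemaKind { BYTEBUFFER, STRING, DOUBLE, FLOAT, INT32, INT64, INT16, AVRO }

    private static final String KAFKA = "org.apache.kafka.common.serialization.";
    private static final Map<String, SchemaKind> MAPPING = new HashMap<>();

    static {
        MAPPING.put(KAFKA + "ByteArrayDeserializer", SchemaKind.BYTEBUFFER);
        MAPPING.put(KAFKA + "ByteBufferDeserializer", SchemaKind.BYTEBUFFER);
        MAPPING.put(KAFKA + "BytesDeserializer", SchemaKind.BYTEBUFFER);
        MAPPING.put(KAFKA + "StringDeserializer", SchemaKind.STRING);
        MAPPING.put(KAFKA + "DoubleDeserializer", SchemaKind.DOUBLE);
        MAPPING.put(KAFKA + "FloatDeserializer", SchemaKind.FLOAT);
        MAPPING.put(KAFKA + "IntegerDeserializer", SchemaKind.INT32);
        MAPPING.put(KAFKA + "LongDeserializer", SchemaKind.INT64);
        MAPPING.put(KAFKA + "ShortDeserializer", SchemaKind.INT16);
        MAPPING.put("io.confluent.kafka.serializers.KafkaAvroDeserializer", SchemaKind.AVRO);
    }

    // Returns the schema kind for a configured deserializer class name.
    // Throwing on unknown names is an assumption made for this sketch.
    static SchemaKind schemaFor(String deserializerClassName) {
        SchemaKind kind = MAPPING.get(deserializerClassName);
        if (kind == null) {
            throw new IllegalArgumentException("Unsupported deserializer: " + deserializerClassName);
        }
        return kind;
    }
}
```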

When the key deserializer is StringDeserializer, we use the decoded key as the Pulsar key.
When the key deserializer is not StringDeserializer, we use the Pulsar KeyValue data type with SEPARATED key encoding.

This way the topic has a Schema for the key and a Schema for the value.
The key is encoded into the Pulsar key (SEPARATED), so it is used for routing and for compaction.
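
The rule for keys can be summarised in a small sketch. The enum and method names are hypothetical and only model the decision described above; they do not reproduce the connector's actual classes.

```java
// Hypothetical sketch of how the key deserializer selects the record shape.
class KeyHandlingSketch {

    enum KeyMode {
        // The decoded String becomes the Pulsar message key; the value keeps its own schema.
        STRING_MESSAGE_KEY,
        // Key and value each get a schema; the key bytes travel in the message key.
        KEY_VALUE_SEPARATED
    }

    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Picks the key handling mode from the configured key deserializer class name.
    static KeyMode keyModeFor(String keyDeserializerClassName) {
        return STRING_DESERIALIZER.equals(keyDeserializerClassName)
                ? KeyMode.STRING_MESSAGE_KEY
                : KeyMode.KEY_VALUE_SEPARATED;
    }
}
```

In both modes the key ends up in the Pulsar message key, which is why routing and compaction keep working.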

Limits of this patch:

  • we do not support byte[] keys (this is a limitation of the Pulsar IO Record, to be addressed separately)
  • we only support SEPARATED KeyValue encoding (adding support for INLINE is simple, but can be done in a separate change if a user requests it)
  • there is no support for JSON payloads (to be implemented in follow-up work)

Modifications

  • refactor AbstractKafkaSource in order to

Verifying this change

The patch introduces unit tests that cover the new code.

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? it should be documented with dedicated docs

@eolivelli eolivelli requested a review from sijie March 22, 2021 10:24
@eolivelli eolivelli changed the title from "[kafka] Implement KeyValue support for KakfaBytesSource" to "[kafka] Implement KeyValue support for KafkaBytesSource" Mar 22, 2021
@Jennifer88huang-zz Jennifer88huang-zz added this to the 2.8.0 milestone Mar 23, 2021
@eolivelli
Contributor Author

Can someone help me review this patch?

Contributor

@codelipenghui codelipenghui left a comment

Looks good to me, just left a minor comment.

props.put(key, ByteBufferDeserializer.class.getCanonicalName());

Schema<?> result;
if (ByteArrayDeserializer.class.getName().equals(kafkaDeserializerClass)
Contributor

Use a switch statement instead.

Contributor Author

Unfortunately these are not constants, so I cannot use a switch.

I like that we have this strong compile-time binding. If I write string constants, I lose the reference to the class.

This code is not on the hot path, so using the switch is only a syntax issue.

Do you feel strongly that we should use the switch?
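
To illustrate the point about constants, here is a minimal sketch that uses JDK classes in place of the Kafka deserializers. The class name and the returned labels are hypothetical. It shows the if/else chain with class literals, and notes in a comment why the equivalent switch does not compile.

```java
// Hypothetical example: JDK classes stand in for the Kafka deserializer classes.
class ClassNameDispatch {

    static String describe(String className) {
        // Class literals keep a compile-time reference to each class:
        // renaming or removing a class breaks the build instead of failing silently.
        if (String.class.getName().equals(className)) {
            return "string";
        } else if (Integer.class.getName().equals(className)) {
            return "int32";
        } else if (Long.class.getName().equals(className)) {
            return "int64";
        }
        // A switch on className needs constant case labels such as
        //   case "java.lang.String":
        // because `case String.class.getName():` is not a constant expression
        // and is rejected by the compiler.
        throw new IllegalArgumentException("Unsupported class: " + className);
    }
}
```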

Contributor

Do you feel strongly that we should use the switch?

No, it was just a suggestion from the syntax perspective. The current code works for me.

Contributor Author

Okay. Thanks for your review.
Can you please help merge this patch?
CI is green.

@codelipenghui
Contributor

@congbobo184 @gaoran10 Could you please also help review this PR?

Contributor

@congbobo184 congbobo184 left a comment

LGTM! Great work.

@eolivelli eolivelli merged commit e4b4627 into apache:master Mar 25, 2021
@eolivelli
Contributor Author

Thank you @codelipenghui and @congbobo184, I have merged this patch.

I will follow up with other enhancements to the KafkaSource.

@eolivelli eolivelli changed the title from "[kafka] Implement KeyValue support for KafkaBytesSource" to "Pulsar IO - KafkaSource - Implement KeyValue support for KafkaBytesSource" Mar 30, 2021