Pulsar IO - KafkaSource - allow to manage Avro Encoded messages #9448
Conversation
…ableBatching is enabled: if you do not set an initial schema on the Producer, the schema must be prepared at the first message that carries a Schema. There is a bug: compression is not applied in this case, so the consumer receives an uncompressed message and fails.
@eolivelli I think this approach makes unnecessary changes to the existing Pulsar schema library for achieving a very narrow scope. Pulsar and Kafka have very similar data models. Most of the time you should just transfer the bytes. The only thing you need to manage is to convert "Kafka SerDe" to "Pulsar Schema". You don't really need to write a special record to carry the schema information.
I wrote a schema-aware Kafka source at https://github.com/streamnative/pulsar-io-kafka, which we haven't contributed back yet.
- It only has one source connector that deals with byte[]. It works for all schemas; you don't need different source connectors.
- All you need to do is convert the Kafka SerDe to a Pulsar schema. See: https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/KafkaSource.java#L338
- Because Kafka encodes the schema id in the first bytes of Avro messages, you need to write a Schema wrapper. https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/schema/KafkaAvroSchema.java
- Kafka also handles JSON slightly differently, hence we need to process that differently as well. https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/schema/KafkaJsonSchema.java
- In a lot of cases, you can just use Kafka's existing tools to convert a Kafka schema into a standard Avro schema to be stored as a Pulsar schema. That is the benefit of using an open-standard serialization framework rather than introducing your own type system: reuse existing tools instead of reinventing the wheel.
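For reference, the wire format referred to above is Confluent's: a single magic byte (0x00), then a 4-byte big-endian schema id, then the Avro-serialized payload. A minimal, self-contained sketch of reading it (not the connector's actual code):

```java
import java.nio.ByteBuffer;

// Sketch (not the connector's actual code) of reading Confluent's wire
// format: one magic byte (0x00), then a 4-byte big-endian schema id,
// then the Avro-serialized payload.
public class ConfluentWireFormat {
    static final byte MAGIC = 0x0;

    public static int readSchemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt(); // the Avro payload starts at buf.position()
    }

    public static void main(String[] args) {
        // magic byte, schema id 42 (big-endian), one fake payload byte
        byte[] message = {0x0, 0, 0, 0, 42, 0x7};
        System.out.println(readSchemaId(message)); // prints 42
    }
}
```

In a real deserializer the schema id is then looked up in the Schema Registry; this is what KafkaAvroDeserializer does internally.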
@sijie
I think you are right in saying that we have to use open standards and existing tools, and to not reinvent the wheel.
Indeed, this implementation of the KafkaSource uses exactly KafkaAvroDeserializer, which is the tool available in the Kafka ecosystem to deal with Avro.
I prefer this solution because it uses only standard tools from the Kafka ecosystem.
I believe that in the long term this very simple code will be easy to maintain here in the Pulsar codebase.
I would prefer not to get into the details of how Kafka and Confluent serialize data (extract the schema id, extract the raw payload, connect to the schema registry...). It is better to use the official library and its standard APIs: in the future it will be easy to upgrade to new versions of the Kafka/Confluent client and to support future changes/evolutions and enterprise features.
I believe this is a good enhancement to the Pulsar Sink framework: we allow Sources to push GenericRecords, and Pulsar will package the structure using the Schema provided by the Source via the Pulsar Schema API (Avro or whatever we will support in the future...).
If you prefer I can split the patch into two parts:
- allow PulsarSource to deal with GenericRecord
- enhance the standard KafkaSource
Unfortunately, I disagree with you on this. This approach creates a very bad example for other people to follow. First of all, it basically creates a "connector" class per schema type. This is a very bad practice, and I would discourage a connector implementation from going down this route: it is impossible to maintain. Secondly, it solves a very narrowly scoped problem by introducing a specific type of source connector in which the key is a string and the value is Avro. All the key schema information is dropped. Key schema information is important to a lot of streaming use cases, and you can't solve that with this approach. Following this approach, you will end up creating N*N connector classes (where N is the number of schema types). I would encourage people not to introduce code changes specialized for their own needs; a connector implementation should be beneficial to a broader set of users.
Also, Kafka Avro support is a Confluent open-source thing, not a Kafka community thing. We are always in the game of touching serialization details when converting a message from format X to format Y: you either do the conversion at a high level using abstractions, or at a low level by handling the serialization details. The approach you proposed also has bad performance because it will churn a lot of object allocations; handling the serialization details directly can save a lot of memory copies and serialization/deserialization.
This is a good initiative. But it should be isolated from this KafkaSource change.
@sijie there is no problem in disagreeing. I am going to split the patch into two parts; this way we can take one step at a time.
In my plans I would like to work more on this KafkaSource and on the KafkaSink and try to make the structure better. There is open work that will allow putting more sinks in the same NAR and provide a better user experience. In the meanwhile, users of the Kafka source can go with "--classname" (or they can select it from a Web UI for interactive Pulsar management consoles).
Regarding key handling: we can work on this as well (and it is on my backlog), but I didn't want to introduce too many features at once. I have users that are used to advanced data mapping mechanisms both for the key and for the value, so mapping the key is very important to me.
Regarding "The approach you proposed also has a bad performance because it will churn a lot": I know about this fact, and I know how the StreamNative connector works. Using the Java model with GenericRecord adds that additional cost, but it brings benefits as well.
Good work @eolivelli. I left a few comments about some minor things.
(force-pushed from f06c417 to 8e456e3)
@sijie I have added a Guava cache, removed AUTO_PRODUCE_BYTES, removed the extractKey method, switched to ByteBuffer, and addressed all of your comments.
Please take a look again.
@eolivelli overall looks good.
I see you removed the key handling from this PR. In general, I think we should structure the class hierarchy like the following. As we are introducing a new bytes connector, we need to make the key pure bytes, because that is the default behavior in Kafka.
KafkaAbstractSource -> KafkaAbstractStringKeySource -> KafkaStringSource
KafkaAbstractSource -> KafkaAbstractBytesKeySource -> KafkaBytesSource
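The proposed hierarchy could be sketched roughly as follows. The class names follow the comment above, but the bodies (including the `decodeKey` helper) are hypothetical and only illustrate where key decoding would live; the real connector carries a full Kafka consumer loop.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the proposed class hierarchy; only key decoding
// is shown, the consumer loop and Source plumbing are omitted.
public class HierarchySketch {
    abstract static class KafkaAbstractSource<K> {
        abstract K decodeKey(byte[] rawKey);
    }

    abstract static class KafkaAbstractStringKeySource extends KafkaAbstractSource<String> {
        @Override
        String decodeKey(byte[] rawKey) {
            return new String(rawKey, StandardCharsets.UTF_8);
        }
    }

    abstract static class KafkaAbstractBytesKeySource extends KafkaAbstractSource<byte[]> {
        @Override
        byte[] decodeKey(byte[] rawKey) {
            return rawKey; // pass-through: Kafka's default key representation
        }
    }

    static class KafkaStringSource extends KafkaAbstractStringKeySource { }
    static class KafkaBytesSource extends KafkaAbstractBytesKeySource { }

    public static void main(String[] args) {
        byte[] raw = "order-1".getBytes(StandardCharsets.UTF_8);
        System.out.println(new KafkaStringSource().decodeKey(raw)); // prints order-1
    }
}
```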
pulsar-io/kafka/pom.xml
<version>${kafka.confluent.avroserializer.version}</version>
</dependency>
<dependency>
Why do you need this dependency?
Good catch. I needed it initially; now it is useless. Dropped.
.properties(Collections.emptyMap())
.schema(definition.getBytes(StandardCharsets.UTF_8))
.build();
return new Schema<ByteBuffer>() {
I think you need to override the decode method and throw an UnsupportedOperationException. Otherwise, if the decode method is used, it will cause a StackOverflowError.
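The suggestion can be illustrated with a minimal stand-in interface (not Pulsar's real org.apache.pulsar.client.api.Schema, which has more methods): make decode fail fast instead of falling through to a default that could recurse.

```java
import java.nio.ByteBuffer;

// Illustrative stand-in for Pulsar's Schema interface, showing a
// fail-fast decode: the schema is only meant for the producer path,
// so decode throws instead of looping back on itself.
public class EncodeOnlySchema {
    interface MiniSchema<T> {
        byte[] encode(T message);
        T decode(byte[] bytes);
    }

    static MiniSchema<ByteBuffer> encodeOnly() {
        return new MiniSchema<ByteBuffer>() {
            @Override
            public byte[] encode(ByteBuffer message) {
                byte[] out = new byte[message.remaining()];
                message.duplicate().get(out); // copy without moving the position
                return out;
            }

            @Override
            public ByteBuffer decode(byte[] bytes) {
                // Fail fast: decoding is intentionally unsupported here.
                throw new UnsupportedOperationException("decode is not supported");
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(encodeOnly().encode(ByteBuffer.wrap(new byte[]{1, 2, 3})).length); // prints 3
    }
}
```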
done
@codelipenghui @sijie CI passed. Probably we are good to go now :)
/pulsarbot rerun-failure-checks
@freeznet @codelipenghui Can you review this PR?
@codelipenghui can you please help merging this patch?
Thank you very much @codelipenghui and @sijie
Motivation
Currently the KafkaSource only allows dealing with strings and byte arrays; it does not support records with a Schema. In Kafka we have the ability to encode messages using Avro, and there is a Schema Registry (by Confluent®).
Modifications
Summary of changes:
- allow the current KafkaSource (KafkaBytesSource) to deal with io.confluent.kafka.serializers.KafkaAvroDeserializer and copy the raw bytes to the Pulsar topic, setting the Schema appropriately
- this source supports schema evolution end-to-end (i.e. add fields to the original schema in the Kafka world and see the new fields in the Pulsar topic, without any reconfiguration or restart)
- add the Confluent® Schema Registry Client to the Kafka connector NAR; its license is compatible with the Apache 2 license and we can redistribute it
- the configuration of the Schema Registry Client is done in the consumerProperties property of the source (usually you add schema.registry.url)
- add integration tests with Kafka and the Schema Registry
Verifying this change
The patch introduces new integration tests.
The integration tests launch a Kafka container and also a Confluent Schema Registry container.
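As a sketch, enabling the Avro path might look like the following source config. The property names here are illustrative and should be checked against the connector documentation; the Schema Registry URL is passed through the consumer properties as described above.

```yaml
# Hypothetical KafkaSource configuration sketch (property names may differ
# from the shipped connector; check the Pulsar IO Kafka docs).
configs:
  bootstrapServers: "kafka:9092"
  topic: "my-kafka-topic"
  groupId: "pulsar-io-kafka"
  valueDeserializationClass: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  consumerConfigProperties:
    schema.registry.url: "http://schema-registry:8081"
```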
Documentation
I will be happy to provide documentation once this patch is committed.