Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to configure all options when using Kafka wrapper #1090

Closed
merlimat opened this issue Jan 22, 2018 · 5 comments · Fixed by #3911
Closed

Allow to configure all options when using Kafka wrapper #1090

merlimat opened this issue Jan 22, 2018 · 5 comments · Fixed by #3911
Labels
area/client help wanted type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Milestone

Comments

@merlimat
Copy link
Contributor

Expected behavior

Currently, we can only configure authentication on the Pulsar client instance when using the Kafka wrapper.

We can also configure few options for which there is a Kafka correspective, using the Kafka own properties, so that existing applications will behave in the same way.

Apart from that, we should be able to configure all the other Pulsar client option, maybe through a set of Java properties that are being passed to the Kafka client anyway.

@merlimat merlimat added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages help wanted labels Jan 22, 2018
@merlimat merlimat added this to the 1.22.0-incubating milestone Jan 22, 2018
@Horaddrim
Copy link

I'll try to do it :D Any firstcommer help?

@sijie
Copy link
Member

sijie commented Jun 12, 2018

@Horaddrim you are welcome to send a pull request. if you encounter any problems, feel free to ask questions at here, or mailing list dev@pulsar.incubator.apache.org or join the slack channel for https://pulsar.incubator.apache.org/contact/

@merlimat
Copy link
Contributor Author

@Horaddrim We added a number of config for Kafka wrapper in 2.0 release.

The list was documented at http://pulsar.apache.org/docs/latest/adaptors/KafkaWrapper/#CustomPulsarconfigurations-eshp5

The config classes can be found at https://github.com/apache/incubator-pulsar/tree/master/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat

I'm sure there will be some settings that were added since and are missing from there.

@Horaddrim
Copy link

Really thank you @merlimat and @sijie ! I've already checked these materials, so ASAP I'll start working in this 😄

@ConcurrencyPractitioner
Copy link
Contributor

Ok, I found out that a list of Pulsar consumer configs could be found in the class ConsumerConfigurationData. I noticed that there is a map which allows for other properties to be added in the class itself. So I want to know if there is possibly a list of configs which Pulsar's ConsumerImpl uses which I could draw from as a resource.

sijie pushed a commit that referenced this issue Mar 9, 2019
…#1090) (#3753)

Add support for Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in PulsarKafkaProducer by utilizing pulsar's ClientBuilder.keepAliveInterval
sijie pushed a commit that referenced this issue Mar 11, 2019
**Motivation** 

Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG  #1090 

Previously `ProducerBuilder.sendTimeout` was set by parsing Kafka's `ProducerConfig.MAX_BLOCK_MS_CONFIG`.
According to Kafka's [document](https://kafka.apache.org/20/documentation.html) it's for   

> Controlling how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block either because the buffer is full or metadata unavailable.

But `ProducerBuilder.sendTimeout`  is for  

> If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.

And Kafka's `ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG`, according to the document is for:
> Controlling the maximum amount of time the client will wait for the response of a request.

Which I think would be better fit purpose of Pulsar's `ProducerBuilder.sendTimeout`.
sijie pushed a commit that referenced this issue Mar 20, 2019
…roducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 (#3843)

**Motivatio**
Add a wrapper around Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor` to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090

The wrapper will try to delegate all call to underlying instance of Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor`  it holds.

When `PulsarKafkaProducer` convert a Kafka's `ProducerRecord` to Pulsar's `Message`, the schema(fixed to type of Schema<byte[]>), key, value, eventTimestamp and partitionID is set.
When doing the delegation, we'll do 
Pulsar`Message` -> Kafka's `ProducerRecord` -> invoke underlying Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor#onSend`  -> Pulsar`Message`
It'll try to preserve all the information. Verified through unit test.
For `org.apache.kafka.clients.producer.ProducerInterceptor#onSendAcknowledgement` it'll call `org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement` only partitionID, eventTimestamp, key byte lenth, value byte length will be pass in.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client help wanted type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants