Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PulsarKafkaProducer is not thread safe #4707

Closed
ssunorz opened this issue Jul 11, 2019 · 1 comment · Fixed by #4745
Closed

PulsarKafkaProducer is not thread safe #4707

ssunorz opened this issue Jul 11, 2019 · 1 comment · Fixed by #4745
Labels
type/bug The PR fixed a bug or issue reported a bug type/feature The PR added a new feature or issue requested a new feature

Comments

@ssunorz
Copy link
Contributor

ssunorz commented Jul 11, 2019

Is your feature request related to a problem? Please describe.
I replaced kafka client with pulsar-client-kafka, referring to this document.
http://pulsar.apache.org/docs/en/adaptors-kafka/

Then, sending messages to multiple topics at the same time caused an exception(NullPointerException).
This exception does not occur in version 2.2.1, but does occur in version 2.3.2 and later.

I think that the change made in this PR is the cause.
Each time you send a message to a new topic, the cluster object is regenerated.
https://github.com/apache/pulsar/blob/v2.3.2/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java#L229

The javadoc states that KafkaProducer is thread safe.
https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
I want to use pulsar-client-kafka without modifying the code that used Kafka client.

To Reproduce

# build.gradle
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile (group: 'org.apache.pulsar', name: 'pulsar-client-kafka-original', version: '2.3.2')
    compile (group: 'org.apache.pulsar', name: 'pulsar-client-auth-athenz', version: '2.3.2') 
}
    public static void main(String[] args) throws Exception {

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "pulsar+ssl://server:6651");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.setProperty(PulsarClientKafkaConfig.USE_TLS, "true");
        properties.setProperty(PulsarClientKafkaConfig.TLS_TRUST_CERTS_FILE_PATH, filePath("trust", ".crt", CERT));
        properties.setProperty(PulsarClientKafkaConfig.AUTHENTICATION_CLASS, AuthenticationAthenz.class.getName());
        properties.setProperty(PulsarClientKafkaConfig.AUTHENTICATION_PARAMS_STRING, authParams());

        properties.setProperty("security-protocol", "PLAINTEXT");

        properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        properties.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "15000");
        producer = new PulsarKafkaProducer<>(properties, new StringSerializer(), new StringSerializer());

        Thread thread1 = new Thread(() -> {
            String topic1 = "persistent://topic1";
            ProducerRecord<String, String> record1 = new ProducerRecord<>(topic1, "Hello");
            producer.send(record1, (recordMetadata, e) -> {
                System.out.println(recordMetadata);
            });
        });

        Thread thread2 = new Thread(() -> {
            String topic2 = "persistent://topic2";
            ProducerRecord<String, String> record2 = new ProducerRecord<>(topic2, "Hello");
            producer.send(record2, (recordMetadata, e) -> {
                System.out.println(recordMetadata);
            });
        });

        thread1.start();
        thread2.start();
    }
 Exception in thread "Thread-2" java.lang.NullPointerException
	at org.apache.kafka.clients.producer.internals.DefaultPartitioner.partition(DefaultPartitioner.java:56)
	at org.apache.kafka.clients.producer.PulsarKafkaProducer.buildMessage(PulsarKafkaProducer.java:270)
	at org.apache.kafka.clients.producer.PulsarKafkaProducer.send(PulsarKafkaProducer.java:172)
	at Main.lambda$main$3(Main.java:156)
	at java.lang.Thread.run(Thread.java:748)
@ssunorz ssunorz added the type/feature The PR added a new feature or issue requested a new feature label Jul 11, 2019
@nkurihar nkurihar added component/kafka type/bug The PR fixed a bug or issue reported a bug labels Jul 11, 2019
@fxbing
Copy link
Contributor

fxbing commented Jul 17, 2019

The reason is the update of cluster is not atomic.
I have fix it and add a test. #4745

easyfan pushed a commit to easyfan/pulsar that referenced this issue Jul 26, 2019
jiazhai pushed a commit that referenced this issue Aug 28, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug The PR fixed a bug or issue reported a bug type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants