[Bug] Kafka Protocol is not working on Pulsar latest releases #24085

Open
2 of 3 tasks
mukesh154 opened this issue Mar 17, 2025 · 4 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@mukesh154
Contributor

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

Minimal reproduce step

Package the protocol handlers into the apachepulsar/pulsar:3.3.5 image using the Dockerfile below:

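# Stage that only supplies the Kafka protocol handler and proxy extension files
FROM datastax/lunastreaming-all:3.1_4.10 AS source
FROM apachepulsar/pulsar:3.3.5
# Copy the protocol handler (broker) and proxy extension into the stock Pulsar image
COPY --from=source /pulsar/protocols/ /pulsar/protocols/
COPY --from=source /pulsar/proxyextensions/ /pulsar/proxyextensions/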

Build the image: docker buildx build --platform linux/amd64 -t custom-pulsar .

Deploy the custom-pulsar image, adding the following configuration to the Pulsar components to enable the Kafka protocol:

broker:

      - kafkaAdvertisedListeners=PLAINTEXT://advertisedAddress:9092
      - kafkaListeners=PLAINTEXT://0.0.0.0:9092
      - kafkaNamespace=kafka
      - kafkaTransactionCoordinatorEnabled=true
      - kopSchemaRegistryEnable=true
      - kopSchemaRegistryPort=8081
      - messagingProtocols=kafka
      - protocolHandlerDirectory=./protocols

proxy:

      - kafkaAdvertisedListeners=PLAINTEXT://pulsar-proxy:9092
      - kafkaListeners=PLAINTEXT://0.0.0.0:9092
      - kafkaNamespace=kafka
      - kafkaTransactionCoordinatorEnabled=true
      - kopSchemaRegistryEnable=true
      - kopSchemaRegistryProxyEnableTls=false
      - kopSchemaRegistryPort=8081
      - numHttpServerThreads=10

Next, deploy the OMB Kafka driver from the openmessaging/benchmark repo to test the Kafka protocol.
Exec into the omb-driver pod:
kubectl --namespace=mypulsar exec -it omb-driver --container=omb-driver -- bash
Set the test duration and create a copy of the driver config that points at the proxy:

grep -v testDurationMinutes workloads/1-topic-16-partition-100b.yaml > workloads/workload.yaml \
  && echo "testDurationMinutes: 15" >> workloads/workload.yaml \
  && sed -e "s/localhost:9092/pulsar-proxy:9092/g" \
         -e "s/localhost:8080/pulsar-proxy:8080/g" \
         -e "s/localhost:6650/pulsar-proxy:6650/g" \
         /benchmark/driver-kafka/kafka-all.yaml > /benchmark/driver-kafka/kafka-all.yaml-pulsar-proxy.yaml
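To spell out what the preparation step does, here is a small Java sketch of the same transformation. It is hypothetical: the class WorkloadPrepSketch and its methods withTestDuration and pointAtProxy are illustrative names, not part of OMB, and they work on in-memory strings rather than the actual files. The sample input values in main are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical Java equivalent of the shell preparation step:
 * it operates on in-memory lines and strings instead of files.
 */
public class WorkloadPrepSketch {

    // Mirrors: grep -v testDurationMinutes ... && echo "testDurationMinutes: 15"
    static List<String> withTestDuration(List<String> workloadLines, int minutes) {
        List<String> out = new ArrayList<>();
        for (String line : workloadLines) {
            if (!line.contains("testDurationMinutes")) {
                out.add(line); // keep every line except the old duration
            }
        }
        out.add("testDurationMinutes: " + minutes); // append the new duration
        return out;
    }

    // Mirrors the sed chain: point the Kafka (9092), HTTP (8080) and
    // Pulsar binary (6650) endpoints at the proxy host.
    static String pointAtProxy(String driverConfig, String proxyHost) {
        return driverConfig
                .replace("localhost:9092", proxyHost + ":9092")
                .replace("localhost:8080", proxyHost + ":8080")
                .replace("localhost:6650", proxyHost + ":6650");
    }

    public static void main(String[] args) {
        // Illustrative input only; the real workload file has more keys.
        List<String> workload = List.of(
                "name: 1 topic / 16 partition / 100b",
                "testDurationMinutes: 5");
        withTestDuration(workload, 15).forEach(System.out::println);
        System.out.println(pointAtProxy("bootstrap.servers=localhost:9092", "pulsar-proxy"));
    }
}
```

String.replace replaces every occurrence, which matches the /g flag used in the sed expressions.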

Now run the benchmark test:
/benchmark/bin/benchmark -d driver-kafka/kafka-all.yaml-pulsar-proxy.yaml -w $WORKERS workloads/workload.yaml

What did you expect to see?

The OMB driver should complete the benchmark run without errors.

What did you see instead?

The client gets stuck at this point:

08:16:07.296 [main] INFO - Workers list - producers: [http://omb-worker-1.omb-worker:8080, http://omb-worker-0.omb-worker:8080]
08:16:07.297 [main] INFO - Workers list - consumers: [http://omb-worker-3.omb-worker:8080, http://omb-worker-2.omb-worker:8080]
08:16:07.302 [main] INFO - --------------- WORKLOAD : 1 topic / 16 partition / 100b --- DRIVER : Kafka---------------
08:16:10.924 [main] INFO - Created 1 topics in 280.102194 ms
08:16:10.926 [main] INFO - Topic: test-topic-sHBv5HQ-0000
08:16:15.926 [main] INFO - Paused 5000.170277 ms
08:16:15.934 [main] INFO - Subscription: sub-000-fi6b95E for test-topic-sHBv5HQ-0000
08:16:16.063 [main] INFO - Created 1 consumers in 127.913773 ms
08:16:21.063 [main] INFO - Paused 5000.150031 ms
08:16:21.064 [main] INFO - 2 Producer Workers
08:16:21.145 [main] INFO - Created 1 producers in 81.018096 ms
08:16:21.146 [main] INFO - Waiting for consumers to be ready (1 messages to be received)

The broker logs the following error:

2025-03-05T15:34:09,907+0000 [kafka-tx-recovery-OrderedExecutor-3-0] ERROR io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - System error while handling append for persistent://public/kafka/test-topic-wac3_eM-0000-partition-9
java.util.concurrent.CompletionException: java.lang.IllegalStateException: Field 'marker_type' is not set
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.PendingTopicFutures.lambda$addListener$1(PendingTopicFutures.java:71) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.lambda$initialise$0(PartitionLog.java:189) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) ~[?:?]
    at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.jar:4.16.6]
    at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.jar:4.16.6]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.118.Final.jar:4.1.118.Final]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.IllegalStateException: Field 'marker_type' is not set
    at org.apache.pulsar.common.api.proto.MessageMetadata.getMarkerType(MessageMetadata.java:685) ~[com.datastax.oss-pulsar-common-3.1.4.13.jar:3.1.4.13]
    at org.apache.pulsar.broker.service.persistent.MessageDeduplication.setContextPropsIfRepl(MessageDeduplication.java:375) ~[com.datastax.oss-pulsar-broker-3.1.4.13.jar:3.1.4.13]
    at org.apache.pulsar.broker.service.persistent.MessageDeduplication.isDuplicate(MessageDeduplication.java:339) ~[com.datastax.oss-pulsar-broker-3.1.4.13.jar:3.1.4.13]
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.publishMessage(PersistentTopic.java:609) ~[com.datastax.oss-pulsar-broker-3.1.4.13.jar:3.1.4.13]
    at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.publishMessage(PartitionLog.java:999) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.publishMessages(PartitionLog.java:921) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.lambda$appendRecords$9(PartitionLog.java:530) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at io.streamnative.pulsar.handlers.kop.PendingTopicFutures.lambda$addListener$0(PendingTopicFutures.java:66) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
    ... 10 more
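The "Caused by" section points at a common pattern in generated protocol classes: an optional field remembers whether it was set, and its getter throws when it was not. Here MessageDeduplication.setContextPropsIfRepl calls MessageMetadata.getMarkerType() on metadata published through the Kafka protocol handler, where marker_type is never set. The following is a minimal, hypothetical Java sketch of that pattern (MarkerTypeGuardSketch, Metadata and the marker value 10 are illustrative, not Pulsar's actual classes or constants). It shows why checking a has...() guard before calling the getter avoids the exception; it is not necessarily how #24087 fixes the broker.

```java
/**
 * Hypothetical stand-in for a generated metadata class with an optional
 * int field, mimicking the "Field '...' is not set" behavior in the trace.
 */
public class MarkerTypeGuardSketch {

    static final class Metadata {
        private int markerType;
        private boolean markerTypeSet; // tracks presence of the optional field

        boolean hasMarkerType() {
            return markerTypeSet;
        }

        int getMarkerType() {
            if (!markerTypeSet) {
                throw new IllegalStateException("Field 'marker_type' is not set");
            }
            return markerType;
        }

        Metadata setMarkerType(int value) {
            this.markerType = value;
            this.markerTypeSet = true;
            return this;
        }
    }

    // Unsafe: assumes every message carries a marker type.
    static boolean isMarkerUnchecked(Metadata m, int expectedType) {
        return m.getMarkerType() == expectedType;
    }

    // Safe: ordinary data messages have no marker type, so check presence first.
    static boolean isMarkerGuarded(Metadata m, int expectedType) {
        return m.hasMarkerType() && m.getMarkerType() == expectedType;
    }

    public static void main(String[] args) {
        Metadata dataMessage = new Metadata();              // marker_type never set
        Metadata marker = new Metadata().setMarkerType(10); // hypothetical marker value

        System.out.println(isMarkerGuarded(dataMessage, 10)); // false
        System.out.println(isMarkerGuarded(marker, 10));      // true
        try {
            isMarkerUnchecked(dataMessage, 10);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Field 'marker_type' is not set
        }
    }
}
```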

Anything else?

The issue appears after PR #23697 was included. Without this PR, the functionality works as expected.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@lhotari
Member

lhotari commented Mar 17, 2025

@mukesh154 Thanks for reporting. Do you have a chance to test with changes from #24087 ?

@mukesh154
Contributor Author

@lhotari Yes, I tested with the fix above. The IllegalStateException no longer appears, but the consumer still does not initialize properly and receives no messages. The benchmark client prints the following line repeatedly:

11:42:39.527 [main] INFO - (0 of 1 messages received)
11:42:39.927 [main] INFO - (0 of 1 messages received)
11:42:40.227 [main] INFO - (0 of 1 messages received)

@lhotari
Member

lhotari commented Mar 17, 2025

@mukesh154 In 3.3.5 and 4.0.3 there's a potential regression which is mentioned in the release notes: https://pulsar.apache.org/release-notes/versioned/pulsar-3.3.5/#known-issues . It's worth testing with 3.3.4 instead of 3.3.5 due to that known issue.

@BewareMyPower
Contributor

Kafka protocol support is not a standard part of the Pulsar project. Since you're using datastax/lunastreaming-all, you should contact DataStax support. The code base should be https://github.com/datastax/starlight-for-kafka.

The first regression was likely introduced by #23697 and is fixed by #24087.

The second regression, introduced by #23931, could be worked around by changing the line at https://github.com/datastax/starlight-for-kafka/blob/1bf51f259eb5ac4afca2d17eee1a9fd94c53d2c4/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java#L1329 to:

        long adjustedMaxBytes = Integer.MAX_VALUE;

However, it seems an NPE could still occur until #24089 is merged. I'm not sure in which cases it can happen, because the tests of our internal Kafka protocol implementation didn't fail.


3 participants