[Bug] Kafka Protocol is not working on Pulsar latest releases #24085

Open
@mukesh154

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

Minimal reproduce step

Package the protocol handlers into the apachepulsar/pulsar:3.3.5 image using the Dockerfile below:

FROM datastax/lunastreaming-all:3.1_4.10 AS source
FROM apachepulsar/pulsar:3.3.5
COPY --from=source /pulsar/protocols/ /pulsar/protocols/
COPY --from=source /pulsar/proxyextensions/ /pulsar/proxyextensions/

Build the image: docker buildx build --platform linux/amd64 -t custom-pulsar .

Deploy the custom-pulsar image with the following additional configuration on the Pulsar components to enable the Kafka protocol:

broker:

      - kafkaAdvertisedListeners=PLAINTEXT://advertisedAddress:9092
      - kafkaListeners=PLAINTEXT://0.0.0.0:9092
      - kafkaNamespace=kafka
      - kafkaTransactionCoordinatorEnabled=true
      - kopSchemaRegistryEnable=true
      - kopSchemaRegistryPort=8081
      - messagingProtocols=kafka
      - protocolHandlerDirectory=./protocols

proxy:

      - kafkaAdvertisedListeners=PLAINTEXT://pulsar-proxy:9092
      - kafkaListeners=PLAINTEXT://0.0.0.0:9092
      - kafkaNamespace=kafka
      - kafkaTransactionCoordinatorEnabled=true
      - kopSchemaRegistryEnable=true
      - kopSchemaRegistryProxyEnableTls=false
      - kopSchemaRegistryPort=8081
      - numHttpServerThreads=10

Next, deploy the omb-kafka driver from the openmessaging/benchmark repo to test the Kafka protocol.
Exec into the omb-driver pod:
kubectl --namespace=mypulsar exec -it omb-driver --container=omb-driver -- bash
Set the test duration and create a copy of the driver file that points at pulsar-proxy:

grep -v testDurationMinutes workloads/1-topic-16-partition-100b.yaml > workloads/workload.yaml \
  && echo -n "testDurationMinutes:" >> workloads/workload.yaml \
  && echo " 15" >> workloads/workload.yaml \
  && sed "s/localhost:9092/pulsar-proxy:9092/g" /benchmark/driver-kafka/kafka-all.yaml \
     | sed "s/localhost:8080/pulsar-proxy:8080/g" \
     | sed "s/localhost:6650/pulsar-proxy:6650/g" \
     > /benchmark/driver-kafka/kafka-all.yaml-pulsar-proxy.yaml
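
The workload edit above can also be written as a small function and checked on a throwaway file first. This is only a sketch: the helper name set_test_duration and the sample YAML contents are made up for illustration and are not part of the benchmark repo, and the grep pattern is anchored to the start of the line, which is slightly stricter than the original command.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical helper: copy a workload file, dropping any existing
# testDurationMinutes entry and appending the requested value.
set_test_duration() {
  local src="$1" dst="$2" minutes="$3"
  # grep -v exits non-zero when it selects no lines; tolerate that case.
  grep -v '^testDurationMinutes' "$src" > "$dst" || true
  printf 'testDurationMinutes: %s\n' "$minutes" >> "$dst"
}

# Demo on a temporary file with made-up contents (not the real OMB workload).
tmp="$(mktemp -d)"
printf 'name: demo\ntopics: 1\ntestDurationMinutes: 5\n' > "$tmp/in.yaml"
set_test_duration "$tmp/in.yaml" "$tmp/workload.yaml" 15
cat "$tmp/workload.yaml"
```

Printing the result before running the benchmark makes it easy to confirm that the file contains exactly one testDurationMinutes entry.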

Now run the benchmark test:
/benchmark/bin/benchmark -d driver-kafka/kafka-all.yaml-pulsar-proxy.yaml -w $WORKERS workloads/workload.yaml

What did you expect to see?

The omb-driver should complete the benchmark without any errors.

What did you see instead?

The client hangs at this point:

08:16:07.296 [main] INFO - Workers list - producers: [http://omb-worker-1.omb-worker:8080, http://omb-worker-0.omb-worker:8080]
08:16:07.297 [main] INFO - Workers list - consumers: [http://omb-worker-3.omb-worker:8080, http://omb-worker-2.omb-worker:8080]
08:16:07.302 [main] INFO - --------------- WORKLOAD : 1 topic / 16 partition / 100b --- DRIVER : Kafka---------------
08:16:10.924 [main] INFO - Created 1 topics in 280.102194 ms
08:16:10.926 [main] INFO - Topic: test-topic-sHBv5HQ-0000
08:16:15.926 [main] INFO - Paused 5000.170277 ms
08:16:15.934 [main] INFO - Subscription: sub-000-fi6b95E for test-topic-sHBv5HQ-0000
08:16:16.063 [main] INFO - Created 1 consumers in 127.913773 ms
08:16:21.063 [main] INFO - Paused 5000.150031 ms
08:16:21.064 [main] INFO - 2 Producer Workers
08:16:21.145 [main] INFO - Created 1 producers in 81.018096 ms
08:16:21.146 [main] INFO - Waiting for consumers to be ready (1 messages to be received)

The broker logs the following error:

2025-03-05T15:34:09,907+0000 [kafka-tx-recovery-OrderedExecutor-3-0] ERROR io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - System error while handling append for persistent://public/kafka/test-topic-wac3_eM-0000-partition-9
java.util.concurrent.CompletionException: java.lang.IllegalStateException: Field 'marker_type' is not set
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.PendingTopicFutures.lambda$addListener$1(PendingTopicFutures.java:71) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.lambda$initialise$0(PartitionLog.java:189) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) ~[?:?]
    at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.jar:4.16.6]
    at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.jar:4.16.6]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.118.Final.jar:4.1.118.Final]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.IllegalStateException: Field 'marker_type' is not set
    at org.apache.pulsar.common.api.proto.MessageMetadata.getMarkerType(MessageMetadata.java:685) ~[com.datastax.oss-pulsar-common-3.1.4.13.jar:3.1.4.13]
    at org.apache.pulsar.broker.service.persistent.MessageDeduplication.setContextPropsIfRepl(MessageDeduplication.java:375) ~[com.datastax.oss-pulsar-broker-3.1.4.13.jar:3.1.4.13]
    at org.apache.pulsar.broker.service.persistent.MessageDeduplication.isDuplicate(MessageDeduplication.java:339) ~[com.datastax.oss-pulsar-broker-3.1.4.13.jar:3.1.4.13]
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.publishMessage(PersistentTopic.java:609) ~[com.datastax.oss-pulsar-broker-3.1.4.13.jar:3.1.4.13]
    at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.publishMessage(PartitionLog.java:999) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.publishMessages(PartitionLog.java:921) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.lambda$appendRecords$9(PartitionLog.java:530) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at io.streamnative.pulsar.handlers.kop.PendingTopicFutures.lambda$addListener$0(PendingTopicFutures.java:66) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
    ... 10 more
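
To check which topic partitions are affected, the broker log can be searched for the append failure shown above. The following is a rough sketch that assumes the broker log has already been saved to a local file; the helper name failed_partitions is hypothetical, and the demo input is just the first two lines of the error quoted in this report.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical helper: print each topic partition for which the KoP
# ReplicaManager logged an append failure in the given log file.
failed_partitions() {
  grep -F 'System error while handling append for' "$1" \
    | sed 's/.*handling append for //' \
    | sort -u
}

# Demo input: the first two lines of the broker error from this report.
log="$(mktemp)"
cat > "$log" <<'EOF'
2025-03-05T15:34:09,907+0000 [kafka-tx-recovery-OrderedExecutor-3-0] ERROR io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - System error while handling append for persistent://public/kafka/test-topic-wac3_eM-0000-partition-9
java.util.concurrent.CompletionException: java.lang.IllegalStateException: Field 'marker_type' is not set
EOF

# List affected partitions, then count occurrences of the exception message.
failed_partitions "$log"
grep -c -F "Field 'marker_type' is not set" "$log"
```

Because of set -e and pipefail, the script exits with an error when the log contains no matching line, so a clean log is reported as a failed search rather than as empty output.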

Anything else?

The issue appears once PR #23697 is included. Without that PR, the Kafka protocol works as expected.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels: type/bug
