Description
Search before asking
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
Minimal reproduce step
Package the protocols inside apachepulsar/pulsar:3.3.5 image using below Dockerfile:
FROM datastax/lunastreaming-all:3.1_4.10 AS source
FROM apachepulsar/pulsar:3.3.5
COPY --from=source /pulsar/protocols/ /pulsar/protocols/
COPY --from=source /pulsar/proxyextensions/ /pulsar/proxyextensions/
Build image: docker buildx build --platform linux/amd64 -t custom-pulsar .
Deploy the custom-pulsar image with below additional configs to pulsar components to enable the kafka protocol:
broker:
- kafkaAdvertisedListeners=PLAINTEXT://advertisedAddress:9092
- kafkaListeners=PLAINTEXT://0.0.0.0:9092
- kafkaNamespace=kafka
- kafkaTransactionCoordinatorEnabled=true
- kopSchemaRegistryEnable=true
- kopSchemaRegistryPort=8081
- messagingProtocols=kafka
- protocolHandlerDirectory=./protocols
proxy:
- kafkaAdvertisedListeners=PLAINTEXT://pulsar-proxy:9092
- kafkaListeners=PLAINTEXT://0.0.0.0:9092
- kafkaNamespace=kafka
- kafkaTransactionCoordinatorEnabled=true
- kopSchemaRegistryEnable=true
- kopSchemaRegistryProxyEnableTls=false
- kopSchemaRegistryPort=8081
- numHttpServerThreads=10
Now, deploy omb-kafka driver to run the kafka protocol testing by using : openmessaging/benchmark repo.
Exec into the omb-driver:
kubectl --namespace=mypulsar exec omb-driver --container=omb-driver -n mypulsar -- bash
Add test duration & copy the file:
grep -v testDurationMinutes workloads/1-topic-16-partition-100b.yaml > workloads/workload.yaml
&&
echo -n "testDurationMinutes:" >> workloads/workload.yaml
&&
echo " 15" >> workloads/workload.yaml
&&
sed "s/localhost:9092/pulsar-proxy:9092/g" /benchmark/driver-kafka/kafka-all.yaml | sed "s/localhost:8080/pulsar-proxy:8080/g" | sed "s/localhost:6650/pulsar-proxy:6650/g" > /benchmark/driver-kafka/kafka-all.yaml-pulsar-proxy.yaml
Now run the benchmark test:
/benchmark/bin/benchmark -d driver-kafka/kafka-all.yaml-pulsar-proxy.yaml -w $WORKERS workloads/workload.yaml
What did you expect to see?
The omb-driver should do its work w/o any error.
What did you see instead?
The client will get stuck at this point:
08:16:07.296 [main] INFO - Workers list - producers: [http://omb-worker-1.omb-worker:8080, http://omb-worker-0.omb-worker:8080]
08:16:07.297 [main] INFO - Workers list - consumers: [http://omb-worker-3.omb-worker:8080, http://omb-worker-2.omb-worker:8080]
08:16:07.302 [main] INFO - --------------- WORKLOAD : 1 topic / 16 partition / 100b --- DRIVER : Kafka---------------
08:16:10.924 [main] INFO - Created 1 topics in 280.102194 ms
08:16:10.926 [main] INFO - Topic: test-topic-sHBv5HQ-0000
08:16:15.926 [main] INFO - Paused 5000.170277 ms
08:16:15.934 [main] INFO - Subscription: sub-000-fi6b95E for test-topic-sHBv5HQ-0000
08:16:16.063 [main] INFO - Created 1 consumers in 127.913773 ms
08:16:21.063 [main] INFO - Paused 5000.150031 ms
08:16:21.064 [main] INFO - 2 Producer Workers
08:16:21.145 [main] INFO - Created 1 producers in 81.018096 ms
08:16:21.146 [main] INFO - Waiting for consumers to be ready (1 messages to be received)
You will see below error on the broker:
2025-03-05T15:34:09,907+0000 [kafka-tx-recovery-OrderedExecutor-3-0] ERROR io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - System error while handling append for persistent://public/kafka/test-topic-wac3_eM-0000-partition-9
java.util.concurrent.CompletionException: java.lang.IllegalStateException: Field 'marker_type' is not set
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
at io.streamnative.pulsar.handlers.kop.PendingTopicFutures.lambda$addListener$1(PendingTopicFutures.java:71) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?]
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.lambda$initialise$0(PartitionLog.java:189) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) ~[?:?]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.jar:4.16.6]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[org.apache.bookkeeper-bookkeeper-common-4.16.6.jar:4.16.6]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.118.Final.jar:4.1.118.Final]
at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.IllegalStateException: Field 'marker_type' is not set
at org.apache.pulsar.common.api.proto.MessageMetadata.getMarkerType(MessageMetadata.java:685) ~[com.datastax.oss-pulsar-common-3.1.4.13.jar:3.1.4.13]
at org.apache.pulsar.broker.service.persistent.MessageDeduplication.setContextPropsIfRepl(MessageDeduplication.java:375) ~[com.datastax.oss-pulsar-broker-3.1.4.13.jar:3.1.4.13]
at org.apache.pulsar.broker.service.persistent.MessageDeduplication.isDuplicate(MessageDeduplication.java:339) ~[com.datastax.oss-pulsar-broker-3.1.4.13.jar:3.1.4.13]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.publishMessage(PersistentTopic.java:609) ~[com.datastax.oss-pulsar-broker-3.1.4.13.jar:3.1.4.13]
at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.publishMessage(PartitionLog.java:999) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.publishMessages(PartitionLog.java:921) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
at io.streamnative.pulsar.handlers.kop.storage.PartitionLog.lambda$appendRecords$9(PartitionLog.java:530) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
at io.streamnative.pulsar.handlers.kop.PendingTopicFutures.lambda$addListener$0(PendingTopicFutures.java:66) ~[1KTcsm5OKTif7rIR_6Wt8w/:?]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
... 10 more
Anything else?
The issue arises after inclusion of this PR #23697. Without this PR, the functionality is working as expected.
Are you willing to submit a PR?
- I'm willing to submit a PR!