-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Topic creation not working properly #22527
Comments
@ragaur-tibco by default Pulsar deletes inactive topics after some timeout. Probably it's the reason. https://pulsar.apache.org/docs/next/admin-api-topics/#create-1 You can try to override this setting on the namespace level, re-create the topics, and check if the issue still exists.
|
Probably my answer doesn't describe the root cause of the issue, but this is one of the first "WTF?" all users encounter while learning Pulsar that could be easily avoided by changing defaults. |
Thanks you @visortelle let me try this will update the result here There is one more question I would like to ask: If we send messages from the producer with new topics, then the new topics should be created; however, we receive a list of topics in namespace that is zero |
The metadata operations in Pulsar aren't strongly consistent in all cases. For example, there isn't a guarantee of "read your writes" consistency. If you create a topic and immediately query it, it's possible that it's not visible in the returned list of topics. There are multiple reported issues in this area and this comment is a good summary: #12555 (comment) . Some problems have been addressed since that comment with #18518, but there's a possibility that the "read your writes" guarantee isn't yet achieved in all cases. |
Please share textual information in a code block instead of a screenshot, it's easier to read it.
these are simply a result of an invalid command (should be |
@ragaur-tibco By default, the messages will get automatically deleted unless you first create consumers with subscriptions. You can set retention for the namespace level if you'd like to keep the messages produced to a topic without subscriptions. |
Hi @lhotari @visortelle and created multiple topics in the namespace list are showing all the created topics but in logs while running the consumer first and provided the regex pattern in topics
getting below lines from client SDK logs where it says number of topics = 0
|
Interesting. I just tried and the regex subscription worked Codeval consumer = pulsarClient.newConsumer()
.topicsPattern("new-tenant/new-namespace/.*")
.subscriptionName("new-subscription")
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.messageListener(new MessageListener[Array[Byte]] {
override def received(consumer: Consumer[Array[Byte]], msg: Message[Array[Byte]]): Unit = {
println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
consumer.acknowledge(msg)
}
})
.subscribe()
val topics = Vector(
"persistent://new-tenant/new-namespace/topic-a",
"persistent://new-tenant/new-namespace/topic-b",
"persistent://new-tenant/new-namespace/topic-c",
)
topics.foreach(topic =>
val producer = pulsarClient.newProducer.topic(topic).create()
val from = 4096L
val to = from + 10
for (i <- from until to)
producer.send(scala.math.BigInt(i).toByteArray)
producer.flush()
producer.close()
) Logs
|
New observations:
Topic list:
Probably related: |
Hi @visortelle |
@ragaur-tibco I see.
.patternAutoDiscoveryPeriod(1) Ah, ok. It is implemented with polling. Edit: it use notifications too according to this article https://streamnative.io/blog/improving-regular-expression-based-subscriptions-pulsar-consumers |
I tried below configuration |
You probably mean |
Sorry @visortelle |
@ragaur-tibco I'm also unable to receive messages from non-persistent topics for some reason. Code// Cleanup
pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
.foreach(pulsarAdmin.topics.delete(_))
val consumer = pulsarClient.newConsumer()
.topicsPattern("new-tenant/new-namespace/.*".r.pattern)
.subscriptionName("new-subscription")
.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.messageListener(new MessageListener[Array[Byte]] {
override def received(consumer: org.apache.pulsar.client.api.Consumer[Array[Byte]], msg: org.apache.pulsar.client.api.Message[Array[Byte]]): Unit = {
println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
consumer.acknowledge(msg)
}
})
.subscribe()
val topics = Vector(
s"persistent://new-tenant/new-namespace/topic-a",
s"persistent://new-tenant/new-namespace/topic-b",
s"non-persistent://new-tenant/new-namespace/topic-c",
s"non-persistent://new-tenant/new-namespace/topic-d",
)
topics.foreach(topic =>
val producer = pulsarClient.newProducer.topic(topic).create()
val from = 4096L
val to = from + 10
for (i <- from until to)
producer.send(scala.math.BigInt(i).toByteArray)
producer.flush()
producer.close()
)
println(s"Sleep started at ${java.time.LocalTime.now()}")
Thread.sleep(10 * 1000)
println(s"Sleep finished at ${java.time.LocalTime.now()}")
// All the topics should be discovered at this moment
topics.foreach(topic =>
val producer = pulsarClient.newProducer.topic(topic).create()
val from = 4096L
val to = from + 10
for (i <- from until to)
producer.send(scala.math.BigInt(i).toByteArray)
producer.flush()
producer.close()
) Logs
|
@visortelle I tried multiple times but I was not able to receive any messages from non-persistent topics even if I select |
It may be hard to catch, but it sometimes works. This time I ran 20+ times to observe a subscription and messages from the non-persistent Added logs for subscriptions per topic. CodepulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
.foreach(pulsarAdmin.topics.delete(_))
val consumer = pulsarClient.newConsumer()
.topicsPattern("new-tenant/new-namespace/.*".r.pattern)
.subscriptionName("new-subscription")
.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.messageListener(new MessageListener[Array[Byte]] {
override def received(consumer: org.apache.pulsar.client.api.Consumer[Array[Byte]], msg: org.apache.pulsar.client.api.Message[Array[Byte]]): Unit = {
println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
consumer.acknowledge(msg)
}
})
.subscribe()
val topics = Vector(
s"persistent://new-tenant/new-namespace/topic-a",
s"persistent://new-tenant/new-namespace/topic-b",
s"non-persistent://new-tenant/new-namespace/topic-c",
s"non-persistent://new-tenant/new-namespace/topic-d",
)
topics.foreach(topic =>
val producer = pulsarClient.newProducer.topic(topic).create()
val from = 4096L
val to = from + 10
for (i <- from until to)
producer.send(scala.math.BigInt(i).toByteArray)
producer.flush()
producer.close()
)
println(s"Sleep started at ${java.time.LocalTime.now()}")
Thread.sleep(10 * 1000)
println(s"Sleep finished at ${java.time.LocalTime.now()}")
// All the topics should be discovered at this moment
def logSubscriptions =
topics.foreach(topic =>
val subscriptions = pulsarAdmin.topics.getSubscriptions(topic).asScala
println(s"Subscriptions for topic $topic: $subscriptions")
)
logSubscriptions
topics.foreach(topic =>
val producer = pulsarClient.newProducer.topic(topic).create()
val from = 4096L
val to = from + 10
for (i <- from until to)
producer.send(scala.math.BigInt(i).toByteArray)
producer.flush()
producer.close()
)
logSubscriptions
println(s"Sleep started at ${java.time.LocalTime.now()}")
Thread.sleep(10 * 5)
println(s"Sleep finished at ${java.time.LocalTime.now()}")
logSubscriptions Logs
|
means it is not working properly with non-persistent topics @visortelle |
@ragaur-tibco it seems we both were wrong here and it works as expected. I rewrote it in Scala ZIO which is simpler to understand for me. It seems like the consumer simply doesn't have enough time to update the list of topics. By the way, the undocumented casting to Let me know if something isn't clear. Corrected code: topics = Vector(
"persistent://new-tenant/new-namespace/topic-a",
"persistent://new-tenant/new-namespace/topic-b",
"non-persistent://new-tenant/new-namespace/topic-c",
"non-persistent://new-tenant/new-namespace/topic-d"
)
numMessagesPerTopic = 10
// Thread-safe counter
numMessagesReceivedRef <- Ref.make(0)
_ <- ZIO.attempt {
// Cleanup
pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
.foreach(pulsarAdmin.topics.delete(_, true))
pulsarAdmin.topics.getPartitionedTopicList("new-tenant/new-namespace").asScala
.foreach(pulsarAdmin.topics.deletePartitionedTopic(_, true))
}
// Create a consumer
consumer <- ZIO.attempt {
pulsarClient.newConsumer()
.topicsPattern("new-tenant/new-namespace/.*".r.pattern)
.subscriptionName("new-subscription")
.patternAutoDiscoveryPeriod(100, TimeUnit.MILLISECONDS)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.subscribe()
}
// Consume messages in background
consumeInBackgroundFib <- (for {
isMessageReceived <- ZIO.attempt {
Option(consumer.receive(1, TimeUnit.SECONDS)) match
case None => false
case Some(msg) =>
println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
consumer.acknowledge(msg.getMessageId)
true
}
_ <- numMessagesReceivedRef.update(_ + 1).when(isMessageReceived)
} yield ())
.forever // like `while true`
.fork // Run in background
// Create a producer for each topic
producers <- ZIO.attempt {
topics.map(topic => pulsarClient.newProducer.topic(topic).create())
}
// Wait for the pattern consumer to create the right number of consumers under the hood
_ <- ZIO.attempt {
// Cast consumer to PatternMultiTopicsConsumerImpl
// that has extra pattern-related methods
val numConsumers = consumer
.asInstanceOf[PatternMultiTopicsConsumerImpl[Array[Byte]]]
.getConsumers
.size
if numConsumers != topics.size
then throw new Exception(s"Expected $topics.size consumers, but got $numConsumers")
}
.retry(Schedule.exponential(10.millis))
.timeoutFail(new Exception("Consumers weren't created in time"))(10.seconds)
// Produce messages
_ <- ZIO.attempt {
for (i <- 0 until numMessagesPerTopic)
producers.foreach(producer => producer.sendAsync(Array(i.toByte)))
}
// Wait for all messages are be received
_ <- (for {
numMessagesReceived <- numMessagesReceivedRef.get
_ <- ZIO.attempt {
if numMessagesReceived != topics.size * numMessagesPerTopic
then throw new Exception(s"Expected ${topics.size * numMessagesPerTopic} messages, but got $numMessagesReceived")
}
} yield ())
.retry(Schedule.exponential(10.millis))
.timeoutFail(new Exception("Messages weren't received in time"))(10.seconds)
numMessagesReceived <- numMessagesReceivedRef.get
_ <- ZIO.logInfo(s"Messages received: $numMessagesReceived")
_ <- consumeInBackgroundFib.join Logs:
|
The auto-topic creation question was resolved. There is another issue for the regex consumer + non-persistent topics question: #22529 Closing. |
Search before asking
Read release policy
Version
3.2.2
Minimal reproduce step
Below are the steps to reproduce:
bin/pulsar-admin tenants create my-tenant
bin/pulsar-admin topics list my-tenant/new-name
What did you expect to see?
All the topics which we have created should be present in the namespace
What did you see instead?
Sometimes after running the command to list the topics only one topic is showing or sometime two topics and sometimes 0 topics
And while running the pulsar client to send the messages from the producer and consume in consumer side below logs from pulsar client SDK
NOTE: we were trying to use the multi-topic subscription and using the ALLTopics and sending the messages to all persistent and non-persistent topics
Anything else?
No response
Are you willing to submit a PR?
The text was updated successfully, but these errors were encountered: