-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Description
Describe the bug
While a consumer was subscribing to a topic, the subscription failed with a timeout error.

The log above was produced by the Python client 2.7.4 connecting to Pulsar.
At the same time, the broker logged the error `java.lang.IllegalStateException: Recursion depth became negative: -1`.

To Reproduce
There is no reliable way to reproduce this yet.
Expected behavior
The consumer should subscribe successfully.
Desktop (please complete the following information):
- OS: Linux
- Pulsar broker version: 2.8.3
- Pulsar client version: 2.7.4
Additional context
From reading the Python and C++ client code, the client had already connected to the broker. However, it never received a response to the `newSubscribe` command, which caused the timeout error.
It seems that the broker hit `java.lang.IllegalStateException: Recursion depth became negative: -1` while the logger was printing a log message.
As the broker log screenshot shows, the exception is thrown from `org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$16(ServerCnx.java:961)`, which is exactly the line that prints the log message:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 954 to 968 in ee87c7d
| isAuthorizedFuture.thenApply(isAuthorized -> { | |
| if (isAuthorized) { | |
| if (log.isDebugEnabled()) { | |
| log.debug("[{}] Client is authorized to subscribe with role {}", | |
| remoteAddress, getPrincipal()); | |
| } | |
| log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); | |
| try { | |
| Metadata.validateMetadata(metadata); | |
| } catch (IllegalArgumentException iae) { | |
| final String msg = iae.getMessage(); | |
| commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg); | |
| return null; | |
| } |
The log4j2 configuration file (`log4j2.yaml`):
Configuration:
status: INFO
monitorInterval: 30
name: pulsar
packages: io.prometheus.client.log4j2
Properties:
Property:
- name: "pulsar.log.dir"
value: "logs"
- name: "pulsar.log.file"
value: "pulsar.log"
- name: "pulsar.log.appender"
value: "RoutingAppender"
- name: "pulsar.log.root.level"
value: "info"
- name: "pulsar.log.level"
value: "info"
- name: "pulsar.routing.appender.default"
value: "Console"
Appenders:
# Console
Console:
name: Console
target: SYSTEM_OUT
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
# Rolling file appender configuration
RollingFile:
name: RollingFile
fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
immediateFlush: false
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Policies:
TimeBasedTriggeringPolicy:
interval: 1
modulate: true
SizeBasedTriggeringPolicy:
size: 1 GB
# Delete file older than 30days
DefaultRolloverStrategy:
Delete:
basePath: ${sys:pulsar.log.dir}
maxDepth: 2
IfFileName:
glob: "*/${sys:pulsar.log.file}*log.gz"
IfLastModified:
age: 30d
Prometheus:
name: Prometheus
# Routing
Routing:
name: RoutingAppender
Routes:
pattern: "$${ctx:function}"
Route:
-
Routing:
name: InstanceRoutingAppender
Routes:
pattern: "$${ctx:instance}"
Route:
-
RollingFile:
name: "Rolling-${ctx:function}"
fileName : "${sys:pulsar.log.dir}/functions/${ctx:function}/${ctx:functionname}-${ctx:instance}.log"
filePattern : "${sys:pulsar.log.dir}/functions/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz"
PatternLayout:
Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
Policies:
TimeBasedTriggeringPolicy:
interval: 1
modulate: true
SizeBasedTriggeringPolicy:
size: "20MB"
# Trigger every day at midnight that also scan
# roll-over strategy that deletes older file
CronTriggeringPolicy:
schedule: "0 0 0 * * ?"
# Delete file older than 30days
DefaultRolloverStrategy:
Delete:
basePath: ${sys:pulsar.log.dir}
maxDepth: 2
IfFileName:
glob: "*/${sys:pulsar.log.file}*log.gz"
IfLastModified:
age: 30d
- ref: "${sys:pulsar.routing.appender.default}"
key: "${ctx:function}"
- ref: "${sys:pulsar.routing.appender.default}"
key: "${ctx:function}"
Loggers:
# Default root logger configuration
Root:
level: "${sys:pulsar.log.root.level}"
additivity: true
AppenderRef:
- ref: "${sys:pulsar.log.appender}"
- ref: Prometheus
level: info
Logger:
- name: org.apache.bookkeeper.bookie.BookieShell
level: info
additivity: false
AppenderRef:
- ref: Console
- name: verbose
level: info
additivity: false
AppenderRef:
- ref: Console

Some other cases

Caused by `org.apache.pulsar.broker.service.ServerCnx.handleCloseConsumer(ServerCnx.java:1628)`, which is again a line that prints a log message:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1625 to 1638 in ee87c7d
| @Override | |
| protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { | |
| checkArgument(state == State.Connected); | |
| log.info("[{}] Closing consumer: consumerId={}", remoteAddress, closeConsumer.getConsumerId()); | |
| long requestId = closeConsumer.getRequestId(); | |
| long consumerId = closeConsumer.getConsumerId(); | |
| CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId); | |
| if (consumerFuture == null) { | |
| log.info("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress); | |
| ctx.writeAndFlush(Commands.newSuccess(requestId)); | |
| return; | |
| } |

Caused by `org.apache.pulsar.broker.service.ServerCnx.lambda$handleProducer$25(ServerCnx.java:1201)`:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1157 to 1202 in ee87c7d
| isAuthorizedFuture.thenApply(isAuthorized -> { | |
| if (isAuthorized) { | |
| if (log.isDebugEnabled()) { | |
| log.debug("[{}] Client is authorized to Produce with role {}", | |
| remoteAddress, getPrincipal()); | |
| } | |
| CompletableFuture<Producer> producerFuture = new CompletableFuture<>(); | |
| CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, | |
| producerFuture); | |
| if (existingProducerFuture != null) { | |
| if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) { | |
| Producer producer = existingProducerFuture.getNow(null); | |
| log.info("[{}] Producer with the same id is already created:" | |
| + " producerId={}, producer={}", remoteAddress, producerId, producer); | |
| commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(), | |
| producer.getSchemaVersion()); | |
| return null; | |
| } else { | |
| // There was an early request to create a producer with | |
| // same producerId. This can happen when | |
| // client | |
| // timeout is lower the broker timeouts. We need to wait | |
| // until the previous producer creation | |
| // request | |
| // either complete or fails. | |
| ServerError error = null; | |
| if (!existingProducerFuture.isDone()) { | |
| error = ServerError.ServiceNotReady; | |
| } else { | |
| error = getErrorCode(existingProducerFuture); | |
| // remove producer with producerId as it's already completed with exception | |
| producers.remove(producerId, existingProducerFuture); | |
| } | |
| log.warn("[{}][{}] Producer with id is already present on the connection," | |
| + " producerId={}", remoteAddress, topicName, producerId); | |
| commandSender.sendErrorResponse(requestId, error, | |
| "Producer is already present on the connection"); | |
| return null; | |
| } | |
| } | |
| log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); | |