Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Issue with setting setAwaitDuration below 20 seconds in RocketMQ SimpleConsumer mode? #7986

Open
3 tasks done
CodingOX opened this issue Mar 29, 2024 · 4 comments
Open
3 tasks done

Comments

@CodingOX
Copy link

Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

Ubuntu 20.04

RocketMQ version

RocketMQ 5.2

JDK Version

JDK 17

Describe the Bug

Feedback for RocketMQ 5.2 version using SimpleConsumer mode:

I am encountering an issue when using the SimpleConsumer mode in RocketMQ version 5.2. Specifically, when I set the setAwaitDuration to a value less than 20 seconds, I receive the following error message:

Exception in thread "main" org.apache.rocketmq.client.java.exception.BadRequestException: [request-id=0bb55e31-e6ba-4539-8e23-296c6b35224f, response-code=40018] The deadline time remaining is not enough for polling, please check network condition

Here is the relevant Kotlin code snippet for reference:

val simpleConsumer = provider
    .newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setConsumerGroup(consumerGroup)
    .setAwaitDuration(Duration.ofMillis(500)) // The issue occurs when this value is set below 20s
    .setSubscriptionExpressions(mapOf(topic to filterExpression))
    .build()

It seems that the error is related to the setAwaitDuration parameter needing to be greater than 20 seconds for proper functioning. However, I require more flexibility in setting this duration for my use case. Could you provide guidance on how to resolve this issue or suggest any workarounds that would allow me to set an AwaitDuration below 20 seconds without encountering this error?

Steps to Reproduce

Exception in thread "main" org.apache.rocketmq.client.java.exception.BadRequestException: [request-id=0bb55e31-e6ba-4539-8e23-296c6b35224f, response-code=40018] The deadline time remaining is not enough for polling, please check network condition
	at org.apache.rocketmq.client.java.exception.StatusChecker.check(StatusChecker.java:63)
	at org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl.lambda$receiveMessage$0(ConsumerImpl.java:114)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:221)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:208)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture.run(AbstractTransformFuture.java:122)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:783)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture.set(SettableFuture.java:49)
	at org.apache.rocketmq.client.java.rpc.RpcClientImpl$1.onCompleted(RpcClientImpl.java:168)
	at org.apache.rocketmq.shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:485)
	at org.apache.rocketmq.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	at org.apache.rocketmq.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.rocketmq.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

What Did You Expect to See?

Code can run well when setAwaitDuration paramter < 20 seconds

What Did You See Instead?

No

Additional Context

No response

@redlsz
Copy link
Contributor

redlsz commented Apr 1, 2024

This exception indicates the pollingTime (awaitDuration) set by client is too small.

There are two configs related to pollingtime in proxy, grpcClientConsumerMinLongPollingTimeoutMillis (default=5s) and grpcClientConsumerMaxLongPollingTimeoutMillis (default=20s). Set the pollingTime within this range would be ok.

Related proxy codes: https://github.com/apache/rocketmq/blob/develop/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java#L82

@CodingOX
Copy link
Author

CodingOX commented Apr 1, 2024

This exception indicates the pollingTime (awaitDuration) set by client is too small.

There are two configs related to pollingtime in proxy, grpcClientConsumerMinLongPollingTimeoutMillis (default=5s) and grpcClientConsumerMaxLongPollingTimeoutMillis (default=20s). Set the pollingTime within this range would be ok.

Related proxy codes: develop/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java#L82

thanks, I consulted the relevant documents and did not find this suggestion. The code comments also did not mention this problem. In order to better help subsequent users, would you consider improving the documentation?

@lizhimins
Copy link
Member

Can you describe what scenario requires about setting setAwaitDuration below 20 seconds.

@CodingOX
Copy link
Author

CodingOX commented Apr 7, 2024

Can you describe what scenario requires about setting setAwaitDuration below 20 seconds.

In my understanding, the awaitDuration parameter allows me to fetch messages more promptly. During periods of low business activity, when a particular message is highly important yet the number of such messages may fall short of the maxMessageNum threshold (which stems from the org.apache.rocketmq.client.apis.consumer.SimpleConsumer#receive(maxMessageNum, duration) call), this parameter ensures a swift return nonetheless.
Alternatively, I might need to fallback to assigning the macMessageNum variable with a value of 1, which could potentially be less efficient.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants