Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug] all consumer blocked on pq.mutex.Lock() #897

Closed
wangzeping722 opened this issue Aug 20, 2022 · 0 comments · Fixed by #898
Closed

[bug] all consumer blocked on pq.mutex.Lock() #897

wangzeping722 opened this issue Aug 20, 2022 · 0 comments · Fixed by #898

Comments

@wangzeping722
Copy link
Contributor

BUG REPORT

  1. Please describe the issue you observed:

    • What did you do (The steps to reproduce)?
      A client consumes messages from the broker. Sometimes, all consumers become blocked.
    • What did you expect to see?
      The consumer keeps consuming messages.
    • What did you see instead?
      All consumer goroutines block on pq.mutex.Lock() at consumer/process_queue.go:201. When updateProcessQueueTable is called and the condition pq.isPullExpired() && dc.cType == _PushConsume is met, the channel pq.closeChan is closed, but the receiver does not release pq.mutex.

Please see the deadlock in the following pprof log.
2. Please tell us about your environment:

 - What is your OS?

 - What is your client version?

 - What is your RocketMQ version?
  3. Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc):
    pprof:
goroutine profile: total 73
20 @ 0x1045aeafc 0x1045aeb88 0x1045bf7dc 0x1045d9fa8 0x1045e6a50 0x1045e66d4 0x1045e8e7c 0x1049905f4 0x10499c5c4 0x104937f1c 0x1045de424
#	0x1045d9fa7	sync.runtime_SemacquireMutex+0x27									/Users/wero/.g/versions/1.18.3/src/runtime/sema.go:71
#	0x1045e6a4f	sync.(*Mutex).lockSlow+0x34f										/Users/wero/.g/versions/1.18.3/src/sync/mutex.go:162
#	0x1045e66d3	sync.(*Mutex).Lock+0xa3											/Users/wero/.g/versions/1.18.3/src/sync/mutex.go:81
#	0x1045e8e7b	sync.(*RWMutex).Lock+0x2b										/Users/wero/.g/versions/1.18.3/src/sync/rwmutex.go:139
#	0x1049905f3	github.com/apache/rocketmq-client-go/v2/consumer.(*processQueue).removeMessage+0x53			/Users/wero/workspace/go/rocketmq-client-go/consumer/process_queue.go:201
#	0x10499c5c3	github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).consumeMessageCurrently.func1+0xba3	/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:1101
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b					/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

5 @ 0x1045aeafc 0x1045becfc 0x10499f920 0x104937f1c 0x1045de424
#	0x10499f91f	github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func4+0x15f	/Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:218
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

5 @ 0x1045aeafc 0x1045becfc 0x10499fb64 0x104937f1c 0x1045de424
#	0x10499fb63	github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func3+0x103	/Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:204
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

5 @ 0x1045aeafc 0x1045becfc 0x10499fda4 0x104937f1c 0x1045de424
#	0x10499fda3	github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func2+0x103	/Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:191
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

5 @ 0x1045aeafc 0x1045becfc 0x10499ffe4 0x104937f1c 0x1045de424
#	0x10499ffe3	github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func1+0x103	/Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:178
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

5 @ 0x1045aeafc 0x1045daee8 0x10499f308 0x104937f1c 0x1045de424
#	0x1045daee7	time.Sleep+0x117									/Users/wero/.g/versions/1.18.3/src/runtime/time.go:194
#	0x10499f307	github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func6+0x87	/Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:242
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

5 @ 0x1045aeafc 0x1045daee8 0x10499f5a8 0x104937f1c 0x1045de424
#	0x1045daee7	time.Sleep+0x117									/Users/wero/.g/versions/1.18.3/src/runtime/time.go:194
#	0x10499f5a7	github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func5+0x87	/Users/wero/workspace/go/rocketmq-client-go/consumer/statistics.go:228
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

3 @ 0x1045aeafc 0x10457a93c 0x10457a728 0x10499b810 0x104999af0 0x104937f1c 0x1045de424
#	0x10499b80f	github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).consumeMessageCurrently+0x1ff	/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:1029
#	0x104999aef	github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).pullMessage.func1+0x15f	/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:580
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b				/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

3 @ 0x1045aeafc 0x1045becfc 0x10498fae0 0x104999130 0x104993e64 0x1045de424
#	0x10498fadf	github.com/apache/rocketmq-client-go/v2/consumer.(*processQueue).putMessage+0x13f	/Users/wero/workspace/go/rocketmq-client-go/consumer/process_queue.go:105
#	0x10499912f	github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).pullMessage+0x23ef	/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:813
#	0x104993e63	github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start.func1.1.1+0x33	/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:164

2 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464c39c 0x10472ffe4 0x1047417e0 0x104645684 0x104645830 0x104943518 0x1049431b4 0x104937f1c 0x1045de424
#	0x1045d8b37	internal/poll.runtime_pollWait+0x47								/Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
#	0x10464b0f7	internal/poll.(*pollDesc).wait+0x87								/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
#	0x10464b187	internal/poll.(*pollDesc).waitRead+0x37								/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
#	0x10464c39b	internal/poll.(*FD).Read+0x33b									/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:167
#	0x10472ffe3	net.(*netFD).Read+0x53										/Users/wero/.g/versions/1.18.3/src/net/fd_posix.go:55
#	0x1047417df	net.(*conn).Read+0x6f										/Users/wero/.g/versions/1.18.3/src/net/net.go:183
#	0x104645683	io.ReadAtLeast+0x133										/Users/wero/.g/versions/1.18.3/src/io/io.go:331
#	0x10464582f	io.ReadFull+0x5f										/Users/wero/.g/versions/1.18.3/src/io/io.go:350
#	0x104943517	github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).receiveResponse+0x2d7	/Users/wero/workspace/go/rocketmq-client-go/internal/remote/remote_client.go:195
#	0x1049431b3	github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).connect.func1+0x33	/Users/wero/workspace/go/rocketmq-client-go/internal/remote/remote_client.go:165
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b				/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

2 @ 0x1045aeafc 0x1045aeb88 0x1045bf7dc 0x1045d9fa8 0x1045e8c48 0x104991e40 0x10499570c 0x1045e6174 0x104995008 0x104962950 0x10495b92c 0x104943d28 0x104937f1c 0x1045de424
#	0x1045d9fa7	sync.runtime_SemacquireMutex+0x27									/Users/wero/.g/versions/1.18.3/src/runtime/sema.go:71
#	0x1045e8c47	sync.(*RWMutex).RLock+0x97										/Users/wero/.g/versions/1.18.3/src/sync/rwmutex.go:63
#	0x104991e3f	github.com/apache/rocketmq-client-go/v2/consumer.(*processQueue).currentInfo+0x4f			/Users/wero/workspace/go/rocketmq-client-go/consumer/process_queue.go:400
#	0x10499570b	github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).GetConsumerRunningInfo.func2+0x10b	/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:414
#	0x1045e6173	sync.(*Map).Range+0x2a3											/Users/wero/.g/versions/1.18.3/src/sync/map.go:347
#	0x104995007	github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).GetConsumerRunningInfo+0xe7		/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:411
#	0x10496294f	github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).getConsumerRunningInfo+0xcf		/Users/wero/workspace/go/rocketmq-client-go/internal/client.go:911
#	0x10495b92b	github.com/apache/rocketmq-client-go/v2/internal.GetOrNewRocketMQClient.func3+0x18b			/Users/wero/workspace/go/rocketmq-client-go/internal/client.go:277
#	0x104943d27	github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).processCMD.func2+0x87		/Users/wero/workspace/go/rocketmq-client-go/internal/remote/remote_client.go:244
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b					/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

1 @ 0x1045aeafc 0x10457b308 0x10457b064 0x1049c6aac 0x1045ae714 0x1045de424
#	0x1049c6aab	main.main+0x33b		/Users/wero/workspace/go/rocketmq-client-go/examples/consumer/simple/main.go:59
#	0x1045ae713	runtime.main+0x223	/Users/wero/.g/versions/1.18.3/src/runtime/proc.go:250

1 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464c39c 0x10472ffe4 0x1047417e0 0x1048faf14 0x1045de424
#	0x1045d8b37	internal/poll.runtime_pollWait+0x47		/Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
#	0x10464b0f7	internal/poll.(*pollDesc).wait+0x87		/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
#	0x10464b187	internal/poll.(*pollDesc).waitRead+0x37		/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
#	0x10464c39b	internal/poll.(*FD).Read+0x33b			/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:167
#	0x10472ffe3	net.(*netFD).Read+0x53				/Users/wero/.g/versions/1.18.3/src/net/fd_posix.go:55
#	0x1047417df	net.(*conn).Read+0x6f				/Users/wero/.g/versions/1.18.3/src/net/net.go:183
#	0x1048faf13	net/http.(*connReader).backgroundRead+0x73	/Users/wero/.g/versions/1.18.3/src/net/http/server.go:672

1 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464c39c 0x10472ffe4 0x1047417e0 0x1048fb6dc 0x104779234 0x10477aa04 0x10477aa98 0x10487fa2c 0x10487f92c 0x1048f55a0 0x1048fcab4 0x104902c80 0x1045de424
#	0x1045d8b37	internal/poll.runtime_pollWait+0x47		/Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
#	0x10464b0f7	internal/poll.(*pollDesc).wait+0x87		/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
#	0x10464b187	internal/poll.(*pollDesc).waitRead+0x37		/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
#	0x10464c39b	internal/poll.(*FD).Read+0x33b			/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:167
#	0x10472ffe3	net.(*netFD).Read+0x53				/Users/wero/.g/versions/1.18.3/src/net/fd_posix.go:55
#	0x1047417df	net.(*conn).Read+0x6f				/Users/wero/.g/versions/1.18.3/src/net/net.go:183
#	0x1048fb6db	net/http.(*connReader).Read+0x1fb		/Users/wero/.g/versions/1.18.3/src/net/http/server.go:780
#	0x104779233	bufio.(*Reader).fill+0x233			/Users/wero/.g/versions/1.18.3/src/bufio/bufio.go:106
#	0x10477aa03	bufio.(*Reader).ReadSlice+0x353			/Users/wero/.g/versions/1.18.3/src/bufio/bufio.go:371
#	0x10477aa97	bufio.(*Reader).ReadLine+0x47			/Users/wero/.g/versions/1.18.3/src/bufio/bufio.go:400
#	0x10487fa2b	net/textproto.(*Reader).readLineSlice+0x6b	/Users/wero/.g/versions/1.18.3/src/net/textproto/reader.go:57
#	0x10487f92b	net/textproto.(*Reader).ReadLine+0x3b		/Users/wero/.g/versions/1.18.3/src/net/textproto/reader.go:38
#	0x1048f559f	net/http.readRequest+0x5f			/Users/wero/.g/versions/1.18.3/src/net/http/request.go:1029
#	0x1048fcab3	net/http.(*conn).readRequest+0x393		/Users/wero/.g/versions/1.18.3/src/net/http/server.go:988
#	0x104902c7f	net/http.(*conn).serve+0xa6f			/Users/wero/.g/versions/1.18.3/src/net/http/server.go:1891

1 @ 0x1045aeafc 0x1045a829c 0x1045d8b38 0x10464b0f8 0x10464b188 0x10464cf6c 0x10473157c 0x10474b644 0x104749ef0 0x10490831c 0x104907e44 0x1049090ec 0x1045de424
#	0x1045d8b37	internal/poll.runtime_pollWait+0x47	/Users/wero/.g/versions/1.18.3/src/runtime/netpoll.go:302
#	0x10464b0f7	internal/poll.(*pollDesc).wait+0x87	/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:83
#	0x10464b187	internal/poll.(*pollDesc).waitRead+0x37	/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_poll_runtime.go:88
#	0x10464cf6b	internal/poll.(*FD).Accept+0x32b	/Users/wero/.g/versions/1.18.3/src/internal/poll/fd_unix.go:614
#	0x10473157b	net.(*netFD).accept+0x4b		/Users/wero/.g/versions/1.18.3/src/net/fd_unix.go:172
#	0x10474b643	net.(*TCPListener).accept+0x43		/Users/wero/.g/versions/1.18.3/src/net/tcpsock_posix.go:139
#	0x104749eef	net.(*TCPListener).Accept+0x4f		/Users/wero/.g/versions/1.18.3/src/net/tcpsock.go:288
#	0x10490831b	net/http.(*Server).Serve+0x3eb		/Users/wero/.g/versions/1.18.3/src/net/http/server.go:3039
#	0x104907e43	net/http.(*Server).ListenAndServe+0x123	/Users/wero/.g/versions/1.18.3/src/net/http/server.go:2968
#	0x1049090eb	net/http.ListenAndServe+0xfb		/Users/wero/.g/versions/1.18.3/src/net/http/server.go:3222

1 @ 0x1045aeafc 0x1045becfc 0x104958b24 0x1045de424
#	0x104958b23	github.com/patrickmn/go-cache.(*janitor).Run+0xa3	/Users/wero/go/pkg/mod/github.com/patrickmn/go-cache@v2.1.0+incompatible/cache.go:1079

1 @ 0x1045aeafc 0x1045becfc 0x10495e3b4 0x104937f1c 0x1045de424
#	0x10495e3b3	github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.5+0x103	/Users/wero/workspace/go/rocketmq-client-go/internal/client.go:498
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

1 @ 0x1045aeafc 0x1045becfc 0x10495e6dc 0x104937f1c 0x1045de424
#	0x10495e6db	github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.4+0x14b	/Users/wero/workspace/go/rocketmq-client-go/internal/client.go:482
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

1 @ 0x1045aeafc 0x1045becfc 0x10495ea68 0x104937f1c 0x1045de424
#	0x10495ea67	github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.3+0x147	/Users/wero/workspace/go/rocketmq-client-go/internal/client.go:450
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

1 @ 0x1045aeafc 0x1045becfc 0x10495edf8 0x104937f1c 0x1045de424
#	0x10495edf7	github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.2+0x147	/Users/wero/workspace/go/rocketmq-client-go/internal/client.go:426
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

1 @ 0x1045aeafc 0x1045becfc 0x10495f16c 0x104937f1c 0x1045de424
#	0x10495f16b	github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.1+0x14b	/Users/wero/workspace/go/rocketmq-client-go/internal/client.go:402
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

1 @ 0x1045aeafc 0x1045becfc 0x104993c44 0x1045de424
#	0x104993c43	github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start.func1.1+0xb3	/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:161

1 @ 0x1045aeafc 0x1045daee8 0x104993920 0x104937f1c 0x1045de424
#	0x1045daee7	time.Sleep+0x117									/Users/wero/.g/versions/1.18.3/src/runtime/time.go:194
#	0x10499391f	github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start.func1.2+0x6f	/Users/wero/workspace/go/rocketmq-client-go/consumer/push_consumer.go:179
#	0x104937f1b	github.com/apache/rocketmq-client-go/v2/primitive.WithRecover+0x4b			/Users/wero/workspace/go/rocketmq-client-go/primitive/base.go:104

1 @ 0x1045d8778 0x10497c704 0x10497c41c 0x104978150 0x1049c425c 0x1049c4d68 0x104904348 0x104906594 0x104907bd4 0x104903720 0x1045de424
#	0x1045d8777	runtime/pprof.runtime_goroutineProfileWithLabels+0x27	/Users/wero/.g/versions/1.18.3/src/runtime/mprof.go:753
#	0x10497c703	runtime/pprof.writeRuntimeProfile+0x113			/Users/wero/.g/versions/1.18.3/src/runtime/pprof/pprof.go:725
#	0x10497c41b	runtime/pprof.writeGoroutine+0x8b			/Users/wero/.g/versions/1.18.3/src/runtime/pprof/pprof.go:685
#	0x10497814f	runtime/pprof.(*Profile).WriteTo+0x7f			/Users/wero/.g/versions/1.18.3/src/runtime/pprof/pprof.go:332
#	0x1049c425b	net/http/pprof.handler.ServeHTTP+0x38b			/Users/wero/.g/versions/1.18.3/src/net/http/pprof/pprof.go:253
#	0x1049c4d67	net/http/pprof.Index+0xb7				/Users/wero/.g/versions/1.18.3/src/net/http/pprof/pprof.go:371
#	0x104904347	net/http.HandlerFunc.ServeHTTP+0x47			/Users/wero/.g/versions/1.18.3/src/net/http/server.go:2084
#	0x104906593	net/http.(*ServeMux).ServeHTTP+0x133			/Users/wero/.g/versions/1.18.3/src/net/http/server.go:2462
#	0x104907bd3	net/http.serverHandler.ServeHTTP+0x423			/Users/wero/.g/versions/1.18.3/src/net/http/server.go:2916
#	0x10490371f	net/http.(*conn).serve+0x150f				/Users/wero/.g/versions/1.18.3/src/net/http/server.go:1966

pq.mutex should be unlocked at that point; the Unlock call is missing.

REPRODUCE
Run the consumer, then run the producer, and after a while there will be a deadlock for the following reasons.

producer:

package main

import (
	"context"
	"fmt"
	"os"
	"strconv"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

// main is the producer half of the reproduction. It creates a producer that
// resolves the name server at 127.0.0.1:9876, starts it, synchronously sends
// 100000 small messages ("H0", "H1", ...) to the "test" topic while printing
// the outcome of every send, and finally shuts the producer down.
func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
		producer.WithRetry(2),
	)
	// The constructor error used to be discarded; stop here instead of calling
	// Start on a producer that may not have been created.
	if err != nil {
		fmt.Printf("create producer error: %s\n", err.Error())
		os.Exit(1)
	}
	if err = p.Start(); err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	topic := "test"

	for i := 0; i < 100000; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("H" + strconv.Itoa(i)),
		}
		res, err := p.SendSync(context.Background(), msg)
		if err != nil {
			// A failed send is reported and the loop moves on to the next message.
			fmt.Printf("send message error: %s\n", err)
			continue
		}
		fmt.Printf("send message success: result=%s\n", res.String())
	}

	if err = p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}
}

consumer (if the deadlock does not reproduce, run it a few more times):

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"net/http"
	_ "net/http/pprof"
	"os"
)

// main is the consumer half of the reproduction. It creates a push consumer in
// group "testGroup" with a consume batch size of 5, exposes the default HTTP
// mux (which carries the pprof handlers registered by the blank import) on
// 0.0.0.0:6060, subscribes to the "test" topic and starts consuming.
func main() {
	sig := make(chan os.Signal)
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
		consumer.WithConsumeMessageBatchMaxSize(5),
	)
	// The constructor error used to be discarded; stop here instead of calling
	// Subscribe on a consumer that may not have been created.
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	// Serve the profiling endpoint in the background and report a failure
	// (for example, the port being in use) instead of dropping it silently.
	go func() {
		if err := http.ListenAndServe("0.0.0.0:6060", nil); err != nil {
			fmt.Printf("http server error: %s\n", err.Error())
		}
	}()
	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i].MsgId)
		}
		// Every batch is reported as ConsumeRetryLater, never as consumed.
		return consumer.ConsumeRetryLater, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	// Nothing in this function registers sig for signal delivery or sends on
	// it, so this receive keeps the process alive while the consumer runs.
	<-sig
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
1 participant