Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LoadBalancedRSocket引发的ConcurrentModificationException #247

Closed
hupeiD opened this issue Mar 25, 2024 · 1 comment
Closed

LoadBalancedRSocket引发的ConcurrentModificationException #247

hupeiD opened this issue Mar 25, 2024 · 1 comment

Comments

@hupeiD
Copy link

hupeiD commented Mar 25, 2024

Describe the bug
程序在运行过程中,与Broker集群断开连接后,每一次发出请求都报ConcurrentModificationException。

Environment

  • Alibaba RSocket Broker version: 1.1.5
  • Operating System version: Linux
  • Java version: 11.0.17

Steps to reproduce this issue

  1. 在不知名的原因下,客户端与服务端断连
  2. 断连后每一次请求,都触发了com.alibaba.rsocket.loadbalance.LoadBalancedRSocket.onRSocketClosed(RSocket, Throwable)
  3. 然后抛出了ConcurrentModificationException
  4. 并且一直重连失败

Expected Result

当发出请求后,根据LoadBalancedRSocket.CONNECTION_ERROR_PREDICATE捕获到了异常之后,能安全的关闭对应的连接,并在接下来重连成功。

Actual Result

持续触发ConcurrentModificationException,并无法重连

2024-03-22 06:31:44.988 -ERROR 1 --- [     parallel-4] c.a.r.loadbalance.LoadBalancedRSocket    : RST-500407: RSocket close by peer: tcp://172.23.64.104:9999
2024-03-22 06:31:44.989 -ERROR 1 --- [     parallel-4] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.ConcurrentModificationException
Caused by: java.util.ConcurrentModificationException: null
	at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1511)
	at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1544)
	at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1542)
	at com.alibaba.rsocket.loadbalance.LoadBalancedRSocket.onRSocketClosed(LoadBalancedRSocket.java:314)
	at com.alibaba.rsocket.loadbalance.LoadBalancedRSocket.lambda$requestResponse$8(LoadBalancedRSocket.java:201)
	at reactor.core.publisher.Mono.lambda$onErrorResume$32(Mono.java:3887)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
	at io.rsocket.core.RequestResponseRequesterMono.sendFirstPayload(RequestResponseRequesterMono.java:190)
	at io.rsocket.core.RequestResponseRequesterMono.request(RequestResponseRequesterMono.java:152)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2159)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)
	at io.rsocket.core.RequestResponseRequesterMono.subscribe(RequestResponseRequesterMono.java:130)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426)
	at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:125)
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
	Suppressed: java.nio.channels.ClosedChannelException: null

而且我注意到了这里的activeSockets使用的HashMap而非ConcurrentHashMap

    public void onRSocketClosed(RSocket rsocket, @Nullable Throwable cause) {
        for (Map.Entry<String, RSocket> entry : activeSockets.entrySet()) {
            if (entry.getValue() == rsocket) {
                onRSocketClosed(entry.getKey(), entry.getValue(), null);
            }
        }
        if (!rsocket.isDisposed()) {
            try {
                rsocket.dispose();
            } catch (Exception ignore) {

            }
        }
    }
@hupeiD
Copy link
Author

hupeiD commented Mar 25, 2024

更正一下,并非一直重连不上,而是重连上了还是一直断连

@hupeiD hupeiD closed this as completed Mar 25, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant