WIP EthPeers - remove if disconnected #4100

Closed
wants to merge 5 commits into from

Conversation

macfarla
Contributor

@macfarla macfarla commented Jul 14, 2022

Testing a theory about EthPeer disconnects. From the logs, it looks like a duplicate connection can be detected and disconnected while the EthPeer is still being created and added, so by the time the peer is added it is already disconnected.

This may not be the most elegant solution, but I'm running it and it seems to work. It does create an issue with the callbacks, although I think the callback issue might be there anyway.

Documentation

  • I thought about documentation and added the doc-change-required label to this PR if
    updates are required.

Changelog

@macfarla
Contributor Author

As suspected, this has caused an issue with callbacks

{"@timestamp":"2022-07-20T06:58:28,211","level":"ERROR","thread":"nioEventLoopGroup-3-5","class":"Subscribers","message":"Error in callback: ","throwable":" org.hyperledger.besu.ethereum.p2p.rlpx.connections.RlpxConnection$ConnectionNotEstablishedException: Cannot access PeerConnection before connection is fully established.\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.connections.RlpxConnection$LocallyInitiatedRlpxConnection.getPeerConnection(RlpxConnection.java:181)\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.connections.RlpxConnection$LocallyInitiatedRlpxConnection.toString(RlpxConnection.java:232)\n\tat java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)\n\tat java.base/java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3605)\n\tat java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)\n\tat java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)\n\tat java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)\n\tat java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)\n\tat java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent.logConnectionsByIdToString(RlpxAgent.java:188)\n\tat java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)\n\tat java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)\n\tat java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)\n\tat java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)\n\tat java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)\n\tat java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)\n\tat 
java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)\n\tat java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:523)\n\tat org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda(Slf4jLambdaHelper.java:55)\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent.handleDisconnect(RlpxAgent.java:290)\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnectionEvents.lambda$dispatchDisconnect$0(PeerConnectionEvents.java:55)\n\tat org.hyperledger.besu.util.Subscribers.lambda$forEach$0(Subscribers.java:112)\n\tat java.base/java.lang.Iterable.forEach(Iterable.java:75)\n\tat org.hyperledger.besu.util.Subscribers.forEach(Subscribers.java:109)\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnectionEvents.dispatchDisconnect(PeerConnectionEvents.java:55)\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.connections.AbstractPeerConnection.disconnect(AbstractPeerConnection.java:159)\n\tat org.hyperledger.besu.ethereum.eth.manager.EthPeer.disconnect(EthPeer.java:223)\n\tat org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager.handleStatusMessage(EthProtocolManager.java:380)\n\tat org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager.processMessage(EthProtocolManager.java:253)\n\tat org.hyperledger.besu.ethereum.p2p.network.NetworkRunner.lambda$setupHandlers$0(NetworkRunner.java:135)\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnectionEvents.lambda$dispatchMessage$1(PeerConnectionEvents.java:64)\n\tat org.hyperledger.besu.util.Subscribers.lambda$forEach$0(Subscribers.java:112)\n\tat java.base/java.lang.Iterable.forEach(Iterable.java:75)\n\tat org.hyperledger.besu.util.Subscribers.forEach(Subscribers.java:109)\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnectionEvents.dispatchMessage(PeerConnectionEvents.java:64)\n\tat org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty.ApiHandler.channelRead0(ApiHandler.java:99)\n\tat 
org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty.ApiHandler.channelRead0(ApiHandler.java:33)\n\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"}

@macfarla
Contributor Author

However, it does seem to have eliminated the "keeping disconnected peers" scenario

{"@timestamp":"2022-07-20T07:19:54,046","level":"TRACE","thread":"nioEventLoopGroup-3-2","class":"EthProtocolManager","message":"AFTER registering the connection 10 EthPeers {
Peer 2016792764 0x65c2b5d3dbfa149d9a... PeerReputation 100, validated? true, disconnected? false,
Peer 1045319398 0x8b6f944e591b7ea6c6... PeerReputation 100, validated? true, disconnected? false,
Peer 1251696998 0x1de62c406ce902797f... PeerReputation 1209, validated? true, disconnected? false,
Peer 1714855942 0x14313e054f05600f25... PeerReputation 2616, validated? true, disconnected? false,
Peer 1672929284 0x9742602a91a9f82373... PeerReputation 3884, validated? true, disconnected? false,
Peer 816391528 0x545c7cd18c8add1895... PeerReputation 5694, validated? true, disconnected? false,
Peer 1538265151 0x5694e56be5672d3b21... PeerReputation 9580, validated? true, disconnected? false,
Peer 602047312 0xf9fa831d3307548b69... PeerReputation 19636, validated? true, disconnected? false,
Peer 1017520699 0x3acf0faca826ef82f9... PeerReputation 23990, validated? true, disconnected? false,
Peer 871514133 0xa6fa82cb4a766500c7... PeerReputation 25165, validated? true, disconnected? false}","throwable":""}

// if duplicate inbound and outbound connections are established at the same time
if (peer.isDisconnected()) {
  connections.remove(peerConnection);
}
Contributor Author

This is the core change here; the rest is logging.
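
For readers following the thread, the check can be sketched in isolation as below. This is a minimal sketch and not the Besu code: `SketchPeer`, `SketchPeerRegistry`, and the `String` connection ids are hypothetical stand-ins, and it assumes a peer can be flagged as disconnected before its registration completes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for EthPeer: only tracks whether it was disconnected.
final class SketchPeer {
  private final AtomicBoolean disconnected = new AtomicBoolean(false);

  void disconnect() {
    disconnected.set(true);
  }

  boolean isDisconnected() {
    return disconnected.get();
  }
}

// Hypothetical stand-in for the peer collection; not the real EthPeers class.
final class SketchPeerRegistry {
  private final Map<String, SketchPeer> connections = new ConcurrentHashMap<>();

  // Adds the peer, then drops it again if it was disconnected in the meantime.
  // Returns true if the peer is still registered when the method returns.
  boolean register(final String connectionId, final SketchPeer peer) {
    connections.put(connectionId, peer);
    if (peer.isDisconnected()) {
      // Remove only if the entry still maps to this exact peer instance.
      connections.remove(connectionId, peer);
      return false;
    }
    return true;
  }

  int size() {
    return connections.size();
  }
}
```

In this sketch a disconnect that lands just after the `isDisconnected()` check would still leave a stale entry, so the check narrows the window rather than closing it.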

Contributor

This could be a quick fix, but it would be better to sort out the underlying problem with how connect / disconnect events are being dispatched. The comment here isn't really accurate: it's actually a bug if these events aren't propagated correctly to EthPeers, and it doesn't necessarily have to do with duplicate connections.

On the theory that we're dispatching the disconnect event before the connect event: I haven't looked into this too deeply, but I would probably look at moving the connection dispatch logic into PeerConnectionEvents. From there, you could keep a connectedFuture and, when dispatchDisconnect is invoked, chain that logic onto connectedFuture so that connect and disconnect events are always dispatched in order.
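
The connectedFuture idea could look roughly like the sketch below. It is an illustration under assumptions, not the actual PeerConnectionEvents API: the class `OrderedConnectionEvents`, the `String` peer ids, and the `delivered` list (standing in for real subscribers) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for PeerConnectionEvents; events are recorded in a list
// instead of being passed to subscribers.
final class OrderedConnectionEvents {
  // Completed once the connect event has been delivered.
  private final CompletableFuture<Void> connectedFuture = new CompletableFuture<>();
  private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

  // Deliver the connect event first, then release anything chained on the future.
  void dispatchConnect(final String peerId) {
    delivered.add("connect:" + peerId);
    connectedFuture.complete(null);
  }

  // Chain disconnect delivery onto connectedFuture: it runs immediately if the
  // connect event was already delivered, otherwise right after it is.
  void dispatchDisconnect(final String peerId) {
    connectedFuture.thenRun(() -> delivered.add("disconnect:" + peerId));
  }

  // Snapshot of the events delivered so far, in delivery order.
  List<String> delivered() {
    synchronized (delivered) {
      return new ArrayList<>(delivered);
    }
  }
}
```

With this shape, calling `dispatchDisconnect` before `dispatchConnect` delivers nothing until the connect event goes out. One consequence of the sketch is that a connection that never completes never delivers its disconnect either, so a real implementation would need to decide how to handle that case.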

@macfarla macfarla changed the title from "WIP Peer disco" to "WIP EthPeers - remove if disconnected" Jul 20, 2022
@@ -312,9 +313,15 @@ public void processMessage(final Capability cap, final Message message) {

@Override
public void handleNewConnection(final PeerConnection connection) {
LOG.trace(
"handleNewConnection {} 6 disconnected? {}",
Contributor

is the 6 just a typo?

Contributor

Did you intend to approve a draft PR with test failures?

@mbaxter
Contributor

mbaxter commented Jul 20, 2022

> As suspected, this has caused an issue with callbacks
>
> [quoted ERROR log: ConnectionNotEstablishedException thrown from RlpxConnection$LocallyInitiatedRlpxConnection.toString, reached via RlpxAgent.logConnectionsByIdToString during RlpxAgent.handleDisconnect; identical to the stack trace in the earlier comment]

This is caused by the extra logic in the toString that was added here. The PeerConnection object is only created after the connection is fully established, so you have to guard access to getPeerConnection().
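
A guarded toString might look like the sketch below. It is a simplified illustration, not the RlpxConnection implementation: `GuardedConnection`, the `String` connection description, and the use of `IllegalStateException` in place of ConnectionNotEstablishedException are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for a locally initiated connection whose
// underlying PeerConnection only exists once the handshake has completed.
final class GuardedConnection {
  private final String peerId;
  private final CompletableFuture<String> connectionFuture = new CompletableFuture<>();

  GuardedConnection(final String peerId) {
    this.peerId = peerId;
  }

  // Marks the connection as fully established.
  void establish(final String connectionDescription) {
    connectionFuture.complete(connectionDescription);
  }

  // Throws if called too early, mirroring the failure in the stack trace.
  String getPeerConnection() {
    if (!connectionFuture.isDone()) {
      throw new IllegalStateException(
          "Cannot access PeerConnection before connection is fully established.");
    }
    return connectionFuture.join();
  }

  // Safe to call at any time: reads the future without throwing.
  @Override
  public String toString() {
    final String connection = connectionFuture.getNow("<pending>");
    return "GuardedConnection{peerId=" + peerId + ", connection=" + connection + "}";
  }
}
```

Because `toString` never calls `getPeerConnection()`, logging a collection of connections that includes one still in progress no longer throws inside the disconnect callback.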

@macfarla
Contributor Author

Closing this one as it does not seem like a viable approach.

@macfarla macfarla closed this Jul 21, 2022
@macfarla macfarla deleted the peer-disco branch October 12, 2022 10:06