Add P2PClientSupervisor #4509
Conversation
Resolved review thread on node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala (outdated).

Force-pushed from 48be02f to b88ed90, then from 6e4d53e to d2159e6.
I'll be looking into the failed compact filter header sync Mac test and fix it in another PR. This PR, concerned with adding a supervisor, should be ready now.
Overall good job, this is looking much more async safe! 🎉
```scala
    case Terminated(actor) if actor == peerConnection =>
      reconnect()
  }

  private def ignoreNetworkMessages(
```
Can you add a scaladoc of why we would want to ignore certain network messages?
So what happens if we receive a normal network message when we were expecting a specific message? For instance:

1. I send an inventory message to request a bitcoin transaction.
2. A bitcoin block gets mined, and I receive a spontaneous `blockmsg`.
3. My peer sends me the `txmsg` with the transaction I requested in (1).

Does (2) (the block message) get dropped on the floor here?
Not all requests expect a response; it's only for `getheaders`, `getcfilters`, and `getcfheaders` atm. But I get what you want to ask: in such a case both are processed, but only a matching response cancels the timer. So suppose I asked for headers and got a block; it will process the block and keep on waiting for headers.
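The "only a matching response cancels the timer" behavior described above can be sketched as a tiny state machine. This is purely illustrative (names like `ResponseTracker` are invented for this sketch, not the bitcoin-s API):

```scala
// Minimal sketch, NOT the bitcoin-s implementation: a query sets an
// expectation; every incoming message is processed, but only a message
// matching the expectation clears it (i.e. "cancels the timer").
sealed trait NetworkPayload
case object HeadersMessage extends NetworkPayload
case object BlockMessage extends NetworkPayload

// Hypothetical tracker: `waitingFor` holds the response we expect
// (e.g. headers after a getheaders), or None if no query is outstanding.
final case class ResponseTracker(waitingFor: Option[NetworkPayload]) {
  // Returns the message to process plus the updated tracker state.
  def receive(msg: NetworkPayload): (NetworkPayload, ResponseTracker) =
    waitingFor match {
      case Some(expected) if expected == msg =>
        (msg, copy(waitingFor = None)) // matching response: timer cancelled
      case _ =>
        (msg, this) // still processed, but we keep waiting for the response
    }
}
```

So a spontaneous block arriving while headers are awaited is still handled, yet the headers timeout stays armed.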
As for why and when we ignore messages, this only happens once we have started a disconnection from the peer. `initializeDisconnect` does not stop the actor in one actor message: it first sends a `Tcp.Close` to the `peerConnection`, which, when complete, is intercepted by the actor, and only then is the actor stopped. The relevant part is that the actor may still process a message from its queue even while it is disconnecting, and that was causing some issues. We do not ignore any messages at all during normal operation.
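A rough model of that shutdown sequence (the state names and string messages here are illustrative, not the actual P2PClient code) might look like:

```scala
// Sketch under assumed names: after Tcp.Close is sent the actor keeps
// draining its mailbox until the close completes, so any network message
// arriving in the Disconnecting window must be ignored, not processed.
sealed trait ConnectionState
case object Connected     extends ConnectionState
case object Disconnecting extends ConnectionState // close sent, unconfirmed
case object Stopped       extends ConnectionState

// Returns the next state plus an optional processing result.
def onMessage(
    state: ConnectionState,
    msg: String): (ConnectionState, Option[String]) =
  (state, msg) match {
    case (Connected, "InitializeDisconnect") =>
      (Disconnecting, None) // would send Tcp.Close here
    case (Connected, other) =>
      (Connected, Some(s"processed $other")) // normal operation
    case (Disconnecting, "CloseCompleted") =>
      (Stopped, None) // close confirmed, stop the actor
    case (Disconnecting, _) =>
      (Disconnecting, None) // queued message ignored while disconnecting
    case (Stopped, _) =>
      (Stopped, None) // dead letters
  }
```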
```diff
-  def handleExpectResponse(msg: NetworkPayload): Unit = {
-    currentPeerMsgHandlerRecv =
-      currentPeerMsgHandlerRecv.handleExpectResponse(msg)
+  def handleExpectResponse(msg: NetworkPayload): Future[Unit] = {
```
Can you add a scaladoc to this with an example of an expected response, and what we do when we did not expect a response (I presume call `ignoreNetworkMessages`)?
Two more resolved review threads on node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala (outdated).
```diff
@@ -130,17 +131,20 @@ case class PeerFinder(
       .filterNot(p => skipPeers().contains(p) || _peerData.contains(p))

     logger.debug(s"Trying next set of peers $peers")
-    peers.foreach(tryPeer)
+    val peersF = Future.sequence(peers.map(tryPeer))
+    Await.result(peersF, 10.seconds)
```
Is there any benefit here to using `Await.result` compared to just mapping on `peersF`, like so?

```scala
peersF.failed.foreach(err =>
  logger.error(s"Failed to connect to all peers, err=$err"))
```

What is the significance of the `10.seconds` timeout? My solution doesn't block the thread, at least, and I believe there isn't anything dependent on `peersF` being completed in 10 seconds?
Nothing special about `10.seconds`; `Await` needed a duration and 10 seconds is a generous one. But you are correct, there's nothing dependent on it.

As for why `Await`: the idea was that otherwise this block of code would complete and the scheduler would start the next countdown while things are still happening. With `Await`, the next countdown only starts once the future is done. `Runnable`s run in their own thread, so I felt using `Await` is good enough.

I'm not really sure what would be better and, to be fair, I think it doesn't really matter; both will do just fine. I will make this change, so that we are clearly sure that any `Await` we have in our code base, as far as `node` is concerned, is in an actor.
> As for why Await, the idea was that otherwise this block of code would be completed and scheduler will start the next countdown while things are still happening. With Await, the next countdown would only start once the future is done. Runnable run in their own thread so I felt using Await is good enough.
This is a good point. Maybe we should add:

```scala
val isConnectionSchedulerRunning = new AtomicBoolean(false)

system.scheduler.scheduleWithFixedDelay(
  initialDelay = initialDelay,
  delay = nodeAppConfig.tryNextPeersInterval) { () =>
  if (isConnectionSchedulerRunning.compareAndSet(false, true)) {
    logger.debug(s"Cache size: ${_peerData.size}. ${_peerData.keys}")
    if (_peersToTry.size < 32)
      _peersToTry.pushAll(getPeersFromDnsSeeds)
    val peers = (for { _ <- 1 to 32 } yield _peersToTry.pop()).distinct
      .filterNot(p => skipPeers().contains(p) || _peerData.contains(p))
    logger.debug(s"Trying next set of peers $peers")
    val peersF = Future.sequence(peers.map(tryPeer))
    peersF.onComplete {
      case Success(_) =>
        isConnectionSchedulerRunning.set(false)
      case Failure(err) =>
        isConnectionSchedulerRunning.set(false)
        logger.error(s"Failed to connect to peers", err)
    }
  } else {
    logger.warn(
      s"Previous connection scheduler is still running, skipping this run, it will run again in ${nodeAppConfig.tryNextPeersInterval}")
  }
}
```

Note: the Scala compiler interprets an anonymous function as a `Runnable` if it has the type signature `() => Unit`, so you don't have to pay the overhead of `new Runnable { ... }`.
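As a self-contained illustration of that `compareAndSet` guard (using only `java.util.concurrent`, independent of Akka's scheduler; `task`, `executed`, and `skipped` are invented names for this sketch):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative stand-in for the scheduled task: the AtomicBoolean ensures
// an overlapping run is skipped instead of piling up behind the last one.
val isRunning = new AtomicBoolean(false)
var executed = 0
var skipped = 0

// () => Unit is SAM-converted to Runnable by the Scala compiler (2.12+).
val task: Runnable = () => {
  if (isRunning.compareAndSet(false, true)) {
    try executed += 1            // the real work would go here
    finally isRunning.set(false) // release the guard, even on failure
  } else {
    skipped += 1                 // a previous run is still in flight
  }
}
```

In the suggestion above, the guard is released in `onComplete` rather than `finally`, since the work there is a `Future` that outlives the synchronous part of the task.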
```diff
-    AsyncUtil
+    val waitStopF = AsyncUtil
       .retryUntilSatisfied(_peerData.isEmpty,
```
Why wait for `_peerData.isEmpty`? Is it not good enough for the `closeF` future to be completed?
So the `peerData` map is such that the keys correspond to a running actor. The `close` called here has a return type of `Unit` and just sends a message to the client to stop the actor, along with properly changing the state of `PeerMessageReceiver`. The actor actually stops sometime in the future, and the key-value pair is deleted from the map in the `postStop` callback of the actors, so this is just an additional check to ensure that all actors have actually stopped.
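That lifecycle — `close` only requests a stop, while the map entry disappears later in the `postStop`-style callback — can be modeled with a small sketch (invented names, not the actual bitcoin-s code):

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Keys model running peer actors; an entry vanishes only once the
// "actor" has actually stopped, mirroring the postStop callback.
val peerData = TrieMap("peer1" -> "client")

// Fire-and-forget, like the Unit-returning close(): the stop itself
// completes asynchronously, sometime after this method returns.
def close(peer: String): Unit = {
  Future {
    Thread.sleep(10)      // the actor winds down sometime later...
    peerData.remove(peer) // ...and the postStop callback removes the entry
  }
  () // returns immediately, before the actor has stopped
}
```

This is why waiting on `_peerData.isEmpty` (as `retryUntilSatisfied` does) is a stronger guarantee than waiting on the close future alone.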
These CI failures will be fixed in #4565: https://github.com/bitcoin-s/bitcoin-s/runs/7672367084?check_suite_focus=true#step:5:3640
```scala
    * Currently, such a situation is not meant to happen.
    */
  def handleExpectResponse(msg: NetworkPayload): Future[Unit] = {
    require(msg.isInstanceOf[ExpectsResponse],
```
nit: It's always nice to put the unexpected thing you receive in the error message, i.e.

```scala
require(msg.isInstanceOf[ExpectsResponse],
        s"Tried to wait for response to message which is not a query, got=$msg")
```
Great job @shreyanshyad 🎉
`P2PClientActor`.