Skip to content

Commit

Permalink
changes from comments: made P2PClient Future
Browse files Browse the repository at this point in the history
  • Loading branch information
solo1g committed Aug 1, 2022
1 parent e82a949 commit 6e4d53e
Show file tree
Hide file tree
Showing 16 changed files with 199 additions and 125 deletions.
3 changes: 0 additions & 3 deletions app/server/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@
<!-- inspect TCP details -->
<logger name="org.bitcoins.node.networking.P2PClientActor" level="INFO"/>

<!-- See exceptions thrown in actor-->
<logger name="org.bitcoins.node.networking.P2PClientSupervisor" level="WARN"/>

<!-- See peer details -->
<logger name="org.bitcoins.node.PeerManager" level="INFO"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {

val connAndInit = for {
_ <- AsyncUtil
.retryUntilSatisfied(peers.size == 2, interval = 1.second, maxTries = 5)
.retryUntilSatisfied(peers.size == 2,
maxTries = 30,
interval = 1.second)
_ <- Future
.sequence(peers.map(peerManager.isConnected))
.flatMap(p => assert(p.forall(_ == true)))
res <- Future
.sequence(peers.map(peerManager.isConnected))
.sequence(peers.map(peerManager.isInitialized))
.flatMap(p => assert(p.forall(_ == true)))
} yield res

Expand All @@ -80,7 +82,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
def has2Peers: Future[Unit] =
AsyncUtil.retryUntilSatisfied(peers.size == 2,
interval = 1.second,
maxTries = 5)
maxTries = 30)
def bothOurs: Future[Assertion] = ourPeersF.map { ours =>
assert(ours.map(peers.contains(_)).forall(_ == true))
}
Expand Down Expand Up @@ -144,7 +146,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
_ <- AsyncUtil
.retryUntilSatisfied(peers.size == 2,
interval = 1.second,
maxTries = 5)
maxTries = 30)
_ <- Future
.sequence(peers.map(peerManager.isConnected))
.flatMap(p => assert(p.forall(_ == true)))
Expand Down Expand Up @@ -178,7 +180,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
_ <- AsyncUtil
.retryUntilSatisfied(peers.size == 2,
interval = 1.second,
maxTries = 5)
maxTries = 30)
_ <- NodeUnitTest.syncNeutrinoNode(node, bitcoind)
_ <- Future
.sequence(peers.map(peerManager.isConnected))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.bitcoins.testkit.tor.CachedTor
import org.bitcoins.testkit.util.TorUtil
import org.scalatest.{FutureOutcome, Outcome}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

/** Neutrino node tests that require changing the state of bitcoind instance */
Expand Down Expand Up @@ -59,7 +60,9 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {

for {
bitcoindPeers <- bitcoinPeersF
_ <- AsyncUtil.retryUntilSatisfied(peers.size == 2)
_ <- AsyncUtil.retryUntilSatisfied(peers.size == 2,
maxTries = 30,
interval = 1.second)
//sync from first bitcoind
_ = node.updateDataMessageHandler(
node.getDataMessageHandler.copy(syncPeer = Some(bitcoindPeers(0)))(
Expand All @@ -69,7 +72,8 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
expectHeaders = ExpectResponseCommand(
GetHeadersMessage(node.chainConfig.chain.genesisHash))
//waiting for response to header query now
_ = peerManager.peerData(bitcoindPeers(0)).client.actor ! expectHeaders
client <- peerManager.peerData(bitcoindPeers(0)).client
_ = client.actor ! expectHeaders
nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0))
_ <- bitcoinds(0).disconnectNode(nodeUri)
//should now sync from bitcoinds(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.{FutureOutcome, Outcome}

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {

Expand Down Expand Up @@ -113,7 +114,9 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
_ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
_ <- TestAsyncUtil.awaitConditionF(condition2)
_ <- TestAsyncUtil.awaitConditionF(condition2,
interval = 1.second,
maxTries = 30)
// assert we got the full tx with witness data
txs <- wallet.listTransactions()
} yield assert(txs.exists(_.transaction == expectedTx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
val NeutrinoNodeConnectedWithBitcoindV22(node, _) = param
val peer = node.peerManager.peers.head

val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)
for {
chainApi <- node.chainApiFromDb()
dataMessageHandler = DataMessageHandler(chainApi,
Expand All @@ -52,6 +52,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
_ <- recoverToSucceededIf[RuntimeException](
chainApi.processHeaders(invalidPayload.headers))

sender <- senderF
// Verify we handle the payload correctly
_ <- dataMessageHandler.handleDataPayload(invalidPayload, sender, peer)
} yield succeed
Expand All @@ -70,7 +71,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
()
}
}
val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)

for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
Expand All @@ -86,6 +87,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
} yield assert(result == block)
Expand All @@ -107,7 +109,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
}
}

val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)
for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
header <- bitcoind.getBlockHeaderRaw(hash)
Expand All @@ -122,7 +124,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node.executionContext,
node.nodeAppConfig,
node.chainConfig)

sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
} yield assert(result == Vector(header))
Expand All @@ -142,7 +144,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
()
}
}
val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)
for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
filter <- bitcoind.getBlockFilter(hash, FilterType.Basic)
Expand All @@ -157,7 +159,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node.executionContext,
node.nodeAppConfig,
node.chainConfig)

sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
} yield assert(result == Vector((hash.flip, filter.filter)))
Expand All @@ -176,7 +178,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
()
}
}
val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)

for {

Expand All @@ -193,6 +195,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
} yield assert(result == tx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,15 @@ class PeerMessageReceiverTest extends NodeTestWithCachedBitcoindNewest {
val peerMsgReceiver =
PeerMessageReceiver(normal, node, peer)(system, node.nodeAppConfig)

val newMsgReceiver = peerMsgReceiver.disconnect()
val newMsgReceiverF = peerMsgReceiver.disconnect()

newMsgReceiverF.map { newMsgReceiver =>
assert(
newMsgReceiver.state
.isInstanceOf[PeerMessageReceiverState.Disconnected])
assert(newMsgReceiver.isDisconnected)
}

assert(
newMsgReceiver.state
.isInstanceOf[PeerMessageReceiverState.Disconnected])
assert(newMsgReceiver.isDisconnected)
}

it must "change a peer message receiver to be initializing disconnect" in {
Expand Down Expand Up @@ -118,12 +121,12 @@ class PeerMessageReceiverTest extends NodeTestWithCachedBitcoindNewest {
.isInstanceOf[PeerMessageReceiverState.InitializedDisconnect])
assert(!newMsgReceiver.isDisconnected)

val disconnectRecv = newMsgReceiver.disconnect()

assert(
disconnectRecv.state
.isInstanceOf[PeerMessageReceiverState.InitializedDisconnectDone])
assert(disconnectRecv.isDisconnected)
assert(disconnectRecv.state.clientDisconnectP.isCompleted)
newMsgReceiver.disconnect().map { disconnectRecv =>
assert(
disconnectRecv.state
.isInstanceOf[PeerMessageReceiverState.InitializedDisconnectDone])
assert(disconnectRecv.isDisconnected)
assert(disconnectRecv.state.clientDisconnectP.isCompleted)
}
}
}
32 changes: 17 additions & 15 deletions node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ case class NeutrinoNode(
_ = logger.info(s"Syncing with $syncPeer")
_ = updateDataMessageHandler(
dataMessageHandler.copy(syncPeer = Some(syncPeer)))
peerMsgSender = peerManager.peerData(syncPeer).peerMessageSender
peerMsgSender <- peerManager.peerData(syncPeer).peerMessageSender

bestHash <- chainApi.getBestBlockHash()
_ <- peerMsgSender.sendGetCompactFilterCheckPointMessage(stopHash =
Expand Down Expand Up @@ -149,14 +149,13 @@ case class NeutrinoNode(
bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = {
val syncPeer = dataMessageHandler.syncPeer.getOrElse(
throw new RuntimeException("Sync peer not set"))
val syncPeerMsgSender = peerManager.peerData(syncPeer).peerMessageSender
val sendCompactFilterHeaderMsgF = {
syncPeerMsgSender
.sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE)
}
val syncPeerMsgSenderF = peerManager.peerData(syncPeer).peerMessageSender
val sendCompactFilterHeaderMsgF = syncPeerMsgSenderF.flatMap(
_.sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE)
)
sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders =>
// If we have started syncing filters
if (
Expand All @@ -166,12 +165,15 @@ case class NeutrinoNode(
) {
//means we are not syncing filter headers, and our filters are NOT
//in sync with our compact filter headers
syncPeerMsgSender
.sendNextGetCompactFilterCommand(
chainApi = chainApi,
filterBatchSize = chainConfig.filterBatchSize,
startHeight = bestFilterOpt.get.height)
.map(_ => ())
syncPeerMsgSenderF.flatMap { sender =>
sender
.sendNextGetCompactFilterCommand(chainApi = chainApi,
filterBatchSize =
chainConfig.filterBatchSize,
startHeight =
bestFilterOpt.get.height)
.map(_ => ())
}
} else {
Future.unit
}
Expand Down
14 changes: 9 additions & 5 deletions node/src/main/scala/org/bitcoins/node/Node.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,19 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* object. Internally in [[org.bitcoins.node.networking.P2PClient p2p client]] you will see that
* the [[ChainApi chain api]] is updated inside of the p2p client
*/
def clients: Vector[P2PClient] = peerManager.clients
def clients: Vector[Future[P2PClient]] = peerManager.clients

def peerMsgSenders: Vector[PeerMessageSender] = peerManager.peerMsgSenders
def peerMsgSenders: Vector[Future[PeerMessageSender]] =
peerManager.peerMsgSenders

/** Sends the given P2P to our peer.
* This method is useful for playing around
* with P2P messages, therefore marked as
* `private[node]`.
*/
def send(msg: NetworkPayload, peer: Peer): Future[Unit] = {
peerManager.peerData(peer).peerMessageSender.sendMsg(msg)
val senderF = peerManager.peerData(peer).peerMessageSender
senderF.flatMap(_.sendMsg(msg))
}

/** Starts our node */
Expand Down Expand Up @@ -163,7 +165,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
if (connected) {
logger.info(s"Sending out tx message for tx=$txIds")
Future.sequence(
peerMsgSenders.map(_.sendInventoryMessage(transactions: _*)))
peerMsgSenders.map(
_.flatMap(_.sendInventoryMessage(transactions: _*))))
} else {
Future.failed(
new RuntimeException(
Expand All @@ -186,7 +189,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
peerManager
.peerData(peer)
.peerMessageSender
.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, blockHashes: _*)
.flatMap(_.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
blockHashes: _*))
case None =>
throw new RuntimeException(
"IBD not started yet. Cannot query for blocks.")
Expand Down
8 changes: 6 additions & 2 deletions node/src/main/scala/org/bitcoins/node/PeerData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.bitcoins.node.networking.peer.{
PeerMessageSender
}

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

/** PeerData contains objects specific to a peer associated together
Expand All @@ -19,10 +20,13 @@ case class PeerData(
node: Node,
supervisor: ActorRef
)(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig) {
import system.dispatcher

lazy val peerMessageSender: PeerMessageSender = PeerMessageSender(client)
lazy val peerMessageSender: Future[PeerMessageSender] = {
client.map(PeerMessageSender(_))
}

lazy val client: P2PClient = {
lazy val client: Future[P2PClient] = {
val peerMessageReceiver =
PeerMessageReceiver.newReceiver(node = node, peer = peer)
P2PClient(
Expand Down

0 comments on commit 6e4d53e

Please sign in to comment.