From 517c680eaf463eef2b9c01ee63f7cd4bc9169353 Mon Sep 17 00:00:00 2001 From: davide Date: Thu, 15 Jun 2023 17:54:33 +0200 Subject: [PATCH 01/17] SDK-1032 - synctracker statuses map fix --- .../sparkz/core/api/http/PeersApiRoute.scala | 7 +++---- .../sparkz/core/network/ConnectedPeer.scala | 2 +- .../sparkz/core/network/NetworkController.scala | 17 +++++++++++++++++ .../core/api/http/PeersApiRouteSpec.scala | 3 ++- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/main/scala/sparkz/core/api/http/PeersApiRoute.scala b/src/main/scala/sparkz/core/api/http/PeersApiRoute.scala index cf2e591a5..e3ad1b17a 100644 --- a/src/main/scala/sparkz/core/api/http/PeersApiRoute.scala +++ b/src/main/scala/sparkz/core/api/http/PeersApiRoute.scala @@ -6,15 +6,14 @@ import akka.http.scaladsl.server.Route import io.circe.generic.auto.exportDecoder import io.circe.generic.semiauto._ import io.circe.syntax._ -import io.circe.{Encoder, Decoder, Json} +import io.circe.{Decoder, Encoder, Json} import sparkz.core.api.http.PeersApiRoute.PeerApiRequest.AddToBlacklistBodyRequest import sparkz.core.api.http.PeersApiRoute.Request.ConnectBodyRequest import sparkz.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse, PeersStatusResponse} import sparkz.core.network.ConnectedPeer -import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus} +import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, DisconnectFromNode, GetConnectedPeers, GetPeersStatus} import sparkz.core.network.peer.PeerManager.ReceivableMessages._ import sparkz.core.network.peer.PenaltyType.CustomPenaltyDuration -import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddPeersIfEmpty, GetAllPeers, GetBlacklistedPeers} import sparkz.core.network.peer.{PeerInfo, PeersStatus} import sparkz.core.settings.RESTApiSettings import sparkz.core.utils.NetworkTimeProvider @@ -179,7 +178,7 @@ case class PeersApiRoute(peerManager: ActorRef, val port = addressAndPort.group(2).toInt val peerAddress = new InetSocketAddress(host, port) peerManager ! RemovePeer(peerAddress) - networkController ! DisconnectFromAddress(peerAddress) + networkController ! 
DisconnectFromNode(peerAddress) ApiResponse.OK } } diff --git a/src/main/scala/sparkz/core/network/ConnectedPeer.scala b/src/main/scala/sparkz/core/network/ConnectedPeer.scala index d1f26ce46..153b52bc6 100644 --- a/src/main/scala/sparkz/core/network/ConnectedPeer.scala +++ b/src/main/scala/sparkz/core/network/ConnectedPeer.scala @@ -16,7 +16,7 @@ case class ConnectedPeer(connectionId: ConnectionId, var lastMessage: Long, peerInfo: Option[PeerInfo]) { - override def hashCode(): Int = connectionId.hashCode() + override def hashCode(): Int = connectionId.remoteAddress.hashCode() override def equals(obj: Any): Boolean = obj match { case that: ConnectedPeer => this.connectionId.remoteAddress == that.connectionId.remoteAddress diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 1bb5d51cd..cd55e29ec 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -144,6 +144,10 @@ class NetworkController(settings: NetworkSettings, case DisconnectFromAddress(peerAddress) => closeConnection(peerAddress) + + case DisconnectFromNode(peerAddress) => + closeConnectionFromNode(peerAddress) + } private def connectionEvents: Receive = { @@ -551,6 +555,17 @@ class NetworkController(settings: NetworkSettings, } } + private def closeConnectionFromNode(peerAddress: InetSocketAddress): Unit = { + connections = connections.filterNot { + case (address, connectedPeer) => address == peerAddress match { + case true => + connectedPeer.handlerRef ! CloseConnection + context.system.eventStream.publish(DisconnectedPeer(peerAddress)) + true // exclude the entry from the filtered map + } + } + } + /** * Register a new penalty for given peer address. 
*/ @@ -577,6 +592,8 @@ object NetworkController { case class DisconnectFrom(peer: ConnectedPeer) + case class DisconnectFromNode(address: InetSocketAddress) + case class PenalizePeer(address: InetSocketAddress, penaltyType: PenaltyType) case class GetFilteredConnectedPeers(sendingStrategy: SendingStrategy, version: Version) diff --git a/src/test/scala/sparkz/core/api/http/PeersApiRouteSpec.scala b/src/test/scala/sparkz/core/api/http/PeersApiRouteSpec.scala index 9d5393960..bf61a2ec0 100644 --- a/src/test/scala/sparkz/core/api/http/PeersApiRouteSpec.scala +++ b/src/test/scala/sparkz/core/api/http/PeersApiRouteSpec.scala @@ -11,6 +11,7 @@ import io.circe.syntax._ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import sparkz.core.api.http.PeersApiRoute.PeerInfoResponse +import sparkz.core.network.NetworkController.ReceivableMessages.DisconnectFromNode import sparkz.core.network.peer.PeerInfo import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddToBlacklist, DisconnectFromAddress, RemoveFromBlacklist, RemovePeer} import sparkz.core.settings.{RESTApiSettings, SparkzSettings} @@ -228,7 +229,7 @@ class PeersApiRouteSpec extends AnyFlatSpec Delete(prefix + "/peer", body) ~> routesWithProbes ~> check { peerManagerProbe.expectMsgClass(classOf[RemovePeer]) - networkControllerProbe.expectMsgClass(classOf[DisconnectFromAddress]) + networkControllerProbe.expectMsgClass(classOf[DisconnectFromNode]) status shouldBe StatusCodes.OK } From 7e938d34faf86864d281c84f684458ccb191a52f Mon Sep 17 00:00:00 2001 From: davide Date: Mon, 3 Jul 2023 17:56:45 +0200 Subject: [PATCH 02/17] SDK-1032 - closeConnectionFromNode method refactor --- src/main/scala/sparkz/core/network/NetworkController.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index cd55e29ec..cd0d10ac2 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -557,16 +557,15 @@ class NetworkController(settings: NetworkSettings, private def closeConnectionFromNode(peerAddress: InetSocketAddress): Unit = { connections = connections.filterNot { - case (address, connectedPeer) => address == peerAddress match { - case true => + case (address, connectedPeer) if address == peerAddress => connectedPeer.handlerRef ! CloseConnection context.system.eventStream.publish(DisconnectedPeer(peerAddress)) true // exclude the entry from the filtered map - } + case _ => false // don't modify the connections map } } - /** + /**sbt * Register a new penalty for given peer address. */ private def penalize(peerAddress: InetSocketAddress, penaltyType: PenaltyType): Unit = From acf335a52553acea9692fdc24c5291b5da7ce200 Mon Sep 17 00:00:00 2001 From: davide Date: Tue, 4 Jul 2023 10:27:46 +0200 Subject: [PATCH 03/17] SDK-1032 - wrong comment removed --- src/main/scala/sparkz/core/network/NetworkController.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index cd0d10ac2..90e9761d9 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -565,7 +565,7 @@ class NetworkController(settings: NetworkSettings, } } - /**sbt + /** * Register a new penalty for given peer address. 
*/ private def penalize(peerAddress: InetSocketAddress, penaltyType: PenaltyType): Unit = From 8280424b5509ce6d9b40a0a2962d888b8b62e03b Mon Sep 17 00:00:00 2001 From: paolo_galli Date: Fri, 7 Jul 2023 16:04:22 +0200 Subject: [PATCH 04/17] SDK-863: when peer is asking to sync we update our internal status and respond with a sync message to let the peer to update its status as well --- .../core/network/NodeViewSynchronizer.scala | 8 +++- .../sparkz/core/network/SyncTracker.scala | 2 + .../NodeViewSynchronizerSpecification.scala | 48 +++++++++++++++++-- .../core/network/TestImplementations.scala | 8 ++-- 4 files changed, 57 insertions(+), 9 deletions(-) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index d2ad5181f..15ffb75ba 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -1,6 +1,5 @@ package sparkz.core.network -import java.net.InetSocketAddress import akka.actor.{Actor, ActorRef, ActorSystem, Props} import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, ModifiersFromRemote, TransactionsFromRemote} import sparkz.core.consensus.History._ @@ -21,11 +20,11 @@ import sparkz.core.{ModifierTypeId, NodeViewModifier, PersistentNodeViewModifier import sparkz.util.serialization.{VLQByteBufferReader, VLQReader} import sparkz.util.{ModifierId, SparkzEncoding, SparkzLogging} +import java.net.InetSocketAddress import java.nio.ByteBuffer import scala.annotation.{nowarn, tailrec} import scala.collection.mutable import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ import scala.language.postfixOps import scala.reflect.ClassTag import scala.util.{Failure, Success} @@ -193,6 +192,11 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes log.warn("Extension is empty while comparison is younger") self ! OtherNodeSyncingStatus(remote, comparison, ext) + if (statusTracker.isPeerOutdated(remote)) { + // This triggers another round of sync to let the other peer update its internal status about this node + networkControllerRef ! 
SendToNetwork(Message(syncInfoSpec, Right(historyReader.syncInfo), None), SendToPeer(remote)) + } + case _ => } } diff --git a/src/main/scala/sparkz/core/network/SyncTracker.scala b/src/main/scala/sparkz/core/network/SyncTracker.scala index 841e9fa5a..128c34dbc 100644 --- a/src/main/scala/sparkz/core/network/SyncTracker.scala +++ b/src/main/scala/sparkz/core/network/SyncTracker.scala @@ -92,6 +92,8 @@ class SyncTracker(nvsRef: ActorRef, private def outdatedPeers(): Seq[ConnectedPeer] = lastSyncSentTime.filter(t => (timeProvider.time() - t._2).millis > maxInterval()).keys.toSeq + def isPeerOutdated(peer: ConnectedPeer): Boolean = + (timeProvider.time() - lastSyncSentTime(peer)).millis > maxInterval() @nowarn def peersByStatus: Map[HistoryComparisonResult, Iterable[ConnectedPeer]] = statuses.groupBy(_._2).mapValues(_.keys).toMap diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala index 7cb4a937d..8137b4e96 100644 --- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala +++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala @@ -2,9 +2,10 @@ package sparkz.core.network import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.TestProbe +import sparkz.ObjectGenerators import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, TransactionsFromRemote} import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} -import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, ChangedMempool, FailedTransaction, SuccessfulTransaction} +import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages._ import sparkz.core.network.message._ import sparkz.core.network.peer.PenaltyType import sparkz.core.network.peer.PenaltyType.MisbehaviorPenalty @@ -14,23 +15,29 @@ import sparkz.core.transaction.Transaction import sparkz.core.utils.NetworkTimeProvider import sparkz.core.{ModifierTypeId, NodeViewModifier} import sparkz.util.ModifierId +import sparkz.util.serialization.{Reader, Writer} import java.net.InetSocketAddress import scala.concurrent.ExecutionContext import scala.concurrent.duration.DurationInt import scala.util.{Failure, Success} -class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementations { +class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementations with ObjectGenerators { implicit val actorSystem: ActorSystem = ActorSystem() implicit val executionContext: ExecutionContext = actorSystem.dispatchers.lookup("sparkz.executionContext") private val modifiersSpec = new ModifiersSpec(1024 * 1024) private val requestModifierSpec = new RequestModifierSpec(settings.network.maxInvObjects) + private val syncSpec = new SyncInfoMessageSpec[TestSyncInfo](new SparkzSerializer[TestSyncInfo] { + override def serialize(obj: TestSyncInfo, w: Writer): Unit = {} + + override def parse(r: Reader): TestSyncInfo = new TestSyncInfo + }) private val invSpec = new InvSpec(settings.network.maxInvObjects) private val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(settings) private val peerProbe = TestProbe() private val peer = ConnectedPeer(ConnectionId(new InetSocketAddress(10), new InetSocketAddress(11), Incoming), peerProbe.ref, 0L, None) - private val messageSerializer = new MessageSerializer(Seq(modifiersSpec, invSpec, requestModifierSpec), + private val messageSerializer = new 
MessageSerializer(Seq(modifiersSpec, invSpec, requestModifierSpec, syncSpec), settings.network.magicBytes, settings.network.messageLengthBytesLimit) @@ -239,6 +246,41 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa networkController.expectNoMessage() } + it should "send a sync message in response to peer's sync message if peer is outdated" in { + val pchProbe = TestProbe("PeerHandlerProbe") + val connectedPeer: ConnectedPeer = connectedPeerGen(pchProbe.ref).sample.getOrElse(throw new IllegalArgumentException()) + + // Setting low sync interval to clean right away the peer last sync timestamp to make it outdated and send the sync response back + val networkSettings = settings.copy( + network = settings.network.copy( + syncStatusRefreshStable = 1.millis, + syncStatusRefresh = 1.millis + ) + ) + + val (synchronizer, networkController, _) = createNodeViewSynchronizer(networkSettings) + + // Handshake with the peer + synchronizer ! HandshakedPeer(connectedPeer) + + // Populate the history + setupHistoryAndMempoolReaders(synchronizer) + + // This to update the updateLastSyncSentTime in the SyncTracker + synchronizer ! SendLocalSyncInfo + + // The peer receives a sync message from the peer + synchronizer ! roundTrip(Message(syncSpec, Left(Array.emptyByteArray), Some(connectedPeer))) + + networkController.expectMsgType[SendToNetwork] + networkController.expectMsgType[SendToNetwork] + // This is expected in response to the sync message + networkController.expectMsgPF() { + case ok@SendToNetwork(Message(_: SyncInfoMessageSpec[_], Right(TestSyncInfo()), None), SendToPeers(_)) => ok + case msg => fail(s"Unexpected message received $msg") + } + } + def setupHistoryAndMempoolReaders(synchronizer: ActorRef): Unit = { val history = new TestHistory val mempool = new TestMempool { diff --git a/src/test/scala/sparkz/core/network/TestImplementations.scala b/src/test/scala/sparkz/core/network/TestImplementations.scala index 4e27f1d4a..0fa635e47 100644 --- a/src/test/scala/sparkz/core/network/TestImplementations.scala +++ b/src/test/scala/sparkz/core/network/TestImplementations.scala @@ -2,7 +2,7 @@ package sparkz.core.network import sparkz.core.{ModifierTypeId, PersistentNodeViewModifier} -import sparkz.core.consensus.History.ModifierIds +import sparkz.core.consensus.History.{ModifierIds, Older} import sparkz.core.consensus.{History, HistoryReader, ModifierSemanticValidity, SyncInfo} import sparkz.core.network.message.SyncInfoMessageSpec import sparkz.core.serialization.SparkzSerializer @@ -46,9 +46,9 @@ trait TestImplementations { override def modifierById(modifierId: sparkz.util.ModifierId): Option[TestModifier] = ??? override def isSemanticallyValid(modifierId: sparkz.util.ModifierId): ModifierSemanticValidity = ??? override def openSurfaceIds(): Seq[sparkz.util.ModifierId] = ??? - override def continuationIds(info: TestSyncInfo, size: Int): ModifierIds = ??? - override def syncInfo: TestSyncInfo = ??? - override def compare(other: TestSyncInfo): History.HistoryComparisonResult = ??? 
+ override def continuationIds(info: TestSyncInfo, size: Int): ModifierIds = Seq() + override def syncInfo: TestSyncInfo = new TestSyncInfo + override def compare(other: TestSyncInfo): History.HistoryComparisonResult = Older override type NVCT = this.type } From e071cc0aed5279410b29a5b59055cf08aab46fd1 Mon Sep 17 00:00:00 2001 From: paolo_galli Date: Mon, 17 Jul 2023 15:22:05 +0200 Subject: [PATCH 05/17] SDK-863: add peer to statusTracker as soon as nodeViewSynchronizer receives the HandshakedPeer message --- .../core/network/NodeViewSynchronizer.scala | 45 ----------- .../sparkz/core/network/SyncTracker.scala | 12 ++- .../NodeViewSynchronizerSpecification.scala | 81 ++++++++++++------- .../sparkz/core/network/SyncTrackerSpec.scala | 19 ++++- 4 files changed, 77 insertions(+), 80 deletions(-) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index 15ffb75ba..e71ebf09d 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -23,7 +23,6 @@ import sparkz.util.{ModifierId, SparkzEncoding, SparkzLogging} import java.net.InetSocketAddress import java.nio.ByteBuffer import scala.annotation.{nowarn, tailrec} -import scala.collection.mutable import scala.concurrent.ExecutionContext import scala.language.postfixOps import scala.reflect.ClassTag @@ -192,10 +191,6 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes log.warn("Extension is empty while comparison is younger") self ! OtherNodeSyncingStatus(remote, comparison, ext) - if (statusTracker.isPeerOutdated(remote)) { - // This triggers another round of sync to let the other peer update its internal status about this node - networkControllerRef ! 
SendToNetwork(Message(syncInfoSpec, Right(historyReader.syncInfo), None), SendToPeer(remote)) - } case _ => } @@ -616,44 +611,4 @@ object NodeViewSynchronizerRef { (implicit system: ActorSystem, ec: ExecutionContext): ActorRef = system.actorOf(props[TX, SI, SIS, PMOD, HR, MR](networkControllerRef, viewHolderRef, syncInfoSpec, networkSettings, timeProvider, modifierSerializers), name) -} - -object Test extends App { - - test() - - protected val rebroadcastQueue: mutable.Queue[String] = mutable.Queue() - - def test(): Unit = { - putInRebroadcastQueue("1") - putInRebroadcastQueue("2") - putInRebroadcastQueue("3") - putInRebroadcastQueue("4") - putInRebroadcastQueue("5") - putInRebroadcastQueue("6") - putInRebroadcastQueue("7") - putInRebroadcastQueue("8") - putInRebroadcastQueue("9") - putInRebroadcastQueue("10") - - println(s"get modifiers = $getRebroadcastModifiers") - - println(s"queue = $rebroadcastQueue") - - println(s"get modifiers = $getRebroadcastModifiers") - - println(s"queue = $rebroadcastQueue") - } - - def putInRebroadcastQueue(modifierId: String): Unit = { - rebroadcastQueue.enqueue(modifierId) - } - - def getRebroadcastModifiers: Seq[String] = { - val mods = rebroadcastQueue.take(5).toSeq - rebroadcastQueue.drop(5) - mods - } - - } \ No newline at end of file diff --git a/src/main/scala/sparkz/core/network/SyncTracker.scala b/src/main/scala/sparkz/core/network/SyncTracker.scala index 128c34dbc..783275a09 100644 --- a/src/main/scala/sparkz/core/network/SyncTracker.scala +++ b/src/main/scala/sparkz/core/network/SyncTracker.scala @@ -54,6 +54,11 @@ class SyncTracker(nvsRef: ActorRef, def updateStatus(peer: ConnectedPeer, status: HistoryComparisonResult): Unit = { val seniorsBefore = numOfSeniors() statuses += peer -> status + + if (isFirstTimeTrackingPeer(peer)) { + updateLastSyncSentTime(peer) + } + val seniorsAfter = numOfSeniors() // todo: we should also send NoBetterNeighbour signal when all the peers around are not seniors initially @@ -68,6 +73,10 @@ class SyncTracker(nvsRef: ActorRef, } } + private def isFirstTimeTrackingPeer(peer: ConnectedPeer) = { + !lastSyncSentTime.contains(peer) + } + //todo: combine both? 
def clearStatus(remote: InetSocketAddress): Unit = { statuses.find(_._1.connectionId.remoteAddress == remote) match { @@ -92,9 +101,6 @@ class SyncTracker(nvsRef: ActorRef, private def outdatedPeers(): Seq[ConnectedPeer] = lastSyncSentTime.filter(t => (timeProvider.time() - t._2).millis > maxInterval()).keys.toSeq - def isPeerOutdated(peer: ConnectedPeer): Boolean = - (timeProvider.time() - lastSyncSentTime(peer)).millis > maxInterval() - @nowarn def peersByStatus: Map[HistoryComparisonResult, Iterable[ConnectedPeer]] = statuses.groupBy(_._2).mapValues(_.keys).toMap diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala index 8137b4e96..c900c369e 100644 --- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala +++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala @@ -1,9 +1,10 @@ package sparkz.core.network import akka.actor.{ActorRef, ActorSystem, Props} -import akka.testkit.TestProbe +import akka.testkit.{TestActorRef, TestProbe} import sparkz.ObjectGenerators import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, TransactionsFromRemote} +import sparkz.core.consensus.History.Unknown import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages._ import sparkz.core.network.message._ @@ -246,39 +247,22 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa networkController.expectNoMessage() } - it should "send a sync message in response to peer's sync message if peer is outdated" in { - val pchProbe = TestProbe("PeerHandlerProbe") - val connectedPeer: ConnectedPeer = connectedPeerGen(pchProbe.ref).sample.getOrElse(throw new IllegalArgumentException()) + it should "when HandshakedPeer message is received, statusTracker should add the peer" in { + // Arrange + val nodeViewSynchronizerRef = createNodeViewSynchronizerAsTestActorRef(settings) + val nodeViewSynchronizer = nodeViewSynchronizerRef.underlyingActor - // Setting low sync interval to clean right away the peer last sync timestamp to make it outdated and send the sync response back - val networkSettings = settings.copy( - network = settings.network.copy( - syncStatusRefreshStable = 1.millis, - syncStatusRefresh = 1.millis - ) - ) - - val (synchronizer, networkController, _) = createNodeViewSynchronizer(networkSettings) + val statusTrackerField = classOf[NodeViewSynchronizer[_, _, _, _, _, _]].getDeclaredField("statusTracker") + statusTrackerField.setAccessible(true) + val statusTracker = statusTrackerField.get(nodeViewSynchronizer).asInstanceOf[SyncTracker] - // Handshake with the peer - synchronizer ! HandshakedPeer(connectedPeer) + // Act + nodeViewSynchronizerRef ! HandshakedPeer(peer) - // Populate the history - setupHistoryAndMempoolReaders(synchronizer) - - // This to update the updateLastSyncSentTime in the SyncTracker - synchronizer ! SendLocalSyncInfo - - // The peer receives a sync message from the peer - synchronizer ! 
roundTrip(Message(syncSpec, Left(Array.emptyByteArray), Some(connectedPeer))) - - networkController.expectMsgType[SendToNetwork] - networkController.expectMsgType[SendToNetwork] - // This is expected in response to the sync message - networkController.expectMsgPF() { - case ok@SendToNetwork(Message(_: SyncInfoMessageSpec[_], Right(TestSyncInfo()), None), SendToPeers(_)) => ok - case msg => fail(s"Unexpected message received $msg") - } + // Assert + val peersStatus = statusTracker.peersByStatus + peersStatus.nonEmpty should be(true) + peersStatus(Unknown) should be(Set(peer)) } def setupHistoryAndMempoolReaders(synchronizer: ActorRef): Unit = { @@ -338,4 +322,39 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa (nodeViewSynchronizerRef, networkControllerProbe, viewHolderProbe) } + + private def createNodeViewSynchronizerAsTestActorRef(settings: SparkzSettings): TestActorRef[NodeViewSynchronizer[_, _, _, _, _, _]] = { + val networkControllerProbe = TestProbe() + val viewHolderProbe = TestProbe() + val timeProvider = new NetworkTimeProvider(settings.ntp) + + val modifierSerializers: Map[ModifierTypeId, SparkzSerializer[_ <: NodeViewModifier]] = + Map(Transaction.ModifierTypeId -> TestTransactionSerializer) + + TestActorRef(Props( + new NodeViewSynchronizer[TestTransaction, + TestSyncInfo, + TestSyncInfoMessageSpec.type, + TestModifier, + TestHistory, + TestMempool + ] + ( + networkControllerProbe.ref, + viewHolderProbe.ref, + TestSyncInfoMessageSpec, + settings.network, + timeProvider, + modifierSerializers + ) { + override val deliveryTracker: DeliveryTracker = new DeliveryTracker(context.system, settings.network, self) { + override def status(modifierId: ModifierId): ModifiersStatus = ModifiersStatus.Requested + + override private[network] def clearStatusForModifier(id: ModifierId, oldStatus: ModifiersStatus): Unit = {} + + override def setInvalid(modifierId: ModifierId): Option[ConnectedPeer] = Some(peer) + } + } + )) + } } diff --git a/src/test/scala/sparkz/core/network/SyncTrackerSpec.scala b/src/test/scala/sparkz/core/network/SyncTrackerSpec.scala index 17319e4bf..8ffbe8d7e 100644 --- a/src/test/scala/sparkz/core/network/SyncTrackerSpec.scala +++ b/src/test/scala/sparkz/core/network/SyncTrackerSpec.scala @@ -5,7 +5,7 @@ import akka.testkit.TestProbe import org.mockito.MockitoSugar.mock import org.scalatest.matchers.should.Matchers import org.scalatest.propspec.AnyPropSpec -import sparkz.core.consensus.History.{Fork, HistoryComparisonResult} +import sparkz.core.consensus.History.{Fork, HistoryComparisonResult, Unknown} import sparkz.core.settings.NetworkSettings import sparkz.core.utils.NetworkTimeProvider import sparkz.core.utils.TimeProvider.Time @@ -61,4 +61,21 @@ class SyncTrackerSpec extends AnyPropSpec with Matchers { statusAfterDeletion.size should be(0) timestampAfterDeletion.size should be(0) } + + property("syncTracker should start tracking a peer once the handshake message has been received") { + // Arrange + val syncTracker = new SyncTracker(probe.ref, context, networkSettings, timeProvider) + + val lastSyncSentTimeField = classOf[SyncTracker].getDeclaredField("lastSyncSentTime") + lastSyncSentTimeField.setAccessible(true) + + val lastSyncSentTime = lastSyncSentTimeField.get(syncTracker).asInstanceOf[mutable.Map[ConnectedPeer, Time]] + + // Act + lastSyncSentTime should be(Map.empty) + syncTracker.updateStatus(connectedPeer, Unknown) + + // Assert + lastSyncSentTime.contains(connectedPeer) should be(true) + } } From 
70c268dec93a8f72393ed7c7641368a14bc8212d Mon Sep 17 00:00:00 2001 From: paolocappelletti Date: Mon, 17 Jul 2023 18:02:17 +0200 Subject: [PATCH 06/17] bump version to 2.1.0-shapshot --- build.sbt | 2 +- release-notes.md | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 40d040866..598bbb4de 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.2-SNAPSHOT", + version := "2.1.0-SNAPSHOT", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := diff --git a/release-notes.md b/release-notes.md index f681d7c62..379013345 100644 --- a/release-notes.md +++ b/release-notes.md @@ -1,3 +1,7 @@ +2.1.0 +--------- +* Fixes/Improvements on the way the SyncTracker handles the internal statuses maps + 2.0.2 --------- * P2p rate limitng feature - added tx rebroadcast when rate limiting is reenabled From bac33fd6cc780ba381a6dc96f653e73096326567 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Mon, 24 Jul 2023 13:31:59 +0300 Subject: [PATCH 07/17] Add a flag to force connecting to known peers. --- src/main/resources/reference.conf | 3 ++ .../core/network/PeerSynchronizer.scala | 9 +++-- .../network/peer/InMemoryPeerDatabase.scala | 15 +++++-- .../scala/sparkz/core/settings/Settings.scala | 1 + .../peer/InMemoryPeerDatabaseSpec.scala | 40 +++++++++++++++++++ 5 files changed, 61 insertions(+), 7 deletions(-) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 9332f588b..cde976f7f 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -90,6 +90,9 @@ sparkz { # List of IP addresses of well known nodes. knownPeers = [] + # Force node to only try connecting to known peers + onlyConnectToKnownPeers = false + # Interval between GetPeers messages to be send by our node to a random one getPeersInterval = 2m diff --git a/src/main/scala/sparkz/core/network/PeerSynchronizer.scala b/src/main/scala/sparkz/core/network/PeerSynchronizer.scala index 3a08aba40..f17df1391 100644 --- a/src/main/scala/sparkz/core/network/PeerSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/PeerSynchronizer.scala @@ -38,9 +38,12 @@ class PeerSynchronizer(val networkControllerRef: ActorRef, networkControllerRef ! 
RegisterMessageSpecs(Seq(GetPeersSpec, peersSpec), self) - val msg = Message[Unit](GetPeersSpec, Right(()), None) - val stn = SendToNetwork(msg, SendToRandom) - context.system.scheduler.scheduleWithFixedDelay(2.seconds, settings.getPeersInterval, networkControllerRef, stn) + //do not ask network for new peers if we are only connecting to known peers + if (!settings.onlyConnectToKnownPeers) { + val msg = Message[Unit](GetPeersSpec, Right(()), None) + val stn = SendToNetwork(msg, SendToRandom) + context.system.scheduler.scheduleWithFixedDelay(2.seconds, settings.getPeersInterval, networkControllerRef, stn) + } } override def receive: Receive = { diff --git a/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala b/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala index 3d5dbf89a..8b48f4083 100644 --- a/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala +++ b/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala @@ -103,11 +103,14 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext: bucketManager.removePeer(address) } - override def allPeers: Map[InetSocketAddress, PeerDatabaseValue] = knownPeers ++ bucketManager.getTriedPeers ++ bucketManager.getNewPeers + override def allPeers: Map[InetSocketAddress, PeerDatabaseValue] = + if (settings.onlyConnectToKnownPeers) + knownPeers + else + knownPeers ++ bucketManager.getTriedPeers ++ bucketManager.getNewPeers override def blacklistedPeers: Seq[InetAddress] = blacklist - .map { case (address, bannedTill) => - checkBanned(address, bannedTill) + .map { case (address, bannedTill) if checkBanned(address, bannedTill) => address } .toSeq @@ -170,7 +173,11 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext: (360 * 10).days.toMillis } - override def randomPeersSubset: Map[InetSocketAddress, PeerDatabaseValue] = knownPeers ++ bucketManager.getRandomPeers + override def randomPeersSubset: Map[InetSocketAddress, PeerDatabaseValue] = + if (settings.onlyConnectToKnownPeers) + knownPeers + else + knownPeers ++ bucketManager.getRandomPeers override def updatePeer(peerDatabaseValue: PeerDatabaseValue): Unit = { if (peerIsNotBlacklistedAndNotKnownPeer(peerDatabaseValue)) { diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala index 5e9939831..7dd01ad27 100644 --- a/src/main/scala/sparkz/core/settings/Settings.scala +++ b/src/main/scala/sparkz/core/settings/Settings.scala @@ -21,6 +21,7 @@ case class NetworkSettings(nodeName: String, addedMaxDelay: Option[FiniteDuration], localOnly: Boolean, knownPeers: Seq[InetSocketAddress], + onlyConnectToKnownPeers: Boolean, bindAddress: InetSocketAddress, maxIncomingConnections: Int, maxOutgoingConnections: Int, diff --git a/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala b/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala index c2a8ca3c2..b066fe5fe 100644 --- a/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala +++ b/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala @@ -283,4 +283,44 @@ class InMemoryPeerDatabaseSpec extends NetworkTests with ObjectGenerators with B allPeers.contains(thirdAddress) shouldBe true } } + + it should "only return knownPeers if the flag is set to true" in { + val firstAddress = new InetSocketAddress(10) + val secondAddress = new InetSocketAddress(11) + val thirdAddress = new InetSocketAddress(12) + val forthAddress = new 
InetSocketAddress(13) + val fifthAddress = new InetSocketAddress(14) + val sixthAddress = new InetSocketAddress(15) + val knownPeers = Seq(firstAddress, secondAddress, thirdAddress) + + def withDbHavingKnownPeers(test: InMemoryPeerDatabase => Assertion): Assertion = + test(new InMemoryPeerDatabase( + settings.copy(network = settings.network.copy(penaltySafeInterval = 1.seconds, knownPeers = knownPeers, onlyConnectToKnownPeers = true)), + sparkzContext + )) + + withDbHavingKnownPeers { db => + val extraPeerOne = PeerDatabaseValue(forthAddress, getPeerInfo(forthAddress), PeerConfidence.Unknown) + val extraPeerTwo = PeerDatabaseValue(fifthAddress, getPeerInfo(fifthAddress), PeerConfidence.Unknown) + val extraPeerThree = PeerDatabaseValue(sixthAddress, getPeerInfo(sixthAddress), PeerConfidence.Unknown) + + db.addOrUpdateKnownPeer(extraPeerOne) + db.addOrUpdateKnownPeer(extraPeerTwo) + db.addOrUpdateKnownPeer(extraPeerThree) + + val allPeers = db.allPeers + allPeers.size shouldBe 3 + allPeers.foreach(p => p._2.confidence shouldBe PeerConfidence.High) + allPeers.contains(firstAddress) shouldBe true + allPeers.contains(secondAddress) shouldBe true + allPeers.contains(thirdAddress) shouldBe true + + val randomPeersSubset = db.randomPeersSubset + randomPeersSubset.size shouldBe 3 + randomPeersSubset.foreach(p => p._2.confidence shouldBe PeerConfidence.High) + randomPeersSubset.contains(firstAddress) shouldBe true + randomPeersSubset.contains(secondAddress) shouldBe true + randomPeersSubset.contains(thirdAddress) shouldBe true + } + } } \ No newline at end of file From c9ec46bc98c15c4d0e1449ef53cbf8f0750b9388 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Mon, 31 Jul 2023 16:15:49 +0300 Subject: [PATCH 08/17] Preserve the order of block during synchronization --- src/main/scala/sparkz/ObjectGenerators.scala | 2 +- .../core/network/NodeViewSynchronizer.scala | 14 +++++++------- .../core/network/message/BasicMessagesRepo.scala | 15 ++++++++------- .../core/network/MessageSpecification.scala | 14 +++++++------- .../NodeViewSynchronizerSpecification.scala | 4 ++-- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/src/main/scala/sparkz/ObjectGenerators.scala b/src/main/scala/sparkz/ObjectGenerators.scala index e0cefca77..f9e65bd75 100644 --- a/src/main/scala/sparkz/ObjectGenerators.scala +++ b/src/main/scala/sparkz/ObjectGenerators.scala @@ -76,7 +76,7 @@ trait ObjectGenerators { lazy val modifiersGen: Gen[ModifiersData] = for { modifierTypeId: ModifierTypeId <- modifierTypeIdGen - modifiers: Map[ModifierId, Array[Byte]] <- Gen.nonEmptyMap(modifierWithIdGen).suchThat(_.nonEmpty) + modifiers: Seq[(ModifierId, Array[Byte])] <- Gen.nonEmptyListOf(modifierWithIdGen).suchThat(_.nonEmpty) } yield ModifiersData(modifierTypeId, modifiers) lazy val appVersionGen: Gen[Version] = for { diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index d2ad5181f..265f5a7f8 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -282,7 +282,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes val typeId = data.typeId val modifiers = data.modifiers log.info(s"Got ${modifiers.size} modifiers of type $typeId from remote connected peer: $remote") - log.trace(s"Received modifier ids ${modifiers.keySet.map(encoder.encodeId).mkString(",")}") + log.trace(s"Received modifier ids 
${modifiers.map(_._1).map(encoder.encodeId).mkString(",")}") // filter out non-requested modifiers val requestedModifiers = processSpam(remote, typeId, modifiers) @@ -296,7 +296,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes case Some(serializer: SparkzSerializer[PMOD]@unchecked) => // parse all modifiers and put them to modifiers cache - log.info(s"Received block ids ${modifiers.keySet.map(encoder.encodeId).mkString(",")}") + log.info(s"Received block ids ${modifiers.map(_._1).map(encoder.encodeId).mkString(",")}") val parsed: Iterable[PMOD] = parseModifiers(requestedModifiers, serializer, remote) val valid: Iterable[PMOD] = parsed.filter(validateAndSetStatus(remote, _)) if (valid.nonEmpty) viewHolderRef ! ModifiersFromRemote[PMOD](valid) @@ -337,7 +337,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes * * @return collection of parsed modifiers */ - private def parseModifiers[M <: NodeViewModifier](modifiers: Map[ModifierId, Array[Byte]], + private def parseModifiers[M <: NodeViewModifier](modifiers: Seq[(ModifierId, Array[Byte])], serializer: SparkzSerializer[M], remote: ConnectedPeer): Iterable[M] = { modifiers.flatMap { case (id, bytes) => @@ -367,7 +367,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes */ private def processSpam(remote: ConnectedPeer, typeId: ModifierTypeId, - modifiers: Map[ModifierId, Array[Byte]]): Map[ModifierId, Array[Byte]] = { + modifiers: Seq[(ModifierId, Array[Byte])]): Seq[(ModifierId, Array[Byte])] = { val (requested, spam) = modifiers.partition { case (id, _) => deliveryTracker.status(id) == Requested @@ -375,7 +375,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes if (spam.nonEmpty) { log.info(s"Spam attempt: peer $remote has sent a non-requested modifiers of type $typeId with ids" + - s": ${spam.keys.map(encoder.encodeId)}") + s": ${spam.map(_._1).map(encoder.encodeId)}") penalizeSpammingPeer(remote) } requested @@ -433,7 +433,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes size += NodeViewModifier.ModifierIdSize + 4 + modBytes.length size < networkSettings.maxModifiersSpecMessageSize } - peer.handlerRef ! Message(modifiersSpec, Right(ModifiersData(modType, batch.toMap)), None) + peer.handlerRef ! 
Message(modifiersSpec, Right(ModifiersData(modType, batch)), None) val remaining = mods.drop(batch.length) if (remaining.nonEmpty) { sendByParts(remaining) @@ -454,7 +454,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes val mods = deliveryTracker.getRebroadcastModifiers mempoolReaderOpt match { case Some(mempool) => - mempool.getAll(ids = mods).foreach { tx =>broadcastModifierInv(tx) } + mempool.getAll(ids = mods).foreach { tx => broadcastModifierInv(tx) } case None => log.warn(s"Trying to rebroadcast while readers are not ready $mempoolReaderOpt") } diff --git a/src/main/scala/sparkz/core/network/message/BasicMessagesRepo.scala b/src/main/scala/sparkz/core/network/message/BasicMessagesRepo.scala index 75707e417..bd67a860c 100644 --- a/src/main/scala/sparkz/core/network/message/BasicMessagesRepo.scala +++ b/src/main/scala/sparkz/core/network/message/BasicMessagesRepo.scala @@ -9,9 +9,9 @@ import sparkz.util.Extensions._ import sparkz.util.serialization.{Reader, Writer} import sparkz.util.{ModifierId, SparkzLogging, bytesToId, idToBytes} -import scala.collection.immutable +import scala.collection.mutable -case class ModifiersData(typeId: ModifierTypeId, modifiers: Map[ModifierId, Array[Byte]]) +case class ModifiersData(typeId: ModifierTypeId, modifiers: Seq[(ModifierId, Array[Byte])]) case class InvData(typeId: ModifierTypeId, ids: Seq[ModifierId]) @@ -157,7 +157,7 @@ class ModifiersSpec(maxMessageSize: Int) extends MessageSpecV1[ModifiersData] wi } if (msgSize > maxMessageSize) { - log.warn(s"Message with modifiers ${modifiers.keySet} has size $msgSize exceeding limit $maxMessageSize." + + log.warn(s"Message with modifiers ${modifiers.map(_._1)} has size $msgSize exceeding limit $maxMessageSize." + s" Sending ${w.length() - start} bytes instead") } } @@ -165,7 +165,7 @@ class ModifiersSpec(maxMessageSize: Int) extends MessageSpecV1[ModifiersData] wi override def parse(r: Reader): ModifiersData = { val typeId = ModifierTypeId @@ r.getByte() // 1 byte val count = r.getUInt().toIntExact // 8 bytes - val resMap = immutable.Map.newBuilder[ModifierId, Array[Byte]] + val res = mutable.Buffer[(ModifierId, Array[Byte])]() (0 until count).foldLeft(HeaderLength) { case (msgSize, _) => val id = bytesToId(r.getBytes(NodeViewModifier.ModifierIdSize)) val objBytesCnt = r.getUInt().toIntExact @@ -174,10 +174,10 @@ class ModifiersSpec(maxMessageSize: Int) extends MessageSpecV1[ModifiersData] wi throw new Exception("Too big message with modifiers, size: " + maxMessageSize) } val obj = r.getBytes(objBytesCnt) - resMap += (id -> obj) + res += (id -> obj) newMsgSize } - ModifiersData(typeId, resMap.result()) + ModifiersData(typeId, res.toSeq) } } @@ -256,8 +256,9 @@ class HandshakeSpec(featureSerializers: PeerFeature.Serializers, sizeLimit: Int) /** * Serializing handshake into a byte writer. 
+ * * @param hs - handshake instance - * @param w - writer to write bytes to + * @param w - writer to write bytes to */ override def serialize(hs: Handshake, w: Writer): Unit = { // first writes down handshake time, then peer specification of our node diff --git a/src/test/scala/sparkz/core/network/MessageSpecification.scala b/src/test/scala/sparkz/core/network/MessageSpecification.scala index 0721ac0c9..1cec19142 100644 --- a/src/test/scala/sparkz/core/network/MessageSpecification.scala +++ b/src/test/scala/sparkz/core/network/MessageSpecification.scala @@ -76,14 +76,14 @@ class MessageSpecification extends AnyPropSpec val recovered = modifiersSpec.parseByteString(bytes) recovered.typeId shouldEqual data.typeId - recovered.modifiers.keys.size shouldEqual data.modifiers.keys.size + recovered.modifiers.map(_._1).size shouldEqual data.modifiers.map(_._1).size - recovered.modifiers.keys.foreach { id => - data.modifiers.get(id).isDefined shouldEqual true + recovered.modifiers.map(_._1).foreach { id => + data.modifiers.exists(_._1 == id) shouldEqual true } - recovered.modifiers.values.toSet.foreach { v: Array[Byte] => - data.modifiers.values.toSet.exists(_.sameElements(v)) shouldEqual true + recovered.modifiers.map(_._2).toSet.foreach { v: Array[Byte] => + data.modifiers.map(_._2).toSet.exists(_.sameElements(v)) shouldEqual true } modifiersSpec.toByteString(data) shouldEqual bytes @@ -93,8 +93,8 @@ class MessageSpecification extends AnyPropSpec val recovered2 = modifiersSpecLimited.parseByteString(bytes2) recovered2.typeId shouldEqual data.typeId - (recovered2.modifiers.keys.size == data.modifiers.keys.size) shouldEqual false - recovered2.modifiers.keys.size shouldEqual 0 + (recovered2.modifiers.map(_._1).size == data.modifiers.map(_._1).size) shouldEqual false + recovered2.modifiers.map(_._1).size shouldEqual 0 } } } diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala index 7cb4a937d..4692815fa 100644 --- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala +++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala @@ -39,7 +39,7 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa val transaction = TestTransaction(1, 1) val txBytes = TestTransactionSerializer.toBytes(transaction) - synchronizer ! roundTrip(Message(modifiersSpec, Right(ModifiersData(Transaction.ModifierTypeId, Map(transaction.id -> txBytes))), Some(peer))) + synchronizer ! roundTrip(Message(modifiersSpec, Right(ModifiersData(Transaction.ModifierTypeId, Seq(transaction.id -> txBytes))), Some(peer))) viewHolder.expectMsg(TransactionsFromRemote(Seq(transaction))) networkController.expectNoMessage() } @@ -48,7 +48,7 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa val transaction = TestTransaction(1, 1) val txBytes = TestTransactionSerializer.toBytes(transaction) ++ Array[Byte](0x01, 0x02) - synchronizer ! roundTrip(Message(modifiersSpec, Right(ModifiersData(Transaction.ModifierTypeId, Map(transaction.id -> txBytes))), Some(peer))) + synchronizer ! 
roundTrip(Message(modifiersSpec, Right(ModifiersData(Transaction.ModifierTypeId, Seq(transaction.id -> txBytes))), Some(peer))) viewHolder.expectMsg(TransactionsFromRemote(Seq(transaction))) networkController.expectMsg(PenalizePeer(peer.connectionId.remoteAddress, MisbehaviorPenalty)) } From 7e3d31898018f428c83c4b534f119b30604c1658 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Tue, 8 Aug 2023 15:20:32 +0300 Subject: [PATCH 09/17] Core Forgers Network Impl --- src/main/resources/reference.conf | 6 ++ .../core/network/NetworkController.scala | 37 ++++++++-- .../core/network/NodeViewSynchronizer.scala | 12 ++-- .../core/network/PeerConnectionHandler.scala | 3 +- .../scala/sparkz/core/network/PeerSpec.scala | 8 ++- .../sparkz/core/network/SendingStrategy.scala | 14 ++++ .../core/network/peer/PeerDatabase.scala | 2 +- .../core/network/peer/PeerManager.scala | 50 +++++-------- .../scala/sparkz/core/settings/Settings.scala | 6 +- .../core/network/NetworkControllerSpec.scala | 70 ++++++++++++++++++- .../sparkz/core/network/NetworkTests.scala | 4 +- .../core/network/peer/PeerManagerSpec.scala | 34 ++++++++- 12 files changed, 191 insertions(+), 55 deletions(-) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 9332f588b..cca96338e 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -99,6 +99,9 @@ sparkz { # Number of outgoing network connections maxOutgoingConnections = 10 + # Number of outgoing network connections + maxForgerConnections = 20 + # Network connection timeout connectionTimeout = 1s @@ -219,6 +222,9 @@ sparkz { # Enables transactions in the mempool handlingTransactionsEnabled = true + + # Is this node a forger + isForgerNode = false } ntp { diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 90e9761d9..a4bb7e425 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -61,7 +61,7 @@ class NetworkController(settings: NetworkSettings, private var connections = Map.empty[InetSocketAddress, ConnectedPeer] private var unconfirmedConnections = Set.empty[InetSocketAddress] - private val maxConnections = settings.maxIncomingConnections + settings.maxOutgoingConnections + private val maxConnections = settings.maxIncomingConnections + settings.maxOutgoingConnections + settings.maxForgerConnections private val peersLastConnectionAttempts = new LRUSimpleCache[InetSocketAddress, TimeProvider.Time](threshold = 10000) private val tryNewConnectionAttemptDelay = 5.seconds @@ -206,7 +206,7 @@ class NetworkController(settings: NetworkSettings, log.info(s"Unconfirmed connection: ($remoteAddress, $localAddress) => $connectionId") val handlerRef = sender() - if (connectionDirection.isOutgoing && canEstablishNewOutgoingConnection) { + if (connectionDirection.isOutgoing && (canEstablishNewOutgoingConnection || canEstablishNewForgerConnection)) { createPeerConnectionHandler(connectionId, handlerRef) } else if (connectionDirection.isIncoming && canEstablishNewIncomingConnection) @@ -259,7 +259,7 @@ class NetworkController(settings: NetworkSettings, private def scheduleConnectionToPeer(): Unit = { context.system.scheduler.scheduleWithFixedDelay(5.seconds, tryNewConnectionAttemptDelay) { () => { - if (canEstablishNewOutgoingConnection) { + if (canEstablishNewOutgoingConnection || canEstablishNewForgerConnection) { log.trace(s"Looking for a new random connection") 
connectionToPeer(connections, unconfirmedConnections) } @@ -275,6 +275,22 @@ class NetworkController(settings: NetworkSettings, getIncomingConnectionsSize < settings.maxIncomingConnections } + private def canEstablishNewForgerConnection: Boolean = { + getForgerConnectionsSize < settings.maxForgerConnections + } + + private def shouldDropForgerConnection: Boolean = { + getForgerConnectionsSize > settings.maxForgerConnections + } + + private def shouldDropOutgoingConnection: Boolean = { + getOutgoingConnectionsSize > settings.maxOutgoingConnections + } + + private def shouldDropIncomingConnection: Boolean = { + getIncomingConnectionsSize > settings.maxIncomingConnections + } + private def getOutgoingConnectionsSize: Int = { connections.count { p => p._2.connectionId.direction == Outgoing } } @@ -283,6 +299,10 @@ class NetworkController(settings: NetworkSettings, connections.count { p => p._2.connectionId.direction == Incoming } } + private def getForgerConnectionsSize: Int = { + connections.count { p => p._2.peerInfo.exists(_.peerSpec.forgerPeer) } + } + private def connectionToPeer(activeConnections: Map[InetSocketAddress, ConnectedPeer], unconfirmedConnections: Set[InetSocketAddress]): Unit = { val connectionsAddressSeq = activeConnections.values.flatMap(_.peerInfo).toSeq val unconfirmedConnectionsAddressSeq = unconfirmedConnections.map(connection => PeerInfo.fromAddress(connection)).toSeq @@ -415,13 +435,22 @@ class NetworkController(settings: NetworkSettings, // Drop connection to self if occurred or peer already connected. // Decision whether connection is local or is from some other network is made // based on SessionIdPeerFeature if exists or in old way using isSelf() function - val shouldDrop = + var shouldDrop = connectionForPeerAddress(peerAddress).exists(_.handlerRef != peerHandlerRef) || peerInfo.peerSpec.features.collectFirst { case SessionIdPeerFeature(networkMagic, sessionId) => !networkMagic.sameElements(mySessionIdFeature.networkMagic) || sessionId == mySessionIdFeature.sessionId }.getOrElse(isSelf(remoteAddress)) + // We allow temporary overflowing outgoing connection limits to get the peerInfo and see if peer if a forger. + // Drop connection if the peer does not fit in the limits. + val isForgerConnection = peerInfo.peerSpec.forgerPeer + + shouldDrop = shouldDrop || + (isForgerConnection && shouldDropForgerConnection) || + (!isForgerConnection && peerInfo.connectionType.contains(Incoming) && shouldDropIncomingConnection) || + (!isForgerConnection && peerInfo.connectionType.contains(Outgoing) && shouldDropOutgoingConnection) + if (shouldDrop) { connectedPeer.handlerRef ! CloseConnection peerManagerRef ! RemovePeer(peerAddress) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index e71ebf09d..d33ea02b1 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -99,10 +99,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes deliveryTracker.putInRebroadcastQueue(m.id) case _ => val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None) - if (m.modifierTypeId == Transaction.ModifierTypeId) - networkControllerRef ! SendToNetwork(msg, BroadcastTransaction) - else - networkControllerRef ! SendToNetwork(msg, Broadcast) + if (m.modifierTypeId == Transaction.ModifierTypeId) + networkControllerRef ! 
SendToNetwork(msg, BroadcastTransaction) + else + networkControllerRef ! SendToNetwork(msg, BroadcastBlock(networkSettings)) } } @@ -198,8 +198,8 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes // Send history extension to the (less developed) peer 'remote' which does not have it. @nowarn def sendExtension(remote: ConnectedPeer, - status: HistoryComparisonResult, - ext: Seq[(ModifierTypeId, ModifierId)]): Unit = + status: HistoryComparisonResult, + ext: Seq[(ModifierTypeId, ModifierId)]): Unit = ext.groupBy(_._1).mapValues(_.map(_._2)).foreach { case (mid, mods) => networkControllerRef ! SendToNetwork(Message(invSpec, Right(InvData(mid, mods)), None), SendToPeer(remote)) diff --git a/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala b/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala index 1cc738adf..dfae2e921 100644 --- a/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala +++ b/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala @@ -247,7 +247,8 @@ class PeerConnectionHandler(val settings: NetworkSettings, Version(settings.appVersion), settings.nodeName, ownSocketAddress, - localFeatures + localFeatures, + settings.isForgerNode ), sparkzContext.timeProvider.time() ) diff --git a/src/main/scala/sparkz/core/network/PeerSpec.scala b/src/main/scala/sparkz/core/network/PeerSpec.scala index b079a2335..374efa072 100644 --- a/src/main/scala/sparkz/core/network/PeerSpec.scala +++ b/src/main/scala/sparkz/core/network/PeerSpec.scala @@ -24,7 +24,8 @@ case class PeerSpec(agentName: String, protocolVersion: Version, nodeName: String, declaredAddress: Option[InetSocketAddress], - features: Seq[PeerFeature]) { + features: Seq[PeerFeature], + forgerPeer: Boolean = false) { lazy val localAddressOpt: Option[InetSocketAddress] = { features.collectFirst { case LocalAddressPeerFeature(addr) => addr } @@ -72,6 +73,7 @@ class PeerSpecSerializer(featureSerializers: PeerFeature.Serializers) extends Sp w.putUShort(fBytes.length.toShortExact) w.putBytes(fBytes) } + w.putBoolean(obj.forgerPeer) } override def parse(r: Reader): PeerSpec = { @@ -101,7 +103,9 @@ class PeerSpecSerializer(featureSerializers: PeerFeature.Serializers) extends Sp } } - PeerSpec(appName, protocolVersion, nodeName, declaredAddressOpt, feats) + val forgerPeer = r.getBoolean() + + PeerSpec(appName, protocolVersion, nodeName, declaredAddressOpt, feats, forgerPeer) } } diff --git a/src/main/scala/sparkz/core/network/SendingStrategy.scala b/src/main/scala/sparkz/core/network/SendingStrategy.scala index 4d228b032..d62ef4a8a 100644 --- a/src/main/scala/sparkz/core/network/SendingStrategy.scala +++ b/src/main/scala/sparkz/core/network/SendingStrategy.scala @@ -1,8 +1,10 @@ package sparkz.core.network import sparkz.core.network.peer.TransactionsDisabledPeerFeature +import sparkz.core.settings.NetworkSettings import java.security.SecureRandom +import scala.util.Random trait SendingStrategy { val secureRandom = new SecureRandom() @@ -31,6 +33,18 @@ case object BroadcastTransaction extends SendingStrategy { } } +case class BroadcastBlock(settings: NetworkSettings) extends SendingStrategy { + val maxBlockBroadcastPeers: Int = settings.maxForgerConnections + + (settings.maxOutgoingConnections + settings.maxIncomingConnections) / 2 + + override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = { + val forgerPeers = peers.filter(_.peerInfo.exists(_.peerSpec.forgerPeer)) + val remainingPeers = peers.filter(_.peerInfo.exists(!_.peerSpec.forgerPeer)) + + forgerPeers 
++ Random.shuffle(remainingPeers).take(maxBlockBroadcastPeers - forgerPeers.length) + } +} + case class BroadcastExceptOf(exceptOf: Seq[ConnectedPeer]) extends SendingStrategy { override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = peers.filterNot(exceptOf.contains) diff --git a/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala b/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala index 2cdb1804b..24dd03fe3 100644 --- a/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala +++ b/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala @@ -45,7 +45,7 @@ object PeerDatabase { */ object PeerConfidence extends Enumeration { type PeerConfidence = Value - val Unknown, Low, Medium, High: Value = Value + val Unknown, Low, Medium, High, Forger: Value = Value } case class PeerDatabaseValue(address: InetSocketAddress, peerInfo: PeerInfo, confidence: PeerConfidence) { diff --git a/src/main/scala/sparkz/core/network/peer/PeerManager.scala b/src/main/scala/sparkz/core/network/peer/PeerManager.scala index a79dab635..27dc4ba74 100644 --- a/src/main/scala/sparkz/core/network/peer/PeerManager.scala +++ b/src/main/scala/sparkz/core/network/peer/PeerManager.scala @@ -43,7 +43,7 @@ class PeerManager( PeerDatabaseValue( extractAddressFromPeerInfo(peerInfo), peerInfo, - PeerConfidence.Unknown + if (peerInfo.peerSpec.forgerPeer) PeerConfidence.Forger else PeerConfidence.Unknown ) ) } @@ -75,7 +75,8 @@ class PeerManager( val address: InetSocketAddress = peerSpec.address.getOrElse(throw new IllegalArgumentException()) val peerInfo: PeerInfo = PeerInfo(peerSpec, 0L, None) log.info(s"New discovered peer: $peerInfo") - PeerDatabaseValue(address, peerInfo, PeerConfidence.Unknown) + val peerConfidence = if (peerInfo.peerSpec.forgerPeer) PeerConfidence.Forger else PeerConfidence.Unknown + PeerDatabaseValue(address, peerInfo, peerConfidence) } peerDatabase.addOrUpdateKnownPeers(filteredPeers) @@ -195,16 +196,23 @@ object PeerManager { sparkzContext: SparkzContext): Option[PeerInfo] = { var response: Option[PeerInfo] = None - val highConfidencePeers = peers.filter(_._2.confidence == PeerConfidence.High) - val highConfidenceCandidates = highConfidencePeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq + val forgerPeers = peers.filter(_._2.confidence == PeerConfidence.Forger) + val forgerCandidates = forgerPeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - if (highConfidenceCandidates.nonEmpty) { - response = Some(highConfidenceCandidates(secureRandom.nextInt(highConfidenceCandidates.size)).peerInfo) + if (forgerCandidates.nonEmpty) { + response = Some(forgerCandidates(secureRandom.nextInt(forgerCandidates.size)).peerInfo) } else { - val candidates = peers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq + val highConfidencePeers = peers.filter(_._2.confidence == PeerConfidence.High) + val highConfidenceCandidates = highConfidencePeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq + + if (highConfidenceCandidates.nonEmpty) { + response = Some(highConfidenceCandidates(secureRandom.nextInt(highConfidenceCandidates.size)).peerInfo) + } else { + val candidates = peers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - if (candidates.nonEmpty) - response = Some(candidates(secureRandom.nextInt(candidates.size)).peerInfo) + if (candidates.nonEmpty) + response = Some(candidates(secureRandom.nextInt(candidates.size)).peerInfo) + } } 
response @@ -219,30 +227,6 @@ object PeerManager { } } - case class RandomPeerExcluding(excludedPeers: Seq[Option[InetSocketAddress]]) extends GetPeers[Option[PeerInfo]] { - private val secureRandom = new SecureRandom() - - override def choose(peers: Map[InetSocketAddress, PeerDatabaseValue], - blacklistedPeers: Seq[InetAddress], - sparkzContext: SparkzContext): Option[PeerInfo] = { - var response: Option[PeerInfo] = None - - val highConfidencePeers = peers.filter(_._2.confidence == PeerConfidence.High) - val highConfidenceCandidates = highConfidencePeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - - if (highConfidenceCandidates.nonEmpty) { - response = Some(highConfidenceCandidates(secureRandom.nextInt(highConfidenceCandidates.size)).peerInfo) - } else { - val candidates = peers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - - if (candidates.nonEmpty) - response = Some(candidates(secureRandom.nextInt(candidates.size)).peerInfo) - } - - response - } - } - case object GetBlacklistedPeers extends GetPeers[Seq[InetAddress]] { override def choose(peers: Map[InetSocketAddress, PeerDatabaseValue], diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala index 5e9939831..c5aed3a97 100644 --- a/src/main/scala/sparkz/core/settings/Settings.scala +++ b/src/main/scala/sparkz/core/settings/Settings.scala @@ -2,7 +2,6 @@ package sparkz.core.settings import java.io.File import java.net.InetSocketAddress - import com.typesafe.config.{Config, ConfigFactory} import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ @@ -10,6 +9,7 @@ import sparkz.core.network.message.Message import sparkz.core.utils.NetworkTimeProviderSettings import sparkz.util.SparkzLogging +import scala.collection.Seq import scala.concurrent.duration._ case class RESTApiSettings(bindAddress: InetSocketAddress, @@ -24,6 +24,7 @@ case class NetworkSettings(nodeName: String, bindAddress: InetSocketAddress, maxIncomingConnections: Int, maxOutgoingConnections: Int, + maxForgerConnections: Int, connectionTimeout: FiniteDuration, declaredAddress: Option[InetSocketAddress], handshakeTimeout: FiniteDuration, @@ -62,7 +63,8 @@ case class NetworkSettings(nodeName: String, temporalBanDuration: FiniteDuration, penaltySafeInterval: FiniteDuration, penaltyScoreThreshold: Int, - handlingTransactionsEnabled: Boolean) + handlingTransactionsEnabled: Boolean, + isForgerNode: Boolean) case class SparkzSettings(dataDir: File, logDir: File, diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala index f3dc238d8..0d1ab82e8 100644 --- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala +++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala @@ -264,7 +264,7 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { val tcpManagerProbe = TestProbe() val nodeAddr = new InetSocketAddress("88.77.66.55", 12345) - val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr, maxOutgoingConnections = 1)) + val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr, maxOutgoingConnections = 1, maxForgerConnections = 0)) val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, tcpManagerProbe) val testPeer = new TestPeer(settings2, networkControllerRef, tcpManagerProbe) @@ -286,6 +286,70 @@ class NetworkControllerSpec 
extends NetworkTests with ScalaFutures { system.terminate() } + it should "allow connecting to forger over maxOutgoingConnections limit" in { + implicit val system: ActorSystem = ActorSystem() + + val tcpManagerProbe = TestProbe() + + val nodeAddr = new InetSocketAddress("88.77.66.55", 12345) + val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr, maxOutgoingConnections = 1, maxForgerConnections = 1)) + val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, tcpManagerProbe) + + val testPeer = new TestPeer(settings2, networkControllerRef, tcpManagerProbe) + val peer1DecalredAddr = new InetSocketAddress("88.77.66.55", 5678) + val peer1LocalAddr = new InetSocketAddress("192.168.1.55", 5678) + val peerInfo1 = getPeerInfo(peer1LocalAddr) + testPeer.establishNewOutgoingConnection(peerInfo1) + testPeer.connectAndExpectSuccessfulMessages(peer1LocalAddr, nodeAddr, Tcp.ResumeReading) + testPeer.receiveHandshake + testPeer.sendHandshake(Some(peer1DecalredAddr), Some(peer1LocalAddr)) + testPeer.receiveGetPeers + testPeer.sendPeers(Seq.empty) + + val peer2DeclaredAddr = new InetSocketAddress("99.88.77.66", 5678) + val peer2LocalAddr = new InetSocketAddress("192.168.1.56", 5678) + val peerInfo2 = getPeerInfo(peer2LocalAddr) + testPeer.establishNewOutgoingConnection(peerInfo2) + testPeer.connectAndExpectSuccessfulMessages(peer2LocalAddr, nodeAddr, Tcp.ResumeReading) + testPeer.receiveHandshake + testPeer.sendHandshake(Some(peer2DeclaredAddr), Some(peer2LocalAddr), forgerNode = true) + tcpManagerProbe.expectNoMessage() + + system.terminate() + } + + it should "disconnect after handshake if the maxOutgoingConnections limit reached" in { + implicit val system: ActorSystem = ActorSystem() + + val tcpManagerProbe = TestProbe() + + val nodeAddr = new InetSocketAddress("88.77.66.55", 12345) + val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr, maxOutgoingConnections = 1, maxForgerConnections = 1)) + val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, tcpManagerProbe) + + val testPeer = new TestPeer(settings2, networkControllerRef, tcpManagerProbe) + val peer1DecalredAddr = new InetSocketAddress("88.77.66.55", 5678) + val peer1LocalAddr = new InetSocketAddress("192.168.1.55", 5678) + val peerInfo1 = getPeerInfo(peer1LocalAddr) + testPeer.establishNewOutgoingConnection(peerInfo1) + testPeer.connectAndExpectSuccessfulMessages(peer1LocalAddr, nodeAddr, Tcp.ResumeReading) + testPeer.receiveHandshake + testPeer.sendHandshake(Some(peer1DecalredAddr), Some(peer1LocalAddr)) + testPeer.receiveGetPeers + testPeer.sendPeers(Seq.empty) + + val peer2DeclaredAddr = new InetSocketAddress("99.88.77.66", 5678) + val peer2LocalAddr = new InetSocketAddress("192.168.1.56", 5678) + val peerInfo2 = getPeerInfo(peer2LocalAddr) + testPeer.establishNewOutgoingConnection(peerInfo2) + testPeer.connectAndExpectSuccessfulMessages(peer2LocalAddr, nodeAddr, Tcp.ResumeReading) + testPeer.receiveHandshake + testPeer.sendHandshake(Some(peer2DeclaredAddr), Some(peer2LocalAddr)) + tcpManagerProbe.expectMsg(Abort) + + system.terminate() + } + it should "not send known local address of peer when node is not in local network" in { implicit val system: ActorSystem = ActorSystem() @@ -675,12 +739,12 @@ class TestPeer(settings: SparkzSettings, networkControllerRef: ActorRef, tcpMana * @param localAddress * @return */ - def sendHandshake(declaredAddress: Option[InetSocketAddress], localAddress: Option[InetSocketAddress]): 
Tcp.ResumeReading.type = { + def sendHandshake(declaredAddress: Option[InetSocketAddress], localAddress: Option[InetSocketAddress], forgerNode: Boolean = false): Tcp.ResumeReading.type = { val localFeature: Seq[PeerFeature] = localAddress.map(LocalAddressPeerFeature(_)).toSeq val features = localFeature :+ SessionIdPeerFeature(settings.network.magicBytes) val handshakeToNode = Handshake(PeerSpec(settings.network.agentName, Version(settings.network.appVersion), "test", - declaredAddress, features), timeProvider.time()) + declaredAddress, features, forgerNode), timeProvider.time()) tcpManagerProbe.send(connectionHandler, Tcp.Received(ByteString(handshakeSerializer.toBytes(handshakeToNode)))) tcpManagerProbe.expectMsg(Tcp.ResumeReading) diff --git a/src/test/scala/sparkz/core/network/NetworkTests.scala b/src/test/scala/sparkz/core/network/NetworkTests.scala index 3c4cb8c79..e4ea72958 100644 --- a/src/test/scala/sparkz/core/network/NetworkTests.scala +++ b/src/test/scala/sparkz/core/network/NetworkTests.scala @@ -18,8 +18,8 @@ class NetworkTests extends AnyFlatSpec with Matchers { protected def currentTime(): TimeProvider.Time = mockTimeProvider.time() - protected def getPeerInfo(address: InetSocketAddress, nameOpt: Option[String] = None, featureSeq: Seq[PeerFeature] = Seq()): PeerInfo = { - val data = PeerSpec("full node", Version.last, nameOpt.getOrElse(address.toString), Some(address), featureSeq) + protected def getPeerInfo(address: InetSocketAddress, nameOpt: Option[String] = None, featureSeq: Seq[PeerFeature] = Seq(), forgerNode: Boolean = false): PeerInfo = { + val data = PeerSpec("full node", Version.last, nameOpt.getOrElse(address.toString), Some(address), featureSeq, forgerNode) PeerInfo(data, currentTime(), None) } diff --git a/src/test/scala/sparkz/core/network/peer/PeerManagerSpec.scala b/src/test/scala/sparkz/core/network/peer/PeerManagerSpec.scala index a302594be..ba74bcf71 100644 --- a/src/test/scala/sparkz/core/network/peer/PeerManagerSpec.scala +++ b/src/test/scala/sparkz/core/network/peer/PeerManagerSpec.scala @@ -11,9 +11,11 @@ import sparkz.core.network.{NetworkTests, PeerSpec} import java.net.{InetAddress, InetSocketAddress} import scala.concurrent.ExecutionContext -class PeerManagerSpec(implicit val ec: ExecutionContext) extends NetworkTests with BeforeAndAfter { +class PeerManagerSpec extends NetworkTests with BeforeAndAfter { import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, GetAllPeers} + implicit val actorSystem: ActorSystem = ActorSystem() + implicit val ec: ExecutionContext = actorSystem.dispatchers.lookup("sparkz.executionContext") type Data = Map[InetSocketAddress, PeerInfo] private val DefaultPort = 27017 @@ -355,4 +357,34 @@ class PeerManagerSpec(implicit val ec: ExecutionContext) extends NetworkTests wi system.terminate() } + + it should "prioritize forgerPeers over other peers" in { + // Arrange + val knownPeerAddress1 = new InetSocketAddress("127.0.0.1", DefaultPort) + val knownPeerAddress2 = new InetSocketAddress("127.0.0.2", DefaultPort) + val settingsWithKnownPeer = settings.copy(network = settings.network.copy(knownPeers = Seq(knownPeerAddress1, knownPeerAddress2))) + + implicit val system: ActorSystem = ActorSystem() + val p = TestProbe("p")(system) + implicit val defaultSender: ActorRef = p.testActor + + val sparkzContext = SparkzContext(Seq.empty, Seq.empty, mockTimeProvider, None) + val peerDatabase = new InMemoryPeerDatabase(settingsWithKnownPeer, sparkzContext) + val peerManager = PeerManagerRef(settingsWithKnownPeer, 
sparkzContext, peerDatabase)(system, ec) + + val peerAddress = new InetSocketAddress("1.1.1.1", DefaultPort) + val peerInfo = getPeerInfo(peerAddress, forgerNode = true) + + // Act + peerManager ! AddOrUpdatePeer(peerInfo) + + peerManager ! RandomPeerForConnectionExcluding(Seq(Some(knownPeerAddress2))) + + // Assert + val data = p.expectMsgClass(classOf[Option[PeerInfo]]) + data shouldNot be(empty) + data.foreach(p => p.peerSpec.address.contains(peerAddress) shouldBe true) + + system.terminate() + } } From 685e1a40ce1b106d1dfd8e28f87e70caccb691f4 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Mon, 21 Aug 2023 16:27:42 +0300 Subject: [PATCH 10/17] ModifiersCache Map -> LinkedHashMap --- src/main/scala/sparkz/core/ModifiersCache.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/sparkz/core/ModifiersCache.scala b/src/main/scala/sparkz/core/ModifiersCache.scala index f2db77905..035f048e3 100644 --- a/src/main/scala/sparkz/core/ModifiersCache.scala +++ b/src/main/scala/sparkz/core/ModifiersCache.scala @@ -22,7 +22,7 @@ trait ModifiersCache[PMOD <: PersistentNodeViewModifier, H <: HistoryReader[PMOD type K = sparkz.util.ModifierId type V = PMOD - protected val cache: mutable.Map[K, V] = mutable.Map[K, V]() + protected val cache: mutable.Map[K, V] = mutable.LinkedHashMap[K, V]() override def modifierById(modifierId: sparkz.util.ModifierId): Option[PMOD] = cache.get(modifierId) From 06ddd64702c12ccd2acea06e4a68b807c8d539c5 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Wed, 23 Aug 2023 18:04:59 +0300 Subject: [PATCH 11/17] review fixes --- src/main/resources/reference.conf | 2 +- .../sparkz/core/network/NetworkController.scala | 12 ++++++++---- .../scala/sparkz/core/network/SendingStrategy.scala | 3 +-- src/main/scala/sparkz/core/settings/Settings.scala | 1 - 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index cca96338e..311c7e6d7 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -99,7 +99,7 @@ sparkz { # Number of outgoing network connections maxOutgoingConnections = 10 - # Number of outgoing network connections + # Number of connections to forgers maxForgerConnections = 20 # Network connection timeout diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index a4bb7e425..0d00ef98e 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -446,10 +446,8 @@ class NetworkController(settings: NetworkSettings, // Drop connection if the peer does not fit in the limits. val isForgerConnection = peerInfo.peerSpec.forgerPeer - shouldDrop = shouldDrop || - (isForgerConnection && shouldDropForgerConnection) || - (!isForgerConnection && peerInfo.connectionType.contains(Incoming) && shouldDropIncomingConnection) || - (!isForgerConnection && peerInfo.connectionType.contains(Outgoing) && shouldDropOutgoingConnection) + val connectionLimitExhausted = isConnectionLimitExhausted(peerInfo, isForgerConnection) + shouldDrop = shouldDrop || connectionLimitExhausted if (shouldDrop) { connectedPeer.handlerRef ! 
CloseConnection @@ -465,6 +463,12 @@ class NetworkController(settings: NetworkSettings, } } + private[network] def isConnectionLimitExhausted(peerInfo: PeerInfo, isForgerConnection: Boolean) = { + (isForgerConnection && shouldDropForgerConnection) || + (!isForgerConnection && peerInfo.connectionType.contains(Incoming) && shouldDropIncomingConnection) || + (!isForgerConnection && peerInfo.connectionType.contains(Outgoing) && shouldDropOutgoingConnection) + } + /** * Returns connections filtered by given SendingStrategy and Version. * Exclude all connections with lower version and apply sendingStrategy to remaining connected peers diff --git a/src/main/scala/sparkz/core/network/SendingStrategy.scala b/src/main/scala/sparkz/core/network/SendingStrategy.scala index d62ef4a8a..1bbe7da32 100644 --- a/src/main/scala/sparkz/core/network/SendingStrategy.scala +++ b/src/main/scala/sparkz/core/network/SendingStrategy.scala @@ -38,8 +38,7 @@ case class BroadcastBlock(settings: NetworkSettings) extends SendingStrategy { (settings.maxOutgoingConnections + settings.maxIncomingConnections) / 2 override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = { - val forgerPeers = peers.filter(_.peerInfo.exists(_.peerSpec.forgerPeer)) - val remainingPeers = peers.filter(_.peerInfo.exists(!_.peerSpec.forgerPeer)) + val (forgerPeers, remainingPeers) = peers.partition(_.peerInfo.exists(_.peerSpec.forgerPeer)) forgerPeers ++ Random.shuffle(remainingPeers).take(maxBlockBroadcastPeers - forgerPeers.length) } diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala index c5aed3a97..4b35b2cd4 100644 --- a/src/main/scala/sparkz/core/settings/Settings.scala +++ b/src/main/scala/sparkz/core/settings/Settings.scala @@ -9,7 +9,6 @@ import sparkz.core.network.message.Message import sparkz.core.utils.NetworkTimeProviderSettings import sparkz.util.SparkzLogging -import scala.collection.Seq import scala.concurrent.duration._ case class RESTApiSettings(bindAddress: InetSocketAddress, From 6b783e79e94d43d8d3324324a3548b2ce8588fd8 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Tue, 29 Aug 2023 20:45:32 +0300 Subject: [PATCH 12/17] review fixes --- src/main/resources/reference.conf | 2 +- .../core/network/NetworkController.scala | 26 +++++++------- .../core/network/PeerConnectionHandler.scala | 13 +++---- .../scala/sparkz/core/network/PeerSpec.scala | 8 ++--- .../sparkz/core/network/SendingStrategy.scala | 4 +-- .../network/peer/ForgerNodePeerFeature.scala | 34 +++++++++++++++++++ .../core/network/peer/PeerManager.scala | 5 +-- .../core/network/NetworkControllerSpec.scala | 13 ++++--- .../sparkz/core/network/NetworkTests.scala | 5 +-- 9 files changed, 70 insertions(+), 40 deletions(-) create mode 100644 src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 311c7e6d7..175b7a9cc 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -99,7 +99,7 @@ sparkz { # Number of outgoing network connections maxOutgoingConnections = 10 - # Number of connections to forgers + # Number of dedicated connections to forgers. 
This works in addition to the maxOutgoingConnections ones maxForgerConnections = 20 # Network connection timeout diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 0d00ef98e..24470a238 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -300,7 +300,7 @@ class NetworkController(settings: NetworkSettings, } private def getForgerConnectionsSize: Int = { - connections.count { p => p._2.peerInfo.exists(_.peerSpec.forgerPeer) } + connections.count { p => p._2.peerInfo.exists(_.peerSpec.features.contains(ForgerNodePeerFeature())) } } private def connectionToPeer(activeConnections: Map[InetSocketAddress, ConnectedPeer], unconfirmedConnections: Set[InetSocketAddress]): Unit = { @@ -398,19 +398,17 @@ class NetworkController(settings: NetworkSettings, } } val isLocal = NetworkUtils.isLocalAddress(connectionId.remoteAddress.getAddress) - val mandatoryFeatures = if (settings.handlingTransactionsEnabled) { - sparkzContext.features :+ mySessionIdFeature - } - else { - sparkzContext.features :+ mySessionIdFeature :+ TransactionsDisabledPeerFeature() - } - val peerFeatures = if (isLocal) { + val mandatoryFeatures = sparkzContext.features :+ mySessionIdFeature + + val maybeTransactionDisabledFeature = + if (settings.handlingTransactionsEnabled) None else Some(TransactionsDisabledPeerFeature()) + val maybeLocalAddressFeature = if (isLocal) { val la = new InetSocketAddress(connectionId.localAddress.getAddress, settings.bindAddress.getPort) - val localAddrFeature = LocalAddressPeerFeature(la) - mandatoryFeatures :+ localAddrFeature - } else { - mandatoryFeatures - } + Some(LocalAddressPeerFeature(la)) + } else None + val maybeForgerNodeFeature = if (settings.isForgerNode) Some(ForgerNodePeerFeature()) else None + + val peerFeatures = mandatoryFeatures ++ maybeTransactionDisabledFeature ++ maybeLocalAddressFeature ++ maybeForgerNodeFeature val selfAddressOpt = getNodeAddressForPeer(connectionId.localAddress) if (selfAddressOpt.isEmpty) @@ -444,7 +442,7 @@ class NetworkController(settings: NetworkSettings, // We allow temporary overflowing outgoing connection limits to get the peerInfo and see if peer if a forger. // Drop connection if the peer does not fit in the limits. 
- val isForgerConnection = peerInfo.peerSpec.forgerPeer + val isForgerConnection = peerInfo.peerSpec.features.contains(ForgerNodePeerFeature()) val connectionLimitExhausted = isConnectionLimitExhausted(peerInfo, isForgerConnection) shouldDrop = shouldDrop || connectionLimitExhausted diff --git a/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala b/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala index dfae2e921..83b6d6863 100644 --- a/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala +++ b/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala @@ -9,8 +9,7 @@ import sparkz.core.network.NetworkController.ReceivableMessages.{Handshaked, Pen import sparkz.core.network.PeerConnectionHandler.ReceivableMessages import sparkz.core.network.PeerFeature.Serializers import sparkz.core.network.message.{HandshakeSpec, MessageSerializer} -import sparkz.core.network.peer.PeerManager.ReceivableMessages.AddOrUpdatePeer -import sparkz.core.network.peer.{PeerInfo, PenaltyType} +import sparkz.core.network.peer.{ForgerNodePeerFeature, ForgerNodePeerFeatureSerializer, PeerInfo, PenaltyType} import sparkz.core.serialization.SparkzSerializer import sparkz.core.settings.NetworkSettings import sparkz.util.SparkzLogging @@ -35,11 +34,10 @@ class PeerConnectionHandler(val settings: NetworkSettings, private val direction = connectionDescription.connectionId.direction private val ownSocketAddress = connectionDescription.ownSocketAddress private val localFeatures = connectionDescription.localFeatures + private val mandatoryFeatureSerializers: Serializers = Map(ForgerNodePeerFeature.featureId -> ForgerNodePeerFeatureSerializer) + private val localFeatureSerializers: Serializers = localFeatures.map(f => f.featureId -> (f.serializer: SparkzSerializer[_ <: PeerFeature])).toMap - private val featureSerializers: Serializers = - localFeatures.map(f => f.featureId -> (f.serializer: SparkzSerializer[_ <: PeerFeature])).toMap - - private val handshakeSerializer = new HandshakeSpec(featureSerializers, settings.maxHandshakeSize) + private val handshakeSerializer = new HandshakeSpec(mandatoryFeatureSerializers ++ localFeatureSerializers, settings.maxHandshakeSize) private val messageSerializer = new MessageSerializer(sparkzContext.messageSpecs, settings.magicBytes, settings.messageLengthBytesLimit) // there is no recovery for broken connections @@ -247,8 +245,7 @@ class PeerConnectionHandler(val settings: NetworkSettings, Version(settings.appVersion), settings.nodeName, ownSocketAddress, - localFeatures, - settings.isForgerNode + localFeatures ), sparkzContext.timeProvider.time() ) diff --git a/src/main/scala/sparkz/core/network/PeerSpec.scala b/src/main/scala/sparkz/core/network/PeerSpec.scala index 374efa072..b079a2335 100644 --- a/src/main/scala/sparkz/core/network/PeerSpec.scala +++ b/src/main/scala/sparkz/core/network/PeerSpec.scala @@ -24,8 +24,7 @@ case class PeerSpec(agentName: String, protocolVersion: Version, nodeName: String, declaredAddress: Option[InetSocketAddress], - features: Seq[PeerFeature], - forgerPeer: Boolean = false) { + features: Seq[PeerFeature]) { lazy val localAddressOpt: Option[InetSocketAddress] = { features.collectFirst { case LocalAddressPeerFeature(addr) => addr } @@ -73,7 +72,6 @@ class PeerSpecSerializer(featureSerializers: PeerFeature.Serializers) extends Sp w.putUShort(fBytes.length.toShortExact) w.putBytes(fBytes) } - w.putBoolean(obj.forgerPeer) } override def parse(r: Reader): PeerSpec = { @@ -103,9 +101,7 @@ class 
PeerSpecSerializer(featureSerializers: PeerFeature.Serializers) extends Sp } } - val forgerPeer = r.getBoolean() - - PeerSpec(appName, protocolVersion, nodeName, declaredAddressOpt, feats, forgerPeer) + PeerSpec(appName, protocolVersion, nodeName, declaredAddressOpt, feats) } } diff --git a/src/main/scala/sparkz/core/network/SendingStrategy.scala b/src/main/scala/sparkz/core/network/SendingStrategy.scala index 1bbe7da32..3ecc4a17e 100644 --- a/src/main/scala/sparkz/core/network/SendingStrategy.scala +++ b/src/main/scala/sparkz/core/network/SendingStrategy.scala @@ -1,6 +1,6 @@ package sparkz.core.network -import sparkz.core.network.peer.TransactionsDisabledPeerFeature +import sparkz.core.network.peer.{ForgerNodePeerFeature, TransactionsDisabledPeerFeature} import sparkz.core.settings.NetworkSettings import java.security.SecureRandom @@ -38,7 +38,7 @@ case class BroadcastBlock(settings: NetworkSettings) extends SendingStrategy { (settings.maxOutgoingConnections + settings.maxIncomingConnections) / 2 override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = { - val (forgerPeers, remainingPeers) = peers.partition(_.peerInfo.exists(_.peerSpec.forgerPeer)) + val (forgerPeers, remainingPeers) = peers.partition(_.peerInfo.exists(_.peerSpec.features.contains(ForgerNodePeerFeature()))) forgerPeers ++ Random.shuffle(remainingPeers).take(maxBlockBroadcastPeers - forgerPeers.length) } diff --git a/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala b/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala new file mode 100644 index 000000000..64f9fdf98 --- /dev/null +++ b/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala @@ -0,0 +1,34 @@ +package sparkz.core.network.peer + +import sparkz.core.network.PeerFeature +import sparkz.core.network.PeerFeature.Id +import sparkz.util.serialization._ +import sparkz.core.serialization.SparkzSerializer + +/** + * This peer feature allows to detect peers who don't support transactions + */ +case class ForgerNodePeerFeature() extends PeerFeature { + + override type M = ForgerNodePeerFeature + override val featureId: Id = ForgerNodePeerFeature.featureId + + override def serializer: ForgerNodePeerFeatureSerializer.type = ForgerNodePeerFeatureSerializer + +} + +object ForgerNodePeerFeature { + + val featureId: Id = 5: Byte + +} + +object ForgerNodePeerFeatureSerializer extends SparkzSerializer[ForgerNodePeerFeature] { + + + override def parse(r: Reader): ForgerNodePeerFeature = { + ForgerNodePeerFeature() + } + + override def serialize(obj: ForgerNodePeerFeature, w: Writer): Unit = {} +} diff --git a/src/main/scala/sparkz/core/network/peer/PeerManager.scala b/src/main/scala/sparkz/core/network/peer/PeerManager.scala index 27dc4ba74..35937bb26 100644 --- a/src/main/scala/sparkz/core/network/peer/PeerManager.scala +++ b/src/main/scala/sparkz/core/network/peer/PeerManager.scala @@ -43,7 +43,7 @@ class PeerManager( PeerDatabaseValue( extractAddressFromPeerInfo(peerInfo), peerInfo, - if (peerInfo.peerSpec.forgerPeer) PeerConfidence.Forger else PeerConfidence.Unknown + if (peerInfo.peerSpec.features.contains(ForgerNodePeerFeature())) PeerConfidence.Forger else PeerConfidence.Unknown ) ) } @@ -75,7 +75,8 @@ class PeerManager( val address: InetSocketAddress = peerSpec.address.getOrElse(throw new IllegalArgumentException()) val peerInfo: PeerInfo = PeerInfo(peerSpec, 0L, None) log.info(s"New discovered peer: $peerInfo") - val peerConfidence = if (peerInfo.peerSpec.forgerPeer) PeerConfidence.Forger else 
PeerConfidence.Unknown + val peerConfidence = + if (peerInfo.peerSpec.features.contains(ForgerNodePeerFeature())) PeerConfidence.Forger else PeerConfidence.Unknown PeerDatabaseValue(address, peerInfo, peerConfidence) } peerDatabase.addOrUpdateKnownPeers(filteredPeers) diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala index 0d1ab82e8..162ca300f 100644 --- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala +++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala @@ -18,7 +18,7 @@ import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetC import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.DisconnectedPeer import sparkz.core.network.message._ import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, DisconnectFromAddress, GetAllPeers, RandomPeerForConnectionExcluding} -import sparkz.core.network.peer._ +import sparkz.core.network.peer.{ForgerNodePeerFeature, _} import sparkz.core.serialization.SparkzSerializer import sparkz.core.settings.SparkzSettings import sparkz.core.utils.LocalTimeProvider @@ -33,7 +33,8 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { import scala.concurrent.ExecutionContext.Implicits.global private val featureSerializers = Map[Byte, SparkzSerializer[_ <: PeerFeature]](LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer, - TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer) + TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer, + ForgerNodePeerFeature.featureId -> ForgerNodePeerFeatureSerializer) "A NetworkController" should "send local address on handshake when peer and node address are in localhost" in { implicit val system: ActorSystem = ActorSystem() @@ -690,7 +691,8 @@ class TestPeer(settings: SparkzSettings, networkControllerRef: ActorRef, tcpMana private val timeProvider = LocalTimeProvider private val featureSerializers = Map[Byte, SparkzSerializer[_ <: PeerFeature]](LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer, - TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer) + TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer, + ForgerNodePeerFeature.featureId -> ForgerNodePeerFeatureSerializer) private val handshakeSerializer = new HandshakeSpec(featureSerializers, Int.MaxValue) private val peersSpec = new PeersSpec(featureSerializers, settings.network.maxPeerSpecObjects) private val messageSpecs = Seq(GetPeersSpec, peersSpec) @@ -741,10 +743,11 @@ class TestPeer(settings: SparkzSettings, networkControllerRef: ActorRef, tcpMana */ def sendHandshake(declaredAddress: Option[InetSocketAddress], localAddress: Option[InetSocketAddress], forgerNode: Boolean = false): Tcp.ResumeReading.type = { val localFeature: Seq[PeerFeature] = localAddress.map(LocalAddressPeerFeature(_)).toSeq - val features = localFeature :+ SessionIdPeerFeature(settings.network.magicBytes) + val forgerNodeFeature = if (forgerNode) Some(ForgerNodePeerFeature()) else None + val features = localFeature ++ forgerNodeFeature :+ SessionIdPeerFeature(settings.network.magicBytes) val handshakeToNode = Handshake(PeerSpec(settings.network.agentName, Version(settings.network.appVersion), "test", - declaredAddress, features, forgerNode), timeProvider.time()) + declaredAddress, features), timeProvider.time()) 
tcpManagerProbe.send(connectionHandler, Tcp.Received(ByteString(handshakeSerializer.toBytes(handshakeToNode)))) tcpManagerProbe.expectMsg(Tcp.ResumeReading) diff --git a/src/test/scala/sparkz/core/network/NetworkTests.scala b/src/test/scala/sparkz/core/network/NetworkTests.scala index e4ea72958..d4e359389 100644 --- a/src/test/scala/sparkz/core/network/NetworkTests.scala +++ b/src/test/scala/sparkz/core/network/NetworkTests.scala @@ -4,7 +4,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import sparkz.core.app.Version import sparkz.core.network.NetworkTests.MockTimeProvider -import sparkz.core.network.peer.PeerInfo +import sparkz.core.network.peer.{ForgerNodePeerFeature, PeerInfo} import sparkz.core.settings.SparkzSettings import sparkz.core.utils.TimeProvider import sparkz.core.utils.TimeProvider.Time @@ -19,7 +19,8 @@ class NetworkTests extends AnyFlatSpec with Matchers { protected def currentTime(): TimeProvider.Time = mockTimeProvider.time() protected def getPeerInfo(address: InetSocketAddress, nameOpt: Option[String] = None, featureSeq: Seq[PeerFeature] = Seq(), forgerNode: Boolean = false): PeerInfo = { - val data = PeerSpec("full node", Version.last, nameOpt.getOrElse(address.toString), Some(address), featureSeq, forgerNode) + val features = if (forgerNode) featureSeq :+ ForgerNodePeerFeature() else featureSeq + val data = PeerSpec("full node", Version.last, nameOpt.getOrElse(address.toString), Some(address), features) PeerInfo(data, currentTime(), None) } From da3494753a0dc9e075452e827e9b22017a3eb290 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Thu, 31 Aug 2023 14:59:14 +0300 Subject: [PATCH 13/17] review fixes --- .../core/network/NodeViewSynchronizer.scala | 2 +- .../sparkz/core/network/SendingStrategy.scala | 15 +-------------- .../core/network/peer/ForgerNodePeerFeature.scala | 2 +- 3 files changed, 3 insertions(+), 16 deletions(-) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index 83da06d3e..4a04193df 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -102,7 +102,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes if (m.modifierTypeId == Transaction.ModifierTypeId) networkControllerRef ! SendToNetwork(msg, BroadcastTransaction) else - networkControllerRef ! SendToNetwork(msg, BroadcastBlock(networkSettings)) + networkControllerRef ! 
SendToNetwork(msg, Broadcast) } } diff --git a/src/main/scala/sparkz/core/network/SendingStrategy.scala b/src/main/scala/sparkz/core/network/SendingStrategy.scala index 3ecc4a17e..4d228b032 100644 --- a/src/main/scala/sparkz/core/network/SendingStrategy.scala +++ b/src/main/scala/sparkz/core/network/SendingStrategy.scala @@ -1,10 +1,8 @@ package sparkz.core.network -import sparkz.core.network.peer.{ForgerNodePeerFeature, TransactionsDisabledPeerFeature} -import sparkz.core.settings.NetworkSettings +import sparkz.core.network.peer.TransactionsDisabledPeerFeature import java.security.SecureRandom -import scala.util.Random trait SendingStrategy { val secureRandom = new SecureRandom() @@ -33,17 +31,6 @@ case object BroadcastTransaction extends SendingStrategy { } } -case class BroadcastBlock(settings: NetworkSettings) extends SendingStrategy { - val maxBlockBroadcastPeers: Int = settings.maxForgerConnections + - (settings.maxOutgoingConnections + settings.maxIncomingConnections) / 2 - - override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = { - val (forgerPeers, remainingPeers) = peers.partition(_.peerInfo.exists(_.peerSpec.features.contains(ForgerNodePeerFeature()))) - - forgerPeers ++ Random.shuffle(remainingPeers).take(maxBlockBroadcastPeers - forgerPeers.length) - } -} - case class BroadcastExceptOf(exceptOf: Seq[ConnectedPeer]) extends SendingStrategy { override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = peers.filterNot(exceptOf.contains) diff --git a/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala b/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala index 64f9fdf98..1049da080 100644 --- a/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala +++ b/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala @@ -6,7 +6,7 @@ import sparkz.util.serialization._ import sparkz.core.serialization.SparkzSerializer /** - * This peer feature allows to detect peers who don't support transactions + * This peer feature marks forger nodes */ case class ForgerNodePeerFeature() extends PeerFeature { From f660c155daa82af987f065dcb0fa0d35edda6ab4 Mon Sep 17 00:00:00 2001 From: Ivan Skrypnyk Date: Fri, 1 Sep 2023 13:01:15 +0300 Subject: [PATCH 14/17] review fixes --- .../sparkz/core/network/NodeViewSynchronizer.scala | 2 +- .../scala/sparkz/core/network/SendingStrategy.scala | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index 4a04193df..0742f8de9 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -102,7 +102,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes if (m.modifierTypeId == Transaction.ModifierTypeId) networkControllerRef ! SendToNetwork(msg, BroadcastTransaction) else - networkControllerRef ! SendToNetwork(msg, Broadcast) + networkControllerRef ! 
SendToNetwork(msg, BroadcastBlock)
     }
   }
 
diff --git a/src/main/scala/sparkz/core/network/SendingStrategy.scala b/src/main/scala/sparkz/core/network/SendingStrategy.scala
index 4d228b032..d313d66dc 100644
--- a/src/main/scala/sparkz/core/network/SendingStrategy.scala
+++ b/src/main/scala/sparkz/core/network/SendingStrategy.scala
@@ -1,8 +1,10 @@
 package sparkz.core.network
 
-import sparkz.core.network.peer.TransactionsDisabledPeerFeature
+import sparkz.core.network.peer.{ForgerNodePeerFeature, TransactionsDisabledPeerFeature}
+import sparkz.core.settings.NetworkSettings
 
 import java.security.SecureRandom
+import scala.util.Random
 
 trait SendingStrategy {
   val secureRandom = new SecureRandom()
@@ -31,6 +33,14 @@ case object BroadcastTransaction extends SendingStrategy {
   }
 }
 
+case object BroadcastBlock extends SendingStrategy {
+  override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = {
+    val (forgerPeers, remainingPeers) = peers.partition(_.peerInfo.exists(_.peerSpec.features.contains(ForgerNodePeerFeature())))
+
+    forgerPeers ++ remainingPeers
+  }
+}
+
 case class BroadcastExceptOf(exceptOf: Seq[ConnectedPeer]) extends SendingStrategy {
   override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] =
     peers.filterNot(exceptOf.contains)

From d77ae4b37d97dd5b31fe47de24ba3a9c35621a45 Mon Sep 17 00:00:00 2001
From: paolocappelletti
Date: Mon, 4 Sep 2023 15:13:58 +0200
Subject: [PATCH 15/17] updated release notes

---
 release-notes.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/release-notes.md b/release-notes.md
index f7c5317d7..9569fbe6f 100644
--- a/release-notes.md
+++ b/release-notes.md
@@ -1,5 +1,9 @@
 2.1.0
 ---------
+* Added support for forger connection prioritization
+* Node synchronization improvements:
+  * Optimization of lookup strategy in modifiersCache
+  * Preserve the order of blocks during synchronization
 * Fixes/Improvements on the way the SyncTracker handles the internal statuses maps
 
 2.0.3

From f1f993394b193b00030c04410e93a9954f32a2fb Mon Sep 17 00:00:00 2001
From: paolocappelletti <56390199+paolocappelletti@users.noreply.github.com>
Date: Mon, 4 Sep 2023 15:20:06 +0200
Subject: [PATCH 16/17] Update release-notes.md

Co-authored-by: Ivan Skrypnyk <64777406+i-skrypnyk@users.noreply.github.com>
---
 release-notes.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/release-notes.md b/release-notes.md
index 9569fbe6f..05c113259 100644
--- a/release-notes.md
+++ b/release-notes.md
@@ -4,6 +4,7 @@
 * Node synchronization improvements:
   * Optimization of lookup strategy in modifiersCache
   * Preserve the order of blocks during synchronization
+* Added option to force only connecting to known peers
 * Fixes/Improvements on the way the SyncTracker handles the internal statuses maps
 
 2.0.3

From 29606ebde6f99f1cd439a1653b48c4ed27e9e1e4 Mon Sep 17 00:00:00 2001
From: paolocappelletti
Date: Mon, 2 Oct 2023 12:10:12 +0200
Subject: [PATCH 17/17] 2.1.0 final

---
 build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index 598bbb4de..c5ca37c18 100644
--- a/build.sbt
+++ b/build.sbt
@@ -25,7 +25,7 @@ lazy val commonSettings = Seq(
     Wart.OptionPartial),
   organization := "io.horizen",
   organizationName := "Zen Blockchain Foundation",
-  version := "2.1.0-SNAPSHOT",
+  version := "2.1.0",
   licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")),
   homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")),
   pomExtra :=