diff --git a/build.sbt b/build.sbt index 68cbc048c..c5ca37c18 100644 --- a/build.sbt +++ b/build.sbt @@ -25,7 +25,7 @@ lazy val commonSettings = Seq( Wart.OptionPartial), organization := "io.horizen", organizationName := "Zen Blockchain Foundation", - version := "2.0.3", + version := "2.1.0", licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")), homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")), pomExtra := diff --git a/release-notes.md b/release-notes.md index d7bcf8fb3..05c113259 100644 --- a/release-notes.md +++ b/release-notes.md @@ -1,3 +1,12 @@ +2.1.0 +--------- +* Added support for prioritizing forger connections +* Node synchronization improvements: + * Optimization of the lookup strategy in the modifiers cache + * Preserve the order of blocks during synchronization +* Added an option to connect only to known peers +* Fixes and improvements to the way the SyncTracker handles its internal status maps + 2.0.3 --------- * Fix in the handshake process - start connecting to nodes only after the Synchronizer is initialized diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 9332f588b..462ada93a 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -90,6 +90,9 @@ sparkz { # List of IP addresses of well known nodes. knownPeers = [] + # Force the node to only try connecting to known peers + onlyConnectToKnownPeers = false + # Interval between GetPeers messages to be send by our node to a random one getPeersInterval = 2m @@ -99,6 +102,9 @@ sparkz { # Number of outgoing network connections maxOutgoingConnections = 10 + # Number of dedicated connections to forgers. These are in addition to the maxOutgoingConnections limit + maxForgerConnections = 20 + # Network connection timeout connectionTimeout = 1s @@ -219,6 +225,9 @@ sparkz { # Enables transactions in the mempool handlingTransactionsEnabled = true + + # Is this node a forger + isForgerNode = false } ntp { diff --git a/src/main/scala/sparkz/ObjectGenerators.scala b/src/main/scala/sparkz/ObjectGenerators.scala index e0cefca77..f9e65bd75 100644 --- a/src/main/scala/sparkz/ObjectGenerators.scala +++ b/src/main/scala/sparkz/ObjectGenerators.scala @@ -76,7 +76,7 @@ trait ObjectGenerators { lazy val modifiersGen: Gen[ModifiersData] = for { modifierTypeId: ModifierTypeId <- modifierTypeIdGen - modifiers: Map[ModifierId, Array[Byte]] <- Gen.nonEmptyMap(modifierWithIdGen).suchThat(_.nonEmpty) + modifiers: Seq[(ModifierId, Array[Byte])] <- Gen.nonEmptyListOf(modifierWithIdGen).suchThat(_.nonEmpty) } yield ModifiersData(modifierTypeId, modifiers) lazy val appVersionGen: Gen[Version] = for { diff --git a/src/main/scala/sparkz/core/ModifiersCache.scala b/src/main/scala/sparkz/core/ModifiersCache.scala index f2db77905..035f048e3 100644 --- a/src/main/scala/sparkz/core/ModifiersCache.scala +++ b/src/main/scala/sparkz/core/ModifiersCache.scala @@ -22,7 +22,7 @@ trait ModifiersCache[PMOD <: PersistentNodeViewModifier, H <: HistoryReader[PMOD type K = sparkz.util.ModifierId type V = PMOD - protected val cache: mutable.Map[K, V] = mutable.Map[K, V]() + protected val cache: mutable.Map[K, V] = mutable.LinkedHashMap[K, V]() override def modifierById(modifierId: sparkz.util.ModifierId): Option[PMOD] = cache.get(modifierId) diff --git a/src/main/scala/sparkz/core/api/http/PeersApiRoute.scala b/src/main/scala/sparkz/core/api/http/PeersApiRoute.scala index cf2e591a5..e3ad1b17a 100644 ---
a/src/main/scala/sparkz/core/api/http/PeersApiRoute.scala +++ b/src/main/scala/sparkz/core/api/http/PeersApiRoute.scala @@ -6,15 +6,14 @@ import akka.http.scaladsl.server.Route import io.circe.generic.auto.exportDecoder import io.circe.generic.semiauto._ import io.circe.syntax._ -import io.circe.{Encoder, Decoder, Json} +import io.circe.{Decoder, Encoder, Json} import sparkz.core.api.http.PeersApiRoute.PeerApiRequest.AddToBlacklistBodyRequest import sparkz.core.api.http.PeersApiRoute.Request.ConnectBodyRequest import sparkz.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse, PeersStatusResponse} import sparkz.core.network.ConnectedPeer -import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus} +import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, DisconnectFromNode, GetConnectedPeers, GetPeersStatus} import sparkz.core.network.peer.PeerManager.ReceivableMessages._ import sparkz.core.network.peer.PenaltyType.CustomPenaltyDuration -import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddPeersIfEmpty, GetAllPeers, GetBlacklistedPeers} import sparkz.core.network.peer.{PeerInfo, PeersStatus} import sparkz.core.settings.RESTApiSettings import sparkz.core.utils.NetworkTimeProvider @@ -179,7 +178,7 @@ case class PeersApiRoute(peerManager: ActorRef, val port = addressAndPort.group(2).toInt val peerAddress = new InetSocketAddress(host, port) peerManager ! RemovePeer(peerAddress) - networkController ! DisconnectFromAddress(peerAddress) + networkController ! DisconnectFromNode(peerAddress) ApiResponse.OK } } diff --git a/src/main/scala/sparkz/core/network/ConnectedPeer.scala b/src/main/scala/sparkz/core/network/ConnectedPeer.scala index d1f26ce46..153b52bc6 100644 --- a/src/main/scala/sparkz/core/network/ConnectedPeer.scala +++ b/src/main/scala/sparkz/core/network/ConnectedPeer.scala @@ -16,7 +16,7 @@ case class ConnectedPeer(connectionId: ConnectionId, var lastMessage: Long, peerInfo: Option[PeerInfo]) { - override def hashCode(): Int = connectionId.hashCode() + override def hashCode(): Int = connectionId.remoteAddress.hashCode() override def equals(obj: Any): Boolean = obj match { case that: ConnectedPeer => this.connectionId.remoteAddress == that.connectionId.remoteAddress diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala index 471e1eedf..5722194a1 100644 --- a/src/main/scala/sparkz/core/network/NetworkController.scala +++ b/src/main/scala/sparkz/core/network/NetworkController.scala @@ -61,7 +61,7 @@ class NetworkController(settings: NetworkSettings, private var connections = Map.empty[InetSocketAddress, ConnectedPeer] private var unconfirmedConnections = Set.empty[InetSocketAddress] - private val maxConnections = settings.maxIncomingConnections + settings.maxOutgoingConnections + private val maxConnections = settings.maxIncomingConnections + settings.maxOutgoingConnections + settings.maxForgerConnections private val peersLastConnectionAttempts = new LRUSimpleCache[InetSocketAddress, TimeProvider.Time](threshold = 10000) private val tryNewConnectionAttemptDelay = 5.seconds @@ -150,6 +150,10 @@ class NetworkController(settings: NetworkSettings, case DisconnectFromAddress(peerAddress) => closeConnection(peerAddress) + + case DisconnectFromNode(peerAddress) => + closeConnectionFromNode(peerAddress) + } private def connectionEvents: Receive = { @@ -208,7 +212,7 @@ class NetworkController(settings: NetworkSettings, 
log.info(s"Unconfirmed connection: ($remoteAddress, $localAddress) => $connectionId") val handlerRef = sender() - if (connectionDirection.isOutgoing && canEstablishNewOutgoingConnection) { + if (connectionDirection.isOutgoing && (canEstablishNewOutgoingConnection || canEstablishNewForgerConnection)) { createPeerConnectionHandler(connectionId, handlerRef) } else if (connectionDirection.isIncoming && canEstablishNewIncomingConnection) @@ -260,7 +264,7 @@ class NetworkController(settings: NetworkSettings, */ private def scheduleConnectionToPeer(): Unit = { val connectionTask: Runnable = () => { - if (canEstablishNewOutgoingConnection) { + if (canEstablishNewOutgoingConnection || canEstablishNewForgerConnection) { log.trace(s"Looking for a new random connection") connectionToPeer(connections, unconfirmedConnections) } @@ -277,6 +281,22 @@ class NetworkController(settings: NetworkSettings, getIncomingConnectionsSize < settings.maxIncomingConnections } + private def canEstablishNewForgerConnection: Boolean = { + getForgerConnectionsSize < settings.maxForgerConnections + } + + private def shouldDropForgerConnection: Boolean = { + getForgerConnectionsSize > settings.maxForgerConnections + } + + private def shouldDropOutgoingConnection: Boolean = { + getOutgoingConnectionsSize > settings.maxOutgoingConnections + } + + private def shouldDropIncomingConnection: Boolean = { + getIncomingConnectionsSize > settings.maxIncomingConnections + } + private def getOutgoingConnectionsSize: Int = { connections.count { p => p._2.connectionId.direction == Outgoing } } @@ -285,6 +305,10 @@ class NetworkController(settings: NetworkSettings, connections.count { p => p._2.connectionId.direction == Incoming } } + private def getForgerConnectionsSize: Int = { + connections.count { p => p._2.peerInfo.exists(_.peerSpec.features.contains(ForgerNodePeerFeature())) } + } + private def connectionToPeer(activeConnections: Map[InetSocketAddress, ConnectedPeer], unconfirmedConnections: Set[InetSocketAddress]): Unit = { val connectionsAddressSeq = activeConnections.values.flatMap(_.peerInfo).toSeq val unconfirmedConnectionsAddressSeq = unconfirmedConnections.map(connection => PeerInfo.fromAddress(connection)).toSeq @@ -380,19 +404,17 @@ class NetworkController(settings: NetworkSettings, } } val isLocal = NetworkUtils.isLocalAddress(connectionId.remoteAddress.getAddress) - val mandatoryFeatures = if (settings.handlingTransactionsEnabled) { - sparkzContext.features :+ mySessionIdFeature - } - else { - sparkzContext.features :+ mySessionIdFeature :+ TransactionsDisabledPeerFeature() - } - val peerFeatures = if (isLocal) { + val mandatoryFeatures = sparkzContext.features :+ mySessionIdFeature + + val maybeTransactionDisabledFeature = + if (settings.handlingTransactionsEnabled) None else Some(TransactionsDisabledPeerFeature()) + val maybeLocalAddressFeature = if (isLocal) { val la = new InetSocketAddress(connectionId.localAddress.getAddress, settings.bindAddress.getPort) - val localAddrFeature = LocalAddressPeerFeature(la) - mandatoryFeatures :+ localAddrFeature - } else { - mandatoryFeatures - } + Some(LocalAddressPeerFeature(la)) + } else None + val maybeForgerNodeFeature = if (settings.isForgerNode) Some(ForgerNodePeerFeature()) else None + + val peerFeatures = mandatoryFeatures ++ maybeTransactionDisabledFeature ++ maybeLocalAddressFeature ++ maybeForgerNodeFeature val selfAddressOpt = getNodeAddressForPeer(connectionId.localAddress) if (selfAddressOpt.isEmpty) @@ -417,13 +439,20 @@ class NetworkController(settings: 
NetworkSettings, // Drop connection to self if occurred or peer already connected. // Decision whether connection is local or is from some other network is made // based on SessionIdPeerFeature if exists or in old way using isSelf() function - val shouldDrop = + var shouldDrop = connectionForPeerAddress(peerAddress).exists(_.handlerRef != peerHandlerRef) || peerInfo.peerSpec.features.collectFirst { case SessionIdPeerFeature(networkMagic, sessionId) => !networkMagic.sameElements(mySessionIdFeature.networkMagic) || sessionId == mySessionIdFeature.sessionId }.getOrElse(isSelf(remoteAddress)) + // We temporarily allow exceeding the outgoing connection limits so we can get the peerInfo and see whether the peer is a forger. + // Drop the connection if the peer does not fit within the limits. + val isForgerConnection = peerInfo.peerSpec.features.contains(ForgerNodePeerFeature()) + + val connectionLimitExhausted = isConnectionLimitExhausted(peerInfo, isForgerConnection) + shouldDrop = shouldDrop || connectionLimitExhausted + if (shouldDrop) { connectedPeer.handlerRef ! CloseConnection peerManagerRef ! RemovePeer(peerAddress) @@ -438,6 +467,12 @@ class NetworkController(settings: NetworkSettings, } } + private[network] def isConnectionLimitExhausted(peerInfo: PeerInfo, isForgerConnection: Boolean) = { + (isForgerConnection && shouldDropForgerConnection) || + (!isForgerConnection && peerInfo.connectionType.contains(Incoming) && shouldDropIncomingConnection) || + (!isForgerConnection && peerInfo.connectionType.contains(Outgoing) && shouldDropOutgoingConnection) + } + /** * Returns connections filtered by given SendingStrategy and Version. * Exclude all connections with lower version and apply sendingStrategy to remaining connected peers @@ -557,6 +592,16 @@ class NetworkController(settings: NetworkSettings, } } + private def closeConnectionFromNode(peerAddress: InetSocketAddress): Unit = { + connections = connections.filterNot { + case (address, connectedPeer) if address == peerAddress => + connectedPeer.handlerRef ! CloseConnection + context.system.eventStream.publish(DisconnectedPeer(peerAddress)) + true // exclude the entry from the filtered map + case _ => false // don't modify the connections map + } + } + /** * Register a new penalty for given peer address.
*/ @@ -583,6 +628,8 @@ object NetworkController { case class DisconnectFrom(peer: ConnectedPeer) + case class DisconnectFromNode(address: InetSocketAddress) + case class PenalizePeer(address: InetSocketAddress, penaltyType: PenaltyType) case class GetFilteredConnectedPeers(sendingStrategy: SendingStrategy, version: Version) diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala index 98cb9370f..008bc2966 100644 --- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala @@ -1,6 +1,5 @@ package sparkz.core.network -import java.net.InetSocketAddress import akka.actor.{Actor, ActorRef, ActorSystem, Props} import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, ModifiersFromRemote, TransactionsFromRemote} import sparkz.core.consensus.History._ @@ -21,11 +20,10 @@ import sparkz.core.{ModifierTypeId, NodeViewModifier, PersistentNodeViewModifier import sparkz.util.serialization.{VLQByteBufferReader, VLQReader} import sparkz.util.{ModifierId, SparkzEncoding, SparkzLogging} +import java.net.InetSocketAddress import java.nio.ByteBuffer import scala.annotation.{nowarn, tailrec} -import scala.collection.mutable import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ import scala.language.postfixOps import scala.reflect.ClassTag import scala.util.{Failure, Success} @@ -103,10 +101,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes deliveryTracker.putInRebroadcastQueue(m.id) case _ => val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None) - if (m.modifierTypeId == Transaction.ModifierTypeId) - networkControllerRef ! SendToNetwork(msg, BroadcastTransaction) - else - networkControllerRef ! SendToNetwork(msg, Broadcast) + if (m.modifierTypeId == Transaction.ModifierTypeId) + networkControllerRef ! SendToNetwork(msg, BroadcastTransaction) + else + networkControllerRef ! SendToNetwork(msg, BroadcastBlock) } } @@ -200,14 +198,15 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes log.warn("Extension is empty while comparison is younger") self ! OtherNodeSyncingStatus(remote, comparison, ext) + case _ => } } // Send history extension to the (less developed) peer 'remote' which does not have it. @nowarn def sendExtension(remote: ConnectedPeer, - status: HistoryComparisonResult, - ext: Seq[(ModifierTypeId, ModifierId)]): Unit = + status: HistoryComparisonResult, + ext: Seq[(ModifierTypeId, ModifierId)]): Unit = ext.groupBy(_._1).mapValues(_.map(_._2)).foreach { case (mid, mods) => networkControllerRef ! 
SendToNetwork(Message(invSpec, Right(InvData(mid, mods)), None), SendToPeer(remote)) @@ -289,7 +288,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes val typeId = data.typeId val modifiers = data.modifiers log.info(s"Got ${modifiers.size} modifiers of type $typeId from remote connected peer: $remote") - log.trace(s"Received modifier ids ${modifiers.keySet.map(encoder.encodeId).mkString(",")}") + log.trace(s"Received modifier ids ${modifiers.map(_._1).map(encoder.encodeId).mkString(",")}") // filter out non-requested modifiers val requestedModifiers = processSpam(remote, typeId, modifiers) @@ -303,7 +302,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes case Some(serializer: SparkzSerializer[PMOD]@unchecked) => // parse all modifiers and put them to modifiers cache - log.info(s"Received block ids ${modifiers.keySet.map(encoder.encodeId).mkString(",")}") + log.info(s"Received block ids ${modifiers.map(_._1).map(encoder.encodeId).mkString(",")}") val parsed: Iterable[PMOD] = parseModifiers(requestedModifiers, serializer, remote) val valid: Iterable[PMOD] = parsed.filter(validateAndSetStatus(remote, _)) if (valid.nonEmpty) viewHolderRef ! ModifiersFromRemote[PMOD](valid) @@ -344,7 +343,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes * * @return collection of parsed modifiers */ - private def parseModifiers[M <: NodeViewModifier](modifiers: Map[ModifierId, Array[Byte]], + private def parseModifiers[M <: NodeViewModifier](modifiers: Seq[(ModifierId, Array[Byte])], serializer: SparkzSerializer[M], remote: ConnectedPeer): Iterable[M] = { modifiers.flatMap { case (id, bytes) => @@ -374,7 +373,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes */ private def processSpam(remote: ConnectedPeer, typeId: ModifierTypeId, - modifiers: Map[ModifierId, Array[Byte]]): Map[ModifierId, Array[Byte]] = { + modifiers: Seq[(ModifierId, Array[Byte])]): Seq[(ModifierId, Array[Byte])] = { val (requested, spam) = modifiers.partition { case (id, _) => deliveryTracker.status(id) == Requested @@ -382,7 +381,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes if (spam.nonEmpty) { log.info(s"Spam attempt: peer $remote has sent a non-requested modifiers of type $typeId with ids" + - s": ${spam.keys.map(encoder.encodeId)}") + s": ${spam.map(_._1).map(encoder.encodeId)}") penalizeSpammingPeer(remote) } requested @@ -440,7 +439,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes size += NodeViewModifier.ModifierIdSize + 4 + modBytes.length size < networkSettings.maxModifiersSpecMessageSize } - peer.handlerRef ! Message(modifiersSpec, Right(ModifiersData(modType, batch.toMap)), None) + peer.handlerRef ! 
Message(modifiersSpec, Right(ModifiersData(modType, batch)), None) val remaining = mods.drop(batch.length) if (remaining.nonEmpty) { sendByParts(remaining) @@ -461,7 +460,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes val mods = deliveryTracker.getRebroadcastModifiers mempoolReaderOpt match { case Some(mempool) => - mempool.getAll(ids = mods).foreach { tx =>broadcastModifierInv(tx) } + mempool.getAll(ids = mods).foreach { tx => broadcastModifierInv(tx) } case None => log.warn(s"Trying to rebroadcast while readers are not ready $mempoolReaderOpt") } @@ -619,44 +618,4 @@ object NodeViewSynchronizerRef { (implicit system: ActorSystem, ec: ExecutionContext): ActorRef = system.actorOf(props[TX, SI, SIS, PMOD, HR, MR](networkControllerRef, viewHolderRef, syncInfoSpec, networkSettings, timeProvider, modifierSerializers), name) -} - -object Test extends App { - - test() - - protected val rebroadcastQueue: mutable.Queue[String] = mutable.Queue() - - def test(): Unit = { - putInRebroadcastQueue("1") - putInRebroadcastQueue("2") - putInRebroadcastQueue("3") - putInRebroadcastQueue("4") - putInRebroadcastQueue("5") - putInRebroadcastQueue("6") - putInRebroadcastQueue("7") - putInRebroadcastQueue("8") - putInRebroadcastQueue("9") - putInRebroadcastQueue("10") - - println(s"get modifiers = $getRebroadcastModifiers") - - println(s"queue = $rebroadcastQueue") - - println(s"get modifiers = $getRebroadcastModifiers") - - println(s"queue = $rebroadcastQueue") - } - - def putInRebroadcastQueue(modifierId: String): Unit = { - rebroadcastQueue.enqueue(modifierId) - } - - def getRebroadcastModifiers: Seq[String] = { - val mods = rebroadcastQueue.take(5).toSeq - rebroadcastQueue.drop(5) - mods - } - - } \ No newline at end of file diff --git a/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala b/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala index 1cc738adf..83b6d6863 100644 --- a/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala +++ b/src/main/scala/sparkz/core/network/PeerConnectionHandler.scala @@ -9,8 +9,7 @@ import sparkz.core.network.NetworkController.ReceivableMessages.{Handshaked, Pen import sparkz.core.network.PeerConnectionHandler.ReceivableMessages import sparkz.core.network.PeerFeature.Serializers import sparkz.core.network.message.{HandshakeSpec, MessageSerializer} -import sparkz.core.network.peer.PeerManager.ReceivableMessages.AddOrUpdatePeer -import sparkz.core.network.peer.{PeerInfo, PenaltyType} +import sparkz.core.network.peer.{ForgerNodePeerFeature, ForgerNodePeerFeatureSerializer, PeerInfo, PenaltyType} import sparkz.core.serialization.SparkzSerializer import sparkz.core.settings.NetworkSettings import sparkz.util.SparkzLogging @@ -35,11 +34,10 @@ class PeerConnectionHandler(val settings: NetworkSettings, private val direction = connectionDescription.connectionId.direction private val ownSocketAddress = connectionDescription.ownSocketAddress private val localFeatures = connectionDescription.localFeatures + private val mandatoryFeatureSerializers: Serializers = Map(ForgerNodePeerFeature.featureId -> ForgerNodePeerFeatureSerializer) + private val localFeatureSerializers: Serializers = localFeatures.map(f => f.featureId -> (f.serializer: SparkzSerializer[_ <: PeerFeature])).toMap - private val featureSerializers: Serializers = - localFeatures.map(f => f.featureId -> (f.serializer: SparkzSerializer[_ <: PeerFeature])).toMap - - private val handshakeSerializer = new HandshakeSpec(featureSerializers, 
settings.maxHandshakeSize) + private val handshakeSerializer = new HandshakeSpec(mandatoryFeatureSerializers ++ localFeatureSerializers, settings.maxHandshakeSize) private val messageSerializer = new MessageSerializer(sparkzContext.messageSpecs, settings.magicBytes, settings.messageLengthBytesLimit) // there is no recovery for broken connections diff --git a/src/main/scala/sparkz/core/network/PeerSynchronizer.scala b/src/main/scala/sparkz/core/network/PeerSynchronizer.scala index 3a08aba40..f17df1391 100644 --- a/src/main/scala/sparkz/core/network/PeerSynchronizer.scala +++ b/src/main/scala/sparkz/core/network/PeerSynchronizer.scala @@ -38,9 +38,12 @@ class PeerSynchronizer(val networkControllerRef: ActorRef, networkControllerRef ! RegisterMessageSpecs(Seq(GetPeersSpec, peersSpec), self) - val msg = Message[Unit](GetPeersSpec, Right(()), None) - val stn = SendToNetwork(msg, SendToRandom) - context.system.scheduler.scheduleWithFixedDelay(2.seconds, settings.getPeersInterval, networkControllerRef, stn) + //do not ask network for new peers if we are only connecting to known peers + if (!settings.onlyConnectToKnownPeers) { + val msg = Message[Unit](GetPeersSpec, Right(()), None) + val stn = SendToNetwork(msg, SendToRandom) + context.system.scheduler.scheduleWithFixedDelay(2.seconds, settings.getPeersInterval, networkControllerRef, stn) + } } override def receive: Receive = { diff --git a/src/main/scala/sparkz/core/network/SendingStrategy.scala b/src/main/scala/sparkz/core/network/SendingStrategy.scala index 4d228b032..d313d66dc 100644 --- a/src/main/scala/sparkz/core/network/SendingStrategy.scala +++ b/src/main/scala/sparkz/core/network/SendingStrategy.scala @@ -1,8 +1,10 @@ package sparkz.core.network -import sparkz.core.network.peer.TransactionsDisabledPeerFeature +import sparkz.core.network.peer.{ForgerNodePeerFeature, TransactionsDisabledPeerFeature} +import sparkz.core.settings.NetworkSettings import java.security.SecureRandom +import scala.util.Random trait SendingStrategy { val secureRandom = new SecureRandom() @@ -31,6 +33,14 @@ case object BroadcastTransaction extends SendingStrategy { } } +case object BroadcastBlock extends SendingStrategy { + override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = { + val (forgerPeers, remainingPeers) = peers.partition(_.peerInfo.exists(_.peerSpec.features.contains(ForgerNodePeerFeature()))) + + forgerPeers ++ remainingPeers + } +} + case class BroadcastExceptOf(exceptOf: Seq[ConnectedPeer]) extends SendingStrategy { override def choose(peers: Seq[ConnectedPeer]): Seq[ConnectedPeer] = peers.filterNot(exceptOf.contains) diff --git a/src/main/scala/sparkz/core/network/SyncTracker.scala b/src/main/scala/sparkz/core/network/SyncTracker.scala index 841e9fa5a..783275a09 100644 --- a/src/main/scala/sparkz/core/network/SyncTracker.scala +++ b/src/main/scala/sparkz/core/network/SyncTracker.scala @@ -54,6 +54,11 @@ class SyncTracker(nvsRef: ActorRef, def updateStatus(peer: ConnectedPeer, status: HistoryComparisonResult): Unit = { val seniorsBefore = numOfSeniors() statuses += peer -> status + + if (isFirstTimeTrackingPeer(peer)) { + updateLastSyncSentTime(peer) + } + val seniorsAfter = numOfSeniors() // todo: we should also send NoBetterNeighbour signal when all the peers around are not seniors initially @@ -68,6 +73,10 @@ class SyncTracker(nvsRef: ActorRef, } } + private def isFirstTimeTrackingPeer(peer: ConnectedPeer) = { + !lastSyncSentTime.contains(peer) + } + //todo: combine both? 
def clearStatus(remote: InetSocketAddress): Unit = { statuses.find(_._1.connectionId.remoteAddress == remote) match { @@ -92,7 +101,6 @@ class SyncTracker(nvsRef: ActorRef, private def outdatedPeers(): Seq[ConnectedPeer] = lastSyncSentTime.filter(t => (timeProvider.time() - t._2).millis > maxInterval()).keys.toSeq - @nowarn def peersByStatus: Map[HistoryComparisonResult, Iterable[ConnectedPeer]] = statuses.groupBy(_._2).mapValues(_.keys).toMap diff --git a/src/main/scala/sparkz/core/network/message/BasicMessagesRepo.scala b/src/main/scala/sparkz/core/network/message/BasicMessagesRepo.scala index 75707e417..bd67a860c 100644 --- a/src/main/scala/sparkz/core/network/message/BasicMessagesRepo.scala +++ b/src/main/scala/sparkz/core/network/message/BasicMessagesRepo.scala @@ -9,9 +9,9 @@ import sparkz.util.Extensions._ import sparkz.util.serialization.{Reader, Writer} import sparkz.util.{ModifierId, SparkzLogging, bytesToId, idToBytes} -import scala.collection.immutable +import scala.collection.mutable -case class ModifiersData(typeId: ModifierTypeId, modifiers: Map[ModifierId, Array[Byte]]) +case class ModifiersData(typeId: ModifierTypeId, modifiers: Seq[(ModifierId, Array[Byte])]) case class InvData(typeId: ModifierTypeId, ids: Seq[ModifierId]) @@ -157,7 +157,7 @@ class ModifiersSpec(maxMessageSize: Int) extends MessageSpecV1[ModifiersData] wi } if (msgSize > maxMessageSize) { - log.warn(s"Message with modifiers ${modifiers.keySet} has size $msgSize exceeding limit $maxMessageSize." + + log.warn(s"Message with modifiers ${modifiers.map(_._1)} has size $msgSize exceeding limit $maxMessageSize." + s" Sending ${w.length() - start} bytes instead") } } @@ -165,7 +165,7 @@ class ModifiersSpec(maxMessageSize: Int) extends MessageSpecV1[ModifiersData] wi override def parse(r: Reader): ModifiersData = { val typeId = ModifierTypeId @@ r.getByte() // 1 byte val count = r.getUInt().toIntExact // 8 bytes - val resMap = immutable.Map.newBuilder[ModifierId, Array[Byte]] + val res = mutable.Buffer[(ModifierId, Array[Byte])]() (0 until count).foldLeft(HeaderLength) { case (msgSize, _) => val id = bytesToId(r.getBytes(NodeViewModifier.ModifierIdSize)) val objBytesCnt = r.getUInt().toIntExact @@ -174,10 +174,10 @@ class ModifiersSpec(maxMessageSize: Int) extends MessageSpecV1[ModifiersData] wi throw new Exception("Too big message with modifiers, size: " + maxMessageSize) } val obj = r.getBytes(objBytesCnt) - resMap += (id -> obj) + res += (id -> obj) newMsgSize } - ModifiersData(typeId, resMap.result()) + ModifiersData(typeId, res.toSeq) } } @@ -256,8 +256,9 @@ class HandshakeSpec(featureSerializers: PeerFeature.Serializers, sizeLimit: Int) /** * Serializing handshake into a byte writer. 
+ * @param hs - handshake instance - * @param w - writer to write bytes to + * @param w - writer to write bytes to */ override def serialize(hs: Handshake, w: Writer): Unit = { // first writes down handshake time, then peer specification of our node diff --git a/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala b/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala new file mode 100644 index 000000000..1049da080 --- /dev/null +++ b/src/main/scala/sparkz/core/network/peer/ForgerNodePeerFeature.scala @@ -0,0 +1,34 @@ +package sparkz.core.network.peer + +import sparkz.core.network.PeerFeature +import sparkz.core.network.PeerFeature.Id +import sparkz.util.serialization._ +import sparkz.core.serialization.SparkzSerializer + +/** + * This peer feature marks forger nodes + */ +case class ForgerNodePeerFeature() extends PeerFeature { + + override type M = ForgerNodePeerFeature + override val featureId: Id = ForgerNodePeerFeature.featureId + + override def serializer: ForgerNodePeerFeatureSerializer.type = ForgerNodePeerFeatureSerializer + +} + +object ForgerNodePeerFeature { + + val featureId: Id = 5: Byte + +} + +object ForgerNodePeerFeatureSerializer extends SparkzSerializer[ForgerNodePeerFeature] { + + + override def parse(r: Reader): ForgerNodePeerFeature = { + ForgerNodePeerFeature() + } + + override def serialize(obj: ForgerNodePeerFeature, w: Writer): Unit = {} +} diff --git a/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala b/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala index 3d5dbf89a..8b48f4083 100644 --- a/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala +++ b/src/main/scala/sparkz/core/network/peer/InMemoryPeerDatabase.scala @@ -103,11 +103,14 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext: bucketManager.removePeer(address) } - override def allPeers: Map[InetSocketAddress, PeerDatabaseValue] = knownPeers ++ bucketManager.getTriedPeers ++ bucketManager.getNewPeers + override def allPeers: Map[InetSocketAddress, PeerDatabaseValue] = + if (settings.onlyConnectToKnownPeers) + knownPeers + else + knownPeers ++ bucketManager.getTriedPeers ++ bucketManager.getNewPeers override def blacklistedPeers: Seq[InetAddress] = blacklist - .map { case (address, bannedTill) => - checkBanned(address, bannedTill) + .collect { case (address, bannedTill) if checkBanned(address, bannedTill) => address } .toSeq @@ -170,7 +173,11 @@ final class InMemoryPeerDatabase(sparkzSettings: SparkzSettings, sparkzContext: (360 * 10).days.toMillis } - override def randomPeersSubset: Map[InetSocketAddress, PeerDatabaseValue] = knownPeers ++ bucketManager.getRandomPeers + override def randomPeersSubset: Map[InetSocketAddress, PeerDatabaseValue] = + if (settings.onlyConnectToKnownPeers) + knownPeers + else + knownPeers ++ bucketManager.getRandomPeers override def updatePeer(peerDatabaseValue: PeerDatabaseValue): Unit = { if (peerIsNotBlacklistedAndNotKnownPeer(peerDatabaseValue)) { diff --git a/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala b/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala index 2cdb1804b..24dd03fe3 100644 --- a/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala +++ b/src/main/scala/sparkz/core/network/peer/PeerDatabase.scala @@ -45,7 +45,7 @@ object PeerDatabase { */ object PeerConfidence extends Enumeration { type PeerConfidence = Value - val Unknown, Low, Medium, High: Value = Value + val Unknown, Low, Medium, High, Forger: Value = Value } case class
PeerDatabaseValue(address: InetSocketAddress, peerInfo: PeerInfo, confidence: PeerConfidence) { diff --git a/src/main/scala/sparkz/core/network/peer/PeerManager.scala b/src/main/scala/sparkz/core/network/peer/PeerManager.scala index a79dab635..35937bb26 100644 --- a/src/main/scala/sparkz/core/network/peer/PeerManager.scala +++ b/src/main/scala/sparkz/core/network/peer/PeerManager.scala @@ -43,7 +43,7 @@ class PeerManager( PeerDatabaseValue( extractAddressFromPeerInfo(peerInfo), peerInfo, - PeerConfidence.Unknown + if (peerInfo.peerSpec.features.contains(ForgerNodePeerFeature())) PeerConfidence.Forger else PeerConfidence.Unknown ) ) } @@ -75,7 +75,9 @@ class PeerManager( val address: InetSocketAddress = peerSpec.address.getOrElse(throw new IllegalArgumentException()) val peerInfo: PeerInfo = PeerInfo(peerSpec, 0L, None) log.info(s"New discovered peer: $peerInfo") - PeerDatabaseValue(address, peerInfo, PeerConfidence.Unknown) + val peerConfidence = + if (peerInfo.peerSpec.features.contains(ForgerNodePeerFeature())) PeerConfidence.Forger else PeerConfidence.Unknown + PeerDatabaseValue(address, peerInfo, peerConfidence) } peerDatabase.addOrUpdateKnownPeers(filteredPeers) @@ -195,16 +197,23 @@ object PeerManager { sparkzContext: SparkzContext): Option[PeerInfo] = { var response: Option[PeerInfo] = None - val highConfidencePeers = peers.filter(_._2.confidence == PeerConfidence.High) - val highConfidenceCandidates = highConfidencePeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq + val forgerPeers = peers.filter(_._2.confidence == PeerConfidence.Forger) + val forgerCandidates = forgerPeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - if (highConfidenceCandidates.nonEmpty) { - response = Some(highConfidenceCandidates(secureRandom.nextInt(highConfidenceCandidates.size)).peerInfo) + if (forgerCandidates.nonEmpty) { + response = Some(forgerCandidates(secureRandom.nextInt(forgerCandidates.size)).peerInfo) } else { - val candidates = peers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq + val highConfidencePeers = peers.filter(_._2.confidence == PeerConfidence.High) + val highConfidenceCandidates = highConfidencePeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq + + if (highConfidenceCandidates.nonEmpty) { + response = Some(highConfidenceCandidates(secureRandom.nextInt(highConfidenceCandidates.size)).peerInfo) + } else { + val candidates = peers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - if (candidates.nonEmpty) - response = Some(candidates(secureRandom.nextInt(candidates.size)).peerInfo) + if (candidates.nonEmpty) + response = Some(candidates(secureRandom.nextInt(candidates.size)).peerInfo) + } } response @@ -219,30 +228,6 @@ object PeerManager { } } - case class RandomPeerExcluding(excludedPeers: Seq[Option[InetSocketAddress]]) extends GetPeers[Option[PeerInfo]] { - private val secureRandom = new SecureRandom() - - override def choose(peers: Map[InetSocketAddress, PeerDatabaseValue], - blacklistedPeers: Seq[InetAddress], - sparkzContext: SparkzContext): Option[PeerInfo] = { - var response: Option[PeerInfo] = None - - val highConfidencePeers = peers.filter(_._2.confidence == PeerConfidence.High) - val highConfidenceCandidates = highConfidencePeers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - - if (highConfidenceCandidates.nonEmpty) { - response = 
Some(highConfidenceCandidates(secureRandom.nextInt(highConfidenceCandidates.size)).peerInfo) - } else { - val candidates = peers.values.filterNot(goodCandidateFilter(excludedPeers, blacklistedPeers, _)).toSeq - - if (candidates.nonEmpty) - response = Some(candidates(secureRandom.nextInt(candidates.size)).peerInfo) - } - - response - } - } - case object GetBlacklistedPeers extends GetPeers[Seq[InetAddress]] { override def choose(peers: Map[InetSocketAddress, PeerDatabaseValue], diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala index 5e9939831..a3a45ef05 100644 --- a/src/main/scala/sparkz/core/settings/Settings.scala +++ b/src/main/scala/sparkz/core/settings/Settings.scala @@ -2,7 +2,6 @@ package sparkz.core.settings import java.io.File import java.net.InetSocketAddress - import com.typesafe.config.{Config, ConfigFactory} import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ @@ -21,9 +20,11 @@ case class NetworkSettings(nodeName: String, addedMaxDelay: Option[FiniteDuration], localOnly: Boolean, knownPeers: Seq[InetSocketAddress], + onlyConnectToKnownPeers: Boolean, bindAddress: InetSocketAddress, maxIncomingConnections: Int, maxOutgoingConnections: Int, + maxForgerConnections: Int, connectionTimeout: FiniteDuration, declaredAddress: Option[InetSocketAddress], handshakeTimeout: FiniteDuration, @@ -62,7 +63,8 @@ case class NetworkSettings(nodeName: String, temporalBanDuration: FiniteDuration, penaltySafeInterval: FiniteDuration, penaltyScoreThreshold: Int, - handlingTransactionsEnabled: Boolean) + handlingTransactionsEnabled: Boolean, + isForgerNode: Boolean) case class SparkzSettings(dataDir: File, logDir: File, diff --git a/src/test/scala/sparkz/core/api/http/PeersApiRouteSpec.scala b/src/test/scala/sparkz/core/api/http/PeersApiRouteSpec.scala index 9d5393960..bf61a2ec0 100644 --- a/src/test/scala/sparkz/core/api/http/PeersApiRouteSpec.scala +++ b/src/test/scala/sparkz/core/api/http/PeersApiRouteSpec.scala @@ -11,6 +11,7 @@ import io.circe.syntax._ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import sparkz.core.api.http.PeersApiRoute.PeerInfoResponse +import sparkz.core.network.NetworkController.ReceivableMessages.DisconnectFromNode import sparkz.core.network.peer.PeerInfo import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddToBlacklist, DisconnectFromAddress, RemoveFromBlacklist, RemovePeer} import sparkz.core.settings.{RESTApiSettings, SparkzSettings} @@ -228,7 +229,7 @@ class PeersApiRouteSpec extends AnyFlatSpec Delete(prefix + "/peer", body) ~> routesWithProbes ~> check { peerManagerProbe.expectMsgClass(classOf[RemovePeer]) - networkControllerProbe.expectMsgClass(classOf[DisconnectFromAddress]) + networkControllerProbe.expectMsgClass(classOf[DisconnectFromNode]) status shouldBe StatusCodes.OK } diff --git a/src/test/scala/sparkz/core/network/MessageSpecification.scala b/src/test/scala/sparkz/core/network/MessageSpecification.scala index 0721ac0c9..1cec19142 100644 --- a/src/test/scala/sparkz/core/network/MessageSpecification.scala +++ b/src/test/scala/sparkz/core/network/MessageSpecification.scala @@ -76,14 +76,14 @@ class MessageSpecification extends AnyPropSpec val recovered = modifiersSpec.parseByteString(bytes) recovered.typeId shouldEqual data.typeId - recovered.modifiers.keys.size shouldEqual data.modifiers.keys.size + recovered.modifiers.map(_._1).size shouldEqual data.modifiers.map(_._1).size - 
recovered.modifiers.keys.foreach { id => - data.modifiers.get(id).isDefined shouldEqual true + recovered.modifiers.map(_._1).foreach { id => + data.modifiers.exists(_._1 == id) shouldEqual true } - recovered.modifiers.values.toSet.foreach { v: Array[Byte] => - data.modifiers.values.toSet.exists(_.sameElements(v)) shouldEqual true + recovered.modifiers.map(_._2).toSet.foreach { v: Array[Byte] => + data.modifiers.map(_._2).toSet.exists(_.sameElements(v)) shouldEqual true } modifiersSpec.toByteString(data) shouldEqual bytes @@ -93,8 +93,8 @@ class MessageSpecification extends AnyPropSpec val recovered2 = modifiersSpecLimited.parseByteString(bytes2) recovered2.typeId shouldEqual data.typeId - (recovered2.modifiers.keys.size == data.modifiers.keys.size) shouldEqual false - recovered2.modifiers.keys.size shouldEqual 0 + (recovered2.modifiers.map(_._1).size == data.modifiers.map(_._1).size) shouldEqual false + recovered2.modifiers.map(_._1).size shouldEqual 0 } } } diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala index f3dc238d8..162ca300f 100644 --- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala +++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala @@ -18,7 +18,7 @@ import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetC import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.DisconnectedPeer import sparkz.core.network.message._ import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, DisconnectFromAddress, GetAllPeers, RandomPeerForConnectionExcluding} -import sparkz.core.network.peer._ +import sparkz.core.network.peer.{ForgerNodePeerFeature, _} import sparkz.core.serialization.SparkzSerializer import sparkz.core.settings.SparkzSettings import sparkz.core.utils.LocalTimeProvider @@ -33,7 +33,8 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { import scala.concurrent.ExecutionContext.Implicits.global private val featureSerializers = Map[Byte, SparkzSerializer[_ <: PeerFeature]](LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer, - TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer) + TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer, + ForgerNodePeerFeature.featureId -> ForgerNodePeerFeatureSerializer) "A NetworkController" should "send local address on handshake when peer and node address are in localhost" in { implicit val system: ActorSystem = ActorSystem() @@ -264,7 +265,7 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { val tcpManagerProbe = TestProbe() val nodeAddr = new InetSocketAddress("88.77.66.55", 12345) - val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr, maxOutgoingConnections = 1)) + val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr, maxOutgoingConnections = 1, maxForgerConnections = 0)) val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, tcpManagerProbe) val testPeer = new TestPeer(settings2, networkControllerRef, tcpManagerProbe) @@ -286,6 +287,70 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures { system.terminate() } + it should "allow connecting to forger over maxOutgoingConnections limit" in { + implicit val system: ActorSystem = ActorSystem() + + val tcpManagerProbe = TestProbe() + + val nodeAddr = new 
InetSocketAddress("88.77.66.55", 12345) + val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr, maxOutgoingConnections = 1, maxForgerConnections = 1)) + val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, tcpManagerProbe) + + val testPeer = new TestPeer(settings2, networkControllerRef, tcpManagerProbe) + val peer1DecalredAddr = new InetSocketAddress("88.77.66.55", 5678) + val peer1LocalAddr = new InetSocketAddress("192.168.1.55", 5678) + val peerInfo1 = getPeerInfo(peer1LocalAddr) + testPeer.establishNewOutgoingConnection(peerInfo1) + testPeer.connectAndExpectSuccessfulMessages(peer1LocalAddr, nodeAddr, Tcp.ResumeReading) + testPeer.receiveHandshake + testPeer.sendHandshake(Some(peer1DecalredAddr), Some(peer1LocalAddr)) + testPeer.receiveGetPeers + testPeer.sendPeers(Seq.empty) + + val peer2DeclaredAddr = new InetSocketAddress("99.88.77.66", 5678) + val peer2LocalAddr = new InetSocketAddress("192.168.1.56", 5678) + val peerInfo2 = getPeerInfo(peer2LocalAddr) + testPeer.establishNewOutgoingConnection(peerInfo2) + testPeer.connectAndExpectSuccessfulMessages(peer2LocalAddr, nodeAddr, Tcp.ResumeReading) + testPeer.receiveHandshake + testPeer.sendHandshake(Some(peer2DeclaredAddr), Some(peer2LocalAddr), forgerNode = true) + tcpManagerProbe.expectNoMessage() + + system.terminate() + } + + it should "disconnect after handshake if the maxOutgoingConnections limit reached" in { + implicit val system: ActorSystem = ActorSystem() + + val tcpManagerProbe = TestProbe() + + val nodeAddr = new InetSocketAddress("88.77.66.55", 12345) + val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr, maxOutgoingConnections = 1, maxForgerConnections = 1)) + val (networkControllerRef: ActorRef, _) = createNetworkController(settings2, tcpManagerProbe) + + val testPeer = new TestPeer(settings2, networkControllerRef, tcpManagerProbe) + val peer1DecalredAddr = new InetSocketAddress("88.77.66.55", 5678) + val peer1LocalAddr = new InetSocketAddress("192.168.1.55", 5678) + val peerInfo1 = getPeerInfo(peer1LocalAddr) + testPeer.establishNewOutgoingConnection(peerInfo1) + testPeer.connectAndExpectSuccessfulMessages(peer1LocalAddr, nodeAddr, Tcp.ResumeReading) + testPeer.receiveHandshake + testPeer.sendHandshake(Some(peer1DecalredAddr), Some(peer1LocalAddr)) + testPeer.receiveGetPeers + testPeer.sendPeers(Seq.empty) + + val peer2DeclaredAddr = new InetSocketAddress("99.88.77.66", 5678) + val peer2LocalAddr = new InetSocketAddress("192.168.1.56", 5678) + val peerInfo2 = getPeerInfo(peer2LocalAddr) + testPeer.establishNewOutgoingConnection(peerInfo2) + testPeer.connectAndExpectSuccessfulMessages(peer2LocalAddr, nodeAddr, Tcp.ResumeReading) + testPeer.receiveHandshake + testPeer.sendHandshake(Some(peer2DeclaredAddr), Some(peer2LocalAddr)) + tcpManagerProbe.expectMsg(Abort) + + system.terminate() + } + it should "not send known local address of peer when node is not in local network" in { implicit val system: ActorSystem = ActorSystem() @@ -626,7 +691,8 @@ class TestPeer(settings: SparkzSettings, networkControllerRef: ActorRef, tcpMana private val timeProvider = LocalTimeProvider private val featureSerializers = Map[Byte, SparkzSerializer[_ <: PeerFeature]](LocalAddressPeerFeature.featureId -> LocalAddressPeerFeatureSerializer, - TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer) + TransactionsDisabledPeerFeature.featureId -> TransactionsDisabledPeerFeatureSerializer, + ForgerNodePeerFeature.featureId -> 
ForgerNodePeerFeatureSerializer) private val handshakeSerializer = new HandshakeSpec(featureSerializers, Int.MaxValue) private val peersSpec = new PeersSpec(featureSerializers, settings.network.maxPeerSpecObjects) private val messageSpecs = Seq(GetPeersSpec, peersSpec) @@ -675,9 +741,10 @@ class TestPeer(settings: SparkzSettings, networkControllerRef: ActorRef, tcpMana * @param localAddress * @return */ - def sendHandshake(declaredAddress: Option[InetSocketAddress], localAddress: Option[InetSocketAddress]): Tcp.ResumeReading.type = { + def sendHandshake(declaredAddress: Option[InetSocketAddress], localAddress: Option[InetSocketAddress], forgerNode: Boolean = false): Tcp.ResumeReading.type = { val localFeature: Seq[PeerFeature] = localAddress.map(LocalAddressPeerFeature(_)).toSeq - val features = localFeature :+ SessionIdPeerFeature(settings.network.magicBytes) + val forgerNodeFeature = if (forgerNode) Some(ForgerNodePeerFeature()) else None + val features = localFeature ++ forgerNodeFeature :+ SessionIdPeerFeature(settings.network.magicBytes) val handshakeToNode = Handshake(PeerSpec(settings.network.agentName, Version(settings.network.appVersion), "test", declaredAddress, features), timeProvider.time()) diff --git a/src/test/scala/sparkz/core/network/NetworkTests.scala b/src/test/scala/sparkz/core/network/NetworkTests.scala index 3c4cb8c79..d4e359389 100644 --- a/src/test/scala/sparkz/core/network/NetworkTests.scala +++ b/src/test/scala/sparkz/core/network/NetworkTests.scala @@ -4,7 +4,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import sparkz.core.app.Version import sparkz.core.network.NetworkTests.MockTimeProvider -import sparkz.core.network.peer.PeerInfo +import sparkz.core.network.peer.{ForgerNodePeerFeature, PeerInfo} import sparkz.core.settings.SparkzSettings import sparkz.core.utils.TimeProvider import sparkz.core.utils.TimeProvider.Time @@ -18,8 +18,9 @@ class NetworkTests extends AnyFlatSpec with Matchers { protected def currentTime(): TimeProvider.Time = mockTimeProvider.time() - protected def getPeerInfo(address: InetSocketAddress, nameOpt: Option[String] = None, featureSeq: Seq[PeerFeature] = Seq()): PeerInfo = { - val data = PeerSpec("full node", Version.last, nameOpt.getOrElse(address.toString), Some(address), featureSeq) + protected def getPeerInfo(address: InetSocketAddress, nameOpt: Option[String] = None, featureSeq: Seq[PeerFeature] = Seq(), forgerNode: Boolean = false): PeerInfo = { + val features = if (forgerNode) featureSeq :+ ForgerNodePeerFeature() else featureSeq + val data = PeerSpec("full node", Version.last, nameOpt.getOrElse(address.toString), Some(address), features) PeerInfo(data, currentTime(), None) } diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala index 1d447f0a7..3a26b0445 100644 --- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala +++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala @@ -1,10 +1,12 @@ package sparkz.core.network import akka.actor.{ActorRef, ActorSystem, Props} -import akka.testkit.TestProbe +import akka.testkit.{TestActorRef, TestProbe} +import sparkz.ObjectGenerators import sparkz.core.NodeViewHolder.ReceivableMessages.{GetNodeViewChanges, TransactionsFromRemote} +import sparkz.core.consensus.History.Unknown import sparkz.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork, 
StartConnectingPeers} -import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, ChangedMempool, FailedTransaction, SuccessfulTransaction} +import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages._ import sparkz.core.network.message._ import sparkz.core.network.peer.PenaltyType import sparkz.core.network.peer.PenaltyType.MisbehaviorPenalty @@ -14,23 +16,29 @@ import sparkz.core.transaction.Transaction import sparkz.core.utils.NetworkTimeProvider import sparkz.core.{ModifierTypeId, NodeViewModifier} import sparkz.util.ModifierId +import sparkz.util.serialization.{Reader, Writer} import java.net.InetSocketAddress import scala.concurrent.ExecutionContext import scala.concurrent.duration.DurationInt import scala.util.{Failure, Success} -class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementations { +class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementations with ObjectGenerators { implicit val actorSystem: ActorSystem = ActorSystem() implicit val executionContext: ExecutionContext = actorSystem.dispatchers.lookup("sparkz.executionContext") private val modifiersSpec = new ModifiersSpec(1024 * 1024) private val requestModifierSpec = new RequestModifierSpec(settings.network.maxInvObjects) + private val syncSpec = new SyncInfoMessageSpec[TestSyncInfo](new SparkzSerializer[TestSyncInfo] { + override def serialize(obj: TestSyncInfo, w: Writer): Unit = {} + + override def parse(r: Reader): TestSyncInfo = new TestSyncInfo + }) private val invSpec = new InvSpec(settings.network.maxInvObjects) private val (synchronizer, networkController, viewHolder) = createNodeViewSynchronizer(settings) private val peerProbe = TestProbe() private val peer = ConnectedPeer(ConnectionId(new InetSocketAddress(10), new InetSocketAddress(11), Incoming), peerProbe.ref, 0L, None) - private val messageSerializer = new MessageSerializer(Seq(modifiersSpec, invSpec, requestModifierSpec), + private val messageSerializer = new MessageSerializer(Seq(modifiersSpec, invSpec, requestModifierSpec, syncSpec), settings.network.magicBytes, settings.network.messageLengthBytesLimit) @@ -39,7 +47,7 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa val transaction = TestTransaction(1, 1) val txBytes = TestTransactionSerializer.toBytes(transaction) - synchronizer ! roundTrip(Message(modifiersSpec, Right(ModifiersData(Transaction.ModifierTypeId, Map(transaction.id -> txBytes))), Some(peer))) + synchronizer ! roundTrip(Message(modifiersSpec, Right(ModifiersData(Transaction.ModifierTypeId, Seq(transaction.id -> txBytes))), Some(peer))) viewHolder.expectMsg(TransactionsFromRemote(Seq(transaction))) networkController.expectNoMessage() } @@ -48,7 +56,7 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa val transaction = TestTransaction(1, 1) val txBytes = TestTransactionSerializer.toBytes(transaction) ++ Array[Byte](0x01, 0x02) - synchronizer ! roundTrip(Message(modifiersSpec, Right(ModifiersData(Transaction.ModifierTypeId, Map(transaction.id -> txBytes))), Some(peer))) + synchronizer ! 
roundTrip(Message(modifiersSpec, Right(ModifiersData(Transaction.ModifierTypeId, Seq(transaction.id -> txBytes))), Some(peer))) viewHolder.expectMsg(TransactionsFromRemote(Seq(transaction))) networkController.expectMsg(PenalizePeer(peer.connectionId.remoteAddress, MisbehaviorPenalty)) } @@ -239,6 +247,24 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa networkController.expectNoMessage() } + it should "when HandshakedPeer message is received, statusTracker should add the peer" in { + // Arrange + val nodeViewSynchronizerRef = createNodeViewSynchronizerAsTestActorRef(settings) + val nodeViewSynchronizer = nodeViewSynchronizerRef.underlyingActor + + val statusTrackerField = classOf[NodeViewSynchronizer[_, _, _, _, _, _]].getDeclaredField("statusTracker") + statusTrackerField.setAccessible(true) + val statusTracker = statusTrackerField.get(nodeViewSynchronizer).asInstanceOf[SyncTracker] + + // Act + nodeViewSynchronizerRef ! HandshakedPeer(peer) + + // Assert + val peersStatus = statusTracker.peersByStatus + peersStatus.nonEmpty should be(true) + peersStatus(Unknown) should be(Set(peer)) + } + def setupHistoryAndMempoolReaders(synchronizer: ActorRef): Unit = { val history = new TestHistory val mempool = new TestMempool { @@ -298,4 +324,39 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa (nodeViewSynchronizerRef, networkControllerProbe, viewHolderProbe) } + + private def createNodeViewSynchronizerAsTestActorRef(settings: SparkzSettings): TestActorRef[NodeViewSynchronizer[_, _, _, _, _, _]] = { + val networkControllerProbe = TestProbe() + val viewHolderProbe = TestProbe() + val timeProvider = new NetworkTimeProvider(settings.ntp) + + val modifierSerializers: Map[ModifierTypeId, SparkzSerializer[_ <: NodeViewModifier]] = + Map(Transaction.ModifierTypeId -> TestTransactionSerializer) + + TestActorRef(Props( + new NodeViewSynchronizer[TestTransaction, + TestSyncInfo, + TestSyncInfoMessageSpec.type, + TestModifier, + TestHistory, + TestMempool + ] + ( + networkControllerProbe.ref, + viewHolderProbe.ref, + TestSyncInfoMessageSpec, + settings.network, + timeProvider, + modifierSerializers + ) { + override val deliveryTracker: DeliveryTracker = new DeliveryTracker(context.system, settings.network, self) { + override def status(modifierId: ModifierId): ModifiersStatus = ModifiersStatus.Requested + + override private[network] def clearStatusForModifier(id: ModifierId, oldStatus: ModifiersStatus): Unit = {} + + override def setInvalid(modifierId: ModifierId): Option[ConnectedPeer] = Some(peer) + } + } + )) + } } diff --git a/src/test/scala/sparkz/core/network/SyncTrackerSpec.scala b/src/test/scala/sparkz/core/network/SyncTrackerSpec.scala index 17319e4bf..8ffbe8d7e 100644 --- a/src/test/scala/sparkz/core/network/SyncTrackerSpec.scala +++ b/src/test/scala/sparkz/core/network/SyncTrackerSpec.scala @@ -5,7 +5,7 @@ import akka.testkit.TestProbe import org.mockito.MockitoSugar.mock import org.scalatest.matchers.should.Matchers import org.scalatest.propspec.AnyPropSpec -import sparkz.core.consensus.History.{Fork, HistoryComparisonResult} +import sparkz.core.consensus.History.{Fork, HistoryComparisonResult, Unknown} import sparkz.core.settings.NetworkSettings import sparkz.core.utils.NetworkTimeProvider import sparkz.core.utils.TimeProvider.Time @@ -61,4 +61,21 @@ class SyncTrackerSpec extends AnyPropSpec with Matchers { statusAfterDeletion.size should be(0) timestampAfterDeletion.size should be(0) } + + property("syncTracker should start 
tracking a peer once the handshake message has been received") { + // Arrange + val syncTracker = new SyncTracker(probe.ref, context, networkSettings, timeProvider) + + val lastSyncSentTimeField = classOf[SyncTracker].getDeclaredField("lastSyncSentTime") + lastSyncSentTimeField.setAccessible(true) + + val lastSyncSentTime = lastSyncSentTimeField.get(syncTracker).asInstanceOf[mutable.Map[ConnectedPeer, Time]] + + // Act + lastSyncSentTime should be(Map.empty) + syncTracker.updateStatus(connectedPeer, Unknown) + + // Assert + lastSyncSentTime.contains(connectedPeer) should be(true) + } } diff --git a/src/test/scala/sparkz/core/network/TestImplementations.scala b/src/test/scala/sparkz/core/network/TestImplementations.scala index 4e27f1d4a..0fa635e47 100644 --- a/src/test/scala/sparkz/core/network/TestImplementations.scala +++ b/src/test/scala/sparkz/core/network/TestImplementations.scala @@ -2,7 +2,7 @@ package sparkz.core.network import sparkz.core.{ModifierTypeId, PersistentNodeViewModifier} -import sparkz.core.consensus.History.ModifierIds +import sparkz.core.consensus.History.{ModifierIds, Older} import sparkz.core.consensus.{History, HistoryReader, ModifierSemanticValidity, SyncInfo} import sparkz.core.network.message.SyncInfoMessageSpec import sparkz.core.serialization.SparkzSerializer @@ -46,9 +46,9 @@ trait TestImplementations { override def modifierById(modifierId: sparkz.util.ModifierId): Option[TestModifier] = ??? override def isSemanticallyValid(modifierId: sparkz.util.ModifierId): ModifierSemanticValidity = ??? override def openSurfaceIds(): Seq[sparkz.util.ModifierId] = ??? - override def continuationIds(info: TestSyncInfo, size: Int): ModifierIds = ??? - override def syncInfo: TestSyncInfo = ??? - override def compare(other: TestSyncInfo): History.HistoryComparisonResult = ??? 
+ override def continuationIds(info: TestSyncInfo, size: Int): ModifierIds = Seq() + override def syncInfo: TestSyncInfo = new TestSyncInfo + override def compare(other: TestSyncInfo): History.HistoryComparisonResult = Older override type NVCT = this.type } diff --git a/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala b/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala index c2a8ca3c2..b066fe5fe 100644 --- a/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala +++ b/src/test/scala/sparkz/core/network/peer/InMemoryPeerDatabaseSpec.scala @@ -283,4 +283,44 @@ class InMemoryPeerDatabaseSpec extends NetworkTests with ObjectGenerators with B allPeers.contains(thirdAddress) shouldBe true } } + + it should "only return knownPeers if the flag is set to true" in { + val firstAddress = new InetSocketAddress(10) + val secondAddress = new InetSocketAddress(11) + val thirdAddress = new InetSocketAddress(12) + val forthAddress = new InetSocketAddress(13) + val fifthAddress = new InetSocketAddress(14) + val sixthAddress = new InetSocketAddress(15) + val knownPeers = Seq(firstAddress, secondAddress, thirdAddress) + + def withDbHavingKnownPeers(test: InMemoryPeerDatabase => Assertion): Assertion = + test(new InMemoryPeerDatabase( + settings.copy(network = settings.network.copy(penaltySafeInterval = 1.seconds, knownPeers = knownPeers, onlyConnectToKnownPeers = true)), + sparkzContext + )) + + withDbHavingKnownPeers { db => + val extraPeerOne = PeerDatabaseValue(forthAddress, getPeerInfo(forthAddress), PeerConfidence.Unknown) + val extraPeerTwo = PeerDatabaseValue(fifthAddress, getPeerInfo(fifthAddress), PeerConfidence.Unknown) + val extraPeerThree = PeerDatabaseValue(sixthAddress, getPeerInfo(sixthAddress), PeerConfidence.Unknown) + + db.addOrUpdateKnownPeer(extraPeerOne) + db.addOrUpdateKnownPeer(extraPeerTwo) + db.addOrUpdateKnownPeer(extraPeerThree) + + val allPeers = db.allPeers + allPeers.size shouldBe 3 + allPeers.foreach(p => p._2.confidence shouldBe PeerConfidence.High) + allPeers.contains(firstAddress) shouldBe true + allPeers.contains(secondAddress) shouldBe true + allPeers.contains(thirdAddress) shouldBe true + + val randomPeersSubset = db.randomPeersSubset + randomPeersSubset.size shouldBe 3 + randomPeersSubset.foreach(p => p._2.confidence shouldBe PeerConfidence.High) + randomPeersSubset.contains(firstAddress) shouldBe true + randomPeersSubset.contains(secondAddress) shouldBe true + randomPeersSubset.contains(thirdAddress) shouldBe true + } + } } \ No newline at end of file diff --git a/src/test/scala/sparkz/core/network/peer/PeerManagerSpec.scala b/src/test/scala/sparkz/core/network/peer/PeerManagerSpec.scala index a302594be..ba74bcf71 100644 --- a/src/test/scala/sparkz/core/network/peer/PeerManagerSpec.scala +++ b/src/test/scala/sparkz/core/network/peer/PeerManagerSpec.scala @@ -11,9 +11,11 @@ import sparkz.core.network.{NetworkTests, PeerSpec} import java.net.{InetAddress, InetSocketAddress} import scala.concurrent.ExecutionContext -class PeerManagerSpec(implicit val ec: ExecutionContext) extends NetworkTests with BeforeAndAfter { +class PeerManagerSpec extends NetworkTests with BeforeAndAfter { import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, GetAllPeers} + implicit val actorSystem: ActorSystem = ActorSystem() + implicit val ec: ExecutionContext = actorSystem.dispatchers.lookup("sparkz.executionContext") type Data = Map[InetSocketAddress, PeerInfo] private val DefaultPort = 27017 @@ -355,4 +357,34 @@ 
class PeerManagerSpec(implicit val ec: ExecutionContext) extends NetworkTests wi system.terminate() } + + it should "prioritize forgerPeers over other peers" in { + // Arrange + val knownPeerAddress1 = new InetSocketAddress("127.0.0.1", DefaultPort) + val knownPeerAddress2 = new InetSocketAddress("127.0.0.2", DefaultPort) + val settingsWithKnownPeer = settings.copy(network = settings.network.copy(knownPeers = Seq(knownPeerAddress1, knownPeerAddress2))) + + implicit val system: ActorSystem = ActorSystem() + val p = TestProbe("p")(system) + implicit val defaultSender: ActorRef = p.testActor + + val sparkzContext = SparkzContext(Seq.empty, Seq.empty, mockTimeProvider, None) + val peerDatabase = new InMemoryPeerDatabase(settingsWithKnownPeer, sparkzContext) + val peerManager = PeerManagerRef(settingsWithKnownPeer, sparkzContext, peerDatabase)(system, ec) + + val peerAddress = new InetSocketAddress("1.1.1.1", DefaultPort) + val peerInfo = getPeerInfo(peerAddress, forgerNode = true) + + // Act + peerManager ! AddOrUpdatePeer(peerInfo) + + peerManager ! RandomPeerForConnectionExcluding(Seq(Some(knownPeerAddress2))) + + // Assert + val data = p.expectMsgClass(classOf[Option[PeerInfo]]) + data shouldNot be(empty) + data.foreach(p => p.peerSpec.address.contains(peerAddress) shouldBe true) + + system.terminate() + } }
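
A few illustrative notes on the changes above. First, the ModifiersCache switch from mutable.Map to mutable.LinkedHashMap is what backs the "preserve the order of blocks during synchronization" item in the release notes. The sketch below is not part of the patch and uses made-up identifiers; it only demonstrates the ordering property the swap relies on.

```scala
import scala.collection.mutable

// Standalone sketch: the default mutable.Map (a HashMap) gives no iteration-order
// guarantee, while mutable.LinkedHashMap iterates in insertion order, so a cache
// backed by it can hand block modifiers back in the order they arrived.
object CacheOrderingSketch extends App {
  val hashBacked   = mutable.Map.empty[String, Int]           // order undefined
  val linkedBacked = mutable.LinkedHashMap.empty[String, Int]  // insertion order preserved

  Seq("block-7", "block-3", "block-9", "block-1").zipWithIndex.foreach { case (id, i) =>
    hashBacked += id -> i
    linkedBacked += id -> i
  }

  println(hashBacked.keys.mkString(", "))   // some hash-dependent order
  println(linkedBacked.keys.mkString(", ")) // block-7, block-3, block-9, block-1
}
```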
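Second, the ConnectedPeer change aligns hashCode with an equals that already compared peers by remote address only. A minimal sketch, using simplified stand-in types rather than the real ConnectedPeer/ConnectionId, of why the two must agree:

```scala
// Standalone sketch with stand-in types: when equals compares only the remote
// address, hashCode must be derived from that same field, otherwise hash-based
// collections can fail to find peers that equals considers identical.
object HashCodeEqualsSketch extends App {
  final case class ConnectionId(remoteAddress: String, localAddress: String)

  final class Peer(val connectionId: ConnectionId) {
    override def equals(obj: Any): Boolean = obj match {
      case that: Peer => this.connectionId.remoteAddress == that.connectionId.remoteAddress
      case _          => false
    }
    // Hashing the whole ConnectionId would place two "equal" peers (same remote
    // address, different local address) in different buckets; hash the remote address.
    override def hashCode(): Int = connectionId.remoteAddress.hashCode
  }

  val a = new Peer(ConnectionId("10.0.0.1:9084", "192.168.0.2:51000"))
  val b = new Peer(ConnectionId("10.0.0.1:9084", "192.168.0.3:51001"))

  println(a == b)             // true: same remote address
  println(Set(a).contains(b)) // true only because hashCode agrees with equals
}
```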
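Finally, the forger prioritization in BroadcastBlock boils down to a stable partition of the peer list. A sketch with simplified stand-in types (the real code keys off ForgerNodePeerFeature in each peer's PeerSpec):

```scala
// Standalone sketch with stand-in types: blocks are still broadcast to every peer,
// but forger peers are moved to the front of the send order.
object ForgerFirstBroadcastSketch extends App {
  final case class Peer(name: String, isForger: Boolean)

  def broadcastBlockOrder(peers: Seq[Peer]): Seq[Peer] = {
    val (forgerPeers, remainingPeers) = peers.partition(_.isForger)
    forgerPeers ++ remainingPeers
  }

  val peers = Seq(
    Peer("relay-1", isForger = false),
    Peer("forger-1", isForger = true),
    Peer("relay-2", isForger = false)
  )
  println(broadcastBlockOrder(peers).map(_.name).mkString(", ")) // forger-1, relay-1, relay-2
}
```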