Skip to content

Commit

Permalink
Merge pull request #75 from HorizenOfficial/dev
Browse files Browse the repository at this point in the history
2.1.0 final
  • Loading branch information
paolocappelletti committed Oct 2, 2023
2 parents 9c6dae6 + 86cfd3c commit e74d093
Show file tree
Hide file tree
Showing 28 changed files with 457 additions and 167 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ lazy val commonSettings = Seq(
Wart.OptionPartial),
organization := "io.horizen",
organizationName := "Zen Blockchain Foundation",
version := "2.0.3",
version := "2.1.0",
licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")),
homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")),
pomExtra :=
Expand Down
9 changes: 9 additions & 0 deletions release-notes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
2.1.0
---------
* Added support for forger's connections prioritization
* Node synchronization improvements:
* Optimization of lookup strategy in modifiersCache
* Preserve the order of block during synchronization
* Added option to force only connecting to known peers
* Fixes/Improvements on the way the SyncTracker handles the internal statuses maps

2.0.3
---------
* Fix in the handshake process - start connecting to nodes only after the Synchronizer is initialized
Expand Down
9 changes: 9 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ sparkz {
# List of IP addresses of well known nodes.
knownPeers = []

# Force node to only try connecting to known peers
onlyConnectToKnownPeers = false

# Interval between GetPeers messages to be send by our node to a random one
getPeersInterval = 2m

Expand All @@ -99,6 +102,9 @@ sparkz {
# Number of outgoing network connections
maxOutgoingConnections = 10

# Number of dedicated connections to forgers. This works in addition to the maxOutgoingConnections ones
maxForgerConnections = 20

# Network connection timeout
connectionTimeout = 1s

Expand Down Expand Up @@ -219,6 +225,9 @@ sparkz {

# Enables transactions in the mempool
handlingTransactionsEnabled = true

# Is this node a forger
isForgerNode = false
}

ntp {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/sparkz/ObjectGenerators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ trait ObjectGenerators {

lazy val modifiersGen: Gen[ModifiersData] = for {
modifierTypeId: ModifierTypeId <- modifierTypeIdGen
modifiers: Map[ModifierId, Array[Byte]] <- Gen.nonEmptyMap(modifierWithIdGen).suchThat(_.nonEmpty)
modifiers: Seq[(ModifierId, Array[Byte])] <- Gen.nonEmptyListOf(modifierWithIdGen).suchThat(_.nonEmpty)
} yield ModifiersData(modifierTypeId, modifiers)

lazy val appVersionGen: Gen[Version] = for {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/sparkz/core/ModifiersCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ trait ModifiersCache[PMOD <: PersistentNodeViewModifier, H <: HistoryReader[PMOD
type K = sparkz.util.ModifierId
type V = PMOD

protected val cache: mutable.Map[K, V] = mutable.Map[K, V]()
protected val cache: mutable.Map[K, V] = mutable.LinkedHashMap[K, V]()

override def modifierById(modifierId: sparkz.util.ModifierId): Option[PMOD] = cache.get(modifierId)

Expand Down
7 changes: 3 additions & 4 deletions src/main/scala/sparkz/core/api/http/PeersApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import akka.http.scaladsl.server.Route
import io.circe.generic.auto.exportDecoder
import io.circe.generic.semiauto._
import io.circe.syntax._
import io.circe.{Encoder, Decoder, Json}
import io.circe.{Decoder, Encoder, Json}
import sparkz.core.api.http.PeersApiRoute.PeerApiRequest.AddToBlacklistBodyRequest
import sparkz.core.api.http.PeersApiRoute.Request.ConnectBodyRequest
import sparkz.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse, PeersStatusResponse}
import sparkz.core.network.ConnectedPeer
import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus}
import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, DisconnectFromNode, GetConnectedPeers, GetPeersStatus}
import sparkz.core.network.peer.PeerManager.ReceivableMessages._
import sparkz.core.network.peer.PenaltyType.CustomPenaltyDuration
import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddPeersIfEmpty, GetAllPeers, GetBlacklistedPeers}
import sparkz.core.network.peer.{PeerInfo, PeersStatus}
import sparkz.core.settings.RESTApiSettings
import sparkz.core.utils.NetworkTimeProvider
Expand Down Expand Up @@ -179,7 +178,7 @@ case class PeersApiRoute(peerManager: ActorRef,
val port = addressAndPort.group(2).toInt
val peerAddress = new InetSocketAddress(host, port)
peerManager ! RemovePeer(peerAddress)
networkController ! DisconnectFromAddress(peerAddress)
networkController ! DisconnectFromNode(peerAddress)
ApiResponse.OK
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/sparkz/core/network/ConnectedPeer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ case class ConnectedPeer(connectionId: ConnectionId,
var lastMessage: Long,
peerInfo: Option[PeerInfo]) {

override def hashCode(): Int = connectionId.hashCode()
override def hashCode(): Int = connectionId.remoteAddress.hashCode()

override def equals(obj: Any): Boolean = obj match {
case that: ConnectedPeer => this.connectionId.remoteAddress == that.connectionId.remoteAddress
Expand Down
79 changes: 63 additions & 16 deletions src/main/scala/sparkz/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class NetworkController(settings: NetworkSettings,

private var connections = Map.empty[InetSocketAddress, ConnectedPeer]
private var unconfirmedConnections = Set.empty[InetSocketAddress]
private val maxConnections = settings.maxIncomingConnections + settings.maxOutgoingConnections
private val maxConnections = settings.maxIncomingConnections + settings.maxOutgoingConnections + settings.maxForgerConnections
private val peersLastConnectionAttempts = new LRUSimpleCache[InetSocketAddress, TimeProvider.Time](threshold = 10000)

private val tryNewConnectionAttemptDelay = 5.seconds
Expand Down Expand Up @@ -150,6 +150,10 @@ class NetworkController(settings: NetworkSettings,

case DisconnectFromAddress(peerAddress) =>
closeConnection(peerAddress)

case DisconnectFromNode(peerAddress) =>
closeConnectionFromNode(peerAddress)

}

private def connectionEvents: Receive = {
Expand Down Expand Up @@ -208,7 +212,7 @@ class NetworkController(settings: NetworkSettings,
log.info(s"Unconfirmed connection: ($remoteAddress, $localAddress) => $connectionId")

val handlerRef = sender()
if (connectionDirection.isOutgoing && canEstablishNewOutgoingConnection) {
if (connectionDirection.isOutgoing && (canEstablishNewOutgoingConnection || canEstablishNewForgerConnection)) {
createPeerConnectionHandler(connectionId, handlerRef)
}
else if (connectionDirection.isIncoming && canEstablishNewIncomingConnection)
Expand Down Expand Up @@ -260,7 +264,7 @@ class NetworkController(settings: NetworkSettings,
*/
private def scheduleConnectionToPeer(): Unit = {
val connectionTask: Runnable = () => {
if (canEstablishNewOutgoingConnection) {
if (canEstablishNewOutgoingConnection || canEstablishNewForgerConnection) {
log.trace(s"Looking for a new random connection")
connectionToPeer(connections, unconfirmedConnections)
}
Expand All @@ -277,6 +281,22 @@ class NetworkController(settings: NetworkSettings,
getIncomingConnectionsSize < settings.maxIncomingConnections
}

private def canEstablishNewForgerConnection: Boolean = {
getForgerConnectionsSize < settings.maxForgerConnections
}

private def shouldDropForgerConnection: Boolean = {
getForgerConnectionsSize > settings.maxForgerConnections
}

private def shouldDropOutgoingConnection: Boolean = {
getOutgoingConnectionsSize > settings.maxOutgoingConnections
}

private def shouldDropIncomingConnection: Boolean = {
getIncomingConnectionsSize > settings.maxIncomingConnections
}

private def getOutgoingConnectionsSize: Int = {
connections.count { p => p._2.connectionId.direction == Outgoing }
}
Expand All @@ -285,6 +305,10 @@ class NetworkController(settings: NetworkSettings,
connections.count { p => p._2.connectionId.direction == Incoming }
}

private def getForgerConnectionsSize: Int = {
connections.count { p => p._2.peerInfo.exists(_.peerSpec.features.contains(ForgerNodePeerFeature())) }
}

private def connectionToPeer(activeConnections: Map[InetSocketAddress, ConnectedPeer], unconfirmedConnections: Set[InetSocketAddress]): Unit = {
val connectionsAddressSeq = activeConnections.values.flatMap(_.peerInfo).toSeq
val unconfirmedConnectionsAddressSeq = unconfirmedConnections.map(connection => PeerInfo.fromAddress(connection)).toSeq
Expand Down Expand Up @@ -380,19 +404,17 @@ class NetworkController(settings: NetworkSettings,
}
}
val isLocal = NetworkUtils.isLocalAddress(connectionId.remoteAddress.getAddress)
val mandatoryFeatures = if (settings.handlingTransactionsEnabled) {
sparkzContext.features :+ mySessionIdFeature
}
else {
sparkzContext.features :+ mySessionIdFeature :+ TransactionsDisabledPeerFeature()
}
val peerFeatures = if (isLocal) {
val mandatoryFeatures = sparkzContext.features :+ mySessionIdFeature

val maybeTransactionDisabledFeature =
if (settings.handlingTransactionsEnabled) None else Some(TransactionsDisabledPeerFeature())
val maybeLocalAddressFeature = if (isLocal) {
val la = new InetSocketAddress(connectionId.localAddress.getAddress, settings.bindAddress.getPort)
val localAddrFeature = LocalAddressPeerFeature(la)
mandatoryFeatures :+ localAddrFeature
} else {
mandatoryFeatures
}
Some(LocalAddressPeerFeature(la))
} else None
val maybeForgerNodeFeature = if (settings.isForgerNode) Some(ForgerNodePeerFeature()) else None

val peerFeatures = mandatoryFeatures ++ maybeTransactionDisabledFeature ++ maybeLocalAddressFeature ++ maybeForgerNodeFeature
val selfAddressOpt = getNodeAddressForPeer(connectionId.localAddress)

if (selfAddressOpt.isEmpty)
Expand All @@ -417,13 +439,20 @@ class NetworkController(settings: NetworkSettings,
// Drop connection to self if occurred or peer already connected.
// Decision whether connection is local or is from some other network is made
// based on SessionIdPeerFeature if exists or in old way using isSelf() function
val shouldDrop =
var shouldDrop =
connectionForPeerAddress(peerAddress).exists(_.handlerRef != peerHandlerRef) ||
peerInfo.peerSpec.features.collectFirst {
case SessionIdPeerFeature(networkMagic, sessionId) =>
!networkMagic.sameElements(mySessionIdFeature.networkMagic) || sessionId == mySessionIdFeature.sessionId
}.getOrElse(isSelf(remoteAddress))

// We allow temporary overflowing outgoing connection limits to get the peerInfo and see if peer if a forger.
// Drop connection if the peer does not fit in the limits.
val isForgerConnection = peerInfo.peerSpec.features.contains(ForgerNodePeerFeature())

val connectionLimitExhausted = isConnectionLimitExhausted(peerInfo, isForgerConnection)
shouldDrop = shouldDrop || connectionLimitExhausted

if (shouldDrop) {
connectedPeer.handlerRef ! CloseConnection
peerManagerRef ! RemovePeer(peerAddress)
Expand All @@ -438,6 +467,12 @@ class NetworkController(settings: NetworkSettings,
}
}

private[network] def isConnectionLimitExhausted(peerInfo: PeerInfo, isForgerConnection: Boolean) = {
(isForgerConnection && shouldDropForgerConnection) ||
(!isForgerConnection && peerInfo.connectionType.contains(Incoming) && shouldDropIncomingConnection) ||
(!isForgerConnection && peerInfo.connectionType.contains(Outgoing) && shouldDropOutgoingConnection)
}

/**
* Returns connections filtered by given SendingStrategy and Version.
* Exclude all connections with lower version and apply sendingStrategy to remaining connected peers
Expand Down Expand Up @@ -557,6 +592,16 @@ class NetworkController(settings: NetworkSettings,
}
}

private def closeConnectionFromNode(peerAddress: InetSocketAddress): Unit = {
connections = connections.filterNot {
case (address, connectedPeer) if address == peerAddress =>
connectedPeer.handlerRef ! CloseConnection
context.system.eventStream.publish(DisconnectedPeer(peerAddress))
true // exclude the entry from the filtered map
case _ => false // don't modify the connections map
}
}

/**
* Register a new penalty for given peer address.
*/
Expand All @@ -583,6 +628,8 @@ object NetworkController {

case class DisconnectFrom(peer: ConnectedPeer)

case class DisconnectFromNode(address: InetSocketAddress)

case class PenalizePeer(address: InetSocketAddress, penaltyType: PenaltyType)

case class GetFilteredConnectedPeers(sendingStrategy: SendingStrategy, version: Version)
Expand Down
Loading

0 comments on commit e74d093

Please sign in to comment.