Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.1.0 final #75

Merged
merged 29 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
517c680
SDK-1032 - synctracker statuses map fix
davehorizen Jun 15, 2023
7e938d3
SDK-1032 - closeConnectionFromNode method refactor
davehorizen Jul 3, 2023
acf335a
SDK-1032 - wrong comment removed
davehorizen Jul 4, 2023
8280424
SDK-863: when peer is asking to sync we update our internal status an…
Jul 7, 2023
e071cc0
SDK-863: add peer to statusTracker as soon as nodeViewSynchronizer re…
Jul 17, 2023
70c268d
bump version to 2.1.0-shapshot
paolocappelletti Jul 17, 2023
1c2bfac
Merge pull request #57 from HorizenOfficial/dd/SDK-1032
paolocappelletti Jul 18, 2023
3882c69
Merge pull request #63 from HorizenOfficial/SDK-863
paolocappelletti Jul 18, 2023
bac33fd
Add a flag to force connecting to known peers.
i-skrypnyk Jul 24, 2023
c9ec46b
Preserve the order of block during synchronization
i-skrypnyk Jul 31, 2023
1f85996
Merge branch 'dev' into dev_210
paolocappelletti Aug 4, 2023
09a2b70
Merge pull request #68 from HorizenOfficial/dev_210
paolocappelletti Aug 4, 2023
7e3d318
Core Forgers Network Impl
i-skrypnyk Aug 8, 2023
685e1a4
ModifiersCache Map -> LinkedHashMap
i-skrypnyk Aug 21, 2023
06ddd64
review fixes
i-skrypnyk Aug 23, 2023
faa9b78
Merge pull request #67 from HorizenOfficial/is/SDK-1264_preserve_orde…
paolocappelletti Aug 24, 2023
bf0983a
Merge branch 'dev' into is/SDK-1323_forgers_network_core_impl
i-skrypnyk Aug 29, 2023
6b783e7
review fixes
i-skrypnyk Aug 29, 2023
da34947
review fixes
i-skrypnyk Aug 31, 2023
f660c15
review fixes
i-skrypnyk Sep 1, 2023
a3089e9
Merge pull request #69 from HorizenOfficial/is/SDK-1323_forgers_netwo…
paolocappelletti Sep 2, 2023
79a9da8
Merge remote-tracking branch 'remotes/origin/main' into backport_from…
paolocappelletti Sep 2, 2023
2cf9977
Merge pull request #72 from HorizenOfficial/backport_from_203
paolocappelletti Sep 4, 2023
2161b34
Merge pull request #64 from HorizenOfficial/is/force_connecting_to_kn…
paolocappelletti Sep 4, 2023
d77ae4b
updated release notes
paolocappelletti Sep 4, 2023
f1f9933
Update release-notes.md
paolocappelletti Sep 4, 2023
5576cab
Merge pull request #73 from HorizenOfficial/210_releasenotes
paolocappelletti Sep 4, 2023
29606eb
2.1.0 final
paolocappelletti Oct 2, 2023
86cfd3c
Merge pull request #74 from HorizenOfficial/2_1_0_final
paolocappelletti Oct 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ lazy val commonSettings = Seq(
Wart.OptionPartial),
organization := "io.horizen",
organizationName := "Zen Blockchain Foundation",
version := "2.0.3",
version := "2.1.0",
licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")),
homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")),
pomExtra :=
Expand Down
9 changes: 9 additions & 0 deletions release-notes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
2.1.0
---------
* Added support for forger's connections prioritization
* Node synchronization improvements:
* Optimization of lookup strategy in modifiersCache
* Preserve the order of block during synchronization
* Added option to force only connecting to known peers
* Fixes/Improvements on the way the SyncTracker handles the internal statuses maps

2.0.3
---------
* Fix in the handshake process - start connecting to nodes only after the Synchronizer is initialized
Expand Down
9 changes: 9 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ sparkz {
# List of IP addresses of well known nodes.
knownPeers = []

# Force node to only try connecting to known peers
onlyConnectToKnownPeers = false

# Interval between GetPeers messages to be send by our node to a random one
getPeersInterval = 2m

Expand All @@ -99,6 +102,9 @@ sparkz {
# Number of outgoing network connections
maxOutgoingConnections = 10

# Number of dedicated connections to forgers. This works in addition to the maxOutgoingConnections ones
maxForgerConnections = 20

# Network connection timeout
connectionTimeout = 1s

Expand Down Expand Up @@ -219,6 +225,9 @@ sparkz {

# Enables transactions in the mempool
handlingTransactionsEnabled = true

# Is this node a forger
isForgerNode = false
}

ntp {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/sparkz/ObjectGenerators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ trait ObjectGenerators {

lazy val modifiersGen: Gen[ModifiersData] = for {
modifierTypeId: ModifierTypeId <- modifierTypeIdGen
modifiers: Map[ModifierId, Array[Byte]] <- Gen.nonEmptyMap(modifierWithIdGen).suchThat(_.nonEmpty)
modifiers: Seq[(ModifierId, Array[Byte])] <- Gen.nonEmptyListOf(modifierWithIdGen).suchThat(_.nonEmpty)
} yield ModifiersData(modifierTypeId, modifiers)

lazy val appVersionGen: Gen[Version] = for {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/sparkz/core/ModifiersCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ trait ModifiersCache[PMOD <: PersistentNodeViewModifier, H <: HistoryReader[PMOD
type K = sparkz.util.ModifierId
type V = PMOD

protected val cache: mutable.Map[K, V] = mutable.Map[K, V]()
protected val cache: mutable.Map[K, V] = mutable.LinkedHashMap[K, V]()

override def modifierById(modifierId: sparkz.util.ModifierId): Option[PMOD] = cache.get(modifierId)

Expand Down
7 changes: 3 additions & 4 deletions src/main/scala/sparkz/core/api/http/PeersApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import akka.http.scaladsl.server.Route
import io.circe.generic.auto.exportDecoder
import io.circe.generic.semiauto._
import io.circe.syntax._
import io.circe.{Encoder, Decoder, Json}
import io.circe.{Decoder, Encoder, Json}
import sparkz.core.api.http.PeersApiRoute.PeerApiRequest.AddToBlacklistBodyRequest
import sparkz.core.api.http.PeersApiRoute.Request.ConnectBodyRequest
import sparkz.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse, PeersStatusResponse}
import sparkz.core.network.ConnectedPeer
import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus}
import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, DisconnectFromNode, GetConnectedPeers, GetPeersStatus}
import sparkz.core.network.peer.PeerManager.ReceivableMessages._
import sparkz.core.network.peer.PenaltyType.CustomPenaltyDuration
import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddPeersIfEmpty, GetAllPeers, GetBlacklistedPeers}
import sparkz.core.network.peer.{PeerInfo, PeersStatus}
import sparkz.core.settings.RESTApiSettings
import sparkz.core.utils.NetworkTimeProvider
Expand Down Expand Up @@ -179,7 +178,7 @@ case class PeersApiRoute(peerManager: ActorRef,
val port = addressAndPort.group(2).toInt
val peerAddress = new InetSocketAddress(host, port)
peerManager ! RemovePeer(peerAddress)
networkController ! DisconnectFromAddress(peerAddress)
networkController ! DisconnectFromNode(peerAddress)
ApiResponse.OK
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/sparkz/core/network/ConnectedPeer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ case class ConnectedPeer(connectionId: ConnectionId,
var lastMessage: Long,
peerInfo: Option[PeerInfo]) {

override def hashCode(): Int = connectionId.hashCode()
override def hashCode(): Int = connectionId.remoteAddress.hashCode()

override def equals(obj: Any): Boolean = obj match {
case that: ConnectedPeer => this.connectionId.remoteAddress == that.connectionId.remoteAddress
Expand Down
79 changes: 63 additions & 16 deletions src/main/scala/sparkz/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class NetworkController(settings: NetworkSettings,

private var connections = Map.empty[InetSocketAddress, ConnectedPeer]
private var unconfirmedConnections = Set.empty[InetSocketAddress]
private val maxConnections = settings.maxIncomingConnections + settings.maxOutgoingConnections
private val maxConnections = settings.maxIncomingConnections + settings.maxOutgoingConnections + settings.maxForgerConnections
private val peersLastConnectionAttempts = new LRUSimpleCache[InetSocketAddress, TimeProvider.Time](threshold = 10000)

private val tryNewConnectionAttemptDelay = 5.seconds
Expand Down Expand Up @@ -150,6 +150,10 @@ class NetworkController(settings: NetworkSettings,

case DisconnectFromAddress(peerAddress) =>
closeConnection(peerAddress)

case DisconnectFromNode(peerAddress) =>
closeConnectionFromNode(peerAddress)

}

private def connectionEvents: Receive = {
Expand Down Expand Up @@ -208,7 +212,7 @@ class NetworkController(settings: NetworkSettings,
log.info(s"Unconfirmed connection: ($remoteAddress, $localAddress) => $connectionId")

val handlerRef = sender()
if (connectionDirection.isOutgoing && canEstablishNewOutgoingConnection) {
if (connectionDirection.isOutgoing && (canEstablishNewOutgoingConnection || canEstablishNewForgerConnection)) {
createPeerConnectionHandler(connectionId, handlerRef)
}
else if (connectionDirection.isIncoming && canEstablishNewIncomingConnection)
Expand Down Expand Up @@ -260,7 +264,7 @@ class NetworkController(settings: NetworkSettings,
*/
private def scheduleConnectionToPeer(): Unit = {
val connectionTask: Runnable = () => {
if (canEstablishNewOutgoingConnection) {
if (canEstablishNewOutgoingConnection || canEstablishNewForgerConnection) {
log.trace(s"Looking for a new random connection")
connectionToPeer(connections, unconfirmedConnections)
}
Expand All @@ -277,6 +281,22 @@ class NetworkController(settings: NetworkSettings,
getIncomingConnectionsSize < settings.maxIncomingConnections
}

private def canEstablishNewForgerConnection: Boolean = {
getForgerConnectionsSize < settings.maxForgerConnections
}

private def shouldDropForgerConnection: Boolean = {
getForgerConnectionsSize > settings.maxForgerConnections
}

private def shouldDropOutgoingConnection: Boolean = {
getOutgoingConnectionsSize > settings.maxOutgoingConnections
}

private def shouldDropIncomingConnection: Boolean = {
getIncomingConnectionsSize > settings.maxIncomingConnections
}

private def getOutgoingConnectionsSize: Int = {
connections.count { p => p._2.connectionId.direction == Outgoing }
}
Expand All @@ -285,6 +305,10 @@ class NetworkController(settings: NetworkSettings,
connections.count { p => p._2.connectionId.direction == Incoming }
}

private def getForgerConnectionsSize: Int = {
connections.count { p => p._2.peerInfo.exists(_.peerSpec.features.contains(ForgerNodePeerFeature())) }
}

private def connectionToPeer(activeConnections: Map[InetSocketAddress, ConnectedPeer], unconfirmedConnections: Set[InetSocketAddress]): Unit = {
val connectionsAddressSeq = activeConnections.values.flatMap(_.peerInfo).toSeq
val unconfirmedConnectionsAddressSeq = unconfirmedConnections.map(connection => PeerInfo.fromAddress(connection)).toSeq
Expand Down Expand Up @@ -380,19 +404,17 @@ class NetworkController(settings: NetworkSettings,
}
}
val isLocal = NetworkUtils.isLocalAddress(connectionId.remoteAddress.getAddress)
val mandatoryFeatures = if (settings.handlingTransactionsEnabled) {
sparkzContext.features :+ mySessionIdFeature
}
else {
sparkzContext.features :+ mySessionIdFeature :+ TransactionsDisabledPeerFeature()
}
val peerFeatures = if (isLocal) {
val mandatoryFeatures = sparkzContext.features :+ mySessionIdFeature

val maybeTransactionDisabledFeature =
if (settings.handlingTransactionsEnabled) None else Some(TransactionsDisabledPeerFeature())
val maybeLocalAddressFeature = if (isLocal) {
val la = new InetSocketAddress(connectionId.localAddress.getAddress, settings.bindAddress.getPort)
val localAddrFeature = LocalAddressPeerFeature(la)
mandatoryFeatures :+ localAddrFeature
} else {
mandatoryFeatures
}
Some(LocalAddressPeerFeature(la))
} else None
val maybeForgerNodeFeature = if (settings.isForgerNode) Some(ForgerNodePeerFeature()) else None

val peerFeatures = mandatoryFeatures ++ maybeTransactionDisabledFeature ++ maybeLocalAddressFeature ++ maybeForgerNodeFeature
val selfAddressOpt = getNodeAddressForPeer(connectionId.localAddress)

if (selfAddressOpt.isEmpty)
Expand All @@ -417,13 +439,20 @@ class NetworkController(settings: NetworkSettings,
// Drop connection to self if occurred or peer already connected.
// Decision whether connection is local or is from some other network is made
// based on SessionIdPeerFeature if exists or in old way using isSelf() function
val shouldDrop =
var shouldDrop =
connectionForPeerAddress(peerAddress).exists(_.handlerRef != peerHandlerRef) ||
peerInfo.peerSpec.features.collectFirst {
case SessionIdPeerFeature(networkMagic, sessionId) =>
!networkMagic.sameElements(mySessionIdFeature.networkMagic) || sessionId == mySessionIdFeature.sessionId
}.getOrElse(isSelf(remoteAddress))

// We allow temporary overflowing outgoing connection limits to get the peerInfo and see if peer if a forger.
// Drop connection if the peer does not fit in the limits.
val isForgerConnection = peerInfo.peerSpec.features.contains(ForgerNodePeerFeature())

val connectionLimitExhausted = isConnectionLimitExhausted(peerInfo, isForgerConnection)
shouldDrop = shouldDrop || connectionLimitExhausted

if (shouldDrop) {
connectedPeer.handlerRef ! CloseConnection
peerManagerRef ! RemovePeer(peerAddress)
Expand All @@ -438,6 +467,12 @@ class NetworkController(settings: NetworkSettings,
}
}

private[network] def isConnectionLimitExhausted(peerInfo: PeerInfo, isForgerConnection: Boolean) = {
(isForgerConnection && shouldDropForgerConnection) ||
(!isForgerConnection && peerInfo.connectionType.contains(Incoming) && shouldDropIncomingConnection) ||
(!isForgerConnection && peerInfo.connectionType.contains(Outgoing) && shouldDropOutgoingConnection)
}

/**
* Returns connections filtered by given SendingStrategy and Version.
* Exclude all connections with lower version and apply sendingStrategy to remaining connected peers
Expand Down Expand Up @@ -557,6 +592,16 @@ class NetworkController(settings: NetworkSettings,
}
}

private def closeConnectionFromNode(peerAddress: InetSocketAddress): Unit = {
connections = connections.filterNot {
case (address, connectedPeer) if address == peerAddress =>
connectedPeer.handlerRef ! CloseConnection
context.system.eventStream.publish(DisconnectedPeer(peerAddress))
true // exclude the entry from the filtered map
case _ => false // don't modify the connections map
}
}

/**
* Register a new penalty for given peer address.
*/
Expand All @@ -583,6 +628,8 @@ object NetworkController {

case class DisconnectFrom(peer: ConnectedPeer)

case class DisconnectFromNode(address: InetSocketAddress)

case class PenalizePeer(address: InetSocketAddress, penaltyType: PenaltyType)

case class GetFilteredConnectedPeers(sendingStrategy: SendingStrategy, version: Version)
Expand Down
Loading