From 5d8be6dc26283dece641eb909e832cb87a36314c Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 18 Dec 2020 16:22:58 +0300 Subject: [PATCH 1/2] [refer #393] Support periodic random connection eviction --- src/main/resources/reference.conf | 2 ++ .../core/network/NetworkController.scala | 21 +++++++++++++++++-- .../scala/scorex/core/settings/Settings.scala | 3 ++- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 05a7d5e0c..c5c376b2f 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -183,6 +183,8 @@ scorex { # Max penalty score peer can accumulate before being banned penaltyScoreThreshold = 100 + # interval of evicting random peer to avoid eclipsing + peerEvictInterval = 1h } ntp { diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index f1abe5789..a3d64f8b3 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -1,7 +1,6 @@ package scorex.core.network import java.net._ - import akka.actor._ import akka.io.Tcp._ import akka.io.{IO, Tcp} @@ -21,7 +20,7 @@ import scorex.util.ScorexLogging import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.language.{existentials, postfixOps} -import scala.util.Try +import scala.util.{Random, Try} /** * Control all network interaction @@ -89,6 +88,7 @@ class NetworkController(settings: NetworkSettings, log.info("Successfully bound to the port " + settings.bindAddress.getPort) scheduleConnectionToPeer() scheduleDroppingDeadConnections() + scheduleEvictRandomConnections() case CommandFailed(_: Bind) => log.error("Network port " + settings.bindAddress.getPort + " already in use!") @@ -231,6 +231,23 @@ class NetworkController(settings: NetworkSettings, } /** + * Schedule a periodic eviction of random connection. + * It is needed to prevent eclipsing (https://www.usenix.org/system/files/conference/usenixsecurity15/sec15-paper-heilman.pdf) + */ + private def scheduleEvictRandomConnections(): Unit = { + context.system.scheduler.scheduleWithFixedDelay(settings.peerEvictInterval, settings.peerEvictInterval) { + () => + val connectedPeers = connections.values.filter(_.peerInfo.nonEmpty).toSeq + if (!connectedPeers.isEmpty) { + val victim = Random.nextInt(connectedPeers.size) + val cp = connectedPeers(victim) + log.info(s"Evict connection to ${cp.peerInfo}") + cp.handlerRef ! CloseConnection + } + } + } + + /** * Schedule a periodic dropping of connections which seem to be inactive */ private def scheduleDroppingDeadConnections(): Unit = { diff --git a/src/main/scala/scorex/core/settings/Settings.scala b/src/main/scala/scorex/core/settings/Settings.scala index 861a3aade..f7b5013cc 100644 --- a/src/main/scala/scorex/core/settings/Settings.scala +++ b/src/main/scala/scorex/core/settings/Settings.scala @@ -50,7 +50,8 @@ case class NetworkSettings(nodeName: String, maxPeerSpecObjects: Int, temporalBanDuration: FiniteDuration, penaltySafeInterval: FiniteDuration, - penaltyScoreThreshold: Int) + penaltyScoreThreshold: Int, + peerEvictInterval: FiniteDuration) case class ScorexSettings(dataDir: File, logDir: File, From d7efdbfc06802396a0bea96e9092d924da06b3f1 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 24 Dec 2020 19:50:07 +0300 Subject: [PATCH 2/2] Try to choose candidates from different groups --- .../scorex/core/network/peer/PeerManager.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/scala/scorex/core/network/peer/PeerManager.scala b/src/main/scala/scorex/core/network/peer/PeerManager.scala index b74a0996a..84c989929 100644 --- a/src/main/scala/scorex/core/network/peer/PeerManager.scala +++ b/src/main/scala/scorex/core/network/peer/PeerManager.scala @@ -149,15 +149,24 @@ object PeerManager { sc: ScorexContext): Map[InetSocketAddress, PeerInfo] = knownPeers } + private def getIpGroup(addr : InetSocketAddress) : Int = { + val ip = addr.getAddress.getAddress + val group = ((ip(0) & 0xFF) << 8) | (ip(1) & 0xFF) + group + } + case class RandomPeerExcluding(excludedPeers: Seq[PeerInfo]) extends GetPeers[Option[PeerInfo]] { override def choose(knownPeers: Map[InetSocketAddress, PeerInfo], blacklistedPeers: Seq[InetAddress], sc: ScorexContext): Option[PeerInfo] = { - val candidates = knownPeers.values.filterNot { p => - excludedPeers.exists(_.peerSpec.address == p.peerSpec.address) && - blacklistedPeers.exists(addr => p.peerSpec.address.map(_.getAddress).contains(addr)) + val excludedGroups = excludedPeers.flatMap(_.peerSpec.address).map(getIpGroup(_)).toSet + val allCandidates = knownPeers.values.filterNot { p => + excludedPeers.exists(_.peerSpec.address == p.peerSpec.address) || + blacklistedPeers.exists(addr => p.peerSpec.address.map(_.getAddress).contains(addr)) }.toSeq + val preferredCandidates = allCandidates.filterNot(_.peerSpec.address.fold(true)(addr => excludedGroups.contains(getIpGroup(addr)))) + val candidates = if (preferredCandidates.nonEmpty) preferredCandidates else allCandidates if (candidates.nonEmpty) Some(candidates(Random.nextInt(candidates.size))) else None }