From a6c8ad522a55f79b862cb1a744dec0f8fb5dbcf9 Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Fri, 2 Oct 2020 18:21:40 +0300 Subject: [PATCH 01/13] lastseen & dropconnections --- .../scorex/core/network/NetworkController.scala | 14 ++++++++++++++ .../core/network/peer/InMemoryPeerDatabase.scala | 6 +++++- .../scorex/core/network/peer/PeerDatabase.scala | 2 ++ .../scorex/core/network/peer/PeerManager.scala | 4 ++++ 4 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index 44cc2115a..9fbdc4451 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -80,6 +80,7 @@ class NetworkController(settings: NetworkSettings, case Bound(_) => log.info("Successfully bound to the port " + settings.bindAddress.getPort) scheduleConnectionToPeer() + dropDeadConnections() case CommandFailed(_: Bind) => log.error("Network port " + settings.bindAddress.getPort + " already in use!") @@ -94,6 +95,7 @@ class NetworkController(settings: NetworkSettings, case Some(handler) => handler ! msg // forward the message to the appropriate handler for processing case None => log.error(s"No handlers found for message $remote: " + spec.messageCode) } + remote.peerInfo.foreach(pi => peerManagerRef ! PeerSeen(pi)) case SendToNetwork(message, sendingStrategy) => filterConnections(sendingStrategy, message.spec.protocolVersion).foreach { connectedPeer => @@ -201,6 +203,18 @@ class NetworkController(settings: NetworkSettings, } } + private def dropDeadConnections(): Unit = { + context.system.scheduler.schedule(5.seconds, 15.seconds) { + connections.values.filter{ cp => + val now = scorexContext.timeProvider.time() + val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now) + (now - lastSeen) > 1000 * 90 + }.foreach { cp => + cp.handlerRef ! CloseConnection + } + } + } + /** * Connect to peer * diff --git a/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala b/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala index 54361ebac..75e4c561d 100644 --- a/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala +++ b/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala @@ -108,6 +108,11 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr stillBanned } + override def peerSeen(peerInfo: PeerInfo): Unit = { + val pi = peerInfo.copy(lastSeen = timeProvider.time()) + addOrUpdateKnownPeer(pi) + } + private def penaltyScore(penaltyType: PenaltyType): Int = penaltyType match { case PenaltyType.NonDeliveryPenalty => @@ -127,5 +132,4 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr case PenaltyType.PermanentPenalty => (360 * 10).days.toMillis } - } diff --git a/src/main/scala/scorex/core/network/peer/PeerDatabase.scala b/src/main/scala/scorex/core/network/peer/PeerDatabase.scala index ef09a0b45..05872cfe9 100644 --- a/src/main/scala/scorex/core/network/peer/PeerDatabase.scala +++ b/src/main/scala/scorex/core/network/peer/PeerDatabase.scala @@ -10,6 +10,8 @@ trait PeerDatabase { def addOrUpdateKnownPeer(peerInfo: PeerInfo): Unit + def peerSeen(peerInfo: PeerInfo): Unit + def knownPeers: Map[InetSocketAddress, PeerInfo] def addToBlacklist(address: InetSocketAddress, penaltyType: PenaltyType): Unit diff --git a/src/main/scala/scorex/core/network/peer/PeerManager.scala b/src/main/scala/scorex/core/network/peer/PeerManager.scala index 082f10ad8..04f5c61d9 100644 --- a/src/main/scala/scorex/core/network/peer/PeerManager.scala +++ b/src/main/scala/scorex/core/network/peer/PeerManager.scala @@ -46,6 +46,8 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend // We have connected to a peer and got his peerInfo from him if (!isSelf(peerInfo.peerSpec)) peerDatabase.addOrUpdateKnownPeer(peerInfo) + case PeerSeen(peerInfo) => peerDatabase.peerSeen(peerInfo) + case Penalize(peer, penaltyType) => log.info(s"$peer penalized, penalty: $penaltyType") if (peerDatabase.penalize(peer, penaltyType)) { @@ -109,6 +111,8 @@ object PeerManager { // peerListOperations messages case class AddOrUpdatePeer(data: PeerInfo) + case class PeerSeen(peerInfo: PeerInfo) + case class AddPeerIfEmpty(data: PeerSpec) case class RemovePeer(address: InetSocketAddress) From e7c4dcc68175020857387f3304209f961198251b Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Sun, 4 Oct 2020 20:27:37 +0300 Subject: [PATCH 02/13] log messages --- .../scorex/core/network/NetworkController.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index 9fbdc4451..badd2ef17 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -90,12 +90,15 @@ class NetworkController(settings: NetworkSettings, private def businessLogic: Receive = { //a message coming in from another peer - case msg @ Message(spec, _, Some(remote)) => + case msg@Message(spec, _, Some(remote)) => messageHandlers.get(spec.messageCode) match { case Some(handler) => handler ! msg // forward the message to the appropriate handler for processing - case None => log.error(s"No handlers found for message $remote: " + spec.messageCode) + case None => log.error(s"No handlers found for message $remote: " + spec.messageCode) + } + remote.peerInfo.foreach { pi => + log.debug(s"Got a message from ${pi.peerSpec.address}, going to update last-seen") + peerManagerRef ! PeerSeen(pi) } - remote.peerInfo.foreach(pi => peerManagerRef ! PeerSeen(pi)) case SendToNetwork(message, sendingStrategy) => filterConnections(sendingStrategy, message.spec.protocolVersion).foreach { connectedPeer => @@ -204,12 +207,15 @@ class NetworkController(settings: NetworkSettings, } private def dropDeadConnections(): Unit = { - context.system.scheduler.schedule(5.seconds, 15.seconds) { + context.system.scheduler.schedule(60.seconds, 60.seconds) { connections.values.filter{ cp => val now = scorexContext.timeProvider.time() val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now) - (now - lastSeen) > 1000 * 90 + (now - lastSeen) > 1000 * 60 * 2 // 2 minutes }.foreach { cp => + val now = scorexContext.timeProvider.time() + val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now) + log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${(now-lastSeen) / 1000} seconds ago") cp.handlerRef ! CloseConnection } } From 9fddae6433d96c6828dabe811d956f9c69c7b909 Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Sun, 4 Oct 2020 22:06:14 +0300 Subject: [PATCH 03/13] remove peer from db only if there are enough living connections --- .../scala/scorex/core/network/NetworkController.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index badd2ef17..a7efdd4c1 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -155,8 +155,11 @@ class NetworkController(settings: NetworkSettings, case Some(t) => log.info("Failed to connect to : " + c.remoteAddress, t) case None => log.info("Failed to connect to : " + c.remoteAddress) } - // remove not responding peer from database - peerManagerRef ! RemovePeer(c.remoteAddress) + + // If enough live connections, remove not responding peer from database + if(connections.size > settings.maxConnections / 2) { + peerManagerRef ! RemovePeer(c.remoteAddress) + } case Terminated(ref) => connectionForHandler(ref).foreach { connectedPeer => @@ -197,6 +200,7 @@ class NetworkController(settings: NetworkSettings, */ private def scheduleConnectionToPeer(): Unit = { context.system.scheduler.schedule(5.seconds, 5.seconds) { + log.info(s"Has ${connections.size} connections") if (connections.size < settings.maxConnections) { val randomPeerF = peerManagerRef ? RandomPeerExcluding(connections.values.flatMap(_.peerInfo).toSeq) randomPeerF.mapTo[Option[PeerInfo]].foreach { peerInfoOpt => From 8a49a4418339a8f1824765b6fceaace0df15131a Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Sun, 4 Oct 2020 23:22:36 +0300 Subject: [PATCH 04/13] updating last-seen for connected peers; test --- .../core/network/NetworkController.scala | 19 +++++++---- .../network/peer/InMemoryPeerDatabase.scala | 6 +--- .../core/network/peer/PeerDatabase.scala | 2 -- .../core/network/peer/PeerManager.scala | 4 --- .../network/NetworkControllerSpec.scala | 34 ++++++++++++++++++- 5 files changed, 47 insertions(+), 18 deletions(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index a7efdd4c1..4913dd56b 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -95,9 +95,16 @@ class NetworkController(settings: NetworkSettings, case Some(handler) => handler ! msg // forward the message to the appropriate handler for processing case None => log.error(s"No handlers found for message $remote: " + spec.messageCode) } - remote.peerInfo.foreach { pi => - log.debug(s"Got a message from ${pi.peerSpec.address}, going to update last-seen") - peerManagerRef ! PeerSeen(pi) + + val remoteAddress = remote.connectionId.remoteAddress + connections.get(remote.connectionId.remoteAddress) match { + case Some(cp) => cp.peerInfo match { + case Some(pi) => + val now = scorexContext.timeProvider.time() + connections += remoteAddress -> cp.copy(peerInfo = Some(pi.copy(lastSeen = now))) + case None => log.warn("Peer info not found for a message got from: " + remoteAddress) + } + case None => log.warn("Connection not found for a message got from: " + remoteAddress) } case SendToNetwork(message, sendingStrategy) => @@ -200,8 +207,8 @@ class NetworkController(settings: NetworkSettings, */ private def scheduleConnectionToPeer(): Unit = { context.system.scheduler.schedule(5.seconds, 5.seconds) { - log.info(s"Has ${connections.size} connections") if (connections.size < settings.maxConnections) { + log.info(s"Looking for a new random connection") val randomPeerF = peerManagerRef ? RandomPeerExcluding(connections.values.flatMap(_.peerInfo).toSeq) randomPeerF.mapTo[Option[PeerInfo]].foreach { peerInfoOpt => peerInfoOpt.foreach(peerInfo => self ! ConnectTo(peerInfo)) @@ -212,14 +219,14 @@ class NetworkController(settings: NetworkSettings, private def dropDeadConnections(): Unit = { context.system.scheduler.schedule(60.seconds, 60.seconds) { - connections.values.filter{ cp => + connections.values.filter { cp => val now = scorexContext.timeProvider.time() val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now) (now - lastSeen) > 1000 * 60 * 2 // 2 minutes }.foreach { cp => val now = scorexContext.timeProvider.time() val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now) - log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${(now-lastSeen) / 1000} seconds ago") + log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${(now - lastSeen) / 1000} seconds ago") cp.handlerRef ! CloseConnection } } diff --git a/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala b/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala index 75e4c561d..673098be8 100644 --- a/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala +++ b/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala @@ -31,6 +31,7 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr override def addOrUpdateKnownPeer(peerInfo: PeerInfo): Unit = { if (!peerInfo.peerSpec.declaredAddress.exists(x => isBlacklisted(x.getAddress))) { peerInfo.peerSpec.address.foreach { address => + log.info(s"Updating peer info for $address") peers += address -> peerInfo } } @@ -108,11 +109,6 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr stillBanned } - override def peerSeen(peerInfo: PeerInfo): Unit = { - val pi = peerInfo.copy(lastSeen = timeProvider.time()) - addOrUpdateKnownPeer(pi) - } - private def penaltyScore(penaltyType: PenaltyType): Int = penaltyType match { case PenaltyType.NonDeliveryPenalty => diff --git a/src/main/scala/scorex/core/network/peer/PeerDatabase.scala b/src/main/scala/scorex/core/network/peer/PeerDatabase.scala index 05872cfe9..ef09a0b45 100644 --- a/src/main/scala/scorex/core/network/peer/PeerDatabase.scala +++ b/src/main/scala/scorex/core/network/peer/PeerDatabase.scala @@ -10,8 +10,6 @@ trait PeerDatabase { def addOrUpdateKnownPeer(peerInfo: PeerInfo): Unit - def peerSeen(peerInfo: PeerInfo): Unit - def knownPeers: Map[InetSocketAddress, PeerInfo] def addToBlacklist(address: InetSocketAddress, penaltyType: PenaltyType): Unit diff --git a/src/main/scala/scorex/core/network/peer/PeerManager.scala b/src/main/scala/scorex/core/network/peer/PeerManager.scala index 04f5c61d9..082f10ad8 100644 --- a/src/main/scala/scorex/core/network/peer/PeerManager.scala +++ b/src/main/scala/scorex/core/network/peer/PeerManager.scala @@ -46,8 +46,6 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend // We have connected to a peer and got his peerInfo from him if (!isSelf(peerInfo.peerSpec)) peerDatabase.addOrUpdateKnownPeer(peerInfo) - case PeerSeen(peerInfo) => peerDatabase.peerSeen(peerInfo) - case Penalize(peer, penaltyType) => log.info(s"$peer penalized, penalty: $penaltyType") if (peerDatabase.penalize(peer, penaltyType)) { @@ -111,8 +109,6 @@ object PeerManager { // peerListOperations messages case class AddOrUpdatePeer(data: PeerInfo) - case class PeerSeen(peerInfo: PeerInfo) - case class AddPeerIfEmpty(data: PeerSpec) case class RemovePeer(address: InetSocketAddress) diff --git a/src/test/scala/scorex/network/NetworkControllerSpec.scala b/src/test/scala/scorex/network/NetworkControllerSpec.scala index 7083382d0..6f041f30e 100644 --- a/src/test/scala/scorex/network/NetworkControllerSpec.scala +++ b/src/test/scala/scorex/network/NetworkControllerSpec.scala @@ -12,9 +12,10 @@ import org.scalatest.Matchers import org.scalatest.OptionValues._ import org.scalatest.TryValues._ import scorex.core.app.{ScorexContext, Version} +import scorex.core.network.NetworkController.ReceivableMessages.GetConnectedPeers import scorex.core.network._ import scorex.core.network.message.{PeersSpec, _} -import scorex.core.network.peer.{LocalAddressPeerFeature, LocalAddressPeerFeatureSerializer, PeerManagerRef} +import scorex.core.network.peer.{LocalAddressPeerFeature, LocalAddressPeerFeatureSerializer, PeerInfo, PeerManagerRef} import scorex.core.settings.ScorexSettings import scorex.core.utils.LocalTimeProvider @@ -270,6 +271,37 @@ class NetworkControllerSpec extends NetworkTests { system.terminate() } + it should "update last-seen on getting message from peer" in { + implicit val system = ActorSystem() + val tcpManagerProbe = TestProbe() + val p = TestProbe("p")(system) + + val nodeAddr = new InetSocketAddress("88.77.66.55", 12345) + val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr)) + val networkControllerRef: ActorRef = createNetworkController(settings2, tcpManagerProbe) + + val testPeer = new TestPeer(settings2, networkControllerRef, tcpManagerProbe) + val peerAddr = new InetSocketAddress("88.77.66.55", 5678) + + testPeer.connect(peerAddr, nodeAddr) + testPeer.receiveHandshake + testPeer.sendHandshake(Some(peerAddr), None) + + p.send(networkControllerRef, GetConnectedPeers) + val data0 = p.expectMsgClass(classOf[Seq[PeerInfo]]) + val ls0 = data0(0).lastSeen + + Thread.sleep(1000) + testPeer.sendGetPeers() + p.send(networkControllerRef, GetConnectedPeers) + val data = p.expectMsgClass(classOf[Seq[PeerInfo]]) + val ls = data(0).lastSeen + + ls should not be ls0 + + system.terminate() + } + private def extractLocalAddrFeat(handshakeFromNode: Handshake): Option[InetSocketAddress] = { handshakeFromNode.peerSpec.localAddressOpt } From 9f070a8b261c92807b1664754732d0e140d53fe5 Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Mon, 5 Oct 2020 09:52:09 +0300 Subject: [PATCH 05/13] fix for RecentlySeenPeers --- src/main/scala/scorex/core/network/peer/PeerManager.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/scala/scorex/core/network/peer/PeerManager.scala b/src/main/scala/scorex/core/network/peer/PeerManager.scala index 082f10ad8..67b83403c 100644 --- a/src/main/scala/scorex/core/network/peer/PeerManager.scala +++ b/src/main/scala/scorex/core/network/peer/PeerManager.scala @@ -58,6 +58,7 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend // We have received peer data from other peers. It might be modified and should not affect existing data if any if (peerSpec.address.forall(a => peerDatabase.get(a).isEmpty) && !isSelf(peerSpec)) { val peerInfo: PeerInfo = PeerInfo(peerSpec, 0, None) + log.info(s"New discovered peer: $peerInfo") peerDatabase.addOrUpdateKnownPeer(peerInfo) } @@ -127,15 +128,13 @@ object PeerManager { * were connected in at most 1 hour ago and weren't blacklisted. */ case class RecentlySeenPeers(howMany: Int) extends GetPeers[Seq[PeerInfo]] { - private val TimeDiff: Long = 60 * 60 * 1000 override def choose(knownPeers: Map[InetSocketAddress, PeerInfo], blacklistedPeers: Seq[InetAddress], sc: ScorexContext): Seq[PeerInfo] = { - val currentTime = sc.timeProvider.time() val recentlySeenNonBlacklisted = knownPeers.values.toSeq .filter { p => - (p.connectionType.isDefined || currentTime - p.lastSeen > TimeDiff) && + (p.connectionType.isDefined || p.lastSeen > 0) && !blacklistedPeers.exists(ip => p.peerSpec.declaredAddress.exists(_.getAddress == ip)) } Random.shuffle(recentlySeenNonBlacklisted).take(howMany) From 102e2d491d0ffb24a4c876ddca56f8f9beb95b96 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sun, 11 Oct 2020 22:08:05 +0300 Subject: [PATCH 06/13] Add peers/status method --- .../scorex/core/api/http/PeersApiRoute.scala | 19 +++++++++++++++---- .../core/network/NetworkController.scala | 10 +++++++++- .../scorex/core/network/peer/PeerInfo.scala | 8 ++++++++ 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/main/scala/scorex/core/api/http/PeersApiRoute.scala b/src/main/scala/scorex/core/api/http/PeersApiRoute.scala index 2273fe141..ac38dc0e7 100644 --- a/src/main/scala/scorex/core/api/http/PeersApiRoute.scala +++ b/src/main/scala/scorex/core/api/http/PeersApiRoute.scala @@ -7,9 +7,9 @@ import akka.http.scaladsl.server.Route import io.circe.generic.semiauto._ import io.circe.syntax._ import io.circe.{Encoder, Json} -import scorex.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse} -import scorex.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers} -import scorex.core.network.peer.PeerInfo +import scorex.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse, PeersStatusResponse} +import scorex.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus} +import scorex.core.network.peer.{PeerInfo, PeersStatus} import scorex.core.network.peer.PeerManager.ReceivableMessages.{GetAllPeers, GetBlacklistedPeers} import scorex.core.settings.RESTApiSettings import scorex.core.utils.NetworkTimeProvider @@ -23,7 +23,7 @@ case class PeersApiRoute(peerManager: ActorRef, (implicit val context: ActorRefFactory, val ec: ExecutionContext) extends ApiRoute { override lazy val route: Route = pathPrefix("peers") { - allPeers ~ connectedPeers ~ blacklistedPeers ~ connect + allPeers ~ connectedPeers ~ blacklistedPeers ~ connect ~ peersStatus } def allPeers: Route = (path("all") & get) { @@ -49,6 +49,13 @@ case class PeersApiRoute(peerManager: ActorRef, ApiResponse(result) } + def peersStatus: Route = (path("status") & get) { + val result = askActor[PeersStatus](networkController, GetPeersStatus).map { + case PeersStatus(lastIncomingMessage,currentNetworkTime) => PeersStatusResponse(lastIncomingMessage,currentNetworkTime) + } + ApiResponse(result) + } + private val addressAndPortRegexp = "([\\w\\.]+):(\\d{1,5})".r def connect: Route = (path("connect") & post & withAuth & entity(as[Json])) { json => @@ -88,6 +95,8 @@ object PeersApiRoute { ) } + case class PeersStatusResponse(lastIncomingMessage: Long, currentSystemTime: Long) + case class BlacklistedPeers(addresses: Seq[String]) @SuppressWarnings(Array("org.wartremover.warts.PublicInference")) @@ -96,5 +105,7 @@ object PeersApiRoute { @SuppressWarnings(Array("org.wartremover.warts.PublicInference")) implicit val encodeBlackListedPeers: Encoder[BlacklistedPeers] = deriveEncoder + @SuppressWarnings(Array("org.wartremover.warts.PublicInference")) + implicit val encodePeersStatusResponse: Encoder[PeersStatusResponse] = deriveEncoder } diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index 4913dd56b..72fcd9fe3 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -7,6 +7,7 @@ import akka.io.Tcp._ import akka.io.{IO, Tcp} import akka.pattern.ask import akka.util.Timeout +import scorex.core.api.http.PeersApiRoute.PeersStatusResponse import scorex.core.app.{ScorexContext, Version} import scorex.core.network.NodeViewSynchronizer.ReceivableMessages.{DisconnectedPeer, HandshakedPeer} import scorex.core.network.message.Message.MessageCode @@ -14,7 +15,7 @@ import scorex.core.network.message.{Message, MessageSpec} import scorex.core.network.peer.PeerManager.ReceivableMessages._ import scorex.core.network.peer.{LocalAddressPeerFeature, PeerInfo, PeerManager, PenaltyType} import scorex.core.settings.NetworkSettings -import scorex.core.utils.NetworkUtils +import scorex.core.utils.{NetworkUtils, TimeProvider} import scorex.util.ScorexLogging import scala.concurrent.ExecutionContext @@ -60,6 +61,8 @@ class NetworkController(settings: NetworkSettings, private var connections = Map.empty[InetSocketAddress, ConnectedPeer] private var unconfirmedConnections = Set.empty[InetSocketAddress] + private var lastIncomingMessage : TimeProvider.Time = 0 + //check own declared address for validity validateDeclaredAddress() @@ -101,6 +104,7 @@ class NetworkController(settings: NetworkSettings, case Some(cp) => cp.peerInfo match { case Some(pi) => val now = scorexContext.timeProvider.time() + lastIncomingMessage = now connections += remoteAddress -> cp.copy(peerInfo = Some(pi.copy(lastSeen = now))) case None => log.warn("Peer info not found for a message got from: " + remoteAddress) } @@ -182,6 +186,9 @@ class NetworkController(settings: NetworkSettings, //calls from API / application private def interfaceCalls: Receive = { + case GetPeersStatus => + sender() ! PeersStatusResponse(lastIncomingMessage, scorexContext.timeProvider.time()) + case GetConnectedPeers => sender() ! connections.values.flatMap(_.peerInfo).toSeq @@ -474,6 +481,7 @@ object NetworkController { case object GetConnectedPeers + case object GetPeersStatus } } diff --git a/src/main/scala/scorex/core/network/peer/PeerInfo.scala b/src/main/scala/scorex/core/network/peer/PeerInfo.scala index b504d2638..f6aa92da6 100644 --- a/src/main/scala/scorex/core/network/peer/PeerInfo.scala +++ b/src/main/scala/scorex/core/network/peer/PeerInfo.scala @@ -16,6 +16,14 @@ case class PeerInfo(peerSpec: PeerSpec, lastSeen: Long, connectionType: Option[ConnectionDirection] = None) +/** + * Information about all peers + * + * @param lastIncomingMessage - timestamp of last received message from any peer + * @param currentNetworkTime - current network time + */ +case class PeersStatus(lastIncomingMessage: Long, currentNetworkTime: Long) + object PeerInfo { /** From a05971e4cdefb13675c4f43a8b760301e2e5ef67 Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Thu, 15 Oct 2020 16:40:04 +0300 Subject: [PATCH 07/13] RecentlySeenPeers comments fixed --- src/main/scala/scorex/core/api/http/PeersApiRoute.scala | 4 ++++ src/main/scala/scorex/core/network/peer/PeerInfo.scala | 2 +- src/main/scala/scorex/core/network/peer/PeerManager.scala | 5 +++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/scala/scorex/core/api/http/PeersApiRoute.scala b/src/main/scala/scorex/core/api/http/PeersApiRoute.scala index ac38dc0e7..45e440f3b 100644 --- a/src/main/scala/scorex/core/api/http/PeersApiRoute.scala +++ b/src/main/scala/scorex/core/api/http/PeersApiRoute.scala @@ -49,6 +49,10 @@ case class PeersApiRoute(peerManager: ActorRef, ApiResponse(result) } + /** + * Get status of P2P layer + * @return time of last incoming message and network time (got from NTP server) + */ def peersStatus: Route = (path("status") & get) { val result = askActor[PeersStatus](networkController, GetPeersStatus).map { case PeersStatus(lastIncomingMessage,currentNetworkTime) => PeersStatusResponse(lastIncomingMessage,currentNetworkTime) diff --git a/src/main/scala/scorex/core/network/peer/PeerInfo.scala b/src/main/scala/scorex/core/network/peer/PeerInfo.scala index f6aa92da6..a8aede604 100644 --- a/src/main/scala/scorex/core/network/peer/PeerInfo.scala +++ b/src/main/scala/scorex/core/network/peer/PeerInfo.scala @@ -17,7 +17,7 @@ case class PeerInfo(peerSpec: PeerSpec, connectionType: Option[ConnectionDirection] = None) /** - * Information about all peers + * Information about P2P layer status * * @param lastIncomingMessage - timestamp of last received message from any peer * @param currentNetworkTime - current network time diff --git a/src/main/scala/scorex/core/network/peer/PeerManager.scala b/src/main/scala/scorex/core/network/peer/PeerManager.scala index 67b83403c..1b19eba66 100644 --- a/src/main/scala/scorex/core/network/peer/PeerManager.scala +++ b/src/main/scala/scorex/core/network/peer/PeerManager.scala @@ -124,8 +124,9 @@ object PeerManager { } /** - * Choose at most `howMany` random peers, which are connected to our peer or - * were connected in at most 1 hour ago and weren't blacklisted. + * Choose at most `howMany` random peers, which were connected to our peer and weren't blacklisted. + * + * Used in peer propagation: peers chosen are recommended to a peer asking our node about more peers. */ case class RecentlySeenPeers(howMany: Int) extends GetPeers[Seq[PeerInfo]] { From b9ab7aebbb619956207ceb059c9dfbe7277f506b Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Thu, 15 Oct 2020 17:06:28 +0300 Subject: [PATCH 08/13] last-seen test improved --- .../scala/scorex/core/api/http/PeersApiRoute.scala | 6 ++++-- .../scorex/core/network/peer/PeerDatabase.scala | 4 ++++ .../scorex/network/NetworkControllerSpec.scala | 13 +++++++++---- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/main/scala/scorex/core/api/http/PeersApiRoute.scala b/src/main/scala/scorex/core/api/http/PeersApiRoute.scala index 45e440f3b..1b86c46c8 100644 --- a/src/main/scala/scorex/core/api/http/PeersApiRoute.scala +++ b/src/main/scala/scorex/core/api/http/PeersApiRoute.scala @@ -51,11 +51,13 @@ case class PeersApiRoute(peerManager: ActorRef, /** * Get status of P2P layer + * * @return time of last incoming message and network time (got from NTP server) */ - def peersStatus: Route = (path("status") & get) { + def peersStatus: Route = (path("status") & get) { val result = askActor[PeersStatus](networkController, GetPeersStatus).map { - case PeersStatus(lastIncomingMessage,currentNetworkTime) => PeersStatusResponse(lastIncomingMessage,currentNetworkTime) + case PeersStatus(lastIncomingMessage, currentNetworkTime) => + PeersStatusResponse(lastIncomingMessage, currentNetworkTime) } ApiResponse(result) } diff --git a/src/main/scala/scorex/core/network/peer/PeerDatabase.scala b/src/main/scala/scorex/core/network/peer/PeerDatabase.scala index ef09a0b45..7cdead342 100644 --- a/src/main/scala/scorex/core/network/peer/PeerDatabase.scala +++ b/src/main/scala/scorex/core/network/peer/PeerDatabase.scala @@ -8,6 +8,10 @@ trait PeerDatabase { def isEmpty: Boolean + /** + * Add peer to the database, or update it + * @param peerInfo - peer record + */ def addOrUpdateKnownPeer(peerInfo: PeerInfo): Unit def knownPeers: Map[InetSocketAddress, PeerInfo] diff --git a/src/test/scala/scorex/network/NetworkControllerSpec.scala b/src/test/scala/scorex/network/NetworkControllerSpec.scala index 6f041f30e..a2fe05a7f 100644 --- a/src/test/scala/scorex/network/NetworkControllerSpec.scala +++ b/src/test/scala/scorex/network/NetworkControllerSpec.scala @@ -11,11 +11,12 @@ import org.scalatest.EitherValues._ import org.scalatest.Matchers import org.scalatest.OptionValues._ import org.scalatest.TryValues._ +import scorex.core.api.http.PeersApiRoute.PeersStatusResponse import scorex.core.app.{ScorexContext, Version} -import scorex.core.network.NetworkController.ReceivableMessages.GetConnectedPeers +import scorex.core.network.NetworkController.ReceivableMessages.{GetConnectedPeers, GetPeersStatus} import scorex.core.network._ import scorex.core.network.message.{PeersSpec, _} -import scorex.core.network.peer.{LocalAddressPeerFeature, LocalAddressPeerFeatureSerializer, PeerInfo, PeerManagerRef} +import scorex.core.network.peer.{LocalAddressPeerFeature, LocalAddressPeerFeatureSerializer, PeerInfo, PeerManagerRef, PeersStatus} import scorex.core.settings.ScorexSettings import scorex.core.utils.LocalTimeProvider @@ -292,13 +293,17 @@ class NetworkControllerSpec extends NetworkTests { val ls0 = data0(0).lastSeen Thread.sleep(1000) - testPeer.sendGetPeers() + testPeer.sendGetPeers() // send a message to see node's status update then + p.send(networkControllerRef, GetConnectedPeers) val data = p.expectMsgClass(classOf[Seq[PeerInfo]]) val ls = data(0).lastSeen - ls should not be ls0 + p.send(networkControllerRef, GetPeersStatus) + val status = p.expectMsgClass(classOf[PeersStatusResponse]) + status.lastIncomingMessage shouldBe ls + system.terminate() } From f920be38b58b899fe5d87addc5bdbb6a177a1f7d Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Thu, 15 Oct 2020 18:20:39 +0300 Subject: [PATCH 09/13] conectivity, flexible disconnecting time --- .../core/network/NetworkController.scala | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index 72fcd9fe3..96fac41da 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -15,6 +15,7 @@ import scorex.core.network.message.{Message, MessageSpec} import scorex.core.network.peer.PeerManager.ReceivableMessages._ import scorex.core.network.peer.{LocalAddressPeerFeature, PeerInfo, PeerManager, PenaltyType} import scorex.core.settings.NetworkSettings +import scorex.core.utils.TimeProvider.Time import scorex.core.utils.{NetworkUtils, TimeProvider} import scorex.util.ScorexLogging @@ -61,7 +62,11 @@ class NetworkController(settings: NetworkSettings, private var connections = Map.empty[InetSocketAddress, ConnectedPeer] private var unconfirmedConnections = Set.empty[InetSocketAddress] - private var lastIncomingMessage : TimeProvider.Time = 0 + /** + * Storing timestamp of a last message got via p2p network. + * Used to check whether connectivity is lost. + */ + private var lastIncomingMessageTime : TimeProvider.Time = 0L //check own declared address for validity validateDeclaredAddress() @@ -91,6 +96,14 @@ class NetworkController(settings: NetworkSettings, context stop self } + def networkTime(): Time = scorexContext.timeProvider.time() + + // Checks that connectivity is not lost + private def connectivity: Boolean = { + connections.nonEmpty && + networkTime() < (lastIncomingMessageTime + settings.syncStatusRefreshStable.toMillis) + } + private def businessLogic: Receive = { //a message coming in from another peer case msg@Message(spec, _, Some(remote)) => @@ -103,8 +116,8 @@ class NetworkController(settings: NetworkSettings, connections.get(remote.connectionId.remoteAddress) match { case Some(cp) => cp.peerInfo match { case Some(pi) => - val now = scorexContext.timeProvider.time() - lastIncomingMessage = now + val now = networkTime() + lastIncomingMessageTime = now connections += remoteAddress -> cp.copy(peerInfo = Some(pi.copy(lastSeen = now))) case None => log.warn("Peer info not found for a message got from: " + remoteAddress) } @@ -168,6 +181,7 @@ class NetworkController(settings: NetworkSettings, } // If enough live connections, remove not responding peer from database + // In not enough live connections, maybe connectivity lost but the node has not updated its status, no ban then if(connections.size > settings.maxConnections / 2) { peerManagerRef ! RemovePeer(c.remoteAddress) } @@ -187,7 +201,7 @@ class NetworkController(settings: NetworkSettings, //calls from API / application private def interfaceCalls: Receive = { case GetPeersStatus => - sender() ! PeersStatusResponse(lastIncomingMessage, scorexContext.timeProvider.time()) + sender() ! PeersStatusResponse(lastIncomingMessageTime, networkTime()) case GetConnectedPeers => sender() ! connections.values.flatMap(_.peerInfo).toSeq @@ -226,15 +240,14 @@ class NetworkController(settings: NetworkSettings, private def dropDeadConnections(): Unit = { context.system.scheduler.schedule(60.seconds, 60.seconds) { - connections.values.filter { cp => - val now = scorexContext.timeProvider.time() - val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now) - (now - lastSeen) > 1000 * 60 * 2 // 2 minutes - }.foreach { cp => - val now = scorexContext.timeProvider.time() + // Drop connections with peers if they seem to be inactive + val now = networkTime() + connections.values.foreach { cp => val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now) - log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${(now - lastSeen) / 1000} seconds ago") - cp.handlerRef ! CloseConnection + if ((now - lastSeen) > settings.syncStatusRefreshStable.toMillis) { + log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${(now - lastSeen) / 1000} seconds ago") + cp.handlerRef ! CloseConnection + } } } } @@ -481,6 +494,9 @@ object NetworkController { case object GetConnectedPeers + /** + * Get p2p network status + */ case object GetPeersStatus } From 09df1d40a61a990ac0ef7c8348c8dd684650b3d1 Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Sat, 17 Oct 2020 00:32:51 +0300 Subject: [PATCH 10/13] global connectivity check removed --- .../scorex/core/network/NetworkController.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index 96fac41da..7ce4f0f4b 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -88,7 +88,7 @@ class NetworkController(settings: NetworkSettings, case Bound(_) => log.info("Successfully bound to the port " + settings.bindAddress.getPort) scheduleConnectionToPeer() - dropDeadConnections() + scheduleDroppingDeadConnections() case CommandFailed(_: Bind) => log.error("Network port " + settings.bindAddress.getPort + " already in use!") @@ -98,12 +98,6 @@ class NetworkController(settings: NetworkSettings, def networkTime(): Time = scorexContext.timeProvider.time() - // Checks that connectivity is not lost - private def connectivity: Boolean = { - connections.nonEmpty && - networkTime() < (lastIncomingMessageTime + settings.syncStatusRefreshStable.toMillis) - } - private def businessLogic: Receive = { //a message coming in from another peer case msg@Message(spec, _, Some(remote)) => @@ -238,7 +232,10 @@ class NetworkController(settings: NetworkSettings, } } - private def dropDeadConnections(): Unit = { + /** + * Schedule a periodic dropping of connections which seem to be inactive + */ + private def scheduleDroppingDeadConnections(): Unit = { context.system.scheduler.schedule(60.seconds, 60.seconds) { // Drop connections with peers if they seem to be inactive val now = networkTime() From 53efc6577666dc7cc82cbaf553e6842472cc2085 Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Wed, 21 Oct 2020 22:56:21 +0300 Subject: [PATCH 11/13] final edits --- .../scorex/core/network/NetworkController.scala | 16 +++++++++++----- .../scorex/core/network/PeerSynchronizer.scala | 4 ++-- .../core/network/peer/InMemoryPeerDatabase.scala | 3 ++- .../scorex/core/network/peer/PeerManager.scala | 2 +- .../scorex/network/NetworkControllerSpec.scala | 1 + 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index 7ce4f0f4b..cae5d3252 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -96,7 +96,7 @@ class NetworkController(settings: NetworkSettings, context stop self } - def networkTime(): Time = scorexContext.timeProvider.time() + private def networkTime(): Time = scorexContext.timeProvider.time() private def businessLogic: Receive = { //a message coming in from another peer @@ -106,6 +106,7 @@ class NetworkController(settings: NetworkSettings, case None => log.error(s"No handlers found for message $remote: " + spec.messageCode) } + // Update last seen message timestamps, global and peer's, with the message timestamp val remoteAddress = remote.connectionId.remoteAddress connections.get(remote.connectionId.remoteAddress) match { case Some(cp) => cp.peerInfo match { @@ -113,7 +114,8 @@ class NetworkController(settings: NetworkSettings, val now = networkTime() lastIncomingMessageTime = now connections += remoteAddress -> cp.copy(peerInfo = Some(pi.copy(lastSeen = now))) - case None => log.warn("Peer info not found for a message got from: " + remoteAddress) + case None => + log.warn("Peer info not found for a message got from: " + remoteAddress) } case None => log.warn("Connection not found for a message got from: " + remoteAddress) } @@ -223,7 +225,7 @@ class NetworkController(settings: NetworkSettings, private def scheduleConnectionToPeer(): Unit = { context.system.scheduler.schedule(5.seconds, 5.seconds) { if (connections.size < settings.maxConnections) { - log.info(s"Looking for a new random connection") + log.debug(s"Looking for a new random connection") val randomPeerF = peerManagerRef ? RandomPeerExcluding(connections.values.flatMap(_.peerInfo).toSeq) randomPeerF.mapTo[Option[PeerInfo]].foreach { peerInfoOpt => peerInfoOpt.foreach(peerInfo => self ! ConnectTo(peerInfo)) @@ -241,8 +243,12 @@ class NetworkController(settings: NetworkSettings, val now = networkTime() connections.values.foreach { cp => val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now) - if ((now - lastSeen) > settings.syncStatusRefreshStable.toMillis) { - log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${(now - lastSeen) / 1000} seconds ago") + // A peer should send out sync message to us at least once per settings.syncStatusRefreshStable duration. + // We wait for more, namely settings.syncStatusRefreshStable.toMillis * 2 + val timeout = settings.syncStatusRefreshStable.toMillis * 2 + val delta = now - lastSeen + if (delta > timeout) { + log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${delta / 1000.0} seconds ago") cp.handlerRef ! CloseConnection } } diff --git a/src/main/scala/scorex/core/network/PeerSynchronizer.scala b/src/main/scala/scorex/core/network/PeerSynchronizer.scala index 98cbce998..43704c080 100644 --- a/src/main/scala/scorex/core/network/PeerSynchronizer.scala +++ b/src/main/scala/scorex/core/network/PeerSynchronizer.scala @@ -6,7 +6,7 @@ import akka.util.Timeout import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork} import scorex.core.network.message.{GetPeersSpec, Message, MessageSpec, PeersSpec} import scorex.core.network.peer.{PeerInfo, PenaltyType} -import scorex.core.network.peer.PeerManager.ReceivableMessages.{AddPeerIfEmpty, RecentlySeenPeers} +import scorex.core.network.peer.PeerManager.ReceivableMessages.{AddPeerIfEmpty, SeenPeers} import scorex.core.settings.NetworkSettings import scorex.util.ScorexLogging import shapeless.syntax.typeable._ @@ -74,7 +74,7 @@ class PeerSynchronizer(val networkControllerRef: ActorRef, * @param remote the remote peer to be informed of our local peers */ private def gossipPeers ( remote: ConnectedPeer ): Unit = - (peerManager ? RecentlySeenPeers(settings.maxPeerSpecObjects)) + (peerManager ? SeenPeers(settings.maxPeerSpecObjects)) .mapTo[Seq[PeerInfo]] .foreach { peers => diff --git a/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala b/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala index 673098be8..e25da6d0c 100644 --- a/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala +++ b/src/main/scala/scorex/core/network/peer/InMemoryPeerDatabase.scala @@ -31,7 +31,7 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr override def addOrUpdateKnownPeer(peerInfo: PeerInfo): Unit = { if (!peerInfo.peerSpec.declaredAddress.exists(x => isBlacklisted(x.getAddress))) { peerInfo.peerSpec.address.foreach { address => - log.info(s"Updating peer info for $address") + log.debug(s"Updating peer info for $address") peers += address -> peerInfo } } @@ -128,4 +128,5 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr case PenaltyType.PermanentPenalty => (360 * 10).days.toMillis } + } diff --git a/src/main/scala/scorex/core/network/peer/PeerManager.scala b/src/main/scala/scorex/core/network/peer/PeerManager.scala index 1b19eba66..ec79dbf8f 100644 --- a/src/main/scala/scorex/core/network/peer/PeerManager.scala +++ b/src/main/scala/scorex/core/network/peer/PeerManager.scala @@ -128,7 +128,7 @@ object PeerManager { * * Used in peer propagation: peers chosen are recommended to a peer asking our node about more peers. */ - case class RecentlySeenPeers(howMany: Int) extends GetPeers[Seq[PeerInfo]] { + case class SeenPeers(howMany: Int) extends GetPeers[Seq[PeerInfo]] { override def choose(knownPeers: Map[InetSocketAddress, PeerInfo], blacklistedPeers: Seq[InetAddress], diff --git a/src/test/scala/scorex/network/NetworkControllerSpec.scala b/src/test/scala/scorex/network/NetworkControllerSpec.scala index a2fe05a7f..5c3629a51 100644 --- a/src/test/scala/scorex/network/NetworkControllerSpec.scala +++ b/src/test/scala/scorex/network/NetworkControllerSpec.scala @@ -478,4 +478,5 @@ class TestPeer(settings: ScorexSettings, networkControllerRef: ActorRef, tcpMana messagesSerializer.deserialize(b, None).success.value.value } } + } From 113e14b65f4d335c353c06c3e6b19039b14f50e8 Mon Sep 17 00:00:00 2001 From: Alexander Chepurnoy Date: Thu, 22 Oct 2020 10:39:06 +0300 Subject: [PATCH 12/13] Update src/main/scala/scorex/core/network/NetworkController.scala Co-authored-by: Alexander Slesarenko --- src/main/scala/scorex/core/network/NetworkController.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index cae5d3252..0530662c2 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -108,7 +108,7 @@ class NetworkController(settings: NetworkSettings, // Update last seen message timestamps, global and peer's, with the message timestamp val remoteAddress = remote.connectionId.remoteAddress - connections.get(remote.connectionId.remoteAddress) match { + connections.get(remoteAddress) match { case Some(cp) => cp.peerInfo match { case Some(pi) => val now = networkTime() @@ -543,4 +543,4 @@ object NetworkControllerRef { props(settings, peerManagerRef, scorexContext, IO(Tcp)), name) } -} \ No newline at end of file +} From c1733b2f12eaa5e738dc53aa5cbc7529e59b9245 Mon Sep 17 00:00:00 2001 From: Alex Chepurnoy Date: Thu, 22 Oct 2020 10:41:26 +0300 Subject: [PATCH 13/13] syncStatusRefreshStable.toMillis * 3 --- src/main/scala/scorex/core/network/NetworkController.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index cae5d3252..ed72ff439 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -244,8 +244,8 @@ class NetworkController(settings: NetworkSettings, connections.values.foreach { cp => val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now) // A peer should send out sync message to us at least once per settings.syncStatusRefreshStable duration. - // We wait for more, namely settings.syncStatusRefreshStable.toMillis * 2 - val timeout = settings.syncStatusRefreshStable.toMillis * 2 + // We wait for more, namely settings.syncStatusRefreshStable.toMillis * 3 + val timeout = settings.syncStatusRefreshStable.toMillis * 3 val delta = now - lastSeen if (delta > timeout) { log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${delta / 1000.0} seconds ago")