Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
Merge pull request #373 from ScorexFoundation/i141
Browse files Browse the repository at this point in the history
Fix for #141: updates for last seen & dropping inactive connections
  • Loading branch information
kushti committed Oct 22, 2020
2 parents ab86f9c + e3140f6 commit 539a4d3
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 19 deletions.
25 changes: 21 additions & 4 deletions src/main/scala/scorex/core/api/http/PeersApiRoute.scala
Expand Up @@ -7,9 +7,9 @@ import akka.http.scaladsl.server.Route
import io.circe.generic.semiauto._
import io.circe.syntax._
import io.circe.{Encoder, Json}
import scorex.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse}
import scorex.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers}
import scorex.core.network.peer.PeerInfo
import scorex.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse, PeersStatusResponse}
import scorex.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus}
import scorex.core.network.peer.{PeerInfo, PeersStatus}
import scorex.core.network.peer.PeerManager.ReceivableMessages.{GetAllPeers, GetBlacklistedPeers}
import scorex.core.settings.RESTApiSettings
import scorex.core.utils.NetworkTimeProvider
Expand All @@ -23,7 +23,7 @@ case class PeersApiRoute(peerManager: ActorRef,
(implicit val context: ActorRefFactory, val ec: ExecutionContext) extends ApiRoute {

override lazy val route: Route = pathPrefix("peers") {
allPeers ~ connectedPeers ~ blacklistedPeers ~ connect
allPeers ~ connectedPeers ~ blacklistedPeers ~ connect ~ peersStatus
}

def allPeers: Route = (path("all") & get) {
Expand All @@ -49,6 +49,19 @@ case class PeersApiRoute(peerManager: ActorRef,
ApiResponse(result)
}

/**
* Get status of P2P layer
*
* @return time of last incoming message and network time (got from NTP server)
*/
def peersStatus: Route = (path("status") & get) {
val result = askActor[PeersStatus](networkController, GetPeersStatus).map {
case PeersStatus(lastIncomingMessage, currentNetworkTime) =>
PeersStatusResponse(lastIncomingMessage, currentNetworkTime)
}
ApiResponse(result)
}

private val addressAndPortRegexp = "([\\w\\.]+):(\\d{1,5})".r

def connect: Route = (path("connect") & post & withAuth & entity(as[Json])) { json =>
Expand Down Expand Up @@ -88,6 +101,8 @@ object PeersApiRoute {
)
}

case class PeersStatusResponse(lastIncomingMessage: Long, currentSystemTime: Long)

case class BlacklistedPeers(addresses: Seq[String])

@SuppressWarnings(Array("org.wartremover.warts.PublicInference"))
Expand All @@ -96,5 +111,7 @@ object PeersApiRoute {
@SuppressWarnings(Array("org.wartremover.warts.PublicInference"))
implicit val encodeBlackListedPeers: Encoder[BlacklistedPeers] = deriveEncoder

@SuppressWarnings(Array("org.wartremover.warts.PublicInference"))
implicit val encodePeersStatusResponse: Encoder[PeersStatusResponse] = deriveEncoder
}

70 changes: 64 additions & 6 deletions src/main/scala/scorex/core/network/NetworkController.scala
Expand Up @@ -7,14 +7,16 @@ import akka.io.Tcp._
import akka.io.{IO, Tcp}
import akka.pattern.ask
import akka.util.Timeout
import scorex.core.api.http.PeersApiRoute.PeersStatusResponse
import scorex.core.app.{ScorexContext, Version}
import scorex.core.network.NodeViewSynchronizer.ReceivableMessages.{DisconnectedPeer, HandshakedPeer}
import scorex.core.network.message.Message.MessageCode
import scorex.core.network.message.{Message, MessageSpec}
import scorex.core.network.peer.PeerManager.ReceivableMessages._
import scorex.core.network.peer.{LocalAddressPeerFeature, PeerInfo, PeerManager, PenaltyType}
import scorex.core.settings.NetworkSettings
import scorex.core.utils.NetworkUtils
import scorex.core.utils.TimeProvider.Time
import scorex.core.utils.{NetworkUtils, TimeProvider}
import scorex.util.ScorexLogging

import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -60,6 +62,12 @@ class NetworkController(settings: NetworkSettings,
private var connections = Map.empty[InetSocketAddress, ConnectedPeer]
private var unconfirmedConnections = Set.empty[InetSocketAddress]

/**
* Storing timestamp of a last message got via p2p network.
* Used to check whether connectivity is lost.
*/
private var lastIncomingMessageTime : TimeProvider.Time = 0L

//check own declared address for validity
validateDeclaredAddress()

Expand All @@ -80,19 +88,36 @@ class NetworkController(settings: NetworkSettings,
case Bound(_) =>
log.info("Successfully bound to the port " + settings.bindAddress.getPort)
scheduleConnectionToPeer()
scheduleDroppingDeadConnections()

case CommandFailed(_: Bind) =>
log.error("Network port " + settings.bindAddress.getPort + " already in use!")
java.lang.System.exit(1) // Terminate node if port is in use
context stop self
}

private def networkTime(): Time = scorexContext.timeProvider.time()

private def businessLogic: Receive = {
//a message coming in from another peer
case msg @ Message(spec, _, Some(remote)) =>
case msg@Message(spec, _, Some(remote)) =>
messageHandlers.get(spec.messageCode) match {
case Some(handler) => handler ! msg // forward the message to the appropriate handler for processing
case None => log.error(s"No handlers found for message $remote: " + spec.messageCode)
case None => log.error(s"No handlers found for message $remote: " + spec.messageCode)
}

// Update last seen message timestamps, global and peer's, with the message timestamp
val remoteAddress = remote.connectionId.remoteAddress
connections.get(remoteAddress) match {
case Some(cp) => cp.peerInfo match {
case Some(pi) =>
val now = networkTime()
lastIncomingMessageTime = now
connections += remoteAddress -> cp.copy(peerInfo = Some(pi.copy(lastSeen = now)))
case None =>
log.warn("Peer info not found for a message got from: " + remoteAddress)
}
case None => log.warn("Connection not found for a message got from: " + remoteAddress)
}

case SendToNetwork(message, sendingStrategy) =>
Expand Down Expand Up @@ -150,8 +175,12 @@ class NetworkController(settings: NetworkSettings,
case Some(t) => log.info("Failed to connect to : " + c.remoteAddress, t)
case None => log.info("Failed to connect to : " + c.remoteAddress)
}
// remove not responding peer from database
peerManagerRef ! RemovePeer(c.remoteAddress)

// If enough live connections, remove not responding peer from database
// In not enough live connections, maybe connectivity lost but the node has not updated its status, no ban then
if(connections.size > settings.maxConnections / 2) {
peerManagerRef ! RemovePeer(c.remoteAddress)
}

case Terminated(ref) =>
connectionForHandler(ref).foreach { connectedPeer =>
Expand All @@ -167,6 +196,9 @@ class NetworkController(settings: NetworkSettings,

//calls from API / application
private def interfaceCalls: Receive = {
case GetPeersStatus =>
sender() ! PeersStatusResponse(lastIncomingMessageTime, networkTime())

case GetConnectedPeers =>
sender() ! connections.values.flatMap(_.peerInfo).toSeq

Expand All @@ -193,6 +225,7 @@ class NetworkController(settings: NetworkSettings,
private def scheduleConnectionToPeer(): Unit = {
context.system.scheduler.schedule(5.seconds, 5.seconds) {
if (connections.size < settings.maxConnections) {
log.debug(s"Looking for a new random connection")
val randomPeerF = peerManagerRef ? RandomPeerExcluding(connections.values.flatMap(_.peerInfo).toSeq)
randomPeerF.mapTo[Option[PeerInfo]].foreach { peerInfoOpt =>
peerInfoOpt.foreach(peerInfo => self ! ConnectTo(peerInfo))
Expand All @@ -201,6 +234,27 @@ class NetworkController(settings: NetworkSettings,
}
}

/**
* Schedule a periodic dropping of connections which seem to be inactive
*/
private def scheduleDroppingDeadConnections(): Unit = {
context.system.scheduler.schedule(60.seconds, 60.seconds) {
// Drop connections with peers if they seem to be inactive
val now = networkTime()
connections.values.foreach { cp =>
val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now)
// A peer should send out sync message to us at least once per settings.syncStatusRefreshStable duration.
// We wait for more, namely settings.syncStatusRefreshStable.toMillis * 3
val timeout = settings.syncStatusRefreshStable.toMillis * 3
val delta = now - lastSeen
if (delta > timeout) {
log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${delta / 1000.0} seconds ago")
cp.handlerRef ! CloseConnection
}
}
}
}

/**
* Connect to peer
*
Expand Down Expand Up @@ -443,6 +497,10 @@ object NetworkController {

case object GetConnectedPeers

/**
* Get p2p network status
*/
case object GetPeersStatus
}

}
Expand Down Expand Up @@ -485,4 +543,4 @@ object NetworkControllerRef {
props(settings, peerManagerRef, scorexContext, IO(Tcp)),
name)
}
}
}
4 changes: 2 additions & 2 deletions src/main/scala/scorex/core/network/PeerSynchronizer.scala
Expand Up @@ -6,7 +6,7 @@ import akka.util.Timeout
import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork}
import scorex.core.network.message.{GetPeersSpec, Message, MessageSpec, PeersSpec}
import scorex.core.network.peer.{PeerInfo, PenaltyType}
import scorex.core.network.peer.PeerManager.ReceivableMessages.{AddPeerIfEmpty, RecentlySeenPeers}
import scorex.core.network.peer.PeerManager.ReceivableMessages.{AddPeerIfEmpty, SeenPeers}
import scorex.core.settings.NetworkSettings
import scorex.util.ScorexLogging
import shapeless.syntax.typeable._
Expand Down Expand Up @@ -74,7 +74,7 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,
* @param remote the remote peer to be informed of our local peers
*/
private def gossipPeers ( remote: ConnectedPeer ): Unit =
(peerManager ? RecentlySeenPeers(settings.maxPeerSpecObjects))
(peerManager ? SeenPeers(settings.maxPeerSpecObjects))
.mapTo[Seq[PeerInfo]]
.foreach
{ peers =>
Expand Down
Expand Up @@ -31,6 +31,7 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr
override def addOrUpdateKnownPeer(peerInfo: PeerInfo): Unit = {
if (!peerInfo.peerSpec.declaredAddress.exists(x => isBlacklisted(x.getAddress))) {
peerInfo.peerSpec.address.foreach { address =>
log.debug(s"Updating peer info for $address")
peers += address -> peerInfo
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/scorex/core/network/peer/PeerDatabase.scala
Expand Up @@ -8,6 +8,10 @@ trait PeerDatabase {

def isEmpty: Boolean

/**
* Add peer to the database, or update it
* @param peerInfo - peer record
*/
def addOrUpdateKnownPeer(peerInfo: PeerInfo): Unit

def knownPeers: Map[InetSocketAddress, PeerInfo]
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/scorex/core/network/peer/PeerInfo.scala
Expand Up @@ -16,6 +16,14 @@ case class PeerInfo(peerSpec: PeerSpec,
lastSeen: Long,
connectionType: Option[ConnectionDirection] = None)

/**
* Information about P2P layer status
*
* @param lastIncomingMessage - timestamp of last received message from any peer
* @param currentNetworkTime - current network time
*/
case class PeersStatus(lastIncomingMessage: Long, currentNetworkTime: Long)

object PeerInfo {

/**
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/scorex/core/network/peer/PeerManager.scala
Expand Up @@ -58,6 +58,7 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend
// We have received peer data from other peers. It might be modified and should not affect existing data if any
if (peerSpec.address.forall(a => peerDatabase.get(a).isEmpty) && !isSelf(peerSpec)) {
val peerInfo: PeerInfo = PeerInfo(peerSpec, 0, None)
log.info(s"New discovered peer: $peerInfo")
peerDatabase.addOrUpdateKnownPeer(peerInfo)
}

Expand Down Expand Up @@ -123,19 +124,18 @@ object PeerManager {
}

/**
* Choose at most `howMany` random peers, which are connected to our peer or
* were connected in at most 1 hour ago and weren't blacklisted.
* Choose at most `howMany` random peers, which were connected to our peer and weren't blacklisted.
*
* Used in peer propagation: peers chosen are recommended to a peer asking our node about more peers.
*/
case class RecentlySeenPeers(howMany: Int) extends GetPeers[Seq[PeerInfo]] {
private val TimeDiff: Long = 60 * 60 * 1000
case class SeenPeers(howMany: Int) extends GetPeers[Seq[PeerInfo]] {

override def choose(knownPeers: Map[InetSocketAddress, PeerInfo],
blacklistedPeers: Seq[InetAddress],
sc: ScorexContext): Seq[PeerInfo] = {
val currentTime = sc.timeProvider.time()
val recentlySeenNonBlacklisted = knownPeers.values.toSeq
.filter { p =>
(p.connectionType.isDefined || currentTime - p.lastSeen > TimeDiff) &&
(p.connectionType.isDefined || p.lastSeen > 0) &&
!blacklistedPeers.exists(ip => p.peerSpec.declaredAddress.exists(_.getAddress == ip))
}
Random.shuffle(recentlySeenNonBlacklisted).take(howMany)
Expand Down
40 changes: 39 additions & 1 deletion src/test/scala/scorex/network/NetworkControllerSpec.scala
Expand Up @@ -11,10 +11,12 @@ import org.scalatest.EitherValues._
import org.scalatest.Matchers
import org.scalatest.OptionValues._
import org.scalatest.TryValues._
import scorex.core.api.http.PeersApiRoute.PeersStatusResponse
import scorex.core.app.{ScorexContext, Version}
import scorex.core.network.NetworkController.ReceivableMessages.{GetConnectedPeers, GetPeersStatus}
import scorex.core.network._
import scorex.core.network.message.{PeersSpec, _}
import scorex.core.network.peer.{LocalAddressPeerFeature, LocalAddressPeerFeatureSerializer, PeerManagerRef}
import scorex.core.network.peer.{LocalAddressPeerFeature, LocalAddressPeerFeatureSerializer, PeerInfo, PeerManagerRef, PeersStatus}
import scorex.core.settings.ScorexSettings
import scorex.core.utils.LocalTimeProvider

Expand Down Expand Up @@ -270,6 +272,41 @@ class NetworkControllerSpec extends NetworkTests {
system.terminate()
}

it should "update last-seen on getting message from peer" in {
implicit val system = ActorSystem()
val tcpManagerProbe = TestProbe()
val p = TestProbe("p")(system)

val nodeAddr = new InetSocketAddress("88.77.66.55", 12345)
val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr))
val networkControllerRef: ActorRef = createNetworkController(settings2, tcpManagerProbe)

val testPeer = new TestPeer(settings2, networkControllerRef, tcpManagerProbe)
val peerAddr = new InetSocketAddress("88.77.66.55", 5678)

testPeer.connect(peerAddr, nodeAddr)
testPeer.receiveHandshake
testPeer.sendHandshake(Some(peerAddr), None)

p.send(networkControllerRef, GetConnectedPeers)
val data0 = p.expectMsgClass(classOf[Seq[PeerInfo]])
val ls0 = data0(0).lastSeen

Thread.sleep(1000)
testPeer.sendGetPeers() // send a message to see node's status update then

p.send(networkControllerRef, GetConnectedPeers)
val data = p.expectMsgClass(classOf[Seq[PeerInfo]])
val ls = data(0).lastSeen
ls should not be ls0

p.send(networkControllerRef, GetPeersStatus)
val status = p.expectMsgClass(classOf[PeersStatusResponse])
status.lastIncomingMessage shouldBe ls

system.terminate()
}

private def extractLocalAddrFeat(handshakeFromNode: Handshake): Option[InetSocketAddress] = {
handshakeFromNode.peerSpec.localAddressOpt
}
Expand Down Expand Up @@ -441,4 +478,5 @@ class TestPeer(settings: ScorexSettings, networkControllerRef: ActorRef, tcpMana
messagesSerializer.deserialize(b, None).success.value.value
}
}

}

0 comments on commit 539a4d3

Please sign in to comment.