Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
Merge 9f070a8 into 1086cef
Browse files Browse the repository at this point in the history
  • Loading branch information
kushti committed Oct 5, 2020
2 parents 1086cef + 9f070a8 commit f79431b
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 9 deletions.
39 changes: 35 additions & 4 deletions src/main/scala/scorex/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class NetworkController(settings: NetworkSettings,
case Bound(_) =>
log.info("Successfully bound to the port " + settings.bindAddress.getPort)
scheduleConnectionToPeer()
dropDeadConnections()

case CommandFailed(_: Bind) =>
log.error("Network port " + settings.bindAddress.getPort + " already in use!")
Expand All @@ -89,10 +90,21 @@ class NetworkController(settings: NetworkSettings,

private def businessLogic: Receive = {
//a message coming in from another peer
case msg @ Message(spec, _, Some(remote)) =>
case msg@Message(spec, _, Some(remote)) =>
messageHandlers.get(spec.messageCode) match {
case Some(handler) => handler ! msg // forward the message to the appropriate handler for processing
case None => log.error(s"No handlers found for message $remote: " + spec.messageCode)
case None => log.error(s"No handlers found for message $remote: " + spec.messageCode)
}

val remoteAddress = remote.connectionId.remoteAddress
connections.get(remote.connectionId.remoteAddress) match {
case Some(cp) => cp.peerInfo match {
case Some(pi) =>
val now = scorexContext.timeProvider.time()
connections += remoteAddress -> cp.copy(peerInfo = Some(pi.copy(lastSeen = now)))
case None => log.warn("Peer info not found for a message got from: " + remoteAddress)
}
case None => log.warn("Connection not found for a message got from: " + remoteAddress)
}

case SendToNetwork(message, sendingStrategy) =>
Expand Down Expand Up @@ -150,8 +162,11 @@ class NetworkController(settings: NetworkSettings,
case Some(t) => log.info("Failed to connect to : " + c.remoteAddress, t)
case None => log.info("Failed to connect to : " + c.remoteAddress)
}
// remove not responding peer from database
peerManagerRef ! RemovePeer(c.remoteAddress)

// If enough live connections, remove not responding peer from database
if(connections.size > settings.maxConnections / 2) {
peerManagerRef ! RemovePeer(c.remoteAddress)
}

case Terminated(ref) =>
connectionForHandler(ref).foreach { connectedPeer =>
Expand Down Expand Up @@ -193,6 +208,7 @@ class NetworkController(settings: NetworkSettings,
private def scheduleConnectionToPeer(): Unit = {
context.system.scheduler.schedule(5.seconds, 5.seconds) {
if (connections.size < settings.maxConnections) {
log.info(s"Looking for a new random connection")
val randomPeerF = peerManagerRef ? RandomPeerExcluding(connections.values.flatMap(_.peerInfo).toSeq)
randomPeerF.mapTo[Option[PeerInfo]].foreach { peerInfoOpt =>
peerInfoOpt.foreach(peerInfo => self ! ConnectTo(peerInfo))
Expand All @@ -201,6 +217,21 @@ class NetworkController(settings: NetworkSettings,
}
}

private def dropDeadConnections(): Unit = {
context.system.scheduler.schedule(60.seconds, 60.seconds) {
connections.values.filter { cp =>
val now = scorexContext.timeProvider.time()
val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now)
(now - lastSeen) > 1000 * 60 * 2 // 2 minutes
}.foreach { cp =>
val now = scorexContext.timeProvider.time()
val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now)
log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${(now - lastSeen) / 1000} seconds ago")
cp.handlerRef ! CloseConnection
}
}
}

/**
* Connect to peer
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr
override def addOrUpdateKnownPeer(peerInfo: PeerInfo): Unit = {
if (!peerInfo.peerSpec.declaredAddress.exists(x => isBlacklisted(x.getAddress))) {
peerInfo.peerSpec.address.foreach { address =>
log.info(s"Updating peer info for $address")
peers += address -> peerInfo
}
}
Expand Down Expand Up @@ -127,5 +128,4 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr
case PenaltyType.PermanentPenalty =>
(360 * 10).days.toMillis
}

}
5 changes: 2 additions & 3 deletions src/main/scala/scorex/core/network/peer/PeerManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend
// We have received peer data from other peers. It might be modified and should not affect existing data if any
if (peerSpec.address.forall(a => peerDatabase.get(a).isEmpty) && !isSelf(peerSpec)) {
val peerInfo: PeerInfo = PeerInfo(peerSpec, 0, None)
log.info(s"New discovered peer: $peerInfo")
peerDatabase.addOrUpdateKnownPeer(peerInfo)
}

Expand Down Expand Up @@ -127,15 +128,13 @@ object PeerManager {
* were connected in at most 1 hour ago and weren't blacklisted.
*/
case class RecentlySeenPeers(howMany: Int) extends GetPeers[Seq[PeerInfo]] {
private val TimeDiff: Long = 60 * 60 * 1000

override def choose(knownPeers: Map[InetSocketAddress, PeerInfo],
blacklistedPeers: Seq[InetAddress],
sc: ScorexContext): Seq[PeerInfo] = {
val currentTime = sc.timeProvider.time()
val recentlySeenNonBlacklisted = knownPeers.values.toSeq
.filter { p =>
(p.connectionType.isDefined || currentTime - p.lastSeen > TimeDiff) &&
(p.connectionType.isDefined || p.lastSeen > 0) &&
!blacklistedPeers.exists(ip => p.peerSpec.declaredAddress.exists(_.getAddress == ip))
}
Random.shuffle(recentlySeenNonBlacklisted).take(howMany)
Expand Down
34 changes: 33 additions & 1 deletion src/test/scala/scorex/network/NetworkControllerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import org.scalatest.Matchers
import org.scalatest.OptionValues._
import org.scalatest.TryValues._
import scorex.core.app.{ScorexContext, Version}
import scorex.core.network.NetworkController.ReceivableMessages.GetConnectedPeers
import scorex.core.network._
import scorex.core.network.message.{PeersSpec, _}
import scorex.core.network.peer.{LocalAddressPeerFeature, LocalAddressPeerFeatureSerializer, PeerManagerRef}
import scorex.core.network.peer.{LocalAddressPeerFeature, LocalAddressPeerFeatureSerializer, PeerInfo, PeerManagerRef}
import scorex.core.settings.ScorexSettings
import scorex.core.utils.LocalTimeProvider

Expand Down Expand Up @@ -270,6 +271,37 @@ class NetworkControllerSpec extends NetworkTests {
system.terminate()
}

it should "update last-seen on getting message from peer" in {
implicit val system = ActorSystem()
val tcpManagerProbe = TestProbe()
val p = TestProbe("p")(system)

val nodeAddr = new InetSocketAddress("88.77.66.55", 12345)
val settings2 = settings.copy(network = settings.network.copy(bindAddress = nodeAddr))
val networkControllerRef: ActorRef = createNetworkController(settings2, tcpManagerProbe)

val testPeer = new TestPeer(settings2, networkControllerRef, tcpManagerProbe)
val peerAddr = new InetSocketAddress("88.77.66.55", 5678)

testPeer.connect(peerAddr, nodeAddr)
testPeer.receiveHandshake
testPeer.sendHandshake(Some(peerAddr), None)

p.send(networkControllerRef, GetConnectedPeers)
val data0 = p.expectMsgClass(classOf[Seq[PeerInfo]])
val ls0 = data0(0).lastSeen

Thread.sleep(1000)
testPeer.sendGetPeers()
p.send(networkControllerRef, GetConnectedPeers)
val data = p.expectMsgClass(classOf[Seq[PeerInfo]])
val ls = data(0).lastSeen

ls should not be ls0

system.terminate()
}

private def extractLocalAddrFeat(handshakeFromNode: Handshake): Option[InetSocketAddress] = {
handshakeFromNode.peerSpec.localAddressOpt
}
Expand Down

0 comments on commit f79431b

Please sign in to comment.