Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
Merge e7c4dcc into 1086cef
Browse files Browse the repository at this point in the history
  • Loading branch information
kushti committed Oct 4, 2020
2 parents 1086cef + e7c4dcc commit 9d0c214
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 3 deletions.
24 changes: 22 additions & 2 deletions src/main/scala/scorex/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class NetworkController(settings: NetworkSettings,
case Bound(_) =>
log.info("Successfully bound to the port " + settings.bindAddress.getPort)
scheduleConnectionToPeer()
dropDeadConnections()

case CommandFailed(_: Bind) =>
log.error("Network port " + settings.bindAddress.getPort + " already in use!")
Expand All @@ -89,10 +90,14 @@ class NetworkController(settings: NetworkSettings,

private def businessLogic: Receive = {
//a message coming in from another peer
case msg @ Message(spec, _, Some(remote)) =>
case msg@Message(spec, _, Some(remote)) =>
messageHandlers.get(spec.messageCode) match {
case Some(handler) => handler ! msg // forward the message to the appropriate handler for processing
case None => log.error(s"No handlers found for message $remote: " + spec.messageCode)
case None => log.error(s"No handlers found for message $remote: " + spec.messageCode)
}
remote.peerInfo.foreach { pi =>
log.debug(s"Got a message from ${pi.peerSpec.address}, going to update last-seen")
peerManagerRef ! PeerSeen(pi)
}

case SendToNetwork(message, sendingStrategy) =>
Expand Down Expand Up @@ -201,6 +206,21 @@ class NetworkController(settings: NetworkSettings,
}
}

private def dropDeadConnections(): Unit = {
context.system.scheduler.schedule(60.seconds, 60.seconds) {
connections.values.filter{ cp =>
val now = scorexContext.timeProvider.time()
val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now)
(now - lastSeen) > 1000 * 60 * 2 // 2 minutes
}.foreach { cp =>
val now = scorexContext.timeProvider.time()
val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now)
log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${(now-lastSeen) / 1000} seconds ago")
cp.handlerRef ! CloseConnection
}
}
}

/**
* Connect to peer
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr
stillBanned
}

override def peerSeen(peerInfo: PeerInfo): Unit = {
val pi = peerInfo.copy(lastSeen = timeProvider.time())
addOrUpdateKnownPeer(pi)
}

private def penaltyScore(penaltyType: PenaltyType): Int =
penaltyType match {
case PenaltyType.NonDeliveryPenalty =>
Expand All @@ -127,5 +132,4 @@ final class InMemoryPeerDatabase(settings: NetworkSettings, timeProvider: TimePr
case PenaltyType.PermanentPenalty =>
(360 * 10).days.toMillis
}

}
2 changes: 2 additions & 0 deletions src/main/scala/scorex/core/network/peer/PeerDatabase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ trait PeerDatabase {

def addOrUpdateKnownPeer(peerInfo: PeerInfo): Unit

def peerSeen(peerInfo: PeerInfo): Unit

def knownPeers: Map[InetSocketAddress, PeerInfo]

def addToBlacklist(address: InetSocketAddress, penaltyType: PenaltyType): Unit
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/scorex/core/network/peer/PeerManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend
// We have connected to a peer and got his peerInfo from him
if (!isSelf(peerInfo.peerSpec)) peerDatabase.addOrUpdateKnownPeer(peerInfo)

case PeerSeen(peerInfo) => peerDatabase.peerSeen(peerInfo)

case Penalize(peer, penaltyType) =>
log.info(s"$peer penalized, penalty: $penaltyType")
if (peerDatabase.penalize(peer, penaltyType)) {
Expand Down Expand Up @@ -109,6 +111,8 @@ object PeerManager {
// peerListOperations messages
case class AddOrUpdatePeer(data: PeerInfo)

case class PeerSeen(peerInfo: PeerInfo)

case class AddPeerIfEmpty(data: PeerSpec)

case class RemovePeer(address: InetSocketAddress)
Expand Down

0 comments on commit 9d0c214

Please sign in to comment.