Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
[refer #378] Replace lastSeen with lastHandshake and lastMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
knizhnik committed Oct 27, 2020
1 parent 4086aec commit 75f2805
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/scorex/ObjectGenerators.scala
Expand Up @@ -106,6 +106,6 @@ trait ObjectGenerators {

def connectedPeerGen(peerRef: ActorRef): Gen[ConnectedPeer] = for {
connectionId <- connectionIdGen
} yield ConnectedPeer(connectionId, peerRef, None)
} yield ConnectedPeer(connectionId, peerRef, 0, None)

}
28 changes: 18 additions & 10 deletions src/main/scala/scorex/core/api/http/PeersApiRoute.scala
Expand Up @@ -8,6 +8,7 @@ import io.circe.generic.semiauto._
import io.circe.syntax._
import io.circe.{Encoder, Json}
import scorex.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse, PeersStatusResponse}
import scorex.core.network.ConnectedPeer
import scorex.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus}
import scorex.core.network.peer.{PeerInfo, PeersStatus}
import scorex.core.network.peer.PeerManager.ReceivableMessages.{GetAllPeers, GetBlacklistedPeers}
Expand Down Expand Up @@ -36,14 +37,19 @@ case class PeersApiRoute(peerManager: ActorRef,
}

def connectedPeers: Route = (path("connected") & get) {
val result = askActor[Seq[PeerInfo]](networkController, GetConnectedPeers).map {
_.map { peerInfo =>
PeerInfoResponse(
address = peerInfo.peerSpec.declaredAddress.map(_.toString).getOrElse(""),
lastSeen = peerInfo.lastSeen,
name = peerInfo.peerSpec.nodeName,
connectionType = peerInfo.connectionType.map(_.toString)
)
val result = askActor[Seq[ConnectedPeer]](networkController, GetConnectedPeers).map {
_.map { con =>
con.peerInfo match {
case Some(peerInfo) =>
PeerInfoResponse(
address = peerInfo.peerSpec.declaredAddress.map(_.toString).getOrElse(""),
lastMessage = con.lastMessage,
lastHandshake = peerInfo.lastHandshake,
name = peerInfo.peerSpec.nodeName,
connectionType = peerInfo.connectionType.map(_.toString)
)
case None => throw new Exception("Peer is not connected")
}
}
}
ApiResponse(result)
Expand Down Expand Up @@ -87,15 +93,17 @@ case class PeersApiRoute(peerManager: ActorRef,
object PeersApiRoute {

case class PeerInfoResponse(address: String,
lastSeen: Long,
lastMessage: Long,
lastHandshake: Long,
name: String,
connectionType: Option[String])

object PeerInfoResponse {

def fromAddressAndInfo(address: InetSocketAddress, peerInfo: PeerInfo): PeerInfoResponse = PeerInfoResponse(
address.toString,
peerInfo.lastSeen,
0,
peerInfo.lastHandshake,
peerInfo.peerSpec.nodeName,
peerInfo.connectionType.map(_.toString)
)
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/scorex/core/network/ConnectedPeer.scala
Expand Up @@ -8,10 +8,12 @@ import scorex.core.network.peer.PeerInfo
*
* @param connectionId - connection address
* @param handlerRef - reference to PeerConnectionHandler that is responsible for communication with this peer
* @param lastMessage - timestamp of last received message
* @param peerInfo - information about this peer. May be None if peer is connected, but is not handshaked yet
*/
case class ConnectedPeer(connectionId: ConnectionId,
handlerRef: ActorRef,
var lastMessage: Long,
peerInfo: Option[PeerInfo]) {

override def hashCode(): Int = connectionId.hashCode()
Expand Down
30 changes: 14 additions & 16 deletions src/main/scala/scorex/core/network/NetworkController.scala
Expand Up @@ -109,14 +109,10 @@ class NetworkController(settings: NetworkSettings,
// Update last seen message timestamps, global and peer's, with the message timestamp
val remoteAddress = remote.connectionId.remoteAddress
connections.get(remoteAddress) match {
case Some(cp) => cp.peerInfo match {
case Some(pi) =>
val now = networkTime()
lastIncomingMessageTime = now
connections += remoteAddress -> cp.copy(peerInfo = Some(pi.copy(lastSeen = now)))
case None =>
log.warn("Peer info not found for a message got from: " + remoteAddress)
}
case Some(cp) =>
val now = networkTime()
lastIncomingMessageTime = now
cp.lastMessage = now
case None => log.warn("Connection not found for a message got from: " + remoteAddress)
}

Expand Down Expand Up @@ -200,7 +196,7 @@ class NetworkController(settings: NetworkSettings,
sender() ! PeersStatusResponse(lastIncomingMessageTime, networkTime())

case GetConnectedPeers =>
sender() ! connections.values.flatMap(_.peerInfo).toSeq
sender() ! connections.values.filter(_.peerInfo.nonEmpty)

case ShutdownNetwork =>
log.info("Going to shutdown all connections & unbind port")
Expand Down Expand Up @@ -242,12 +238,14 @@ class NetworkController(settings: NetworkSettings,
// Drop connections with peers if they seem to be inactive
val now = networkTime()
connections.values.foreach { cp =>
val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now)
val timeout = settings.inactiveConnectionDeadline.toMillis
val delta = now - lastSeen
if (delta > timeout) {
log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${delta / 1000.0} seconds ago")
cp.handlerRef ! CloseConnection
val lastSeen = cp.lastMessage
if (lastSeen != 0) {
val timeout = settings.inactiveConnectionDeadline.toMillis
val delta = now - lastSeen
if (delta > timeout) {
log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${delta / 1000.0} seconds ago")
cp.handlerRef ! CloseConnection
}
}
}
}
Expand Down Expand Up @@ -314,7 +312,7 @@ class NetworkController(settings: NetworkSettings,

val handler = context.actorOf(handlerProps) // launch connection handler
context.watch(handler)
val connectedPeer = ConnectedPeer(connectionId, handler, None)
val connectedPeer = ConnectedPeer(connectionId, handler, 0, None)
connections += connectionId.remoteAddress -> connectedPeer
unconfirmedConnections -= connectionId.remoteAddress
}
Expand Down
Expand Up @@ -81,7 +81,7 @@ class PeerConnectionHandler(val settings: NetworkSettings,
scorexContext.timeProvider.time(),
Some(direction)
)
val peer = ConnectedPeer(connectionDescription.connectionId, self, Some(peerInfo))
val peer = ConnectedPeer(connectionDescription.connectionId, self, 0, Some(peerInfo))
selfPeer = Some(peer)

networkControllerRef ! Handshaked(peerInfo)
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/scorex/core/network/peer/PeerInfo.scala
Expand Up @@ -9,11 +9,11 @@ import scorex.core.network.{ConnectionDirection, PeerSpec}
* Information about peer to be stored in PeerDatabase
*
* @param peerSpec - general information about the peer
* @param lastSeen - timestamp when this peer was last seen in the network
* @param lastHandshake - timestamp when this peer was last seen in the network
* @param connectionType - type of connection (Incoming/Outgoing) established to this peer if any
*/
case class PeerInfo(peerSpec: PeerSpec,
lastSeen: Long,
lastHandshake: Long,
connectionType: Option[ConnectionDirection] = None)

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/scorex/core/network/peer/PeerManager.scala
Expand Up @@ -135,7 +135,7 @@ object PeerManager {
sc: ScorexContext): Seq[PeerInfo] = {
val recentlySeenNonBlacklisted = knownPeers.values.toSeq
.filter { p =>
(p.connectionType.isDefined || p.lastSeen > 0) &&
(p.connectionType.isDefined || p.lastHandshake > 0) &&
!blacklistedPeers.exists(ip => p.peerSpec.declaredAddress.exists(_.getAddress == ip))
}
Random.shuffle(recentlySeenNonBlacklisted).take(howMany)
Expand Down
8 changes: 4 additions & 4 deletions src/test/scala/scorex/network/NetworkControllerSpec.scala
Expand Up @@ -289,15 +289,15 @@ class NetworkControllerSpec extends NetworkTests {
testPeer.sendHandshake(Some(peerAddr), None)

p.send(networkControllerRef, GetConnectedPeers)
val data0 = p.expectMsgClass(classOf[Seq[PeerInfo]])
val ls0 = data0(0).lastSeen
val data0 = p.expectMsgClass(classOf[Seq[ConnectedPeer]])
val ls0 = data0(0).lastMessage

Thread.sleep(1000)
testPeer.sendGetPeers() // send a message to see node's status update then

p.send(networkControllerRef, GetConnectedPeers)
val data = p.expectMsgClass(classOf[Seq[PeerInfo]])
val ls = data(0).lastSeen
val data = p.expectMsgClass(classOf[Seq[ConnectedPeer]])
val ls = data(0).lastMessage
ls should not be ls0

p.send(networkControllerRef, GetPeersStatus)
Expand Down

0 comments on commit 75f2805

Please sign in to comment.