Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
Merge 9807db2 into 539a4d3
Browse files Browse the repository at this point in the history
  • Loading branch information
knizhnik committed Nov 2, 2020
2 parents 539a4d3 + 9807db2 commit 02a6e0c
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 33 deletions.
3 changes: 3 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ scorex {
# Synchronization status update interval for stable regime
syncStatusRefreshStable = 4m

# Timeout for dropping dead connections
inactiveConnectionDeadline = 12m

# Network controller timeout
controllerTimeout = 5s

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/scorex/ObjectGenerators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,6 @@ trait ObjectGenerators {

def connectedPeerGen(peerRef: ActorRef): Gen[ConnectedPeer] = for {
connectionId <- connectionIdGen
} yield ConnectedPeer(connectionId, peerRef, None)
} yield ConnectedPeer(connectionId, peerRef, 0, None)

}
26 changes: 16 additions & 10 deletions src/main/scala/scorex/core/api/http/PeersApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.circe.generic.semiauto._
import io.circe.syntax._
import io.circe.{Encoder, Json}
import scorex.core.api.http.PeersApiRoute.{BlacklistedPeers, PeerInfoResponse, PeersStatusResponse}
import scorex.core.network.ConnectedPeer
import scorex.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus}
import scorex.core.network.peer.{PeerInfo, PeersStatus}
import scorex.core.network.peer.PeerManager.ReceivableMessages.{GetAllPeers, GetBlacklistedPeers}
Expand Down Expand Up @@ -36,14 +37,17 @@ case class PeersApiRoute(peerManager: ActorRef,
}

def connectedPeers: Route = (path("connected") & get) {
val result = askActor[Seq[PeerInfo]](networkController, GetConnectedPeers).map {
_.map { peerInfo =>
PeerInfoResponse(
address = peerInfo.peerSpec.declaredAddress.map(_.toString).getOrElse(""),
lastSeen = peerInfo.lastSeen,
name = peerInfo.peerSpec.nodeName,
connectionType = peerInfo.connectionType.map(_.toString)
)
val result = askActor[Seq[ConnectedPeer]](networkController, GetConnectedPeers).map {
_.flatMap { con =>
con.peerInfo.map { peerInfo =>
PeerInfoResponse(
address = peerInfo.peerSpec.declaredAddress.map(_.toString).getOrElse(""),
lastMessage = con.lastMessage,
lastHandshake = peerInfo.lastHandshake,
name = peerInfo.peerSpec.nodeName,
connectionType = peerInfo.connectionType.map(_.toString)
)
}
}
}
ApiResponse(result)
Expand Down Expand Up @@ -87,15 +91,17 @@ case class PeersApiRoute(peerManager: ActorRef,
object PeersApiRoute {

case class PeerInfoResponse(address: String,
lastSeen: Long,
lastMessage: Long,
lastHandshake: Long,
name: String,
connectionType: Option[String])

object PeerInfoResponse {

def fromAddressAndInfo(address: InetSocketAddress, peerInfo: PeerInfo): PeerInfoResponse = PeerInfoResponse(
address.toString,
peerInfo.lastSeen,
0,
peerInfo.lastHandshake,
peerInfo.peerSpec.nodeName,
peerInfo.connectionType.map(_.toString)
)
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/scorex/core/network/ConnectedPeer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import scorex.core.network.peer.PeerInfo
*
* @param connectionId - connection address
* @param handlerRef - reference to PeerConnectionHandler that is responsible for communication with this peer
* @param lastMessage - timestamp of last received message
* @param peerInfo - information about this peer. May be None if peer is connected, but is not handshaked yet
*/
case class ConnectedPeer(connectionId: ConnectionId,
handlerRef: ActorRef,
var lastMessage: Long,
peerInfo: Option[PeerInfo]) {

override def hashCode(): Int = connectionId.hashCode()
Expand Down
22 changes: 8 additions & 14 deletions src/main/scala/scorex/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,10 @@ class NetworkController(settings: NetworkSettings,
// Update last seen message timestamps, global and peer's, with the message timestamp
val remoteAddress = remote.connectionId.remoteAddress
connections.get(remoteAddress) match {
case Some(cp) => cp.peerInfo match {
case Some(pi) =>
val now = networkTime()
lastIncomingMessageTime = now
connections += remoteAddress -> cp.copy(peerInfo = Some(pi.copy(lastSeen = now)))
case None =>
log.warn("Peer info not found for a message got from: " + remoteAddress)
}
case Some(cp) =>
val now = networkTime()
lastIncomingMessageTime = now
cp.lastMessage = now
case None => log.warn("Connection not found for a message got from: " + remoteAddress)
}

Expand Down Expand Up @@ -200,7 +196,7 @@ class NetworkController(settings: NetworkSettings,
sender() ! PeersStatusResponse(lastIncomingMessageTime, networkTime())

case GetConnectedPeers =>
sender() ! connections.values.flatMap(_.peerInfo).toSeq
sender() ! connections.values.filter(_.peerInfo.nonEmpty)

case ShutdownNetwork =>
log.info("Going to shutdown all connections & unbind port")
Expand Down Expand Up @@ -242,10 +238,8 @@ class NetworkController(settings: NetworkSettings,
// Drop connections with peers if they seem to be inactive
val now = networkTime()
connections.values.foreach { cp =>
val lastSeen = cp.peerInfo.map(_.lastSeen).getOrElse(now)
// A peer should send out sync message to us at least once per settings.syncStatusRefreshStable duration.
// We wait for more, namely settings.syncStatusRefreshStable.toMillis * 3
val timeout = settings.syncStatusRefreshStable.toMillis * 3
val lastSeen = cp.lastMessage
val timeout = settings.inactiveConnectionDeadline.toMillis
val delta = now - lastSeen
if (delta > timeout) {
log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${delta / 1000.0} seconds ago")
Expand Down Expand Up @@ -316,7 +310,7 @@ class NetworkController(settings: NetworkSettings,

val handler = context.actorOf(handlerProps) // launch connection handler
context.watch(handler)
val connectedPeer = ConnectedPeer(connectionId, handler, None)
val connectedPeer = ConnectedPeer(connectionId, handler, networkTime(), None)
connections += connectionId.remoteAddress -> connectedPeer
unconfirmedConnections -= connectionId.remoteAddress
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class PeerConnectionHandler(val settings: NetworkSettings,
scorexContext.timeProvider.time(),
Some(direction)
)
val peer = ConnectedPeer(connectionDescription.connectionId, self, Some(peerInfo))
val peer = ConnectedPeer(connectionDescription.connectionId, self, 0, Some(peerInfo))
selfPeer = Some(peer)

networkControllerRef ! Handshaked(peerInfo)
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/scorex/core/network/peer/PeerInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import scorex.core.network.{ConnectionDirection, PeerSpec}
* Information about peer to be stored in PeerDatabase
*
* @param peerSpec - general information about the peer
* @param lastSeen - timestamp when this peer was last seen in the network
* @param lastHandshake - timestamp when last handshake was done
* @param connectionType - type of connection (Incoming/Outgoing) established to this peer if any
*/
case class PeerInfo(peerSpec: PeerSpec,
lastSeen: Long,
lastHandshake: Long,
connectionType: Option[ConnectionDirection] = None)

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/scorex/core/network/peer/PeerManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ object PeerManager {
sc: ScorexContext): Seq[PeerInfo] = {
val recentlySeenNonBlacklisted = knownPeers.values.toSeq
.filter { p =>
(p.connectionType.isDefined || p.lastSeen > 0) &&
(p.connectionType.isDefined || p.lastHandshake > 0) &&
!blacklistedPeers.exists(ip => p.peerSpec.declaredAddress.exists(_.getAddress == ip))
}
Random.shuffle(recentlySeenNonBlacklisted).take(howMany)
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/scorex/core/settings/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ case class NetworkSettings(nodeName: String,
syncStatusRefresh: FiniteDuration,
syncIntervalStable: FiniteDuration,
syncStatusRefreshStable: FiniteDuration,
inactiveConnectionDeadline: FiniteDuration,
syncTimeout: Option[FiniteDuration],
controllerTimeout: Option[FiniteDuration],
maxModifiersCacheSize: Int,
Expand Down
8 changes: 4 additions & 4 deletions src/test/scala/scorex/network/NetworkControllerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,15 @@ class NetworkControllerSpec extends NetworkTests {
testPeer.sendHandshake(Some(peerAddr), None)

p.send(networkControllerRef, GetConnectedPeers)
val data0 = p.expectMsgClass(classOf[Seq[PeerInfo]])
val ls0 = data0(0).lastSeen
val data0 = p.expectMsgClass(classOf[Seq[ConnectedPeer]])
val ls0 = data0(0).lastMessage

Thread.sleep(1000)
testPeer.sendGetPeers() // send a message to see node's status update then

p.send(networkControllerRef, GetConnectedPeers)
val data = p.expectMsgClass(classOf[Seq[PeerInfo]])
val ls = data(0).lastSeen
val data = p.expectMsgClass(classOf[Seq[ConnectedPeer]])
val ls = data(0).lastMessage
ls should not be ls0

p.send(networkControllerRef, GetPeersStatus)
Expand Down

0 comments on commit 02a6e0c

Please sign in to comment.