Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
Merge pull request #381 from ScorexFoundation/discovery-improvs
Browse files Browse the repository at this point in the history
Logic for closing connection reworked
  • Loading branch information
kushti committed Nov 3, 2020
2 parents e132359 + 3ca4293 commit 257cd96
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 60 deletions.
9 changes: 5 additions & 4 deletions src/main/scala/scorex/core/network/NetworkController.scala
Expand Up @@ -172,9 +172,9 @@ class NetworkController(settings: NetworkSettings,
case None => log.info("Failed to connect to : " + c.remoteAddress)
}

// If enough live connections, remove not responding peer from database
// In not enough live connections, maybe connectivity lost but the node has not updated its status, no ban then
if(connections.size > settings.maxConnections / 2) {
// If a message received from p2p within connection timeout,
// connectivity is not lost thus we're removing the peer
if(networkTime() - lastIncomingMessageTime < settings.connectionTimeout.toMillis) {
peerManagerRef ! RemovePeer(c.remoteAddress)
}

Expand Down Expand Up @@ -367,7 +367,7 @@ class NetworkController(settings: NetworkSettings,
* @param peerAddress - socket address of peer
* @return Some(ConnectedPeer) when the connection exists for this peer, and None otherwise
*/
private def connectionForPeerAddress(peerAddress: InetSocketAddress) = {
private def connectionForPeerAddress(peerAddress: InetSocketAddress): Option[ConnectedPeer] = {
connections.values.find { connectedPeer =>
connectedPeer.connectionId.remoteAddress == peerAddress ||
connectedPeer.peerInfo.exists(peerInfo => getPeerAddress(peerInfo).contains(peerAddress))
Expand Down Expand Up @@ -537,4 +537,5 @@ object NetworkControllerRef {
props(settings, peerManagerRef, scorexContext, IO(Tcp)),
name)
}

}
65 changes: 26 additions & 39 deletions src/main/scala/scorex/core/network/PeerConnectionHandler.scala
Expand Up @@ -88,7 +88,7 @@ class PeerConnectionHandler(val settings: NetworkSettings,
handshakeTimeoutCancellableOpt.map(_.cancel())
connection ! ResumeReading
context become workingCycleWriting
} orElse handshakeTimeout orElse fatalCommands
} orElse handshakeTimeout orElse closeCommands
}

private def receiveAndHandleHandshake(handler: Handshake => Unit): Receive = {
Expand All @@ -114,18 +114,31 @@ class PeerConnectionHandler(val settings: NetworkSettings,
private def workingCycleWriting: Receive =
localInterfaceWriting orElse
remoteInterface orElse
fatalCommands orElse
closeCommands orElse
reportStrangeInput

private def workingCycleBuffering: Receive =
localInterfaceBuffering orElse
remoteInterface orElse
fatalCommands orElse
closeCommands orElse
reportStrangeInput

private def fatalCommands: Receive = {
case _: ConnectionClosed =>
log.info(s"Connection closed to $connectionId")
private def closeCommands: Receive = {
case CloseConnection =>
log.info(s"Enforced to abort communication with: " + connectionId)
pushAllWithNoAck()
connection ! Abort // we're closing connection without waiting for a confirmation from the peer

case cc: ConnectionClosed =>
// connection closed from either side, actor is shutting down itself
val reason: String = if (cc.isErrorClosed) {
"error: " + cc.getErrorCause
} else if (cc.isPeerClosed) {
"closed by the peer"
} else if (cc.isAborted) {
"aborted locally"
} else ""
log.info(s"Connection closed to $connectionId, reason: " + reason)
context stop self
}

Expand All @@ -141,10 +154,6 @@ class PeerConnectionHandler(val settings: NetworkSettings,
buffer(id, msg)
context become workingCycleBuffering

case CloseConnection =>
log.info(s"Enforced to abort communication with: " + connectionId + ", switching to closing mode")
if (outMessagesBuffer.isEmpty) connection ! Close else context become closingWithNonEmptyBuffer

case ReceivableMessages.Ack(_) => // ignore ACKs in stable mode

case WritingResumed => // ignore in stable mode
Expand All @@ -167,16 +176,12 @@ class PeerConnectionHandler(val settings: NetworkSettings,

case ReceivableMessages.Ack(id) =>
outMessagesBuffer -= id
if (outMessagesBuffer.nonEmpty) writeFirst()
else {
if (outMessagesBuffer.nonEmpty){
writeFirst()
} else {
log.info("Buffered messages processed, exiting buffering mode")
context become workingCycleWriting
}

case CloseConnection =>
log.info(s"Enforced to abort communication with: " + connectionId + s", switching to closing mode")
writeAll()
context become closingWithNonEmptyBuffer
}

def remoteInterface: Receive = {
Expand Down Expand Up @@ -211,25 +216,6 @@ class PeerConnectionHandler(val settings: NetworkSettings,
connection ! ResumeReading
}

def closingWithNonEmptyBuffer: Receive = {
case CommandFailed(_: Write) =>
connection ! ResumeWriting
context.become({
case WritingResumed =>
writeAll()
context.unbecome()
case ReceivableMessages.Ack(id) =>
outMessagesBuffer -= id
}, discardOld = false)

case ReceivableMessages.Ack(id) =>
outMessagesBuffer -= id
if (outMessagesBuffer.isEmpty) connection ! Close

case other =>
log.debug(s"Got $other in closing phase")
}

private def reportStrangeInput: Receive = {
case nonsense =>
log.warn(s"Strange input for PeerConnectionHandler: $nonsense")
Expand All @@ -245,9 +231,10 @@ class PeerConnectionHandler(val settings: NetworkSettings,
}
}

private def writeAll(): Unit = {
outMessagesBuffer.foreach { case (id, msg) =>
connection ! Write(msg, ReceivableMessages.Ack(id))
// Write into the wire all the buffered messages we have for the peer with no ACK
private def pushAllWithNoAck(): Unit = {
outMessagesBuffer.foreach { case (_, msg) =>
connection ! Write(msg, NoAck)
}
}

Expand Down
21 changes: 10 additions & 11 deletions src/main/scala/scorex/core/network/PeerSynchronizer.scala
Expand Up @@ -13,7 +13,6 @@ import shapeless.syntax.typeable._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.postfixOps

/**
* Responsible for discovering and sharing new peers.
Expand All @@ -24,10 +23,9 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,
featureSerializers: PeerFeature.Serializers)
(implicit ec: ExecutionContext) extends Actor with Synchronizer with ScorexLogging {

private implicit val timeout: Timeout = Timeout(settings.syncTimeout.getOrElse(5 seconds))
private val peersSpec = new PeersSpec(featureSerializers, settings.maxPeerSpecObjects)

protected val msgHandlers: PartialFunction[(MessageSpec[_], _, ConnectedPeer), Unit] = {
protected override val msgHandlers: PartialFunction[(MessageSpec[_], _, ConnectedPeer), Unit] = {
case (_: PeersSpec, peers: Seq[PeerSpec]@unchecked, _) if peers.cast[Seq[PeerSpec]].isDefined =>
addNewPeers(peers)

Expand All @@ -54,7 +52,7 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,
case nonsense: Any => log.warn(s"PeerSynchronizer: got unexpected input $nonsense from ${sender()}")
}

override protected def penalizeMaliciousPeer ( peer: ConnectedPeer ): Unit = {
override protected def penalizeMaliciousPeer(peer: ConnectedPeer): Unit = {
networkControllerRef ! PenalizePeer(peer.connectionId.remoteAddress, PenaltyType.PermanentPenalty)
}

Expand All @@ -63,24 +61,25 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,
*
* @param peers sequence of peer specs describing a remote peers details
*/
private def addNewPeers ( peers: Seq[PeerSpec] ): Unit =
if ( peers.cast[Seq[PeerSpec]].isDefined ) {
peers.foreach(peerSpec => peerManager ! AddPeerIfEmpty(peerSpec))
}
private def addNewPeers(peers: Seq[PeerSpec]): Unit = {
peers.foreach(peerSpec => peerManager ! AddPeerIfEmpty(peerSpec))
}

/**
* Handles gossiping about the locally known peer set to a given remote peer
*
* @param remote the remote peer to be informed of our local peers
*/
private def gossipPeers ( remote: ConnectedPeer ): Unit =
private def gossipPeers(remote: ConnectedPeer): Unit = {
implicit val timeout: Timeout = Timeout(settings.syncTimeout.getOrElse(5.seconds))

(peerManager ? SeenPeers(settings.maxPeerSpecObjects))
.mapTo[Seq[PeerInfo]]
.foreach
{ peers =>
.foreach { peers =>
val msg = Message(peersSpec, Right(peers.map(_.peerSpec)), None)
networkControllerRef ! SendToNetwork(msg, SendToPeer(remote))
}
}
}

object PeerSynchronizerRef {
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/scorex/core/network/Synchronizer.scala
Expand Up @@ -24,8 +24,11 @@ trait Synchronizer extends ScorexLogging {
// if a message could be parsed, match the type of content found and ensure a handler is defined
case Success(content) =>
val parsedMsg = (spec, content, source)
if (msgHandlers.isDefinedAt(parsedMsg)) msgHandlers.apply(parsedMsg)
else log.error(s"Function handler not found for the parsed message: $parsedMsg")
if (msgHandlers.isDefinedAt(parsedMsg)) {
msgHandlers.apply(parsedMsg)
} else {
log.error(s"Function handler not found for the parsed message: $parsedMsg")
}

// if a message could not be parsed, penalize the remote peer
case Failure(e) =>
Expand Down
Expand Up @@ -245,9 +245,15 @@ class HandshakeSpec(featureSerializers: PeerFeature.Serializers, sizeLimit: Int)
override val messageCode: MessageCode = HandshakeSpec.messageCode
override val messageName: String = HandshakeSpec.messageName

override def serialize(obj: Handshake, w: Writer): Unit = {
w.putULong(obj.time)
peersDataSerializer.serialize(obj.peerSpec, w)
/**
* Serializing handshake into a byte writer.
* @param hs - handshake instance
* @param w - writer to write bytes to
*/
override def serialize(hs: Handshake, w: Writer): Unit = {
// first writes down handshake time, then peer specification of our node
w.putULong(hs.time)
peersDataSerializer.serialize(hs.peerSpec, w)
}

override def parse(r: Reader): Handshake = {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/scorex/core/network/peer/PeerManager.scala
Expand Up @@ -63,7 +63,7 @@ class PeerManager(settings: ScorexSettings, scorexContext: ScorexContext) extend
}

case RemovePeer(address) =>
log.info(s"$address removed")
log.info(s"$address removed from peers database")
peerDatabase.remove(address)

case get: GetPeers[_] =>
Expand Down

0 comments on commit 257cd96

Please sign in to comment.