Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master' into license-update-to-apache
Browse files Browse the repository at this point in the history
  • Loading branch information
kushti committed Nov 27, 2020
2 parents 1506895 + a7c6fd9 commit f4cf5c7
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 13 deletions.
30 changes: 19 additions & 11 deletions src/main/scala/scorex/core/network/NetworkController.scala
Expand Up @@ -7,13 +7,12 @@ import akka.io.Tcp._
import akka.io.{IO, Tcp}
import akka.pattern.ask
import akka.util.Timeout
import scorex.core.api.http.PeersApiRoute.PeersStatusResponse
import scorex.core.app.{ScorexContext, Version}
import scorex.core.network.NodeViewSynchronizer.ReceivableMessages.{DisconnectedPeer, HandshakedPeer}
import scorex.core.network.message.Message.MessageCode
import scorex.core.network.message.{Message, MessageSpec}
import scorex.core.network.peer.PeerManager.ReceivableMessages._
import scorex.core.network.peer.{LocalAddressPeerFeature, PeerInfo, PeerManager, PeersStatus, PenaltyType}
import scorex.core.network.peer.{LocalAddressPeerFeature, PeerInfo, PeerManager, PeersStatus, PenaltyType, SessionIdPeerFeature}
import scorex.core.settings.NetworkSettings
import scorex.core.utils.TimeProvider.Time
import scorex.core.utils.{NetworkUtils, TimeProvider}
Expand Down Expand Up @@ -62,6 +61,7 @@ class NetworkController(settings: NetworkSettings,
private var connections = Map.empty[InetSocketAddress, ConnectedPeer]
private var unconfirmedConnections = Set.empty[InetSocketAddress]

private val mySessionIdFeature = SessionIdPeerFeature(settings.magicBytes)
/**
* Storing timestamp of a last message got via p2p network.
* Used to check whether connectivity is lost.
Expand Down Expand Up @@ -292,14 +292,16 @@ class NetworkController(settings: NetworkSettings,
s"New outgoing connection to ${connectionId.remoteAddress} established (bound to local ${connectionId.localAddress})"
}
}

val isLocal = connectionId.remoteAddress.getAddress.isSiteLocalAddress ||
connectionId.remoteAddress.getAddress.isLoopbackAddress
val peerFeatures =
if (isLocal) scorexContext.features :+ LocalAddressPeerFeature(
new InetSocketAddress(connectionId.localAddress.getAddress, settings.bindAddress.getPort))
else scorexContext.features

val mandatoryFeatures = scorexContext.features :+ mySessionIdFeature
val peerFeatures = if (isLocal) {
val la = new InetSocketAddress(connectionId.localAddress.getAddress, settings.bindAddress.getPort)
val localAddrFeature = LocalAddressPeerFeature(la)
mandatoryFeatures :+ localAddrFeature
} else {
mandatoryFeatures
}
val selfAddressOpt = getNodeAddressForPeer(connectionId.localAddress)

if (selfAddressOpt.isEmpty)
Expand All @@ -321,10 +323,16 @@ class NetworkController(settings: NetworkSettings,
connectionForHandler(peerHandlerRef).foreach { connectedPeer =>
val remoteAddress = connectedPeer.connectionId.remoteAddress
val peerAddress = peerInfo.peerSpec.address.getOrElse(remoteAddress)
// Drop connection to self if occurred or peer already connected.
// Decision whether connection is local or is from some other network is made
// based on SessionIdPeerFeature if exists or in old way using isSelf() function
val shouldDrop =
connectionForPeerAddress(peerAddress).exists(_.handlerRef != peerHandlerRef) ||
peerInfo.peerSpec.features.collectFirst {
case SessionIdPeerFeature(networkMagic, sessionId) =>
!networkMagic.sameElements(mySessionIdFeature.networkMagic) || sessionId == mySessionIdFeature.sessionId
}.getOrElse(isSelf(remoteAddress))

//drop connection to self if occurred or peer already connected
val shouldDrop = isSelf(remoteAddress) ||
connectionForPeerAddress(peerAddress).exists(_.handlerRef != peerHandlerRef)
if (shouldDrop) {
connectedPeer.handlerRef ! CloseConnection
peerManagerRef ! RemovePeer(peerAddress)
Expand Down
44 changes: 44 additions & 0 deletions src/main/scala/scorex/core/network/peer/SessionIdPeerFeature.scala
@@ -0,0 +1,44 @@
package scorex.core.network.peer

import scorex.core.network.PeerFeature
import scorex.core.network.PeerFeature.Id
import scorex.core.network.message.Message
import scorex.util.serialization._
import scorex.core.serialization.ScorexSerializer

/**
* This peer feature allows to more reliably detect connections to self node and connections from other networks
*
* @param networkMagic network magic bytes (taken from settings)
* @param sessionId randomly generated 64-bit session identifier
*/
case class SessionIdPeerFeature(networkMagic: Array[Byte],
sessionId: Long = scala.util.Random.nextLong()) extends PeerFeature {

override type M = SessionIdPeerFeature
override val featureId: Id = SessionIdPeerFeature.featureId

override def serializer: SessionIdPeerFeatureSerializer.type = SessionIdPeerFeatureSerializer

}

object SessionIdPeerFeature {

val featureId: Id = 3: Byte

}

object SessionIdPeerFeatureSerializer extends ScorexSerializer[SessionIdPeerFeature] {

override def serialize(obj: SessionIdPeerFeature, w: Writer): Unit = {
w.putBytes(obj.networkMagic)
w.putLong(obj.sessionId)
}

override def parse(r: Reader): SessionIdPeerFeature = {
val networkMagic = r.getBytes(Message.MagicLength)
val sessionId = r.getLong()
SessionIdPeerFeature(networkMagic, sessionId)
}

}
5 changes: 3 additions & 2 deletions src/test/scala/scorex/network/NetworkControllerSpec.scala
Expand Up @@ -15,7 +15,7 @@ import scorex.core.app.{Version, ScorexContext}
import scorex.core.network.NetworkController.ReceivableMessages.{GetConnectedPeers, GetPeersStatus}
import scorex.core.network._
import scorex.core.network.message.{PeersSpec, _}
import scorex.core.network.peer.{LocalAddressPeerFeature, PeerManagerRef, LocalAddressPeerFeatureSerializer, PeersStatus}
import scorex.core.network.peer.{LocalAddressPeerFeature, PeerManagerRef, LocalAddressPeerFeatureSerializer, PeersStatus, SessionIdPeerFeature}
import scorex.core.settings.ScorexSettings
import scorex.core.utils.LocalTimeProvider

Expand Down Expand Up @@ -399,7 +399,8 @@ class TestPeer(settings: ScorexSettings, networkControllerRef: ActorRef, tcpMana
* @return
*/
def sendHandshake(declaredAddress: Option[InetSocketAddress], localAddress: Option[InetSocketAddress]): Tcp.ResumeReading.type = {
val features = localAddress.map(LocalAddressPeerFeature(_)).toSeq
val localFeature:Seq[PeerFeature] = localAddress.map(LocalAddressPeerFeature(_)).toSeq
val features = localFeature :+ SessionIdPeerFeature(settings.network.magicBytes)
val handshakeToNode = Handshake(PeerSpec(settings.network.agentName,
Version(settings.network.appVersion), "test",
declaredAddress, features), timeProvider.time())
Expand Down

0 comments on commit f4cf5c7

Please sign in to comment.