Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Parse in synchronizers #370

Merged
merged 6 commits into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 5 additions & 17 deletions src/main/scala/scorex/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scorex.util.ScorexLogging
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}
import scala.util.{Failure, Success, Try}
import scala.util.Try

/**
* Control all network interaction
Expand All @@ -33,7 +33,6 @@ class NetworkController(settings: NetworkSettings,
)(implicit ec: ExecutionContext) extends Actor with ScorexLogging {

import NetworkController.ReceivableMessages._
import NetworkControllerSharedMessages.ReceivableMessages.DataFromPeer
import PeerConnectionHandler.ReceivableMessages.CloseConnection
import akka.actor.SupervisorStrategy._

Expand Down Expand Up @@ -90,21 +89,10 @@ class NetworkController(settings: NetworkSettings,

private def businessLogic: Receive = {
//a message coming in from another peer
case Message(spec, Left(msgBytes), Some(remote)) =>
val msgId = spec.messageCode

spec.parseBytesTry(msgBytes) match {
case Success(content) =>
messageHandlers.get(msgId) match {
case Some(handler) =>
handler ! DataFromPeer(spec, content, remote)

case None =>
log.error(s"No handlers found for message $remote: " + msgId)
}
case Failure(e) =>
log.error(s"Failed to deserialize data from $remote: ", e)
penalize(remote.connectionId.remoteAddress, PenaltyType.PermanentPenalty)
case msg @ Message(spec, _, Some(remote)) =>
messageHandlers.get(spec.messageCode) match {
case Some(handler) => handler ! msg // forward the message to the appropriate handler for processing
case None => log.error(s"No handlers found for message $remote: " + spec.messageCode)
}

case SendToNetwork(message, sendingStrategy) =>
Expand Down

This file was deleted.

43 changes: 21 additions & 22 deletions src/main/scala/scorex/core/network/NodeViewSynchronizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import scorex.core.consensus.History._
import scorex.core.consensus.{History, HistoryReader, SyncInfo}
import scorex.core.network.ModifiersStatus.Requested
import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork}
import scorex.core.network.NetworkControllerSharedMessages.ReceivableMessages.DataFromPeer
import scorex.core.network.NodeViewSynchronizer.ReceivableMessages._
import scorex.core.network.message.{InvSpec, RequestModifierSpec, _}
import scorex.core.network.peer.PenaltyType
Expand Down Expand Up @@ -54,14 +53,21 @@ PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, SI] : ClassTag, MR
networkSettings: NetworkSettings,
timeProvider: NetworkTimeProvider,
modifierSerializers: Map[ModifierTypeId, ScorexSerializer[_ <: NodeViewModifier]])(implicit ec: ExecutionContext)
extends Actor with ScorexLogging with ScorexEncoding {
extends Actor with Synchronizer with ScorexLogging with ScorexEncoding {

protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout
protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks
protected val invSpec = new InvSpec(networkSettings.maxInvObjects)
protected val requestModifierSpec = new RequestModifierSpec(networkSettings.maxInvObjects)
protected val modifiersSpec = new ModifiersSpec(networkSettings.maxPacketSize)

protected val msgHandlers: PartialFunction[(MessageSpec[_], _, ConnectedPeer), Unit] = {
case (_: SIS @unchecked, data: SI @unchecked, remote) => processSync(data, remote)
case (_: InvSpec, data: InvData, remote) => processInv(data, remote)
case (_: RequestModifierSpec, data: InvData, remote) => modifiersReq(data, remote)
case (_: ModifiersSpec, data: ModifiersData, remote) => modifiersFromRemote(data, remote)
}

protected val deliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, self)
protected val statusTracker = new SyncTracker(self, context, networkSettings, timeProvider)

Expand Down Expand Up @@ -160,11 +166,12 @@ PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, SI] : ClassTag, MR
}
}

//sync info is coming from another node
protected def processSync: Receive = {
case DataFromPeer(spec, syncInfo: SI@unchecked, remote)
if spec.messageCode == syncInfoSpec.messageCode =>
protected def processDataFromPeer: Receive = {
case Message(spec, Left(msgBytes), Some(source)) => parseAndHandle(spec, msgBytes, source)
}

//sync info is coming from another node
protected def processSync(syncInfo: SI, remote: ConnectedPeer): Unit = {
historyReaderOpt match {
case Some(historyReader) =>
val ext = historyReader.continuationIds(syncInfo, networkSettings.desiredInvObjects)
Expand Down Expand Up @@ -212,10 +219,7 @@ PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, SI] : ClassTag, MR
* Filter out modifier ids that are already in process (requested, received or applied),
* request unknown ids from peer and set this ids to requested state.
*/
protected def processInv: Receive = {
case DataFromPeer(spec, invData: InvData@unchecked, peer)
if spec.messageCode == InvSpec.MessageCode =>

protected def processInv(invData: InvData, peer: ConnectedPeer): Unit = {
(mempoolReaderOpt, historyReaderOpt) match {
case (Some(mempool), Some(history)) =>
val modifierTypeId = invData.typeId
Expand All @@ -238,10 +242,7 @@ PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, SI] : ClassTag, MR
}

//other node asking for objects by their ids
protected def modifiersReq: Receive = {
case DataFromPeer(spec, invData: InvData@unchecked, remote)
if spec.messageCode == RequestModifierSpec.MessageCode =>

protected def modifiersReq(invData: InvData, remote: ConnectedPeer): Unit = {
readersOpt.foreach { readers =>
val objs: Seq[NodeViewModifier] = invData.typeId match {
case typeId: ModifierTypeId if typeId == Transaction.ModifierTypeId =>
Expand All @@ -261,10 +262,7 @@ PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, SI] : ClassTag, MR
* Filter out non-requested modifiers (with a penalty to spamming peer),
* parse modifiers and send valid modifiers to NodeViewHolder
*/
protected def modifiersFromRemote: Receive = {
case DataFromPeer(spec, data: ModifiersData@unchecked, remote)
if spec.messageCode == ModifiersSpec.MessageCode =>

protected def modifiersFromRemote(data: ModifiersData, remote: ConnectedPeer): Unit = {
val typeId = data.typeId
val modifiers = data.modifiers
log.info(s"Got ${modifiers.size} modifiers of type $typeId from remote connected peer: $remote")
Expand Down Expand Up @@ -402,6 +400,10 @@ PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, SI] : ClassTag, MR
networkControllerRef ! PenalizePeer(peer.connectionId.remoteAddress, PenaltyType.MisbehaviorPenalty)
}

protected def penalizeMaliciousPeer(peer: ConnectedPeer): Unit = {
networkControllerRef ! PenalizePeer(peer.connectionId.remoteAddress, PenaltyType.PermanentPenalty)
}

/**
* Local node sending out objects requested to remote
*/
Expand Down Expand Up @@ -452,14 +454,11 @@ PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, SI] : ClassTag, MR
}

override def receive: Receive =
processDataFromPeer orElse
onDownloadRequest orElse
getLocalSyncInfo orElse
processSync orElse
processSyncStatus orElse
processInv orElse
modifiersReq orElse
responseFromLocal orElse
modifiersFromRemote orElse
viewHolderEvents orElse
peerManagerEvents orElse
checkDelivery orElse {
Expand Down
60 changes: 43 additions & 17 deletions src/main/scala/scorex/core/network/PeerSynchronizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package scorex.core.network
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scorex.core.network.NetworkController.ReceivableMessages.{RegisterMessageSpecs, SendToNetwork}
import scorex.core.network.NetworkControllerSharedMessages.ReceivableMessages.DataFromPeer
import scorex.core.network.message.{GetPeersSpec, Message, PeersSpec}
import scorex.core.network.peer.PeerInfo
import scorex.core.network.NetworkController.ReceivableMessages.{PenalizePeer, RegisterMessageSpecs, SendToNetwork}
import scorex.core.network.message.{GetPeersSpec, Message, MessageSpec, PeersSpec}
import scorex.core.network.peer.{PeerInfo, PenaltyType}
import scorex.core.network.peer.PeerManager.ReceivableMessages.{AddPeerIfEmpty, RecentlySeenPeers}
import scorex.core.settings.NetworkSettings
import scorex.util.ScorexLogging
Expand All @@ -23,11 +22,19 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,
peerManager: ActorRef,
settings: NetworkSettings,
featureSerializers: PeerFeature.Serializers)
(implicit ec: ExecutionContext) extends Actor with ScorexLogging {
(implicit ec: ExecutionContext) extends Actor with Synchronizer with ScorexLogging {

private implicit val timeout: Timeout = Timeout(settings.syncTimeout.getOrElse(5 seconds))
private val peersSpec = new PeersSpec(featureSerializers, settings.maxPeerSpecObjects)

protected val msgHandlers: PartialFunction[(MessageSpec[_], _, ConnectedPeer), Unit] = {
case (_: PeersSpec, peers: Seq[PeerSpec]@unchecked, _) if peers.cast[Seq[PeerSpec]].isDefined =>
addNewPeers(peers)

case (spec, _, remote) if spec.messageCode == GetPeersSpec.messageCode =>
gossipPeers(remote)
}

override def preStart: Unit = {
super.preStart()

Expand All @@ -39,22 +46,41 @@ class PeerSynchronizer(val networkControllerRef: ActorRef,
}

override def receive: Receive = {
case DataFromPeer(spec, peers: Seq[PeerSpec]@unchecked, _)
if spec.messageCode == PeersSpec.messageCode && peers.cast[Seq[PeerSpec]].isDefined =>

peers.foreach(peerSpec => peerManager ! AddPeerIfEmpty(peerSpec))

case DataFromPeer(spec, _, peer) if spec.messageCode == GetPeersSpec.messageCode =>
// data received from a remote peer
case Message(spec, Left(msgBytes), Some(source)) => parseAndHandle(spec, msgBytes, source)

(peerManager ? RecentlySeenPeers(settings.maxPeerSpecObjects))
.mapTo[Seq[PeerInfo]]
.foreach { peers =>
val msg = Message(peersSpec, Right(peers.map(_.peerSpec)), None)
networkControllerRef ! SendToNetwork(msg, SendToPeers(Seq(peer)))
}
// fall-through method for reporting unhandled messages
case nonsense: Any => log.warn(s"PeerSynchronizer: got unexpected input $nonsense from ${sender()}")
}

case nonsense: Any => log.warn(s"PeerSynchronizer: got something strange $nonsense")
override protected def penalizeMaliciousPeer ( peer: ConnectedPeer ): Unit = {
networkControllerRef ! PenalizePeer(peer.connectionId.remoteAddress, PenaltyType.PermanentPenalty)
}

/**
* Handles adding new peers to the peer database if they were previously unknown
*
* @param peers sequence of peer specs describing a remote peers details
*/
private def addNewPeers ( peers: Seq[PeerSpec] ): Unit =
if ( peers.cast[Seq[PeerSpec]].isDefined ) {
peers.foreach(peerSpec => peerManager ! AddPeerIfEmpty(peerSpec))
}

/**
* Handles gossiping about the locally known peer set to a given remote peer
*
* @param remote the remote peer to be informed of our local peers
*/
private def gossipPeers ( remote: ConnectedPeer ): Unit =
(peerManager ? RecentlySeenPeers(settings.maxPeerSpecObjects))
.mapTo[Seq[PeerInfo]]
.foreach
{ peers =>
val msg = Message(peersSpec, Right(peers.map(_.peerSpec)), None)
networkControllerRef ! SendToNetwork(msg, SendToPeer(remote))
}
}

object PeerSynchronizerRef {
Expand Down
44 changes: 44 additions & 0 deletions src/main/scala/scorex/core/network/Synchronizer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package scorex.core.network

import scorex.core.network.message.MessageSpec
import scorex.util.ScorexLogging

import scala.util.{Failure, Success}

trait Synchronizer extends ScorexLogging {

// these are the case statements for identifying the message handlers
protected val msgHandlers: PartialFunction[(MessageSpec[_], _, ConnectedPeer), Unit]

/**
* This method will attempt to parse a message from a remote peer into it class representation and use
* the defined message handlers for processing the message
*
* @param spec the message specification (basically a header informing of the message type)
* @param msgBytes a ByteString of the message data that must be parsed
* @param source the remote peer that sent the message
*/
protected def parseAndHandle(spec: MessageSpec[Any], msgBytes: Array[Byte], source: ConnectedPeer): Unit = {
// attempt to parse the message
spec.parseBytesTry(msgBytes) match {
// if a message could be parsed, match the type of content found and ensure a handler is defined
case Success(content) =>
val parsedMsg = (spec, content, source)
if (msgHandlers.isDefinedAt(parsedMsg)) msgHandlers.apply(parsedMsg)
else log.error(s"Function handler not found for the parsed message: $parsedMsg")

// if a message could not be parsed, penalize the remote peer
case Failure(e) =>
log.error(s"Failed to deserialize data from $source: ", e)
penalizeMaliciousPeer(source)
}
}

/**
* Handles how a peer that sent un-parsable data should be handled
*
* @param peer peer that sent the offending message
*/
protected def penalizeMaliciousPeer(peer: ConnectedPeer): Unit

}