Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
Merge pull request #368 from ScorexFoundation/tx-delivery
Browse files Browse the repository at this point in the history
Simplified check for transaction delivery
  • Loading branch information
kushti committed Aug 13, 2020
2 parents 940842a + 01d4eff commit bb48da3
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 58 deletions.
3 changes: 2 additions & 1 deletion src/main/resources/reference.conf
Expand Up @@ -121,7 +121,8 @@ scorex {
# Network delivery timeout
deliveryTimeout = 2s

# Max number of delivery checks. Stop expecting modifier (and penalize peer) if it was not delivered on time
# Max number of delivery checks for a persistent modifier.
# Stop expecting modifier (and penalize peer) if it was not delivered on time.
maxDeliveryChecks = 2

############
Expand Down
67 changes: 35 additions & 32 deletions src/main/scala/scorex/core/network/DeliveryTracker.scala
Expand Up @@ -41,7 +41,7 @@ class DeliveryTracker(system: ActorSystem,

protected case class RequestedInfo(peer: Option[ConnectedPeer], cancellable: Cancellable, checks: Int)

// when a remote peer is asked a modifier we add the requested data to `requested`
// when a remote peer is asked for a modifier we add the requested data to `requested`
protected val requested: mutable.Map[ModifierId, RequestedInfo] = mutable.Map()

// when our node received invalid modifier we put it to `invalid`
Expand All @@ -55,16 +55,20 @@ class DeliveryTracker(system: ActorSystem,
* Since this class do not keep statuses for modifiers that are already in NodeViewHolder,
* `modifierKeepers` are required here to check that modifier is in `Held` status
*/
def status(id: ModifierId, modifierKeepers: Seq[ContainsModifiers[_]]): ModifiersStatus =
if (received.contains(id)) Received
else if (requested.contains(id)) Requested
else if (invalid.contains(id)) Invalid
else if (modifierKeepers.exists(_.contains(id))) Held
def status(modifierId: ModifierId, modifierKeepers: Seq[ContainsModifiers[_]]): ModifiersStatus =
if (received.contains(modifierId)) Received
else if (requested.contains(modifierId)) Requested
else if (invalid.contains(modifierId)) Invalid
else if (modifierKeepers.exists(_.contains(modifierId))) Held
else Unknown

def status(id: ModifierId, mk: ContainsModifiers[_ <: NodeViewModifier]): ModifiersStatus = status(id, Seq(mk))
def status(modifierId: ModifierId, mk: ContainsModifiers[_ <: NodeViewModifier]): ModifiersStatus = {
status(modifierId, Seq(mk))
}

def status(id: ModifierId): ModifiersStatus = status(id, Seq())
def status(modifierId: ModifierId): ModifiersStatus = {
status(modifierId, Seq())
}

/**
*
Expand All @@ -73,13 +77,13 @@ class DeliveryTracker(system: ActorSystem,
*
* @return `true` if number of checks was not exceed, `false` otherwise
*/
def onStillWaiting(cp: ConnectedPeer, typeId: ModifierTypeId, id: ModifierId)
def onStillWaiting(cp: ConnectedPeer, modifierTypeId: ModifierTypeId, modifierId: ModifierId)
(implicit ec: ExecutionContext): Try[Unit] =
tryWithLogging {
val checks = requested(id).checks + 1
setUnknown(id)
if (checks < maxDeliveryChecks) setRequested(id, typeId, Some(cp), checks)
else throw new StopExpectingError(id, checks)
val checks = requested(modifierId).checks + 1
setUnknown(modifierId)
if (checks < maxDeliveryChecks) setRequested(modifierId, modifierTypeId, Some(cp), checks)
else throw new StopExpectingError(modifierId, checks)
}

/**
Expand All @@ -100,8 +104,8 @@ class DeliveryTracker(system: ActorSystem,
* Modified with id `id` is permanently invalid - set its status to `Invalid`
* and return [[ConnectedPeer]] which sent bad modifier.
*/
def setInvalid(id: ModifierId): Option[ConnectedPeer] = {
val oldStatus: ModifiersStatus = status(id)
def setInvalid(modifierId: ModifierId): Option[ConnectedPeer] = {
val oldStatus: ModifiersStatus = status(modifierId)
val transitionCheck = tryWithLogging {
require(isCorrectTransition(oldStatus, Invalid), s"Illegal status transition: $oldStatus -> Invalid")
}
Expand All @@ -110,14 +114,14 @@ class DeliveryTracker(system: ActorSystem,
.flatMap { _ =>
val senderOpt = oldStatus match {
case Requested =>
requested(id).cancellable.cancel()
requested.remove(id).flatMap(_.peer)
requested(modifierId).cancellable.cancel()
requested.remove(modifierId).flatMap(_.peer)
case Received =>
received.remove(id)
received.remove(modifierId)
case _ =>
None
}
invalid.add(id)
invalid.add(modifierId)
senderOpt
}
}
Expand All @@ -129,7 +133,7 @@ class DeliveryTracker(system: ActorSystem,
tryWithLogging {
val oldStatus: ModifiersStatus = status(id)
require(isCorrectTransition(oldStatus, Held), s"Illegal status transition: $oldStatus -> Held")
clearStatusForModifier(id, oldStatus) // we need only to clear old status in this case
clearStatusForModifier(id, oldStatus) // clear old status
}

/**
Expand All @@ -144,7 +148,7 @@ class DeliveryTracker(system: ActorSystem,
tryWithLogging {
val oldStatus: ModifiersStatus = status(id)
require(isCorrectTransition(oldStatus, Unknown), s"Illegal status transition: $oldStatus -> Unknown")
clearStatusForModifier(id, oldStatus) // we need only to clear old status in this case
clearStatusForModifier(id, oldStatus) // clear old status
}

/**
Expand Down Expand Up @@ -180,17 +184,7 @@ class DeliveryTracker(system: ActorSystem,
case _ => false
}

private def tryWithLogging[T](fn: => T): Try[T] =
Try(fn).recoverWith {
case e: StopExpectingError =>
log.warn(e.getMessage)
Failure(e)
case e =>
log.warn("Unexpected error", e)
Failure(e)
}

private def clearStatusForModifier(id: ModifierId, oldStatus: ModifiersStatus): Unit =
private[network] def clearStatusForModifier(id: ModifierId, oldStatus: ModifiersStatus): Unit =
oldStatus match {
case Requested =>
requested(id).cancellable.cancel()
Expand All @@ -204,4 +198,13 @@ class DeliveryTracker(system: ActorSystem,
class StopExpectingError(mid: ModifierId, checks: Int)
extends Error(s"Stop expecting ${encoder.encodeId(mid)} due to exceeded number of retries $checks")

private def tryWithLogging[T](fn: => T): Try[T] =
Try(fn).recoverWith {
case e: StopExpectingError =>
log.warn(e.getMessage)
Failure(e)
case e =>
log.warn("Unexpected error", e)
Failure(e)
}
}
61 changes: 36 additions & 25 deletions src/main/scala/scorex/core/network/NodeViewSynchronizer.scala
Expand Up @@ -32,27 +32,29 @@ import scala.reflect.ClassTag
import scala.util.{Failure, Success}

/**
* A component which is synchronizing local node view (locked inside NodeViewHolder) with the p2p network.
* A component which is synchronizing local node view (processed by NodeViewHolder) with the p2p network.
*
* @tparam TX transaction
* @tparam SIS SyncInfoMessage specification
* @tparam PMOD Basic trait of persistent modifiers type family
* @tparam HR History reader type
* @tparam MR Mempool reader type
* @param networkControllerRef reference to network controller actor
* @param viewHolderRef reference to node view holder actor
* @param syncInfoSpec SyncInfo specification
* @tparam TX transaction
* @tparam SIS SyncInfoMessage specification
* @param networkSettings network settings instance
* @param timeProvider network time provider
* @param modifierSerializers dictionary of modifiers serializers
*/
class NodeViewSynchronizer[TX <: Transaction,
SI <: SyncInfo,
SIS <: SyncInfoMessageSpec[SI],
PMOD <: PersistentNodeViewModifier,
HR <: HistoryReader[PMOD, SI] : ClassTag,
MR <: MempoolReader[TX] : ClassTag]
class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMessageSpec[SI],
PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, SI] : ClassTag, MR <: MempoolReader[TX] : ClassTag]
(networkControllerRef: ActorRef,
viewHolderRef: ActorRef,
syncInfoSpec: SIS,
networkSettings: NetworkSettings,
timeProvider: NetworkTimeProvider,
modifierSerializers: Map[ModifierTypeId, ScorexSerializer[_ <: NodeViewModifier]])(implicit ec: ExecutionContext) extends Actor
with ScorexLogging with ScorexEncoding {
modifierSerializers: Map[ModifierTypeId, ScorexSerializer[_ <: NodeViewModifier]])(implicit ec: ExecutionContext)
extends Actor with ScorexLogging with ScorexEncoding {

protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout
protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks
Expand All @@ -67,20 +69,22 @@ MR <: MempoolReader[TX] : ClassTag]
protected var mempoolReaderOpt: Option[MR] = None

override def preStart(): Unit = {
//register as a handler for synchronization-specific types of messages
// register as a handler for synchronization-specific types of messages
val messageSpecs: Seq[MessageSpec[_]] = Seq(invSpec, requestModifierSpec, modifiersSpec, syncInfoSpec)
networkControllerRef ! RegisterMessageSpecs(messageSpecs, self)

//register as a listener for peers got connected (handshaked) or disconnected
// register as a listener for peers got connected (handshaked) or disconnected
context.system.eventStream.subscribe(self, classOf[HandshakedPeer])
context.system.eventStream.subscribe(self, classOf[DisconnectedPeer])

//subscribe for all the node view holder events involving modifiers and transactions
// subscribe for all the node view holder events involving modifiers and transactions
context.system.eventStream.subscribe(self, classOf[ChangedHistory[HR]])
context.system.eventStream.subscribe(self, classOf[ChangedMempool[MR]])
context.system.eventStream.subscribe(self, classOf[ModificationOutcome])
context.system.eventStream.subscribe(self, classOf[DownloadRequest])
context.system.eventStream.subscribe(self, classOf[ModifiersProcessingResult[PMOD]])

// subscribe for history and mempool changes
viewHolderRef ! GetNodeViewChanges(history = true, state = false, vault = false, mempool = true)

statusTracker.scheduleSendSyncInfo()
Expand Down Expand Up @@ -364,17 +368,24 @@ MR <: MempoolReader[TX] : ClassTag]
protected def checkDelivery: Receive = {
case CheckDelivery(peerOpt, modifierTypeId, modifierId) =>
if (deliveryTracker.status(modifierId) == ModifiersStatus.Requested) {
peerOpt match {
case Some(peer) =>
log.info(s"Peer ${peer.toString} has not delivered asked modifier ${encoder.encodeId(modifierId)} on time")
penalizeNonDeliveringPeer(peer)
deliveryTracker.onStillWaiting(peer, modifierTypeId, modifierId)
case None =>
// Random peer did not delivered modifier we need, ask another peer
// We need this modifier - no limit for number of attempts
log.info(s"Modifier ${encoder.encodeId(modifierId)} was not delivered on time")
deliveryTracker.setUnknown(modifierId)
requestDownload(modifierTypeId, Seq(modifierId))
// If transaction not delivered on time, we just forget about it.
// It could be removed from other peer's mempool, so no reason to penalize the peer.
if (modifierTypeId == Transaction.ModifierTypeId) {
deliveryTracker.clearStatusForModifier(modifierId, ModifiersStatus.Requested)
} else {
// A persistent modifier is not delivered on time.
peerOpt match {
case Some(peer) =>
log.info(s"Peer ${peer.toString} has not delivered asked modifier ${encoder.encodeId(modifierId)} on time")
penalizeNonDeliveringPeer(peer)
deliveryTracker.onStillWaiting(peer, modifierTypeId, modifierId)
case None =>
// Random peer has not delivered modifier we need, ask another peer
// We need this modifier - no limit for number of attempts
log.info(s"Modifier ${encoder.encodeId(modifierId)} has not delivered on time")
deliveryTracker.setUnknown(modifierId)
requestDownload(modifierTypeId, Seq(modifierId))
}
}
}
}
Expand Down

0 comments on commit bb48da3

Please sign in to comment.