Skip to content
This repository has been archived by the owner on Apr 13, 2022. It is now read-only.

Commit

Permalink
Merge pull request #280 from ScorexFoundation/i342
Browse files Browse the repository at this point in the history
Synchronization rework
  • Loading branch information
catena2w committed Aug 14, 2018
2 parents 6451f17 + fa9db02 commit 283fd63
Show file tree
Hide file tree
Showing 34 changed files with 1,128 additions and 766 deletions.
@@ -1,6 +1,5 @@
package examples.commons

import io.iohk.iodb.ByteArrayWrapper
import scorex.core.ModifierId
import scorex.core.transaction.MemoryPool
import scorex.core.utils.ScorexLogging
Expand All @@ -14,8 +13,7 @@ case class SimpleBoxTransactionMemPool(unconfirmed: TrieMap[ModifierId, SimpleBo
override type NVCT = SimpleBoxTransactionMemPool

//getters
override def getById(id: ModifierId): Option[SimpleBoxTransaction] =
unconfirmed.get(id)
override def modifierById(id: ModifierId): Option[SimpleBoxTransaction] = unconfirmed.get(id)

override def contains(id: ModifierId): Boolean = unconfirmed.contains(id)

Expand Down
16 changes: 12 additions & 4 deletions examples/src/main/scala/examples/hybrid/HybridApp.scala
@@ -1,19 +1,21 @@
package examples.hybrid

import akka.actor.ActorRef
import examples.commons.{SimpleBoxTransaction, SimpleBoxTransactionMemPool}
import examples.commons.{SimpleBoxTransaction, SimpleBoxTransactionCompanion, SimpleBoxTransactionMemPool}
import examples.hybrid.api.http.{DebugApiRoute, StatsApiRoute, WalletApiRoute}
import examples.hybrid.blocks.HybridBlock
import examples.hybrid.blocks._
import examples.hybrid.history.{HybridHistory, HybridSyncInfo, HybridSyncInfoMessageSpec}
import examples.hybrid.mining._
import examples.hybrid.wallet.SimpleBoxTransactionGeneratorRef
import scorex.core.{ModifierTypeId, NodeViewModifier}
import scorex.core.api.http.{ApiRoute, NodeViewApiRoute, PeersApiRoute, UtilsApiRoute}
import scorex.core.app.Application
import scorex.core.network.message.MessageSpec
import scorex.core.network.{NodeViewSynchronizerRef, PeerFeature}
import scorex.core.serialization.SerializerRegistry
import scorex.core.serialization.{Serializer, SerializerRegistry}
import scorex.core.serialization.SerializerRegistry.SerializerRecord
import scorex.core.settings.ScorexSettings
import scorex.core.transaction.Transaction

import scala.concurrent.duration._
import scala.io.Source
Expand Down Expand Up @@ -60,7 +62,7 @@ class HybridApp(val settingsFilename: String) extends Application {
actorSystem.actorOf(NodeViewSynchronizerRef.props[SimpleBoxTransaction, HybridSyncInfo, HybridSyncInfoMessageSpec.type,
HybridBlock, HybridHistory, SimpleBoxTransactionMemPool]
(networkControllerRef, nodeViewHolderRef,
HybridSyncInfoMessageSpec, settings.network, timeProvider))
HybridSyncInfoMessageSpec, settings.network, timeProvider, HybridApp.modifierSerializers))

if (settings.network.nodeName.startsWith("generatorNode")) {
log.info("Starting transactions generation")
Expand All @@ -70,6 +72,12 @@ class HybridApp(val settingsFilename: String) extends Application {
}

object HybridApp extends App {
def modifierSerializers: Map[ModifierTypeId, Serializer[_ <: NodeViewModifier]] =
Map(PosBlock.ModifierTypeId -> PosBlockCompanion,
PowBlock.ModifierTypeId -> PowBlockCompanion,
Transaction.ModifierTypeId -> SimpleBoxTransactionCompanion)

private val settingsFilename = args.headOption.getOrElse("settings.conf")
new HybridApp(settingsFilename).run()

}
Expand Up @@ -31,11 +31,6 @@ class HybridNodeViewHolder(hybridSettings: HybridSettings,
override lazy val scorexSettings: ScorexSettings = hybridSettings.scorexSettings
private lazy val minerSettings: HybridMiningSettings = hybridSettings.mining

override val modifierSerializers: Map[ModifierTypeId, Serializer[_ <: NodeViewModifier]] =
Map(PosBlock.ModifierTypeId -> PosBlockCompanion,
PowBlock.ModifierTypeId -> PowBlockCompanion,
Transaction.ModifierTypeId -> SimpleBoxTransactionCompanion)

override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
super.preRestart(reason, message)
log.error("HybridNodeViewHolder has been restarted, not a valid situation!")
Expand Down
Expand Up @@ -32,17 +32,17 @@ trait ExamplesCommonGenerators extends CoreGenerators {
txs <- smallInt.flatMap(i => Gen.listOfN(i, simpleBoxTransactionGen))
} yield txs

def simpleBoxTransactionGenCustomMakeBoxes (toBoxes: IndexedSeq[(PublicKey25519Proposition, Value)]): Gen[SimpleBoxTransaction] = for {
def simpleBoxTransactionGenCustomMakeBoxes(toBoxes: IndexedSeq[(PublicKey25519Proposition, Value)]): Gen[SimpleBoxTransaction] = for {
fee <- positiveLongGen
timestamp <- positiveLongGen
from: IndexedSeq[(PrivateKey25519, Nonce)] <- Gen.choose(1,1).flatMap(i => Gen.listOfN(i + 1, privGen).map(_.toIndexedSeq))
from: IndexedSeq[(PrivateKey25519, Nonce)] <- Gen.choose(1, 1).flatMap(i => Gen.listOfN(i + 1, privGen).map(_.toIndexedSeq))
to = toBoxes
} yield SimpleBoxTransaction(from, to, fee, timestamp)

def simpleBoxTransactionGenCustomUseBoxes (fromBoxes: IndexedSeq[(PrivateKey25519, Nonce)]): Gen[SimpleBoxTransaction] = for {
def simpleBoxTransactionGenCustomUseBoxes(fromBoxes: IndexedSeq[(PrivateKey25519, Nonce)]): Gen[SimpleBoxTransaction] = for {
fee <- positiveLongGen
timestamp <- positiveLongGen
from = fromBoxes
to: IndexedSeq[(PublicKey25519Proposition, Value)] <- Gen.choose(1,1).flatMap(i => Gen.listOfN(i, pGen).map(_.toIndexedSeq))
to: IndexedSeq[(PublicKey25519Proposition, Value)] <- Gen.choose(1, 1).flatMap(i => Gen.listOfN(i, pGen).map(_.toIndexedSeq))
} yield SimpleBoxTransaction(from, to, fee, timestamp)
}
Expand Up @@ -3,10 +3,13 @@ package hybrid
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.TestProbe
import commons.ExamplesCommonGenerators
import examples.commons.SimpleBoxTransactionMemPool
import examples.hybrid.HybridApp
import examples.hybrid.history.HybridSyncInfoMessageSpec
import io.iohk.iodb.ByteArrayWrapper
import scorex.core._
import scorex.core.app.Version
import scorex.core.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, ChangedMempool}
import scorex.core.network._
import scorex.core.utils.NetworkTimeProvider
import scorex.testkit.generators.CoreGenerators
Expand All @@ -20,9 +23,12 @@ trait NodeViewSynchronizerGenerators {
object NodeViewSynchronizerForTests {
def props(networkControllerRef: ActorRef,
viewHolderRef: ActorRef): Props =
NodeViewSynchronizerRef.props[TX, HSI, SIS, PM, HT, MP](networkControllerRef, viewHolderRef, HybridSyncInfoMessageSpec,
settings.scorexSettings.network,
new NetworkTimeProvider(settings.scorexSettings.ntp))
NodeViewSynchronizerRef.props[TX, HSI, SIS, PM, HT, MP](networkControllerRef,
viewHolderRef,
HybridSyncInfoMessageSpec,
settings.scorexSettings.network,
new NetworkTimeProvider(settings.scorexSettings.ntp),
HybridApp.modifierSerializers)
}

def nodeViewSynchronizer(implicit system: ActorSystem):
Expand All @@ -31,6 +37,7 @@ trait NodeViewSynchronizerGenerators {
val h = historyGen.sample.get
@SuppressWarnings(Array("org.wartremover.warts.OptionPartial"))
val sRaw = stateGen.sample.get
val mempool = SimpleBoxTransactionMemPool.emptyPool
val v = h.openSurfaceIds().last
sRaw.store.update(ByteArrayWrapper(idToBytes(v)), Seq(), Seq())
val s = sRaw.copy(version = idToVersion(v))
Expand All @@ -41,12 +48,13 @@ trait NodeViewSynchronizerGenerators {
val eventListener = TestProbe("EventListener")

val ref = system.actorOf(NodeViewSynchronizerForTests.props(ncProbe.ref, vhProbe.ref))
ref ! ChangedHistory(h)
ref ! ChangedMempool(mempool)
val m = totallyValidModifier(h, s)
@SuppressWarnings(Array("org.wartremover.warts.OptionPartial"))
val tx = simpleBoxTransactionGen.sample.get
@SuppressWarnings(Array("org.wartremover.warts.OptionPartial"))
val p : ConnectedPeer = ConnectedPeer(inetSocketAddressGen.sample.get, pchProbe.ref, Outgoing,
Handshake("", Version(0,1,2), "", None, Seq(), 0L))
val p: ConnectedPeer = connectedPeerGen(pchProbe.ref).sample.get

(ref, h.syncInfo, m, tx, p, pchProbe, ncProbe, vhProbe, eventListener)
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/reference.conf
Expand Up @@ -105,6 +105,9 @@ scorex {
# Accept maximum inv objects
maxInvObjects = 500

# Desired number of inv objects. Our requests will have this size.
desiredInvObjects = 100

# Synchronization interval
syncInterval = 15s

Expand Down
8 changes: 7 additions & 1 deletion src/main/scala/scorex/ObjectGenerators.scala
Expand Up @@ -2,10 +2,11 @@ package scorex

import java.net.{InetAddress, InetSocketAddress}

import akka.actor.ActorRef
import org.scalacheck.{Arbitrary, Gen}
import scorex.core.app.Version
import scorex.core.network.PeerFeature
import scorex.core.network.message.BasicMsgDataTypes._
import scorex.core.network.{ConnectedPeer, Handshake, Outgoing, PeerFeature}
import scorex.core.serialization.Serializer
import scorex.core.transaction.box.proposition.PublicKey25519Proposition
import scorex.core.transaction.state.{PrivateKey25519, PrivateKey25519Companion}
Expand Down Expand Up @@ -87,4 +88,9 @@ trait ObjectGenerators {
.map(s => PrivateKey25519Companion.generateKeys(s))

lazy val propositionGen: Gen[PublicKey25519Proposition] = key25519Gen.map(_._2)

def connectedPeerGen(peerRef: ActorRef): Gen[ConnectedPeer] = for {
address <- inetSocketAddressGen
} yield ConnectedPeer(address, peerRef, Outgoing, Handshake("", Version(0, 1, 2), "", None, Seq(), 0L))

}
76 changes: 35 additions & 41 deletions src/main/scala/scorex/core/ModifiersCache.scala
@@ -1,6 +1,6 @@
package scorex.core

import scorex.core.consensus.HistoryReader
import scorex.core.consensus.{ContainsModifiers, HistoryReader}
import scorex.core.utils.ScorexLogging
import scorex.core.validation.RecoverableModifierError

Expand All @@ -10,74 +10,69 @@ import scala.util.{Failure, Success}

/**
* A cache which is storing persistent modifiers not applied to history yet.
*
* This trait is not thread-save so it should be used only as a local field of an actor
* and its methods should not be called from lambdas, Future, Future.map, etc.
*
* @tparam PMOD - type of a persistent node view modifier (or a family of modifiers).
*/
trait ModifiersCache[PMOD <: PersistentNodeViewModifier, H <: HistoryReader[PMOD, _]] {
trait ModifiersCache[PMOD <: PersistentNodeViewModifier, H <: HistoryReader[PMOD, _]] extends ContainsModifiers[PMOD] {
require(maxSize >= 1)

type K = ModifierId
type V = PMOD

protected val cache: mutable.Map[K, V] = mutable.Map[K, V]()

override def modifierById(modifierId: ModifierId): Option[PMOD] = cache.get(modifierId)

def size: Int = cache.size

/**
* How many elements are to be stored in the cache
*/
def maxSize: Int

/**
* Keys to simulate objects residing a cache. So if key is stored here,
* the membership check (contains()) shows that the key is in the cache,
* but the value corresponding to the key is not stored. The motivation
* to have this structure is to avoid repeatedly downloading modifiers
* which are unquestionably invalid.
*/
protected val rememberedKeys: mutable.HashSet[K] = mutable.HashSet[K]()

/**
* Defines a best (and application-specific) candidate to be applied.
*
* @param history - an interface to history which could be needed to define a candidate
* @return - candidate if it is found
*/
def findCandidateKey(history: H): Option[K]

protected def onPut(key: K): Unit = {}
protected def onRemove(key: K, rememberKey: Boolean): Unit = {}

protected def onRemove(key: K): Unit = {}

/**
* A cache element replacement strategy method, which defines a key to remove from cache when it is overfull
* Remove elements from cache when it is overfull
*
* @return collection of just removed elements
*/
protected def keyToRemove(): K
def cleanOverfull(): Seq[V]


def contains(key: K): Boolean = cache.contains(key) || rememberedKeys.contains(key)

def put(key: K, value: V): Unit = synchronized {
if(!contains(key)) {
def put(key: K, value: V): Unit = {
if (!contains(key)) {
onPut(key)
cache.put(key, value)
if (size > maxSize) remove(keyToRemove())
}
}

/**
* Remove an element from the cache.
*
* @param key - modifier's key
* @param rememberKey - whether to remember the key as belonging to cache. E.g. invalid modifiers are
* to be remembered (for not to be requested from the network again).
* @return
* @return - removed value if existed
*/
def remove(key: K, rememberKey: Boolean = false): Option[V] = synchronized {
cache.remove(key).map {removed =>
onRemove(key, rememberKey)
if (rememberKey) rememberedKeys += key
def remove(key: K): Option[V] = {
cache.remove(key).map { removed =>
onRemove(key)
removed
}
}

def popCandidate(history: H): Option[V] = synchronized {
def popCandidate(history: H): Option[V] = {
findCandidateKey(history).flatMap(k => remove(k))
}
}
Expand All @@ -92,29 +87,28 @@ trait LRUCache[PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, _]]
// complete scan and cleaning of removed keys happen.
private val cleaningThreshold = 50

@tailrec
private def evictionCandidate(): K = {
val k = evictionQueue.dequeue()
if(cache.contains(k)) k else evictionCandidate()
}

override protected def onPut(key: K): Unit = {
evictionQueue.enqueue(key)
if(evictionQueue.size > maxSize + cleaningThreshold){
if (evictionQueue.size > maxSize + cleaningThreshold) {
evictionQueue.dequeueAll(k => !cache.contains(k))
}
}

override protected def onRemove(key: K, rememberKey: Boolean): Unit = {
}
def cleanOverfull(): Seq[V] = {
@tailrec
def removeUntilCorrectSize(acc: List[V]): List[V] = if (size <= maxSize || evictionQueue.isEmpty) {
acc
} else {
removeUntilCorrectSize(remove(evictionQueue.dequeue()).map(_ :: acc).getOrElse(acc))
}

def keyToRemove(): K = {
evictionCandidate()
removeUntilCorrectSize(List())
}
}

class DefaultModifiersCache[PMOD <: PersistentNodeViewModifier, HR <: HistoryReader[PMOD, _]]
(override val maxSize: Int) extends ModifiersCache[PMOD, HR] with LRUCache[PMOD, HR] with ScorexLogging {
(override val maxSize: Int) extends ModifiersCache[PMOD, HR] with LRUCache[PMOD, HR] with ScorexLogging {

/**
* Default implementation is just about to scan. Not efficient at all and should be probably rewritten in a
Expand All @@ -134,8 +128,8 @@ class DefaultModifiersCache[PMOD <: PersistentNodeViewModifier, HR <: HistoryRea
case Failure(e) =>
// non-recoverable error - remove modifier from cache
// TODO blaklist peer who sent it
log.warn(s"Modifier ${v.encodedId} is permanently invalid and will be removed from cache", e)
remove(k, rememberKey = true)
log.warn(s"Modifier ${v.encodedId} became permanently invalid and will be removed from cache", e)
remove(k)
false
case Success(_) =>
true
Expand Down

0 comments on commit 283fd63

Please sign in to comment.