Skip to content

Commit

Permalink
different topology for remerge
Browse files Browse the repository at this point in the history
  • Loading branch information
MisterD123 committed Dec 31, 2018
1 parent c155df0 commit 4894fdf
Showing 1 changed file with 56 additions and 12 deletions.
Expand Up @@ -8,7 +8,9 @@ import rescala.fullmv.mirrors.localcloning.{FakeDelayer, ReactiveLocalClone}
import rescala.fullmv.{FullMVEngine, FullMVStruct}
import rescala.reactives.{Signal, Var}

import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Try

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
Expand All @@ -22,36 +24,59 @@ class Remerge {
var totalLength: Int = _

@Param(Array("2"))
var widthHosts: Int = _
var threads: Int = _

@Param(Array("1", "2", "3", "5"))
var mergeAt: Int = _

var sourceHost: FullMVEngine = _
var source: Var[Int, FullMVStruct] = _
var sources: Seq[(FullMVEngine, Var[Int, FullMVStruct])] = _
var instantMergeHost: FullMVEngine = _
var remotesOnInstantMerge: Seq[Signal[Int, FullMVStruct]] = _
var instantMerge: Signal[Int, FullMVStruct] = _
var preMergeDistance: Seq[Seq[(FullMVEngine, Signal[Int, FullMVStruct])]] = _
var mergeHost: FullMVEngine = _
var remotesOnMerge: Seq[Signal[Int, FullMVStruct]] = _
var merge: Signal[Int, FullMVStruct] = _
var postMergeDistance: Seq[(FullMVEngine, Signal[Int, FullMVStruct])] = _

var barrier: CyclicBarrier = _
var threadpool: ExecutorService = _

@Param(Array("50"))
var msDelay: Int = _

@Setup(Level.Iteration)
def setup(): Unit = {
FakeDelayer.enable()

sourceHost = new FullMVEngine(10.seconds, s"src")
source = {
val engine = sourceHost
import engine._
engine.Var(0)
barrier = new CyclicBarrier(threads)
threadpool = Executors.newFixedThreadPool(threads)

sources = for (i <- 1 to threads) yield {
val engine = new FullMVEngine(10.seconds, s"src-$i")
engine -> {
import engine._
engine.Var(0)
}
}

instantMergeHost = new FullMVEngine(10.seconds, "merge")
remotesOnInstantMerge = for (i <- 1 to threads) yield {
REName.named(s"clone-merge-$i") { implicit ! =>
ReactiveLocalClone(sources(i - 1)._2, instantMergeHost, msDelay.millis)
}
}
instantMerge = {
val e = instantMergeHost
import e._
Signals.static(remotesOnInstantMerge:_*) { t =>
remotesOnInstantMerge.map(t.dependStatic).sum
}
}

var preMerge: Seq[(FullMVEngine, Signal[Int, FullMVStruct])] = Seq.fill(widthHosts)(sourceHost -> source)
var preMerge: Seq[(FullMVEngine, Signal[Int, FullMVStruct])] = sources
preMergeDistance = for (d <- 1 until mergeAt) yield {
preMerge = for (i <- 1 to widthHosts) yield {
preMerge = for (i <- 1 to threads) yield {
val host = new FullMVEngine(10.seconds, s"premerge-$d-$i")
host -> REName.named(s"clone-premerge-$d-$i") { implicit ! =>
ReactiveLocalClone(preMerge(i - 1)._2, host, msDelay.millis)
Expand All @@ -61,7 +86,7 @@ class Remerge {
}

mergeHost = new FullMVEngine(10.seconds, "merge")
remotesOnMerge = for (i <- 1 to widthHosts) yield {
remotesOnMerge = for (i <- 1 to threads) yield {
REName.named(s"clone-merge-$i") { implicit ! =>
ReactiveLocalClone(preMerge(i - 1)._2, mergeHost, msDelay.millis)
}
Expand All @@ -88,10 +113,29 @@ class Remerge {
@TearDown(Level.Iteration)
def teardown(): Unit = {
FakeDelayer.shutdown()
threadpool.shutdown()
}

@Benchmark
def run(): Unit = {
source.transform(_ + 1)(sourceHost)
val results = for(i <- 1 to threads) yield {
val p = Promise[Unit]()
threadpool.submit(new Runnable(){
override def run(): Unit = {
p.complete(Try{
val (engine, source) = sources(i - 1)
engine.transactionWithWrapup(source)({ticket =>
val before = ticket.now(source)
source.admit(before + 1)(ticket)
})({ (_, ticket) =>
// prevent turns from completing before all turns have updated the whole graph
barrier.await()
})
})
}
})
p.future
}
results.foreach(Await.result(_, Duration.Inf))
}
}

0 comments on commit 4894fdf

Please sign in to comment.