Skip to content

Commit

Permalink
refactor process launching to deal with monitor during init race
Browse files Browse the repository at this point in the history
  • Loading branch information
moonpolysoft committed Jul 24, 2012
1 parent 0925535 commit 28d3dd1
Show file tree
Hide file tree
Showing 13 changed files with 515 additions and 341 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<name>scalang</name>
<artifactId>scalang-scala_2.9.1</artifactId>
<packaging>jar</packaging>
<version>0.16</version>
<version>0.17-SNAPSHOT</version>
<properties>
<scala.version>2.9.1</scala.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
28 changes: 24 additions & 4 deletions src/main/scala/scalang/FunProcess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ package scalang
import scalang.node._
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

class FunProcess(fun : Mailbox => Unit, ctx : ProcessContext) extends Process(ctx) {
class FunProcess(fun : Mailbox => Unit, ctx : ProcessContext) extends ProcessAdapter {
val queue = new LinkedBlockingQueue[Any]
val referenceCounter = ctx.referenceCounter
val self = ctx.pid
val fiber = ctx.fiber
val parentPid = self
val parentRef = referenceCounter

val parent = this
val mbox = new Mailbox {
def self = parentPid
def referenceCounter = parentRef

override def handleMessage(msg : Any) {
def handleMessage(msg : Any) {
queue.offer(msg)
}

Expand All @@ -38,6 +41,16 @@ class FunProcess(fun : Mailbox => Unit, ctx : ProcessContext) extends Process(ct
def receive(timeout : Long) : Option[Any] = {
Option(queue.poll(timeout, TimeUnit.MILLISECONDS))
}

def send(pid : Pid, msg : Any) = parent.notifySend(pid,msg)

def send(name : Symbol, msg : Any) = parent.notifySend(name,msg)

def send(dest : (Symbol,Symbol), from : Pid, msg : Any) = parent.notifySend(dest,from,msg)

def exit(reason : Any) = parent.exit(reason)

def link(to : Pid) = parent.link(to)
}


Expand All @@ -50,8 +63,15 @@ class FunProcess(fun : Mailbox => Unit, ctx : ProcessContext) extends Process(ct
})
}

override def onMessage(msg : Any) {
override def handleMessage(msg : Any) {
queue.offer(msg)
}

override def handleExit(from : Pid, reason : Any) {
queue.offer(('EXIT, from, reason))
}

override def handleMonitorExit(monitored : Any, ref : Reference, reason : Any) {
queue.offer(('DOWN, ref, 'process, monitored, reason))
}
}
106 changes: 62 additions & 44 deletions src/main/scala/scalang/Node.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val tickTime = config.tickTime
val poolFactory = config.poolFactory
var creation : Int = 0
val processes = new NonBlockingHashMap[Pid,ProcessLike]
val processes = new NonBlockingHashMap[Pid,ProcessAdapter]
val registeredNames = new NonBlockingHashMap[Symbol,Pid]
val channels = AtomicMap.atomicNBHM[Symbol,Channel]
val links = AtomicMap.atomicNBHM[Channel,NonBlockingHashSet[Link]]
Expand Down Expand Up @@ -221,22 +221,25 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}
}

def spawnMbox : Mailbox = {
def spawnMbox : MailboxProcess = {
val p = createPid
val n = this
val box = new MailboxProcess(new ProcessContext {
val ctx = new ProcessContext {
val pid = p
val referenceCounter = n.referenceCounter
val node = n
val fiber = null
val replyRegistry = n
})
box.addExitListener(this)
box.addSendListener(this)
box.addLinkListener(this)
box.addMonitorListener(this)
processes.put(p, box)
box
var adapter : ProcessAdapter = null
}
val mailbox = new MailboxProcess(ctx)
ctx.adapter = mailbox
mailbox.addExitListener(this)
mailbox.addSendListener(this)
mailbox.addLinkListener(this)
mailbox.addMonitorListener(this)
processes.put(p, mailbox)
mailbox
}

def spawnMbox(regName : String) : Mailbox = spawnMbox(Symbol(regName))
Expand Down Expand Up @@ -266,8 +269,10 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val node = n
val fiber = factory.create(poolFactory.createBatchExecutor(false))
val replyRegistry = n
var adapter : ProcessAdapter = null
}
val process = new FunProcess(fun, ctx)
ctx.adapter = process
process.addExitListener(this)
process.addSendListener(this)
process.addLinkListener(this)
Expand All @@ -290,15 +295,16 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val node = n
val fiber = factory.create(poolFactory.createBatchExecutor(name.name,false))
val replyRegistry = n
var adapter : ProcessAdapter = null
}
val process = new FunProcess(fun, ctx)
ctx.adapter = process
process.addExitListener(this)
process.addSendListener(this)
process.addLinkListener(this)
process.addMonitorListener(this)
process.fiber.start
process.start
p
registeredNames.put(name, p)
p
}
Expand All @@ -311,7 +317,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex

def spawn[T <: Process](regName : Symbol)(implicit mf : Manifest[T]) : Pid = {
val pid = createPid
val process = createProcess(mf.erasure.asInstanceOf[Class[T]], pid, poolFactory.createBatchExecutor(regName.name, false))
createProcess(mf.erasure.asInstanceOf[Class[T]], pid, poolFactory.createBatchExecutor(regName.name, false))
registeredNames.put(regName, pid)
pid
}
Expand Down Expand Up @@ -343,8 +349,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex

def spawnService[T <: Service[A], A <: Product](args : A, reentrant : Boolean)(implicit mf : Manifest[T]) : Pid = {
val pid = createPid
val process = createService(mf.erasure.asInstanceOf[Class[T]], pid, args, poolFactory.createBatchExecutor(reentrant))
/* process.init(args)*/
createService(mf.erasure.asInstanceOf[Class[T]], pid, args, poolFactory.createBatchExecutor(reentrant))
pid
}

Expand All @@ -362,7 +367,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex

def spawnService[T <: Service[A], A <: Product](regName : Symbol, args : A, reentrant : Boolean)(implicit mf : Manifest[T]) : Pid = {
val pid = createPid
val process = createService(mf.erasure.asInstanceOf[Class[T]], pid, args, poolFactory.createBatchExecutor(regName.name, reentrant))
createService(mf.erasure.asInstanceOf[Class[T]], pid, args, poolFactory.createBatchExecutor(regName.name, reentrant))
registeredNames.put(regName, pid)
pid
}
Expand All @@ -375,8 +380,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
send('cluster, ('nodeup, node))
}

protected def createService[A <: Product, T <: Service[A]](clazz : Class[T], p : Pid, a : A, batch : BatchExecutor) : T = {
val constructor = clazz.getConstructor(classOf[ServiceContext[_]])
protected def createService[A <: Product, T <: Service[A]](clazz : Class[T], p : Pid, a : A, batch : BatchExecutor) {
val n = this
val ctx = new ServiceContext[A] {
val pid = p
Expand All @@ -385,35 +389,46 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val fiber = factory.create(batch)
val replyRegistry = n
val args = a
}
val process = constructor.newInstance(ctx)
process.addExitListener(this)
process.addSendListener(this)
process.addLinkListener(this)
process.addMonitorListener(this)
processes.put(p, process)
process.fiber.start
process
var adapter : ProcessAdapter = null
}
val adapter = new ServiceLauncher[A,T](clazz, ctx)
ctx.adapter = adapter
adapter.addExitListener(this)
adapter.addSendListener(this)
adapter.addLinkListener(this)
adapter.addMonitorListener(this)
ctx.fiber.start
ctx.fiber.execute(new Runnable {
def run {
adapter.init
}
})
processes.put(p, adapter)
}

protected def createProcess[T <: Process](clazz : Class[T], p : Pid, batch : BatchExecutor) : T = {
val constructor = clazz.getConstructor(classOf[ProcessContext])
protected def createProcess[T <: Process](clazz : Class[T], p : Pid, batch : BatchExecutor) {
val n = this
val ctx = new ProcessContext {
val pid = p
val referenceCounter = n.referenceCounter
val node = n
val fiber = factory.create(batch)
val replyRegistry = n
}
val process = constructor.newInstance(ctx)
process.addExitListener(this)
process.addSendListener(this)
process.addLinkListener(this)
process.addMonitorListener(this)
processes.put(p, process)
process.fiber.start
process
var adapter : ProcessAdapter = null
}
val adapter = new ProcessLauncher[T](clazz, ctx)
ctx.adapter = adapter
adapter.addExitListener(this)
adapter.addSendListener(this)
adapter.addLinkListener(this)
adapter.addMonitorListener(this)
ctx.fiber.start
ctx.fiber.execute(new Runnable {
def run {
adapter.init
}
})
processes.put(p, adapter)
}

def register(regName : String, pid : Pid) {
Expand Down Expand Up @@ -464,12 +479,11 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex

if (isLocal(to)) {
for (p <- process(to)) {
p.linkWithoutNotify(from)
p.registerLink(from)
}
} else {
getOrConnectAndSend(to.node, LinkMessage(from, to), { channel =>
val set = links.getOrElseUpdate(channel, new NonBlockingHashSet[Link])
/* println("adding remote link " + link + " to channel " + channel)*/
set.add(link)
})
}
Expand Down Expand Up @@ -511,7 +525,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex

process(from) match {
case Some(p : Process) =>
val link = p.linkWithoutNotify(to)
val link = p.registerLink(to)
if (!isLocal(from))
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(link)
case None =>
Expand All @@ -521,7 +535,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex

process(to) match {
case Some(p : Process) =>
val link = p.linkWithoutNotify(from)
val link = p.registerLink(from)
if (!isLocal(to))
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(link)

Expand Down Expand Up @@ -564,9 +578,10 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
log.warn("Try to monitor between non-local pids: %s -> %s", monitoring, monitored)
return
}

log.debug("pids %s", processes.keys.toList)
process(monitored) match {
case Some(p) =>
log.debug("adding monitor for %s", p)
val monitor = p.registerMonitor(monitoring, ref)
if (!isLocal(monitored))
monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor]).add(monitor)
Expand All @@ -589,7 +604,9 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val monitoring = monitor.monitoring
val monitored = monitor.monitored
val ref = monitor.ref
log.debug("handling monitor exit for %s", monitor)
if (isLocal(monitoring)) {
log.debug("monitoring is local %s", monitoring)
for (proc <- process(monitoring)) {
proc.handleMonitorExit(monitored, ref, reason)
}
Expand Down Expand Up @@ -721,6 +738,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex

//this only gets called from a remote link breakage()
def remoteBreak(link : Link, reason : Any) {

val from = link.from
val to = link.to
for (proc <- process(to)) {
Expand All @@ -739,15 +757,14 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val to = link.to
if (isLocal(to)) {
for (proc <- process(to)) {
/* println("exiting " + proc)*/
proc.handleExit(from, reason)
}
} else {
getOrConnectAndSend(to.node, ExitMessage(from,to,reason))
}
}

def process(pidOrProc : Any) : Option[ProcessLike] = pidOrProc match {
def process(pidOrProc : Any) : Option[ProcessAdapter] = pidOrProc match {
case pid : Pid =>
Option(processes.get(pid))
case regName : Symbol =>
Expand Down Expand Up @@ -815,6 +832,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}

def getOrConnectAndSend(peer : Symbol, msg : Any, afterHandshake : Channel => Unit = { channel => Unit }) {
log.debug("node %s sending %s", this, msg)
val channel = channels.getOrElseUpdate(peer, {
connectAndSend(peer, None)
})
Expand Down
Loading

0 comments on commit 28d3dd1

Please sign in to comment.