Skip to content

Commit

Permalink
Support monitoring by registered name
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Newson authored and moonpolysoft committed May 25, 2012
1 parent f790fd1 commit 17917fa
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 23 deletions.
43 changes: 36 additions & 7 deletions src/main/scala/scalang/Node.scala
Expand Up @@ -546,14 +546,14 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
p.registerMonitor(monitoring, ref)
}
} else {
getOrConnectAndSend(monitored.node, MonitorMessage(monitoring, monitored, ref), { channel =>
getOrConnectAndSend(nodeOf(monitored), MonitorMessage(monitoring, monitored, ref), { channel =>
val set = monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor])
set.add(monitor)
})
}
}

def monitorWithoutNotify(monitoring : Pid, monitored : Pid, ref : Reference, channel : Channel) {
def monitorWithoutNotify(monitoring : Pid, monitored : Any, ref : Reference, channel : Channel) {
log.debug("monitor %s -> %s (%s)", monitoring, monitored, ref)
if (monitoring == monitored) {
log.warn("Trying to monitor itself: %s", monitoring)
Expand All @@ -578,7 +578,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex


//node internal interface
def demonitor(monitoring : Pid, monitored : Pid, ref : Reference) {
def demonitor(monitoring : Pid, monitored : Any, ref : Reference) {
log.debug("demonitor %s -> %s (%s)", monitoring, monitored, ref)
for (p <- process(monitored)) {
p.demonitor(ref)
Expand Down Expand Up @@ -747,8 +747,18 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}
}

def process(pid : Pid) : Option[ProcessLike] = {
Option(processes.get(pid))
def process(pidOrProc : Any) : Option[ProcessLike] = pidOrProc match {
case pid : Pid =>
Option(processes.get(pid))
case regName : Symbol =>
whereis(regName) match {
case Some(pid : Pid) =>
process(pid)
case None =>
None
}
case (regname : Symbol, node : Symbol) =>
None
}

def unlink(from : Pid, to : Pid) {
Expand All @@ -761,8 +771,27 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}
}

def isLocal(pid : Pid) : Boolean = {
pid.node == name //&& pid.creation == creation
def isLocal(pidOrProc : Any) : Boolean = pidOrProc match {
case pid : Pid =>
pid.node == name
case regName : Symbol =>
whereis(regName) match {
case Some(pid : Pid) =>
pid.node == name
case None =>
false
}
case (regName : Symbol, node : Symbol) =>
node == name
}

def nodeOf(pidOrProc : Any) : Symbol = pidOrProc match {
case pid : Pid =>
pid.node
case regName : Symbol =>
name
case (regName : Symbol, node : Symbol) =>
node
}

def disconnected(peer : Symbol, channel: Channel) {
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/scalang/Process.scala
Expand Up @@ -103,7 +103,7 @@ abstract class Process(ctx : ProcessContext) extends ProcessLike with Logging wi
/**
* Subclasses wishing to trap monitor exits should override this method.
*/
def trapMonitorExit(monitored : Pid, ref : Reference, reason : Any) {
def trapMonitorExit(monitored : Any, ref : Reference, reason : Any) {
}

override def handleMessage(msg : Any) {
Expand All @@ -115,7 +115,7 @@ abstract class Process(ctx : ProcessContext) extends ProcessLike with Logging wi
exitChannel.publish((from,msg))
}

override def handleMonitorExit(monitored : Pid, ref : Reference, reason : Any) {
override def handleMonitorExit(monitored : Any, ref : Reference, reason : Any) {
monitorChannel.publish((monitored,ref,reason))
}

Expand Down Expand Up @@ -148,9 +148,9 @@ abstract class Process(ctx : ProcessContext) extends ProcessLike with Logging wi
}
})

val monitorChannel = new MemoryChannel[(Pid,Reference,Any)]
monitorChannel.subscribe(ctx.fiber, new Callback[(Pid,Reference,Any)] {
def onMessage(msg : (Pid,Reference,Any)) {
val monitorChannel = new MemoryChannel[(Any,Reference,Any)]
monitorChannel.subscribe(ctx.fiber, new Callback[(Any,Reference,Any)] {
def onMessage(msg : (Any,Reference,Any)) {
try {
trapMonitorExit(msg._1, msg._2, msg._3)
} catch {
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/scalang/node/HandshakeMessages.scala
Expand Up @@ -65,7 +65,8 @@ object DistributionFlags {
val newFloats = 0x800

val default = extendedReferences | extendedPidsPorts |
bitBinaries | newFloats | funTags | newFunTags | distMonitor
bitBinaries | newFloats | funTags | newFunTags |
distMonitor | distMonitorName
}

class ErlangAuthException(msg : String) extends Exception(msg)
2 changes: 1 addition & 1 deletion src/main/scala/scalang/node/Monitor.scala
Expand Up @@ -17,7 +17,7 @@ package scalang.node

import scalang._

case class Monitor(monitoring : Pid, monitored : Pid, ref : Reference) extends MonitorListenable {
case class Monitor(monitoring : Pid, monitored : Any, ref : Reference) extends MonitorListenable {
def monitorExit(reason : Any) {
notifyMonitorExit(this, reason)
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/scalang/node/NodeMessages.scala
Expand Up @@ -35,10 +35,10 @@ case object Tock

//must implement trace tags later

case class MonitorMessage(monitoring : Pid, monitored : Pid, ref : Reference)
case class MonitorMessage(monitoring : Pid, monitored : Any, ref : Reference)

case class DemonitorMessage(monitoring : Pid, monitored : Pid, ref : Reference)
case class DemonitorMessage(monitoring : Pid, monitored : Any, ref : Reference)

case class MonitorExitMessage(monitored : Pid, monitoring : Pid, ref : Reference, reason : Any)
case class MonitorExitMessage(monitored : Any, monitoring : Pid, ref : Reference, reason : Any)

class DistributedProtocolException(msg : String) extends Exception(msg)
4 changes: 2 additions & 2 deletions src/main/scala/scalang/node/ProcessLike.scala
Expand Up @@ -99,11 +99,11 @@ trait ProcessLike extends ExitListenable with SendListenable with LinkListenable
links.remove(Link(self, to))
}

def handleMonitorExit(monitored : Pid, ref : Reference, reason : Any) {
def handleMonitorExit(monitored : Any, ref : Reference, reason : Any) {
// Empty
}

def monitor(monitored : Pid): Reference = {
def monitor(monitored : Any): Reference = {
val m = Monitor(self, monitored, makeRef)
for (listener <- monitorListeners) {
listener.deliverMonitor(m)
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/scalang/node/ScalaTermDecoder.scala
Expand Up @@ -93,11 +93,11 @@ class ScalaTermDecoder(peer : Symbol, factory : TypeFactory) extends OneToOneDec
RegSend(from, to, msg)
case (8, from : Pid, to : Pid, reason : Any) =>
Exit2Message(from, to, reason)
case (19, monitoring : Pid, monitored: Pid, ref : Reference) =>
case (19, monitoring : Pid, monitored: Any, ref : Reference) =>
MonitorMessage(monitoring, monitored, ref)
case (20, monitoring : Pid, monitored : Pid, ref : Reference) =>
case (20, monitoring : Pid, monitored : Any, ref : Reference) =>
DemonitorMessage(monitoring, monitored, ref)
case (21, monitored : Pid, monitoring : Pid, ref : Reference, reason : Any) =>
case (21, monitored : Any, monitoring : Pid, ref : Reference, reason : Any) =>
MonitorExitMessage(monitored, monitoring, ref, reason)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/scalang/MonitorProcess.scala
Expand Up @@ -14,7 +14,7 @@ class MonitorProcess(ctx : ProcessContext) extends Process(ctx) {
'ok
}

override def trapMonitorExit(pid : Pid, ref : Reference, reason : Any) {
override def trapMonitorExit(pid : Any, ref : Reference, reason : Any) {
if (expectedPid == pid && expectedRef == ref) {
sendTo ! 'monitor_exit
}
Expand Down

0 comments on commit 17917fa

Please sign in to comment.