Skip to content

Commit

Permalink
Improved Bridge (more inline with how node works)
Browse files Browse the repository at this point in the history
  • Loading branch information
prologic committed Jun 16, 2015
1 parent 53aebc8 commit 584fdff
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 48 deletions.
3 changes: 2 additions & 1 deletion circuits/__init__.py
Expand Up @@ -16,9 +16,10 @@
from .version import version as __version__

from .core import Event
from .core import child, Bridge
from .core import sleep, task, Worker
from .core import handler, reprhandler, BaseComponent, Component
from .core import Debugger, Bridge, Loader, Manager, Timer, TimeoutError
from .core import Debugger, Loader, Manager, Timer, TimeoutError

# flake8: noqa

Expand Down
2 changes: 1 addition & 1 deletion circuits/core/__init__.py
Expand Up @@ -5,8 +5,8 @@


from .events import Event
from .bridge import Bridge
from .loader import Loader
from .bridge import child, Bridge
from .handlers import handler, reprhandler
from .components import BaseComponent, Component
from .manager import sleep, Manager, TimeoutError
Expand Down
73 changes: 53 additions & 20 deletions circuits/core/bridge.py
Expand Up @@ -5,36 +5,49 @@
"process mode" via :meth:`circuits.core.manager.start`. Typically a
Pipe is used as the socket transport between two sides of a Bridge
(*there must be a :class:`~Bridge` instnace on both sides*).
"""


import traceback

try:
from cPickle import dumps, loads
except ImportError:
from pickle import dumps, loads # NOQA


from ..six import b
from .values import Value
from .events import Event, exception
from .handlers import handler
from .events import Event, exception
from .components import BaseComponent
from ..six import b


_sentinel = b('~~~')


class child(Event):
"""child Event
Send an event to a child process
"""

def __init__(self, event, channel=None):
"""
:param event: Event to execute remotely.
:type event: :class:`circuits.core.events.Event`
:param channel: Remote channel (channel to use on peer).
:type channel: str
"""

super(child, self).__init__(event, channel=channel)


class Bridge(BaseComponent):

channel = "bridge"

ignore = [
"registered", "unregistered", "started", "stopped", "error",
"value_changed", "generate_events", "read", "write", "close",
"connected", "connect", "disconnect", "disconnected", "_read",
"_write", "ready", "read_value_changed", "prepare_unregister"
]

def init(self, socket, channel=channel):
self._socket = socket
self._values = dict()
Expand Down Expand Up @@ -76,7 +89,7 @@ def _on_read(self, data):
for item in data[:-1]:
self._process_packet(*loads(item))

def send(self, eid, event):
def __send(self, eid, event):
try:
if isinstance(event, exception):
Bridge.__adapt_exception(event)
Expand All @@ -88,16 +101,36 @@ def send(self, eid, event):
def __write(self, eid, data):
self._socket.write(dumps((eid, data)) + _sentinel)

@handler(channel="*", priority=100.0)
def _on_event(self, event, *args, **kwargs):
if event.name in self.ignore or getattr(event, "remote", False) \
or event.name.endswith('_done') \
or event.name.endswith('_success') \
or event.name.endswith('_complete'):
return
@handler("child")
def _on_child(self, event, child_event, channel=None):
"""Send event to a child process
Event handler to run an event on a child process
(the event definition is :class:`circuits.core.bridge.child`)
:param event: The event triggered (by the handler)
:type event: :class:`circuits.node.events.remote`
:param child_event: Event to execute in child process.
:type child_event: :class:`circuits.core.events.Event`
:param channel: Remote channel (channel to use on peer).
:type channel: str
:return: The result of remote event
:rtype: generator
:Example:
``# hello is your event to execute in the child process
result = yield self.fire(child(hello()))
print(result.value)``
"""

child_event.channels = (channel,) if channel is not None else event.channels
event.value.value = child_event.value = Value(child_event, self)

eid = hash(event)
self.send(eid, event)
eid = hash(child_event)
self.__send(eid, child_event)
yield self.wait(Bridge.__waiting_event(eid))

@staticmethod
Expand Down
3 changes: 0 additions & 3 deletions circuits/core/manager.py
Expand Up @@ -973,9 +973,6 @@ def run(self, socket=None):

if socket is not None:
from circuits.core.bridge import Bridge
from circuits.core.debugger import Debugger

Debugger().register(self)
Bridge(socket, channel=socket.channel).register(self)

self.fire(started(self))
Expand Down
45 changes: 26 additions & 19 deletions examples/hello_bridge.py
Expand Up @@ -2,39 +2,46 @@

"""Bridge Example
To use this example run it interactively through the Python interactive
shell using the -i option as per the shebang line above.
This example is quite similar to the Hello example
but displays a hello form both the parent and child
processing demonstrating how IPC works using the Bridge.
"""

i.e: python -i hello_bridge.py
At the python prompt:
>>> x = m.fire(Hello())
.
.
.
>>> x
<Value ('Hello World! (15969)') result=True; errors=False; for <Hello[*.hello] ( )>
""" # noqa

from __future__ import print_function

from os import getpid


from circuits import Component, Debugger, Event, Manager
from circuits import child, Event, Component


class hello(Event):

"""hello Event"""


class Child(Component):

def hello(self):
return "Hello from child with pid {0}".format(getpid())


class App(Component):

def init(self):
Child().start(process=True, link=self)

def ready(self, *args):
x = yield self.call(hello())
yield print(x)

y = yield self.call(child(hello()))
yield print(y)

raise SystemExit(0)

def hello(self):
return "Hello World! ({0:d})".format(getpid())
return "Hello from parent with pid {0}".format(getpid())


m = Manager() + Debugger()
m.start()
App().start(process=True, link=m)
App().run()
9 changes: 5 additions & 4 deletions tests/core/test_bridge.py
Expand Up @@ -8,13 +8,14 @@

pytest.importorskip("multiprocessing")


from os import getpid

from circuits import Component, Event

from circuits import child, Component, Event

class hello(Event):

class hello(Event):
"""hello Event"""


Expand All @@ -27,9 +28,9 @@ def hello(self):
def test(manager, watcher):
app = App()
process, bridge = app.start(process=True, link=manager)
assert watcher.wait("ready", timeout=30)
assert watcher.wait("ready")

x = manager.fire(hello())
x = manager.fire(child(hello()))

assert pytest.wait_for(x, "result")

Expand Down

0 comments on commit 584fdff

Please sign in to comment.