Permalink
Browse files

Extracted the code from DTS

  • Loading branch information...
Erik Allik
Erik Allik committed Feb 20, 2012
0 parents commit d5a17b4ccf494459c8c47f311519fdf40205b016
@@ -0,0 +1 @@
+*.pyc
22 LICENSE
@@ -0,0 +1,22 @@
+Copyright (c) 2012, Erik Allik
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
1 README
@@ -0,0 +1 @@
+
@@ -0,0 +1,15 @@
+from setuptools import setup, find_packages
+
+setup(
+ name="spinoff",
+ version="0.1",
+ packages=find_packages(),
+
+ install_requires=[
+ 'twisted==12.0',
+ 'zope.interface',
+ ],
+
+ author="Erik Allik",
+ author_email="erik.allik@skype.net",
+)
No changes.
@@ -0,0 +1 @@
+from .component import *
@@ -0,0 +1,218 @@
+from collections import defaultdict
+
+from twisted.application import service
+from twisted.application.service import Service
+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue, DeferredQueue
+from zope.interface import Interface, implements
+
+from spinoff.util.meta import selfdocumenting
+
+
+__all__ = ['IComponent', 'IProducer', 'IConsumer', 'Component', 'Pipeline', 'Application']
+
+
+class NoRoute(Exception):
+ pass
+
+
+class IProducer(Interface):
+
+ def connect(outbox, (inbox, component)):
+ """Connects the `outbox` of this component to one of the `inbox`es of another `component`.
+
+ It is legal to pass in `self` as the value of `component` if needed.
+
+ """
+
+class IConsumer(Interface):
+
+ def deliver(message, inbox, routing_key):
+ """Delivers an incoming `message` into one of the `inbox`es of this component with an optional `routing_key`.
+
+ The `routing_key` argument is intended for writing routers to be able to have branching in the flow of
+ messages so as to avoid having to resort to using a non-static number of outboxes.
+
+ Returns a `Deferred` which will be fired when this component has received the `message`.
+
+ """
+
+
+class IComponent(IProducer, IConsumer):
+ pass
+
+
+class Component(object, Service):
+ implements(IComponent)
+
+ def __init__(self, *args, **kwargs):
+ super(Component, self).__init__(*args, **kwargs)
+ self._inboxes = defaultdict(lambda: DeferredQueue(backlog=1))
+ self._waiting = {}
+ self._outboxes = {}
+
+ def deliver(self, message, inbox, routing_key):
+ d = Deferred()
+ self._inboxes[inbox].put((message, d))
+ return d
+
+ def connect(self, outbox=None, to=None):
+ """%(parent_doc)s
+
+ The connection (`to`) can be either a tuple of `(<inbox>, <receiver>)` or just `receiver`, in which case `<inbox>` is
+ taken to be the same as `outbox`.
+
+ If no `outbox` is specified, it is taken to be `'default'`, thus:
+
+ `comp_a.connect(to=...)`
+
+ is equivalent to:
+
+ `comp_a.connect('default', ...)`
+
+ and
+
+ `comp_a.connect(to=comp_b)`
+
+ is equivalent to:
+
+ `a.connect('default', ('default', b))`
+
+ """
+ if isinstance(outbox, list):
+ for o in outbox:
+ self._connect(o, to)
+ else:
+ self._connect(outbox, to)
+ connect.__doc__ %= {'parent_doc': IComponent.getDescriptionFor('connect').getDoc()}
+
+ def _connect(self, outbox, to):
+ inbox, receiver = (to if isinstance(to, tuple) else (outbox, to))
+ self._outboxes.setdefault(outbox, []).append((inbox, receiver))
+
+ @selfdocumenting
+ def short_circuit(self, outbox, inbox=None):
+ if inbox is None: inbox = outbox
+ self.connect(outbox, (inbox, self))
+
+ @inlineCallbacks
+ def get(self, inbox='default'):
+ """Retrieves a message from the specified `inbox`.
+
+ Returns a `Deferred` which will be fired when a message is available in the specified `inbox` to be returned.
+
+ This method will not complain if nothing has been connected to the requested `inbox`.
+
+ """
+ message, d = yield self._inboxes[inbox].get()
+ d.callback(None)
+ returnValue(message)
+
+ @inlineCallbacks
+ def put(self, message, outbox='default', routing_key=None):
+ """Puts a `message` into one of the `outbox`es of this component with an optional `routing_key`.
+
+ If the specified `outbox` has not been previously connected to anywhere (see `Component.connect`), a
+ `NoRoute` will be raised, i.e. outgoing messages cannot be queued locally and must immediately be delivered
+ to an inbox of another component and be queued there (if/as needed).
+
+ Returns a `Deferred` which will be fired when the messages has been delivered to all connected components.
+
+ """
+ if outbox not in self._outboxes:
+ raise NoRoute
+
+ connections = self._outboxes[outbox]
+ ds = []
+ for inbox, component in connections:
+ ds.append(component.deliver(message, inbox, routing_key))
+ for d in ds: yield d
+
+ # `startService` and `stopService` are ugly name because they 1) repeat the class
+ # name and 2) not all `Service`s want to be labelled as "services".
+ # Thus, we effectively rename `startService`/`stopService` to `start`/`stop` for subclasses
+ # to override.
+ def startService(self):
+ self.start()
+
+ def stopService(self):
+ self.stop()
+
+ def start(self): pass
+ def stop(self): pass
+
+ def debug_state(self, name=None):
+ for inbox, queue in self._inboxes.items():
+ print '*** %s.INBOX %s:' % (name or '', inbox)
+ for message, _ in queue.pending:
+ print '*** \t%s' % message
+
+
+def _normalize_pipe(pipe):
+ if not isinstance(pipe, tuple):
+ pipe = (pipe, )
+ assert len(pipe) <= 3, "A pipe definition is should be a 3-tuple"
+
+ is_box = lambda x: isinstance(x, basestring)
+
+ if len(pipe) == 3:
+ assert is_box(pipe[0]), "Left item of a pipe definition should be an inbox name"
+ assert is_box(pipe[2]), "Right item of a pipe definition should be an outbox name"
+ elif len(pipe) == 1:
+ pipe = ('default', pipe[0], 'default')
+ else:
+ pipe = ('default', ) + pipe if is_box(pipe[1]) else pipe + ('default', )
+
+ assert is_box(pipe[0]) or is_box(pipe[2]), "Left and right item of a pipe definition shuld be box names"
+ return pipe
+
+
+def Pipeline(*pipes):
+ """Returns a `Pipeline` that can be used as part of an `Application`.
+
+ A `Pipeline` consists of one ore more pipes.
+
+ A pipe is a connection/link in the pipeline; a pipe connects a
+ component to its neighbouring components via inboxes and outboxes;
+ the normalized form of a pipe definition is a 3-tuple of the form:
+
+ `(<inbox-name>, <component>, <outbox-name>)`
+
+ where `inbox-name`
+ and `outbox-name` should be strings; a pipe definition can
+ optionally be shortened to following forms:
+
+ `(<inbox-name>, <component>)`
+ `(<component>, <outbox-name>)`
+ `(<component>, )`
+ `<component>`
+
+ each of which will be normalized, unspecified box names defaulting
+ to `'default'`.
+
+ """
+ pipes = [_normalize_pipe(pipe) for pipe in pipes]
+
+ for sender, receiver in zip(pipes[:-1], pipes[1:]):
+ _, sender, outbox = sender
+ inbox, receiver, _ = receiver
+ sender.connect(outbox, (inbox, receiver))
+
+ return [pipe[1] for pipe in pipes]
+
+
+def Application(*pipelines):
+ """Returns an application object that can be run using `twistd`.
+
+ An `Application` consists of one or more pipelines.
+
+ """
+ services = []
+ for pipeline in pipelines:
+ # components = [connection[1] for stage in pipeline for connection in stage]
+ services.extend(pipeline)
+
+ application = service.Application("DTS Server")
+ for s in services:
+ s.setServiceParent(application)
+
+ return application
@@ -0,0 +1,116 @@
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from spinoff.component.component import Component, Application, Pipeline
+from spinoff.util.async import sleep
+
+
+__all__ = ['application']
+
+
+TIME_UNIT = 1.0
+
+
+class Base(Component):
+
+ def __init__(self, id, speed=1.0, *args, **kwargs):
+ super(Base, self).__init__(*args, **kwargs)
+ self.id = id
+ self.interval = TIME_UNIT / speed
+
+
+class Sender(Base):
+
+ @inlineCallbacks
+ def start(self):
+ yield sleep(.5)
+
+ while True:
+ yield self._do_send()
+ yield sleep(self.interval)
+
+ def _do_send(self, message=None):
+ print '(%s) SEND' % self.id
+ self.put(message or ('message-from-%s' % self.id))
+
+
+class Receiver(Base):
+
+ @inlineCallbacks
+ def start(self):
+ while True:
+ yield self._do_recv()
+ yield sleep(self.interval)
+
+ @inlineCallbacks
+ def _do_recv(self):
+ print '(%s) RECV' % self.id
+ message = yield self.get()
+ print '(%s) GOT.' % self.id
+ self.debug_state(self.id)
+ returnValue(message)
+
+
+class SenderReceiver(Sender, Receiver):
+
+ @inlineCallbacks
+ def start(self):
+ yield sleep(self.interval / 2)
+ while True:
+ self._do_send()
+ yield self._do_recv()
+ yield sleep(self.interval)
+
+
+
+class ReceiverSender(Sender, Receiver):
+
+ @inlineCallbacks
+ def start(self):
+ while True:
+ yield self._do_recv()
+ yield sleep(self.interval)
+ self._do_send()
+
+
+class Repeater(Receiver, Sender):
+ @inlineCallbacks
+ def start(self):
+ while True:
+ message = yield self._do_recv()
+ self._do_send(message)
+
+
+sra, srb = SenderReceiver('party-b'), SenderReceiver('party-a')
+srb.connect('default', sra)
+
+application = Application(
+ # Pipeline(
+ # Sender('sender-1'),
+ # Receiver('receiver-1'),
+ # ),
+
+ # Pipeline(
+ # Sender('sender-3'),
+ # ReceiverSender('sender-receiver-3'),
+ # Receiver('receiver-3'),
+ # ),
+
+ # Pipeline(
+ # Sender('sender-4'),
+ # # LoadBalancer(
+ # Publisher(
+ # ReceiverSender('sender-receiver-4-ONE'),
+ # ReceiverSender('sender-receiver-4-TWO'),
+ # ),
+ # Receiver('receiver-4', speed=10),
+ # ),
+
+ # cyclic
+ Pipeline(sra, srb),
+
+ # Pipeline(
+ # Sender('sender-6'),
+ # Repeater('repeater'),
+ # Receiver('receiver-6'),
+ # ),
+ )
Oops, something went wrong.

0 comments on commit d5a17b4

Please sign in to comment.