Permalink
Browse files

Split announcers out of the old relays. Announcers are now PULLers fr…

…om processors, then PUB to relays or end-users. Relays are now always PUB/SUB.
  • Loading branch information...
1 parent a4a53e1 commit 9c0638752ced4ef215a059b30f426f446c3dca16 Greg Taylor committed Apr 11, 2012
View
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+"""
+Processors connect to Announcers via PUSH, and the Announcer distributes
+the message to those below it via PUB.
+"""
+import argparse
+from emdr.conf import default_settings
+
+parser = argparse.ArgumentParser(
+ description="The announcer accepts order data from processors. " \
+ "The data is PUBlished to all SUBscribers, which are developer " \
+ "applications, and/or Relays.",
+)
+parser.add_argument(
+ '--listen', action='append', dest='listeners',
+ help="Overrides default announcer receiver bindings. This determines how " \
+ "the processors connects to this relay to PUSH messages.")
+parser.add_argument(
+ '--send', action='append', dest='senders',
+ help="Override default announcer sender bindings. This determines how " \
+ "relays or developer applications can connect to this announcer.")
+
+parsed = parser.parse_args()
+
+if parsed.listeners:
+ print("* Overriding default announcer receiver bindings.")
+ default_settings.ANNOUNCER_RECEIVER_BINDINGS = parsed.listeners
+if parsed.senders:
+ print("* Overriding default announcer sender bindings.")
+ default_settings.ANNOUNCER_SENDER_BINDINGS = parsed.senders
+
+# Importing this starts the announcer.
+from emdr.daemons.announcer import main
+main.start()
View
@@ -1,14 +1,13 @@
#!/usr/bin/env python
"""
-Shell command for starting up a relay that accepts order data from processors,
-or other relays. The data is PUBlished to all SUBscribers, which are user
-applications, and/or other relays.
+Relays listen to Announcers or other Relays for messages over SUB. Any
+messages received are pushed out to anyone below the Relay via PUB.
"""
import argparse
from emdr.conf import default_settings
parser = argparse.ArgumentParser(
- description="The relay accepts order data from processors, or other relays. " \
+ description="The relay accepts order data from announcers, or other relays. " \
"The data is PUBlished to all SUBscribers, which are developer " \
"applications, and/or other relays.",
)
@@ -21,10 +20,6 @@ parser.add_argument(
'--send', action='append', dest='senders',
help="Override default relay sender bindings. This determines how " \
"other relays or developer applications can connect to this relay.")
-parser.add_argument(
- '--announcer-mode', action='store_true', dest='is_announcer',
- help="Override default relay sender bindings. This determines how "\
- "other relays or developer applications can connect to this relay.")
parsed = parser.parse_args()
@@ -37,4 +32,4 @@ if parsed.senders:
# Importing this starts the relay.
from emdr.daemons.relay import main
-main.start(run_as_announcer=parsed.is_announcer)
+main.start()
@@ -32,12 +32,18 @@
# however, add additional gateways to the list, remote or local.
# See: http://api.zeromq.org/2-1:zmq-connect
PROCESSOR_RECEIVER_BINDINGS = ["ipc:///tmp/broker-sender.sock"]
-PROCESSOR_SENDER_BINDINGS = ["ipc:///tmp/relay-receiver.sock"]
+PROCESSOR_SENDER_BINDINGS = ["ipc:///tmp/announcer-receiver.sock"]
+
+#
+## Announcer Daemon Settings
+#
+ANNOUNCER_RECEIVER_BINDINGS = ["ipc:///tmp/announcer-receiver.sock"]
+ANNOUNCER_SENDER_BINDINGS = ["ipc:///tmp/announcer-sender.sock"]
#
## Relay Daemon Settings
#
-RELAY_RECEIVER_BINDINGS = ["ipc:///tmp/relay-receiver.sock"]
+RELAY_RECEIVER_BINDINGS = ["ipc:///tmp/announcer-sender.sock"]
RELAY_SENDER_BINDINGS = ["ipc:///tmp/relay-sender.sock"]
No changes.
@@ -0,0 +1,50 @@
+"""
+Announcers are the first daemons to get their mittens on the "finished"
+unified format messages. From here, they PUB the messages out to anyone
+SUBscribing. This could be Relays, or end-users.
+"""
+# Logging has to be configured first before we do anything.
+import logging
+from logging.config import dictConfig
+from emdr.conf import default_settings as settings
+dictConfig(settings.LOGGING)
+logger = logging.getLogger('src.daemons.announcer.main')
+
+import gevent
+from gevent import monkey; gevent.monkey.patch_all()
+from gevent_zeromq import zmq
+
+def start():
+ """
+ Fires up the announcer process.
+ """
+ context = zmq.Context()
+
+ receiver = context.socket(zmq.PULL)
+ for binding in settings.ANNOUNCER_RECEIVER_BINDINGS:
+ logger.info("Accepting connections from %s" % binding)
+ # Processors connect to announcer to PUSH messages.
+ receiver.bind(binding)
+
+ sender = context.socket(zmq.PUB)
+ for binding in settings.ANNOUNCER_SENDER_BINDINGS:
+ # Announcers offer up the data via PUB/SUB.
+ sender.bind(binding)
+
+ def relay_worker(message):
+ """
+ This is the worker function that re-sends the incoming messages out
+ to any subscribers.
+
+ :param str message: A JSON string to re-broadcast.
+ """
+ print message
+ sender.send(message)
+
+ logger.info("Announcer is now listening for order data.")
+
+ while True:
+ gevent.spawn(relay_worker, receiver.recv())
+
+if __name__ == '__main__':
+ start()
@@ -30,8 +30,8 @@ def start():
for binding in settings.PROCESSOR_RECEIVER_BINDINGS:
receiver.connect(binding)
- sender = context.socket(zmq.PUB)
- for binding in settings.RELAY_RECEIVER_BINDINGS:
+ sender = context.socket(zmq.PUSH)
+ for binding in settings.PROCESSOR_SENDER_BINDINGS:
sender.connect(binding)
# We use a greenlet pool to cap the number of workers at a reasonable level.
View
@@ -1,8 +1,6 @@
"""
-There are two kinds of relays, announcer relays, and normal relays. Announcer
-relays have a bound socket that processors connect to in order to PUB messages.
-The announcer then re-broadcasts the messages to any normal relays beneath it
-that are subscribed to the announcer.
+Relays sit below an announcer, or another relay, and simply repeat what
+they receive over PUB/SUB.
"""
# Logging has to be configured first before we do anything.
import logging
@@ -15,41 +13,23 @@
from gevent import monkey; gevent.monkey.patch_all()
from gevent_zeromq import zmq
-def start(run_as_announcer=False):
+def start():
"""
Fires up the relay process.
-
- :keyword bool run_as_announcer: If ``True``, this relay will run in
- announcer mode. In announcer mode, the relay accepts connections
- from processor workers, instead of SUBscribing to an upstream relay
- to get its messages.
"""
- if run_as_announcer:
- logger.info("Starting in announcer mode.")
# These form the connection to the Gateway daemon(s) upstream.
context = zmq.Context()
receiver = context.socket(zmq.SUB)
receiver.setsockopt(zmq.SUBSCRIBE, '')
for binding in settings.RELAY_RECEIVER_BINDINGS:
- # Relays that are running in 'Announcer' mode have processor workers
- # connecting to PUB messages to. Relays running in normal relay mode
- # connect to an announcer and SUB to them. Announcers are at the root
- # of the relay hierarchy (level 1), whereas normal relays are the
- # branches.
- if run_as_announcer:
- logger.info("Accepting connections from %s" % binding)
- # Processors connect to announcer relays to PUB messages.
- receiver.bind(binding)
- else:
- # Relays connect to an announcer and SUB to get messages.
- logger.info("Subscribing to %s" % binding)
- receiver.connect(binding)
+ # Relays bind upstream to an Announcer, or another Relay.
+ receiver.connect(binding)
+ logger.info("Listening to %s" % binding)
sender = context.socket(zmq.PUB)
for binding in settings.RELAY_SENDER_BINDINGS:
- # Regardless of mode, relays always have a ZeroMQ socket that may be
- # connected to in order to get at the data.
+ # End users, or other relays, may attach here.
sender.bind(binding)
def relay_worker(message):

0 comments on commit 9c06387

Please sign in to comment.