Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Geting the ZMQ gateway consistified, and improve decompression handling.

  • Loading branch information...
commit 65f69f351a6bc9a7fb6c893b184bb23b2807e088 1 parent cc74ce4
Greg Taylor authored
Showing with 60 additions and 36 deletions.
  1. +29 −17 bin/emdr-gateway-zmq
  2. +31 −19 emdr/daemons/gateway_zmq/main.py
46 bin/emdr-gateway-zmq
View
@@ -2,8 +2,9 @@
"""
ZeroMQ gateway application.
"""
+from logging.config import dictConfig
import argparse
-from emdr.conf import default_settings
+from emdr.conf import default_settings as settings
from emdr.core.command_utils import set_logger_level, print_cmd_header, print_cmd_footer
parser = argparse.ArgumentParser(
@@ -19,33 +20,44 @@ parser.add_argument(
"the gateway sends its messages, typically an Announcer.")
parser.add_argument(
'--loglevel', action='store', dest='loglevel',
- help="Overrides default logger level (DEBUG, INFO, WARNING, ERROR) (default: %s)" % default_settings.LOGGING['loggers']['']['level'])
+ help="Overrides default logger level (DEBUG, INFO, WARNING, ERROR) "
+ "(default: %s)" % settings.LOGGING['loggers']['']['level'])
parser.add_argument(
'--workers', action='store', dest='workers',
- help="Overrides default worker count (default: %s)" % default_settings.GATEWAY_ZMQ_NUM_WORKERS)
+ help="Overrides default worker count "
+ "(default: %s)" % settings.GATEWAY_ZMQ_NUM_WORKERS)
parsed = parser.parse_args()
-print_cmd_header('emdr-gateway')
+print_cmd_header('emdr-gateway-zmq')
if parsed.listeners:
- print("* Overriding default gateway receiver bindings: %s" % parsed.listeners)
- default_settings.GATEWAY_ZMQ_RECEIVER_BINDINGS = parsed.listeners
-
+ settings.GATEWAY_ZMQ_RECEIVER_BINDINGS = parsed.listeners
if parsed.senders:
- default_settings.GATEWAY_ZMQ_SENDER_BINDINGS = parsed.senders
- print("* Overriding default gateway sender bindings: %s" % parsed.senders)
+ settings.GATEWAY_ZMQ_SENDER_BINDINGS = parsed.senders
+if parsed.workers:
+ settings.GATEWAY_ZMQ_NUM_WORKERS = int(parsed.workers)
-if parsed.loglevel:
- set_logger_level(parsed.loglevel)
- print("* Logging level set to: %s" % parsed.loglevel)
+set_logger_level(parsed.loglevel)
+print("* Logging level: %s" % parsed.loglevel)
-if parsed.workers:
- default_settings.GATEWAY_ZMQ_NUM_WORKERS = int(parsed.workers)
- print("* Worker count set to: %s" % parsed.workers)
+print("* Worker count: %s" % settings.GATEWAY_ZMQ_NUM_WORKERS)
+
+print("* Accepting PUB connections on:")
+for binding in settings.GATEWAY_ZMQ_RECEIVER_BINDINGS:
+ print(" - %s" % binding)
+
+print("* Sending market data over PUB to Announcers:")
+for binding in settings.GATEWAY_ZMQ_SENDER_BINDINGS:
+ print(" - %s" % binding)
print_cmd_footer()
-#noinspection PyUnresolvedReferences
+dictConfig(settings.LOGGING)
+
from emdr.daemons.gateway_zmq import main
-main.start()
+# Get the ZMQ gateway running.
+try:
+ main.run()
+except KeyboardInterrupt:
+ print('Gateway stopped by keyboard interrupt.')
50 emdr/daemons/gateway_zmq/main.py
View
@@ -1,37 +1,31 @@
"""
This gateway accepts compressed unified uploader format messages over ZMQ.
"""
-# Logging has to be configured first before we do anything.
import logging
import zlib
-from logging.config import dictConfig
-from emdr.conf import default_settings as settings
-dictConfig(settings.LOGGING)
-logger = logging.getLogger('src.daemons.gateway_zmq.main')
+logger = logging.getLogger(__name__)
import ujson
import gevent
-from gevent import monkey; gevent.monkey.patch_all()
import zmq.green as zmq
+from emdr.conf import default_settings as settings
from emdr.daemons.gateway.exceptions import MalformedUploadError
from emdr.core.serialization.exceptions import InvalidMarketOrderDataError
from emdr.core.serialization import unified
-def start():
+def run():
"""
- Fires up the announcer process.
+ Fires up the gateway-zmq process.
"""
context = zmq.Context()
receiver = context.socket(zmq.REP)
for binding in settings.GATEWAY_ZMQ_RECEIVER_BINDINGS:
- logger.info("Accepting connections from %s" % binding)
receiver.bind(binding)
sender = context.socket(zmq.PUB)
for binding in settings.GATEWAY_ZMQ_SENDER_BINDINGS:
- logger.info("Sending data to %s" % binding)
sender.connect(binding)
def send_reply(success, message=None):
@@ -47,27 +41,45 @@ def send_reply(success, message=None):
compressed_response = zlib.compress(ujson.dumps(response_dict))
receiver.send(compressed_response)
+ def get_decompressed_message(message):
+ """
+ De-compresses the incoming message using zlib. Attempts several
+ different decompression methods, ending with the most permissive one.
+
+ :rtype: str
+ :returns: The de-compressed message JSON string.
+ :raises: zlib.error if decompression fails for all of our attempts.
+ """
+ try:
+ return zlib.decompress(message)
+ except zlib.error:
+ # The default decompression method failed, let's fall through to
+ # another approach.
+ pass
+
+ # If this succeeds, great, return. If not, let the zlib.error get
+ # passed up to the invoking worker.
+ return zlib.decompress(message, -15)
+
def worker():
"""
This is the worker function that re-sends the incoming messages out
to any subscribers.
-
- :param str message: A JSON string to re-broadcast.
"""
while True:
message = receiver.recv()
try:
- decompressed = zlib.decompress(message)
+ decompressed = get_decompressed_message(message)
except zlib.error as exc:
send_reply(False, message=exc.message)
- return
+ continue
try:
parsed_message = unified.parse_from_json(decompressed)
except (InvalidMarketOrderDataError, MalformedUploadError) as exc:
send_reply(False, message=exc.message)
- return
+ continue
# All is well.
send_reply(True)
@@ -79,11 +91,11 @@ def worker():
# Relay to the Announcers.
sender.send(compressed_msg)
- logger.info("Gateway (ZMQ) is now listening for order data.")
- logger.info("Spawning %d relay workers." % settings.GATEWAY_ZMQ_NUM_WORKERS)
+ logger.info("Accepted Unified %s upload from ?" % (
+ parsed_message.result_type,
+ ))
for worker_num in range(settings.GATEWAY_ZMQ_NUM_WORKERS):
gevent.spawn(worker())
-if __name__ == '__main__':
- start()
+ logger.info("Gateway (ZMQ) is now listening for order data.")
Please sign in to comment.
Something went wrong with that request. Please try again.