Skip to content
This repository has been archived by the owner on Dec 28, 2017. It is now read-only.

Commit

Permalink
Adding emdr-gateway and emdr-snooper bin convenience commands.
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Taylor committed Apr 16, 2012
1 parent a7354ee commit c8f0e2d
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 21 deletions.
32 changes: 32 additions & 0 deletions bin/emdr-gateway
@@ -0,0 +1,32 @@
#!/usr/bin/env python
"""
WSGI gateway application. Runs using bottle+gevent.
"""
import argparse
from bottle import run
from emdr.conf import default_settings

parser = argparse.ArgumentParser(
description="The gateway accepts uploads from player uploader clients " \
"over HTTP. Market data is then sent to the broker.",
)
parser.add_argument(
'--sender', action='append', dest='senders',
help="Overrides default gateway sender bindings. This determines where " \
"the gateway sends its messages, typically a broker.")

parsed = parser.parse_args()

if parsed.senders:
print("* Overriding default gateway sender bindings.")
default_settings.GATEWAY_SENDER_BINDINGS = parsed.senders

#noinspection PyUnresolvedReferences
from emdr.daemons.gateway import wsgi

# Fire up a bottle+gevent process.
run(
host='localhost',
port=8080,
server='gevent',
)
39 changes: 39 additions & 0 deletions bin/emdr-snooper
@@ -0,0 +1,39 @@
#!/usr/bin/env python
"""
CLI util used to attach to ZMQ sockets to listen to whatever is coming down.
Currently hard-wired to SUB.
"""
import argparse
import zlib
import zmq
import simplejson
from pprint import pprint

parser = argparse.ArgumentParser(
description="Connects to a PUB ZMQ socket and prints whatever is coming" \
"out. ",
)
parser.add_argument('receiver', nargs=1, help="The ZMQ socket to connect to.")

parsed = parser.parse_args()

receiver_uri = parsed.receiver[0]

context = zmq.Context()
subscriber = context.socket(zmq.SUB)

# Connect to the first publicly available relay.
subscriber.connect(receiver_uri)
# Disable filtering.
subscriber.setsockopt(zmq.SUBSCRIBE, "")

print("Connected to %s" % receiver_uri)

while True:
# Receive raw market JSON strings.
market_json = zlib.decompress(subscriber.recv())
# Un-serialize the JSON data to a Python dict.
market_data = simplejson.loads(market_json)
# Dump the market data to stdout. Or, you know, do more fun
# things here.
pprint(market_data)
2 changes: 1 addition & 1 deletion bin/fake_history.py
Expand Up @@ -33,7 +33,7 @@
data = simplejson.dumps(data)

r = requests.post(
'http://localhost:8080/emdr/upload/unified/',
'http://localhost:8080/upload/unified/',
data=data,
)
print "RESPONSE"
Expand Down
2 changes: 1 addition & 1 deletion bin/fake_order.py
Expand Up @@ -44,7 +44,7 @@
data = simplejson.dumps(data)

r = requests.post(
'http://localhost:8080/emdr/upload/unified/',
'http://localhost:8080/upload/unified/',
data=data,
)
print "RESPONSE"
Expand Down
9 changes: 8 additions & 1 deletion emdr/daemons/gateway/order_pusher.py
Expand Up @@ -6,6 +6,7 @@
import logging
import zlib
import simplejson
import gevent
from gevent.queue import Queue
from gevent_zeromq import zmq
from emdr.conf import default_settings as settings
Expand All @@ -21,7 +22,9 @@
# Get the list of transports to bind from settings. This allows us to listen
# for processor connections from multiple places (UNIX sockets + TCP sockets).
# By default, we only listen for UNIX domain sockets.
logger.info("Order data will be sent to:")
for binding in settings.GATEWAY_SENDER_BINDINGS:
logger.info(" * %s" % binding)
sender.connect(binding)

def worker():
Expand All @@ -46,4 +49,8 @@ def worker():
sender.send(compressed_msg)
logger.info('Pushed message of length %s' % len(compressed_msg))


# Fire up gevent workers that send raw market order data to processor processes
# in the background without blocking the WSGI app.
logger.info("Spawning %d PUSH workers." % settings.NUM_GATEWAY_SENDER_WORKERS)
for worker_num in range(settings.NUM_GATEWAY_SENDER_WORKERS):
gevent.spawn(worker)
21 changes: 3 additions & 18 deletions emdr/daemons/gateway/wsgi.py
Expand Up @@ -12,7 +12,7 @@
from logging.config import dictConfig
from emdr.conf import default_settings as settings
dictConfig(settings.LOGGING)
logger = logging.getLogger('src.daemons.gateway.wsgi')
logger = logging.getLogger('emdr.daemons.gateway.wsgi')

import gevent
from gevent import monkey; gevent.monkey.patch_all()
Expand Down Expand Up @@ -53,6 +53,7 @@ def upload_eve_marketeer():
# The job dict gets shoved into a gevent queue, where it awaits sending
# to the processors via the src.daemons.gateway.order_pusher module.
order_pusher.order_upload_queue.put(job_dict)
logger.info("Accepted upload from %s" % request.remote_addr)

# Goofy, but apparently expected by EVE Market Data Uploader.
return '1'
Expand All @@ -73,23 +74,7 @@ def upload_eve_marketeer():
# The job dict gets shoved into a gevent queue, where it awaits sending
# to the processors via the src.daemons.gateway.order_pusher module.
order_pusher.order_upload_queue.put(job_dict)
logger.info("Accepted upload from %s" % request.remote_addr)

# Goofy, but apparently expected by EVE Market Data Uploader.
return '1'

# Fire up gevent workers that send raw market order data to processor processes
# in the background without blocking the WSGI app.
for worker_num in range(settings.NUM_GATEWAY_SENDER_WORKERS):
logger.info("Spawning Gateway->Processor PUSH worker.")
gevent.spawn(order_pusher.worker)

if __name__ == '__main__':
# Start the built-in Bottle server for development, for now.
run(
host='localhost',
port=8080,
server='gevent',
)
else:
# gunicorn will eventually use this in production.
application = default_app()
2 changes: 2 additions & 0 deletions setup.py
Expand Up @@ -33,8 +33,10 @@
scripts = [
'bin/emdr-announcer',
'bin/emdr-broker',
'bin/emdr-gateway',
'bin/emdr-processor',
'bin/emdr-relay',
'bin/emdr-snooper',
'bin/ec-feeder',
]

Expand Down

0 comments on commit c8f0e2d

Please sign in to comment.