Skip to content

Commit

Permalink
Replace stomp.py with stompest, implement reconnect, and more...
Browse files Browse the repository at this point in the history
* Replace stomp.py with stompest
* Make our messagebroker module expose a high level interface to stomp that's easy to use
* Implement reconnect to the STOMP server
* Give the taskrunner a -debug mode
* Start making better use of python's logger
* Drop explicit versions from our requirements.txt
* Update our todos including an extra one about the gareth gareth as an installable library so in the future it can be installed with pip
  • Loading branch information
dantman committed Apr 6, 2013
1 parent 8c217e8 commit d13803c
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 99 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ Gareth depends on these python libraries:
* `pytz`
* `pygments`
* `diff-match-patch`
* `stomp.py`
* `stompest`

You can use `pip install -r requirements.txt` to install all the necessary modules at the last version we tested them with.
You can use `pip install -r requirements.txt` to install all the necessary modules.

If you wish to use MySQL as the database you will also need the `mysql-python` library.

Expand Down
21 changes: 14 additions & 7 deletions gareth/settings_user-sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
# Enable development settings
from settings_dev import *

# Make this unique, and don't share it with anybody.
SECRET_KEY = [...]

REPO_PATH = [...]

# Use secure cookies (only do this if using https://)
SESSION_COOKIE_SECURE = True

# Setup the database
DATABASES = {
'default': {
Expand All @@ -14,10 +22,9 @@
}
}

# Make this unique, and don't share it with anybody.
SECRET_KEY = [...]

REPO_PATH = [...]

# Use secure cookies (only do this if using https://)
SESSION_COOKIE_SECURE = True
# Setup the message queue system
STOMP = {
'host': ('localhost', 61613),
'user': '',
'pass': '',
}
8 changes: 6 additions & 2 deletions garethgit/procgit.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import re, logging
# @todo Explicitly use gevent's modules instead of the native ones
from subprocess import Popen, PIPE
from select import select
from select import error as SelectError
import re

logger = logging.getLogger('procgit')
logger.setLevel(logging.DEBUG)

def collect_chunks(buffer):
def handler(chunk):
Expand Down Expand Up @@ -45,7 +49,7 @@ def run(self):
env = {}
if self.git_dir:
env['GIT_DIR'] = self.git_dir
print "Running %s" % args
logger.debug("Running %s" % args)
p = Popen(args, stdout=PIPE, stderr=PIPE, cwd=self.git_dir, env=env)
rlist = [p.stderr, p.stdout]
wlist = []
Expand Down
2 changes: 1 addition & 1 deletion garethweb/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# -*- coding: utf-8 -

SUPPORTED_GIT_PROTOCOLS = ('http', 'https', 'ftp', 'ftps', 'rsync', 'git')
SUPPORTED_GIT_PROTOCOLS = ('http', 'https', 'ftp', 'ftps', 'rsync', 'git')
58 changes: 22 additions & 36 deletions garethweb/daemon/taskrunner.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,30 @@
import time, logging, json
from garethweb.messagebroker import StompConnection
import logging, gevent
from garethweb.messagebroker import client as mb_client
from garethweb.models import Remote

logging.basicConfig(level=logging.DEBUG)

class TaskListener(object):
def __init__(self, out):
self.out = out

def on_error(self, headers, message):
logging.error(message)

def on_message(self, headers, message):
logging.info(message)
if headers.get('destination').startswith('/queue/task.remote.fetch'):
cmd = json.loads(message)
r = Remote.objects.get(name=cmd['name'])
self.out.write("Running fetch for %s (%s)\n" % (r.name, r.project.name))
r.run_fetch()
logger = logging.getLogger('taskrunner')
logger.setLevel(logging.DEBUG)

class TaskRunner(object):
def run(self, out):
out.write("Task runner starting\n")
stomp = StompConnection()
stomp.set_listener('task', TaskListener(out))
stomp.start()
stomp.connect(wait=True)
out.write("Connected\n")
stomp.subscribe(destination='/queue/task.remote.fetch', id='task-remote-fetch')
out.write("Subscribed to /queue/task.remote.fetch\n")
def on_task_remote_fetch(self, cmd, frame):
logging.debug(frame.body)
r = Remote.objects.get(name=cmd['name'])
logger.debug("Running fetch for %s (%s)" % (r.name, r.project.name))
r.run_fetch()

def run(self):
# @fixme Reconnect handling both on startup and while processing
logger.info("Task runner starting")
stomp = mb_client()
stomp.connect()
logger.info("Connected")
stomp.on('/queue/task.remote.fetch', self.on_task_remote_fetch)
logger.debug("Subscribed to /queue/task.remote.fetch")

try:
while True:
time.sleep(5)
except KeyboardInterrupt:
pass
stomp.join()

out.write("Quitting...\n")
stomp.stop()
logger.info("Quitting...")

if __name__ == "__main__":
import sys
TaskRunner().run(sys.stdout)
# gevent.signal(signal.SIGQUIT, gevent.shutdown)
TaskRunner().run()
22 changes: 15 additions & 7 deletions garethweb/management/commands/taskrunner.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import logging
from django.core.management.base import BaseCommand, CommandError
from optparse import make_option
from garethweb.daemon.taskrunner import TaskRunner

class Command(BaseCommand):
# args = '<username password>'
option_list = BaseCommand.option_list + tuple( [
# make_option('-a', '--admin',
# dest="admin",
# action="store_true",
# default=False,
# help="Give the user the admin role.")
make_option('-d', '--debug',
dest="debug",
action="store_true",
default=False,
help="Enable verbose debug logging")
] )
help = 'Starts a taskrunner process'

def handle(self, *args, **options):
TaskRunner().run(self.stdout)
# @todo Support changing the logger options for loggers individually using the cli options
if options.get('debug', False):
logging.getLogger('taskrunner').setLevel(logging.DEBUG)
logging.getLogger('stompest').setLevel(logging.DEBUG)
logging.getLogger('procgit').setLevel(logging.DEBUG)
# else:
# logging.getLogger('taskrunner').setLevel(logging.INFO)

TaskRunner().run()
204 changes: 179 additions & 25 deletions garethweb/messagebroker.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,193 @@
import threading, logging, stomp
import threading, gevent, logging, json
from stompest.error import StompConnectTimeout, StompConnectionError
from stompest.config import StompConfig
from stompest.protocol import StompSpec, StompSession
from stompest.sync import Stomp
from gevent import monkey
from gareth.settings import STOMP

# Make certain that socket and select have been monkeypatched
monkey.patch_all(thread=False)

logger = logging.getLogger('messagebroker')
logger.setLevel(logging.DEBUG)

# @todo This is typically overridden by gevent and will likely result in each greenlet
# (ie: each request) having it's own connection. Our real goal is simply to avoid
# different threads from trying to use the same STOMP connection at the exact same time.
# In theory it should be perfectly fine for greenlets on the same thread to use the same STOMP connection.
# Consider tweaking this so it is in fact a real threading local instead of a greenlet local.
_local_thread = threading.local()

class BasicListener(object):
def on_error(self, headers, message):
logging.error(message)

def on_message(self, headers, message):
# Don't do anything
pass

def StompConnection():
return stomp.Connection(
STOMP['hosts'],
user=STOMP.get('user', None),
passcode=STOMP.get('pass', None),
use_ssl=STOMP.get('ssl', False),
version=STOMP.get('version', 1.0)
def _get_config():
# Hosts
hosts = STOMP.get('hosts', None)
if not hosts:
host = STOMP.get('host', None)
if not host:
host = ('localhost', 61613)
if host:
hosts = (host,)

hostname = hosts[0]
if isinstance(hostname, tuple):
hostname = hostname[0]
else:
hostname = hostname.split(':')[0]

# Uris
uris = []
for host in hosts:
if isinstance(host, tuple):
host = "%s:%d" % host
uri = "tcp://%s" % host
uris.append(uri)

if len(hosts) > 1:
# @todo Give config better control over fallover settings
final_uri = "fallover:(%s)" % ','.join(uris)
else:
final_uri = uris[0]

# User, passcode
user = STOMP.get('user', None),
if user == '':
user = None

passcode = STOMP.get('pass', None),
if passcode == '':
passcode = None

CONFIG = StompConfig(
uri=final_uri,
login=user,
passcode=passcode,
# Support the latest version of the spec that existed when we wrote this
# (and due to stompest's accepts-version header every version before that)
version=StompSpec.VERSION_1_2
)
EXTRA = {
'hostname': hostname,
}
return CONFIG, EXTRA

class Subscription(object):
def __init__(self, conn, destination, token, callback):
self.conn = conn
self.token = token
self.destination = destination
self.callback = callback

def call(self, frame):
content_type = frame.headers.get(StompSpec.CONTENT_TYPE_HEADER, '')
if content_type == 'application/json;charset=UTF-8':
return gevent.spawn(self.callback, json.loads(frame.body), frame)
logger.error("Received frame for subscription %s (%s) but did not find acceptable content-type header: %s"
% (self.destination, self.token, frame.info()))

def unsubscribe(self):
# @todo Include a receipt request and use it to unset the subscriptions key at the right time
self.conn.stompest.unsubscribe(self.token)


def get_local_stomp():
if not getattr(_local_thread, 'stomp_connection', None):
conn = StompConnection()
conn.set_listener('basic', BasicListener())
conn.start()
conn.connect()
_local_thread.stomp_connection = conn
class Client(object):
def __init__(self):
self.stompest = None
self.greenlet = None
self.subscriptions = {}
self._last_id = 0

def _next_id(self):
self._last_id += 1
return self._last_id

def connect(self):
if not self.stompest:
CONFIG, EXTRA = _get_config()
self._hostname = EXTRA.get('hostname', None)
self.stompest = Stomp(CONFIG)

if self.stompest.session.state != StompSession.DISCONNECTED:
return

while True:
try:
self.stompest.connect(host=self._hostname)
logger.info('Connected')
break
except StompConnectTimeout:
continue

if not self.greenlet:
self.greenlet = gevent.spawn(self._run)

def _run(self):
while True:
try:
frame = self.stompest.receiveFrame()
self.stompest.ack(frame)
if frame.command == 'ERROR':
logger.error(frame.info())
elif frame.command == 'MESSAGE':
token = self.stompest.message(frame)
if self.subscriptions.get(token):
subscription = self.subscriptions[token]
subscription.call(frame)
else:
logger.error("Received a message for %s (%s) but there was no matching subscription."
% (frame.headers.get(StompSpec.DESTINATION_HEADER, '???'), token))
else:
logger.warning("Unknown frame: %s" % frame.info())
# @todo Handle receipts
except (gevent.GreenletExit, KeyboardInterrupt):
# @todo Include a receipt in the disconnect. And instead of breaking right away wait for the
# receipt frame before disconnecting and consider waiting on any greenlets we started.
self.stompest.disconnect()
break
except StompConnectionError:
# We've been disconnected from the server. Try reconnecting to it.
self.connect()

def on(self, destination, callback):
self.connect()
token = self.stompest.subscribe(destination, {
StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
StompSpec.ID_HEADER: self._next_id(),
})
subscription = Subscription(
conn=self,
destination=destination,
token=token,
callback=callback
)
self.subscriptions[subscription.token] = subscription;

# @todo consider adding optional support for additional custom headers
def send(self, cmd, destination):
self.connect()
body = json.dumps(cmd)
headers = {}
headers[StompSpec.CONTENT_TYPE_HEADER] = 'application/json;charset=UTF-8'
self.stompest.send(destination, body, headers)

def join(self):
try:
self.connect()
except (gevent.GreenletExit, KeyboardInterrupt):
return
try:
gevent.joinall([self.greenlet])
except KeyboardInterrupt:
self.greenlet.kill(block=True)

def client():
if not getattr(_local_thread, 'client', None):
conn = Client()
# conn.connect()
_local_thread.client = conn

return _local_thread.stomp_connection
return _local_thread.client

def send(*args, **kwargs):
get_local_stomp().send(*args, **kwargs)
client().send(*args, **kwargs)

Loading

0 comments on commit d13803c

Please sign in to comment.