Skip to content

Commit

Permalink
Alpha release
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Dietz committed Apr 1, 2011
1 parent 8d9f52f commit f9ee679
Show file tree
Hide file tree
Showing 23 changed files with 355 additions and 95 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
*.pyc
29 changes: 26 additions & 3 deletions README.md
@@ -1,4 +1,27 @@
Dependencies:
# Yagi

feedgenerator
carrot (if using Rabbit)
A PubSubHubBub Publisher implementation in Python

## Why PubSubHubBub?

We wanted to use a defined spec with easy to use clients, and PubSubHubBub fit
the bill nicely.

## Why implement an external publisher?

The original impetus for this project is to adapt an existing piece of
software to sending notifications without laying the job of managing those
notifications on said implementation. Because PubSubHubBub requires an ATOM
feed, we didn\'t want the original software from having to worry about
completely unrelated functionality. Additionally, this route buys us a
lot of flexibility.

## Dependencies:

* feedgenerator
* redis
* webob
* eventlet
* daemon
* pubsubhubbub_publish (available under the publisher_clients folder after checking out the project from [Google Code](http://code.google.com/p/pubsubhubbub/source/checkout)
* carrot (if using Rabbit)
23 changes: 23 additions & 0 deletions run_tests.py
@@ -0,0 +1,23 @@
#!/usr/bin/python
import os
import sys
import unittest

import daemon
import nose
import nose.config
import nose.core

possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'yagi', '__init__.py')):
sys.path.insert(0, possible_topdir)

if __name__ == '__main__':
test_path = os.path.abspath(os.path.join('tests', 'unit'))
c = nose.config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=test_path)
nose.core.run(config=c, argv=sys.argv)
File renamed without changes.
Empty file added tests/unit/__init__.py
Empty file.
25 changes: 25 additions & 0 deletions tests/unit/test_serialization.py
@@ -0,0 +1,25 @@
import unittest

import stubout

import yagi.config
import yagi.serializer

class SerializerTests(unittest.TestCase):
def setUp(self):
self.stubs = stubout.StubOutForTesting()

def tearDown(self):
self.stubs.UnsetAll()

def test_load_serializer(self):
"""Contrived test for basic functionality"""

def config_get(*args, **kwargs):
return 'yagi.serializer.atom'

yagi.config.get()
self.stubs.Set(yagi.config, 'get', config_get)

ser = yagi.serializer.feed_serializer()
self.assertEqual(ser, yagi.serializer.atom)
28 changes: 16 additions & 12 deletions yagi.conf
Expand Up @@ -4,30 +4,34 @@ debug = False

[event_worker]
pidfile = yagi_event_worker.pid
daemonize = True
event_host = 127.0.0.1
user = root
password = qwerty
vhost = events
daemonize = False
event_driver = yagi.broker.rabbit.Broker

[rabbit_broker]
host = localhost
user = guest
password = guest
port = 5672
vhost = /
event_topic = events
event_driver = yagi.broker.fake.Broker
exchange = events
routing_key = events

[event_feed]
pidfile = yagi_feed.pid
daemonize = False
port = 8080
serializer_driver = yagi.serializer.atom
feed_title = Notifications

[logging]
logfile = yagi.log
default_level = DEBUG

[serializer]

[atom_serializer]
feed_title = Notifications
feed_base_url = 127.0.0.1

[persistence]
driver = yagi.persistence.redis_driver.Driver
host = localhost

[hub]
host = 127.0.0.1
port = 8000
1 change: 0 additions & 1 deletion yagi/broker/__init__.py
@@ -1 +0,0 @@

32 changes: 28 additions & 4 deletions yagi/broker/fake.py
@@ -1,10 +1,34 @@
import json
import time

import yagi.log

LOG = yagi.log.logger


class Message(object):
def ack(self):
pass


class Broker(object):
"""A flimsy class for testing the event worker"""
def __init__(self):
pass
self.pipe = open('queue.fifo', 'r')
self.callback = None

def register_callback(self, fun):
pass
self.callback = fun

def loop(self):
pass

while True:
time.sleep(2)
try:
line = self.pipe.readline()
if len(line) > 0:
LOG.debug('Got %s' % line)
line = json.loads(line)
message = Message()
self.callback(line['msg'], message)
except Exception, e:
LOG.debug(e)
19 changes: 12 additions & 7 deletions yagi/broker/rabbit.py
Expand Up @@ -2,23 +2,28 @@
import carrot.messaging

from yagi import config as conf
import yagi.log

LOG = yagi.log.logger


class Broker(object):
def __init__(self):
self.conn = carrot.connection.BrokerConnection(
hostname=conf.get('rabbit_broker', 'event_host'),
hostname=conf.get('rabbit_broker', 'host'),
port=5672,
user=conf.get('rabbit_broker', 'user'),
userid=conf.get('rabbit_broker', 'user'),
password=conf.get('rabbit_broker', 'password'),
virtual_host=conf.get('rabbit_broker', 'vhost')
)
virtual_host=conf.get('rabbit_broker', 'vhost'))
self.consumer = carrot.messaging.Consumer(
connection=self.conn,
queue=conf.get('rabbit_broker', 'event_topic')
)
exchange=conf.get('rabbit_broker', 'exchange'),
routing_key=conf.get('rabbit_broker', 'routing_key'),
queue=conf.get('rabbit_broker', 'event_topic'))

def register_callback(self, fun):
self.consumer.register_callback(fun)

def loop(self):
self.consumer.wait()
LOG.debug('Starting Carrot message loop')
self.consumer.wait()
20 changes: 17 additions & 3 deletions yagi/config.py
@@ -1,8 +1,14 @@
import ConfigParser

DEFAULT_CONF_PATH='yagi.conf'
import yagi.log

LOG = yagi.log.logger

DEFAULT_CONF_PATH = 'yagi.conf'

config = None
argv = None


def parse_conf(conf_path=None):
if not conf_path:
Expand All @@ -11,13 +17,21 @@ def parse_conf(conf_path=None):
config.read(conf_path)
return config


def get(*args, **kwargs):
global config
if not config:
config = parse_conf()

try:
var = config.get(*args)
return var
except Exception, e:
LOG.warn(e)
return kwargs.get('default', None)
return var


def setup(sys_argv):
global argv
global config
argv = sys_argv
config = parse_conf()
23 changes: 20 additions & 3 deletions yagi/event_worker.py
@@ -1,21 +1,38 @@
import json

import yagi.config
import yagi.log
import yagi.notifier
import yagi.persistence
import yagi.utils

LOG = yagi.log.logger


class EventWorker(object):
def __init__(self):
self.broker = yagi.utils.import_class(yagi.config.get('event_worker',
'event_driver'))()
self.db = yagi.persistence.persistence_driver()

def fetched_message(self, message_data, message_topic):
LOG.debug('Got message %s %s' % (message_data, message_topic))
def fetched_message(self, message_data, message):
LOG.debug('Received %s' % (message_data))
try:
obj = json.loads(message_data)
self.db.create(obj['key'], json.dumps(obj['content']))
LOG.debug('New notification created')
yagi.notifier.notify(yagi.utils.topic_url(obj['key']))
except Exception, e:
LOG.debug(e)
finally:
message.ack()

def start(self):
LOG.debug('Starting eventworker...')
self.broker.register_callback(self.fetched_message)
self.broker.loop()



def start():
event_worker = EventWorker()
event_worker.start()
1 change: 0 additions & 1 deletion yagi/feed/__init__.py
@@ -1 +0,0 @@

61 changes: 50 additions & 11 deletions yagi/feed/feed.py
@@ -1,6 +1,9 @@
import urlparse

import eventlet
from eventlet import wsgi
import routes
import routes.middleware
import webob
import webob.dec

Expand All @@ -11,19 +14,55 @@

LOG = yagi.log.logger

@webob.dec.wsgify()
def event_feed(req):
db_driver = yagi.persistence.persistence_driver()
feed_serializer = yagi.serializer.feed_serializer()
elements = db_driver.get_all('events')

response = webob.Response()
response.content_type = 'application/atom+xml'
response.body = feed_serializer.dumps(elements)
class EventFeed(object):
def __init__(self):
self.db_driver = yagi.persistence.persistence_driver()
self.feed_serializer = yagi.serializer.feed_serializer()

@webob.dec.wsgify()
def route_request(self, req):
path = req.environ['PATH_INFO'][1:].split('/')
resource = path[0]
if len(path) > 2:
raise Exception("Invalid resource")
if len(path) == 2:
index = path[1]
return self.get_one(req, resource, index)
elif len(resource) > 0:
return self.get_all_of_resource(req, resource)
return self.get_all(req)

def get_one(self, req, resource, index):
LOG.debug('get_one %s %s' % (resource, index))
elements = self.db_driver.get(resource, index)
print elements
return self.respond(req, elements)

def get_all_of_resource(self, req, resource):
LOG.debug('get_all_of_resource %s' % resource)
elements = self.db_driver.get_all_of_type(resource)
return self.respond(req, elements)

def get_all(self, req):
LOG.debug('get_all')
elements = self.db_driver.get_all()
return self.respond(req, elements)

def respond(self, req, elements):
LOG.debug('serializing feed of %d events' % len(elements))
print elements
response = webob.Response()
response.content_type = 'application/atom+xml'
response.body = self.feed_serializer.dumps(elements)
return response

def listen(self, port):
wsgi.server(eventlet.listen(('', port)), self.route_request)

return response

def start():
port = int(yagi.config.get('event_feed', 'port'))
LOG.debug('Starting feed on port %d' % port)
wsgi.server(eventlet.listen(('', port)), event_feed)
LOG.debug('Starting feed on port %d' % port)
event_feed = EventFeed()
event_feed.listen(port)
12 changes: 6 additions & 6 deletions yagi/log.py
Expand Up @@ -4,6 +4,7 @@

logger = logging


class YagiLogger(logging.Logger):
def __init__(self, name, level=None):
conf = yagi.config.parse_conf()
Expand All @@ -13,14 +14,13 @@ def __init__(self, name, level=None):
logfile = conf.get('logging', 'logfile')
if logfile:
handlers.append(logging.FileHandler(
filename=logfile
))
filename=logfile))
for handler in handlers:
logging.Logger.addHandler(self, handler)
logging.Logger.addHandler(self, handler)


def setup_logging():
logging.root = YagiLogger("YagiLogger")

logging.root = YagiLogger("YagiLogger")

logging.setLoggerClass(YagiLogger)

logging.setLoggerClass(YagiLogger)

0 comments on commit f9ee679

Please sign in to comment.