Permalink
Browse files

Refactor storage: Move storage into a new package; Rename storage mod…

…ules to storage back-ends; Move the storage back-ends into separate packages under storage. Implement storage back-end for the Storm ORM framework, incorporating Storm model (schema pending). Allow users to create a channel and publish, and channels to be read: Create channels in response to jabber:iq:register requests; Also return subscriptions and affilitations PubSub requests (as well as items); Attempt to send notifications of new posts to subscribers; Return node config in response to disco#info requests.
  • Loading branch information...
jamestait committed May 12, 2012
1 parent 79b5172 commit b857e57fb80af443be7e3d019b3a5ac9f24c66d4
@@ -22,8 +22,9 @@ log_folder =
backend = Memory
[Memory-storage]
-class = buddycloud.channel_server.storage.MemoryStorageModule
+class = buddycloud.channel_server.storage.memory.MemoryStorageBackend
persist = False
[Storm-storage]
-class = buddycloud.channel_server.storage.StormStorageModule
+class = buddycloud.channel_server.storage.storm.StormStorageBackend
+uri = schema://username:password@hostname:port/database_name
@@ -8,17 +8,64 @@
import uuid
import xmpp
-from datetime import datetime
+from xmpp.simplexml import (
+ ustr,
+ XML2Node,
+)
from buddycloud.channel_server.storage import init_storage
-NS_PUBSUB_OWNER = '%s#%s' % (xmpp.protocol.NS_PUBSUB, 'owner')
+NS_PUBSUB_EVENT = '%s#event' % xmpp.protocol.NS_PUBSUB
+NS_PUBSUB_OWNER = '%s#owner' % xmpp.protocol.NS_PUBSUB
NS_RSM = 'http://jabber.org/protocol/rsm'
NS_ATOM = 'http://www.w3.org/2005/Atom'
NS_THREADS = 'http://purl.org/syndication/thread/1.0'
NS_ACTIVITY_STREAMS = 'http://activitystrea.ms/spec/1.0/'
+FORM_TYPE_PUBSUB_METADATA = '%s#meta-data' % xmpp.protocol.NS_PUBSUB
+
+PUBSUB_FIELDS = {
+ 'title': {
+ 'name': 'pubsub#title',
+ 'label': 'A short name for the node',
+ 'typ': 'text-single'
+ },
+ 'description': {
+ 'name': 'pubsub#description',
+ 'label': 'A description of the node',
+ 'typ': 'text-single'
+ },
+ 'accessModel': {
+ 'name': 'pubsub#access_model',
+ 'label': 'Who may subscribe and retrieve items',
+ 'typ': 'text-single'
+ },
+ 'publishModel': {
+ 'name': 'pubsub#publish_model',
+ 'label': 'Who may publish items',
+ 'typ': 'text-single'
+ },
+ 'creationDate': {
+ 'name': 'pubsub#creation_date',
+ 'label': 'Creation date',
+ 'typ': 'text-single'
+ },
+}
+
+BUDDYCLOUD_FIELDS = {
+ 'defaultAffiliation': {
+ 'name': 'buddycloud#default_affiliation',
+ 'label': 'What role do new subscribers have?',
+ 'typ': 'text-single'
+ },
+ 'channelType': {
+ 'name': 'buddycloud#channel_type',
+ 'label': 'Type of channel',
+ 'typ': 'text-single'
+ },
+}
+
class ChannelServer(object):
"""XMPP component for buddycloud channel server."""
@@ -72,6 +119,8 @@ def register_handlers(self):
self.connection.RegisterHandler('presence', self.xmpp_presence)
self.connection.RegisterHandler(
'iq', self.xmpp_pubsub_get, typ='get', ns=xmpp.protocol.NS_PUBSUB)
+ self.connection.RegisterHandler(
+ 'iq', self.xmpp_pubsub_get, typ='get', ns=NS_PUBSUB_OWNER)
self.connection.RegisterHandler(
'iq', self.xmpp_pubsub_set, typ='set', ns=xmpp.protocol.NS_PUBSUB)
self.connection.RegisterHandler(
@@ -91,38 +140,62 @@ def xmpp_presence(self, conn, event):
def xmpp_pubsub_get(self, conn, event):
"""Callback to handle XMPP PubSub queries."""
- self.logger.debug('Pubsub request: %s', event)
+ self.logger.debug('Pubsub request: %s', ustr(event))
tag = event.getTag('pubsub')
- if tag and tag.getNamespace() == xmpp.protocol.NS_PUBSUB:
- node = tag.getTagAttr('items', 'node')
- rsm = tag.getTag('set', namespace=NS_RSM)
- set_size = int(rsm.getTagData('max'))
+ if tag and (tag.getNamespace() == xmpp.protocol.NS_PUBSUB or
+ tag.getNamespace() == NS_PUBSUB_OWNER):
+ child = tag.getChildren()[0]
+ op = child.getName()
+ node = child.getAttr('node')
+ #rsm = tag.getTag('set', namespace=NS_RSM)
+ #set_size = int(rsm.getTagData('max'))
channel = self.storage.get_node(node)
self.logger.debug(
'Got channel entries for node %s: %s', node, channel)
if channel is None:
conn.send(xmpp.protocol.Error(event, xmpp.ERR_ITEM_NOT_FOUND))
raise xmpp.protocol.NodeProcessed
reply = event.buildReply('result')
- pubsub = reply.setTag('pubsub',
- namespace=xmpp.protocol.NS_PUBSUB)
- items = pubsub.setTag('items', attrs={'node': node})
- for channel_item in sorted(channel.items(), key=lambda x: x[1][0],
- reverse=True)[:set_size]:
- item = items.setTag('item', attrs={'id': channel_item[0]})
- item.addChild(node=channel_item[1][1])
- rsm = pubsub.setTag('set', namespace=NS_RSM)
- if len(channel.items()) > 0:
- rsm.setTagData(
- 'first', channel.items()[0][0], attrs={'index': 0})
- rsm.setTagData('last', channel.items()[-1][0])
- rsm.setTagData('count', len(channel.items()))
+ if op == u'items':
+ pubsub = reply.setTag('pubsub',
+ namespace=xmpp.protocol.NS_PUBSUB)
+ items = pubsub.setTag('items', attrs={'node': node})
+ for channel_item in sorted(
+ channel.items, key=lambda x: x.updated, reverse=True):
+ item = items.setTag('item', attrs={'id': channel_item.id})
+ item.addChild(node=XML2Node(channel_item.xml))
+ elif op == u'subscriptions':
+ pubsub = reply.setTag('pubsub',
+ namespace=NS_PUBSUB_OWNER)
+ subscriptions = pubsub.setTag(
+ u'subscriptions', attrs={u'node': node})
+ for channel_item in channel.subscriptions:
+ subscriptions.setTag(u'subscription', attrs={
+ u'jid': channel_item.user,
+ u'subscription': channel_item.subscription})
+ elif op == u'affiliations':
+ pubsub = reply.setTag('pubsub',
+ namespace=NS_PUBSUB_OWNER)
+ affiliations = pubsub.setTag(u'affiliations')
+ for channel_item in channel.affiliations:
+ affiliations.setTag(u'affiliation', attrs={
+ u'jid': channel_item.user,
+ u'affiliation': channel_item.affiliation
+ })
+ #rsm = pubsub.setTag('set', namespace=NS_RSM)
+ #if len(resultset) > 0:
+ # rsm.setTagData('first', resultset[0].id if op == u'items' else
+ # resultset[0].user, attrs={'index': 0})
+ # rsm.setTagData('last', resultset[-1].id if op == u'items' else
+ # resultset[-1].jid)
+ #rsm.setTagData('count', len(resultset))
conn.send(reply)
raise xmpp.protocol.NodeProcessed
def xmpp_pubsub_set(self, conn, event):
"""Callback to handle XMPP PubSub commands."""
self.logger.debug('Pubsub command: %s', event)
+ self.logger.debug('Decoded event Node: %s', ustr(event))
tag = event.getTag('pubsub')
if tag and tag.getNamespace() == xmpp.protocol.NS_PUBSUB:
publish = tag.getTag('publish')
@@ -138,14 +211,20 @@ def xmpp_pubsub_set(self, conn, event):
entry.setTag('link', attrs={'rel': 'self', 'href':
'xmpp:%s?pubsub;action=retrieve;node=%s;item=%s' % (self.jid,
node, entry_id)})
- items = self.storage.get_node(node)
- items[entry_id] = (datetime.utcnow(), entry)
- self.storage.set_node(node, items)
+ self.storage.add_item(node, entry_id, ustr(entry))
reply = event.buildReply('result')
pubsub = reply.setTag('pubsub', namespace=xmpp.protocol.NS_PUBSUB)
publish = pubsub.setTag('publish', attrs={'node': node})
publish.setTag('item', attrs={'id': entry_id})
conn.send(reply)
+ for subscription in self.storage.get_node(node).subscriptions:
+ message = xmpp.protocol.Message(
+ typ='headline', frm=self.jid, to=subscription.user)
+ event = message.setTag('event', namespace=NS_PUBSUB_EVENT)
+ items = event.setTag('items', attrs={'node': node})
+ item = items.setTag('item', attrs={'id': entry_id})
+ item.addChild(node=entry)
+ conn.send(message)
raise xmpp.protocol.NodeProcessed
def xmpp_register_set(self, conn, event):
@@ -156,19 +235,14 @@ def xmpp_register_set(self, conn, event):
raise xmpp.protocol.NodeProcessed
tag = event.getTag('query')
if tag and tag.getNamespace() == xmpp.protocol.NS_REGISTER:
- fromjid = event.getFrom().getStripped().__str__()
- # Create /user/<jid>/posts
+ fromjid = event.getFrom().getStripped()
node = self.storage.get_node(u'/user/%s/posts' % fromjid)
if node:
error = xmpp.protocol.Error(event, xmpp.ERR_CONFLICT)
error.addChild(node=tag)
conn.send(error)
raise xmpp.protocol.NodeProcessed
- # Create /user/<jid>/geo/previous
- # Create /user/<jid>/geo/current
- # Create /user/<jid>/geo/next
- # Create /user/<jid>/subscriptions
- # Create /user/<jid>/status
+ self.storage.create_channel(fromjid)
reply = event.buildReply('result')
conn.send(reply)
raise xmpp.protocol.NodeProcessed
@@ -229,7 +303,7 @@ def xmpp_base_disco(self, conn, event, disco_type):
'features': features}
elif disco_type == 'items':
return [
- dict(node=x, jid=self.jid) for x in
+ dict(node=x.node, jid=self.jid) for x in
self.storage.get_nodes()]
else:
channel = self.storage.get_node(node)
@@ -240,12 +314,27 @@ def xmpp_base_disco(self, conn, event, disco_type):
NS_PUBSUB_OWNER]
if self.allow_register:
features.append(xmpp.protocol.NS_REGISTER)
+ fields = [
+ xmpp.protocol.DataField(name='FORM_TYPE', typ='hidden',
+ value=FORM_TYPE_PUBSUB_METADATA)]
+ for c in channel.config:
+ if c.key in BUDDYCLOUD_FIELDS:
+ fields.append(xmpp.protocol.DataField(
+ **dict(BUDDYCLOUD_FIELDS[c.key].items() +
+ [('value', c.value)])))
+ elif c.key in PUBSUB_FIELDS:
+ fields.append(xmpp.protocol.DataField(
+ **dict(PUBSUB_FIELDS[c.key].items() +
+ [('value', c.value)])))
return {
'ids': [{'category': 'pubsub', 'type': 'leaf',
'name': 'XEP-0060 service'},
{'category': 'pubsub', 'type': 'channel',
'name': 'buddycloud channel'}],
- 'features': features}
+ 'features': features,
+ # TODO Populate this from the node configuration
+ 'xdata': xmpp.protocol.DataForm(
+ typ='result', data=fields)}
def run(self):
"""Main event loop."""
@@ -1,77 +0,0 @@
-# Copyright 2011-2012 James Tait - All Rights Reserved
-
-"""Storage module for buddycloud channel server."""
-
-import copy
-
-
-def init_storage(config):
- """Initialise the storage module."""
- backend = config.get('Storage', 'backend')
- module_config = dict(config.items('%s-storage' % backend, raw=True))
- module_name, class_name = module_config.pop('class').rsplit('.', 1)
- module = __import__(module_name, fromlist=class_name)
- class_ = getattr(module, class_name)
- storage_module = class_()
- storage_module.set_config(**module_config)
- return storage_module
-
-
-class StorageModule(object):
- """Base class for storage modules."""
-
- def set_config(self, **kwargs):
- """Set the configuration of this storage module."""
- pass
-
- def create_node(self, node, node_config):
- """Create a PubSub node with the given configuration."""
- raise NotImplemented
-
- def get_nodes(self):
- """Get a list of all the available PubSub nodes."""
- raise NotImplemented
-
- def get_node(self, node):
- """Get the requested PubSub node."""
- raise NotImplemented
-
- def set_node(self, node, items):
- """Set the requested PubSub node."""
- raise NotImplemented
-
- def shutdown(self):
- """Shut down the storage module - close any open resources, flush any
- pending data and so on."""
- pass
-
-
-class MemoryStorageModule(StorageModule):
- """In-memory storage.
-
- NOTE: This module is intended for testing only. It is full-fat,
- vitamin-free, high-calorie, artificially-preserved, hydrogenated, salt-rich
- badness, totally non-persistent, non-transactional and non-threadsafe and
- definitely not suitable for production use.
-
- Creates an in-memory dictionary to hold nodes and items. The dictionary is
- keyed on the node ID, the value being another dictionary whose key is the
- entry ID and whose value is a tuple consisting of a timestamp and the entry
- Node object:
-
- {'node_id': {'entry_id': (timestamp, entry_node)}}"""
-
- def __init__(self):
- self.temp_entry_store = {}
-
- def get_nodes(self):
- """Get a list of all the available PubSub nodes."""
- return self.temp_entry_store.keys()
-
- def get_node(self, node):
- """Get the requested PubSub node."""
- return copy.deepcopy(self.temp_entry_store.get(node, None))
-
- def set_node(self, node, items):
- """Set the requested PubSub node."""
- self.temp_entry_store[node] = copy.deepcopy(items)
@@ -0,0 +1,51 @@
+# Copyright 2011-2012 James Tait - All Rights Reserved
+
+"""Storage module for buddycloud channel server."""
+
+
+def init_storage(config):
+ """Initialise the storage module."""
+ backend = config.get('Storage', 'backend')
+ module_config = dict(config.items('%s-storage' % backend, raw=True))
+ module_name, class_name = module_config.pop('class').rsplit('.', 1)
+ module = __import__(module_name, fromlist=class_name)
+ class_ = getattr(module, class_name)
+ storage_module = class_()
+ module_config.update({
+ 'log_format': config.get('Logging', 'log_format', raw=True),
+ 'log_level': config.get('Logging', 'log_level')})
+ storage_module.set_config(**module_config)
+ return storage_module
+
+
+class StorageBackend(object):
+ """Base class for storage back-ends."""
+
+ def set_config(self, **kwargs):
+ """Set the configuration of this storage back-end."""
+ pass
+
+ def create_channel(self, jid):
+ """Create a channel for the given JID."""
+ raise NotImplementedError()
+
+ def create_node(self, node, jid, node_config):
+ """Create a PubSub node with the given configuration."""
+ raise NotImplementedError()
+
+ def get_nodes(self):
+ """Get a list of all the available PubSub nodes."""
+ raise NotImplementedError()
+
+ def get_node(self, node):
+ """Get the requested PubSub node."""
+ raise NotImplementedError()
+
+ def add_item(self, node, item_id, item):
+ """Add an item to the requested PubSub node."""
+ raise NotImplementedError()
+
+ def shutdown(self):
+ """Shut down the storage module - close any open resources, flush any
+ pending data and so on."""
+ pass
Oops, something went wrong.

0 comments on commit b857e57

Please sign in to comment.