Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added support for glib2 and glib3 io loops

  • Loading branch information...
commit 58464865dc6f2264e9df952cf5f705841cc578d1 1 parent 1697e40
@wpjunior wpjunior authored
View
6 asyncmongo/__init__.py
@@ -16,18 +16,12 @@
"""
AsyncMongo is an asynchronous library for accessing mongo
-which is built on the tornado ioloop.
-
http://github.com/bitly/asyncmongo
"""
try:
import bson
except ImportError:
raise ImportError("bson library not installed. Install pymongo >= 1.9 https://github.com/mongodb/mongo-python-driver")
-try:
- import tornado
-except ImportError:
- raise ImportError("tornado library not installed. Install tornado. https://github.com/facebook/tornado")
# also update in setup.py
version = "1.1.1"
View
0  asyncmongo/backends/__init__.py
No changes.
View
85 asyncmongo/backends/glib2_backend.py
@@ -0,0 +1,85 @@
+#!/bin/env python
+#
+# Copyright 2010 bit.ly
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import glib
+
+class Glib2Stream(object):
+ def __init__(self, socket):
+ self.__socket = socket
+ self.__close_id = None
+ self.__read_id = None
+ self.__read_queue = []
+
+ def write(self, data):
+ self.__socket.send(data)
+
+ def read(self, size, callback):
+ self.__read_queue.append((size, callback))
+
+ if not self.__read_id:
+ self.set_waiting()
+
+ def set_waiting(self):
+ if self.__read_id:
+ glib.source_remove(self.__read_id)
+
+ self.__read_id = glib.io_add_watch(
+ self.__socket,
+ glib.IO_IN,
+ self.__on_read_callback)
+
+ def set_idle(self):
+ if self.__read_id:
+ glib.source_remove(self.__read_id)
+
+ def __on_read_callback(self, source, condition):
+ if not self.__read_queue:
+ self.set_idle()
+ return False
+
+ size, callback = self.__read_queue.pop(0)
+ data = self.__socket.recv(size)
+ callback(data)
+ return True
+
+ def set_close_callback(self, callback):
+ if self.__close_id:
+ glib.source_remove(self.__close_id)
+
+ self.__close_callback = callback
+ self.__close_id = glib.io_add_watch(self.__socket,
+ glib.IO_HUP|glib.IO_ERR,
+ self.__on_close_callback)
+
+ def __on_close_callback(self, source, cb_condition, *args, **kwargs):
+ self.__close_callback()
+
+ def close(self):
+ if self.__close_id:
+ glib.source_remove(self.__close_id)
+
+ self.__socket.close()
+
+class AsyncBackend(object):
+ _instance = None
+ def __new__(cls, *args, **kwargs):
+ if not cls._instance:
+ cls._instance = super(AsyncBackend, cls).__new__(
+ cls, *args, **kwargs)
+ return cls._instance
+
+ def register_stream(self, socket):
+ return Glib2Stream(socket)
View
85 asyncmongo/backends/glib3_backend.py
@@ -0,0 +1,85 @@
+#!/bin/env python
+#
+# Copyright 2010 bit.ly
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from gi.repository import GObject
+
+class Glib3Stream(object):
+ def __init__(self, socket):
+ self.__socket = socket
+ self.__close_id = None
+ self.__read_id = None
+ self.__read_queue = []
+
+ def write(self, data):
+ self.__socket.send(data)
+
+ def read(self, size, callback):
+ self.__read_queue.append((size, callback))
+
+ if not self.__read_id:
+ self.set_waiting()
+
+ def set_waiting(self):
+ if self.__read_id:
+ GObject.source_remove(self.__read_id)
+
+ self.__read_id = GObject.io_add_watch(
+ self.__socket,
+ GObject.IO_IN,
+ self.__on_read_callback)
+
+ def set_idle(self):
+ if self.__read_id:
+ GObject.source_remove(self.__read_id)
+
+ def __on_read_callback(self, source, condition):
+ if not self.__read_queue:
+ self.set_idle()
+ return False
+
+ size, callback = self.__read_queue.pop(0)
+ data = self.__socket.recv(size)
+ callback(data)
+ return True
+
+ def set_close_callback(self, callback):
+ if self.__close_id:
+ GObject.source_remove(self.__close_id)
+
+ self.__close_callback = callback
+ self.__close_id = GObject.io_add_watch(self.__socket,
+ GObject.IO_HUP|GObject.IO_ERR,
+ self.__on_close_callback)
+
+ def __on_close_callback(self, source, cb_condition, *args, **kwargs):
+ self.__close_callback()
+
+ def close(self):
+ if self.__close_id:
+ GObject.source_remove(self.__close_id)
+
+ self.__socket.close()
+
+class AsyncBackend(object):
+ _instance = None
+ def __new__(cls, *args, **kwargs):
+ if not cls._instance:
+ cls._instance = super(AsyncBackend, cls).__new__(
+ cls, *args, **kwargs)
+ return cls._instance
+
+ def register_stream(self, socket):
+ return Glib3Stream(socket)
View
46 asyncmongo/backends/tornado_backend.py
@@ -0,0 +1,46 @@
+#!/bin/env python
+#
+# Copyright 2010 bit.ly
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import tornado.iostream
+
+class TornadoStream(object):
+ def __init__(self, socket):
+ self.__stream = tornado.iostream.IOStream(socket)
+
+ def write(self, data):
+ self.__stream.write(data)
+
+ def read(self, size, callback):
+ self.__stream.read_bytes(size, callback=callback)
+
+ def set_close_callback(self, callback):
+ self.__stream.set_close_callback(callback)
+
+ def close(self):
+ self.__stream._close_callback = None
+ self.__stream.close()
+
+class AsyncBackend(object):
+ _instance = None
+ def __new__(cls, *args, **kwargs):
+ if not cls._instance:
+ cls._instance = super(AsyncBackend, cls).__new__(
+ cls, *args, **kwargs)
+ return cls._instance
+
+ def register_stream(self, socket):
+ return TornadoStream(socket)
+
View
1  asyncmongo/client.py
@@ -33,6 +33,7 @@ class Client(object):
- `maxconnections` (optional): maximum open connections for this pool. 0 for unlimited
- `maxusage` (optional): number of requests allowed on a connection before it is closed. 0 for unlimited
- `dbname`: mongo database name
+ - `backend': async loop backend, default = tornado
- `**kwargs`: passed to `connection.Connection`
- `host`: hostname or ip of mongo host
- `port`: port to connect to
View
18 asyncmongo/connection.py
@@ -22,7 +22,7 @@
application developers.
"""
-import tornado.iostream
+import sys
import socket
import struct
import logging
@@ -42,7 +42,8 @@ class Connection(object):
- `autoreconnect` (optional): auto reconnect on interface errors
"""
- def __init__(self, host, port, dbuser=None, dbpass=None, autoreconnect=True, pool=None):
+ def __init__(self, host, port, dbuser=None, dbpass=None, autoreconnect=True, pool=None,
+ backend="tornado"):
assert isinstance(host, (str, unicode))
assert isinstance(port, int)
assert isinstance(autoreconnect, bool)
@@ -61,15 +62,21 @@ def __init__(self, host, port, dbuser=None, dbpass=None, autoreconnect=True, poo
self.__pool = pool
self.__deferred_message = None
self.__deferred_callback = None
+ self.__backend = self.__load_backend(backend)
self.usage_count = 0
self.__connect()
+
+ def __load_backend(self, name):
+ __import__('asyncmongo.backends.%s_backend' % name)
+ mod = sys.modules['asyncmongo.backends.%s_backend' % name]
+ return mod.AsyncBackend()
def __connect(self):
self.usage_count = 0
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.connect((self.__host, self.__port))
- self.__stream = tornado.iostream.IOStream(s)
+ self.__stream = self.__backend.register_stream(s)
self.__stream.set_close_callback(self._socket_close)
self.__alive = True
except socket.error, error:
@@ -92,7 +99,6 @@ def _close(self):
self.__callback(None, InterfaceError('connection closed'))
self.__callback = None
self.__alive = False
- self.__stream._close_callback = None
self.__stream.close()
def close(self):
@@ -128,7 +134,7 @@ def _send_message(self, message):
try:
self.__stream.write(data)
if self.__callback:
- self.__stream.read_bytes(16, callback=self._parse_header)
+ self.__stream.read(16, callback=self._parse_header)
else:
self.__request_id = None
self.__pool.cache(self)
@@ -151,7 +157,7 @@ def _parse_header(self, header):
# logging.info('%s' % length)
# logging.info('waiting for another %d bytes' % length - 16)
try:
- self.__stream.read_bytes(length - 16, callback=self._parse_response)
+ self.__stream.read(length - 16, callback=self._parse_response)
except IOError, e:
self.__alive = False
raise
View
60 test/testgtk2/test.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+
+import os
+import base64
+import pygtk
+pygtk.require('2.0')
+import gtk
+import asyncmongo
+
+database= {'host' : '127.0.0.1', 'port' : 27017, 'dbname' : 'testdb', 'maxconnections':5}
+
+class TestApp(object):
+ def __init__(self):
+ self.__win = gtk.Window()
+ self.__win.set_title("AsyncMongo test")
+ box = gtk.VBox()
+ self.__win.add(box)
+
+ self.message = gtk.Label('')
+ box.pack_start(self.message)
+
+ btn = gtk.Button(label="Test Insert")
+ box.pack_start(btn)
+ btn.connect('clicked', self._on_insert_clicked)
+
+ btn = gtk.Button(label="Test Query")
+ box.pack_start(btn)
+ btn.connect('clicked', self._on_query_clicked)
+
+ self._db = asyncmongo.Client(pool_id='test_pool', backend="glib2", **database)
+
+ def _on_query_clicked(self, obj):
+ self._db.test.find({}, callback=self._on_query_response)
+
+ def _on_query_response(self, data, error):
+ if error:
+ self.message.set_text(error)
+
+ self.message.set_text('Query OK, %d objects found' % len(data))
+
+ def _on_insert_clicked(self, obj):
+ rand = base64.b64encode(os.urandom(32))
+ try:
+ self._db.test.insert({ 'blah': rand }, callback=self._on_insertion)
+ except Exception, e:
+ print e
+
+ def _on_insertion(self, data, error):
+ if error:
+ self.message.set_text(error)
+
+ self.message.set_text("Insert OK")
+
+ def show(self):
+ self.__win.show_all()
+
+if __name__ == "__main__":
+ app = TestApp()
+ app.show()
+ gtk.main()
View
58 test/testgtk3/test.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+import os
+import base64
+import asyncmongo
+from gi.repository import Gtk
+
+database= {'host' : '127.0.0.1', 'port' : 27017, 'dbname' : 'testdb', 'maxconnections':5}
+
+class TestApp(object):
+ def __init__(self):
+ self.__win = Gtk.Window()
+ self.__win.set_title("AsyncMongo test")
+ box = Gtk.VBox()
+ self.__win.add(box)
+
+ self.message = Gtk.Label('')
+ box.pack_start(self.message, 0, 1, 1)
+
+ btn = Gtk.Button(label="Test Insert")
+ box.pack_start(btn, 0, 1, 1)
+ btn.connect('clicked', self._on_insert_clicked)
+
+ btn = Gtk.Button(label="Test Query")
+ box.pack_start(btn, 0, 1, 1)
+ btn.connect('clicked', self._on_query_clicked)
+
+ self._db = asyncmongo.Client(pool_id='test_pool', backend="glib3", **database)
+
+ def _on_query_clicked(self, obj):
+ self._db.test.find({}, callback=self._on_query_response)
+
+ def _on_query_response(self, data, error):
+ if error:
+ self.message.set_text(error)
+
+ self.message.set_text('Query OK, %d objects found' % len(data))
+
+ def _on_insert_clicked(self, obj):
+ rand = base64.b64encode(os.urandom(32))
+ try:
+ self._db.test.insert({ 'blah': rand }, callback=self._on_insertion)
+ except Exception, e:
+ print e
+
+ def _on_insertion(self, data, error):
+ if error:
+ self.message.set_text(error)
+
+ self.message.set_text("Insert OK")
+
+ def show(self):
+ self.__win.show_all()
+
+if __name__ == "__main__":
+ app = TestApp()
+ app.show()
+ Gtk.main()
Please sign in to comment.
Something went wrong with that request. Please try again.