Permalink
Browse files

fix find_one, adding pooling, reconnect support

  • Loading branch information...
jehiah committed Oct 10, 2010
1 parent dc96b8f commit fb3754483c376942caed02403a35ce5912ac05dd
Showing with 172 additions and 41 deletions.
  1. +3 −3 README.md
  2. +5 −19 asyncmongo/__init__.py
  3. +25 −6 asyncmongo/connection.py
  4. +14 −9 asyncmongo/cursor.py
  5. +6 −0 asyncmongo/errors.py
  6. +115 −0 asyncmongo/pool.py
  7. +4 −4 test/test_pooled_db.py
View
@@ -14,16 +14,16 @@ Usage
import asyncmongo
import tornado.web
- from DBUtils import PooledDB
- db_pool = PooledDB.PooledDB(asyncmongo, host='127.0.0.1', port=27107, dbname='test', maxconnections=50)
+ db_pool = asyncmongo.PooledDB(asyncmongo, host='127.0.0.1', port=27107, dbname='test', maxconnections=50)
class Handler(tornado.web.RequestHandler):
@property
def db(self):
if not hasattr(self, '_db'):
- self._db = db_pool.dedicated_connection()
+ self._db = db_pool.connection()
return self._db
+ @tornado.web.asynchronous
def get(self):
cursor = self.db.cursor("users_collection")
cursor.users.find({'username': self.current_user}, limit=1, callback=self._on_response)
View
@@ -1,22 +1,5 @@
"""
-import asyncmongo
-from DBUtils import PooledDB
-db_pool = PooledDB.PooledDB(asyncmongo, host='127.0.0.1', port=27107, dbname='test', maxconnections=50)
-
-class Handler(tornado.web.RequestHandler):
- @property
- def db(self):
- if not hasattr(self, '_db'):
- self._db = db_pool.dedicated_connection()
- return self._db
-
- def get(self):
- cursor = self.db.cursor("history")
- cursor.users.find({'username': self.current_user}, limit=1, callback=self._on_response)
-
- def _on_response(self, response):
- self.render('template', full_name=respose['full_name'])
-
+asyncmongo is a library for accessing mongo built upon the tornado io loop
"""
version = "0.0.1"
@@ -31,7 +14,10 @@ def _on_response(self, response):
apilevel = 2
threadsafety = 1 # share the module, not connections
+from errors import Warning, Error, InterfaceError, DatabaseError, DataError, OperationalError, IntegrityError, InternalError, ProgrammingError, NotSupportedError
from connection import Connection
def connect(*args, **kwargs):
- return Connection(*args, **kwargs)
+ return Connection(*args, **kwargs)
+
+from pool import PooledDB
View
@@ -6,32 +6,36 @@
import struct
import logging
-from errors import DataError, ProgrammingError, IntegrityError
+from errors import DataError, ProgrammingError, IntegrityError, InterfaceError
# The mongo wire protocol is described at
# http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol
class Connection(object):
- def __init__(self, host, port, dbname, slave_ok=False):
+ def __init__(self, host, port, dbname, slave_ok=False, autoreconnect=True):
assert isinstance(host, (str, unicode))
assert isinstance(port, int)
assert isinstance(slave_ok, bool)
+ assert isinstance(autoreconnect, bool)
self.__host = host
self.__port = port
self.__dbname = dbname
self.__slave_ok = slave_ok
self.__stream = None
self.__callback = None
+ self.__alive = False
self.__connect()
+ self.__autoreconnect = autoreconnect
def __connect(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.connect((self.__host, self.__port))
self.__stream = tornado.iostream.IOStream(s)
self.__stream.set_close_callback(self._socket_close)
+ self.__alive = True
def _socket_close(self):
- pass
+ self.__alive = False
def commit(self):
pass
@@ -65,6 +69,7 @@ def threadsafety(self):
return 2
def close(self):
+ self.__alive = False
self.__stream.close()
def __getattr__(self, name):
@@ -86,12 +91,22 @@ def send_message(self, message, callback):
# TODO: handle reconnect
if self.__callback is not None:
raise ProgrammingError('connection already in use')
+
+ if not self.__alive:
+ if self.__autoreconnect:
+ self.__connect()
+ else:
+ raise InterfaceError('connection invalid. autoreconnect=False')
self.__callback=callback
(self._request_id, data) = message
# logging.info('request id %d writing %r' % (self._request_id, data))
- self.__stream.write(data)
- self.__stream.read_bytes(16, callback=self._parse_header)
+ try:
+ self.__stream.write(data)
+ self.__stream.read_bytes(16, callback=self._parse_header)
+ except IOError, e:
+ self.__alive = False
+ raise
return self._request_id # used by get_more()
def _parse_header(self, header):
@@ -106,7 +121,11 @@ def _parse_header(self, header):
assert operation == struct.unpack("<i", header[12:])[0]
# logging.info('%s' % length)
# logging.info('waiting for another %d bytes' % length - 16)
- self.__stream.read_bytes(length - 16, callback=self._parse_response)
+ try:
+ self.__stream.read_bytes(length - 16, callback=self._parse_response)
+ except IOError, e:
+ self.__alive = False
+ raise
def _parse_response(self, response):
# logging.info('got data %r' % response)
View
@@ -84,18 +84,19 @@ def insert(self, doc_or_docs,
# if manipulate:
# docs = [self.__database._fix_incoming(doc, self) for doc in docs]
+ self.__limit = None
if kwargs:
safe = True
# TODO: do this callback error handling elsewhere
- try:
- self.__id = self._connection.send_message(
- message.insert(self.full_collection_name, docs,
- check_keys, safe, kwargs), callback=self._handle_response)
- except Exception, e:
- logging.error(e)
- self.callback(None, error=e)
- self.callback=None
+ # try:
+ self.__id = self._connection.send_message(
+ message.insert(self.full_collection_name, docs,
+ check_keys, safe, kwargs), callback=self._handle_response)
+ # except Exception, e:
+ # logging.error(e)
+ # self.callback(None, error=e)
+ # self.callback=None
def remove(self, spec_or_id=None, safe=False, callback=None, **kwargs):
if not isinstance(safe, bool):
@@ -111,6 +112,7 @@ def remove(self, spec_or_id=None, safe=False, callback=None, **kwargs):
if not isinstance(spec_or_id, dict):
spec_or_id = {"_id": spec_or_id}
+ self.__limit = None
if kwargs:
safe = True
@@ -338,7 +340,10 @@ def _handle_response(self, result, error=None):
self.callback(None, error=error)
else:
logging.info('%s %r' % (self.full_collection_name , result))
- self.callback(result['data'], error=None)
+ if self.__limit == -1 and len(result['data']) == 1:
+ self.callback(result['data'][0], error=None)
+ else:
+ self.callback(result['data'], error=None)
self.callback = None
def __query_options(self):
View
@@ -49,3 +49,9 @@ class ProgrammingError(DatabaseError):
class NotSupportedError(DatabaseError):
pass
+
+class TooManyConnections(Error):
+ pass
+
+class InvalidConnection(DatabaseError):
+ pass
View
@@ -0,0 +1,115 @@
+from threading import Condition
+
+from errors import TooManyConnections, InvalidConnection
+
+class PooledDB(object):
+ def __init__(self, creator, mincached=0, maxcached=0,
+ maxconnections=0, maxusage=None, blocking=True, *args, **kwargs):
+ self._creator = creator
+ self._args, self._kwargs = args, kwargs
+ # self._maxusage = maxusage
+ self._mincached = mincached
+ self._maxcached = maxcached
+ self._maxconnections = maxconnections
+ self._idle_cache = [] # the actual connections
+ self._condition = Condition()
+ if not blocking:
+ def wait():
+ raise TooManyConnections
+ self._condition.wait = wait
+ # Establish an initial number of idle database connections:
+ idle = [self.dedicated_connection() for i in range(mincached)]
+ while idle:
+ idle.pop().close()
+ self._connections = 0
+
+ def new_connection(self):
+ return self._creator.connect(*self._args, **self._kwargs)
+
+ def connection(self):
+ """ get a cached connection from the pool """
+
+ self._condition.acquire()
+ try:
+ while (self._maxconnections
+ and self._connections >= self._maxconnections):
+ self._condition.wait()
+ # connection limit not reached, get a dedicated connection
+ try: # first try to get it from the idle cache
+ con = self._idle_cache.pop(0)
+ except IndexError: # else get a fresh connection
+ con = self.new_connection()
+ con = PooledConnection(self, con)
+ self._connections += 1
+ finally:
+ self._condition.release()
+ return con
+
+ def cache(self, con):
+ """Put a dedicated connection back into the idle cache."""
+ self._condition.acquire()
+ try:
+ if not self._maxcached or len(self._idle_cache) < self._maxcached:
+ # the idle cache is not full, so put it there
+ self._idle_cache.append(con)
+ else: # if the idle cache is already full,
+ con.close() # then close the connection
+ self._connections -= 1
+ self._condition.notify()
+ finally:
+ self._condition.release()
+
+ def close(self):
+ """Close all connections in the pool."""
+ self._condition.acquire()
+ try:
+ while self._idle_cache: # close all idle connections
+ con = self._idle_cache.pop(0)
+ try:
+ con.close()
+ except Exception:
+ pass
+ self._condition.notifyAll()
+ finally:
+ self._condition.release()
+
+ def __del__(self):
+ """Delete the pool."""
+ try:
+ self.close()
+ except Exception:
+ pass
+
+
+class PooledConnection(object):
+ def __init__(self, pool, con):
+ """Create a pooled dedicated connection.
+
+ pool: the corresponding PooledDB instance
+ con: the underlying SteadyDB connection
+
+ """
+ self._pool = pool
+ self._con = con
+
+ def close(self):
+ """Close the pooled dedicated connection."""
+ # Instead of actually closing the connection,
+ # return it to the pool for future reuse.
+ if self._con:
+ self._pool.cache(self._con)
+ self._con = None
+
+ def __getattr__(self, name):
+ """Proxy all members of the class."""
+ if self._con:
+ return getattr(self._con, name)
+ else:
+ raise InvalidConnection
+
+ def __del__(self):
+ """Delete the pooled connection."""
+ try:
+ self.close()
+ except Exception:
+ pass
View
@@ -1,4 +1,3 @@
-from DBUtils import PooledDB
import tornado.ioloop
import logging
import time
@@ -12,10 +11,11 @@ def test_pooled_db():
This test simply verifies that we can grab two different connections from the pool
and use them independently.
"""
+ print asyncmongo.__file__
test_shunt.setup()
- pool = PooledDB.PooledDB(asyncmongo, maxconnections=5, host='127.0.0.1', port=27017, dbname='test')
+ pool = asyncmongo.PooledDB(asyncmongo, maxconnections=5, host='127.0.0.1', port=27017, dbname='test')
- db = pool.dedicated_connection()
+ db = pool.connection()
cursor = db.cursor('test_users')
def insert_callback(response, error):
@@ -30,7 +30,7 @@ def insert_callback(response, error):
test_shunt.assert_called('inserted')
- db2 = pool.dedicated_connection()
+ db2 = pool.connection()
cursor2 = db2.cursor('test_users')
def pool_callback(response, error):

0 comments on commit fb37544

Please sign in to comment.