Permalink
Browse files

First commit.

  • Loading branch information...
0 parents commit 7dd8ec1d1195b904689696234e370de359a01981 @FSX committed Mar 11, 2011
Showing with 255 additions and 0 deletions.
  1. +19 −0 LICENSE
  2. +50 −0 examples/example1.py
  3. +3 −0 momoko/__init__.py
  4. +169 −0 momoko/momoko.py
  5. +14 −0 setup.py
19 LICENSE
@@ -0,0 +1,19 @@
+Copyright (C) 2011 by Frank Smit <frank@61924.nl>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
50 examples/example1.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+
+import tornado.httpserver
+import tornado.ioloop
+import tornado.options
+import tornado.web
+
+from momoko import Pool
+
+
+class BaseHandler(tornado.web.RequestHandler):
+ @property
+ def db(self):
+ if not hasattr(self.application, 'db'):
+ self.application.db = Pool(1, 20, 10, **{
+ 'host': 'localhost',
+ 'database': 'infunadb',
+ 'user': 'infuna',
+ 'password': 'password',
+ 'async': 1
+ })
+ return self.application.db
+
+
+class MainHandler(BaseHandler):
+ @tornado.web.asynchronous
+ def get(self):
+ self.db.execute('SELECT 42, 12, 40, 11;', callback=self._on_response)
+
+ def _on_response(self, cursor):
+ self.write('Query results: %s' % cursor.fetchall())
+ self.finish()
+
+
+def main():
+ try:
+ tornado.options.parse_command_line()
+ application = tornado.web.Application([
+ (r'/', MainHandler),
+ ])
+ http_server = tornado.httpserver.HTTPServer(application)
+ http_server.bind(8888)
+ http_server.start(0) # Forks multiple sub-processes
+ tornado.ioloop.IOLoop.instance().start()
+ except KeyboardInterrupt:
+ print 'Exit'
+
+
+if __name__ == '__main__':
+ main()
3 momoko/__init__.py
@@ -0,0 +1,3 @@
+#!/usr/bin/env python
+
+from momoko import *
169 momoko/momoko.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env python
+
+__authors__ = ('Frank Smit <frank@61924.nl>',)
+__version__ = '0.1.0'
+__license__ = 'MIT'
+
+
+import functools
+
+import psycopg2
+from tornado.ioloop import IOLoop, PeriodicCallback
+
+
+class Pool(object):
+ """A connection pool that manages PostgreSQL connections.
+ """
+ def __init__(self, min_conn, max_conn, cleanup_timeout, *args, **kwargs):
+ self.min_conn = min_conn
+ self.max_conn = max_conn
+ self.closed = False
+
+ self._args = args
+ self._kwargs = kwargs
+
+ self._pool = []
+
+ for i in range(self.min_conn):
+ self._new_conn()
+
+ # Create a periodic callback that tries to close inactive connections
+ if cleanup_timeout > 0:
+ self._cleaner = PeriodicCallback(self._clean_pool,
+ cleanup_timeout * 1000)
+ self._cleaner.start()
+
+ def _new_conn(self, new_cursor_args={}):
+ """Create a new connection. If `new_cursor_args` is provided a new
+ cursor is created when the callback is executed.
+ """
+ if len(self._pool) > self.max_conn:
+ raise PoolError('connection pool exausted')
+ conn = psycopg2.connect(*self._args, **self._kwargs)
+ add_conn = functools.partial(self._add_conn, conn)
+
+ if new_cursor_args:
+ new_cursor_args['connection'] = conn
+ new_cursor = functools.partial(self._new_cursor, **new_cursor_args)
+ Poller(conn, (add_conn, new_cursor)).start()
+ else:
+ Poller(conn, (add_conn,)).start()
+
+ def _add_conn(self, conn):
+ """Add a connection to the pool. This function is used by `_new_conn`
+ as a callback to add the created connection to the pool.
+ """
+ self._pool.append(conn)
+
+ def _new_cursor(self, function, func_args=(), callback=None, connection=None):
+ """Create a new cursor. If there's no connection available, a new
+ connection will be created and `_new_cursor` will be called again after
+ the connection has been made.
+ """
+ if not connection:
+ connection = self._get_free_conn()
+ if not connection:
+ new_cursor_args = {
+ 'function': function,
+ 'func_args': func_args,
+ 'callback': callback
+ }
+ self._new_conn(new_cursor_args)
+ return
+
+ cursor = connection.cursor()
+ getattr(cursor, function)(*func_args)
+
+ # Callbacks from cursor fucntion always get the cursor back
+ callback = functools.partial(callback, cursor)
+ Poller(cursor.connection, (callback,)).start()
+
+ def _get_free_conn(self):
+ """Look for a free connection and return it. `None` is returned when no
+ free connection can be found.
+ """
+ if self.closed:
+ raise PoolError('connection pool is closed')
+ for conn in self._pool:
+ if not conn.isexecuting():
+ return conn
+ return None
+
+ def _clean_pool(self):
+ """Try to close the number of connections that exceeds the number in
+ `min_conn`. This method loops throught the connections in `_pool` and
+ if it finds a free connection it closes it.
+ """
+ if self.closed:
+ raise PoolError('connection pool is closed')
+ if len(self._pool) > self.min_conn:
+ conns = len(self._pool) - self.min_conn
+ indexes = []
+ for i, conn in enumerate(self._pool):
+ if not conn.isexecuting():
+ conn.close()
+ conns = conns - 1
+ indexes.append(i)
+ if conns == 0:
+ break
+ for i in indexes:
+ self._pool.pop(i)
+
+ def execute(self, operation, parameters=(), callback=None):
+ """http://initd.org/psycopg/docs/cursor.html#cursor.execute
+ """
+ self._new_cursor('execute', (operation, parameters), callback)
+
+ def executemany(self, operation, parameters=None, callback=None):
+ """http://initd.org/psycopg/docs/cursor.html#cursor.executemany
+ """
+ self._new_cursor('executemany', (operation, parameters), callback)
+
+ def callproc(self, procname, parameters=None, callback=None):
+ """http://initd.org/psycopg/docs/cursor.html#cursor.callproc
+ """
+ self._new_cursor('callproc', (procname, parameters), callback)
+
+ def close(self):
+ """Close all open connections.
+ """
+ if self.closed:
+ raise PoolError('connection pool is closed')
+ for conn in self._pool:
+ if not conn.closed:
+ conn.close()
+ self._pool = []
+ self.closed = True
+
+
+class Poller(object):
+ """A poller that polls the PostgreSQL connection and calls the callbacks
+ when the connection state is `POLL_OK`.
+ """
+ def __init__(self, connection, callbacks=()):
+ self._ioloop = IOLoop.instance()
+ self._connection = connection
+ self._callbacks = callbacks
+
+ def start(self):
+ """Start polling the connection.
+ """
+ self._update_handler()
+
+ def _update_handler(self):
+ state = self._connection.poll()
+ if state == psycopg2.extensions.POLL_OK:
+ for callback in self._callbacks:
+ callback()
+ elif state == psycopg2.extensions.POLL_READ:
+ self._ioloop.add_handler(self._connection.fileno(), self._io_callback, IOLoop.READ)
+ elif state == psycopg2.extensions.POLL_WRITE:
+ self._ioloop.add_handler(self._connection.fileno(), self._io_callback, IOLoop.WRITE)
+
+ def _io_callback(self, *args):
+ self._ioloop.remove_handler(self._connection.fileno())
+ self._update_handler()
+
+
+class PoolError(Exception):
+ pass
14 setup.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+
+from distutils.core import setup
+
+
+setup(
+ name='Momoko',
+ version='0.1.0',
+ description='Asynchronous Psycopg wrapper for Tornado.',
+ author='Frank Smit',
+ author_email='frank@61924.nl',
+ url='https://github.com/FSX/momoko',
+ packages=['momoko']
+)

0 comments on commit 7dd8ec1

Please sign in to comment.