Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Added a wrapper class, Momoko, for Pool, BatchQuery and QueryChain.
Browse files Browse the repository at this point in the history
  • Loading branch information
FSX committed Apr 6, 2011
1 parent 95b0dde commit 425df40
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 41 deletions.
13 changes: 7 additions & 6 deletions examples/batch_query.py
Expand Up @@ -5,32 +5,33 @@
import tornado.options
import tornado.web

from momoko import Pool, BatchQuery
from momoko import Momoko


class BaseHandler(tornado.web.RequestHandler):
@property
def db(self):
if not hasattr(self.application, 'db'):
self.application.db = Pool(1, 20, 10, **{
self.application.db = Momoko({
'host': 'localhost',
'database': 'infunadb',
'user': 'infuna',
'password': 'password',
'async': 1
'min_conn': 1,
'max_conn': 20,
'cleanup_timeout': 10
})
return self.application.db


class MainHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
bq = BatchQuery(self.db, {
self.db.batch({
'query1': ['SELECT 42, 12, %s, 11;', (23,)],
'query2': ['SELECT 1, 2, 3, 4, 5;'],
'query3': ['SELECT 465767, 4567, 3454;']
}, self._on_response)
bq.run()
}, self._on_response).run()

def _on_response(self, cursors):
for key, cursor in cursors.items():
Expand Down
13 changes: 7 additions & 6 deletions examples/query_chain.py
Expand Up @@ -5,36 +5,37 @@
import tornado.options
import tornado.web

from momoko import Pool, QueryChain
from momoko import Momoko


class BaseHandler(tornado.web.RequestHandler):
@property
def db(self):
if not hasattr(self.application, 'db'):
self.application.db = Pool(1, 20, 10, **{
self.application.db = Momoko({
'host': 'localhost',
'database': 'infunadb',
'user': 'infuna',
'password': 'password',
'async': 1
'min_conn': 1,
'max_conn': 20,
'cleanup_timeout': 10
})
return self.application.db


class MainHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
qc = QueryChain(self.db, [
self.db.chain([
['SELECT 42, 12, %s, 11;', (23,)],
self._after_first_query,
self._after_first_callable,
['SELECT 1, 2, 3, 4, 5;'],
self._before_last_query,
['SELECT %s, %s, %s, %s, %s;'],
self._on_response
])
qc.run()
]).run()

def _after_first_query(self, cursor):
results = cursor.fetchall()
Expand Down
8 changes: 5 additions & 3 deletions examples/simple_query.py
Expand Up @@ -5,19 +5,21 @@
import tornado.options
import tornado.web

from momoko import Pool
from momoko import Momoko


class BaseHandler(tornado.web.RequestHandler):
@property
def db(self):
if not hasattr(self.application, 'db'):
self.application.db = Pool(1, 20, 10, **{
self.application.db = Momoko({
'host': 'localhost',
'database': 'infunadb',
'user': 'infuna',
'password': 'password',
'async': 1
'min_conn': 1,
'max_conn': 20,
'cleanup_timeout': 10
})
return self.application.db

Expand Down
67 changes: 41 additions & 26 deletions momoko/momoko.py
Expand Up @@ -11,10 +11,40 @@
from tornado.ioloop import IOLoop, PeriodicCallback


class Momoko(object):
def __init__(self, settings):
self._pool = Pool(**settings)

def batch(self, queries, callback):
return BatchQuery(self, queries, callback)

def chain(self, links):
return QueryChain(self, links)

def execute(self, operation, parameters=(), callback=None):
"""http://initd.org/psycopg/docs/cursor.html#cursor.execute
"""
self._pool.new_cursor('execute', (operation, parameters), callback)

def executemany(self, operation, parameters=None, callback=None):
"""http://initd.org/psycopg/docs/cursor.html#cursor.executemany
"""
self._pool.new_cursor('executemany', (operation, parameters), callback)

def callproc(self, procname, parameters=None, callback=None):
"""http://initd.org/psycopg/docs/cursor.html#cursor.callproc
"""
self._pool.new_cursor('callproc', (procname, parameters), callback)

def close(self):
self._pool.close()


class Pool(object):
"""A connection pool that manages PostgreSQL connections.
"""A connection pool that manages PostgreSQL connections and cursors.
"""
def __init__(self, min_conn, max_conn, cleanup_timeout, *args, **kwargs):
def __init__(self, min_conn=1, max_conn=20, cleanup_timeout=10,
*args, **kwargs):
self.min_conn = min_conn
self.max_conn = max_conn
self.closed = False
Expand All @@ -39,12 +69,12 @@ def _new_conn(self, new_cursor_args={}):
"""
if len(self._pool) > self.max_conn:
raise PoolError('connection pool exausted')
conn = psycopg2.connect(*self._args, **self._kwargs)
conn = psycopg2.connect(async=1, *self._args, **self._kwargs)
add_conn = functools.partial(self._add_conn, conn)

if new_cursor_args:
new_cursor_args['connection'] = conn
new_cursor = functools.partial(self._new_cursor, **new_cursor_args)
new_cursor = functools.partial(self.new_cursor, **new_cursor_args)
Poller(conn, (add_conn, new_cursor)).start()
else:
Poller(conn, (add_conn,)).start()
Expand All @@ -55,9 +85,9 @@ def _add_conn(self, conn):
"""
self._pool.append(conn)

def _new_cursor(self, function, func_args=(), callback=None, connection=None):
def new_cursor(self, function, func_args=(), callback=None, connection=None):
"""Create a new cursor. If there's no connection available, a new
connection will be created and `_new_cursor` will be called again after
connection will be created and `new_cursor` will be called again after
the connection has been made.
"""
if not connection:
Expand Down Expand Up @@ -105,21 +135,6 @@ def _clean_pool(self):
if conns == 0:
break

def execute(self, operation, parameters=(), callback=None):
"""http://initd.org/psycopg/docs/cursor.html#cursor.execute
"""
self._new_cursor('execute', (operation, parameters), callback)

def executemany(self, operation, parameters=None, callback=None):
"""http://initd.org/psycopg/docs/cursor.html#cursor.executemany
"""
self._new_cursor('executemany', (operation, parameters), callback)

def callproc(self, procname, parameters=None, callback=None):
"""http://initd.org/psycopg/docs/cursor.html#cursor.callproc
"""
self._new_cursor('callproc', (procname, parameters), callback)

def close(self):
"""Close all open connections.
"""
Expand All @@ -134,16 +149,16 @@ def close(self):

class QueryChain(object):

def __init__(self, db, chain):
def __init__(self, db, links):
self._db = db
self._args = None
self._chain = chain
self._chain.reverse()
self._links = links
self._links.reverse()

def _collect(self, *args, **kwargs):
if not self._chain:
if not self._links:
return
link = self._chain.pop()
link = self._links.pop()
if callable(link):
results = link(*args, **kwargs)
if type(results) is type([]) or type(results) is type(()):
Expand Down

0 comments on commit 425df40

Please sign in to comment.