Navigation Menu

Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Moved the connection polling into the connection class.
Browse files Browse the repository at this point in the history
  • Loading branch information
FSX committed Jun 13, 2012
1 parent 80334b7 commit 423087f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 54 deletions.
53 changes: 31 additions & 22 deletions momoko/pools.py
Expand Up @@ -18,8 +18,6 @@
from psycopg2.extensions import STATUS_READY
from tornado.ioloop import IOLoop, PeriodicCallback

from .utils import Poller


class BlockingPool(object):
"""A connection pool that manages blocking PostgreSQL connections
Expand Down Expand Up @@ -174,15 +172,15 @@ def _new_conn(self, new_cursor_args={}):
"""
if len(self._pool) > self.max_conn:
raise PoolError('connection pool exausted')
conn = AsyncConnection(self._ioloop, *self._args, **self._kwargs)
add_conn = functools.partial(self._add_conn, conn)
conn = AsyncConnection(self._ioloop)
callbacks = [functools.partial(self._add_conn, conn)]

if new_cursor_args:
new_cursor_args['connection'] = conn
new_cursor = functools.partial(self.new_cursor, **new_cursor_args)
conn.poller(add_conn, new_cursor)
else:
conn.poller(add_conn)
callbacks.append(new_cursor)

conn.open(callbacks, *self._args, **self._kwargs)


def _add_conn(self, conn):
Expand Down Expand Up @@ -277,17 +275,38 @@ class PoolError(Exception):


class AsyncConnection(object):
def __init__(self, ioloop, *args, **kwargs):
self._conn = psycopg2.connect(async=1, *args, **kwargs)
def __init__(self, ioloop):
self._conn = None
self._fileno = -1
self._ioloop = ioloop
self._callbacks = []

def cursor(self, *args, **kwargs):
return self._conn.cursor(*args, **kwargs)
def open(self, callbacks, *args, **kwargs):
self._conn = psycopg2.connect(async=1, *args, **kwargs)
self._fileno = self._conn.fileno()
self._callbacks = callbacks

# Connection state should be 2 (write)
self._ioloop.add_handler(self._fileno, self._io_callback, IOLoop.WRITE)

def exec_cursor(self, function, args, callback):
cursor = self._conn.cursor()
getattr(cursor, function)(*args)
self.poller(functools.partial(callback, cursor))
self._callbacks = [functools.partial(callback, cursor)]

# Connection state should be 1 (write)
self._ioloop.update_handler(self._fileno, IOLoop.READ)

def _io_callback(self, fd, events):
state = self._conn.poll()

if state == psycopg2.extensions.POLL_OK:
for callback in self._callbacks:
callback()
elif state == psycopg2.extensions.POLL_READ:
self._ioloop.update_handler(self._fileno, IOLoop.READ)
elif state == psycopg2.extensions.POLL_WRITE:
self._ioloop.update_handler(self._fileno, IOLoop.WRITE)

def close(self):
return self._conn.close()
Expand All @@ -298,13 +317,3 @@ def closed(self):

def isexecuting(self):
return self._conn.isexecuting()

def fileno(self):
return self._conn.fileno()

def poll(self):
return self._conn.poll()

def poller(self, *callbacks):
Poller(self._conn, callbacks, ioloop=self._ioloop)

32 changes: 0 additions & 32 deletions momoko/utils.py
Expand Up @@ -104,35 +104,3 @@ def _collect(self, key, cursor):
self._args[key] = cursor
if not self._size and self._callback:
self._callback(self._args)


class Poller(object):
"""A poller that polls the PostgreSQL connection and calls the callbacks
when the connection state is ``POLL_OK``.
:param connection: The connection that needs to be polled.
:param callbacks: A tuple/list of callbacks.
"""
# TODO: Accept new argument "is_connection"
def __init__(self, connection, callbacks=(), ioloop=None):
self._ioloop = ioloop or IOLoop.instance()
self._connection = connection
self._callbacks = callbacks

self._update_handler()

def _update_handler(self):
state = self._connection.poll()
if state == psycopg2.extensions.POLL_OK:
for callback in self._callbacks:
callback()
elif state == psycopg2.extensions.POLL_READ:
self._ioloop.add_handler(self._connection.fileno(),
self._io_callback, IOLoop.READ)
elif state == psycopg2.extensions.POLL_WRITE:
self._ioloop.add_handler(self._connection.fileno(),
self._io_callback, IOLoop.WRITE)

def _io_callback(self, *args):
self._ioloop.remove_handler(self._connection.fileno())
self._update_handler()

0 comments on commit 423087f

Please sign in to comment.