Skip to content
Browse files

Working connection pooling for atomic operations. Pub/sub and pipelin…

…e broken.
  • Loading branch information...
1 parent 3d471f6 commit c16826fdabf32d5264edd8c93ccd9b63745666bb @evilkost committed
Showing with 179 additions and 57 deletions.
  1. +179 −57 brukva/client.py
View
236 brukva/client.py
@@ -133,17 +133,26 @@ def format(*tokens):
def format_pipeline_request(command_stack):
return ''.join(format(c.cmd, *c.args, **c.kwargs) for c in command_stack)
+
class Connection(object):
- def __init__(self, host, port, on_connect, on_disconnect, timeout=None, io_loop=None):
+ def __init__(self, host, port, idx,
+ on_connect, on_disconnect,
+ timeout=None, io_loop=None):
+
+ # FIXME: io_loop can't be None
self.host = host
self.port = port
+ self.idx = idx
+
self.on_connect = on_connect
self.on_disconnect = on_disconnect
+
self.timeout = timeout
- self._stream = None
self._io_loop = io_loop
- self.try_left = 2
+ self._stream = None
+ self.try_left = 2
+ self.is_initialized = False
self.in_progress = False
self.read_queue = []
@@ -177,6 +186,7 @@ def write(self, data, try_left=None):
if try_left > 0:
try:
+ log_blob.debug('try write %r to %s', data, self.idx)
self._stream.write(data)
except IOError:
self.disconnect()
@@ -189,6 +199,7 @@ def read(self, length, callback):
if not self._stream:
self.disconnect()
raise ConnectionError('Tried to read from non-existent connection')
+ log_blob.debug('read bytes %s bytes from %s', length, self.idx)
self._stream.read_bytes(length, callback)
except IOError:
self.on_disconnect()
@@ -198,29 +209,105 @@ def readline(self, callback):
if not self._stream:
self.disconnect()
raise ConnectionError('Tried to read from non-existent connection')
+
+ log_blob.debug('readline from %s', self.idx)
self._stream.read_until('\r\n', callback)
except IOError:
self.on_disconnect()
- def try_to_perform_read(self):
- if not self.in_progress and self.read_queue:
- self.in_progress = True
- self._io_loop.add_callback(partial(self.read_queue.pop(0), None) )
-
- @async
- def queue_wait(self, callback):
- self.read_queue.append(callback)
- self.try_to_perform_read()
-
- def read_done(self):
- self.in_progress = False
- self.try_to_perform_read()
+ #def try_to_perform_read(self):
+ # if not self.in_progress and self.read_queue:
+ # self.in_progress = True
+ # self._io_loop.add_callback(partial(self.read_queue.pop(0), None) )
+ #
+ #@async
+ #def queue_wait(self, callback):
+ # self.read_queue.append(callback)
+ # self.try_to_perform_read()
+ #
+ #def read_done(self):
+ # self.in_progress = False
+ # self.on_read_done()
def connected(self):
if self._stream:
return True
return False
+class ConnectionPool(object):
+ def __init__(self, connection_args, on_connect, on_disconnect, io_loop, pool_size=4):
+ """
+ connection_args:
+ {
+ 'host' = 'localhost',
+ port = 6379,
+ timeout = None,
+
+ selected_db = None,
+ auth = None,
+ }
+ """
+ self.pool_size = pool_size
+ self.connection_args = connection_args
+
+ self._on_connect = on_connect
+ self._on_disconnect = on_disconnect
+
+ self.io_loop = io_loop
+
+ self.connection_args['io_loop'] = io_loop
+ self.connection_args['on_disconnect'] = on_disconnect
+
+ self.connection_requests_queue = []
+
+
+ def connect(self):
+ """
+ Create connection pool, connect all connection
+ and perform AUTH, SELECT if needed.
+ """
+ self.connections = {}
+ self.free_connections = set()
+ connection_args = self.connection_args
+ for idx in xrange(self.pool_size):
+ @process
+ def on_connect(connection):
+ yield async(self._on_connect)(connection)
+ self.free_connections.add(connection.idx)
+ log.info('new connection %s added to pool',
+ connection.idx)
+ self.io_loop.add_callback(self._give_out_pending_requests)
+
+ connection_args['on_connect'] = on_connect
+ connection_args['idx'] = idx
+ connection = Connection(**connection_args)
+ connection.connect()
+ self.connections[idx] = connection
+
+ def _give_out_pending_requests(self):
+ while self.connection_requests_queue and self.free_connections:
+ log_blob.debug('late leasing connection')
+ callback = self.connection_requests_queue.pop(0)
+ self._lease_connection(callback)
+
+ def return_connection(self, connection):
+ self.free_connections.add(connection.idx)
+ log_blob.debug('returned connection: %s', connection.idx)
+ self._give_out_pending_requests()
+
+ def _lease_connection(self, callback):
+ idx = self.free_connections.pop()
+ log_blob.debug('leasing connection %s', idx)
+ connection = self.connections[idx]
+ self.io_loop.add_callback(lambda: callback(connection))
+
+ def request_connection(self, callback):
+ if not self.free_connections:
+ log_blob.debug('no free connections, waiting')
+ self.connection_requests_queue.append(callback)
+ else:
+ self._lease_connection(callback)
+
def reply_to_bool(r, *args, **kwargs):
return bool(r)
@@ -304,8 +391,18 @@ class Client(object):
def __init__(self, host='localhost', port=6379, password=None,
selected_db=None, io_loop=None):
self._io_loop = io_loop or IOLoop.instance()
- self.connection = Connection(host, port,
- self.on_connect, self.on_disconnect, io_loop=self._io_loop)
+ self.host = host
+ self.port = port
+
+ self.connection_pool = ConnectionPool({
+ 'host': host,
+ 'port': port,
+ },
+ on_connect = self._initialize_connection,
+ on_disconnect = self.on_disconnect,
+ io_loop = self._io_loop
+ )
+
self.async = _AsyncWrapper(weakref.proxy(self))
self.queue = []
self.current_cmd_line = None
@@ -345,7 +442,7 @@ def __init__(self, host='localhost', port=6379, password=None,
self._pipeline = None
def __repr__(self):
- return 'Brukva client (host=%s, port=%s)' % (self.connection.host, self.connection.port)
+ return '<Brukva client %s:%s>' % (self.host, self.port)
def pipeline(self, transactional=False):
if not self._pipeline:
@@ -354,22 +451,17 @@ def pipeline(self, transactional=False):
io_loop = self._io_loop,
transactional=transactional
)
- self._pipeline.connection = self.connection
+ self._pipeline.connection_pool = self.connection_pool
return self._pipeline
#### connection
-
def connect(self):
- self.connection.connect()
+ self.connection_pool.connect()
def disconnect(self):
- self.connection.disconnect()
-
- def on_connect(self):
- if self.password:
- self.auth(self.password)
- if self.selected_db:
- self.select(self.selected_db)
+ raise NotImplementedError()
+ pass
+ #self.connection.disconnect()
def on_disconnect(self):
if self.subscribed:
@@ -412,6 +504,40 @@ def call_callbacks(self, callbacks, *args, **kwargs):
cb(*args, **kwargs)
@process
+ def _initialize_connection(self, connection, callback):
+ """
+ call after connection creation
+ """
+ with execution_context(callback) as ctx:
+ cmds = []
+ if self.password:
+ cmds.append(CmdLine('AUTH', self.password))
+ if self.selected_db:
+ cmds.append(CmdLine('SELECT', self.selected_db))
+
+ if cmds:
+ try:
+ for cmd_line in cmds:
+ connection.write(self.format(
+ cmd_line.cmd,
+ *cmd_line.args,
+ **cmd_line.kwargs
+ ))
+ except Exception, e:
+ connection.disconnect()
+ raise e
+
+ for cmd_line in cmds:
+ data = yield async(connection.readline)()
+ response = yield self.process_data(connection, data, cmd_line)
+ result = self.format_reply(cmd_line, response)
+ log_blob.debug('got result %s on cmd %s', result, cmd_line)
+
+ connection.is_initialized = True
+ ctx.ret_call(True)
+
+
+ @process
def execute_command(self, cmd, callbacks, *args, **kwargs):
cmd_line = CmdLine(cmd, *args, **kwargs)
with execution_context(callbacks) as ctx:
@@ -424,32 +550,35 @@ def execute_command(self, cmd, callbacks, *args, **kwargs):
ctx.ret_call(RequestError('Calling not pub/sub command during subscribed state', cmd_line))
return
+ log_blob.debug('try to get connection for %s', cmd_line)
+ connection = yield async(self.connection_pool.request_connection)()
+ log_blob.debug('got connection %s for %s cmd_line',
+ connection.idx, cmd_line)
try:
- self.connection.write(self.format(cmd, *args, **kwargs))
+ connection.write(self.format(cmd, *args, **kwargs))
except Exception, e:
- self.connection.disconnect()
+ connection.disconnect()
raise e
- if self.subscribed and cmd in ('SUBSCRIBE', 'UNSUBSCRIBE'):
- self._waiting_callbacks[cmd].append(callbacks)
- return
+ #if self.subscribed and cmd in ('SUBSCRIBE', 'UNSUBSCRIBE'):
+ # self._waiting_callbacks[cmd].append(callbacks)
+ # return
- yield self.connection.queue_wait()
- data = yield async(self.connection.readline)()
+ data = yield async(connection.readline)()
if not data:
- result = None
- self.connection.read_done()
- raise Exception('TODO: [no data from connection->readline')
+ self.connection_pool.return_connection(connection)
+ raise Exception('TODO: [no data from connection %s->readline' % connection.idx)
else:
- response = yield self.process_data(data, cmd_line)
+ response = yield self.process_data(connection, data, cmd_line)
result = self.format_reply(cmd_line, response)
+ self.connection_pool.return_connection(connection)
+ log_blob.debug('got result %s on cmd %s with connection %s', result, cmd_line, connection.idx)
- self.connection.read_done()
ctx.ret_call(result)
@async
@process
- def process_data(self, data, cmd_line, callback):
+ def process_data(self, connection, data, cmd_line, callback):
with execution_context(callback) as ctx:
data = data[:-2] # strip \r\n
@@ -463,9 +592,9 @@ def process_data(self, data, cmd_line, callback):
head, tail = data[0], data[1:]
if head == '*':
- response = yield self.consume_multibulk(int(tail), cmd_line)
+ response = yield self.consume_multibulk(connection, int(tail), cmd_line)
elif head == '$':
- response = yield self.consume_bulk(int(tail)+2)
+ response = yield self.consume_bulk(connection, int(tail)+2)
elif head == '+':
response = tail
elif head == ':':
@@ -480,26 +609,26 @@ def process_data(self, data, cmd_line, callback):
@async
@process
- def consume_multibulk(self, length, cmd_line, callback):
+ def consume_multibulk(self, connection, length, cmd_line, callback):
with execution_context(callback) as ctx:
tokens = []
while len(tokens) < length:
- data = yield async(self.connection.readline)()
+ data = yield async(connection.readline)()
if not data:
raise ResponseError(
'Not enough data in response to %s, accumulated tokens: %s'%
(cmd_line, tokens), cmd_line
)
- token = yield self.process_data(data, cmd_line) #FIXME error
+ token = yield self.process_data(connection, data, cmd_line) #FIXME error
tokens.append( token )
ctx.ret_call(tokens)
@async
@process
- def consume_bulk(self, length, callback):
+ def consume_bulk(self, connection, length, callback):
with execution_context(callback) as ctx:
- data = yield async(self.connection.read)(length)
+ data = yield async(connection.read)(length)
if isinstance(data, Exception):
raise data
if not data:
@@ -507,7 +636,6 @@ def consume_bulk(self, length, callback):
else:
data = data[:-2]
ctx.ret_call(data)
- ####
### MAINTENANCE
def bgrewriteaof(self, callbacks=None):
@@ -528,9 +656,6 @@ def ping(self, callbacks=None):
def info(self, callbacks=None):
self.execute_command('INFO', callbacks)
- def select(self, db, callbacks=None):
- self.selected_db = db
- self.execute_command('SELECT', callbacks, db)
def shutdown(self, callbacks=None):
self.execute_command('SHUTDOWN', callbacks)
@@ -547,9 +672,6 @@ def lastsave(self, callbacks=None):
def keys(self, pattern, callbacks=None):
self.execute_command('KEYS', callbacks, pattern)
- def auth(self, password, callbacks=None):
- self.execute_command('AUTH', callbacks, password)
-
### BASIC KEY COMMANDS
def append(self, key, value, callbacks=None):
self.execute_command('APPEND', callbacks, key, value)
@@ -892,7 +1014,7 @@ def publish(self, channel, message, callbacks=None):
self.execute_command('PUBLISH', callbacks, channel, message)
@process
- def listen(self, callbacks=None):
+ def _listen(self, callbacks=None):
# 'LISTEN' is just for receiving information, it is not actually sent anywhere
def error_wrapper(e):
if isinstance(e, GeneratorExit):

0 comments on commit c16826f

Please sign in to comment.
Something went wrong with that request. Please try again.