Permalink
Browse files

Pipeline now work with connection pool.

Todo:
1) stable reconnect
2) dedicated connection for pub/sub
3) flexible connection pool with lazy connection creation
  • Loading branch information...
1 parent f8be42f commit b922fe78ecead16db1147cb9b9ae57df8726d099 @evilkost committed May 9, 2011
Showing with 44 additions and 45 deletions.
  1. +44 −45 brukva/client.py
View
89 brukva/client.py
@@ -12,7 +12,8 @@
from adisp import async, process
from datetime import datetime
-from brukva.exceptions import RequestError, ConnectionError, ResponseError, InvalidResponse
+from brukva.exceptions import RedisError, RequestError, \
+ ConnectionError, ResponseError, InvalidResponse
log = logging.getLogger('brukva.client')
log_blob = logging.getLogger('brukva.client.blob')
@@ -131,6 +132,11 @@ def format(*tokens):
return '*%s\r\n%s' % (len(tokens), ''.join(cmds))
def format_pipeline_request(command_stack):
+ """
+ @command_stack: [CmdLine(), ...]
+ @return: str
+ Return serialized request to redis for pipeline
+ """
return ''.join(format(c.cmd, *c.args, **c.kwargs) for c in command_stack)
@@ -199,7 +205,7 @@ def read(self, length, callback):
if not self._stream:
self.disconnect()
raise ConnectionError('Tried to read from non-existent connection')
- log_blob.debug('read bytes %s bytes from %s', length, self.idx)
+ log_blob.debug('read %s bytes from %s', length, self.idx)
self._stream.read_bytes(length, callback)
except IOError:
self.on_disconnect()
@@ -215,20 +221,6 @@ def readline(self, callback):
except IOError:
self.on_disconnect()
- #def try_to_perform_read(self):
- # if not self.in_progress and self.read_queue:
- # self.in_progress = True
- # self._io_loop.add_callback(partial(self.read_queue.pop(0), None) )
- #
- #@async
- #def queue_wait(self, callback):
- # self.read_queue.append(callback)
- # self.try_to_perform_read()
- #
- #def read_done(self):
- # self.in_progress = False
- # self.on_read_done()
-
def connected(self):
if self._stream:
return True
@@ -260,6 +252,8 @@ def __init__(self, connection_args, on_connect, on_disconnect, io_loop, pool_siz
self.connection_requests_queue = []
+ self.is_connected = False
+
def connect(self):
"""
@@ -283,6 +277,7 @@ def on_connect(connection):
connection = Connection(**connection_args)
connection.connect()
self.connections[idx] = connection
+ self.is_connected = True
def _give_out_pending_requests(self):
while self.connection_requests_queue and self.free_connections:
@@ -446,12 +441,16 @@ def __repr__(self):
def pipeline(self, transactional=False):
if not self._pipeline:
- self._pipeline = Pipeline(
- selected_db=self.selected_db,
- io_loop = self._io_loop,
- transactional=transactional
- )
- self._pipeline.connection_pool = self.connection_pool
+ if self.connection_pool.is_connected:
+ self._pipeline = Pipeline(
+ selected_db=self.selected_db,
+ password=self.password,
+ io_loop = self._io_loop,
+ transactional=transactional
+ )
+ self._pipeline.connection_pool = self.connection_pool
+ else:
+ raise RedisError('Client must be connected befor creating pipeline')
return self._pipeline
#### connection
@@ -1090,38 +1089,38 @@ def execute(self, callbacks):
if self.transactional:
command_stack = [CmdLine('MULTI')] + command_stack + [CmdLine('EXEC')]
- request = format_pipeline_request(command_stack)
+ request = format_pipeline_request(command_stack)
+ log_blob.debug('try to get connection for %r', request)
+ connection = yield async(self.connection_pool.request_connection)()
+ log_blob.debug('got connection %s for %r request', \
+ connection.idx, request)
try:
- self.connection.write(request)
- except IOError:
- self.command_stack = []
- self.connection.disconnect()
- raise ConnectionError("Socket closed on remote end")
+ connection.write(request)
except Exception, e:
- self.command_stack = []
- self.connection.disconnect()
+ connection.disconnect()
raise e
- yield self.connection.queue_wait()
responses = []
total = len(command_stack)
cmds = iter(command_stack)
- while len(responses) < total:
- data = yield async(self.connection.readline)()
- if not data:
- raise ResponseError('Not enough data after EXEC')
- try:
- cmd_line = cmds.next()
- if self.transactional and cmd_line.cmd != 'EXEC':
- response = yield self.process_data(data, CmdLine('MULTI_PART'))
- else:
- response = yield self.process_data(data, cmd_line)
- responses.append(response)
- except Exception,e :
- responses.append(e)
- self.connection.read_done()
+ try:
+ while len(responses) < total:
+ data = yield async(connection.readline)()
+ if not data:
+ raise ResponseError('Not enough data after EXEC')
+ try:
+ cmd_line = cmds.next()
+ if self.transactional and cmd_line.cmd != 'EXEC':
+ response = yield self.process_data(connection, data, CmdLine('MULTI_PART'))
+ else:
+ response = yield self.process_data(connection, data, cmd_line)
+ responses.append(response)
+ except Exception,e :
+ responses.append(e)
+ finally:
+ self.connection_pool.return_connection(connection)
def format_replies(cmd_lines, responses):
results = []

0 comments on commit b922fe7

Please sign in to comment.