Permalink
Browse files

possible reconnect fix

  • Loading branch information...
1 parent 3d15814 commit b505740564dcdb654a024d777e0e89202433dc1e @evilkost committed Apr 15, 2011
Showing with 83 additions and 57 deletions.
  1. +80 −57 brukva/client.py
  2. +3 −0 tests/server_commands.py
View
@@ -89,13 +89,15 @@ def format_pipeline_request(command_stack):
return ''.join(format(c.cmd, *c.args, **c.kwargs) for c in command_stack)
class Connection(object):
- def __init__(self, host, port, on_reconnect, timeout=None, io_loop=None):
+ def __init__(self, host, port, on_connect, on_disconnect, timeout=None, io_loop=None):
self.host = host
self.port = port
- self.on_reconnect = on_reconnect
+ self.on_connect = on_connect
+ self.on_disconnect = on_disconnect
self.timeout = timeout
self._stream = None
self._io_loop = io_loop
+ self.try_left = 2
self.in_progress = False
self.read_queue = []
@@ -107,8 +109,10 @@ def connect(self):
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
self._stream = IOStream(sock, io_loop=self._io_loop)
+ self.connected()
except socket.error, e:
raise ConnectionError(str(e))
+ self.on_connect()
def disconnect(self):
if self._stream:
@@ -118,38 +122,41 @@ def disconnect(self):
pass
self._stream = None
- def write(self, data):
+ def write(self, data, try_left=None):
+ if try_left is None:
+ try_left = self.try_left
if not self._stream:
- self.on_reconnect()
+ self.connect()
if not self._stream:
raise ConnectionError('Tried to write to non-existent connection')
- else:
- self._stream.write(data)
+ if try_left > 0:
+ try:
+ #print('try to write: %s'% data)
+ self._stream.write(data)
+ except IOError, e:
+ self.disconnect()
+ self.write(data, try_left - 1)
+ else:
+ raise ConnectionError('Tried to write to non-existent connection')
def read(self, length, callback):
try:
if not self._stream:
- self.client._sudden_disconnect([callback])
- self.on_reconnect()
- if not self._stream:
- raise ConnectionError('Tried to read from non-existent connection')
+ self.disconnect()
+ raise ConnectionError('Tried to read from non-existent connection')
self._stream.read_bytes(length, callback)
except IOError:
- self.client._sudden_disconnect([callback])
- self.on_reconnect()
+ self.on_disconnect()
def readline(self, callback):
try:
if not self._stream:
- self.client._sudden_disconnect([callback])
- self.on_reconnect()
- if not self._stream:
- raise ConnectionError('Tried to read from non-existent connection')
+ self.disconnect()
+ raise ConnectionError('Tried to read from non-existent connection')
self._stream.read_until('\r\n', callback)
except IOError:
- self.client._sudden_disconnect([callback])
- self.on_reconnect()
+ self.on_disconnect()
def try_to_perform_read(self):
if not self.in_progress and self.read_queue:
@@ -244,16 +251,18 @@ def __getattr__(self, item):
class Client(object):
- def __init__(self, host='localhost', port=6379, password=None, reconnect=False, io_loop=None):
+ def __init__(self, host='localhost', port=6379, password=None,
+ selected_db=None, io_loop=None):
self._io_loop = io_loop or IOLoop.instance()
-
- self.connection = Connection(host, port, self.on_reconnect, io_loop=self._io_loop)
+ self.connection = Connection(host, port,
+ self.on_connect, self.on_disconnect, io_loop=self._io_loop)
self.async = _AsyncWrapper(weakref.proxy(self))
self.queue = []
self.current_cmd_line = None
self.subscribed = False
self.password = password
- self.reconnect = reconnect
+ self.selected_db = selected_db
+ self.write_try_num = 2
self.REPLY_MAP = dict_merge(
string_keys_to_dict('AUTH BGREWRITEAOF BGSAVE DEL EXISTS EXPIRE HDEL HEXISTS '
'HMSET MOVE MSET MSETNX SAVE SETNX',
@@ -290,22 +299,31 @@ def __repr__(self):
def pipeline(self, transactional=False):
if not self._pipeline:
- self._pipeline = Pipeline(io_loop = self._io_loop, transactional=transactional)
+ self._pipeline = Pipeline(
+ selected_db=self.selected_db,
+ io_loop = self._io_loop,
+ transactional=transactional
+ )
self._pipeline.connection = self.connection
return self._pipeline
#### connection
+
def connect(self):
self.connection.connect()
- if self.password:
- self.auth(self.password)
def disconnect(self):
self.connection.disconnect()
- def on_reconnect(self):
- if self.reconnect:
- self.connect()
+ def on_connect(self):
+ if self.password:
+ self.auth(self.password)
+ if self.selected_db:
+ self.select(self.selected_db)
+
+ def on_disconnect(self, callbacks):
+ self.pipeline().discard()
+ raise ConnectionError("Socket closed on remote end")
####
#### formatting
@@ -342,22 +360,18 @@ def call_callbacks(self, callbacks, *args, **kwargs):
for cb in callbacks:
cb(*args, **kwargs)
- def _sudden_disconnect(self, callbacks):
- self.connection.disconnect()
- raise ConnectionError("Socket closed on remote end")
-
@process
def execute_command(self, cmd, callbacks, *args, **kwargs):
- with forward_error(callbacks):
+ result = None
+ with forward_error(callbacks) as forward:
if callbacks is None:
callbacks = []
elif not hasattr(callbacks, '__iter__'):
callbacks = [callbacks]
+
try:
- if self.reconnect and not self.connection.connected():
- self.connect()
self.connection.write(self.format(cmd, *args, **kwargs))
- except IOError:
+ except IOError, e:
self._sudden_disconnect(callbacks)
except Exception, e:
self.connection.disconnect()
@@ -369,13 +383,15 @@ def execute_command(self, cmd, callbacks, *args, **kwargs):
data = yield async(self.connection.readline)()
if not data:
result = None
+ self.connection.read_done()
raise Exception('TODO: [no data from connection->readline')
else:
response = yield self.process_data(data, cmd_line)
result = self.format_reply(cmd_line, response)
- self.connection.read_done()
- self.call_callbacks(callbacks, result)
+ self.connection.read_done()
+
+ self.call_callbacks(callbacks, result)
@async
@process
@@ -389,7 +405,6 @@ def process_data(self, data, cmd_line, callback):
response = []
else:
if len(data) == 0:
- self.on_reconnect()
raise IOError('Disconnected')
head, tail = data[0], data[1:]
@@ -408,7 +423,7 @@ def process_data(self, data, cmd_line, callback):
else:
raise ResponseError('Unknown response type %s' % head, cmd_line)
- callback(response)
+ callback(response)
@async
@process
@@ -424,7 +439,7 @@ def consume_multibulk(self, length, cmd_line, callback):
)
token = yield self.process_data(data, cmd_line) #FIXME error
tokens.append( token )
- callback(tokens)
+ callback(tokens)
@async
@process
@@ -437,7 +452,7 @@ def consume_bulk(self, length, callback):
raise ResponseError('EmptyResponse')
else:
data = data[:-2]
- callback(data)
+ callback(data)
####
### MAINTENANCE
@@ -460,6 +475,7 @@ def info(self, callbacks=None):
self.execute_command('INFO', callbacks)
def select(self, db, callbacks=None):
+ self.selected_db = db
self.execute_command('SELECT', callbacks, db)
def shutdown(self, callbacks=None):
@@ -823,7 +839,7 @@ def publish(self, channel, message, callbacks=None):
@process
def listen(self, callbacks=None):
# 'LISTEN' is just for receiving information, it is not actually sent anywhere
- with forward_error(callbacks):
+ with forward_error(callbacks) as forward:
callbacks = callbacks or []
if not hasattr(callbacks, '__iter__'):
callbacks = [callbacks]
@@ -839,8 +855,10 @@ def listen(self, callbacks=None):
if isinstance(response, Exception):
raise response
result = self.format_reply(cmd_listen, response)
- self.call_callbacks(callbacks, result)
+ forward.disable()
+ self.call_callbacks(callbacks, result)
+ forward.enable()
### CAS
def watch(self, key, callbacks=None):
self.execute_command('WATCH', callbacks, key)
@@ -855,19 +873,26 @@ def __init__(self, transactional, *args, **kwargs):
self.command_stack = []
def execute_command(self, cmd, callbacks, *args, **kwargs):
- if cmd in ('AUTH'):
- raise Exception('403')
+ if cmd in ('AUTH', 'SELECT'):
+ raise RuntimeError('cmd %s must not be in pipe ' % cmd)
self.command_stack.append(CmdLine(cmd, *args, **kwargs))
def discard(self): # actually do nothing with redis-server, just flush command_stack
self.command_stack = []
- def _sudden_disconnect(self, callbacks, error=None):
- self.connection.disconnect()
- raise error or ConnectionError("Socket closed on remote end")
+ ###
+ def select(self, db, callbacks=None):
+ self.selected_db = db
+ super(Pipeline, self).execute_command('SELECT', callbacks, db)
+
+ def auth(self, password, callbacks=None):
+ super(Pipeline, self).execute_command('AUTH', callbacks, password)
+ ###
+
@process
def execute(self, callbacks):
+ results = None
with forward_error(callbacks):
command_stack = self.command_stack
self.command_stack = []
@@ -881,16 +906,17 @@ def execute(self, callbacks):
command_stack = [CmdLine('MULTI')] + command_stack + [CmdLine('EXEC')]
request = format_pipeline_request(command_stack)
+
try:
- if self.reconnect and not self.connection.connected():
- self.connect()
self.connection.write(request)
except IOError:
self.command_stack = []
- self._sudden_disconnect(callbacks)
+ self.connection.disconnect()
+ raise ConnectionError("Socket closed on remote end")
except Exception, e:
self.command_stack = []
- self._sudden_disconnect(callbacks, e)
+ self.connection.disconnect()
+ raise e
yield self.connection.queue_wait()
responses = []
@@ -901,7 +927,6 @@ def execute(self, callbacks):
data = yield async(self.connection.readline)()
if not data:
raise ResponseError('Not enough data after EXEC')
-
try:
cmd_line = cmds.next()
if self.transactional and cmd_line.cmd != 'EXEC':
@@ -932,6 +957,4 @@ def format_replies(cmd_lines, responses):
else:
results = format_replies(command_stack, responses)
- self.call_callbacks(callbacks, results)
-
-
+ self.call_callbacks(callbacks, results)
View
@@ -485,6 +485,9 @@ def make_list(key, items, expect_value=True):
self.finish()])
self.start()
+
+
+class PipelineTestCase(TornadoTestCase):
### Pipeline ###
def test_pipe_simple(self):
pipe = self.client.pipeline()

0 comments on commit b505740

Please sign in to comment.