Browse files

fixed reconnect again

  • Loading branch information...
1 parent b922fe7 commit 0b75e206fc63f354bbb8fbe2e89ca04b7359082c @evilkost committed May 10, 2011
Showing with 41 additions and 28 deletions.
  1. +40 −27 brukva/client.py
  2. +1 −1 tests/server_commands.py
View
67 brukva/client.py
@@ -6,6 +6,7 @@
from collections import Iterable, defaultdict
import weakref
import traceback
+import time
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
@@ -162,43 +163,55 @@ def __init__(self, host, port, idx,
self.in_progress = False
self.read_queue = []
- def connect(self):
+ def connect(self, add_to_free=True):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
self._stream = IOStream(sock, io_loop=self._io_loop)
- self.connected()
except socket.error, e:
+ log.error(e)
raise ConnectionError(str(e))
- self.on_connect(weakref.proxy(self))
+ self._io_loop.add_callback(lambda: self.on_connect(weakref.proxy(self), add_to_free=add_to_free))
def disconnect(self):
if self._stream:
try:
self._stream.close()
except socket.error, e:
pass
+ self.is_initialized = False
self._stream = None
- def write(self, data, try_left=None):
+ def write(self, data, callback, try_left=None, before_init=False):
if try_left is None:
try_left = self.try_left
if not self._stream:
- self.connect()
+ self.connect(add_to_free=False)
if not self._stream:
- raise ConnectionError('Tried to write to non-existent connection')
+ callback(ConnectionError('Tried to write to non-existent connection'))
+ return
+
+ if not self.is_initialized and not before_init:
+ log_blob.debug('wait for connection initialization')
+ self._io_loop.add_callback(lambda: self.write(data, callback, try_left))
+ return
if try_left > 0:
try:
log_blob.debug('try write %r to %s', data, self.idx)
self._stream.write(data)
- except IOError:
+
+ log_blob.debug('data written to socket')
+ callback(True)
+ #self._io_loop.add_callback(lambda: callback(True))
+ except IOError, e:
+ log.error(e)
self.disconnect()
- self.write(data, try_left - 1)
+ self.write(data, callback=callback, try_left=try_left - 1)
else:
- raise ConnectionError('Tried to write to non-existent connection')
+ callback(ConnectionError('Tried to write to non-existent connection'))
def read(self, length, callback):
try:
@@ -221,11 +234,6 @@ def readline(self, callback):
except IOError:
self.on_disconnect()
- def connected(self):
- if self._stream:
- return True
- return False
-
class ConnectionPool(object):
def __init__(self, connection_args, on_connect, on_disconnect, io_loop, pool_size=4):
"""
@@ -265,11 +273,13 @@ def connect(self):
connection_args = self.connection_args
for idx in xrange(self.pool_size):
@process
- def on_connect(connection):
+ def on_connect(connection, add_to_free=True):
+ log_blob.debug('on_connect from pool')
yield async(self._on_connect)(connection)
- self.free_connections.add(connection.idx)
- log.info('new connection %s added to pool',
- connection.idx)
+ if add_to_free:
+ self.free_connections.add(connection.idx)
+ log.info('connection %s added to pool',
+ connection.idx)
self.io_loop.add_callback(self._give_out_pending_requests)
connection_args['on_connect'] = on_connect
@@ -507,6 +517,7 @@ def _initialize_connection(self, connection, callback):
"""
call after connection creation
"""
+ log_blob.debug('initializing connection')
with execution_context(callback) as ctx:
cmds = []
if self.password:
@@ -517,11 +528,13 @@ def _initialize_connection(self, connection, callback):
if cmds:
try:
for cmd_line in cmds:
- connection.write(self.format(
+ log_blob.debug('try to write to connection %s: %s', connection.idx, cmd_line)
+ write_res = yield async(connection.write)(self.format(
cmd_line.cmd,
*cmd_line.args,
**cmd_line.kwargs
- ))
+ ), before_init=True)
+ log_blob.debug('conn init, write_res: %s', write_res)
except Exception, e:
connection.disconnect()
raise e
@@ -554,10 +567,12 @@ def execute_command(self, cmd, callbacks, *args, **kwargs):
log_blob.debug('got connection %s for %s cmd_line',
connection.idx, cmd_line)
try:
- connection.write(self.format(cmd, *args, **kwargs))
+ write_res = yield async(connection.write)(self.format(cmd, *args, **kwargs))
except Exception, e:
connection.disconnect()
raise e
+ if isinstance(write_res, Exception):
+ raise write_res
#if self.subscribed and cmd in ('SUBSCRIBE', 'UNSUBSCRIBE'):
# self._waiting_callbacks[cmd].append(callbacks)
@@ -1065,9 +1080,7 @@ def __init__(self, transactional, *args, **kwargs):
self.command_stack = []
def execute_command(self, cmd, callbacks, *args, **kwargs):
- if cmd in ('AUTH', 'SELECT'):
- super(Pipeline, self).execute_command(cmd, callbacks, *args, **kwargs)
- elif cmd in PUB_SUB_COMMANDS:
+ if cmd in PUB_SUB_COMMANDS:
raise RequestError(
'Client is not supposed to issue command %s in pipeline' % cmd)
self.command_stack.append(CmdLine(cmd, *args, **kwargs))
@@ -1096,7 +1109,8 @@ def execute(self, callbacks):
log_blob.debug('got connection %s for %r request', \
connection.idx, request)
try:
- connection.write(request)
+ write_res = yield async(connection.write)(request)
+ log_blob.debug('conn write res: %r', write_res)
except Exception, e:
connection.disconnect()
raise e
@@ -1135,9 +1149,8 @@ def format_replies(cmd_lines, responses):
command_stack = command_stack[:-1]
responses = responses[-1] # actual data only from EXEC command
#FIXME: assert all other responses to be 'QUEUED'
- log.info('responses %s', responses)
results = format_replies(command_stack[1:], responses)
- log.info('results %s', results)
+ log_blob.debug('on request %r pipe results %s', request, results)
else:
results = format_replies(command_stack, responses)
View
2 tests/server_commands.py
@@ -17,7 +17,7 @@
async = partial(async, cbname='cb')
from brukva.exceptions import ResponseError, RequestError
-log_format = "[%(asctime)s][%(module)s] %(name)s: %(message)s"
+log_format = "[%(asctime)-15s][%(levelname)-5s][%(lineno)-4d:%(funcName)-26s] %(name)-20s: %(message)s"
import logging; logging.basicConfig(level=logging.DEBUG, format=log_format)
def callable(obj):

0 comments on commit 0b75e20

Please sign in to comment.