Skip to content
This repository
Browse code

simplified error handling

  • Loading branch information...
commit 6f5514c1782fa7d8411bf3b3b330e40f4f725eec 1 parent 7ae8a12
Valentin Gologuzov authored
5 brukva/adisp.py
@@ -107,7 +107,10 @@ def _queue_send_result(self, result, single):
107 107 def _send_result(self, results, single):
108 108 try:
109 109 result = results[0] if single else results
110   - self.call(self.g.send(result))
  110 + if isinstance(result, Exception):
  111 + self.call(self.g.throw(result))
  112 + else:
  113 + self.call(self.g.send(result))
111 114 except StopIteration:
112 115 pass
113 116
331 brukva/client.py
... ... @@ -1,14 +1,35 @@
1 1 # -*- coding: utf-8 -*-
2 2 import socket
  3 +from functools import partial
  4 +from itertools import izip
  5 +import contextlib
  6 +import logging
  7 +from collections import Iterable
  8 +
3 9 from tornado.ioloop import IOLoop
4 10 from tornado.iostream import IOStream
5 11 from adisp import async, process
6 12
7   -from functools import partial
8   -from itertools import izip
9 13 from datetime import datetime
10 14 from brukva.exceptions import RedisError, ConnectionError, ResponseError, InvalidResponse
11 15
  16 +log = logging.getLogger('brukva.client')
  17 +
  18 +@contextlib.contextmanager
  19 +def forward_error(callbacks, cleanup=None):
  20 + try:
  21 + yield callbacks
  22 + except Exception, e:
  23 + log.error(e)
  24 + if isinstance(callbacks, Iterable):
  25 + for cb in callbacks:
  26 + callbacks(e)
  27 + else:
  28 + callbacks(e)
  29 + finally:
  30 + if cleanup:
  31 + cleanup()
  32 +
12 33 class Message(object):
13 34 def __init__(self, kind, channel, body):
14 35 self.kind = kind
@@ -83,16 +104,16 @@ def disconnect(self):
83 104 def write(self, data):
84 105 if not self._stream:
85 106 self.on_reconnect()
86   - if not self._stream:
87   - raise ConnectionError('Tried to write to non-existent connection')
  107 + if not self._stream:
  108 + raise ConnectionError('Tried to write to non-existent connection')
88 109 else:
89 110 self._stream.write(data)
90 111
91 112 def consume(self, length):
92 113 if not self._stream:
93 114 self.on_reconnect()
94   - if not self._stream:
95   - raise ConnectionError('Tried to consume from non-existent connection')
  115 + if not self._stream:
  116 + raise ConnectionError('Tried to consume from non-existent connection')
96 117 self._stream.read_bytes(length, NOOP_CB)
97 118
98 119 def read(self, length, callback):
@@ -100,8 +121,8 @@ def read(self, length, callback):
100 121 if not self._stream:
101 122 self.client._sudden_disconnect([callback])
102 123 self.on_reconnect()
103   - if not self._stream:
104   - raise ConnectionError('Tried to read from non-existent connection')
  124 + if not self._stream:
  125 + raise ConnectionError('Tried to read from non-existent connection')
105 126 self._stream.read_bytes(length, callback)
106 127 except IOError:
107 128 self.client._sudden_disconnect([callback])
@@ -112,6 +133,8 @@ def readline(self, callback):
112 133 if not self._stream:
113 134 self.client._sudden_disconnect([callback])
114 135 self.on_reconnect()
  136 + if not self._stream:
  137 + raise ConnectionError('Tried to read from non-existent connection')
115 138 self._stream.read_until('\r\n', callback)
116 139 except IOError:
117 140 self.client._sudden_disconnect([callback])
@@ -283,7 +306,10 @@ def format_reply(self, cmd_line, data):
283 306 try:
284 307 res = self.REPLY_MAP[cmd_line.cmd](data, *cmd_line.args, **cmd_line.kwargs)
285 308 except Exception, e:
286   - res = ResponseError('failed to format reply, raw data: %s' % data, cmd_line)
  309 + raise ResponseError(
  310 + 'failed to format reply to %s, raw data: %s; err message: %s' %
  311 + cmd_line, data, e
  312 + )
287 313 return res
288 314 ####
289 315
@@ -294,114 +320,103 @@ def call_callbacks(self, callbacks, *args, **kwargs):
294 320
295 321 def _sudden_disconnect(self, callbacks):
296 322 self.connection.disconnect()
297   - self.call_callbacks(callbacks, (ConnectionError("Socket closed on remote end"), None))
  323 + raise ConnectionError("Socket closed on remote end")
298 324
299 325 @process
300 326 def execute_command(self, cmd, callbacks, *args, **kwargs):
301   - if callbacks is None:
302   - callbacks = []
303   - elif not hasattr(callbacks, '__iter__'):
304   - callbacks = [callbacks]
305   - try:
306   - if self.reconnect and not self.connection.connected():
307   - self.connect()
308   - self.connection.write(self.format(cmd, *args, **kwargs))
309   - except IOError:
310   - self._sudden_disconnect(callbacks)
311   - return
312   - except Exception, e:
313   - self.connection.disconnect()
314   - self.call_callbacks(callbacks, (e, None) )
315   - return
  327 + with forward_error(callbacks):
  328 + if callbacks is None:
  329 + callbacks = []
  330 + elif not hasattr(callbacks, '__iter__'):
  331 + callbacks = [callbacks]
  332 + try:
  333 + if self.reconnect and not self.connection.connected():
  334 + self.connect()
  335 + self.connection.write(self.format(cmd, *args, **kwargs))
  336 + except IOError:
  337 + self._sudden_disconnect(callbacks)
  338 + except Exception, e:
  339 + self.connection.disconnect()
  340 + raise e
316 341
317   - cmd_line = CmdLine(cmd, *args, **kwargs)
318   - yield self.connection.queue_wait()
  342 + cmd_line = CmdLine(cmd, *args, **kwargs)
  343 + yield self.connection.queue_wait()
319 344
320   - data = yield async(self.connection.readline)()
321   - if not data:
322   - result = None
323   - error = Exception('todo')
324   - else:
325   - try:
326   - error, response = yield self.process_data(data, cmd_line)
  345 + data = yield async(self.connection.readline)()
  346 + if not data:
  347 + result = None
  348 + raise Exception('TODO: [no data from connection->readline')
  349 + else:
  350 + response = yield self.process_data(data, cmd_line)
327 351 result = self.format_reply(cmd_line, response)
328   - except Exception, e:
329   - error, result = e, None
330 352
331   - self.connection.read_done()
332   - self.call_callbacks(callbacks, (error, result))
  353 + self.connection.read_done()
  354 + self.call_callbacks(callbacks, result)
333 355
334 356 @async
335 357 @process
336 358 def process_data(self, data, cmd_line, callback):
337   - error, response = None, None
338   - if error:
339   - callback((error, None))
340   -
341   - data = data[:-2] # strip \r\n
  359 + with forward_error(callback):
  360 + data = data[:-2] # strip \r\n
342 361
343   - if data == '$-1':
344   - response = None
345   - elif data == '*0' or data == '*-1':
346   - response = []
347   - else:
348   - if len(data) == 0:
349   - self.on_reconnect()
350   - callback((IOError('Disconnected'),None))
351   - head, tail = data[0], data[1:]
352   -
353   - if head == '*':
354   - error, response = yield self.consume_multibulk(int(tail), cmd_line)
355   - elif head == '$':
356   - error, response = yield self.consume_bulk(int(tail)+2)
357   - elif head == '+':
358   - response = tail
359   - elif head == ':':
360   - response = int(tail)
361   - elif head == '-':
362   - if tail.startswith('ERR'):
363   - tail = tail[4:]
364   - error = ResponseError(tail, cmd_line)
  362 + if data == '$-1':
  363 + response = None
  364 + elif data == '*0' or data == '*-1':
  365 + response = []
365 366 else:
366   - error = ResponseError('Unknown response type %s' % head, cmd_line)
  367 + if len(data) == 0:
  368 + self.on_reconnect()
  369 + raise IOError('Disconnected')
  370 + head, tail = data[0], data[1:]
  371 +
  372 + if head == '*':
  373 + response = yield self.consume_multibulk(int(tail), cmd_line)
  374 + elif head == '$':
  375 + response = yield self.consume_bulk(int(tail)+2)
  376 + elif head == '+':
  377 + response = tail
  378 + elif head == ':':
  379 + response = int(tail)
  380 + elif head == '-':
  381 + if tail.startswith('ERR'):
  382 + tail = tail[4:]
  383 + response = ResponseError(tail, cmd_line)
  384 + else:
  385 + raise ResponseError('Unknown response type %s' %
  386 + (head, cmd_line)
  387 + )
367 388
368   - callback( (error, response) )
  389 + callback(response)
369 390
370 391 @async
371 392 @process
372 393 def consume_multibulk(self, length, cmd_line, callback):
373   - tokens = []
374   - errors = {}
375   - idx = 0
376   - while len(tokens) < length:
377   - data = yield async(self.connection.readline)()
378   - if not data:
379   - break
380   - if isinstance(data, Exception):
381   - errors[idx] = data
382   - break
383   -
384   - error, token = yield self.process_data(data, cmd_line) #FIXME error
385   - tokens.append( token )
386   - if error:
387   - errors[idx] = error
388   -
389   - idx += 1
390   - callback( (errors, tokens) )
  394 + with forward_error(callback):
  395 + tokens = []
  396 + while len(tokens) < length:
  397 + data = yield async(self.connection.readline)()
  398 + if not data:
  399 + raise ResponseError(
  400 + 'Not enough data in response to %s, accumulated tokens: %s'%
  401 + (cmd_line, tokens)
  402 + )
  403 + token = yield self.process_data(data, cmd_line) #FIXME error
  404 + tokens.append( token )
  405 + callback(tokens)
391 406
392 407 @async
393 408 @process
394 409 def consume_bulk(self, length, callback):
395   - data = yield async(self.connection.read)(length)
396   - if isinstance(data, Exception):
397   - callback((data, None))
398   - error = None
399   - if not data:
400   - error = ResponseError('EmptyResponse')
401   - else:
402   - data = data[:-2]
403   - callback( (error, data) )
404   - ####
  410 + with forward_error(callback):
  411 + data = yield async(self.connection.read)(length)
  412 + if isinstance(data, Exception):
  413 + raise data
  414 + if not data:
  415 + raise ResponseError('EmptyResponse')
  416 + else:
  417 + data = data[:-2]
  418 + callback(data)
  419 + ####
405 420
406 421 ### MAINTENANCE
407 422 def bgrewriteaof(self, callbacks=None):
@@ -825,80 +840,74 @@ def discard(self): # actually do nothing with redis-server, just flush command_s
825 840
826 841 def _sudden_disconnect(self, callbacks, error=None):
827 842 self.connection.disconnect()
828   - self.call_callbacks(callbacks,
829   - (error or ConnectionError("Socket closed on remote end"), [])
830   - )
  843 + raise error or ConnectionError("Socket closed on remote end")
831 844
832 845 @process
833 846 def execute(self, callbacks):
834   - command_stack = self.command_stack
835   - self.command_stack = []
  847 + with forward_error(callbacks):
  848 + command_stack = self.command_stack
  849 + self.command_stack = []
836 850
837   - if callbacks is None:
838   - callbacks = []
839   - elif not hasattr(callbacks, '__iter__'):
840   - callbacks = [callbacks]
  851 + if callbacks is None:
  852 + callbacks = []
  853 + elif not hasattr(callbacks, '__iter__'):
  854 + callbacks = [callbacks]
841 855
842   - if self.transactional:
843   - command_stack = [CmdLine('MULTI')] + command_stack + [CmdLine('EXEC')]
  856 + if self.transactional:
  857 + command_stack = [CmdLine('MULTI')] + command_stack + [CmdLine('EXEC')]
844 858
845   - request = format_pipeline_request(command_stack)
846   - try:
847   - if self.reconnect and not self.connection.connected():
848   - self.connect()
849   - self.connection.write(request)
850   - except IOError:
851   - self.command_stack = []
852   - self._sudden_disconnect(callbacks)
853   - return
854   - except Exception, e:
855   - self.command_stack = []
856   - self._sudden_disconnect(callbacks, e)
857   - return
858   -
859   - yield self.connection.queue_wait()
860   - responses = []
861   - total = len(command_stack)
862   - cmds = iter(command_stack)
863   - while len(responses) < total:
864   - data = yield async(self.connection.readline)()
865   - if not data:
866   - break
  859 + request = format_pipeline_request(command_stack)
867 860 try:
868   - cmd_line = cmds.next()
869   - if self.transactional and cmd_line.cmd != 'EXEC':
870   - error, response = yield self.process_data(data, CmdLine('MULTI_PART'))
871   - else:
872   - error, response = yield self.process_data(data, cmd_line)
  861 + if self.reconnect and not self.connection.connected():
  862 + self.connect()
  863 + self.connection.write(request)
  864 + except IOError:
  865 + self.command_stack = []
  866 + self._sudden_disconnect(callbacks)
873 867 except Exception, e:
874   - error, response = e, None
875   -
876   - responses.append((error, response ) )
877   - self.connection.read_done()
878   -
879   - def format_replies(cmd_lines, responses):
880   - results = []
881   -
882   - for cmd_line, (error, response) in zip(cmd_lines, responses):
883   - if not error:
884   - results.append((None, self.format_reply(cmd_line, response)))
885   - else:
886   - results.append((error, response))
887   - return results
888   -
889   - if self.transactional:
890   - command_stack = command_stack[:-1]
891   - errors, tr_responses = responses[-1] # actual data only from EXEC command
892   - responses = [
893   - (errors.get(idx, None), tr_responses[idx])
894   - for idx in xrange(len(tr_responses))
895   - ]
896   -
897   - results = format_replies(command_stack[1:], responses)
898   -
899   - else:
900   - results = format_replies(command_stack, responses)
  868 + self.command_stack = []
  869 + self._sudden_disconnect(callbacks, e)
  870 +
  871 + yield self.connection.queue_wait()
  872 + responses = []
  873 + total = len(command_stack)
  874 + cmds = iter(command_stack)
  875 +
  876 + while len(responses) < total:
  877 + data = yield async(self.connection.readline)()
  878 + if not data:
  879 + raise ResponseError('Not enough data after EXEC')
  880 +
  881 + try:
  882 + cmd_line = cmds.next()
  883 + if self.transactional and cmd_line.cmd != 'EXEC':
  884 + response = yield self.process_data(data, CmdLine('MULTI_PART'))
  885 + else:
  886 + response = yield self.process_data(data, cmd_line)
  887 + responses.append(response)
  888 + except Exception,e :
  889 + responses.append(e)
  890 + self.connection.read_done()
  891 +
  892 + def format_replies(cmd_lines, responses):
  893 + results = []
  894 + for cmd_line, response in zip(cmd_lines, responses):
  895 + try:
  896 + results.append(self.format_reply(cmd_line, response))
  897 + except Exception, e:
  898 + results.append(e)
  899 + return results
  900 +
  901 + if self.transactional:
  902 + command_stack = command_stack[:-1]
  903 + responses = responses[-1] # actual data only from EXEC command
  904 + #FIXME: assert all other responses to be 'QUEUED'
  905 + log.info('responses %s', responses)
  906 + results = format_replies(command_stack[1:], responses)
  907 + log.info('results %s', results)
  908 + else:
  909 + results = format_replies(command_stack, responses)
901 910
902   - self.call_callbacks(callbacks, (None, results))
  911 + self.call_callbacks(callbacks, results)
903 912
904 913
2  run_nose.sh
... ... @@ -1,3 +1,3 @@
1 1 #!/usr/bin/env sh
2   -PYTHONPATH=../facebook-tornado/:. nosetests tests --with-coverage
  2 +PYTHONPATH=../facebook-tornado/:. nosetests tests:ServerCommandsTestCase$1 --with-coverage --cover-package=brukva
3 3
41 tests/server_commands.py
... ... @@ -1,3 +1,6 @@
  1 +#!/usr/bin/env python
  2 +# -*- coding: utf-8 -*-
  3 +
1 4 import unittest
2 5 import sys
3 6 from datetime import datetime
@@ -42,16 +45,15 @@ def tearDown(self):
42 45 def expect(self, expected):
43 46 source_line = '\n' + tb.format_stack()[-2]
44 47 def callback(result):
45   - error, data = result
46   - if error:
47   - self.assertFalse(error, data,
48   - msg=source_line+' Error:'+repr(error))
  48 + if isinstance(expected, Exception):
  49 + self.assertTrue(isinstance(result, expected),
  50 + msg=source_line+' Got:'+repr(result))
49 51 if callable(expected):
50   - self.assertTrue(expected(data),
51   - msg=source_line+' Got:'+repr(data))
  52 + self.assertTrue(expected(result),
  53 + msg=source_line+' Got:'+repr(result))
52 54 else:
53   - self.assertEqual(expected, data,
54   - msg=source_line+' Got:'+repr(data))
  55 + self.assertEqual(expected, result,
  56 + msg=source_line+' Got:'+repr(result))
55 57 callback.__name__ = "expect_%s" % repr(expected)
56 58 return callback
57 59
@@ -62,16 +64,16 @@ def pexpect(self, expected_list, list_without_errors=True):
62 64 source_line = '\n' + tb.format_stack()[-2]
63 65 def callback(result):
64 66 self.assertEqual(len(result), len(expected_list) )
65   - for (e, d), (exp_e, exp_d) in zip(result, expected_list):
  67 + for result, (exp_e, exp_d) in zip(result, expected_list):
66 68 if exp_e:
67   - self.assertTrue( isinstance(e, exp_e),
68   - msg=source_line+' Error:'+repr(e))
69   - if callable(exp_d):
70   - self.assertTrue(exp_d(d),
71   - msg=source_line+' Got:'+repr(d))
  69 + self.assertTrue( isinstance(result, exp_e),
  70 + msg=source_line+' Error:'+repr(result))
  71 + elif callable(exp_d):
  72 + self.assertTrue(exp_d(result),
  73 + msg=source_line+' Got:'+repr(result))
72 74 else:
73   - self.assertEqual(d, exp_d,
74   - msg=source_line+' Got:'+repr(d))
  75 + self.assertEqual(result, exp_d,
  76 + msg=source_line+' Got:'+repr(result))
75 77 return callback
76 78
77 79 def finish(self, *args):
@@ -81,6 +83,11 @@ def start(self):
81 83 self.loop.start()
82 84
83 85 class ServerCommandsTestCase(TornadoTestCase):
  86 + def test_setget_unicode(self):
  87 + self.client.set('foo', u'бар', self.expect(True))
  88 + self.client.get('foo', [self.expect('бар'), self.finish])
  89 + self.start()
  90 +#"""
84 91 def test_set(self):
85 92 self.client.set('foo', 'bar', [self.expect(True), self.finish])
86 93 self.start()
@@ -650,6 +657,6 @@ def test_pipe_hsets2(self):
650 657 self.finish,
651 658 ])
652 659 self.start()
653   -
  660 +#"""
654 661 if __name__ == '__main__':
655 662 unittest.main()

0 comments on commit 6f5514c

Please sign in to comment.
Something went wrong with that request. Please try again.