Skip to content

Commit

Permalink
Pipeline objects are now executed atomically via the MULTI and EXEC c…
Browse files Browse the repository at this point in the history
…ommands
  • Loading branch information
andymccurdy committed Mar 11, 2010
1 parent 708c458 commit bd411f9
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 13 deletions.
70 changes: 57 additions & 13 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def execute_command(self, command_name, command, **options):
self.connection.disconnect()
return self._execute_command(command_name, command, **options)

def _parse_response(self, command_name):
def _parse_response(self, command_name, catch_errors):
conn = self.connection
response = conn.read().strip()
if not response:
Expand Down Expand Up @@ -261,13 +261,27 @@ def _parse_response(self, command_name):
length = int(response)
if length == -1:
return None
return [self._parse_response(command_name) for i in range(length)]
if not catch_errors:
return [self._parse_response(command_name, catch_errors)
for i in range(length)]
else:
# for pipelines, we need to read everything, including response errors.
# otherwise we'd completely mess up the receive buffer
data = []
for i in range(length):
try:
data.append(
self._parse_response(command_name, catch_errors)
)
except Exception, e:
data.append(e)
return data

raise InvalidResponse("Unknown response type for: %s" % command_name)

def parse_response(self, command_name, **options):
def parse_response(self, command_name, catch_errors=False, **options):
"Parses a response from the Redis server"
response = self._parse_response(command_name)
response = self._parse_response(command_name, catch_errors)
if command_name in self.RESPONSE_CALLBACKS:
return self.RESPONSE_CALLBACKS[command_name](response, **options)
return response
Expand Down Expand Up @@ -886,16 +900,27 @@ class Pipeline(Redis):
in one transmission. This is convenient for batch processing, such as
saving all the values in a list to Redis.
Note that pipelining does *not* guarantee all the commands will be executed
together atomically, nor does it guarantee any transactional consistency.
If the third command in the batch fails, the first two will still have been
executed and "committed"
All commands executed within a pipeline are wrapped with MULTI and EXEC
calls. This guarantees all commands executed in the pipeline will be
executed atomically.
Any command raising an exception does *not* halt the execution of
subsequent commands in the pipeline. Instead, the exception is caught
and its instance is placed into the response list returned by execute().
Code iterating over the response list should be able to deal with an
instance of an exception as a potential value. In general, these will be
ResponseError exceptions, such as those raised when issuing a command
on a key of a different datatype.
"""
def __init__(self, connection, charset, errors):
self.connection = connection
self.command_stack = []
self.encoding = charset
self.errors = errors
self.reset()

def reset(self):
self.command_stack = []
self.format_inline('MULTI')

def execute_command(self, command_name, command, **options):
"""
Expand All @@ -921,15 +946,34 @@ def execute_command(self, command_name, command, **options):
return self

def _execute(self, commands):
for _, command, options in commands:
for name, command, options in commands:
self.connection.send(command, self)
return [self.parse_response(name, **options)
for name, _, options in commands]
# we only care about the last item in the response, which should be
# the EXEC command
for i in range(len(commands)-1):
_ = self.parse_response('_')
# tell the response parse to catch errors and return them as
# part of the response
response = self.parse_response('_', catch_errors=True)
# don't return the results of the MULTI or EXEC command
commands = [(c[0], c[2]) for c in commands[1:-1]]
if len(response) != len(commands):
raise ResponseError("Wrong number of response items from "
"pipline execution")
# Run any callbacks for the commands run in the pipeline
data = []
for r, cmd in zip(response, commands):
if not isinstance(r, Exception):
if cmd[0] in self.RESPONSE_CALLBACKS:
r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[1])
data.append(r)
return data

def execute(self):
"Execute all the commands in the current pipeline"
self.format_inline('EXEC')
stack = self.command_stack
self.command_stack = []
self.reset()
try:
return self._execute(stack)
except ConnectionError:
Expand Down
30 changes: 30 additions & 0 deletions tests/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,33 @@ def test_pipeline(self):
[('z1', 2.0), ('z2', 4)]
]
)

def test_invalid_command_in_pipeline(self):
# all commands but the invalid one should be excuted correctly
self.client['c'] = 'a'
pipe = self.client.pipeline()
pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4)
result = pipe.execute()

self.assertEquals(result[0], True)
self.assertEquals(self.client['a'], '1')
self.assertEquals(result[1], True)
self.assertEquals(self.client['b'], '2')
# we can't lpush to a key that's a string value, so this should
# be a ResponseError exception
self.assert_(isinstance(result[2], redis.ResponseError))
self.assertEquals(self.client['c'], 'a')
self.assertEquals(result[3], True)
self.assertEquals(self.client['d'], '4')

# make sure the pipe was restored to a working state
self.assertEquals(pipe.set('z', 'zzz').execute(), [True])
self.assertEquals(self.client['z'], 'zzz')

def test_pipe_cannot_select(self):
pipe = self.client.pipeline()
self.assertRaises(redis.RedisError,
pipe.select, 'localhost', 6379, db=9)



0 comments on commit bd411f9

Please sign in to comment.