Skip to content

Commit

Permalink
BUGFIX: WATCH + MGET clearing inTransaction prematurely. WATCH + HGET…
Browse files Browse the repository at this point in the history
…ALL does not use post_proc

To reproduce the 1st problem:

  > t = yield conn.watch(k)
  > t.mget([k])
  > assert t.inTransaction # boOoOm!

To reproduce the second problem:

  > t = yield conn.watch(k)
  > h = yield t.hgetall(k)
  > assert type(h) == dict # boOoOm!

This commit introduces a `commit_cc' function (similar to
`unwatch_cc') which clears the `inTransaction' flag only if there has
been a `MULTI' command before (fixes the 1st problem).

The second problem is fixed by not delaying using `post_proc'
function if only `watch' has been issued.
  • Loading branch information
dgvncsz0f committed Feb 6, 2013
1 parent 3715acd commit fd018c8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 21 deletions.
46 changes: 43 additions & 3 deletions tests/test_watch.py
Expand Up @@ -29,6 +29,7 @@ class TestRedisConnections(unittest.TestCase):
def setUp(self):
self.connections = []
self.db = yield self._getRedisConnection()
yield self.db.delete(self._KEYS)

@defer.inlineCallbacks
def tearDown(self):
Expand Down Expand Up @@ -81,10 +82,49 @@ def testRedisMultiNoArgs(self):
yield t.commit().addBoth(self._check_watcherror, shouldError=False)

@defer.inlineCallbacks
def testRedisWithBulkCommands(self):
t = yield self.db.watch("foobar")
yield t.mget(["foo", "bar"])
def testRedisWithBulkCommands_transactions(self):
t = yield self.db.watch(self._KEYS)
yield t.mget(self._KEYS)
t = yield t.multi()
yield t.commit()
self.assertEqual(0, t.transactions)
self.assertFalse(t.inTransaction)

@defer.inlineCallbacks
def testRedisWithBulkCommands_inTransaction(self):
t = yield self.db.watch(self._KEYS)
yield t.mget(self._KEYS)
self.assertTrue(t.inTransaction)
yield t.unwatch()

@defer.inlineCallbacks
def testRedisWithBulkCommands_mget(self):
yield self.db.set(self._KEYS[0], "foo")
yield self.db.set(self._KEYS[1], "bar")

m0 = yield self.db.mget(self._KEYS)
t = yield self.db.watch(self._KEYS)
m1 = yield t.mget(self._KEYS)
t = yield t.multi()
yield t.mget(self._KEYS)
(m2,) = yield t.commit()

self.assertEqual(["foo", "bar"], m0)
self.assertEqual(m0, m1)
self.assertEqual(m0, m2)

@defer.inlineCallbacks
def testRedisWithBulkCommands_hgetall(self):
yield self.db.hset(self._KEYS[0], "foo", "bar")
yield self.db.hset(self._KEYS[0], "bar", "foo")

h0 = yield self.db.hgetall(self._KEYS[0])
t = yield self.db.watch(self._KEYS[0])
h1 = yield t.hgetall(self._KEYS[0])
t = yield t.multi()
yield t.hgetall(self._KEYS[0])
(h2,) = yield t.commit()

self.assertEqual({"foo": "bar", "bar": "foo"}, h0)
self.assertEqual(h0, h1)
self.assertEqual(h0, h2)
29 changes: 11 additions & 18 deletions txredisapi.py
Expand Up @@ -209,6 +209,7 @@ def __init__(self, charset="utf-8", errors="strict"):
self.transactions = 0
self.inTransaction = False
self.unwatch_cc = lambda: ()
self.commit_cc = lambda: ()

self.script_hashes = set()

Expand Down Expand Up @@ -371,26 +372,16 @@ def multiBulkDataReceived(self):
reply = self.multi_bulk.items
self.multi_bulk = MultiBulkStorage()

if self.inTransaction and reply is not None:
# There is a problem when you use transactions as follows:
# > watch / mget / multi / ... / exec
#
# `watch' command will set self.inTransaction flag to
# True. The response of mget will make
# self.transactions negative and everything will fail
# miserably afterwards.
#
# The following `if' will ensure that we decrement it
# only after MULTI and EXEC commands has both been
# issued (to see this remember that after MULTI
# everything returns QUEUED).
#
# dgvncsz0f, 03 Feb, 2013
if self.inTransaction and reply is not None: # watch or multi has been called
if self.transactions > 0:
self.transactions -= len(reply)
self.transactions -= len(reply) # multi: this must be an exec [commit] reply
if self.transactions == 0:
self.inTransaction = False

self.commit_cc()
if self.inTransaction: # watch but no multi: process the reply as usual
f = self.post_proc[1:]
if len(f) == 1 and callable(f[0]):
reply = f[0](reply)
else: # multi: this must be an exec reply
tmp = []
for f, v in zip(self.post_proc[1:], reply):
if callable(f):
Expand Down Expand Up @@ -1288,6 +1279,7 @@ def watch(self, keys):
if not self.inTransaction:
self.inTransaction = True
self.unwatch_cc = self._clear_txstate
self.commit_cc = lambda: ()
if isinstance(keys, (str, unicode)):
keys = [keys]
d = self.execute_command("WATCH", *keys).addCallback(self._tx_started)
Expand All @@ -1305,6 +1297,7 @@ def unwatch(self):
def multi(self, keys=None):
self.inTransaction = True
self.unwatch_cc = lambda: ()
self.commit_cc = self._clear_txstate
if keys is not None:
d = self.watch(keys)
d.addCallback(lambda _: self.execute_command("MULTI"))
Expand Down

0 comments on commit fd018c8

Please sign in to comment.