Skip to content
Permalink
Browse files

Failing test.

  • Loading branch information...
glyph committed Jun 5, 2013
1 parent 2e40ada commit ab9dbbffb761c9cb4025868efb5fc0b495a7854c
Showing with 33 additions and 0 deletions.
  1. +33 −0 twext/enterprise/test/test_queue.py
@@ -716,6 +716,39 @@ def op2(txn):
self.assertEquals(rows, [[4321, 7]])


@inlineCallbacks
def test_noWorkDoneWhenConcurrentlyDeleted(self):
"""
When a L{WorkItem} is concurrently deleted by another transaction, it
should I{not} perform its work.
"""
# Provide access to a method called 'concurrently' everything using
original = self.store.newTransaction
def decorate(*a, **k):
result = original()
result.concurrently = original
return result
self.store.newTransaction = decorate

def operation(txn):
return txn.enqueue(DummyWorkItem, a=30, b=40, workID=5678,
notBefore=datetime.datetime.utcnow())
proposal = yield inTransaction(self.store.newTransaction, operation)
yield proposal.whenExecuted()
# Sanity check on the concurrent deletion.
def op2(txn):
return Select([schema.DUMMY_WORK_ITEM.WORK_ID],
From=schema.DUMMY_WORK_ITEM).on(txn)
rows = yield inTransaction(self.store.newTransaction, op2)
def op3(txn):
return Select([schema.DUMMY_WORK_DONE.WORK_ID,
schema.DUMMY_WORK_DONE.A_PLUS_B],
From=schema.DUMMY_WORK_DONE).on(txn)
rows = yield inTransaction(self.store.newTransaction, op3)
self.assertEquals(rows, [])



class DummyProposal(object):

def __init__(self, *ignored):

0 comments on commit ab9dbbf

Please sign in to comment.
You can’t perform that action at this time.