Skip to content
This repository has been archived by the owner on May 2, 2022. It is now read-only.

Commit

Permalink
Add test for the number of times sleep is called by TaskPackageDropbox
Browse files Browse the repository at this point in the history
  • Loading branch information
Shane Breeze committed Oct 28, 2018
1 parent 45a1ca1 commit 6692b7b
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 68 deletions.
23 changes: 11 additions & 12 deletions alphatwirl/concurrently/TaskPackageDropbox.py
Expand Up @@ -89,11 +89,9 @@ def receive_one(self):
if pkgidx_result_pair and not pkgidx_result_pair[1]:
pkgidx_result_pair = None

# early break to avoid sleeping
if pkgidx_result_pair:
break

time.sleep(self.sleep)
# Only sleep if there are no pending runs
if not self.runid_to_return and self.runid_pkgidx_map:
time.sleep(self.sleep)

return pkgidx_result_pair

Expand All @@ -109,11 +107,9 @@ def receive(self):
self._collect_all_finished_pkgidx_result_pairs()
)

# early break to avoid sleeping
if not self.runid_pkgidx_map:
break

time.sleep(self.sleep)
# Only sleep if we're waiting for runs
if self.runid_pkgidx_map:
time.sleep(self.sleep)

# remove failed results and sort in the order of pkgidx
pkgidx_result_pairs = filter(itemgetter(1), pkgidx_result_pairs)
Expand All @@ -125,19 +121,22 @@ def _collect_all_finished_pkgidx_result_pairs(self):
pkgidx_result_pairs = []

pairs = self._collect_next_finished_pkgidx_result_pair()
while pairs:
pkgidx_result_pairs.append(pairs)
while self.runid_to_return:
if pairs: pkgidx_result_pairs.append(pairs)
pairs = self._collect_next_finished_pkgidx_result_pair()
if pairs: pkgidx_result_pairs.append(pairs)

return pkgidx_result_pairs

def _collect_next_finished_pkgidx_result_pair(self):
# No runs remaining
if not self.runid_pkgidx_map:
return None

if not self.runid_to_return:
self.runid_to_return.extend(self.dispatcher.poll())

# No finished runs remaining
if not self.runid_to_return:
return None

Expand Down
132 changes: 76 additions & 56 deletions tests/unit/concurrently/test_TaskPackageDropbox_receive.py
Expand Up @@ -83,58 +83,71 @@ def obj(workingarea, dispatcher, packages):

##__________________________________________________________________||
def test_receive(obj, pkgidx_result_pairs):
assert pkgidx_result_pairs == obj.receive()
with mock.patch('time.sleep') as sleep:
assert pkgidx_result_pairs == obj.receive()
assert [mock.call(0.01)]*4 == sleep.call_args_list

def test_receive_dispatcher_received_failed_runids(obj, dispatcher):
obj.receive()
assert [mock.call([1002]), mock.call([1005])] == dispatcher.failed_runids.call_args_list
with mock.patch('time.sleep') as sleep:
obj.receive()
assert [mock.call([1002]), mock.call([1005])] == dispatcher.failed_runids.call_args_list
assert [mock.call(0.01)]*4 == sleep.call_args_list

def test_receive_logging_resubmission(obj, caplog):
with caplog.at_level(logging.WARNING):
obj.receive()
assert len(caplog.records) == 2
assert caplog.records[0].levelname == 'WARNING'
assert caplog.records[1].levelname == 'WARNING'
assert 'TaskPackageDropbox' in caplog.records[0].name
assert 'TaskPackageDropbox' in caplog.records[1].name
assert 'resubmitting' in caplog.records[0].msg
assert 'resubmitting' in caplog.records[1].msg
with mock.patch('time.sleep') as sleep:
with caplog.at_level(logging.WARNING):
obj.receive()
assert len(caplog.records) == 2
assert caplog.records[0].levelname == 'WARNING'
assert caplog.records[1].levelname == 'WARNING'
assert 'TaskPackageDropbox' in caplog.records[0].name
assert 'TaskPackageDropbox' in caplog.records[1].name
assert 'resubmitting' in caplog.records[0].msg
assert 'resubmitting' in caplog.records[1].msg
assert [mock.call(0.01)]*4 == sleep.call_args_list

def test_receive_in_one_step(obj, pkgidx_result_pairs, dispatcher, collect_results):
with mock.patch('time.sleep') as sleep:
# make all jobs finish by the first poll
dispatcher.poll.side_effect = [[1000, 1001, 1002, 1003, 1004]]

# make all jobs finish by the first poll
dispatcher.poll.side_effect = [[1000, 1001, 1002, 1003, 1004]]

# make the 3rd job successful by removing two None's
collect_results[2].popleft() # deque([None, result2])
collect_results[2].popleft() # deque([result2])
# make the 3rd job successful by removing two None's
collect_results[2].popleft() # deque([None, result2])
collect_results[2].popleft() # deque([result2])

assert pkgidx_result_pairs == obj.receive()
assert pkgidx_result_pairs == obj.receive()
assert [] == sleep.call_args_list

##__________________________________________________________________||
def test_poll(obj, pkgidx_result_pairs):
actual = [ ]
while len(actual) < len(pkgidx_result_pairs):
actual.extend(obj.poll())
assert sorted(pkgidx_result_pairs) == sorted(actual)
with mock.patch('time.sleep') as sleep:
actual = [ ]
while len(actual) < len(pkgidx_result_pairs):
actual.extend(obj.poll())
assert sorted(pkgidx_result_pairs) == sorted(actual)
assert [] == sleep.call_args_list

def test_poll_then_receive(obj, pkgidx_result_pairs):
actual = [ ]
actual.extend(obj.poll())
actual.extend(obj.receive())
with mock.patch('time.sleep') as sleep:
actual = [ ]
actual.extend(obj.poll())
actual.extend(obj.receive())

assert sorted(pkgidx_result_pairs) == sorted(actual)
assert sorted(pkgidx_result_pairs) == sorted(actual)
assert [mock.call(0.01)]*3 == sleep.call_args_list

##__________________________________________________________________||
def test_receive_one(obj, pkgidx_result_pairs):
actual = [ ]
while len(actual) < len(pkgidx_result_pairs):
pair = obj.receive_one()
if pair is None:
break
actual.append(pair)
assert obj.receive_one() is None
assert sorted(pkgidx_result_pairs) == sorted(actual)
with mock.patch('time.sleep') as sleep:
actual = [ ]
while len(actual) < len(pkgidx_result_pairs):
pair = obj.receive_one()
if pair is None:
break
actual.append(pair)
assert obj.receive_one() is None
assert sorted(pkgidx_result_pairs) == sorted(actual)
assert [mock.call(0.01)]*4 == sleep.call_args_list

@pytest.mark.parametrize('dispatcher_poll', [
pytest.param(
Expand All @@ -151,36 +164,43 @@ def test_receive_one(obj, pkgidx_result_pairs):
),
])
def test_receive_one_param(obj, pkgidx_result_pairs, dispatcher, dispatcher_poll):
dispatcher.poll.side_effect = dispatcher_poll

actual = [ ]
while len(actual) < len(pkgidx_result_pairs):
pair = obj.receive_one()
if pair is None:
break
actual.append(pair)
assert obj.receive_one() is None
assert sorted(pkgidx_result_pairs) == sorted(actual)
with mock.patch('time.sleep') as sleep:
dispatcher.poll.side_effect = dispatcher_poll

actual = [ ]
while len(actual) < len(pkgidx_result_pairs):
pair = obj.receive_one()
if pair is None:
break
actual.append(pair)
assert obj.receive_one() is None
assert sorted(pkgidx_result_pairs) == sorted(actual)
assert [mock.call(0.01)]*(len(dispatcher_poll)-1) == sleep.call_args_list

def test_receive_one_then_receive(obj, pkgidx_result_pairs):
actual = [ ]
with mock.patch('time.sleep') as sleep:
actual = [ ]

actual.append(obj.receive_one())
actual.append(obj.receive_one())

actual.extend(obj.receive())
actual.extend(obj.receive())

assert sorted(pkgidx_result_pairs) == sorted(actual)
assert sorted(pkgidx_result_pairs) == sorted(actual)
assert [mock.call(0.01)]*4 == sleep.call_args_list

def test_receive_one_then_poll(obj, pkgidx_result_pairs):
actual = [ ]
with mock.patch('time.sleep') as sleep:
actual = [ ]

actual.append(obj.receive_one())
actual.append(obj.receive_one())
actual.append(obj.receive_one())

actual.extend(obj.poll())
actual.extend(obj.poll())
actual.extend(obj.poll())
actual.extend(obj.poll())
actual.extend(obj.poll())
actual.extend(obj.poll())
actual.extend(obj.poll())
actual.extend(obj.poll())

assert sorted(pkgidx_result_pairs) == sorted(actual)
assert sorted(pkgidx_result_pairs) == sorted(actual)
assert [mock.call(0.01)] == sleep.call_args_list

##__________________________________________________________________||

0 comments on commit 6692b7b

Please sign in to comment.