Skip to content

Commit

Permalink
Remove or reduce some test sleeps (#2771)
Browse files Browse the repository at this point in the history
- Make some test code more pythonic (e.g., generators, list comprehension)
- Remove a number of unnecessary sleep calls (or vastly reduce wait time)
- Teach some tests to `assert`
- Remove more `print()`s

Co-authored-by: Ben Clifford <benc@hawaga.org.uk>
  • Loading branch information
khk-globus and benclifford authored Jul 5, 2023
1 parent 3306deb commit 6b81796
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 204 deletions.
100 changes: 31 additions & 69 deletions parsl/tests/test_python_apps/test_fail.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,59 @@
import argparse
import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import fresh_config as local_config
from parsl.dataflow.errors import DependencyError


@python_app
def sleep_fail(sleep_dur, sleep_rand_max, fail_prob, inputs=[]):
import time
import random
class ManufacturedTestFailure(Exception):
pass

s = sleep_dur + random.randint(-sleep_rand_max, sleep_rand_max)

time.sleep(s)
raise Exception("App failure")
@python_app
def random_fail(fail_prob: float, inputs=()):
import random
if random.random() < fail_prob:
raise ManufacturedTestFailure("App failure")


def test_no_deps(numtasks=2):
def test_no_deps():
"""Test basic error handling, with no dependent failures
"""
futs = [random_fail(1), random_fail(0), random_fail(0)]

fus = []
for i in range(0, numtasks):

fu = sleep_fail(0.1, 0, .8)
fus.extend([fu])

count = 0
for fu in fus:
for f in futs:
try:
fu.result()
except Exception as e:
print("Caught exception : ", "*" * 20)
print(e)
print("*" * 20)
count += 1
f.result()
except ManufacturedTestFailure:
pass

print("Caught failures of {0}/{1}".format(count, len(fus)))


def test_fail_sequence(numtasks=2):
@pytest.mark.parametrize("fail_probs", ((1, 0), (0, 1)))
def test_fail_sequence(fail_probs):
"""Test failure in a sequence of dependencies
App1 -> App2 ... -> AppN
"""

sleep_dur = 0.1
fail_prob = 0.4

fus = {0: None}
for i in range(0, numtasks):
print("Chaining {0} to {1}".format(i + 1, fus[i]))
fus[i + 1] = sleep_fail(sleep_dur, 0, fail_prob, inputs=[fus[i]])
t1_fail_prob, t2_fail_prob = fail_probs
t1 = random_fail(fail_prob=t1_fail_prob)
t2 = random_fail(fail_prob=t2_fail_prob, inputs=[t1])
t_final = random_fail(fail_prob=0, inputs=[t2])

# time.sleep(numtasks*sleep_dur)
for k in sorted(fus.keys()):
try:
x = fus[i].result()
print("{0} : {1}".format(k, x))
except Exception as e:
print("{0} : {1}".format(k, e))

return


def test_deps(numtasks=2):
"""Random failures in branches of Map -> Map -> reduce
with pytest.raises(DependencyError):
t_final.result()

App1 App2 ... AppN
"""

fus = []
for i in range(0, numtasks):
fu = sleep_fail(0.2, 0, .4)
fus.extend([fu])
def test_deps(width=3):
"""Random failures in branches of Map -> Map -> reduce"""
# App1 App2 ... AppN
futs = [random_fail(fail_prob=0.4) for _ in range(width)]

# App1 App2 ... AppN
# | | |
# V V V
# App1 App2 ... AppN

fus_2 = []
for fu in fus:
fu = sleep_fail(0, 0, .8, inputs=[fu])
fus_2.extend([fu])
futs = [random_fail(fail_prob=0.8, inputs=[f]) for f in futs]

# App1 App2 ... AppN
# | | |
Expand All @@ -92,15 +62,7 @@ def test_deps(numtasks=2):
# \ | /
# \ | /
# App_Final

fu_final = sleep_fail(1, 0, 0, inputs=fus_2)

try:
print("Final status : ", fu_final.result())
except parsl.dataflow.errors.DependencyError as e:
print("Caught the right exception")
print("Exception : ", e)
except Exception as e:
assert False, "Expected DependencyError but got: %s" % e
else:
raise RuntimeError("Expected DependencyError, but got no exception")
random_fail(fail_prob=0, inputs=futs).result()
except DependencyError:
pass
24 changes: 15 additions & 9 deletions parsl/tests/test_python_apps/test_garbage_collect.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
import parsl
import threading
import time

import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import fresh_config as local_config # noqa


@python_app
def slow_double(x):
import time
time.sleep(0.1)
def slow_double(x, may_continue: threading.Event):
may_continue.wait()
return x * 2


@pytest.mark.local
def test_garbage_collect():
""" Launches an app with a dependency and waits till it's done and asserts that
the internal refs were wiped
"""
x = slow_double(slow_double(10))
evt = threading.Event()
x = slow_double(10, evt)
x = slow_double(x, evt)

if x.done() is False:
assert parsl.dfk().tasks[x.tid]['app_fu'] == x, "Tasks table should have app_fu ref before done"
assert parsl.dfk().tasks[x.tid]['app_fu'] == x, "Tasks table should have app_fu ref before done"

x.result()
evt.set()
assert x.result() == 10 * 4
if parsl.dfk().checkpoint_mode is not None:
# We explicit call checkpoint if checkpoint_mode is enabled covering
# cases like manual/periodic where checkpointing may be deferred.
parsl.dfk().checkpoint()

time.sleep(0.2) # Give enough time for task wipes to work
time.sleep(0.01) # Give enough time for task wipes to work
assert x.tid not in parsl.dfk().tasks, "Task record should be wiped after task completion"
27 changes: 7 additions & 20 deletions parsl/tests/test_python_apps/test_join.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import pytest
import time

from parsl import join_app, python_app
from parsl.dataflow.errors import JoinError

from parsl.tests.configs.local_threads import fresh_config as local_config

RESULT_CONSTANT = 3


@python_app(cache=True)
@python_app
def inner_app():
time.sleep(1)
return RESULT_CONSTANT


Expand All @@ -34,24 +31,17 @@ def combine(*args):

@join_app
def outer_make_a_dag_combine(n):
futs = []
for _ in range(n):
futs.append(inner_app())
return combine(*futs)
return combine(*(inner_app() for _ in range(n)))


@join_app
def outer_make_a_dag_multi(n):
futs = []
for _ in range(n):
futs.append(inner_app())
return futs
return [inner_app() for _ in range(n)]


def test_result_flow():
f = outer_app()
res = f.result()
assert res == RESULT_CONSTANT
assert f.result() == RESULT_CONSTANT


@join_app
Expand All @@ -67,20 +57,17 @@ def test_wrong_type():

def test_dependency_on_joined():
g = add_one(outer_app())
res = g.result()
assert res == RESULT_CONSTANT + 1
assert g.result() == RESULT_CONSTANT + 1


def test_combine():
f = outer_make_a_dag_combine(inner_app())
res = f.result()
assert res == [RESULT_CONSTANT] * RESULT_CONSTANT
assert f.result() == [RESULT_CONSTANT] * RESULT_CONSTANT


def test_multiple_return():
f = outer_make_a_dag_multi(inner_app())
res = f.result()
assert res == [RESULT_CONSTANT] * RESULT_CONSTANT
assert f.result() == [RESULT_CONSTANT] * RESULT_CONSTANT


class InnerError(RuntimeError):
Expand Down
51 changes: 13 additions & 38 deletions parsl/tests/test_python_apps/test_mapred.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import argparse
import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import config


@python_app
def fan_out(x, dur):
import time
time.sleep(dur)
def times_two(x):
return x * 2


@python_app
def accumulate(inputs=[]):
def accumulate(inputs=()):
return sum(inputs)


Expand All @@ -22,38 +18,17 @@ def accumulate_t(*args):
return sum(args)


def test_mapred_type1(width=2):
"""MapReduce test with the reduce stage taking futures in inputs=[]
"""

futs = []
for i in range(1, width + 1):
fu = fan_out(i, 1)
futs.extend([fu])

print("Fan out : ", futs)

@pytest.mark.parametrize("width", (2, 3, 5))
def test_mapred_type1(width):
"""MapReduce test with the reduce stage taking futures in inputs=[]"""
futs = [times_two(i) for i in range(width)]
red = accumulate(inputs=futs)
# print([(i, i.done()) for i in futs])
r = sum([x * 2 for x in range(1, width + 1)])
assert r == red.result(), "[TEST] MapRed type1 expected %s, got %s" % (
r, red.result())

assert red.result() == 2 * sum(range(width))

def test_mapred_type2(width=2):
"""MapReduce test with the reduce stage taking futures on the args
"""

futs = []
for i in range(1, width + 1):
fu = fan_out(i, 0.1)
futs.extend([fu])

print("Fan out : ", futs)

@pytest.mark.parametrize("width", (2, 3, 5))
def test_mapred_type2(width):
"""MapReduce test with the reduce stage taking futures on the args"""
futs = [times_two(i) for i in range(width)]
red = accumulate_t(*futs)

# print([(i, i.done()) for i in futs])
r = sum([x * 2 for x in range(1, width + 1)])
assert r == red.result(), "[TEST] MapRed type2 expected %s, got %s" % (
r, red.result())
assert red.result() == 2 * sum(range(width))
13 changes: 6 additions & 7 deletions parsl/tests/test_python_apps/test_memoize_bad_id_for_memo.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ def noop_app(x, inputs=[], cache=True):


@python_app
def sleep(t):
import time
time.sleep(t)
def some_func(_t):
pass


def test_python_unmemoizable():
Expand All @@ -51,14 +50,14 @@ def test_python_failing_memoizer():


def test_python_unmemoizable_after_dep():
sleep_fut = sleep(1)
fut = noop_app(Unmemoizable(), inputs=[sleep_fut])
memoizable_fut = some_func(1)
fut = noop_app(Unmemoizable(), inputs=[memoizable_fut])
with pytest.raises(ValueError):
fut.result()


def test_python_failing_memoizer_afer_dep():
sleep_fut = sleep(1)
fut = noop_app(FailingMemoizable(), inputs=[sleep_fut])
memoizable_fut = some_func(1)
fut = noop_app(FailingMemoizable(), inputs=[memoizable_fut])
with pytest.raises(ValueError):
fut.result()
Loading

0 comments on commit 6b81796

Please sign in to comment.