Skip to content

Commit

Permalink
Remove or reduce some test sleeps
Browse files Browse the repository at this point in the history
- Make some test code more pythonic (e.g., generators, list comprehension)
- Remove a number of unnecessary sleep calls (or vastly reduce wait time)
- Teach some tests to `assert`
- Remove more `print()`s
  • Loading branch information
khk-globus committed Jun 28, 2023
1 parent b91aa49 commit d1d08c3
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 164 deletions.
95 changes: 34 additions & 61 deletions parsl/tests/test_python_apps/test_fail.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,69 @@
import argparse
import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import fresh_config as local_config
from parsl.dataflow.errors import DependencyError


@python_app
def sleep_fail(sleep_dur, sleep_rand_max, fail_prob, inputs=[]):
import time
import random
class ManufacturedTestFailure(Exception):
pass

s = sleep_dur + random.randint(-sleep_rand_max, sleep_rand_max)

time.sleep(s)
raise Exception("App failure")
@python_app
def random_fail(fail_prob, inputs=()):
import random
if random.random() < fail_prob:
raise ManufacturedTestFailure("App failure")


def test_no_deps(numtasks=2):
@pytest.mark.parametrize("ntasks", (5, 8, 13))
def test_no_deps(ntasks):
"""Test basic error handling, with no dependent failures
"""

fus = []
for i in range(0, numtasks):

fu = sleep_fail(0.1, 0, .8)
fus.extend([fu])
futs = [random_fail(.8) for i in range(ntasks)]

count = 0
for fu in fus:
for f in futs:
try:
fu.result()
except Exception as e:
print("Caught exception : ", "*" * 20)
print(e)
print("*" * 20)
count += 1

print("Caught failures of {0}/{1}".format(count, len(fus)))
f.result()
except ManufacturedTestFailure:
pass


def test_fail_sequence(numtasks=2):
@pytest.mark.parametrize("depth", (5, 8, 13))
def test_fail_sequence(depth):
"""Test failure in a sequence of dependencies
App1 -> App2 ... -> AppN
"""

sleep_dur = 0.1
fail_prob = 0.4

fus = {0: None}
for i in range(0, numtasks):
print("Chaining {0} to {1}".format(i + 1, fus[i]))
fus[i + 1] = sleep_fail(sleep_dur, 0, fail_prob, inputs=[fus[i]])
futs = [random_fail(fail_prob)]
futs.extend(random_fail(fail_prob, inputs=[futs[i - 1]]) for i in range(1, depth))
futs.append(random_fail(fail_prob=0)) # testing *dependency* logic

# time.sleep(numtasks*sleep_dur)
for k in sorted(fus.keys()):
try:
x = fus[i].result()
print("{0} : {1}".format(k, x))
except Exception as e:
print("{0} : {1}".format(k, e))

return
try:
futs[-1].result()
except DependencyError:
pass


def test_deps(numtasks=2):
@pytest.mark.parametrize("width", (2, 3, 5, 8))
def test_deps(width):
"""Random failures in branches of Map -> Map -> reduce
App1 App2 ... AppN
"""

fus = []
for i in range(0, numtasks):
fu = sleep_fail(0.2, 0, .4)
fus.extend([fu])
fail_prob = 0.4
futs = [random_fail(fail_prob) for _ in range(width)]

# App1 App2 ... AppN
# | | |
# V V V
# App1 App2 ... AppN

fus_2 = []
for fu in fus:
fu = sleep_fail(0, 0, .8, inputs=[fu])
fus_2.extend([fu])
fail_prob = 0.8
futs = [random_fail(fail_prob, inputs=[f]) for f in futs]

# App1 App2 ... AppN
# | | |
Expand All @@ -93,14 +73,7 @@ def test_deps(numtasks=2):
# \ | /
# App_Final

fu_final = sleep_fail(1, 0, 0, inputs=fus_2)

try:
print("Final status : ", fu_final.result())
except parsl.dataflow.errors.DependencyError as e:
print("Caught the right exception")
print("Exception : ", e)
except Exception as e:
assert False, "Expected DependencyError but got: %s" % e
else:
raise RuntimeError("Expected DependencyError, but got no exception")
random_fail(fail_prob=0, inputs=futs).result()
except DependencyError:
pass
11 changes: 5 additions & 6 deletions parsl/tests/test_python_apps/test_garbage_collect.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import parsl
import time

import parsl
from parsl.app.app import python_app


@python_app
def slow_double(x):
import time
time.sleep(0.1)
time.sleep(0.01)
return x * 2


Expand All @@ -17,14 +17,13 @@ def test_garbage_collect():
"""
x = slow_double(slow_double(10))

if x.done() is False:
assert parsl.dfk().tasks[x.tid]['app_fu'] == x, "Tasks table should have app_fu ref before done"
assert parsl.dfk().tasks[x.tid]['app_fu'] == x, "Tasks table should have app_fu ref before done"

x.result()
assert x.result() == 10 * 4
if parsl.dfk().checkpoint_mode is not None:
# We explicit call checkpoint if checkpoint_mode is enabled covering
# cases like manual/periodic where checkpointing may be deferred.
parsl.dfk().checkpoint()

time.sleep(0.2) # Give enough time for task wipes to work
time.sleep(0.01) # Give enough time for task wipes to work
assert x.tid not in parsl.dfk().tasks, "Task record should be wiped after task completion"
35 changes: 16 additions & 19 deletions parsl/tests/test_python_apps/test_join.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import pytest
import time

from parsl import join_app, python_app
from parsl.dataflow.errors import JoinError

from parsl.tests.configs.local_threads import fresh_config as local_config

class _InnerAppCalledMoreThanOnce(Exception):
pass


RESULT_CONSTANT = 3

_inner_app_called = False


@python_app(cache=True)
def inner_app():
time.sleep(1)
global _inner_app_called
if _inner_app_called:
raise _InnerAppCalledMoreThanOnce()
_inner_app_called = True
return RESULT_CONSTANT


Expand All @@ -34,24 +41,17 @@ def combine(*args):

@join_app
def outer_make_a_dag_combine(n):
futs = []
for _ in range(n):
futs.append(inner_app())
return combine(*futs)
return combine(*[inner_app() for _ in range(n)])


@join_app
def outer_make_a_dag_multi(n):
futs = []
for _ in range(n):
futs.append(inner_app())
return futs
return [inner_app() for _ in range(n)]


def test_result_flow():
f = outer_app()
res = f.result()
assert res == RESULT_CONSTANT
assert f.result() == RESULT_CONSTANT


@join_app
Expand All @@ -67,20 +67,17 @@ def test_wrong_type():

def test_dependency_on_joined():
g = add_one(outer_app())
res = g.result()
assert res == RESULT_CONSTANT + 1
assert g.result() == RESULT_CONSTANT + 1


def test_combine():
f = outer_make_a_dag_combine(inner_app())
res = f.result()
assert res == [RESULT_CONSTANT] * RESULT_CONSTANT
assert f.result() == [RESULT_CONSTANT] * RESULT_CONSTANT


def test_multiple_return():
f = outer_make_a_dag_multi(inner_app())
res = f.result()
assert res == [RESULT_CONSTANT] * RESULT_CONSTANT
assert f.result() == [RESULT_CONSTANT] * RESULT_CONSTANT


class InnerError(RuntimeError):
Expand Down
45 changes: 11 additions & 34 deletions parsl/tests/test_python_apps/test_mapred.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import argparse
import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import config


@python_app
def fan_out(x, dur):
import time
time.sleep(dur)
def times_two(x):
return x * 2


@python_app
def accumulate(inputs=[]):
def accumulate(inputs=()):
return sum(inputs)


Expand All @@ -22,38 +18,19 @@ def accumulate_t(*args):
return sum(args)


def test_mapred_type1(width=2):
@pytest.mark.parametrize("width", (2, 3, 5, 8, 13))
def test_mapred_type1(width):
"""MapReduce test with the reduce stage taking futures in inputs=[]
"""

futs = []
for i in range(1, width + 1):
fu = fan_out(i, 1)
futs.extend([fu])

print("Fan out : ", futs)

futs = [times_two(i) for i in range(width)]
red = accumulate(inputs=futs)
# print([(i, i.done()) for i in futs])
r = sum([x * 2 for x in range(1, width + 1)])
assert r == red.result(), "[TEST] MapRed type1 expected %s, got %s" % (
r, red.result())
assert red.result() == 2 * sum(range(width))


def test_mapred_type2(width=2):
@pytest.mark.parametrize("width", (2, 3, 5, 8, 13))
def test_mapred_type2(width):
"""MapReduce test with the reduce stage taking futures on the args
"""

futs = []
for i in range(1, width + 1):
fu = fan_out(i, 0.1)
futs.extend([fu])

print("Fan out : ", futs)

futs = [times_two(i) for i in range(width)]
red = accumulate_t(*futs)

# print([(i, i.done()) for i in futs])
r = sum([x * 2 for x in range(1, width + 1)])
assert r == red.result(), "[TEST] MapRed type2 expected %s, got %s" % (
r, red.result())
assert red.result() == 2 * sum(range(width))
13 changes: 6 additions & 7 deletions parsl/tests/test_python_apps/test_memoize_bad_id_for_memo.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ def noop_app(x, inputs=[], cache=True):


@python_app
def sleep(t):
import time
time.sleep(t)
def some_func(_t):
pass


def test_python_unmemoizable():
Expand All @@ -51,14 +50,14 @@ def test_python_failing_memoizer():


def test_python_unmemoizable_after_dep():
sleep_fut = sleep(1)
fut = noop_app(Unmemoizable(), inputs=[sleep_fut])
memoizable_fut = some_func(1)
fut = noop_app(Unmemoizable(), inputs=[memoizable_fut])
with pytest.raises(ValueError):
fut.result()


def test_python_failing_memoizer_afer_dep():
sleep_fut = sleep(1)
fut = noop_app(FailingMemoizable(), inputs=[sleep_fut])
memoizable_fut = some_func(1)
fut = noop_app(FailingMemoizable(), inputs=[memoizable_fut])
with pytest.raises(ValueError):
fut.result()
Loading

0 comments on commit d1d08c3

Please sign in to comment.