Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove or reduce some test sleeps #2771

Merged
merged 2 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 31 additions & 69 deletions parsl/tests/test_python_apps/test_fail.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,59 @@
import argparse
import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import fresh_config as local_config
from parsl.dataflow.errors import DependencyError


@python_app
def sleep_fail(sleep_dur, sleep_rand_max, fail_prob, inputs=[]):
import time
import random
class ManufacturedTestFailure(Exception):
pass

s = sleep_dur + random.randint(-sleep_rand_max, sleep_rand_max)

time.sleep(s)
raise Exception("App failure")
@python_app
def random_fail(fail_prob: float, inputs=()):
import random
if random.random() < fail_prob:
raise ManufacturedTestFailure("App failure")


def test_no_deps(numtasks=2):
def test_no_deps():
"""Test basic error handling, with no dependent failures
"""
futs = [random_fail(1), random_fail(0), random_fail(0)]

fus = []
for i in range(0, numtasks):

fu = sleep_fail(0.1, 0, .8)
fus.extend([fu])

count = 0
for fu in fus:
for f in futs:
try:
fu.result()
except Exception as e:
print("Caught exception : ", "*" * 20)
print(e)
print("*" * 20)
count += 1
f.result()
except ManufacturedTestFailure:
pass

print("Caught failures of {0}/{1}".format(count, len(fus)))


def test_fail_sequence(numtasks=2):
@pytest.mark.parametrize("fail_probs", ((1, 0), (0, 1)))
def test_fail_sequence(fail_probs):
"""Test failure in a sequence of dependencies

App1 -> App2 ... -> AppN
"""

sleep_dur = 0.1
fail_prob = 0.4

fus = {0: None}
for i in range(0, numtasks):
print("Chaining {0} to {1}".format(i + 1, fus[i]))
fus[i + 1] = sleep_fail(sleep_dur, 0, fail_prob, inputs=[fus[i]])
t1_fail_prob, t2_fail_prob = fail_probs
t1 = random_fail(fail_prob=t1_fail_prob)
t2 = random_fail(fail_prob=t2_fail_prob, inputs=[t1])
t_final = random_fail(fail_prob=0, inputs=[t2])

# time.sleep(numtasks*sleep_dur)
for k in sorted(fus.keys()):
try:
x = fus[i].result()
print("{0} : {1}".format(k, x))
except Exception as e:
print("{0} : {1}".format(k, e))

return


def test_deps(numtasks=2):
"""Random failures in branches of Map -> Map -> reduce
with pytest.raises(DependencyError):
t_final.result()

App1 App2 ... AppN
"""

fus = []
for i in range(0, numtasks):
fu = sleep_fail(0.2, 0, .4)
fus.extend([fu])
def test_deps(width=3):
"""Random failures in branches of Map -> Map -> reduce"""
# App1 App2 ... AppN
futs = [random_fail(fail_prob=0.4) for _ in range(width)]

# App1 App2 ... AppN
# | | |
# V V V
# App1 App2 ... AppN

fus_2 = []
for fu in fus:
fu = sleep_fail(0, 0, .8, inputs=[fu])
fus_2.extend([fu])
futs = [random_fail(fail_prob=0.8, inputs=[f]) for f in futs]

# App1 App2 ... AppN
# | | |
Expand All @@ -92,15 +62,7 @@ def test_deps(numtasks=2):
# \ | /
# \ | /
# App_Final

fu_final = sleep_fail(1, 0, 0, inputs=fus_2)

try:
print("Final status : ", fu_final.result())
except parsl.dataflow.errors.DependencyError as e:
print("Caught the right exception")
print("Exception : ", e)
except Exception as e:
assert False, "Expected DependencyError but got: %s" % e
else:
raise RuntimeError("Expected DependencyError, but got no exception")
random_fail(fail_prob=0, inputs=futs).result()
except DependencyError:
pass
24 changes: 15 additions & 9 deletions parsl/tests/test_python_apps/test_garbage_collect.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
import parsl
import threading
import time

import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import fresh_config as local_config # noqa


@python_app
def slow_double(x):
import time
time.sleep(0.1)
def slow_double(x, may_continue: threading.Event):
may_continue.wait()
return x * 2


@pytest.mark.local
def test_garbage_collect():
""" Launches an app with a dependency and waits till it's done and asserts that
the internal refs were wiped
"""
x = slow_double(slow_double(10))
evt = threading.Event()
x = slow_double(10, evt)
x = slow_double(x, evt)

if x.done() is False:
assert parsl.dfk().tasks[x.tid]['app_fu'] == x, "Tasks table should have app_fu ref before done"
assert parsl.dfk().tasks[x.tid]['app_fu'] == x, "Tasks table should have app_fu ref before done"
khk-globus marked this conversation as resolved.
Show resolved Hide resolved

x.result()
evt.set()
assert x.result() == 10 * 4
if parsl.dfk().checkpoint_mode is not None:
# We explicit call checkpoint if checkpoint_mode is enabled covering
# cases like manual/periodic where checkpointing may be deferred.
parsl.dfk().checkpoint()

time.sleep(0.2) # Give enough time for task wipes to work
time.sleep(0.01) # Give enough time for task wipes to work
khk-globus marked this conversation as resolved.
Show resolved Hide resolved
assert x.tid not in parsl.dfk().tasks, "Task record should be wiped after task completion"
27 changes: 7 additions & 20 deletions parsl/tests/test_python_apps/test_join.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import pytest
import time

from parsl import join_app, python_app
from parsl.dataflow.errors import JoinError

from parsl.tests.configs.local_threads import fresh_config as local_config

RESULT_CONSTANT = 3


@python_app(cache=True)
@python_app
def inner_app():
time.sleep(1)
return RESULT_CONSTANT


Expand All @@ -34,24 +31,17 @@ def combine(*args):

@join_app
def outer_make_a_dag_combine(n):
futs = []
for _ in range(n):
futs.append(inner_app())
return combine(*futs)
return combine(*(inner_app() for _ in range(n)))


@join_app
def outer_make_a_dag_multi(n):
futs = []
for _ in range(n):
futs.append(inner_app())
return futs
return [inner_app() for _ in range(n)]


def test_result_flow():
f = outer_app()
res = f.result()
assert res == RESULT_CONSTANT
assert f.result() == RESULT_CONSTANT


@join_app
Expand All @@ -67,20 +57,17 @@ def test_wrong_type():

def test_dependency_on_joined():
g = add_one(outer_app())
res = g.result()
assert res == RESULT_CONSTANT + 1
assert g.result() == RESULT_CONSTANT + 1


def test_combine():
f = outer_make_a_dag_combine(inner_app())
res = f.result()
assert res == [RESULT_CONSTANT] * RESULT_CONSTANT
assert f.result() == [RESULT_CONSTANT] * RESULT_CONSTANT


def test_multiple_return():
f = outer_make_a_dag_multi(inner_app())
res = f.result()
assert res == [RESULT_CONSTANT] * RESULT_CONSTANT
assert f.result() == [RESULT_CONSTANT] * RESULT_CONSTANT


class InnerError(RuntimeError):
Expand Down
51 changes: 13 additions & 38 deletions parsl/tests/test_python_apps/test_mapred.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import argparse
import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import config


@python_app
def fan_out(x, dur):
import time
time.sleep(dur)
def times_two(x):
return x * 2


@python_app
def accumulate(inputs=[]):
def accumulate(inputs=()):
return sum(inputs)


Expand All @@ -22,38 +18,17 @@ def accumulate_t(*args):
return sum(args)


def test_mapred_type1(width=2):
"""MapReduce test with the reduce stage taking futures in inputs=[]
"""

futs = []
for i in range(1, width + 1):
fu = fan_out(i, 1)
futs.extend([fu])

print("Fan out : ", futs)

@pytest.mark.parametrize("width", (2, 3, 5))
def test_mapred_type1(width):
"""MapReduce test with the reduce stage taking futures in inputs=[]"""
futs = [times_two(i) for i in range(width)]
red = accumulate(inputs=futs)
# print([(i, i.done()) for i in futs])
r = sum([x * 2 for x in range(1, width + 1)])
assert r == red.result(), "[TEST] MapRed type1 expected %s, got %s" % (
r, red.result())

assert red.result() == 2 * sum(range(width))

def test_mapred_type2(width=2):
"""MapReduce test with the reduce stage taking futures on the args
"""

futs = []
for i in range(1, width + 1):
fu = fan_out(i, 0.1)
futs.extend([fu])

print("Fan out : ", futs)

@pytest.mark.parametrize("width", (2, 3, 5))
def test_mapred_type2(width):
"""MapReduce test with the reduce stage taking futures on the args"""
futs = [times_two(i) for i in range(width)]
red = accumulate_t(*futs)

# print([(i, i.done()) for i in futs])
r = sum([x * 2 for x in range(1, width + 1)])
assert r == red.result(), "[TEST] MapRed type2 expected %s, got %s" % (
r, red.result())
assert red.result() == 2 * sum(range(width))
13 changes: 6 additions & 7 deletions parsl/tests/test_python_apps/test_memoize_bad_id_for_memo.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ def noop_app(x, inputs=[], cache=True):


@python_app
def sleep(t):
import time
time.sleep(t)
def some_func(_t):
pass


def test_python_unmemoizable():
Expand All @@ -51,14 +50,14 @@ def test_python_failing_memoizer():


def test_python_unmemoizable_after_dep():
sleep_fut = sleep(1)
fut = noop_app(Unmemoizable(), inputs=[sleep_fut])
memoizable_fut = some_func(1)
fut = noop_app(Unmemoizable(), inputs=[memoizable_fut])
with pytest.raises(ValueError):
fut.result()


def test_python_failing_memoizer_afer_dep():
sleep_fut = sleep(1)
fut = noop_app(FailingMemoizable(), inputs=[sleep_fut])
memoizable_fut = some_func(1)
fut = noop_app(FailingMemoizable(), inputs=[memoizable_fut])
with pytest.raises(ValueError):
fut.result()
Loading