Skip to content

Commit

Permalink
Conditional execution of list after testcase.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed Feb 18, 2017
1 parent d83ea67 commit eadc3af
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 73 deletions.
47 changes: 33 additions & 14 deletions systest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@
import string
import subprocess
import logging
import traceback


__author__ = 'Erik Moqvist'
__version__ = '1.6.0'
__version__ = '2.0.0'


_RUN_HEADER_FMT ="""
Name: {name}
Date: {date}
Node: {node}
User: {user}"""

_TEST_HEADER_FMT = """---------------------------------------------------------------
_TEST_HEADER_FMT = """
---------------------------------------------------------------
Name: {name}
Description:
{description}"""
Expand Down Expand Up @@ -71,8 +75,8 @@ def configure_logging(filename=None):
# Add a prefix to entries written to file.
if not filename:
filename = "systest"
filename = "{}-{}.log".format(filename, datetime.datetime.now())
file_handler = logging.FileHandler(_make_filename(filename), "w")
filename = "{}-{}.log".format(filename, _make_filename(str(datetime.datetime.now())))
file_handler = logging.FileHandler(filename, "w")
file_handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
root_logger.addHandler(file_handler)

Expand All @@ -96,7 +100,7 @@ def format(self, record):
formatted = formatted.replace("PASSED", '\033[0;32mPASSED\033[0m')
formatted = formatted.replace("FAILED", '\033[0;31mFAILED\033[0m')
formatted = formatted.replace("SKIPPED", '\033[0;33mSKIPPED\033[0m')

return formatted


Expand Down Expand Up @@ -150,26 +154,29 @@ def assert_equal(self, first, second):
"""

if first != second:
raise RuntimeError("{} != {}".format(first, second))
filename, line, _, code = traceback.extract_stack()[-2]
LOGGER.error('%s:%d: %s', filename, line, code)
raise SequencerTestFailedError()


class Sequencer(object):

def __init__(self,
name,
stop_on_failure=True,
testcase_filter=None,
testcase_skip_filter=None,
dry_run=False,
force_serial_execution=False):
self.name = name
self.stop_on_failure = stop_on_failure
self.dry_run = dry_run
self.tests = None
self.execution_time = 0.0
self.testcase_filter = testcase_filter
self.testcase_skip_filter = testcase_skip_filter
self.force_serial_execution = force_serial_execution
self.passed = 0
self.failed = 0
self.skipped = 0

def is_testcase_enabled(self, test):
enabled = True
Expand Down Expand Up @@ -232,9 +239,7 @@ def run(self, *tests):
end_time = time.time()
self.execution_time += (end_time - start_time)

# raise an exception if at least one test failed
if thread.result != TestCase.PASSED:
raise SequencerTestFailedError("At least one testcase failed.")
return self.passed, self.failed, self.skipped

def summary(self):
"""Compile the test execution summary and return it as a string.
Expand Down Expand Up @@ -518,19 +523,33 @@ def run_test(self, test):
test.execution_time = execution_time
test.finish_time = finish_time

if result == TestCase.PASSED:
self.sequencer.passed += 1
elif result == TestCase.FAILED:
self.sequencer.failed += 1
elif result == TestCase.SKIPPED:
self.sequencer.skipped += 1

def run_sequential_tests(self, tests):
"""Run all tests in the list in sequential order.
"""

prev_test_failed = False

for test in tests:
if prev_test_failed:
prev_test_failed = False
if isinstance(test, list):
continue

thread = _TestThread(test, self.sequencer)
thread.start()
thread.join()

# raise an exception if the test failed
if self.sequencer.stop_on_failure and thread.result == TestCase.FAILED:
raise SequencerTestFailedError("The testcase failed.")
if thread.result == TestCase.FAILED:
prev_test_failed = True
self.result = TestCase.FAILED

def run_parallel_tests(self, tests):
"""Start each test in the tests tuple in a separate thread.
Expand Down
111 changes: 52 additions & 59 deletions tests/test_systest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest

import systest
from systest import Sequencer, SequencerTestFailedError, ColorFormatter
from systest import Sequencer

from tests.testcases.named import NamedTest
from tests.testcases.fail import FailTest
Expand All @@ -22,52 +22,45 @@ def test_serial_parallel(self):

sequencer = Sequencer("serial_parallel")

# Run test a and b.
sequencer.run(
NamedTest("a"),
NamedTest("b")
)

# Run a bunch of tests.
try:
sequencer.run([
# Tests in a tuple are executed in parallel.
result = sequencer.run(
NamedTest("a"),
[
NamedTest("b")
],
# Tests in a tuple are executed in parallel.
(
NamedTest("c", work_time=0.2),
NamedTest("d"),
NamedTest("e"),
(
NamedTest("f"),
NamedTest("g")
),
NamedTest("h"),
(
NamedTest("c", work_time=0.2),
NamedTest("i", work_time=0.1),
NamedTest("j"),
FailTest("a")
),
[
FailTest("b"),
[
NamedTest("d"),
NamedTest("e"),
(
NamedTest("f"),
NamedTest("g")
),
NamedTest("h"),
(
NamedTest("i", work_time=0.1),
NamedTest("j"),
FailTest("a")
),
NotExecutedTest("a")
],
FailTest("b"),
FailTest("c")
),
]
],
FailTest("c")
),
[
NotExecutedTest("b")
])

# sequencer.run() should throw the
# SequencerTestFailedError exception since the test
# FailTest() fails
raise

except SequencerTestFailedError:
sequencer.run(
NamedTest("k"),
NamedTest("l")
)
],
NamedTest("k"),
NamedTest("l")
)

sequencer.report()

self.assertEqual(result, (12, 3, 0))
self.assertEqual(NamedTest.count, 12)
self.assertEqual(NotExecutedTest.count, 0)
self.assertEqual(FailTest.count, 3)
Expand All @@ -77,20 +70,20 @@ def test_continue_on_failure(self):
"""

sequencer = Sequencer("continue_on_failure",
stop_on_failure=False)
sequencer = Sequencer("continue_on_failure")

sequencer.run(
result = sequencer.run(
FailTest("1"),
NamedTest("a"),
[
(
FailTest("2"),
NamedTest("b")
]
)
)

sequencer.report()

self.assertEqual(result, (2, 2, 0))
self.assertEqual(NamedTest.count, 2)
self.assertEqual(NotExecutedTest.count, 0)
self.assertEqual(FailTest.count, 2)
Expand All @@ -101,10 +94,9 @@ def test_dot_digraph(self):
"""

sequencer = Sequencer("dot_digraph",
dry_run=True,
stop_on_failure=False)
dry_run=True)

sequencer.run(
result = sequencer.run(
NamedTest("a"),
NamedTest("b"),
(
Expand Down Expand Up @@ -134,6 +126,8 @@ def test_dot_digraph(self):

sequencer.report()

self.assertEqual(result, (17, 0, 0))

def test_testcase_filter(self):
"""Use the test execution filter to run a specific testcase in a
sequence.
Expand All @@ -146,17 +140,18 @@ def test_testcase_filter(self):
testcase_filter=["fail_1", "test_b"],
testcase_skip_filter=["fail_1"])

sequencer.run(
result = sequencer.run(
FailTest("1"),
NamedTest("a"),
[
(
FailTest("2"),
NamedTest("b")
]
)
)

sequencer.report()

self.assertEqual(result, (1, 0, 0))
self.assertEqual(NamedTest.count, 1)
self.assertEqual(NotExecutedTest.count, 0)
self.assertEqual(FailTest.count, 0)
Expand All @@ -169,17 +164,15 @@ def test_force_serial_execution(self):
sequencer = Sequencer("forced_serial_execution",
force_serial_execution=True)

try:
sequencer.run((
FailTest("1"),
NamedTest("a")
))
except SequencerTestFailedError:
pass

result = sequencer.run((
FailTest("1"),
NamedTest("a")
))

sequencer.report()

self.assertEqual(NamedTest.count, 0)
self.assertEqual(result, (1, 1, 0))
self.assertEqual(NamedTest.count, 1)
self.assertEqual(NotExecutedTest.count, 0)
self.assertEqual(FailTest.count, 1)

Expand Down

0 comments on commit eadc3af

Please sign in to comment.