-
-
Notifications
You must be signed in to change notification settings - Fork 75
/
runner.py
138 lines (112 loc) · 4.93 KB
/
runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from __future__ import unicode_literals
from __future__ import print_function
from sys import modules
from unittest.signals import (
registerResult, installHandler, removeResult)
import warnings
try: # pragma: no cover
import coverage
except: # pragma: no cover
coverage = None
from green.exceptions import InitializerOrFinalizerError
from green.loader import toParallelTestTargets
from green.output import GreenStream
from green.process import LoggingDaemonlessPool, poolRunner
from green.result import GreenTestResult
from collections import namedtuple
import multiprocessing
# Pairs one parallel test target (suite_name) with the manager queue that
# run() hands to poolRunner and then reads that target's messages from.
_AsyncChunk = namedtuple("_AsyncChunk", "suite_name queue")
class InitializerOrFinalizer:
    """
    Deferred invocation of a user-specified function, identified by its dotted
    path (for example ``package.module.function``).

    The dotted path is split once at construction time; the module is only
    imported, and the function only looked up and called, when the instance
    itself is called.  run() passes instances of this class as the
    ``initializer`` and ``finalizer`` of the worker pool.
    """

    def __init__(self, dotted_function):
        # Everything before the last dot names the module, the remainder names
        # the function.  Without any dot the module part is the empty string.
        module_path, _, function_name = dotted_function.rpartition('.')
        self.module_part = module_path
        self.function_part = function_name

    def __call__(self, *args):
        """
        Import the module, look up the function and call it with no arguments.

        Does nothing when no module part was given.  Any failure (import,
        missing/falsy attribute, or an exception raised by the function) is
        reported as an InitializerOrFinalizerError.  Positional arguments are
        accepted but ignored.
        """
        module_name = self.module_part
        function_name = self.function_part

        # Nothing configured (or no module given): silently do nothing.
        if not module_name:
            return

        try:
            __import__(module_name)
            # __import__ returns the top-level package, so fetch the actual
            # (possibly nested) module from sys.modules instead.
            target = getattr(modules[module_name], function_name, None)
        except Exception as load_error:
            raise InitializerOrFinalizerError(
                "Couldn't load '{}' - got: {}".format(
                    function_name, str(load_error)))

        if not target:
            raise InitializerOrFinalizerError(
                "Loaded module '{}', but couldn't find function '{}'".format(
                    module_name, function_name))

        try:
            target()
        except Exception as run_error:
            raise InitializerOrFinalizerError(
                "Error running '{}' - got: {}".format(
                    function_name, str(run_error)))
def run(suite, stream, args):
    """
    Run the given test case or test suite with the specified arguments.

    The stream passed in is wrapped in a GreenStream unless GreenStream is a
    subclass of the stream's type.

    suite  -- the test case or suite; it is split into parallel targets with
              toParallelTestTargets(suite, args.targets).
    stream -- the output stream handed to the GreenTestResult.
    args   -- options object; this function reads its warnings, processes,
              initializer, finalizer, targets, run_coverage and omit_patterns
              attributes, and also passes it to GreenTestResult.

    Returns the GreenTestResult that collected the results.
    """
    if not issubclass(GreenStream, type(stream)):
        stream = GreenStream(stream)
    result = GreenTestResult(args, stream)

    # Note: Catching SIGINT isn't supported by Python on windows (python
    # "WONTFIX" issue 18040)
    installHandler()
    registerResult(result)

    with warnings.catch_warnings():
        if args.warnings:  # pragma: no cover
            # if args.warnings is set, use it to filter all the warnings
            warnings.simplefilter(args.warnings)
            # if the filter is 'default' or 'always', special-case the
            # warnings from the deprecated unittest methods to show them
            # no more than once per module, because they can be fairly
            # noisy.  The -Wd and -Wa flags can be used to bypass this
            # only when args.warnings is None.
            if args.warnings in ['default', 'always']:
                # The message is a regular expression, so it must be a raw
                # string: '\w' in a normal literal is an invalid escape.
                warnings.filterwarnings('module',
                                        category=DeprecationWarning,
                                        message=r'Please use assert\w+ instead.')

        result.startTestRun()

        pool = LoggingDaemonlessPool(
            processes=args.processes or None,
            initializer=InitializerOrFinalizer(args.initializer),
            finalizer=InitializerOrFinalizer(args.finalizer))
        manager = multiprocessing.Manager()

        # One chunk (target name + its own queue) per parallel test target.
        tests = [_AsyncChunk(t, manager.Queue())
                 for t in toParallelTestTargets(suite, args.targets)]
        if tests:
            for index, test_chunk in enumerate(tests):
                # Coverage numbers are 1-based and only assigned when
                # coverage is being collected.
                coverage_number = index + 1 if args.run_coverage else None
                pool.apply_async(
                    poolRunner,
                    (test_chunk.suite_name, test_chunk.queue,
                     coverage_number, args.omit_patterns))
            # Everything has been submitted; no more work will be added.
            pool.close()

            # Drain the queues in submission order.  As consumed here, each
            # queue yields a truthy "test started" message followed by that
            # test's proto result, repeated until a falsy sentinel arrives.
            for test_chunk in tests:
                abort_tests = False

                while True:
                    msg = test_chunk.queue.get()

                    # Sentinel value, we're done
                    if not msg:
                        break
                    else:
                        # Result guaranteed after this message, we're
                        # currently waiting on this test, so print out
                        # the white 'processing...' version of the output
                        result.startTest(msg)
                        proto_test_result = test_chunk.queue.get()
                        result.addProtoTestResult(proto_test_result)

                    if result.shouldStop:
                        abort_tests = True
                        break

                if abort_tests:
                    break

        # Also reached when there were no targets at all, in which case the
        # pool has not been closed yet; close() must precede join().
        pool.close()
        pool.join()

        result.stopTestRun()

    removeResult(result)

    return result