Skip to content

Commit

Permalink
Merge pull request #71 from smspillaz/process-overhaul-modules-subpro…
Browse files Browse the repository at this point in the history
…cesses

RFC: Paralellize on module level by default.

Great start!  I'll merge it in.  It's not finished yet, though. 1) Crashes on windows, 2) hangs on pypy on Travis, 3) could be simpler, 4) (ironically) didn't completely cover the new code with tests ;-)

I'm working on ironing out the issues above.
  • Loading branch information
CleanCut committed Jul 20, 2015
2 parents 8796ca4 + 2b97742 commit 80d37bc
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 43 deletions.
2 changes: 1 addition & 1 deletion green/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.11.0
1.12.0
3 changes: 2 additions & 1 deletion green/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@
import os # pragma: no cover
import sys # pragma: no cover
import tempfile # pragma: no cover
import multiprocessing # pragma: no cover

# Used for debugging output in cmdline, since we can't do debug output here.
files_loaded = [] # pragma: no cover

# Set the defaults in a re-usable way
default_args = argparse.Namespace( # pragma: no cover
targets = ['.'], # Not in configs
processes = 1,
processes = multiprocessing.cpu_count(),
initializer = '',
finalizer = '',
html = False,
Expand Down
52 changes: 52 additions & 0 deletions green/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,57 @@ def toProtoTestList(suite_part, test_list=None, doing_completions=False):
return test_list


def toParallelTestTargets(suite_part, user_targets, test_set=None):
"""
Take a test suite and turn it into a list of target names.
This is effectively just the name of the target to load in each case.
In most cases, this will be right down to the level of the individual
test, however in certain cases, such as where test case or test module
set up and tear down functions have been registered, it will need to
stop at a level before that.
"""
if test_set == None:
test_set = set()

# Python's lousy handling of module import failures during loader discovery
# makes this crazy special case necessary. See _make_failed_import_test in
# the source code for unittest.loader
if suite_part.__class__.__name__ == 'ModuleImportFailure':
exception_method = str(suite_part).split()[0]
getattr(suite_part, exception_method)()

# Base case: check if we're a TestCase.
#
# If so, we then have a further check -
# 1. If the module defines setUpModule or tearDownModule then the entire
# module must be run in serial, so we add only the module name.
# 2. If the class defines setUpClass or tearDownClass then the entire
# class must be run in serial, so we add only the class name.
#
# However, if the user has specifically asked for a certain target
# then we should return that target only, instead of the whole
# class.
if issubclass(type(suite_part), unittest.TestCase):
full_test_case_name = ".".join((suite_part.__module__,
suite_part.__class__.__name__))
full_unit_test_name = ".".join((suite_part.__module__,
suite_part.__class__.__name__,
str(suite_part).split()[0]))

if not (full_test_case_name in user_targets or
full_unit_test_name in user_targets):
test_set.add(suite_part.__module__)
return test_set

test_set.add(full_unit_test_name)
else:
for test in suite_part:
toParallelTestTargets(test, user_targets, test_set)

return test_set


def getCompletions(target):
# This option expects 0 or 1 targets
if type(target) == list:
Expand Down Expand Up @@ -274,6 +325,7 @@ def loadTarget(target, file_pattern='test*.py'):
target, file_pattern))
loader = unittest.TestLoader()
loader.suiteClass = GreenTestSuite
loader.loadTestsFromTestCase = lambda tcc: loadFromTestCase(tcc)

# For a test loader, we want to always the current working directory to be
# the first item in sys.path, just like when a python interpreter is loaded
Expand Down
89 changes: 80 additions & 9 deletions green/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from green.exceptions import InitializerOrFinalizerError
from green.loader import loadTargets
from green.result import ProtoTest, ProtoTestResult
from green.result import proto_test, proto_error, ProtoTest, ProtoTestResult, BaseTestResult



Expand Down Expand Up @@ -206,7 +206,73 @@ def rebuild_exc(exc, tb): # pragma: no cover
# END of Worker Finalization Monkey Patching
#-------------------------------------------------------------------------------

def poolRunner(test_name, coverage_number=None, omit_patterns=[]): # pragma: no cover
class SubprocessTestResult(BaseTestResult):
"""
I'm the TestResult object for a single unit test run in a subprocess.
"""


def __init__(self, reporting_queue):
super(SubprocessTestResult, self).__init__(None, None)
self.reportingQueue = reporting_queue
self.currentResult = None
self.shouldStop = False

def startTest(self, test):
"Called before each test runs"
self.reportingQueue.put(proto_test(test))
self.currentResult = ProtoTestResult()


def stopTest(self, test):
"Called after each test runs"
self.reportingQueue.put(self.currentResult)
self.currentResult = None


def addSuccess(self, test):
"Called when a test passed"
self.currentResult.addSuccess(test)


def addError(self, test, err):
"Called when a test raises an exception"
if not self.currentResult:
self.startTest(test)
self.addError(test, err)
self.stopTest(test)
else:
self.currentResult.addError(test, err)


def addFailure(self, test, err):
"Called when a test fails a unittest assertion"
self.currentResult.addFailure(test, err)


def addSkip(self, test, reason):
"Called when a test is skipped"
self.currentResult.addSkip(test, reason)


def addExpectedFailure(self, test, err):
"Called when a test fails, and we expeced the failure"
self.currentResult.addExpectedFailure(test, err)


def addUnexpectedSuccess(self, test):
"Called when a test passed, but we expected a failure"
self.currentResult.addUnexpectedSuccess(test)

@property
def any_errors(self, ignored=None):
"True if anything failed and test execution hasn't stopped"
if not self.currentResult:
return False
else:
return self.currentResult.any_errors

def poolRunner(target, queue, coverage_number=None, omit_patterns=[]): # pragma: no cover
"I am the function that pool worker processes run. I run one unit test."
# Each pool worker gets his own temp directory, to avoid having tests that
# are used to taking turns using the same temp file name from interfering
Expand All @@ -225,18 +291,20 @@ def poolRunner(test_name, coverage_number=None, omit_patterns=[]): # pragma: no
cov.start()

# Create a structure to return the results of this one test
result = ProtoTestResult()
result = SubprocessTestResult(queue)
test = None
try:
test = loadTargets(test_name)
test = loadTargets(target)
except:
err = sys.exc_info()
t = ProtoTest()
t.module = 'green.loader'
t.class_name = 'N/A'
t.description = 'Green encountered an error loading the unit test.'
t.method_name = 'poolRunner'
result.startTest(t)
result.addError(t, err)
result.stopTest(t)

if getattr(test, 'run', False):
# Loading was successful, lets do this
Expand All @@ -247,24 +315,26 @@ def poolRunner(test_name, coverage_number=None, omit_patterns=[]): # pragma: no
# through to crash things. So we only need to manufacture another error
# if the underlying framework didn't, but either way we don't want to
# crash.
if not result.errors:
if not result.any_errors:
err = sys.exc_info()
t = ProtoTest()
t.module = 'green.runner'
t.class_name = 'N/A'
t.description = 'Green encountered an exception not caught by the underlying test framework.'
t.method_name = 'poolRunner'
result.startTest(t)
result.addError(t, err)
result.stopTest(t)
else:
# loadTargets() returned an object without a run() method, probably None
description = 'Test loader returned an un-runnable object: {} of type {} with dir {}'.format(
str(test), type(test), dir(test))
err = (TypeError, TypeError(description), None)
t = ProtoTest()
t.module = '.'.join(test_name.split('.')[:-2])
t.class_name = test_name.split('.')[-2]
t.module = '.'.join(target.split('.')[:-2])
t.class_name = target.split('.')[-2]
t.description = description
t.method_name = test_name.split('.')[-1]
t.method_name = target.split('.')[-1]
result.addError(t, err)

# Finish coverage
Expand All @@ -275,4 +345,5 @@ def poolRunner(test_name, coverage_number=None, omit_patterns=[]): # pragma: no
# Restore the state of the temp directory
shutil.rmtree(tempfile.tempdir)
tempfile.tempdir = saved_tempdir
return result
queue.put(None)
return None
8 changes: 8 additions & 0 deletions green/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,15 @@ def addUnexpectedSuccess(self, test):
"Called when a test passed, but we expected a failure"
self.unexpectedSuccesses.append(proto_test(test))

@property
def any_errors(self, ignored=None):
"True if anything failed due to an error"
return self.errors > 0

@property
def any_errors(self, ignored=None):
"True if anything failed due to an error"
return self.errors > 0

class GreenTestResult(BaseTestResult):
"""
Expand Down
52 changes: 35 additions & 17 deletions green/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@
coverage = None

from green.exceptions import InitializerOrFinalizerError
from green.loader import toProtoTestList
from green.loader import toProtoTestList, toParallelTestTargets
from green.output import GreenStream
from green.process import LoggingDaemonlessPool, poolRunner
from green.result import GreenTestResult

from collections import defaultdict, namedtuple

import multiprocessing

import sys


_AsyncChunk = namedtuple("_AsyncChunk", "suite_name queue")

class InitializerOrFinalizer:
"""
Expand Down Expand Up @@ -84,33 +91,44 @@ def run(suite, stream, args):

result.startTestRun()

tests = toProtoTestList(suite)
pool = LoggingDaemonlessPool(processes=args.processes or None,
initializer=InitializerOrFinalizer(args.initializer),
finalizer=InitializerOrFinalizer(args.finalizer))
tests = [_AsyncChunk(t, multiprocessing.Manager().Queue()) for t in toParallelTestTargets(suite, args.targets)]
if tests:
async_responses = []
for index, test in enumerate(tests):
for index, test_chunk in enumerate(tests):
if args.run_coverage:
coverage_number = index + 1
else:
coverage_number = None
async_responses.append(pool.apply_async(
pool.apply_async(
poolRunner,
(test.dotted_name, coverage_number, args.omit_patterns)))
(test_chunk.suite_name, test_chunk.queue, coverage_number, args.omit_patterns))
pool.close()
for test, async_response in zip(tests, async_responses):
# Prints out the white 'processing...' version of the output
result.startTest(test)
# This blocks until the worker who is processing this
# particular test actually finishes
try:
result.addProtoTestResult(async_response.get())
except KeyboardInterrupt: # pragma: no cover
result.shouldStop = True
if result.shouldStop: # pragma: no cover
for test_chunk in tests:
abort_tests = False

while True:
msg = test_chunk.queue.get()

# Sentinel value, we're done
if not msg:
break
else:
# Result guarunteed after this message, we're
# currently waiting on this test, so print out
# the white 'processing...' version of the output
result.startTest(msg)
result.addProtoTestResult(test_chunk.queue.get())

if result.shouldStop:
abort_tests = True
break

if abort_tests:
break
pool.close()

pool.terminate()
pool.join()

result.stopTestRun()
Expand Down
38 changes: 38 additions & 0 deletions green/test/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,44 @@ def test_moduleImportFailureIgnored(self):
self.assertEqual(loader.toProtoTestList(suite, doing_completions=True), [])


class TestToParallelTestTargets(unittest.TestCase):

def setUp(self):
super(TestToParallelTestTargets, self).setUp()

class FakeModule(object):
pass

self._fake_module_name = "my_test_module"
sys.modules[self._fake_module_name] = FakeModule

def tearDown(self):
del sys.modules[self._fake_module_name]
super(TestToParallelTestTargets, self).tearDown()

def test_methods_with_no_constraints(self):
"toParallelTestTargets() returns only module names."
class NormalTestCase(unittest.TestCase):
def runTest(self):
pass

NormalTestCase.__module__ = self._fake_module_name

targets = loader.toParallelTestTargets(NormalTestCase(), [])
self.assertEqual(targets,
set(["my_test_module"]))

def test_methods_with_constraints(self):
"toParallelTestTargets() returns test names when constrained."
class NormalTestCase(unittest.TestCase):
def runTest(self):
pass

NormalTestCase.__module__ = self._fake_module_name
full_name = "my_test_module.NormalTestCase.runTest"

targets = loader.toParallelTestTargets(NormalTestCase(), [full_name])
self.assertEqual(targets, set([full_name]))

class TestCompletions(unittest.TestCase):

Expand Down
Loading

0 comments on commit 80d37bc

Please sign in to comment.