From 2122db5cc782137c9198fb88f0cac942f62d0073 Mon Sep 17 00:00:00 2001 From: jelleas Date: Thu, 21 Mar 2024 14:39:49 +0100 Subject: [PATCH] v2.0.12, checkpy.interactive.testOffline --- checkpy/interactive.py | 68 ++++++++++++----- checkpy/tester/discovery.py | 14 +++- checkpy/tester/tester.py | 148 ++++++++++++++++++++++-------------- setup.py | 2 +- 4 files changed, 155 insertions(+), 77 deletions(-) diff --git a/checkpy/interactive.py b/checkpy/interactive.py index d7cb7cf..0e47b1f 100644 --- a/checkpy/interactive.py +++ b/checkpy/interactive.py @@ -1,36 +1,71 @@ -from checkpy.downloader import download, update +from checkpy.tester import TesterResult +from checkpy.tester import runTests as _runTests +from checkpy.tester import runTestsSynchronously as _runTestsSynchronously +from checkpy import caches as _caches +import checkpy.tester.discovery as _discovery +import pathlib as _pathlib +from typing import List, Optional -def testModule(moduleName, debugMode = False, silentMode = False): +__all__ = ["testModule", "test", "testOffline"] + + +def testModule(moduleName: str, debugMode=False, silentMode=False) -> Optional[List[TesterResult]]: """ Test all files from module """ - from . import caches - caches.clearAllCaches() + _caches.clearAllCaches() + from . import tester from . import downloader downloader.updateSilently() + results = tester.testModule(moduleName, debugMode = debugMode, silentMode = silentMode) - try: - if __IPYTHON__: # type: ignore [name-defined] - try: - import matplotlib.pyplot - matplotlib.pyplot.close("all") - except: - pass - except: - pass + + _closeAllMatplotlib() + return results -def test(fileName, debugMode = False, silentMode = False): +def test(fileName: str, debugMode=False, silentMode=False) -> TesterResult: """ Run tests for a single file """ - from . import caches - caches.clearAllCaches() + _caches.clearAllCaches() + from . import tester from . import downloader downloader.updateSilently() + result = tester.test(fileName, debugMode = debugMode, silentMode = silentMode) + + _closeAllMatplotlib() + + return result + +def testOffline(fileName: str, testPath: str | _pathlib.Path, multiprocessing=True, debugMode=False, silentMode=False) -> TesterResult: + """ + Run a test offline. + Takes in the name of file to be tested and an absolute path to the tests directory. + If multiprocessing is True (by default), runs all tests in a seperate process. All tests run in the same process otherwise. + """ + _caches.clearAllCaches() + + fileStem = fileName.split(".")[0] + filePath = _discovery.getPath(fileStem) + + testModuleName = f"{fileStem}Test" + testFileName = f"{fileStem}Test.py" + testPath = _discovery.getTestPathsFrom(testFileName, _pathlib.Path(testPath))[0] + + if multiprocessing: + result = _runTests(testModuleName, testPath, filePath, debugMode, silentMode) + else: + result = _runTestsSynchronously(testModuleName, testPath, filePath, debugMode, silentMode) + + _closeAllMatplotlib() + + return result + +def _closeAllMatplotlib(): try: if __IPYTHON__: # type: ignore [name-defined] try: @@ -40,4 +75,3 @@ def test(fileName, debugMode = False, silentMode = False): pass except: pass - return result \ No newline at end of file diff --git a/checkpy/tester/discovery.py b/checkpy/tester/discovery.py index b1b0870..2c89740 100644 --- a/checkpy/tester/discovery.py +++ b/checkpy/tester/discovery.py @@ -35,8 +35,14 @@ def getTestNames(moduleName: str) -> Optional[List[str]]: def getTestPaths(testFileName: str, module: str="") -> List[pathlib.Path]: testFilePaths: List[pathlib.Path] = [] - for testsPath in database.forEachTestsPath(): - for (dirPath, dirNames, fileNames) in os.walk(testsPath): - if testFileName in fileNames and (not module or module in dirPath): - testFilePaths.append(pathlib.Path(dirPath)) + for testPath in database.forEachTestsPath(): + testFilePaths.extend(getTestPathsFrom(testFileName, testPath, module=module)) return testFilePaths + +def getTestPathsFrom(testFileName: str, path: pathlib.Path, module: str="") -> List[pathlib.Path]: + """Get all testPaths from a tests folder (path).""" + testFilePaths: List[pathlib.Path] = [] + for (dirPath, _, fileNames) in os.walk(path): + if testFileName in fileNames and (not module or module in dirPath): + testFilePaths.append(pathlib.Path(dirPath)) + return testFilePaths \ No newline at end of file diff --git a/checkpy/tester/tester.py b/checkpy/tester/tester.py index 0a080ba..2fa6576 100644 --- a/checkpy/tester/tester.py +++ b/checkpy/tester/tester.py @@ -9,8 +9,10 @@ from types import ModuleType from typing import Dict, Iterable, List, Optional, Union +import contextlib import os import pathlib +import queue import subprocess import sys import importlib @@ -21,7 +23,7 @@ import multiprocessing as mp -__all__ = ["getActiveTest", "test", "testModule", "TesterResult"] +__all__ = ["getActiveTest", "test", "testModule", "TesterResult", "runTests", "runTestsSynchronously"] _activeTest: Optional[Test] = None @@ -45,9 +47,6 @@ def test(testName: str, module="", debugMode=False, silentMode=False) -> "Tester fileName = os.path.basename(path) filePath = os.path.dirname(path) - if filePath not in sys.path: - sys.path.append(filePath) - testFileName = fileName.split(".")[0] + "Test.py" testPaths = discovery.getTestPaths(testFileName, module=module) @@ -60,9 +59,6 @@ def test(testName: str, module="", debugMode=False, silentMode=False) -> "Tester testPath = testPaths[0] - if str(testPath) not in sys.path: - sys.path.append(str(testPath)) - if path.endswith(".ipynb"): if subprocess.call(['jupyter', 'nbconvert', '--to', 'script', path]) != 0: result.addOutput(printer.displayError("Failed to convert Jupyter notebook to .py")) @@ -76,13 +72,14 @@ def test(testName: str, module="", debugMode=False, silentMode=False) -> "Tester with open(path, "w") as f: f.write("".join([l for l in lines if "get_ipython" not in l])) - testerResult = _runTests( - testFileName.split(".")[0], - testPath, - path, - debugMode=debugMode, - silentMode=silentMode - ) + with _addToSysPath(filePath): + testerResult = runTests( + testFileName.split(".")[0], + testPath, + path, + debugMode=debugMode, + silentMode=silentMode + ) if path.endswith(".ipynb"): os.remove(path) @@ -101,56 +98,84 @@ def testModule(module: str, debugMode=False, silentMode=False) -> Optional[List[ return [test(testName, module=module, debugMode=debugMode, silentMode=silentMode) for testName in testNames] -def _runTests(moduleName: str, testPath: pathlib.Path, fileName: str, debugMode=False, silentMode=False) -> "TesterResult": - ctx = mp.get_context("spawn") - - signalQueue: "mp.Queue[_Signal]" = ctx.Queue() - resultQueue: "mp.Queue[TesterResult]" = ctx.Queue() - tester = _Tester(moduleName, testPath, pathlib.Path(fileName), debugMode, silentMode, signalQueue, resultQueue) - p = ctx.Process(target=tester.run, name="Tester") - p.start() - - start = time.time() - isTiming = False +def runTests(moduleName: str, testPath: pathlib.Path, fileName: str, debugMode=False, silentMode=False) -> "TesterResult": result: Optional[TesterResult] = None - while p.is_alive(): - while not signalQueue.empty(): - signal = signalQueue.get() - - if signal.description is not None: - description = signal.description - if signal.isTiming is not None: - isTiming = signal.isTiming - if signal.timeout is not None: - timeout = signal.timeout - if signal.resetTimer: - start = time.time() - - if isTiming and time.time() - start > timeout: - result = TesterResult(pathlib.Path(fileName).name) - result.addOutput(printer.displayError("Timeout ({} seconds) reached during: {}".format(timeout, description))) - p.terminate() - p.join() - return result + with _addToSysPath(testPath): + ctx = mp.get_context("spawn") + + signalQueue: "mp.Queue[_Signal]" = ctx.Queue() + resultQueue: "mp.Queue[TesterResult]" = ctx.Queue() + tester = _Tester(moduleName, testPath, pathlib.Path(fileName), debugMode, silentMode, signalQueue, resultQueue) + p = ctx.Process(target=tester.run, name="Tester") + p.start() + + start = time.time() + isTiming = False + + while p.is_alive(): + while not signalQueue.empty(): + signal = signalQueue.get() + + if signal.description is not None: + description = signal.description + if signal.isTiming is not None: + isTiming = signal.isTiming + if signal.timeout is not None: + timeout = signal.timeout + if signal.resetTimer: + start = time.time() + + if isTiming and time.time() - start > timeout: + result = TesterResult(pathlib.Path(fileName).name) + result.addOutput(printer.displayError("Timeout ({} seconds) reached during: {}".format(timeout, description))) + p.terminate() + p.join() + return result + + if not resultQueue.empty(): + # .get before .join to prevent hanging indefinitely due to a full pipe + # https://bugs.python.org/issue8426 + result = resultQueue.get() + p.terminate() + p.join() + break + + time.sleep(0.1) if not resultQueue.empty(): - # .get before .join to prevent hanging indefinitely due to a full pipe - # https://bugs.python.org/issue8426 result = resultQueue.get() - p.terminate() - p.join() - break - time.sleep(0.1) + if result is None: + raise exception.CheckpyError(message="An error occured while testing. The testing process exited unexpectedly.") - if not resultQueue.empty(): - result = resultQueue.get() + return result - if result is None: - raise exception.CheckpyError(message="An error occured while testing. The testing process exited unexpectedly.") - return result +def runTestsSynchronously(moduleName: str, testPath: pathlib.Path, fileName: str, debugMode=False, silentMode=False) -> "TesterResult": + signalQueue = queue.Queue() + resultQueue = queue.Queue() + + tester = _Tester( + moduleName=moduleName, + testPath=testPath, + filePath=pathlib.Path(fileName), + debugMode=debugMode, + silentMode=silentMode, + signalQueue=signalQueue, + resultQueue=resultQueue + ) + + with _addToSysPath(testPath): + try: + old_debug_mode = printer.printer.DEBUG_MODE + old_silent_mode = printer.printer.SILENT_MODE + tester.run() + finally: + printer.printer.DEBUG_MODE = old_debug_mode + printer.printer.SILENT_MODE = old_silent_mode + + return resultQueue.get() class TesterResult(object): @@ -340,3 +365,16 @@ def _getTestFunctionsInExecutionOrder(self, testFunctions: Iterable[TestFunction dependencies = self._getTestFunctionsInExecutionOrder(tf.dependencies) + [tf] sortedTFs.extend([t for t in dependencies if t not in sortedTFs]) return sortedTFs + +@contextlib.contextmanager +def _addToSysPath(path: str): + addedToPath = False + path = str(path) + try: + if path not in sys.path: + addedToPath = True + sys.path.append(path) + yield + finally: + if addedToPath and path in sys.path: + sys.path.remove(path) \ No newline at end of file diff --git a/setup.py b/setup.py index 363c5c6..246cea9 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ setup( name='checkPy', - version='2.0.11', + version='2.0.12', description='A simple python testing framework for educational purposes', long_description=long_description,