diff --git a/backend/apps/ifc_validation/tasks/check_programs.py b/backend/apps/ifc_validation/tasks/check_programs.py
index 57cdf154..50e5d262 100644
--- a/backend/apps/ifc_validation/tasks/check_programs.py
+++ b/backend/apps/ifc_validation/tasks/check_programs.py
@@ -3,6 +3,7 @@
 import json
 import subprocess
 from typing import List
+from dataclasses import dataclass

 from apps.ifc_validation_models.settings import TASK_TIMEOUT_LIMIT
 from apps.ifc_validation_models.models import ValidationTask
@@ -10,6 +11,54 @@
 from .logger import logger
 from .context import TaskContext

+@dataclass
+class proc_output:
+    """Outcome of a finished subprocess: exit code, captured output and command."""
+    returncode : int
+    stdout : str
+    stderr : str
+    args: List[str]
+
+
+def run_subprocess_wait(*popen_args, check=False, **popen_kwargs):
+    """
+    Start a subprocess and wait for it to exit, polling every 0.2 seconds.
+
+    All positional and keyword arguments are forwarded to subprocess.Popen.
+    If any exception interrupts the wait, the child is terminated (and killed
+    when it has not exited within 5 seconds) before the exception is re-raised.
+
+    Returns a proc_output. With check=True a non-zero exit code raises
+    subprocess.CalledProcessError instead.
+    """
+    process = subprocess.Popen(*popen_args, **popen_kwargs)
+    try:
+        while True:
+            try:
+                # communicate() keeps already-read output across timed-out
+                # calls, so the successful call returns everything at once
+                stdout, stderr = process.communicate(timeout=0.2)
+                break
+            except subprocess.TimeoutExpired:
+                # keep looping; you can also check your own stop conditions here
+                continue
+    except BaseException:
+        process.terminate()
+        try:
+            process.wait(timeout=5)
+        except subprocess.TimeoutExpired:
+            process.kill()
+            process.wait()
+        raise
+    retcode = process.returncode
+    # streams that were not piped come back as None
+    stdout, stderr = stdout or "", stderr or ""
+    if check and retcode != 0:
+        # process.args is set even when the command was passed as args=...
+        raise subprocess.CalledProcessError(retcode, process.args, output=stdout, stderr=stderr)
+    return proc_output(retcode, stdout, stderr, process.args)
+
+
 checks_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "checks"))

 def check_syntax(context:TaskContext):
@@ -168,7 +217,10 @@ def check_instance_completion(context:TaskContext):

 def check_proc_success_or_fail(proc, task):
     if proc.returncode is not None and proc.returncode != 0:
-        error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}"
+        error_message = (
+            f"Subprocess failed with exit code {proc.returncode}\n"
+            f"{proc.stdout}\n{proc.stderr}"
+        )
         task.mark_as_failed(error_message)
         raise RuntimeError(error_message)
     return proc.stdout
@@ -176,16 +228,15 @@ def check_proc_success_or_fail(proc, task):
 def run_subprocess(
     task: ValidationTask,
     command: List[str],
-) -> subprocess.CompletedProcess[str]:
+) -> proc_output:
     logger.debug(f'Command for {task.type}: {" ".join(command)}')
     task.set_process_details(None, command)
     try:
-        proc = subprocess.run(
+        proc = run_subprocess_wait(
             command,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
             text=True,
-            timeout=TASK_TIMEOUT_LIMIT,
             env= os.environ.copy()
         )
         logger.info(f'test run task task name {task.type}, task value : {task}')
@@ -193,5 +244,5 @@ def run_subprocess(

     except Exception as err:
         logger.exception(f"{type(err).__name__} in task {task.id} : {task.type}")
-        task.mark_as_failed(err)
+        task.mark_as_failed(str(err))
         raise type(err)(f"Unknown error during validation task {task.id}: {task.type}") from err
\ No newline at end of file
diff --git a/backend/apps/ifc_validation/tasks/task_runner.py b/backend/apps/ifc_validation/tasks/task_runner.py
index f21d7df8..799f7d9b 100644
--- a/backend/apps/ifc_validation/tasks/task_runner.py
+++ b/backend/apps/ifc_validation/tasks/task_runner.py
@@ -11,17 +11,56 @@
 from .utils import get_absolute_file_path
 from .logger import logger
 from .email_tasks import *
+import functools
+import psutil
+from celery.exceptions import SoftTimeLimitExceeded
+
+
+def terminate_subprocesses():
+    """
+    Stop every descendant process of the current process.
+
+    Each child is first asked to terminate; those still alive after
+    10 seconds are killed.
+    """
+    parent = psutil.Process()
+    children = parent.children(recursive=True)
+    logger.debug(f"found child pids: {' '.join(map(str, (c.pid for c in children)))}")
+
+    for child in children:
+        try:
+            child.terminate()
+            logger.debug(f"terminated {child.pid}")
+        except psutil.NoSuchProcess:
+            logger.debug(f"no such process {child.pid}")
+
+    _, alive = psutil.wait_procs(children, timeout=10)
+    logger.debug(f"processes still alive: {' '.join(map(str, (c.pid for c in alive)))}")
+    for child in alive:
+        try:
+            child.kill()
+            logger.debug(f"killed {child.pid}")
+        except psutil.NoSuchProcess:
+            logger.debug(f"no such process {child.pid}")
+
+
+def kill_subprocesses_on_timeout(task_func):
+    """
+    Decorator that cleans up child processes when the wrapped task hits
+    Celery's soft time limit; the SoftTimeLimitExceeded is re-raised.
+    """
+    @functools.wraps(task_func)
+    def wrapper(*args, **kwargs):
+        try:
+            return task_func(*args, **kwargs)
+        except SoftTimeLimitExceeded:
+            terminate_subprocesses()
+            raise
+    return wrapper


 assert task_registry.total_increment() == 100

-def check_proc_success_or_fail(proc, task):
-    if proc.returncode is not None and proc.returncode != 0:
-        error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}"
-        task.mark_as_failed(error_message)
-        raise RuntimeError(error_message)
-    return proc.stdout
-
 @shared_task(bind=True)
 @log_execution
 def error_handler(self, *args, **kwargs):
@@ -97,6 +136,7 @@ def task_factory(task_type):
     @shared_task(bind=True, name=config.celery_task_name)
     @log_execution
     @requires_django_user_context
+    @kill_subprocesses_on_timeout
     def validation_subtask_runner(self, *args, **kwargs):

         id = kwargs.get('id')