49 changes: 44 additions & 5 deletions backend/apps/ifc_validation/tasks/check_programs.py
@@ -3,13 +3,50 @@
import json
import subprocess
import time
from typing import List
from dataclasses import dataclass

from apps.ifc_validation_models.settings import TASK_TIMEOUT_LIMIT
from apps.ifc_validation_models.models import ValidationTask

from .logger import logger
from .context import TaskContext

@dataclass
class proc_output:
    # Minimal stand-in for subprocess.CompletedProcess, so callers keep the same attributes.
    returncode: int
    stdout: str
    stderr: str
    args: List[str]


def run_subprocess_wait(*popen_args, check=False, timeout=None, **popen_kwargs):
    """Drop-in replacement for subprocess.run that polls the child process.

    Polling communicate() with a short timeout keeps the calling thread responsive
    to exceptions (e.g. Celery's SoftTimeLimitExceeded), so the child can be torn
    down instead of being left orphaned. Note that timeout is handled here as an
    overall deadline and is not passed through to Popen, which does not accept it.
    """
    process = subprocess.Popen(*popen_args, **popen_kwargs)
    deadline = None if timeout is None else time.monotonic() + timeout
    out_chunks, err_chunks = [], []
    try:
        while True:
            try:
                stdout, stderr = process.communicate(timeout=0.2)
                out_chunks.append(stdout or "")
                err_chunks.append(stderr or "")
                break
            except subprocess.TimeoutExpired:
                # Still running: enforce the overall deadline, then keep polling.
                # Additional stop conditions can be checked here.
                if deadline is not None and time.monotonic() > deadline:
                    raise subprocess.TimeoutExpired(process.args, timeout)
                continue
    except BaseException:
        # Any interruption (timeout, task revocation, ...) tears the child down.
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
        raise
    retcode = process.returncode
    stdout, stderr = "".join(out_chunks), "".join(err_chunks)
    if check and retcode != 0:
        raise subprocess.CalledProcessError(retcode, popen_args[0], output=stdout, stderr=stderr)
    return proc_output(retcode, stdout, stderr, popen_args[0] if popen_args else [])


checks_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "checks"))

def check_syntax(context:TaskContext):
@@ -168,30 +205,32 @@ def check_instance_completion(context:TaskContext):

def check_proc_success_or_fail(proc, task):
if proc.returncode is not None and proc.returncode != 0:
error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}"
error_message = (
f"Subprocess failed with exit code {proc.returncode}\n"
f"{proc.stdout}\n{proc.stderr}"
)
task.mark_as_failed(error_message)
raise RuntimeError(error_message)
return proc.stdout

def run_subprocess(
task: ValidationTask,
command: List[str],
) -> subprocess.CompletedProcess[str]:
) -> proc_output:
logger.debug(f'Command for {task.type}: {" ".join(command)}')
task.set_process_details(None, command)
try:
proc = subprocess.run(
proc = run_subprocess_wait(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=TASK_TIMEOUT_LIMIT,
env= os.environ.copy()
)
        logger.info(f'Subprocess for task {task.type} completed (task: {task})')
return proc

except Exception as err:
logger.exception(f"{type(err).__name__} in task {task.id} : {task.type}")
task.mark_as_failed(err)
task.mark_as_failed(str(err))
raise type(err)(f"Unknown error during validation task {task.id}: {task.type}") from err
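
For illustration (not part of this PR): the helper above mirrors `subprocess.run`, but polls `communicate()` in short intervals so the calling thread stays interruptible and the child is terminated on any exception or when the overall `timeout` elapses. A minimal standalone sketch of that behaviour, assuming `run_subprocess_wait` enforces `timeout` as a deadline as shown above; the import path and the sleep command are assumptions for the demo:

```python
import subprocess
import sys

# Assumed import path, derived from the file path in this diff; adjust to the
# actual package layout if it differs.
from apps.ifc_validation.tasks.check_programs import run_subprocess_wait

# A child that would normally outlive a naive caller.
cmd = [sys.executable, "-c", "import time; time.sleep(60)"]

try:
    run_subprocess_wait(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        timeout=1,  # overall deadline checked by the polling loop
    )
except subprocess.TimeoutExpired:
    # By the time we get here, the cleanup branch in run_subprocess_wait has already
    # terminated (or killed) the sleeping child, so nothing is left orphaned.
    print("child timed out and was cleaned up")
```
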
45 changes: 38 additions & 7 deletions backend/apps/ifc_validation/tasks/task_runner.py
@@ -11,17 +11,47 @@
from .utils import get_absolute_file_path
from .logger import logger
from .email_tasks import *
import functools
import psutil
from celery.exceptions import SoftTimeLimitExceeded


def terminate_subprocesses():
    """Terminate all child processes of the current worker, killing any that do not exit in time."""
    parent = psutil.Process()
    children = parent.children(recursive=True)
    logger.debug(f"found child pids: {' '.join(map(str, (c.pid for c in children)))}")

    for child in children:
        try:
            child.terminate()
            logger.debug(f"terminated {child.pid}")
        except psutil.NoSuchProcess:
            logger.debug(f"no such process {child.pid}")

    _, alive = psutil.wait_procs(children, timeout=10)
    logger.debug(f"processes still alive: {' '.join(map(str, (c.pid for c in alive)))}")
    for child in alive:
        try:
            child.kill()
            logger.debug(f"killed {child.pid}")
        except psutil.NoSuchProcess:
            logger.debug(f"no such process {child.pid}")


def kill_subprocesses_on_timeout(task_func):
    """Clean up child processes spawned by a task when Celery's soft time limit is hit."""
    @functools.wraps(task_func)
    def wrapper(*args, **kwargs):
        try:
            return task_func(*args, **kwargs)
        except SoftTimeLimitExceeded:
            # Reap any external checker processes before letting Celery handle the timeout.
            terminate_subprocesses()
            raise
    return wrapper


assert task_registry.total_increment() == 100

def check_proc_success_or_fail(proc, task):
if proc.returncode is not None and proc.returncode != 0:
error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}"
task.mark_as_failed(error_message)
raise RuntimeError(error_message)
return proc.stdout

@shared_task(bind=True)
@log_execution
def error_handler(self, *args, **kwargs):
@@ -97,6 +127,7 @@ def task_factory(task_type):
@shared_task(bind=True, name=config.celery_task_name)
@log_execution
@requires_django_user_context
@kill_subprocesses_on_timeout
def validation_subtask_runner(self, *args, **kwargs):

id = kwargs.get('id')
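
For illustration (not part of this PR): how `kill_subprocesses_on_timeout` is expected to pair with Celery's soft time limit. The task name, the `soft_time_limit` value, and the sleep command are illustrative assumptions; in the PR the real subtasks are produced by `task_factory` and configured elsewhere.

```python
import subprocess
import sys

from celery import shared_task

# Assumed import paths, derived from the file paths in this diff.
from apps.ifc_validation.tasks.check_programs import run_subprocess_wait
from apps.ifc_validation.tasks.task_runner import kill_subprocesses_on_timeout


@shared_task(bind=True, soft_time_limit=5)  # 5 s is an illustrative value
@kill_subprocesses_on_timeout
def long_running_check(self):
    # If the soft limit fires while the checker runs, SoftTimeLimitExceeded unwinds
    # through the wrapper, terminate_subprocesses() reaps the child (and any
    # grandchildren) via psutil, and the exception is re-raised to Celery.
    run_subprocess_wait(
        [sys.executable, "-c", "import time; time.sleep(60)"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
```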