Review notes for this patch. Text placed before the first "diff --git"
header is commentary and is not part of any hunk.

* reana_workflow_controller/errors.py
  Adds REANAWorkflowStopError, documented as the error raised when trying
  to stop a workflow.

* reana_workflow_controller/workflow_run_manager.py
  - get_workflow_running_jobs_as_backend_ids() no longer reads job IDs from
    self.workflow.job_progress['running']['job_ids']. It now queries Job
    rows filtered by workflow_uuid=str(self.workflow.id_) and
    status=JobStatus.running, and still returns their backend_job_id values.
  - stop_batch_workflow_run() wraps each delete_namespaced_job() call in
    try/except ApiException. A failed deletion is logged with exc_info=True,
    the loop moves on to the remaining jobs, and REANAWorkflowStopError is
    raised once after the loop if any deletion failed.

* tests/conftest.py
  The Job rows built by add_kubernetes_jobs_to_workflow_callable() are now
  created with status=JobStatus.running (presumably so that the new
  status-based query can find them; confirm against the tests).

NOTE(review): no hunk adds an import for ApiException in
workflow_run_manager.py or for JobStatus in tests/conftest.py. Confirm both
names are already imported in those files.
NOTE(review): the `continue` at the end of the `except ApiException:` block
is the last statement of the loop body, so it has no effect.
NOTE(review): indentation and blank context lines below were reconstructed
from a whitespace-collapsed copy of the patch; verify against the target
files before applying.

diff --git a/reana_workflow_controller/errors.py b/reana_workflow_controller/errors.py
index e7b14edd..9934a8c4 100644
--- a/reana_workflow_controller/errors.py
+++ b/reana_workflow_controller/errors.py
@@ -35,3 +35,7 @@ class REANAExternalCallError(Exception):
 
 class REANAWorkflowStatusError(Exception):
     """Error when trying to change workflow status."""
+
+
+class REANAWorkflowStopError(Exception):
+    """Error when trying to stop a workflow."""
diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py
index db7d6d06..f9589591 100644
--- a/reana_workflow_controller/workflow_run_manager.py
+++ b/reana_workflow_controller/workflow_run_manager.py
@@ -29,10 +29,11 @@
                                  create_cvmfs_storage_class, format_cmd)
 from reana_db.config import SQLALCHEMY_DATABASE_URI
 from reana_db.database import Session
-from reana_db.models import Job
+from reana_db.models import Job, JobStatus
 
 from reana_workflow_controller.errors import (REANAInteractiveSessionError,
-                                              REANAWorkflowControllerError)
+                                              REANAWorkflowControllerError,
+                                              REANAWorkflowStopError)
 from reana_workflow_controller.k8s import (build_interactive_k8s_objects,
                                            delete_k8s_ingress_object,
                                            delete_k8s_objects_if_exist,
@@ -221,9 +222,9 @@ def _workflow_engine_env_vars(self):
     def get_workflow_running_jobs_as_backend_ids(self):
         """Get all running jobs of a workflow as backend job IDs."""
         session = Session.object_session(self.workflow)
-        job_list = self.workflow.job_progress.get(
-            'running', {}).get('job_ids', [])
-        rows = session.query(Job).filter(Job.id_.in_(job_list))
+        rows = session.query(Job).filter_by(
+            workflow_uuid=str(self.workflow.id_),
+            status=JobStatus.running)
         backend_ids = [j.backend_job_id for j in rows.all()]
         return backend_ids
 
@@ -341,11 +342,22 @@ def stop_batch_workflow_run(self):
         workflow_run_name = self._workflow_run_name_generator('batch')
         to_delete = self.get_workflow_running_jobs_as_backend_ids() + \
             [workflow_run_name]
+        error = False
         for job in to_delete:
-            current_k8s_batchv1_api_client.delete_namespaced_job(
-                job,
-                KubernetesWorkflowRunManager.default_namespace,
-                body=V1DeleteOptions(propagation_policy='Background'))
+            try:
+                current_k8s_batchv1_api_client.delete_namespaced_job(
+                    job,
+                    KubernetesWorkflowRunManager.default_namespace,
+                    body=V1DeleteOptions(propagation_policy='Background'))
+            except ApiException:
+                logging.error(f'Error while trying to stop {self.workflow.id_}'
+                              f': Kubernetes job {job} could not be deleted.',
+                              exc_info=True)
+                error = True
+                continue
+        if error:
+            raise REANAWorkflowStopError(
+                f'Workflow {self.workflow.id_} could not be stopped.')
 
     def _create_job_spec(self, name, command=None, image=None,
                          env_vars=None, overwrite_input_parameters=None,
diff --git a/tests/conftest.py b/tests/conftest.py
index ba30b8f0..32ac62c6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -80,7 +80,8 @@ def add_kubernetes_jobs_to_workflow_callable(workflow, backend=None,
                 backend_job_id = uuid.uuid4()
                 job = Job(id_=reana_job_id,
                           backend_job_id=str(backend_job_id),
-                          workflow_uuid=workflow.id_)
+                          workflow_uuid=workflow.id_,
+                          status=JobStatus.running)
                 progress_dict[status]['job_ids'].append(str(job.id_))
                 progress_dict[status]['total'] += 1
                 session.add(job)