diff --git a/secator/runners/_helpers.py b/secator/runners/_helpers.py index 156af5c4..01b11eae 100644 --- a/secator/runners/_helpers.py +++ b/secator/runners/_helpers.py @@ -1,6 +1,10 @@ import os +import kombu +import kombu.exceptions + from secator.utils import deduplicate +from secator.rich import console def run_extractors(results, opts, targets=[]): @@ -80,20 +84,24 @@ def get_task_ids(result, ids=[]): if result is None: return - if isinstance(result, GroupResult): - get_task_ids(result.parent, ids=ids) + try: + if isinstance(result, GroupResult): + get_task_ids(result.parent, ids=ids) - elif isinstance(result, AsyncResult): - if result.id not in ids: - ids.append(result.id) + elif isinstance(result, AsyncResult): + if result.id not in ids: + ids.append(result.id) - if hasattr(result, 'children') and result.children: - for child in result.children: - get_task_ids(child, ids=ids) + if hasattr(result, 'children') and result.children: + for child in result.children: + get_task_ids(child, ids=ids) - # Browse parent - if hasattr(result, 'parent') and result.parent: - get_task_ids(result.parent, ids=ids) + # Browse parent + if hasattr(result, 'parent') and result.parent: + get_task_ids(result.parent, ids=ids) + except kombu.exceptions.DecodeError as e: + console.print(f'[bold red]{str(e)}. Aborting get_task_ids.[/]') + return def get_task_data(task_id): @@ -107,33 +115,42 @@ def get_task_data(task_id): """ from celery.result import AsyncResult res = AsyncResult(task_id) - if not (res and res.args and len(res.args) > 1): + if not res: + return + try: + args = res.args + info = res.info + state = res.state + except kombu.exceptions.DecodeError as e: + console.print(f'[bold red]{str(e)}. Aborting get_task_data.[/]') return - data = {} - task_name = res.args[1] - data['id'] = task_id - data['name'] = task_name - data['state'] = res.state - data['chunk_info'] = '' - data['count'] = 0 - data['error'] = None - data['ready'] = False - data['descr'] = '' - data['progress'] = 0 - data['results'] = [] - if res.state in ['FAILURE', 'SUCCESS', 'REVOKED']: + if not (args and len(args) > 1): + return + task_name = args[1] + data = { + 'id': task_id, + 'name': task_name, + 'state': state, + 'chunk_info': '', + 'count': 0, + 'error': None, + 'ready': False, + 'descr': '', + 'progress': 0, + 'results': [] + } + + # Set ready flag + if state in ['FAILURE', 'SUCCESS', 'REVOKED']: data['ready'] = True - if res.info and not isinstance(res.info, list): - chunk = res.info.get('chunk', '') - chunk_count = res.info.get('chunk_count', '') - data['chunk'] = chunk - data['chunk_count'] = chunk_count - if chunk: + + # Set task data + if info and not isinstance(info, list): + data.update(info) + chunk = data.get('chunk') + chunk_count = data.get('chunk_count') + if chunk and chunk_count: data['chunk_info'] = f'{chunk}/{chunk_count}' - data.update(res.info) - data['descr'] = data.pop('description', '') - # del data['results'] - # del data['task_results'] return data