Skip to content

Commit

Permalink
fix: truncated pickle error (#334)
Browse files Browse the repository at this point in the history
Fixes #333
  • Loading branch information
ocervell committed Apr 24, 2024
1 parent 1de7f9d commit 663af17
Showing 1 changed file with 52 additions and 35 deletions.
87 changes: 52 additions & 35 deletions secator/runners/_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os

import kombu
import kombu.exceptions

from secator.utils import deduplicate
from secator.rich import console


def run_extractors(results, opts, targets=[]):
Expand Down Expand Up @@ -80,20 +84,24 @@ def get_task_ids(result, ids=[]):
if result is None:
return

if isinstance(result, GroupResult):
get_task_ids(result.parent, ids=ids)
try:
if isinstance(result, GroupResult):
get_task_ids(result.parent, ids=ids)

elif isinstance(result, AsyncResult):
if result.id not in ids:
ids.append(result.id)
elif isinstance(result, AsyncResult):
if result.id not in ids:
ids.append(result.id)

if hasattr(result, 'children') and result.children:
for child in result.children:
get_task_ids(child, ids=ids)
if hasattr(result, 'children') and result.children:
for child in result.children:
get_task_ids(child, ids=ids)

# Browse parent
if hasattr(result, 'parent') and result.parent:
get_task_ids(result.parent, ids=ids)
# Browse parent
if hasattr(result, 'parent') and result.parent:
get_task_ids(result.parent, ids=ids)
except kombu.exceptions.DecodeError as e:
console.print(f'[bold red]{str(e)}. Aborting get_task_ids.[/]')
return


def get_task_data(task_id):
Expand All @@ -107,33 +115,42 @@ def get_task_data(task_id):
"""
from celery.result import AsyncResult
res = AsyncResult(task_id)
if not (res and res.args and len(res.args) > 1):
if not res:
return
try:
args = res.args
info = res.info
state = res.state
except kombu.exceptions.DecodeError as e:
console.print(f'[bold red]{str(e)}. Aborting get_task_data.[/]')
return
data = {}
task_name = res.args[1]
data['id'] = task_id
data['name'] = task_name
data['state'] = res.state
data['chunk_info'] = ''
data['count'] = 0
data['error'] = None
data['ready'] = False
data['descr'] = ''
data['progress'] = 0
data['results'] = []
if res.state in ['FAILURE', 'SUCCESS', 'REVOKED']:
if not (args and len(args) > 1):
return
task_name = args[1]
data = {
'id': task_id,
'name': task_name,
'state': state,
'chunk_info': '',
'count': 0,
'error': None,
'ready': False,
'descr': '',
'progress': 0,
'results': []
}

# Set ready flag
if state in ['FAILURE', 'SUCCESS', 'REVOKED']:
data['ready'] = True
if res.info and not isinstance(res.info, list):
chunk = res.info.get('chunk', '')
chunk_count = res.info.get('chunk_count', '')
data['chunk'] = chunk
data['chunk_count'] = chunk_count
if chunk:

# Set task data
if info and not isinstance(info, list):
data.update(info)
chunk = data.get('chunk')
chunk_count = data.get('chunk_count')
if chunk and chunk_count:
data['chunk_info'] = f'{chunk}/{chunk_count}'
data.update(res.info)
data['descr'] = data.pop('description', '')
# del data['results']
# del data['task_results']
return data


Expand Down

0 comments on commit 663af17

Please sign in to comment.