Task Status Update (#461)

* Added live status update for running/queued tasks

* Fixed the tests

* Cleaned up the code

* Made state_manager an instance variable

* Moved preprocess after file lock

* Update workers_test.py

* Fixed some nits

* Changed the exception handling for when the task setup fails.

* Changed the number of args for testTurbiniaTaskEvidenceValidationFailure

* Moved preprocessor after evidence validation.

* Minor fixes to the test and init file
alimez authored and aarontp committed Jan 17, 2020
1 parent 3c02c34 commit 70ec382fddbefd66b3ae801e955962aa9b853c17
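
In outline, this change lets a TurbiniaTaskResult push a one-line status ('queued', then 'running') straight to the configured state manager (Datastore or Redis) while the task executes, instead of only reporting once the task finishes. Below is a minimal sketch of that flow; everything here is illustrative stand-in code, not part of the commit, and only update_task_status(), get_state_manager(), and update_task() mirror names from the diff.

# Illustrative sketch only -- FakeStateManager, FakeResult and FakeTask are
# stand-ins, not Turbinia classes. The real code lives in turbinia/workers.


class FakeStateManager(object):
  """Stands in for the DatastoreStateManager/RedisStateManager returned by
  turbinia.state_manager.get_state_manager()."""

  def update_task(self, task):
    # The real managers persist the task (including result.status) so
    # clients can see queued/running state while the task is still active.
    print('Persisting: {0:s}'.format(task.result.status))


class FakeResult(object):
  """Simplified stand-in for TurbiniaTaskResult."""

  def __init__(self, task_name, worker_name):
    self.task_name = task_name
    self.worker_name = worker_name
    self.status = None
    # In the real setup() this comes from state_manager.get_state_manager().
    self.state_manager = FakeStateManager()

  def update_task_status(self, task, status):
    # Mirrors TurbiniaTaskResult.update_task_status(): build a one-line
    # status and push it directly to the state manager.
    self.status = 'Task {0!s} is {1!s} on {2!s}'.format(
        self.task_name, status, self.worker_name)
    self.state_manager.update_task(task)


class FakeTask(object):
  """Simplified stand-in showing the ordering in TurbiniaTask.run_wrapper()."""

  def __init__(self):
    self.result = FakeResult('ExampleTask', 'worker1')

  def run_wrapper(self):
    # After setup() succeeds the task reports 'queued', before it blocks
    # on the worker file lock.
    self.result.update_task_status(self, 'queued')
    # Once the lock is acquired the evidence is validated, then
    # preprocessed, and the task reports 'running' just before run().
    self.result.update_task_status(self, 'running')


if __name__ == '__main__':
  FakeTask().run_wrapper()
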
@@ -31,8 +31,6 @@
from turbinia import config
from turbinia.config import DATETIME_FORMAT
from turbinia import TurbiniaException
from turbinia.workers import TurbiniaTask
from turbinia.workers import TurbiniaTaskResult

config.LoadConfig()
if config.STATE_MANAGER.lower() == 'datastore':
@@ -112,6 +110,9 @@ def get_task_dict(self, task):
if task_dict.get('run_time'):
task_dict['run_time'] = task_dict['run_time'].total_seconds()

# Importing these here to avoid circular dependencies.
from turbinia.workers import TurbiniaTask
from turbinia.workers import TurbiniaTaskResult
# Set all non-existent keys to None
all_attrs = set(
TurbiniaTask.STORED_ATTRIBUTES + TurbiniaTaskResult.STORED_ATTRIBUTES)
@@ -181,7 +181,8 @@ def testFinalizeResultBadEvidence(self):
self.assertIsNone(self.manager.process_result(self.result))
self.assertIsInstance(self.result.evidence, list)

def testFinalizeJobGenerateJobFinalizeTasks(self):
@mock.patch('turbinia.task_manager.state_manager.get_state_manager')
def testFinalizeJobGenerateJobFinalizeTasks(self, _):
"""Tests process_job method generates Job finalize Task."""
request_id = 'testRequestID'
self.task.id = 'createdFinalizeTask'
@@ -37,6 +37,7 @@
from turbinia.config import DATETIME_FORMAT
from turbinia.evidence import evidence_decode
from turbinia import output_manager
from turbinia import state_manager
from turbinia import TurbiniaException

log = logging.getLogger('turbinia')
@@ -78,6 +79,8 @@ class TurbiniaTaskResult(object):
task_id: Task ID of the parent task.
task_name: Name of parent task.
requester: The user who requested the task.
state_manager: (DatastoreStateManager|RedisStateManager): State manager
object to handle syncing with storage.
worker_name: Name of worker task executed on.
_log: A list of log messages
"""
@@ -115,6 +118,7 @@ def __init__(
self.status = None
self.error = {}
self.worker_name = platform.node()
self.state_manager = None
# TODO(aarontp): Create mechanism to grab actual python logging data.
self._log = []

@@ -134,6 +138,7 @@ def setup(self, task):
self.task_id = task.id
self.task_name = task.name
self.requester = task.requester
self.state_manager = state_manager.get_state_manager()
if task.output_manager.is_setup:
_, self.output_dir = task.output_manager.get_local_output_dirs()
else:
@@ -192,7 +197,7 @@ def close(self, task, success, status=None):
# also fail.
# pylint: disable=broad-except
except Exception as exception:
message = 'Evidence post-processing for {0:s} failed: {1!s}'.format(
message = 'Evidence post-processing for {0!s} failed: {1!s}'.format(
self.input_evidence.name, exception)
self.log(message, level=logging.ERROR)

@@ -237,6 +242,18 @@ def log(self, message, level=logging.INFO, traceback_=None):
if traceback_:
self.result.set_error(message, traceback_)

def update_task_status(self, task, status):
"""Updates the task status and pushes it directly to datastor.
Args:
task: The calling Task object
status: One line descriptive task status.
"""
task.result.status = 'Task {0!s} is {1!s} on {2!s}'.format(
self.task_name, status, self.worker_name)

self.state_manager.update_task(task)

def add_evidence(self, evidence, evidence_config):
"""Populate the results list.
@@ -273,11 +290,13 @@ def serialize(self):
Returns:
dict: Object dictionary that is JSON serializable.
"""
self.state_manager = None
self.run_time = self.run_time.total_seconds() if self.run_time else None
self.start_time = self.start_time.strftime(DATETIME_FORMAT)
if self.input_evidence:
self.input_evidence = self.input_evidence.serialize()
self.evidence = [x.serialize() for x in self.evidence]

return self.__dict__

@classmethod
@@ -292,6 +311,8 @@ def deserialize(cls, input_dict):
"""
result = TurbiniaTaskResult()
result.__dict__.update(input_dict)
if result.state_manager:
result.state_manager = None
if result.run_time:
result.run_time = timedelta(seconds=result.run_time)
result.start_time = datetime.strptime(result.start_time, DATETIME_FORMAT)
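
For context on the serialize()/deserialize() changes above: the new state_manager attribute holds a live Datastore or Redis client, which is not JSON serializable, so it is cleared before the result dictionary crosses the task boundary and is re-created via get_state_manager() when needed again. A generic, hedged illustration of that clear-before-serialize pattern (not Turbinia code):

import json


class ResultSketch(object):
  """Illustrative only; mirrors the clear-before-serialize pattern."""

  def __init__(self):
    self.status = 'Task ExampleTask is running on worker1'
    self.state_manager = object()  # stands in for a live client connection

  def serialize(self):
    # As in the diff: drop the unserializable client before returning the
    # attribute dictionary.
    self.state_manager = None
    return self.__dict__


print(json.dumps(ResultSketch().serialize()))
# {"status": "Task ExampleTask is running on worker1", "state_manager": null}
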
@@ -522,7 +543,6 @@ def setup(self, evidence):
raise TurbiniaException(
'Evidence source path {0:s} does not exist'.format(
evidence.source_path))
evidence.preprocess(self.tmp_dir)
return self.result

def touch(self):
@@ -609,14 +629,40 @@ def run_wrapper(self, evidence):

log.debug('Task {0:s} {1:s} awaiting execution'.format(self.name, self.id))
evidence = evidence_decode(evidence)
try:
self.result = self.setup(evidence)
self.result.update_task_status(self, 'queued')
except TurbiniaException as exception:
message = (
'{0:s} Task setup failed with exception: [{1!s}]'.format(
self.name, exception))
# Logging explicitly here because the result is in an unknown state
trace = traceback.format_exc()
log.error(message)
log.error(trace)
if self.result:
if hasattr(exception, 'message'):
self.result.set_error(exception.message, traceback.format_exc())
else:
self.result.set_error(exception.__class__, traceback.format_exc())
self.result.status = message
else:
self.result = TurbiniaTaskResult(
base_output_dir=self.base_output_dir, request_id=self.request_id,
job_id=self.job_id)
self.result.setup(self)
self.result.status = message
self.result.set_error(message, traceback.format_exc())
return self.result.serialize()
with filelock.FileLock(config.LOCK_FILE):
log.info('Starting Task {0:s} {1:s}'.format(self.name, self.id))
original_result_id = None
try:
self.result = self.setup(evidence)
original_result_id = self.result.id
evidence.validate()

# Preprocessor must be called after evidence validation.
evidence.preprocess(self.tmp_dir)
# TODO(wyassine): refactor it so the result task does not
# have to go through the preprocess stage. At the moment
# self.results.setup is required to be called to set its status.
@@ -637,7 +683,7 @@ def run_wrapper(self, evidence):
self.result.log(message, level=logging.ERROR)
self.result.status = message
return self.result.serialize()

self.result.update_task_status(self, 'running')
self._evidence_config = evidence.config
self.result = self.run(evidence, self.result)
# pylint: disable=broad-except
@@ -27,6 +27,7 @@
from turbinia.workers import TurbiniaTask
from turbinia.workers import TurbiniaTaskResult
from turbinia.workers.plaso import PlasoTask
from turbinia import state_manager


class TestTurbiniaTaskBase(unittest.TestCase):
@@ -63,7 +64,7 @@ def setUp(self, task_class=TurbiniaTask, evidence_class=evidence.RawDisk):
test_disk_path = tempfile.mkstemp(dir=self.base_output_dir)[1]
self.remove_files.append(test_disk_path)
self.evidence = evidence.RawDisk(source_path=test_disk_path)

self.evidence.preprocess = mock.MagicMock()
# Set up TurbiniaTaskResult
self.result = TurbiniaTaskResult(base_output_dir=self.base_output_dir)
self.result.setup(self.task)
@@ -99,8 +100,11 @@ def setResults(
validate_result = self.result

self.result.status = 'TestStatus'
self.result.update_task_status = mock.MagicMock()
self.result.close = mock.MagicMock()
self.task.setup = mock.MagicMock(return_value=setup)
self.result.worker_name = 'worker1'
self.result.state_manager = None
if mock_run:
self.task.run = mock.MagicMock(return_value=run)
self.task.validate_result = mock.MagicMock(return_value=validate_result)
@@ -124,6 +128,7 @@ def testTurbiniaTaskRunWrapper(self):
self.setResults()
self.result.closed = True
new_result = self.task.run_wrapper(self.evidence.__dict__)

new_result = TurbiniaTaskResult.deserialize(new_result)
self.assertEqual(new_result.status, 'TestStatus')
self.result.close.assert_not_called()
@@ -143,10 +148,8 @@ def testTurbiniaTaskRunWrapperBadResult(self):
checked_result.setup(self.task)
checked_result.status = 'CheckedResult'
self.setResults(run=bad_result, validate_result=checked_result)

new_result = self.task.run_wrapper(self.evidence.__dict__)
new_result = TurbiniaTaskResult.deserialize(new_result)

self.task.validate_result.assert_any_call(bad_result)
self.assertEqual(type(new_result), TurbiniaTaskResult)
self.assertIn('CheckedResult', new_result.status)
@@ -175,10 +178,9 @@ def testTurbiniaTaskRunWrapperExceptionThrown(self):
def testTurbiniaTaskRunWrapperSetupFail(self):
"""Test that the run wrapper recovers from setup failing."""
self.task.result = None
canary_status = 'ReturnedFromValidateResult'
self.result.status = canary_status
self.task.validate_result = mock.MagicMock(return_value=self.result)
self.task.setup = mock.MagicMock(side_effect=TurbiniaException)
canary_status = 'exception_message'
self.task.setup = mock.MagicMock(
side_effect=TurbiniaException('exception_message'))
self.remove_files.append(
os.path.join(self.task.base_output_dir, 'worker-log.txt'))

@@ -190,12 +192,14 @@ def testTurbiniaTaskRunWrapperSetupFail(self):
def testTurbiniaTaskValidateResultGoodResult(self):
"""Tests validate_result with good result."""
self.result.status = 'GoodStatus'
self.result.state_manager = None
new_result = self.task.validate_result(self.result)
self.assertEqual(new_result.status, 'GoodStatus')
self.assertDictEqual(new_result.error, {})

@mock.patch('turbinia.workers.TurbiniaTaskResult.close')
def testTurbiniaTaskValidateResultBadResult(self, _):
@mock.patch('turbinia.state_manager.get_state_manager')
def testTurbiniaTaskValidateResultBadResult(self, _, __):
"""Tests validate_result with bad result."""
# Passing in an unpickleable object (json module) and getting back a
# TurbiniaTaskResult
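
The test changes above isolate the suite from a real backend by patching get_state_manager and stubbing state_manager/update_task_status on results. Below is a minimal, hedged sketch of that pattern; it assumes the turbinia package is importable and, as in the setUp above, that TurbiniaTaskResult can be constructed with just base_output_dir. The test class and method names are illustrative and not part of the commit.

import unittest
from unittest import mock

from turbinia.workers import TurbiniaTaskResult


class StateManagerIsolationSketch(unittest.TestCase):
  """Illustrative test; not part of workers_test.py."""

  @mock.patch('turbinia.state_manager.get_state_manager')
  def testStatusUpdateUsesMockedStateManager(self, mock_get_state_manager):
    # get_state_manager() now returns a MagicMock, so nothing below talks
    # to Datastore or Redis.
    mock_get_state_manager.return_value = mock.MagicMock()
    result = TurbiniaTaskResult(base_output_dir='/tmp')
    result.task_name = 'ExampleTask'
    result.worker_name = 'worker1'
    result.state_manager = mock_get_state_manager()
    fake_task = mock.MagicMock()
    fake_task.result = result
    result.update_task_status(fake_task, 'running')
    result.state_manager.update_task.assert_called_once_with(fake_task)


if __name__ == '__main__':
  unittest.main()
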
