Skip to content

Commit

Permalink
Code review: 345920043: Changes to handle retry task
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed May 15, 2018
1 parent 3785e58 commit 3aea923
Show file tree
Hide file tree
Showing 10 changed files with 491 additions and 230 deletions.
4 changes: 2 additions & 2 deletions config/dpkg/changelog
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plaso (20180513-1) unstable; urgency=low
plaso (20180515-1) unstable; urgency=low

* Auto-generated

-- Log2Timeline <log2timeline-dev@googlegroups.com> Sun, 13 May 2018 17:25:01 +0200
-- Log2Timeline <log2timeline-dev@googlegroups.com> Tue, 15 May 2018 05:43:40 +0200
2 changes: 1 addition & 1 deletion plaso/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

from __future__ import unicode_literals

__version__ = '20180513'
__version__ = '20180515'
40 changes: 23 additions & 17 deletions plaso/containers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from plaso.containers import interface
from plaso.containers import manager
from plaso.lib import definitions


class Task(interface.AttributeContainer):
Expand All @@ -22,17 +23,15 @@ class Task(interface.AttributeContainer):
number of microseconds since January 1, 1970, 00:00:00 UTC.
file_entry_type (str): dfVFS type of the file entry the path specification
is referencing.
has_retry (bool): True if the task was previously abandoned and a retry
task was created, False otherwise.
identifier (str): unique identifier of the task.
last_processing_time (int): the last time the task was marked as being
processed as number of microseconds since January 1, 1970, 00:00:00 UTC.
merge_priority (int): priority used for the task storage file merge, where
a lower value indicates a higher priority to merge.
original_task_identifier (str): the identifier of the task that this task
is an attempt to retry, or None if this task isn't a retry.
path_spec (dfvfs.PathSpec): path specification.
retried (bool): True if this task has been retried.
session_identifier (str): the identifier of the session the task
is part of.
session_identifier (str): the identifier of the session the task is part of.
start_time (int): time that the task was started. Contains the number
of microseconds since January 1, 1970, 00:00:00 UTC.
storage_file_size (int): size of the storage file in bytes.
Expand All @@ -50,14 +49,13 @@ def __init__(self, session_identifier=None):
self.aborted = False
self.completion_time = None
self.file_entry_type = None
self.has_retry = False
self.identifier = '{0:s}'.format(uuid.uuid4().hex)
self.last_processing_time = None
self.merge_priority = None
self.original_task_identifier = None
self.path_spec = None
self.retried = False
self.session_identifier = session_identifier
self.start_time = int(time.time() * 1000000)
self.start_time = int(time.time() * definitions.MICROSECONDS_PER_SECOND)
self.storage_file_size = None

# This method is necessary for heap sort.
Expand All @@ -72,17 +70,23 @@ def __lt__(self, other):
"""
return self.identifier < other.identifier

def CreateRetry(self):
"""Creates a new task that's an attempt to retry the original task.
def CreateRetryTask(self):
"""Creates a new task to retry a previously abandoned task.
The retry task will have a new identifier but most of the attributes
will be a copy of the previously abandoned task.
Returns:
Task: a task that's a retry of the existing task.
Task: a task to retry a previously abandoned task.
"""
self.retried = True
retry_task = Task(self.session_identifier)
retry_task.path_spec = self.path_spec
retry_task = Task(session_identifier=self.session_identifier)
retry_task.file_entry_type = self.file_entry_type
retry_task.merge_priority = self.merge_priority
retry_task.original_task_identifier = self.identifier
retry_task.path_spec = self.path_spec
retry_task.storage_file_size = self.storage_file_size

self.has_retry = True

return retry_task

def CreateTaskCompletion(self):
Expand All @@ -91,7 +95,8 @@ def CreateTaskCompletion(self):
Returns:
TaskCompletion: task completion attribute container.
"""
self.completion_time = int(time.time() * 1000000)
self.completion_time = int(
time.time() * definitions.MICROSECONDS_PER_SECOND)

task_completion = TaskCompletion()
task_completion.aborted = self.aborted
Expand All @@ -114,7 +119,8 @@ def CreateTaskStart(self):

def UpdateProcessingTime(self):
"""Updates the processing time to now."""
self.last_processing_time = int(time.time() * 1000000)
self.last_processing_time = int(
time.time() * definitions.MICROSECONDS_PER_SECOND)


class TaskCompletion(interface.AttributeContainer):
Expand Down
9 changes: 7 additions & 2 deletions plaso/multi_processing/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,13 @@ def _MergeTaskStorage(self, storage_writer):
try:
task = self._task_manager.GetProcessedTaskByIdentifier(task_identifier)

storage_writer.PrepareMergeTaskStorage(task)
self._task_manager.UpdateTaskAsPendingMerge(task)
to_merge = self._task_manager.CheckTaskToMerge(task)
if not to_merge:
storage_writer.RemoveProcessedTaskStorage(task)
self._task_manager.RemoveTask(task)
else:
storage_writer.PrepareMergeTaskStorage(task)
self._task_manager.UpdateTaskAsPendingMerge(task)

except KeyError:
logger.error(
Expand Down

0 comments on commit 3aea923

Please sign in to comment.