Skip to content

Commit

Permalink
Code review: 338670043: Added processed directory to speed up listdir
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed May 1, 2018
1 parent e243d08 commit 53d1ae9
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 113 deletions.
2 changes: 1 addition & 1 deletion config/dpkg/changelog
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ plaso (20180501-1) unstable; urgency=low

* Auto-generated

-- Log2Timeline <log2timeline-dev@googlegroups.com> Tue, 01 May 2018 17:57:10 +0200
-- Log2Timeline <log2timeline-dev@googlegroups.com> Tue, 01 May 2018 18:10:11 +0200
2 changes: 1 addition & 1 deletion plaso/multi_processing/analysis_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def _Main(self):
storage_writer.SetStorageProfiler(None)

try:
self._storage_writer.PrepareMergeTaskStorage(task)
self._storage_writer.FinalizeTaskStorage(task)
except IOError:
pass

Expand Down
1 change: 1 addition & 0 deletions plaso/multi_processing/psort.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):

merge_ready = storage_writer.CheckTaskReadyForMerge(task)
if merge_ready:
storage_writer.PrepareMergeTaskStorage(task)
self._status = definitions.PROCESSING_STATUS_MERGING

event_queue = self._event_queues[plugin_name]
Expand Down
15 changes: 12 additions & 3 deletions plaso/multi_processing/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,18 @@ def _MergeTaskStorage(self, storage_writer):
if self._processing_profiler:
self._processing_profiler.StartTiming('merge_check')

pending_tasks = self._task_manager.GetTasksCheckMerge()
mergeable_tasks = storage_writer.CheckTasksReadyForMerge(pending_tasks)
self._task_manager.UpdateTasksAsPendingMerge(mergeable_tasks)
for task_identifier in storage_writer.GetProcessedTaskIdentifiers():
try:
task = self._task_manager.GetProcessedTaskByIdentifier(task_identifier)

storage_writer.PrepareMergeTaskStorage(task)
self._task_manager.UpdateTaskAsPendingMerge(task)

except KeyError:
logger.error(
'Unable to retrieve task: {0:s} to prepare it to be merged.'.format(
task_identifier))
continue

if self._processing_profiler:
self._processing_profiler.StopTiming('merge_check')
Expand Down
20 changes: 5 additions & 15 deletions plaso/multi_processing/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ class TaskManager(object):
possible that a worker has already completed the task, but no status
update was collected from the worker while it processed the task.
* processing: a worker is processing the task.
* pending_merge: a worker has completed processing the task and the
results are ready to be merged with the session storage.
* processed: a worker has completed processing the task, but it is not ready
to be merged into the session storage.
* pending_merge: the task has been processed and is ready to be merged with
the session storage.
* merging: tasks that are being merged by the engine.
Once the engine reports that a task is completely merged, it is removed
Expand Down Expand Up @@ -339,7 +341,7 @@ def GetProcessedTaskByIdentifier(self, task_identifier):
if not task:
task = self._tasks_abandoned.get(task_identifier, None)
if not task:
raise KeyError('Status of task {0:s} is unknown.'.format(
raise KeyError('Status of task {0:s} is unknown'.format(
task_identifier))

return task
Expand Down Expand Up @@ -476,18 +478,6 @@ def UpdateTaskAsPendingMerge(self, task):
else:
logger.debug('Task {0:s} is pending merge.'.format(task.identifier))

def UpdateTasksAsPendingMerge(self, mergeable_tasks):
"""Updates the task manager to reflect that tasks are ready to be merged.
Args:
mergeable_tasks (list[Task]): tasks that are ready to be merged.
Raises:
KeyError: if a task was not processing or abandoned.
"""
for task in mergeable_tasks:
self.UpdateTaskAsPendingMerge(task)

def UpdateTaskAsProcessingByIdentifier(self, task_identifier):
"""Updates the task manager to reflect the task is processing.
Expand Down
2 changes: 1 addition & 1 deletion plaso/multi_processing/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def _ProcessTask(self, task):
storage_writer.Close()

try:
self._storage_writer.PrepareMergeTaskStorage(task)
self._storage_writer.FinalizeTaskStorage(task)
except IOError:
pass

Expand Down
17 changes: 15 additions & 2 deletions plaso/storage/fake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,19 @@ def GetSortedEvents(self, time_range=None):

return iter(event_heap.PopEvents())

def FinalizeTaskStorage(self, task):
"""Finalizes a processed task storage.
Args:
task (Task): task.
Raises:
IOError: if the task storage does not exist.
"""
if task.identifier not in self._task_storage_writers:
raise IOError('Storage writer for task: {0:s} does not exist.'.format(
task.identifier))

def Open(self):
"""Opens the storage writer.
Expand Down Expand Up @@ -416,15 +429,15 @@ def SetSerializersProfiler(self, serializers_profiler):
"""Sets the serializers profiler.
Args:
serializers_profiler (SerializersProfiler): serializers profile.
serializers_profiler (SerializersProfiler): serializers profiler.
"""
pass

def SetStorageProfiler(self, storage_profiler):
"""Sets the storage profiler.
Args:
storage_profiler (StorageProfiler): storage profile.
storage_profiler (StorageProfiler): storage profiler.
"""
pass

Expand Down

0 comments on commit 53d1ae9

Please sign in to comment.