Skip to content

Commit

Permalink
Code review: 342840043: Added memory usage profiler
Browse files: browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Mar 30, 2018
1 parent 2ecf6f8 commit 342112d
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 100 deletions.
2 changes: 1 addition & 1 deletion config/dpkg/changelog
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ plaso (20180330-1) unstable; urgency=low

* Auto-generated

-- Log2Timeline <log2timeline-dev@googlegroups.com> Fri, 30 Mar 2018 07:24:16 +0200
-- Log2Timeline <log2timeline-dev@googlegroups.com> Fri, 30 Mar 2018 14:46:22 +0200
1 change: 1 addition & 0 deletions plaso/cli/helpers/profiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class ProfilingArgumentsHelper(interface.ArgumentsHelper):
DEFAULT_PROFILING_SAMPLE_RATE = 1000

PROFILERS_INFORMATION = {
'memory': 'Profile memory usage over time',
'parsers': 'Profile CPU time per parser',
'processing': 'Profile CPU time of processing phases',
'serializers': 'Profile CPU time of serialization'}
Expand Down
122 changes: 60 additions & 62 deletions plaso/engine/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from __future__ import unicode_literals

import abc
import gzip
import os
import time
Expand Down Expand Up @@ -44,23 +43,19 @@ class CPUTimeProfiler(object):

_FILENAME_PREFIX = 'cputime'

def __init__(self, identifier, path=None):
"""Initializes the CPU time profiler object.
def __init__(self, identifier, configuration):
"""Initializes the CPU time profiler.
Args:
identifier (str): identifier of the profiling session used to create
the sample filename.
path (Optional[str]): path to write the sample file.
configuration (ProfilingConfiguration): profiling configuration.
"""
super(CPUTimeProfiler, self).__init__()
self._identifier = identifier
self._path = path
self._path = configuration.directory
self._profile_measurements = {}
self._sample_file = '{0:s}-{1!s}.csv'.format(
self._FILENAME_PREFIX, identifier)

if path:
self._sample_file = os.path.join(path, self._sample_file)
self._sample_file = None

def StartTiming(self, profile_name):
"""Starts timing CPU time.
Expand Down Expand Up @@ -104,27 +99,29 @@ def Stop(self):
self._sample_file = None


class BaseMemoryProfiler(object):
"""The memory profiler interface."""
class GuppyMemoryProfiler(object):
"""The guppy-based memory profiler."""

def __init__(self, identifier, path=None, profiling_sample_rate=1000):
"""Initializes a memory profiler object.
def __init__(self, identifier, configuration):
"""Initializes a memory profiler.
Args:
identifier (str): unique name of the profile.
profiling_sample_rate (Optional[int]): the profiling sample rate.
Contains the number of event sources processed.
path (Optional[str]): path to write the sample file.
configuration (ProfilingConfiguration): profiling configuration.
"""
super(BaseMemoryProfiler, self).__init__()
super(GuppyMemoryProfiler, self).__init__()
self._identifier = identifier
self._path = path
self._path = configuration.directory
self._profiling_sample = 0
self._profiling_sample_rate = profiling_sample_rate
self._profiling_sample_rate = configuration.profiling_sample_rate
self._heapy = None
self._sample_file = '{0!s}.hpy'.format(identifier)

@abc.abstractmethod
def _Sample(self):
"""Takes a sample for profiling."""
if self._path:
self._sample_file = os.path.join(self._path, self._sample_file)

if hpy:
self._heapy = hpy()

@classmethod
def IsSupported(cls):
Expand All @@ -133,55 +130,48 @@ def IsSupported(cls):
Returns:
bool: True if the profiler is supported.
"""
return False
return hpy is not None

def Sample(self):
"""Takes a sample for profiling."""
self._profiling_sample += 1

if self._profiling_sample >= self._profiling_sample_rate:
self._Sample()
if self._heapy:
heap = self._heapy.heap()
heap.dump(self._sample_file)

self._profiling_sample = 0

@abc.abstractmethod
def Start(self):
"""Starts the profiler."""
if self._heapy:
self._heapy.setrelheap()

try:
os.remove(self._sample_file)
except OSError:
pass

@abc.abstractmethod
def Stop(self):
"""Stops the profiler."""
return


class GuppyMemoryProfiler(BaseMemoryProfiler):
"""The guppy-based memory profiler."""
class MemoryProfiler(object):
"""The memory profiler."""

def __init__(self, identifier, path=None, profiling_sample_rate=1000):
"""Initializes a memory profiler object.
def __init__(self, identifier, configuration):
"""Initializes a memory profiler.
Args:
identifier (str): unique name of the profile.
path (Optional[str]): path to write the sample file.
profiling_sample_rate (Optional[int]): the profiling sample rate.
Contains the number of event sources processed.
configuration (ProfilingConfiguration): profiling configuration.
"""
super(GuppyMemoryProfiler, self).__init__(
identifier, path=path, profiling_sample_rate=profiling_sample_rate)
self._heapy = None
self._sample_file = '{0!s}.hpy'.format(identifier)

if self._path:
self._sample_file = os.path.join(self._path, self._sample_file)

if hpy:
self._heapy = hpy()

def _Sample(self):
"""Takes a sample for profiling."""
if not self._heapy:
return

heap = self._heapy.heap()
heap.dump(self._sample_file)
super(MemoryProfiler, self).__init__()
self._identifier = identifier
self._path = configuration.directory
self._sample_file = None

@classmethod
def IsSupported(cls):
Expand All @@ -190,23 +180,31 @@ def IsSupported(cls):
Returns:
bool: True if the profiler is supported.
"""
return hpy is not None
return True

def Sample(self, used_memory):
"""Takes a sample for profiling.
Args:
used_memory (int): amount of used memory in bytes.
"""
cpu_time = time.clock()
sample = '{0:f}\t{1:d}\n'.format(cpu_time, used_memory)
self._sample_file.write(sample)

def Start(self):
"""Starts the profiler."""
if not self._heapy:
return

self._heapy.setrelheap()
filename = 'memory-{0:s}.csv.gz'.format(self._identifier)
if self._path:
filename = os.path.join(self._path, filename)

try:
os.remove(self._sample_file)
except OSError:
pass
self._sample_file = gzip.open(filename, 'wb')
self._sample_file.write('CPU time\tUsed memory\n')

def Stop(self):
"""Stops the profiler."""
return
self._sample_file.close()
self._sample_file = None


class ParsersProfiler(CPUTimeProfiler):
Expand Down
20 changes: 14 additions & 6 deletions plaso/engine/single_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self):
self._current_display_name = ''
self._guppy_memory_profiler = None
self._last_status_update_timestamp = 0.0
self._memory_profiler = None
self._name = 'Main'
self._parsers_profiler = None
self._path_spec_extractor = extractors.PathSpecExtractor()
Expand Down Expand Up @@ -188,29 +189,32 @@ def _StartProfiling(self, extraction_worker):

if self._processing_configuration.profiling.HaveProfileMemoryGuppy():
self._guppy_memory_profiler = profiler.GuppyMemoryProfiler(
self._name, path=self._processing_configuration.profiling.directory,
profiling_sample_rate=(
self._processing_configuration.profiling.sample_rate))
self._name, self._processing_configuration.profiling)
self._guppy_memory_profiler.Start()

if self._processing_configuration.profiling.HaveProfileMemory():
self._memory_profiler = profiler.MemoryProfiler(
self._name, self._processing_configuration.profiling)
self._memory_profiler.Start()

if self._processing_configuration.profiling.HaveProfileParsers():
identifier = '{0:s}-parsers'.format(self._name)
self._parsers_profiler = profiler.ParsersProfiler(
identifier, path=self._processing_configuration.profiling.directory)
identifier, self._processing_configuration.profiling)
extraction_worker.SetParsersProfiler(self._parsers_profiler)
self._parsers_profiler.Start()

if self._processing_configuration.profiling.HaveProfileProcessing():
identifier = '{0:s}-processing'.format(self._name)
self._processing_profiler = profiler.ProcessingProfiler(
identifier, path=self._processing_configuration.profiling.directory)
identifier, self._processing_configuration.profiling)
extraction_worker.SetProcessingProfiler(self._processing_profiler)
self._processing_profiler.Start()

if self._processing_configuration.profiling.HaveProfileSerializers():
identifier = '{0:s}-serializers'.format(self._name)
self._serializers_profiler = profiler.SerializersProfiler(
identifier, path=self._processing_configuration.profiling.directory)
identifier, self._processing_configuration.profiling)
self._serializers_profiler.Start()

def _StopProfiling(self, extraction_worker):
Expand All @@ -224,6 +228,10 @@ def _StopProfiling(self, extraction_worker):
self._guppy_memory_profiler.Stop()
self._guppy_memory_profiler = None

if self._memory_profiler:
self._memory_profiler.Stop()
self._memory_profiler = None

if self._parsers_profiler:
extraction_worker.SetParsersProfiler(None)
self._parsers_profiler.Stop()
Expand Down
3 changes: 3 additions & 0 deletions plaso/multi_processing/base_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import signal
import time

from plaso.engine import process_info
from plaso.lib import loggers
from plaso.multi_processing import plaso_xmlrpc

Expand Down Expand Up @@ -42,6 +43,7 @@ def __init__(self, enable_sigsegv_handler=False, **kwargs):
# TODO: check if this can be replaced by self.pid or does this only apply
# to the parent process?
self._pid = None
self._process_information = None
self._quiet_mode = False
self._rpc_server = None
self._status_is_running = False
Expand Down Expand Up @@ -192,6 +194,7 @@ def run(self):
signal.SIGSEGV, self._SigSegvHandler)

self._pid = os.getpid()
self._process_information = process_info.ProcessInfo(self._pid)

# We need to set the is running status explicitly to True in case
# the process completes before the engine is able to determine
Expand Down
19 changes: 13 additions & 6 deletions plaso/multi_processing/psort.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def __init__(self, use_zeromq=True):
self._knowledge_base = None
self._merge_task = None
self._guppy_memory_profiler = None
self._memory_profiler = None
self._processing_profiler = None
self._serializers_profiler = None
self._number_of_consumed_errors = 0
Expand Down Expand Up @@ -634,23 +635,25 @@ def _StartProfiling(self):
return

if self._profiling_configuration.HaveProfileMemoryGuppy():
identifier = '{0:s}-memory'.format(self._name)
self._guppy_memory_profiler = profiler.GuppyMemoryProfiler(
identifier, path=self._profiling_configuration.directory,
profiling_sample_rate=(
self._profiling_configuration.sample_rate))
self._name, self._profiling_configuration)
self._guppy_memory_profiler.Start()

if self._profiling_configuration.HaveProfileMemory():
self._memory_profiler = profiler.MemoryProfiler(
self._name, self._profiling_configuration)
self._memory_profiler.Start()

if self._profiling_configuration.HaveProfileProcessing():
identifier = '{0:s}-processing'.format(self._name)
self._processing_profiler = profiler.ProcessingProfiler(
identifier, path=self._profiling_configuration.directory)
identifier, self._profiling_configuration)
self._processing_profiler.Start()

if self._profiling_configuration.HaveProfileSerializers():
identifier = '{0:s}-serializers'.format(self._name)
self._serializers_profiler = profiler.SerializersProfiler(
identifier, path=self._profiling_configuration.directory)
identifier, self._profiling_configuration)
self._serializers_profiler.Start()

def _StatusUpdateThreadMain(self):
Expand Down Expand Up @@ -729,6 +732,10 @@ def _StopProfiling(self):
self._guppy_memory_profiler.Stop()
self._guppy_memory_profiler = None

if self._memory_profiler:
self._memory_profiler.Stop()
self._memory_profiler = None

if self._processing_profiler:
self._processing_profiler.Stop()
self._processing_profiler = None
Expand Down

0 comments on commit 342112d

Please sign in to comment.