From 1c9a36599f1ae9b86205c59ba3754dba139921d2 Mon Sep 17 00:00:00 2001 From: Younghee Kwon Date: Mon, 6 Feb 2017 12:35:50 -0800 Subject: [PATCH 1/4] To add sdks/python/utils/profiler a MemoryReporter that tracks heap profiles. --- sdks/python/apache_beam/utils/profiler.py | 74 ++++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py index 121417265673..e75e424c3a06 100644 --- a/sdks/python/apache_beam/utils/profiler.py +++ b/sdks/python/apache_beam/utils/profiler.py @@ -24,7 +24,7 @@ import StringIO import tempfile import time - +from threading import Timer from apache_beam.utils.dependency import _dependency_file_copy @@ -67,3 +67,75 @@ def __exit__(self, *args): self.profile, stream=s).sort_stats(Profile.SORTBY) self.stats.print_stats() logging.info('Profiler data: [%s]', s.getvalue()) + + +class MemoryReporter(object): + """A memory reporter that reports the memory usage and heap profile. + Usage: + mr = MemoryReporter(interval_sec=30.0) + mr.start() + while ... + + # this will report continuously with 30 seconds between reports. + mr.stop() + + Or simply: + with MemoryReporter(interval_sec=100): + + + Also it could report on demand without continuous reporting. + mr = MemoryReporter() # default interval 60s but not activated. + + mr.report_once() + """ + + def __init__(self, interval_sec=60.0): + try: + from guppy import hpy + self._hpy = hpy + self._interval_sec = interval_sec + self._enabled = False + + def report_with_interval(): + if not self._enabled: + return + self.report_once() + self._timer = Timer(self._interval_sec, report_with_interval) + self._timer.start() + + self._timer = Timer(self._interval_sec, report_with_interval) + + except ImportError: + logging.warning('guppy is not installed; MemoryReporter not available.') + + def __enter__(self): + self.start() + return self + + def __exit__(self, *args): + self.stop() + + def start(self): + if self._enabled: + return + if not self._hpy: + logging.warning('guppy is not installed; MemoryReporter not available.') + return + self._enabled = True + self._timer.start() + + def stop(self): + if not self._enabled: + return + self._timer.cancel() + self._enabled = False + + def report_once(self): + if not self._hpy: + logging.warning('guppy is not installed; MemoryReporter not available.') + return + report_start_time = time.time() + heap_profile = self._hpy().heap() + logging.info('*** MemoryReport Heap:\n %s', heap_profile) + logging.info('*** MemoryReport took %.1f sec.', + time.time() - report_start_time) From 9525392a39234af4efd808c3cbe17e930d65bf94 Mon Sep 17 00:00:00 2001 From: Younghee Kwon Date: Mon, 6 Feb 2017 13:55:36 -0800 Subject: [PATCH 2/4] added comment about guppy --- sdks/python/apache_beam/utils/profiler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py index e75e424c3a06..e560eedecbf8 100644 --- a/sdks/python/apache_beam/utils/profiler.py +++ b/sdks/python/apache_beam/utils/profiler.py @@ -90,8 +90,10 @@ class MemoryReporter(object): """ def __init__(self, interval_sec=60.0): + # guppy might not have installed. Set up the reporter only when guppy is + # installed. http://pypi.python.org/pypi/guppy/0.1.10 try: - from guppy import hpy + from guppy import hpy # pylint: disable=import-error self._hpy = hpy self._interval_sec = interval_sec self._enabled = False From 69407ace6a81dc9c92767b4747833c70f4cbda7d Mon Sep 17 00:00:00 2001 From: Younghee Kwon Date: Mon, 6 Feb 2017 16:31:55 -0800 Subject: [PATCH 3/4] After first round of review. --- sdks/python/apache_beam/utils/profiler.py | 53 ++++++++++++----------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py index e560eedecbf8..b4ea86b3c64d 100644 --- a/sdks/python/apache_beam/utils/profiler.py +++ b/sdks/python/apache_beam/utils/profiler.py @@ -25,6 +25,7 @@ import tempfile import time from threading import Timer +import warnings from apache_beam.utils.dependency import _dependency_file_copy @@ -79,36 +80,34 @@ class MemoryReporter(object): # this will report continuously with 30 seconds between reports. mr.stop() - Or simply: + NOTE: A reporter with start() should always stop(), or the parent process can + never finish. + + Or simply the following which does star() and stop(): with MemoryReporter(interval_sec=100): - + while ... + Also it could report on demand without continuous reporting. - mr = MemoryReporter() # default interval 60s but not activated. + mr = MemoryReporter() # default interval 60s but not started. mr.report_once() """ def __init__(self, interval_sec=60.0): - # guppy might not have installed. Set up the reporter only when guppy is - # installed. http://pypi.python.org/pypi/guppy/0.1.10 + # guppy might not have installed. http://pypi.python.org/pypi/guppy/0.1.10 + # The reporter can be set up only when guppy is installed (and guppy cannot + # be added to the required packages in setup.py, since it's not available + # in all platforms). try: from guppy import hpy # pylint: disable=import-error self._hpy = hpy self._interval_sec = interval_sec - self._enabled = False - - def report_with_interval(): - if not self._enabled: - return - self.report_once() - self._timer = Timer(self._interval_sec, report_with_interval) - self._timer.start() - - self._timer = Timer(self._interval_sec, report_with_interval) - + self._timer = None except ImportError: - logging.warning('guppy is not installed; MemoryReporter not available.') + warnings.warn('guppy is not installed; MemoryReporter not available.') + self._hpy = None + self._enabled = False def __enter__(self): self.start() @@ -118,12 +117,18 @@ def __exit__(self, *args): self.stop() def start(self): - if self._enabled: - return - if not self._hpy: - logging.warning('guppy is not installed; MemoryReporter not available.') + if self._enabled or not self._hpy: return self._enabled = True + + def report_with_interval(): + if not self._enabled: + return + self.report_once() + self._timer = Timer(self._interval_sec, report_with_interval) + self._timer.start() + + self._timer = Timer(self._interval_sec, report_with_interval) self._timer.start() def stop(self): @@ -134,10 +139,8 @@ def stop(self): def report_once(self): if not self._hpy: - logging.warning('guppy is not installed; MemoryReporter not available.') return report_start_time = time.time() heap_profile = self._hpy().heap() - logging.info('*** MemoryReport Heap:\n %s', heap_profile) - logging.info('*** MemoryReport took %.1f sec.', - time.time() - report_start_time) + logging.info('*** MemoryReport Heap:\n %s\n MemoryReport took %.1f sec', + heap_profile, time.time() - report_start_time) From c516c3b7d54a572c8444fad317324174951d9480 Mon Sep 17 00:00:00 2001 From: Younghee Kwon Date: Mon, 6 Feb 2017 23:24:52 -0800 Subject: [PATCH 4/4] Second round review. --- sdks/python/apache_beam/utils/profiler.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py index b4ea86b3c64d..3599f9886cd7 100644 --- a/sdks/python/apache_beam/utils/profiler.py +++ b/sdks/python/apache_beam/utils/profiler.py @@ -73,7 +73,7 @@ def __exit__(self, *args): class MemoryReporter(object): """A memory reporter that reports the memory usage and heap profile. Usage: - mr = MemoryReporter(interval_sec=30.0) + mr = MemoryReporter(interval_second=30.0) mr.start() while ... @@ -84,7 +84,7 @@ class MemoryReporter(object): never finish. Or simply the following which does star() and stop(): - with MemoryReporter(interval_sec=100): + with MemoryReporter(interval_second=100): while ... @@ -94,7 +94,7 @@ class MemoryReporter(object): mr.report_once() """ - def __init__(self, interval_sec=60.0): + def __init__(self, interval_second=60.0): # guppy might not have installed. http://pypi.python.org/pypi/guppy/0.1.10 # The reporter can be set up only when guppy is installed (and guppy cannot # be added to the required packages in setup.py, since it's not available @@ -102,7 +102,7 @@ def __init__(self, interval_sec=60.0): try: from guppy import hpy # pylint: disable=import-error self._hpy = hpy - self._interval_sec = interval_sec + self._interval_second = interval_second self._timer = None except ImportError: warnings.warn('guppy is not installed; MemoryReporter not available.') @@ -125,10 +125,10 @@ def report_with_interval(): if not self._enabled: return self.report_once() - self._timer = Timer(self._interval_sec, report_with_interval) + self._timer = Timer(self._interval_second, report_with_interval) self._timer.start() - self._timer = Timer(self._interval_sec, report_with_interval) + self._timer = Timer(self._interval_second, report_with_interval) self._timer.start() def stop(self): @@ -142,5 +142,5 @@ def report_once(self): return report_start_time = time.time() heap_profile = self._hpy().heap() - logging.info('*** MemoryReport Heap:\n %s\n MemoryReport took %.1f sec', + logging.info('*** MemoryReport Heap:\n %s\n MemoryReport took %.1f seconds', heap_profile, time.time() - report_start_time)