Skip to content

Commit

Permalink
Add monitoring for memory usage, emitting events as it moves around t…
Browse files Browse the repository at this point in the history
…he threshold.

Needs specific tests.
  • Loading branch information
jamadden committed Mar 22, 2018
1 parent 320be8f commit 090b360
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
events when it detects certain conditions, like loop blocked or
memory limits exceeded.

- Add settings for monitoring memory usage and emitting events when a
threshold is exceeded and then corrected. gevent currently supplies
no policy for what to do when memory exceeds the configured limit.

1.3a2 (2018-03-06)
==================

Expand Down
64 changes: 59 additions & 5 deletions src/gevent/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,41 @@ def _convert(self, value):
validate = staticmethod(validate_anything)


class FloatSettingMixin(object):
def _convert(self, value):
if value:
return float(value)
class _PositiveValueMixin(object):

def validate(self, value):
if value is not None and value <= 0:
raise ValueError("Must be > 0")
raise ValueError("Must be positive")
return value


class FloatSettingMixin(_PositiveValueMixin):
def _convert(self, value):
if value:
return float(value)


class ByteCountSettingMixin(_PositiveValueMixin):

_MULTIPLES = {
# All keys must be the same size.
'kb': 1024,
'mb': 1024 * 1024,
'gb': 1024 * 1024 * 1024,
}

_SUFFIX_SIZE = 2

def _convert(self, value):
if not value:
return
value = value.lower()
for s, m in self._MULTIPLES.items():
if value[-self._SUFFIX_SIZE:] == s:
return int(value[:-self._SUFFIX_SIZE]) * m
return int(value)


class Resolver(ImportableSetting, Setting):

desc = """\
Expand Down Expand Up @@ -493,6 +517,36 @@ class MaxBlockingTime(FloatSettingMixin, Setting):
.. versionadded:: 1.3b1
"""

class MonitorMemoryPeriod(FloatSettingMixin, Setting):
name = 'memory_monitor_period'

environment_key = 'GEVENT_MONITOR_MEMORY_PERIOD'
default = 5

desc = """\
If `monitor_thread` is enabled, this is approximately how long
(in seconds) we will go between checking the processes memory usage.
Checking the memory usage is relatively expensive on some operating
systems, so this should not be too low. gevent will place a floor
value on it.
"""

class MonitorMemoryMaxUsage(ByteCountSettingMixin, Setting):
name = 'max_memory_usage'

environment_key = 'GEVENT_MONITOR_MEMORY_MAX'
default = None

desc = """\
If `monitor_thread` is enabled,
then if memory usage exceeds this amount (in bytes), events will
be emitted. See `gevent.events`.
There is no default value for this setting. If you wish to
cap memory usage, you must choose a value.
"""

# The ares settings are all interpreted by
# gevent/resolver/ares.pyx, so we don't do
# any validation here.
Expand Down
69 changes: 69 additions & 0 deletions src/gevent/_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from gevent.util import format_run_info
from gevent.events import notify
from gevent.events import EventLoopBlocked
from gevent.events import MemoryUsageThresholdExceeded
from gevent.events import MemoryUsageUnderThreshold

from gevent._compat import thread_mod_name
from gevent._util import gmctime
Expand All @@ -39,6 +41,24 @@
start_new_thread = get_original(thread_mod_name, 'start_new_thread')
thread_sleep = get_original('time', 'sleep')

try:
# The standard library 'resource' module doesn't provide
# a standard way to get the RSS measure, only the maximum.
# You might be tempted to try to compute something by adding
# together text and data sizes, but on many systems those come back
# zero. So our only option is psutil.
from psutil import Process, AccessDenied
# Make sure it works (why would we be denied access to our own process?)
try:
Process().memory_full_info()
except AccessDenied:
Process = None
except ImportError:
pass

class MonitorWarning(RuntimeWarning):
"""The type of warnings we emit."""

class _MonitorEntry(object):

__slots__ = ('function', 'period', 'last_run_time')
Expand Down Expand Up @@ -76,13 +96,22 @@ class PeriodicMonitoringThread(object):
# what particular monitoring functions want to say.
min_sleep_time = 0.005

# The minimum period in seconds at which we will check memory usage.
# Getting memory usage is fairly expensive.
min_memory_monitor_period = 2

# A list of _MonitorEntry objects: [(function(hub), period, last_run_time))]
# The first entry is always our entry for self.monitor_blocking
_monitoring_functions = None

# The calculated min sleep time for the monitoring functions list.
_calculated_sleep_time = None

# A boolean value that also happens to capture the
# memory usage at the time we exceeded the threshold. Reset
# to 0 when we go back below.
_memory_exceeded = 0

def __init__(self, hub):
self._hub_wref = wref(hub, self._on_hub_gc)
self.should_run = True
Expand Down Expand Up @@ -288,6 +317,46 @@ def ignore_current_greenlet_blocking(self):
def monitor_current_greenlet_blocking(self):
self._active_greenlet = getcurrent()

def can_monitor_memory_usage(self):
return Process is not None

def install_monitor_memory_usage(self):
# Start monitoring memory usage, if possible.
# If not possible, emit a warning.
if not self.can_monitor_memory_usage:
import warnings
warnings.warn("Unable to monitor memory usage. Install psutil.",
MonitorWarning)
return

self.add_monitoring_function(self.monitor_memory_usage,
max(GEVENT_CONFIG.memory_monitor_period,
self.min_memory_monitor_period))

def monitor_memory_usage(self, _hub):
max_allowed = GEVENT_CONFIG.max_memory_usage
if not max_allowed:
# They disabled it.
return

rusage = Process().memory_full_info()
# uss only documented available on Windows, Linux, and OS X.
# If not available, fall back to rss as an aproximation.
mem_usage = getattr(rusage, 'uss', 0) or rusage.rss

if mem_usage > max_allowed:
if mem_usage > self._memory_exceeded:
# We're still growing
notify(MemoryUsageThresholdExceeded(
mem_usage, max_allowed, rusage))
self._memory_exceeded = mem_usage
else:
# we're below. Were we above it last time?
if self._memory_exceeded:
notify(MemoryUsageUnderThreshold(
mem_usage, max_allowed, rusage, self._memory_exceeded))
self._memory_exceeded = 0

def __repr__(self):
return '<%s at %s in thread %s greenlet %r for %r>' % (
self.__class__.__name__,
Expand Down
71 changes: 71 additions & 0 deletions src/gevent/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
'subscribers',
'IEventLoopBlocked',
'EventLoopBlocked',
'IMemoryUsageThresholdExceeded',
'MemoryUsageThresholdExceeded',
'IMemoryUsageUnderThreshold',
'MemoryUsageUnderThreshold',
]

try:
Expand Down Expand Up @@ -94,3 +98,70 @@ def __init__(self, greenlet, blocking_time, info):
self.greenlet = greenlet
self.blocking_time = blocking_time
self.info = info

class IMemoryUsageThresholdExceeded(Interface):
"""
The event emitted when the memory usage threshold is exceeded.
This event is emitted only while memory continues to grow
above the threshold. Only if the condition or stabilized is corrected (memory
usage drops) will the event be emitted in the future.
This event is emitted in the monitor thread.
"""

mem_usage = Attribute("The current process memory usage, in bytes.")
max_allowed = Attribute("The maximum allowed memory usage, in bytes.")
memory_info = Attribute("The tuple of memory usage stats return by psutil.")

class _AbstractMemoryEvent(object):

def __init__(self, mem_usage, max_allowed, memory_info):
self.mem_usage = mem_usage
self.max_allowed = max_allowed
self.memory_info = memory_info

def __repr__(self):
return "<%s used=%d max=%d details=%r>" % (
self.__class__.__name__,
self.mem_usage,
self.max_allowed,
self.memory_info,
)

@implementer(IMemoryUsageThresholdExceeded)
class MemoryUsageThresholdExceeded(_AbstractMemoryEvent):
"""
Implementation of `IMemoryUsageThresholdExceeded`.
"""


class IMemoryUsageUnderThreshold(Interface):
"""
The event emitted when the memory usage drops below the
threshold after having previously been above it.
This event is emitted only the first time memory usage is detected
to be below the threshold after having previously been above it.
If memory usage climbs again, a `IMemoryUsageThresholdExceeded`
event will be broadcast, and then this event could be broadcast again.
This event is emitted in the monitor thread.
"""

mem_usage = Attribute("The current process memory usage, in bytes.")
max_allowed = Attribute("The maximum allowed memory usage, in bytes.")
max_memory_usage = Attribute("The memory usage that caused the previous "
"IMemoryUsageThresholdExceeded event.")
memory_info = Attribute("The tuple of memory usage stats return by psutil.")


@implementer(IMemoryUsageUnderThreshold)
class MemoryUsageUnderThreshold(_AbstractMemoryEvent):
"""
Implementation of `IMemoryUsageUnderThreshold`.
"""

def __init__(self, mem_usage, max_allowed, memory_info, max_usage):
super(MemoryUsageUnderThreshold, self).__init__(mem_usage, max_allowed, memory_info)
self.max_memory_usage = max_usage
13 changes: 7 additions & 6 deletions src/gevent/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(self):
_threadlocal = _Threadlocal()

get_thread_ident = get_original(thread_mod_name, 'get_ident')
MAIN_THREAD = get_thread_ident() # XXX: Assuming import is done on the main thread.
MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.


class LoopExit(Exception):
Expand Down Expand Up @@ -507,7 +507,7 @@ def __init__(self, loop=None, default=None):
# loop. See #237 and #238.
self.loop = _threadlocal.loop
else:
if default is None and self.thread_ident != MAIN_THREAD:
if default is None and self.thread_ident != MAIN_THREAD_IDENT:
default = False

if loop is None:
Expand Down Expand Up @@ -537,7 +537,7 @@ def main_hub(self):
.. versionadded:: 1.3b1
"""
return self.thread_ident == MAIN_THREAD
return self.thread_ident == MAIN_THREAD_IDENT


def __repr__(self):
Expand Down Expand Up @@ -742,16 +742,17 @@ def run(self):

def start_periodic_monitoring_thread(self):
if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread:
# TODO: If we're the main thread, then add the memory monitoring
# function.

# Note that it is possible for one real thread to
# (temporarily) wind up with multiple monitoring threads,
# if hubs are started and stopped within the thread. This shows up
# in the threadpool tests. The monitoring threads will eventually notice their
# hub object is gone.
from gevent._monitor import PeriodicMonitoringThread
self.periodic_monitoring_thread = PeriodicMonitoringThread(self)

if self.main_hub:
self.periodic_monitoring_thread.install_monitor_memory_usage()

return self.periodic_monitoring_thread

def join(self, timeout=None):
Expand Down
13 changes: 13 additions & 0 deletions src/greentest/test__events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ class TestImplements(unittest.TestCase):
def test_event_loop_blocked(self):
verify.verifyClass(events.IEventLoopBlocked, events.EventLoopBlocked)

def test_mem_threshold(self):
verify.verifyClass(events.IMemoryUsageThresholdExceeded,
events.MemoryUsageThresholdExceeded)
verify.verifyObject(events.IMemoryUsageThresholdExceeded,
events.MemoryUsageThresholdExceeded(0, 0, 0))

def test_mem_decreased(self):
verify.verifyClass(events.IMemoryUsageUnderThreshold,
events.MemoryUsageUnderThreshold)
verify.verifyObject(events.IMemoryUsageUnderThreshold,
events.MemoryUsageUnderThreshold(0, 0, 0, 0))


class TestEvents(unittest.TestCase):

def test_is_zope(self):
Expand Down
10 changes: 5 additions & 5 deletions src/greentest/test__hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ def test_blocking_this_thread(self):
monitor = hub.start_periodic_monitoring_thread()
self.assertIsNotNone(monitor)

self.assertEqual(1, len(monitor.monitoring_functions()))
monitor.add_monitoring_function(self._monitor, 0.1)
self.assertEqual(2, len(monitor.monitoring_functions()))
self.assertEqual(self._monitor, monitor.monitoring_functions()[1].function)
self.assertEqual(0.1, monitor.monitoring_functions()[1].period)
monitor.add_monitoring_function(self._monitor, 0.1)
self.assertEqual(3, len(monitor.monitoring_functions()))
self.assertEqual(self._monitor, monitor.monitoring_functions()[-1].function)
self.assertEqual(0.1, monitor.monitoring_functions()[-1].period)

# We must make sure we have switched greenlets at least once,
# otherwise we can't detect a failure.
Expand All @@ -214,7 +214,7 @@ def test_blocking_this_thread(self):
self._run_monitoring_threads(monitor)
finally:
monitor.add_monitoring_function(self._monitor, None)
self.assertEqual(1, len(monitor._monitoring_functions))
self.assertEqual(2, len(monitor._monitoring_functions))
assert hub.exception_stream is stream
monitor.kill()
del hub.exception_stream
Expand Down

0 comments on commit 090b360

Please sign in to comment.