Skip to content
This repository has been archived by the owner on Feb 13, 2020. It is now read-only.

Commit

Permalink
Watch for process memory growth and restart any exceeding configured …
Browse files Browse the repository at this point in the history
…limit

git-svn-id: https://svn.calendarserver.org/repository/calendarserver/CalendarServer/trunk@9895 e27351fd-9f3e-4f54-a53b-843176b1656c
  • Loading branch information
m0rgen committed Oct 4, 2012
1 parent d47e0ca commit be044a1
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 3 deletions.
8 changes: 6 additions & 2 deletions calendarserver/tap/caldav.py
Expand Up @@ -75,7 +75,7 @@
from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
from twistedcaldav.upgrade import UpgradeFileSystemFormatService, PostDBImportService

from calendarserver.tap.util import pgServiceFromConfig, getDBPool
from calendarserver.tap.util import pgServiceFromConfig, getDBPool, MemoryLimitService

from twext.enterprise.ienterprise import POSTGRES_DIALECT
from twext.enterprise.ienterprise import ORACLE_DIALECT
Expand Down Expand Up @@ -1194,6 +1194,11 @@ def makeService_Combined(self, options):
s.processMonitor = monitor
monitor.setServiceParent(s)

if config.MemoryLimiter.Enabled:
memoryLimiter = MemoryLimitService(monitor, config.MemoryLimiter.Seconds,
config.MemoryLimiter.Bytes, config.MemoryLimiter.ResidentOnly)
memoryLimiter.setServiceParent(s)

for name, pool in config.Memcached.Pools.items():
if pool.ServerEnabled:
self.log_info(
Expand Down Expand Up @@ -1635,7 +1640,6 @@ def startService(self):
for name in self.processes:
self.startProcess(name)


def stopService(self):
"""
Return a deferred that fires when all child processes have ended.
Expand Down
72 changes: 71 additions & 1 deletion calendarserver/tap/test/test_util.py
Expand Up @@ -14,10 +14,11 @@
# limitations under the License.
##

from calendarserver.tap.util import computeProcessCount, directoryFromConfig
from calendarserver.tap.util import computeProcessCount, directoryFromConfig, MemoryLimitService
from twistedcaldav.test.util import TestCase
from twistedcaldav.config import config
from twistedcaldav.directory.augment import AugmentXMLDB
from twisted.internet.task import Clock

class ProcessCountTestCase(TestCase):

Expand Down Expand Up @@ -62,3 +63,72 @@ def test_directoryFromConfig(self):
# augmentService set to AugmentXMLDB
if hasattr(service, "augmentService"):
self.assertTrue(isinstance(service.augmentService, AugmentXMLDB))



# Stub classes for MemoryLimitServiceTestCase

class StubProtocol(object):
def __init__(self, transport):
self.transport = transport

class StubProcess(object):
def __init__(self, pid):
self.pid = pid

class StubProcessMonitor(object):
def __init__(self, processes, protocols):
self.processes = processes
self.protocols = protocols
self.history = []

def stopProcess(self, name):
self.history.append(name)


class MemoryLimitServiceTestCase(TestCase):

def test_checkMemory(self):
"""
Set up stub objects to verify MemoryLimitService.checkMemory( )
only stops the processes whose memory usage exceeds the configured
limit, and skips memcached
"""
data = {
# PID : (name, resident memory-in-bytes, virtual memory-in-bytes)
101 : ("process #1", 10, 1010),
102 : ("process #2", 30, 1030),
103 : ("process #3", 50, 1050),
99 : ("memcached-Default", 10, 1010),
}

processes = []
protocols = {}
for pid, (name, resident, virtual) in data.iteritems():
protocols[name] = StubProtocol(StubProcess(pid))
processes.append(name)
processMonitor = StubProcessMonitor(processes, protocols)
clock = Clock()
service = MemoryLimitService(processMonitor, 10, 15, True, reactor=clock)

# For testing, use a stub implementation of memory-usage lookup
def testMemoryForPID(pid, residentOnly):
return data[pid][1 if residentOnly else 2]
service._memoryForPID = testMemoryForPID

# After 5 seconds, nothing should have happened, since the interval is 10 seconds
service.startService()
clock.advance(5)
self.assertEquals(processMonitor.history, [])

# After 7 more seconds, processes 2 and 3 should have been stopped since their
# memory usage exceeds 10 bytes
clock.advance(7)
self.assertEquals(processMonitor.history, ['process #2', 'process #3'])

# Now switch to looking at virtual memory, in which case all 3 processes
# should be stopped
service._residentOnly = False
processMonitor.history = []
clock.advance(10)
self.assertEquals(processMonitor.history, ['process #1', 'process #2', 'process #3'])
91 changes: 91 additions & 0 deletions calendarserver/tap/util.py
Expand Up @@ -23,12 +23,14 @@
"getRootResource",
"getDBPool",
"FakeRequest",
"MemoryLimitService",
]

import errno
import os
from time import sleep
from socket import fromfd, AF_UNIX, SOCK_STREAM, socketpair
import psutil

from twext.python.filepath import CachingFilePath as FilePath
from twext.python.log import Logger
Expand All @@ -37,6 +39,7 @@
from twext.web2.http_headers import Headers
from twext.web2.static import File as FileResource

from twisted.application.service import Service
from twisted.cred.portal import Portal
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor as _reactor
Expand Down Expand Up @@ -68,6 +71,7 @@
from twext.enterprise.ienterprise import ORACLE_DIALECT
from twext.enterprise.adbapi2 import ConnectionPool, ConnectionPoolConnection


try:
from twistedcaldav.authkerb import NegotiateCredentialFactory
NegotiateCredentialFactory # pacify pyflakes
Expand Down Expand Up @@ -821,3 +825,90 @@ class NoURLForResourceError(RuntimeError):
def addResponseFilter(*args, **kwds):
pass


def memoryForPID(pid, residentOnly=True):
"""
Return the amount of memory in use for the given process. If residentOnly is True,
then RSS is returned; if False, then virtual memory is returned.
@param pid: process id
@type pid: C{int}
@param residentOnly: Whether only resident memory should be included
@type residentOnly: C{boolean}
@return: Memory used by process in bytes
@rtype: C{int}
"""
memoryInfo = psutil.Process(pid).get_memory_info()
return memoryInfo.rss if residentOnly else memoryInfo.vms


class MemoryLimitService(Service, object):
"""
A service which when paired with a DelayedStartupProcessMonitor will periodically
examine the memory usage of the monitored processes and stop any which exceed
a configured limit. Memcached processes are ignored.
"""

def __init__(self, processMonitor, intervalSeconds, limitBytes, residentOnly, reactor=None):
"""
@param processMonitor: the DelayedStartupProcessMonitor
@param intervalSeconds: how often to check
@type intervalSeconds: C{int}
@param limitBytes: any monitored process over this limit is stopped
@type limitBytes: C{int}
@param residentOnly: whether only resident memory should be included
@type residentOnly: C{boolean}
@param reactor: for testing
"""
self._processMonitor = processMonitor
self._seconds = intervalSeconds
self._bytes = limitBytes
self._residentOnly = residentOnly
self._delayedCall = None
if reactor is None:
from twisted.internet import reactor
self._reactor = reactor

# Unit tests can swap out _memoryForPID
self._memoryForPID = memoryForPID

def startService(self):
"""
Start scheduling the memory checks
"""
super(MemoryLimitService, self).startService()
self._delayedCall = self._reactor.callLater(self._seconds, self.checkMemory)

def stopService(self):
"""
Stop checking memory
"""
super(MemoryLimitService, self).stopService()
if self._delayedCall is not None and self._delayedCall.active():
self._delayedCall.cancel()
self._delayedCall = None

def checkMemory(self):
"""
Stop any processes monitored by our paired processMonitor whose resident
memory exceeds our configured limitBytes. Reschedule intervalSeconds in
the future.
"""
try:
for name in self._processMonitor.processes:
if name.startswith("memcached"):
continue
proto = self._processMonitor.protocols.get(name, None)
if proto is not None:
proc = proto.transport
pid = proc.pid
try:
memory = self._memoryForPID(pid, self._residentOnly)
except Exception, e:
log.error("Unable to determine memory usage of PID: %d (%s)" % (pid, e))
continue
if memory > self._bytes:
log.warn("Killing large process: %s PID:%d %s:%d" %
(name, pid, "Resident" if self._residentOnly else "Virtual", memory))
self._processMonitor.stopProcess(name)
finally:
self._delayedCall = self._reactor.callLater(self._seconds, self.checkMemory)
9 changes: 9 additions & 0 deletions twistedcaldav/stdconfig.py
Expand Up @@ -491,6 +491,15 @@
},
},

# How large a spawned process is allowed to get before it's stopped
"MemoryLimiter" : {
"Enabled" : True,
"Seconds" : 60, # How often to check memory sizes (in seconds)
"Bytes" : 2 * 1024 * 1024 * 1024, # Memory limit (RSS in bytes)
"ResidentOnly" : True, # True: only take into account resident memory;
# False: include virtual memory
},

#
# Service ACLs
#
Expand Down

0 comments on commit be044a1

Please sign in to comment.