Permalink
Browse files

Merge my 'sendfdport' branch.

This expands on twext.python.sendfd to add a twisted-style Port object that
listens for incoming connections and dispatches them to IProtocolFactory
providers in a different process using sendmsg().  It also uses that mechanism
to dispatch incoming connections to subprocesses by default, although it can
still be disabled by the 'UseMetaFD' option, for the time being.

git-svn-id: https://svn.calendarserver.org/repository/calendarserver/CalendarServer/trunk@5425 e27351fd-9f3e-4f54-a53b-843176b1656c
  • Loading branch information...
2 parents 0f39c12 + 747af96 commit b23340cbc22076b6813f52be3c2f87eebf002dcd @glyph glyph committed Mar 30, 2010
View
228 calendarserver/tap/caldav.py
@@ -40,7 +40,7 @@
from twisted.python.usage import Options, UsageError
from twisted.python.reflect import namedClass
from twisted.plugin import IPlugin
-from twisted.internet.reactor import callLater, spawnProcess, addSystemEventTrigger
+from twisted.internet.reactor import callLater, addSystemEventTrigger
from twisted.internet.process import ProcessExitedAlready
from twisted.internet.protocol import Protocol, Factory
from twisted.application.internet import TCPServer, UNIXServer
@@ -53,6 +53,7 @@
from twext.python.log import logLevelForNamespace, setLogLevelForNamespace
from twext.internet.ssl import ChainingOpenSSLContextFactory
from twext.internet.tcp import MaxAcceptTCPServer, MaxAcceptSSLServer
+
from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
try:
@@ -76,6 +77,8 @@
from twistedcaldav.upgrade import upgradeData
from twistedcaldav.util import getNCPU
+from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
+
try:
from twistedcaldav.authkerb import NegotiateCredentialFactory
NegotiateCredentialFactory # pacify pyflakes
@@ -516,6 +519,20 @@ def sighup_handler(num, frame):
return service
+ def createContextFactory(self):
+ """
+ Create an SSL context factory for use with any SSL socket talking to
+ this server.
+ """
+ return ChainingOpenSSLContextFactory(
+ config.SSLPrivateKey,
+ config.SSLCertificate,
+ certificateChainFile=config.SSLAuthorityChain,
+ passwdCallback=getSSLPassphrase,
+ sslmethod=getattr(OpenSSL.SSL, config.SSLMethod),
+ )
+
+
def makeService_Slave(self, options):
#
# Change default log level to "info" as its useful to have
@@ -586,18 +603,13 @@ def updateFactory(configDict):
config.addPostUpdateHooks((updateFactory,))
if config.InheritFDs or config.InheritSSLFDs:
+ # Inherit sockets to call accept() on them individually.
for fd in config.InheritSSLFDs:
fd = int(fd)
try:
- contextFactory = ChainingOpenSSLContextFactory(
- config.SSLPrivateKey,
- config.SSLCertificate,
- certificateChainFile=config.SSLAuthorityChain,
- passwdCallback=getSSLPassphrase,
- sslmethod=getattr(OpenSSL.SSL, config.SSLMethod),
- )
+ contextFactory = self.createContextFactory()
except SSLError, e:
log.error("Unable to set up SSL context factory: %s" % (e,))
else:
@@ -623,6 +635,15 @@ def updateFactory(configDict):
inherit=True
).setServiceParent(service)
+ elif config.MetaFD:
+ # Inherit a single socket to receive accept()ed connections via
+ # recvmsg() and SCM_RIGHTS.
+
+ fd = int(config.MetaFD)
+
+ ReportingHTTPService(
+ site, fd, self.createContextFactory()
+ ).setServiceParent(service)
else: # Not inheriting, therefore we open our own:
@@ -651,13 +672,7 @@ def updateFactory(configDict):
% (bindAddress, port))
try:
- contextFactory = ChainingOpenSSLContextFactory(
- config.SSLPrivateKey,
- config.SSLCertificate,
- certificateChainFile=config.SSLAuthorityChain,
- passwdCallback=getSSLPassphrase,
- sslmethod=getattr(OpenSSL.SSL, config.SSLMethod),
- )
+ contextFactory = self.createContextFactory()
except SSLError, e:
self.log_error("Unable to set up SSL context factory: %s"
% (e,))
@@ -781,7 +796,13 @@ def makeService_Combined(self, options):
inheritFDs = []
inheritSSLFDs = []
- s._inheritedSockets = [] # keep a reference to these so they don't close
+ if config.UseMetaFD:
+ cl = ConnectionLimiter(config.MaxAccepts,
+ (config.MaxRequests *
+ config.MultiProcess.ProcessCount))
+ cl.setServiceParent(s)
+ else:
+ s._inheritedSockets = [] # keep a reference to these so they don't close
for bindAddress in config.BindAddresses:
if config.BindHTTPPorts:
@@ -800,40 +821,46 @@ def makeService_Combined(self, options):
elif config.SSLPort != 0:
config.BindSSLPorts = [config.SSLPort]
- def _openSocket(addr, port):
- log.info("Opening socket for inheritance at %s:%d" % (addr, port))
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setblocking(0)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind((addr, port))
- sock.listen(config.ListenBacklog)
- s._inheritedSockets.append(sock)
- return sock
-
- for portNum in config.BindHTTPPorts:
- sock = _openSocket(bindAddress, int(portNum))
- inheritFDs.append(sock.fileno())
-
- for portNum in config.BindSSLPorts:
- sock = _openSocket(bindAddress, int(portNum))
- inheritSSLFDs.append(sock.fileno())
-
+ if config.UseMetaFD:
+ for ports, description in [(config.BindSSLPorts, "SSL"),
+ (config.BindHTTPPorts, "TCP")]:
+ for port in ports:
+ cl.addPortService(description, port, bindAddress, config.ListenBacklog)
+ else:
+ def _openSocket(addr, port):
+ log.info("Opening socket for inheritance at %s:%d" % (addr, port))
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setblocking(0)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((addr, port))
+ sock.listen(config.ListenBacklog)
+ s._inheritedSockets.append(sock)
+ return sock
+
+ for portNum in config.BindHTTPPorts:
+ sock = _openSocket(bindAddress, int(portNum))
+ inheritFDs.append(sock.fileno())
+
+ for portNum in config.BindSSLPorts:
+ sock = _openSocket(bindAddress, int(portNum))
+ inheritSSLFDs.append(sock.fileno())
for p in xrange(0, config.MultiProcess.ProcessCount):
+ if config.UseMetaFD:
+ extraArgs = dict(dispatcher=cl.dispatcher)
+ else:
+ extraArgs = dict(inheritFDs=inheritFDs,
+ inheritSSLFDs=inheritSSLFDs)
process = TwistdSlaveProcess(
sys.argv[0],
self.tapname,
options["config"],
p,
config.BindAddresses,
- inheritFDs=inheritFDs,
- inheritSSLFDs=inheritSSLFDs
+ **extraArgs
)
-
monitor.addProcessObject(process, parentEnv)
-
-
for name, pool in config.Memcached.Pools.items():
if pool.ServerEnabled:
self.log_info("Adding memcached service for pool: %s" % (name,))
@@ -957,10 +984,28 @@ def deleteStaleSocketFiles(self):
class TwistdSlaveProcess(object):
+ """
+ A L{TwistdSlaveProcess} is information about how to start a slave process
+ running a C{twistd} plugin, to be used by
+ L{DelayedStartupProcessMonitor.addProcessObject}.
+
+ @ivar inheritFDs: File descriptors to be inherited for calling accept() on
+ in the subprocess.
+ @type inheritFDs: C{list} of C{int}, or C{None}
+
+ @ivar inheritSSLFDs: File descriptors to be inherited for calling accept()
+ on in the subprocess, and speaking TLS on the resulting sockets.
+ @type inheritSSLFDs: C{list} of C{int}, or C{None}
+
+ @ivar dispatcher: a socket dispatcher to generate an inherited port from,
+ or C{None}.
+
+ @type dispatcher: L{InheritedSocketDispatcher} or C{NoneType}
+ """
prefix = "caldav"
def __init__(self, twistd, tapname, configFile, id, interfaces,
- inheritFDs=None, inheritSSLFDs=None):
+ inheritFDs=None, inheritSSLFDs=None, dispatcher=None):
self.twistd = twistd
@@ -969,16 +1014,52 @@ def __init__(self, twistd, tapname, configFile, id, interfaces,
self.configFile = configFile
self.id = id
-
- self.inheritFDs = inheritFDs
- self.inheritSSLFDs = inheritSSLFDs
+ def emptyIfNone(x):
+ if x is None:
+ return []
+ else:
+ return x
+ self.inheritFDs = emptyIfNone(inheritFDs)
+ self.inheritSSLFDs = emptyIfNone(inheritSSLFDs)
+ self.metaSocket = None
+ self.dispatcher = dispatcher
self.interfaces = interfaces
def getName(self):
return '%s-%s' % (self.prefix, self.id)
+
+ def getMetaDescriptor(self):
+ """
+ Get the meta-socket file descriptor to inherit.
+ """
+ if self.metaSocket is None:
+ self.metaSocket = self.dispatcher.addSocket()
+ return self.metaSocket.fileno()
+
+
+ def getFileDescriptors(self):
+ """
+ @return: a mapping of file descriptor numbers for the new (child)
+ process to file descriptor numbers in the current (master) process.
+ """
+ fds = {}
+ maybeMetaFD = []
+ if self.dispatcher is not None:
+ maybeMetaFD.append(self.getMetaDescriptor())
+ for fd in self.inheritSSLFDs + self.inheritFDs + maybeMetaFD:
+ fds[fd] = fd
+ return fds
+
+
def getCommandLine(self):
+ """
+ @return: a list of command-line arguments, including the executable to
+ be used to start this subprocess.
+
+ @rtype: C{list} of C{str}
+ """
args = [sys.executable, self.twistd]
if config.UserName:
@@ -1020,6 +1101,13 @@ def getCommandLine(self):
"-o", "InheritSSLFDs=%s" % (",".join(map(str, self.inheritSSLFDs)),)
])
+ if self.dispatcher is not None:
+ # XXX this FD is never closed in the parent. should it be?
+ # (should they *all* be?) -glyph
+ args.extend([
+ "-o", "MetaFD=%s" % (self.getMetaDescriptor(),)
+ ])
+
return args
@@ -1034,29 +1122,60 @@ def startService(self):
config.ControlPort = self._port.getHost().port
class DelayedStartupProcessMonitor(procmon.ProcessMonitor):
+ """
+ A L{DelayedStartupProcessMonitor} is a L{procmon.ProcessMonitor} that
+ defers building its command lines until the service is actually ready to
+ start. It also specializes process-starting to allow for process objects
+ to determine their arguments as they are started up rather than entirely
+ ahead of time.
+
+ @ivar processObjects: a C{list} of L{TwistdSlaveProcess} to add using
+ C{self.addProcess} when this service starts up.
+
+ @ivar _extraFDs: a mapping from process names to extra file-descriptor
+ maps. (By default, all processes will have the standard stdio mapping,
+ so all file descriptors here should be >2.) This is updated during
+ L{DelayedStartupProcessMonitor.startService}, by inspecting the result
+ of L{TwistdSlaveProcess.getFileDescriptors}.
+
+ @ivar reactor: an L{IReactorProcess} for spawning processes, defaulting to
+ the global reactor.
+ """
def __init__(self, *args, **kwargs):
procmon.ProcessMonitor.__init__(self, *args, **kwargs)
-
- # processObjects stores TwistdSlaveProcesses which need to have their
- # command-lines determined just in time
self.processObjects = []
+ self._extraFDs = {}
+ from twisted.internet import reactor
+ self.reactor = reactor
+
def addProcessObject(self, process, env):
+ """
+ Add a process object to be run when this service is started.
+
+ @param env: a dictionary of environment variables.
+
+ @param process: a L{TwistdSlaveProcesses} object to be started upon
+ service startup.
+ """
self.processObjects.append((process, env))
+
def startService(self):
Service.startService(self)
# Now we're ready to build the command lines and actualy add the
# processes to procmon. This step must be done prior to setting
# active to 1
for processObject, env in self.processObjects:
+ name = processObject.getName()
self.addProcess(
- processObject.getName(),
+ name,
processObject.getCommandLine(),
env=env
)
+ self._extraFDs[name] = processObject.getFileDescriptors()
self.active = 1
delay = 0
@@ -1122,16 +1241,13 @@ def startProcess(self, name):
childFDs = { 0 : "w", 1 : "r", 2 : "r" }
- # Examine args for -o InheritFDs= and -o InheritSSLFDs=
- # Add any file descriptors listed in those args to the childFDs
- # dictionary so those don't get closed across the spawn.
- for i in xrange(len(args)-1):
- if args[i] == "-o" and args[i+1].startswith("Inherit"):
- for fd in map(int, args[i+1].split("=")[1].split(",")):
- childFDs[fd] = fd
+ childFDs.update(self._extraFDs.get(name, {}))
+
+ self.reactor.spawnProcess(
+ p, args[0], args, uid=uid, gid=gid, env=env,
+ childFDs=childFDs
+ )
- spawnProcess(p, args[0], args, uid=uid, gid=gid, env=env,
- childFDs=childFDs)
class DelayedStartupLineLogger(object):
View
161 calendarserver/tap/test/test_caldav.py
@@ -21,6 +21,8 @@
from os.path import dirname, abspath
+from zope.interface import implements
+
from twisted.trial.unittest import TestCase as BaseTestCase
from twisted.python.threadable import isInIOThread
@@ -30,7 +32,8 @@
from twisted.python import log
from twisted.internet.protocol import ServerFactory
-from twisted.internet.defer import Deferred
+from twisted.internet.defer import Deferred, inlineCallbacks
+from twisted.internet.interfaces import IProcessTransport, IReactorProcess
from twisted.application.service import IService
from twisted.application import internet
@@ -50,16 +53,76 @@
from twistedcaldav.directory.directory import UnknownRecordTypeError
from twistedcaldav.test.util import TestCase
-from calendarserver.tap.caldav import (CalDAVOptions, CalDAVServiceMaker,
- CalDAVService, GroupOwnedUNIXServer,
- DelayedStartupProcessMonitor,
- DelayedStartupLineLogger)
+from calendarserver.tap.caldav import (
+ CalDAVOptions, CalDAVServiceMaker, CalDAVService, GroupOwnedUNIXServer,
+ DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess
+)
# Points to top of source tree.
sourceRoot = dirname(dirname(dirname(dirname(abspath(__file__)))))
+class NotAProcessTransport(object):
+ """
+ Simple L{IProcessTransport} stub.
+ """
+ implements(IProcessTransport)
+
+ def __init__(self, processProtocol, executable, args, env, path,
+ uid, gid, usePTY, childFDs):
+ """
+ Hold on to all the attributes passed to spawnProcess.
+ """
+ self.processProtocol = processProtocol
+ self.executable = executable
+ self.args = args
+ self.env = env
+ self.path = path
+ self.uid = uid
+ self.gid = gid
+ self.usePTY = usePTY
+ self.childFDs = childFDs
+
+
+class InMemoryProcessSpawner(object):
+ """
+ Stub out L{IReactorProcess.spawnProcess} so that we can examine the
+ interaction of L{DelayedStartupProcessMonitor} and the reactor.
+ """
+ implements(IReactorProcess)
+
+ def __init__(self):
+ """
+ Create some storage to hold on to all the fake processes spawned.
+ """
+ self.processTransports = []
+ self.waiting = []
+
+ def waitForOneProcess(self):
+ """
+ Return a L{Deferred} which will fire when spawnProcess has been
+ invoked, with the L{IProcessTransport}.
+ """
+ d = Deferred()
+ self.waiting.append(d)
+ return d
+
+ def spawnProcess(self, processProtocol, executable, args=(), env={},
+ path=None, uid=None, gid=None, usePTY=0,
+ childFDs=None):
+
+ transport = NotAProcessTransport(
+ processProtocol, executable, args, env, path, uid, gid, usePTY,
+ childFDs
+ )
+ self.processTransports.append(transport)
+ if self.waiting:
+ self.waiting.pop(0).callback(transport)
+ return transport
+
+
+
class TestCalDAVOptions (CalDAVOptions):
"""
A fake implementation of CalDAVOptions that provides
@@ -798,6 +861,8 @@ def test_configuredDirectoryService(self):
class DummyProcessObject(object):
"""
Simple stub for the Process Object API that will run a test script.
+
+ This is a stand in for L{TwistdSlaveProcess}.
"""
def __init__(self, scriptname, *args):
@@ -809,7 +874,15 @@ def getCommandLine(self):
"""
Get the command line to invoke this script.
"""
- return [sys.executable, FilePath(__file__).sibling(self.scriptname).path] + self.args
+ return [sys.executable,
+ FilePath(__file__).sibling(self.scriptname).path] + self.args
+
+
+ def getFileDescriptors(self):
+ """
+ Return a dummy, empty mapping of file descriptors.
+ """
+ return {}
def getName(self):
@@ -873,3 +946,79 @@ def assertions(result):
return d
+ @inlineCallbacks
+ def test_acceptDescriptorInheritance(self):
+ """
+ If a L{TwistdSlaveProcess} specifies some file descriptors to be
+ inherited, they should be inherited by the subprocess.
+ """
+ dspm = DelayedStartupProcessMonitor()
+ dspm.reactor = InMemoryProcessSpawner()
+
+ # Most arguments here will be ignored, so these are bogus values.
+ slave = TwistdSlaveProcess(
+ twistd = "bleh",
+ tapname = "caldav",
+ configFile = "/does/not/exist",
+ id = 10,
+ interfaces = '127.0.0.1',
+ inheritFDs = [3, 7],
+ inheritSSLFDs = [19, 25],
+ )
+
+ dspm.addProcessObject(slave, {})
+ dspm.startService()
+ self.addCleanup(dspm.consistency.cancel)
+ # We can easily stub out spawnProcess, because caldav calls it, but a
+ # bunch of callLater calls are buried in procmon itself, so we need to
+ # use the real clock.
+ oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+ self.assertEquals(oneProcessTransport.childFDs,
+ {0: 'w', 1: 'r', 2: 'r',
+ 3: 3, 7: 7,
+ 19: 19, 25: 25})
+ @inlineCallbacks
+ def test_metaDescriptorInheritance(self):
+ """
+ If a L{TwistdSlaveProcess} specifies a meta-file-descriptor to be
+ inherited, it should be inherited by the subprocess, and a
+ configuration argument should be passed that indicates to the
+ subprocess.
+ """
+ dspm = DelayedStartupProcessMonitor()
+ dspm.reactor = InMemoryProcessSpawner()
+ class FakeFD:
+ def __init__(self, n):
+ self.fd = n
+ def fileno(self):
+ return self.fd
+
+ class FakeDispatcher:
+ n = 3
+ def addSocket(self):
+ self.n += 1
+ return FakeFD(self.n)
+
+ # Most arguments here will be ignored, so these are bogus values.
+ slave = TwistdSlaveProcess(
+ twistd = "bleh",
+ tapname = "caldav",
+ configFile = "/does/not/exist",
+ id = 10,
+ interfaces = '127.0.0.1',
+ dispatcher = FakeDispatcher()
+ )
+
+ dspm.addProcessObject(slave, {})
+ dspm.startService()
+ self.addCleanup(dspm.consistency.cancel)
+ oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+ self.assertIn("MetaFD=4", oneProcessTransport.args)
+ self.assertEquals(
+ oneProcessTransport.args[oneProcessTransport.args.index("MetaFD=4")-1],
+ '-o',
+ "MetaFD argument was not passed as an option"
+ )
+ self.assertEquals(oneProcessTransport.childFDs,
+ {0: 'w', 1: 'r', 2: 'r',
+ 4: 4})
View
301 twext/internet/sendfdport.py
@@ -0,0 +1,301 @@
+# -*- test-case-name: twext.internet.test.test_sendfdport -*-
+##
+# Copyright (c) 2005-2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Implementation of a TCP/SSL port that uses sendmsg/recvmsg as implemented by
+L{twext.python.sendfd}.
+"""
+
+from os import close
+from errno import EAGAIN, ENOBUFS
+from socket import (socketpair, fromfd, error as SocketError,
+ AF_INET, AF_UNIX, SOCK_STREAM, SOCK_DGRAM)
+
+from twisted.python import log
+
+from twisted.internet.abstract import FileDescriptor
+from twisted.internet.protocol import Protocol, Factory
+
+from twext.python.sendmsg import sendmsg, recvmsg
+from twext.python.sendfd import sendfd, recvfd
+
+class InheritingProtocol(Protocol, object):
+ """
+ When a connection comes in on this protocol, stop reading and writing, and
+ dispatch the socket to another process via its factory.
+ """
+
+ def connectionMade(self):
+ """
+ A connection was received; transmit the file descriptor to another
+ process via L{InheritingProtocolFactory} and remove my transport from
+ the reactor.
+ """
+ self.transport.stopReading()
+ self.transport.stopWriting()
+ skt = self.transport.getHandle()
+ self.factory.sendSocket(skt)
+
+
+
+class InheritingProtocolFactory(Factory, object):
+ """
+ In the 'master' process, make one of these and hook it up to the sockets
+ where you want to hear stuff.
+
+ @ivar dispatcher: an L{InheritedSocketDispatcher} to use to dispatch
+ incoming connections to an appropriate subprocess.
+
+ @ivar description: the string to send along with connections received on
+ this factory.
+ """
+
+ protocol = InheritingProtocol
+
+ def __init__(self, dispatcher, description):
+ self.dispatcher = dispatcher
+ self.description = description
+
+
+ def sendSocket(self, socketObject):
+ """
+ Send the given socket object on to my dispatcher.
+ """
+ self.dispatcher.sendFileDescriptor(socketObject, self.description)
+
+
+
+class _SubprocessSocket(FileDescriptor, object):
+ """
+ A socket in the master process pointing at a file descriptor that can be
+ used to transmit sockets to a subprocess.
+
+ @ivar skt: the UNIX socket used as the sendmsg() transport.
+
+ @ivar outgoingSocketQueue: an outgoing queue of sockets to send to the
+ subprocess.
+
+ @ivar outgoingSocketQueue: a C{list} of 2-tuples of C{(socket-object, str)}
+
+ @ivar status: a record of the last status message received (via recvmsg)
+ from the subprocess: this is an application-specific indication of how
+ ready this subprocess is to receive more connections. A typical usage
+ would be to count the open connections: this is what is passed to
+
+ @type status: C{str}
+ """
+
+ def __init__(self, dispatcher, skt):
+ FileDescriptor.__init__(self, dispatcher.reactor)
+ self.status = None
+ self.dispatcher = dispatcher
+ self.skt = skt # XXX needs to be set non-blocking by somebody
+ self.fileno = skt.fileno
+ self.outgoingSocketQueue = []
+
+
+ def sendSocketToPeer(self, skt, description):
+ """
+ Enqueue a socket to send to the subprocess.
+ """
+ self.outgoingSocketQueue.append((skt, description))
+ self.startWriting()
+
+
+ def doRead(self):
+ """
+ Receive a status / health message and record it.
+ """
+ try:
+ data, flags, ancillary = recvmsg(self.skt.fileno())
+ except SocketError, se:
+ if se.errno not in (EAGAIN, ENOBUFS):
+ raise
+ else:
+ self.dispatcher.statusMessage(self, data)
+
+
+ def doWrite(self):
+ """
+ Transmit as many queued pending file descriptors as we can.
+ """
+ while self.outgoingSocketQueue:
+ skt, desc = self.outgoingSocketQueue.pop(0)
+ try:
+ sendfd(self.skt.fileno(), skt.fileno(), desc)
+ except SocketError, se:
+ if se.errno in (EAGAIN, ENOBUFS):
+ self.outgoingSocketQueue.insert(0, (skt, desc))
+ return
+ raise
+ if not self.outgoingSocketQueue:
+ self.stopWriting()
+
+
+
+class InheritedSocketDispatcher(object):
+ """
+ Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
+ list of available sockets in subprocesses and sends inbound connections
+ towards them.
+ """
+
+ def __init__(self, statusWatcher):
+ """
+ Create a socket dispatcher.
+ """
+ self._subprocessSockets = []
+ self.statusWatcher = statusWatcher
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+
+ @property
+ def statuses(self):
+ """
+ Yield the current status of all subprocess sockets.
+ """
+ for subsocket in self._subprocessSockets:
+ yield subsocket.status
+
+
+ def statusMessage(self, subsocket, message):
+ """
+ The status of a connection has changed; update all registered status
+ change listeners.
+ """
+ subsocket.status = self.statusWatcher.statusFromMessage(subsocket.status, message)
+
+
+ def sendFileDescriptor(self, skt, description):
+ """
+ A connection has been received. Dispatch it.
+
+ @param skt: a socket object
+
+ @param description: some text to identify to the subprocess's
+ L{InheritedPort} what type of transport to create for this socket.
+ """
+ self._subprocessSockets.sort(key=lambda conn: conn.status)
+ selectedSocket = self._subprocessSockets[0]
+ selectedSocket.sendSocketToPeer(skt, description)
+ # XXX Maybe want to send along 'description' or 'skt' or some
+ # properties thereof? -glyph
+ selectedSocket.status = self.statusWatcher.newConnectionStatus(selectedSocket.status)
+
+
+ def addSocket(self):
+ """
+ Add a C{sendmsg()}-oriented AF_UNIX socket to the pool of sockets being
+ used for transmitting file descriptors to child processes.
+
+ @return: a socket object for the receiving side; pass this object's
+ C{fileno()} as part of the C{childFDs} argument to
+ C{spawnProcess()}, then close it.
+ """
+ i, o = socketpair(AF_UNIX, SOCK_DGRAM)
+ a = _SubprocessSocket(self, o)
+ a.startReading()
+ self._subprocessSockets.append(a)
+ return i
+
+
+
+class InheritedPort(FileDescriptor, object):
+ """
+ Create this in the 'slave' process to handle incoming connections
+ dispatched via C{sendmsg}.
+ """
+
+ def __init__(self, fd, transportFactory, protocolFactory):
+ """
+ @param fd: a file descriptor
+
+ @type fd: C{int}
+
+ @param transportFactory: a 3-argument function that takes the socket
+ object produced from the file descriptor, the (non-ancillary) data
+ sent along with the incoming file descriptor, and the protocol
+ built along with it, and returns an L{ITransport} provider. Note
+ that this should NOT call C{makeConnection} on the protocol that it
+ produces, as this class will do that.
+
+ @param protocolFactory: an L{IProtocolFactory}
+ """
+ FileDescriptor.__init__(self)
+ self.fd = fd
+ self.transportFactory = transportFactory
+ self.protocolFactory = protocolFactory
+ self.statusQueue = []
+
+
+ def fileno(self):
+ """
+ Get the FD number for this socket.
+ """
+ return self.fd
+
+
+ def doRead(self):
+ """
+ A message is ready to read. Receive a file descriptor from our parent
+ process.
+ """
+ try:
+ fd, description = recvfd(self.fd)
+ except SocketError, se:
+ if se.errno != EAGAIN:
+ raise
+ else:
+ try:
+ skt = fromfd(fd, AF_INET, SOCK_STREAM)
+ # XXX it could be AF_UNIX, I guess? or even something else?
+ # should this be on the transportFactory's side of things?
+
+ close(fd) # fromfd() calls dup()
+ peeraddr = skt.getpeername()
+ protocol = self.protocolFactory.buildProtocol(peeraddr)
+ transport = self.transportFactory(skt, description, protocol)
+ protocol.makeConnection(transport)
+ except:
+ log.err()
+
+
+ def doWrite(self):
+ """
+ Write some data.
+ """
+ while self.statusQueue:
+ msg = self.statusQueue.pop(0)
+ try:
+ sendmsg(self.fd, msg, 0)
+ except SocketError, se:
+ if se.errno in (EAGAIN, ENOBUFS):
+ self.statusQueue.insert(0, msg)
+ return
+ raise
+ self.stopWriting()
+
+
+ def reportStatus(self, statusMessage):
+ """
+ Report a status message to the L{_SubprocessSocket} monitoring this
+ L{InheritedPort}'s health in the master process.
+ """
+ self.statusQueue.append(statusMessage)
+ self.startWriting()
+
View
16 twext/internet/tcp.py
@@ -39,20 +39,27 @@ class MaxAcceptPortMixin(object):
Mixin for resetting maxAccepts.
"""
def doRead(self):
- self.numberAccepts = min(self.factory.maxRequests - self.factory.outstandingRequests, self.factory.maxAccepts)
+ self.numberAccepts = min(
+ self.factory.maxRequests - self.factory.outstandingRequests,
+ self.factory.maxAccepts
+ )
tcp.Port.doRead(self)
+
+
class MaxAcceptTCPPort(MaxAcceptPortMixin, tcp.Port):
"""
Use for non-inheriting tcp ports.
"""
- pass
+
+
class MaxAcceptSSLPort(MaxAcceptPortMixin, ssl.Port):
"""
Use for non-inheriting SSL ports.
"""
- pass
+
+
class InheritedTCPPort(MaxAcceptTCPPort):
"""
@@ -100,6 +107,9 @@ class MaxAcceptTCPServer(internet.TCPServer):
"""
TCP server which will uses MaxAcceptTCPPorts (and optionally,
inherited ports)
+
+ @ivar myPort: When running, this is set to the L{IListeningPort} being
+ managed by this service.
"""
def __init__(self, *args, **kwargs):
View
17 twext/python/sendfd.py
@@ -20,7 +20,7 @@
from twext.python.sendmsg import sendmsg, recvmsg, SCM_RIGHTS
-def sendfd(socketfd, fd):
+def sendfd(socketfd, fd, description):
"""
Send the given FD to another process via L{sendmsg} on the given C{AF_UNIX}
socket.
@@ -33,9 +33,14 @@ def sendfd(socketfd, fd):
@param fd: A file descriptor to be sent to the other process.
@type fd: C{int}
+
+ @param description: a string describing the socket that was passed.
+
+ @type description: C{str}
"""
- args = (socketfd, "", 0, [(SOL_SOCKET, SCM_RIGHTS, pack("i", fd))])
- sendmsg(*args)
+ sendmsg(
+ socketfd, description, 0, [(SOL_SOCKET, SCM_RIGHTS, pack("i", fd))]
+ )
def recvfd(socketfd):
@@ -48,14 +53,14 @@ def recvfd(socketfd):
@param fd: C{int}
- @return: a new file descriptor.
+ @return: a 2-tuple of (new file descriptor, description).
- @rtype: C{int}
+ @rtype: 2-tuple of (C{int}, C{str})
"""
data, flags, ancillary = recvmsg(socketfd)
[(cmsg_level, cmsg_type, packedFD)] = ancillary
# cmsg_level and cmsg_type really need to be SOL_SOCKET / SCM_RIGHTS, but
# since those are the *only* standard values, there's not much point in
# checking.
[unpackedFD] = unpack("i", packedFD)
- return unpackedFD
+ return (unpackedFD, data)
View
4 twext/python/test/pullpipe.py
@@ -19,8 +19,8 @@
if __name__ == '__main__':
from twext.python.sendfd import recvfd
import sys, os
- fd = recvfd(int(sys.argv[1]))
- os.write(fd, "Test fixture data.\n")
+ fd, description = recvfd(int(sys.argv[1]))
+ os.write(fd, "Test fixture data: %s.\n" % (description,))
os.close(fd)
View
6 twext/python/test/test_sendmsg.py
@@ -154,7 +154,7 @@ def spawn(self, script):
@inlineCallbacks
def test_sendSubProcessFD(self):
"""
- Calling L{sendsmsg} with SOL_SOCKET, SCM_RIGHTS , and a platform-endian
+ Calling L{sendsmsg} with SOL_SOCKET, SCM_RIGHTS, and a platform-endian
packed file descriptor number should send that file descriptor to a
different process, where it can be retrieved by using L{recvmsg}.
"""
@@ -163,10 +163,10 @@ def test_sendSubProcessFD(self):
yield sspp.started
pipeOut, pipeIn = pipe()
self.addCleanup(close, pipeOut)
- sendfd(self.input.fileno(), pipeIn)
+ sendfd(self.input.fileno(), pipeIn, "blonk")
close(pipeIn)
yield sspp.stopped
- self.assertEquals(read(pipeOut, 1024), "Test fixture data.\n")
+ self.assertEquals(read(pipeOut, 1024), "Test fixture data: blonk.\n")
# Make sure that the pipe is actually closed now.
self.assertEquals(read(pipeOut, 1024), "")
View
97 twext/web2/channel/http.py
@@ -739,7 +739,7 @@ def __init__(self):
def connectionMade(self):
self.setTimeout(self.inputTimeOut)
- self.factory.outstandingRequests+=1
+ self.factory.addConnectedChannel(self)
def lineReceived(self, line):
if self._first_line:
@@ -928,7 +928,7 @@ def readConnectionLost(self):
self.transport.loseConnection()
def connectionLost(self, reason):
- self.factory.outstandingRequests-=1
+ self.factory.removeConnectedChannel(self)
self._writeLost = True
self.readConnectionLost()
@@ -950,20 +950,33 @@ def connectionMade(self):
"please try again later.</body></html>")
self.transport.loseConnection()
+
+
class HTTPFactory(protocol.ServerFactory):
- """Factory for HTTP server."""
+ """
+ Factory for HTTP server.
+
+ @ivar outstandingRequests: the number of currently connected HTTP channels.
+
+ @type outstandingRequests: C{int}
+
+ @ivar connectedChannels: all the channels that have currently active
+ connections.
+
+ @type connectedChannels: C{set} of L{HTTPChannel}
+ """
protocol = HTTPChannel
protocolArgs = None
- outstandingRequests = 0
-
def __init__(self, requestFactory, maxRequests=600, **kwargs):
- self.maxRequests=maxRequests
+ self.maxRequests = maxRequests
self.protocolArgs = kwargs
- self.protocolArgs['requestFactory']=requestFactory
-
+ self.protocolArgs['requestFactory'] = requestFactory
+ self.connectedChannels = set()
+
+
def buildProtocol(self, addr):
if self.outstandingRequests >= self.maxRequests:
return OverloadedServerProtocol()
@@ -975,6 +988,28 @@ def buildProtocol(self, addr):
return p
+ def addConnectedChannel(self, channel):
+ """
+ Add a connected channel to the set of currently connected channels and
+ increase the outstanding request count.
+ """
+ self.connectedChannels.add(channel)
+
+
+ def removeConnectedChannel(self, channel):
+ """
+ Remove a connected channel from the set of currently connected channels
+ and decrease the outstanding request count.
+ """
+ self.connectedChannels.remove(channel)
+
+
+ @property
+ def outstandingRequests(self):
+ return len(self.connectedChannels)
+
+
+
class HTTP503LoggingFactory (HTTPFactory):
"""
Factory for HTTP server which emits a 503 response when overloaded.
@@ -1087,39 +1122,49 @@ def finish(self):
-class LimitingHTTPChannel(HTTPChannel):
- """ HTTPChannel that takes itself out of the reactor once it has enough
- requests in flight.
- """
-
- def connectionMade(self):
- HTTPChannel.connectionMade(self)
- if self.factory.outstandingRequests >= self.factory.maxRequests:
- self.factory.myServer.myPort.stopReading()
-
- def connectionLost(self, reason):
- HTTPChannel.connectionLost(self, reason)
- if self.factory.outstandingRequests < self.factory.maxRequests:
- self.factory.myServer.myPort.startReading()
-
class LimitingHTTPFactory(HTTPFactory):
- """ HTTPFactory which stores maxAccepts on behalf of the MaxAcceptPortMixin
"""
+ HTTPFactory which stores maxAccepts on behalf of the MaxAcceptPortMixin
- protocol = LimitingHTTPChannel
+ @ivar myServer: a reference to a L{MaxAcceptTCPServer} that this
+ L{LimitingHTTPFactory} will limit. This must be set externally.
+ """
def __init__(self, requestFactory, maxRequests=600, maxAccepts=100,
**kwargs):
HTTPFactory.__init__(self, requestFactory, maxRequests, **kwargs)
self.maxAccepts = maxAccepts
def buildProtocol(self, addr):
-
+ """
+ Override L{HTTPFactory.buildProtocol} in order to avoid ever returning
+ an L{OverloadedServerProtocol}; this should be handled in other ways.
+ """
p = protocol.ServerFactory.buildProtocol(self, addr)
for arg, value in self.protocolArgs.iteritems():
setattr(p, arg, value)
return p
+ def addConnectedChannel(self, channel):
+ """
+ Override L{HTTPFactory.addConnectedChannel} to pause listening on the
+ socket when there are too many outstanding channels.
+ """
+ HTTPFactory.addConnectedChannel(self, channel)
+ if self.outstandingRequests >= self.maxRequests:
+ self.myServer.myPort.stopReading()
+
+
+ def removeConnectedChannel(self, channel):
+ """
+ Override L{HTTPFactory.addConnectedChannel} to resume listening on the
+ socket when there are too many outstanding channels.
+ """
+ HTTPFactory.removeConnectedChannel(self, channel)
+ if self.outstandingRequests < self.maxRequests:
+ self.myServer.myPort.startReading()
+
+
__all__ = [
"HTTPFactory",
View
251 twext/web2/metafd.py
@@ -0,0 +1,251 @@
+
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.internet.tcp import Server
+from twext.internet.tcp import MaxAcceptTCPServer
+
+from twisted.internet import reactor
+
+from twisted.application.service import MultiService, Service
+
+from twext.web2.channel.http import HTTPFactory
+
+from twext.internet.sendfdport import (
+ InheritedPort, InheritedSocketDispatcher, InheritingProtocolFactory)
+
+
+
+class JustEnoughLikeAPort(object):
+ """
+ Fake out just enough of L{tcp.Port} to be acceptable to
+ L{tcp.Server}...
+ """
+ _realPortNumber = 'inherited'
+
+
+
+class ReportingHTTPService(Service, object):
+ """
+ Service which starts up an HTTP server that can report back to its parent
+ process via L{InheritedPort}.
+ """
+
+ _connectionCount = 0
+
+ def __init__(self, site, fd, contextFactory):
+ self.contextFactory = contextFactory
+ # Unlike other 'factory' constructions, config.MaxRequests and
+ # config.MaxAccepts are dealt with in the master process, so we don't
+ # need to propagate them here.
+ self.site = site
+ self.fd = fd
+
+
+ def startService(self):
+ """
+ Start reading on the inherited port.
+ """
+ Service.startService(self)
+ self.reportingFactory = ReportingHTTPFactory(self.site, vary=True)
+ self.reportingFactory.inheritedPort = InheritedPort(
+ self.fd, self.createTransport, self.reportingFactory
+ )
+ self.reportingFactory.inheritedPort.startReading()
+
+
+ def stopService(self):
+ """
+ Stop reading on the inherited port.
+ """
+ Service.stopService(self)
+ # XXX stopping should really be destructive, because otherwise we will
+ # always leak a file descriptor; i.e. this shouldn't be restartable.
+ # XXX this needs to return a Deferred.
+ self.reportingFactory.inheritedPort.stopReading()
+
+
+ def createTransport(self, skt, data, protocol):
+ """
+ Create a TCP transport, from a socket object passed by the parent.
+ """
+ self._connectionCount += 1
+ transport = Server(skt, protocol,
+ skt.getpeername(), JustEnoughLikeAPort,
+ self._connectionCount, reactor)
+ if data == 'SSL':
+ transport.startTLS(self.contextFactory)
+ transport.startReading()
+ return transport
+
+
+
+class ReportingHTTPFactory(HTTPFactory):
+ """
+ An L{HTTPFactory} which reports its status to a
+ L{twext.internet.sendfdport.InheritedPort}.
+
+ @ivar inheritedPort: an L{InheritedPort} to report status (the current
+ number of outstanding connections) to. Since this - the
+ L{ReportingHTTPFactory} - needs to be instantiated to be passed to
+ L{InheritedPort}'s constructor, this attribute must be set afterwards
+ but before any connections have occurred.
+ """
+
+ def _report(self, message):
+ """
+ Report a status message to the parent.
+ """
+ self.inheritedPort.reportStatus(message)
+
+
+ def addConnectedChannel(self, channel):
+ """
+ Add the connected channel, and report the current number of open
+ channels to the listening socket in the parent process.
+ """
+ HTTPFactory.addConnectedChannel(self, channel)
+ self._report("+")
+
+
+ def removeConnectedChannel(self, channel):
+ """
+ Remove the connected channel, and report the current number of open
+ channels to the listening socket in the parent process.
+ """
+ HTTPFactory.removeConnectedChannel(self, channel)
+ self._report("-")
+
+
+
+class ConnectionLimiter(MultiService, object):
+ """
+ Connection limiter for use with L{InheritedSocketDispatcher}.
+
+ This depends on statuses being reported by L{ReportingHTTPFactory}
+ """
+
+ def __init__(self, maxAccepts, maxRequests):
+ """
+ Create a L{ConnectionLimiter} with an associated dispatcher and
+ list of factories.
+ """
+ MultiService.__init__(self)
+ self.factories = []
+ # XXX dispatcher needs to be a service, so that it can shut down its
+ # sub-sockets.
+ self.dispatcher = InheritedSocketDispatcher(self)
+ self.maxAccepts = maxAccepts
+ self.maxRequests = maxRequests
+
+
+ def addPortService(self, description, port, interface, backlog):
+ """
+ Add a L{MaxAcceptTCPServer} to bind a TCP port to a socket description.
+ """
+ lipf = LimitingInheritingProtocolFactory(self, description)
+ self.factories.append(lipf)
+ MaxAcceptTCPServer(
+ port, lipf,
+ interface=interface,
+ backlog=backlog
+ ).setServiceParent(self)
+
+
+ # implementation of implicit statusWatcher interface required by
+ # InheritedSocketDispatcher
+
+ def statusFromMessage(self, previousStatus, message):
+ """
+ Determine a subprocess socket's status from its previous status and a
+ status message.
+ """
+ if message == '-':
+ result = self.intWithNoneAsZero(previousStatus) - 1
+ # A connection has gone away in a subprocess; we should start
+ # accepting connections again if we paused (see
+ # newConnectionStatus)
+ for f in self.factories:
+ f.myServer.myPort.startReading()
+ else:
+ # '+' is just an acknowledgement of newConnectionStatus, so we can
+ # ignore it.
+ result = self.intWithNoneAsZero(previousStatus)
+ return result
+
+
+ def newConnectionStatus(self, previousStatus):
+ """
+ Determine the effect of a new connection being sent on a subprocess
+ socket.
+ """
+ current = self.outstandingRequests + 1
+ maximum = self.maxRequests
+ overloaded = (current >= maximum)
+ if overloaded:
+ for f in self.factories:
+ f.myServer.myPort.stopReading()
+
+ result = self.intWithNoneAsZero(previousStatus) + 1
+ return result
+
+
+ def intWithNoneAsZero(self, x):
+ """
+ Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
+ """
+ if x is None:
+ return 0
+ else:
+ return int(x)
+
+
+ @property
+ def outstandingRequests(self):
+ outstanding = 0
+ for status in self.dispatcher.statuses:
+ outstanding += self.intWithNoneAsZero(status)
+ return outstanding
+
+
+
+class LimitingInheritingProtocolFactory(InheritingProtocolFactory):
+ """
+ An L{InheritingProtocolFactory} that supports the implicit factory contract
+ required by L{MaxAcceptTCPServer}/L{MaxAcceptTCPPort}.
+
+ @ivar outstandingRequests: a read-only property for the number of currently
+ active connections.
+
+ @ivar maxAccepts: The maximum number of times to call 'accept()' in a
+ single reactor loop iteration.
+
+ @ivar maxRequests: The maximum number of concurrent connections to accept
+ at once - note that this is for the I{entire server}, whereas the
+ value in the configuration file is for only a single process.
+ """
+
+ def __init__(self, limiter, description):
+ super(LimitingInheritingProtocolFactory, self).__init__(
+ limiter.dispatcher, description)
+ self.limiter = limiter
+ self.maxAccepts = limiter.maxAccepts
+ self.maxRequests = limiter.maxRequests
+
+
+ @property
+ def outstandingRequests(self):
+ return self.limiter.outstandingRequests
View
4 twistedcaldav/stdconfig.py
@@ -141,6 +141,10 @@
"BindSSLPorts" : [], # List of port numbers to bind to for SSL [empty = same as "SSLPort"]
"InheritFDs" : [], # File descriptors to inherit for HTTP requests (empty = don't inherit)
"InheritSSLFDs": [], # File descriptors to inherit for HTTPS requests (empty = don't inherit)
+ "MetaFD": 0, # Inherited file descriptor to call recvmsg() on to recive sockets (none = don't inherit)
+
+ "UseMetaFD": True, # Use a 'meta' FD, i.e. an FD to transmit other
+ # FDs to slave processes.
#
# Types of service provided

0 comments on commit b23340c

Please sign in to comment.