Partial work (broken)

1 parent 91dcd4b commit ef98a7fb0079999b511577fcf7a9f6ce3ff77dc1 @ianb committed Apr 23, 2012
Showing with 2,304 additions and 82 deletions.
  1. +1 −0 .gitignore
  2. +359 −81 apppkg/__init__.py
  3. +156 −0 apppkg/init.py
  4. +1,434 −0 apppkg/paste_httpserver.py
  5. +133 −0 apppkg/paste_reloader.py
  6. +95 −0 apppkg/run-command.py
  7. +125 −0 apppkg/serve.py
  8. +1 −1 docs/spec.txt
1 .gitignore
@@ -1 +1,2 @@
*.egg-info
+*.pyc
440 apppkg/__init__.py
@@ -1,32 +1,41 @@
import sys
import os
+import re
import yaml
import new
-import tempfile
import subprocess
+import urllib
+from cStringIO import StringIO
from site import addsitedir
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+
+__all__ = ['AppPackage', 'Environment']
+
+here = os.path.dirname(os.path.abspath(__file__))
class AppPackage(object):
- def __init__(self, path, config=None):
+ def __init__(self, path, config_dir=None, environment=None):
self.path = path
- self._config = config
-
- @property
- def config(self):
- if self._config is None:
- fp = self.get_file('app.yaml')
- try:
- return yaml.load(fp)
- finally:
- fp.close()
- return self._config
+ fp = self.open('app.yaml')
+ try:
+ self.description = yaml.load(fp)
+ finally:
+ fp.close()
+ if config_dir is None:
+ config_dir = self.config_default_dir
+ self.config_dir = config_dir
+ self.environment = environment
## Helpers for file names and handling:
def open(self, relpath, mode='rb'):
- return open(self.abspath(filename), mode)
+ return open(self.abspath(relpath), mode)
def abspath(self, *paths):
return os.path.normcase(os.path.abspath(os.path.join(self.path, *paths)))
@@ -38,25 +47,25 @@ def exists(self, path):
@property
def name(self):
- return self.config['name']
+ return self.description['name']
@property
def static_path(self):
"""The path of static files"""
- if 'static' in self.config:
- return self.abspath(self.config['static'])
+ if 'static' in self.description:
+ return self.abspath(self.description['static'])
elif self.exists('static'):
return self.abspath('static')
else:
return None
@property
- def runner(self):
+ def wsgi_application(self):
"""The runner value (where the application is instantiated)"""
- runner = self.config.get('runner')
+ runner = self.description.get('wsgi_application')
if not runner:
return None
- return self.abspath(runner)
+ return CommandReference(self, runner, 'wsgi_application')
@property
def config_required(self):
@@ -66,6 +75,7 @@ def config_required(self):
@property
def config_template(self):
"""Path: where a configuration template exists"""
+ ## FIXME: should this be a command?
 v = self.description.get('config', {}).get('template')
if v:
return self.abspath(v)
@@ -76,60 +86,59 @@ def config_validator(self):
"""Object: validator for the configuration"""
 v = self.description.get('config', {}).get('validator')
if v:
- return self.objloader(v, 'config.validator')
+ return CommandReference(self, v, 'config.validator')
return None
@property
- def config_default(self):
+ def config_default_dir(self):
"""Path: default configuration if no other is provided"""
 dir = self.description.get('config', {}).get('default')
if dir:
return self.abspath(dir)
return None
@property
- def add_paths(self):
- """List of paths: things to add to sys.path"""
- dirs = self.config.get('add_paths', [])
- if isinstance(dirs, basestring):
- dirs = [dirs]
- ## FIXME: should ensure all paths are relative
- return [self.abspath(dir) for dir in dirs]
-
- @property
- def services(self):
- """Dict of {service_name: config}: all the configured services. Config may be None"""
- services = self.config.get('services', [])
- if isinstance(services, list):
- services = dict((v, None) for v in services)
- return services
+ def requires(self):
+ return Requires.from_config(self, self.description.get('requires'))
## Process initialization
- def activate_path(self):
+ def activate_path(self, venv_path=None):
+ dirs = self.description.get('add_paths', [])
+ if isinstance(dirs, basestring):
+ dirs = [dirs]
+ dirs = [self.abspath(dir) for dir in dirs]
 add_paths = list(dirs)
add_paths.extend([
self.abspath('lib/python%s' % sys.version[:3]),
self.abspath('lib/python%s/site-packages' % sys.version[:3]),
self.abspath('lib/python'),
+ self.abspath('vendor'),
])
+ if venv_path:
+ add_paths.extend([
+ os.path.join(venv_path, 'lib/python%s/site-packages' % sys.version[:3]),
+ ])
for path in reversed(add_paths):
- self.add_path(path)
+ self.add_path_to_sys_path(path)
- def setup_settings(self):
+ def setup_settings(self, settings=None):
"""Create the settings that the application itself can import"""
- if 'appsettings' in sys.modules:
- return
- module = new.module('appbsettings')
- module.add_setting = _add_setting
- sys.modules[module.__name__] = module
- return module
+ if 'appsettings' not in sys.modules:
+ module = new.module('appsettings')
+ module.add_setting = _add_setting
+ sys.modules[module.__name__] = module
+ else:
+ module = sys.modules['appsettings']
+ if settings is not None:
+ for name, value in settings.items():
+ module.add_setting(name, value)
- def add_sys_path(self, path):
+ def add_path_to_sys_path(self, path):
"""Adds one path to sys.path.
This also reads .pth files, and makes sure all paths end up at the front, ahead
- of any system paths.
+ of any system paths. Also executes sitecustomize.py
"""
if not os.path.exists(path):
return
@@ -149,18 +158,9 @@ def add_sys_path(self, path):
ns = {'__file__': sitecustomize, '__name__': 'sitecustomize'}
execfile(sitecustomize, ns)
- @property
- def wsgi_app(self):
- runner = self.runner
- if runner is None:
- raise Exception(
- "No runner has been defined")
- ns = {'__file__': runner, '__name__': 'main_py'}
- execfile(runner, ns)
- if 'application' in ns:
- return ns['application']
- else:
- raise NameError("No application defined in %s" % runner)
+ def bytecompile(self):
+ import compileall
+ compileall.compile_dir(self.path)
def call_script(self, script_path, arguments, env_overrides=None, cwd=None, python_exe=None,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE):
@@ -177,37 +177,315 @@ def call_script(self, script_path, arguments, env_overrides=None, cwd=None, pyth
calling_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'call-script.py')
args = [python_exe, calling_script, self.path, script_path]
args.extend(arguments)
- env['PYWEBAPP_LOCATION'] = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+ env['APPPKG_LOCATION'] = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
proc = subprocess.Popen(args, stdout=stdout, stderr=stderr, stdin=stdin,
 env=env, cwd=cwd)
return proc
## FIXME: need something to run "commands" (as defined in the spec)
-def _add_setting(name, value):
- _check_settings_value(name, value)
- setattr(sys.modules['appsettings'], name, value)
+class Requires(object):
+
+ def __init__(self, app, pip=None, deb=None, rpm=None):
+ ## FIXME: not sure if we need a reference to app
+ self.app = app
+ self.pip = pip
+ self.deb = deb
+ self.rpm = rpm
+
+ @classmethod
+ def from_config(cls, app, conf):
+ if not conf:
+ conf = {}
+ return cls(deb=cls.normalize(conf.get('deb')),
+ rpm=cls.normalize(conf.get('rpm')),
+ pip=cls.normalize(conf.get('pip')),
+ app=app)
+
+ @staticmethod
+ def normalize(setting):
+ if setting is None:
+ return []
+ if isinstance(setting, basestring):
+ return [setting]
+ return setting
+
+ def install_deb(self, command='apt-get', sudo=False):
+ if not self.deb:
+ return
+ cmd = [command, 'install'] + self.deb
+ if sudo:
+ cmd = ['sudo'] + cmd
+ ## FIXME: we need a better way to run commands:
+ self.run_command(cmd)
+
+ def install_rpm(self, command='yum', gpgcheck=True, sudo=False):
+ if not self.rpm:
+ return
+ cmd = [command, 'install', '-y']
+ if not gpgcheck:
+ cmd.append('--nogpgcheck')
+ if sudo:
+ cmd = ['sudo'] + cmd
+ self.run_command(cmd)
+
+ def run_command(self, cmd):
+ subprocess.check_call(cmd)
+
+ def create_venv(self, path):
+ """Create a virtualenv at the path"""
+ import virtualenv
+ ## FIXME: configure virtualenv.logger?
+ virtualenv.create_environment(
+ path,
+ site_packages=True,
+ clear=False,
+ unzip_setuptools=True,
+ use_distribute=True,
+ prompt=None,
+ search_dirs=None,
+ never_download=False)
+
+ def install_pip(self, venv_path, make_venv=False):
+ """Installs all the requirements in the requires: pip: ...
+
+ Can create a virtualenv in the process; you should create a
+ separate virtualenv for each application.
+ """
+ if not self.pip:
+ return
+ if make_venv:
+ self.create_venv(venv_path)
+ cmd = [os.path.join(venv_path, 'bin', 'pip'), 'install']
+ for requirement in self.pip:
+ if self.app.exists(requirement):
+ # Assume it is a requirements file
+ cmd.extend(['-r', self.app.abspath(requirement)])
+ else:
+ cmd.append(requirement)
+ self.run_command(cmd)
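
A hedged usage sketch of the class above (not part of the commit; the paths here are made up):

    app = AppPackage('/srv/apps/myapp')
    reqs = app.requires                    # built from the 'requires:' block in app.yaml
    reqs.install_deb(sudo=True)            # runs: sudo apt-get install <listed packages>
    reqs.install_pip('/srv/venvs/myapp', make_venv=True)  # create the venv, then pip install
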
+
+
+class CommandReference(object):
+
+ def __init__(self, app, ref, name):
+ self.app = app
+ self.ref = ref
+ self.name = name
+ self.ref_type, self.ref_data = self.parse_ref_type(ref)
+
+ def __repr__(self):
+ return '<CommandReference %s=%s for %r>' % (self.name, self.ref, self.app)
+ _PY_MOD_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_.]+(?:\:[a-zA-Z_][a-zA-Z0-9_.]*)?$')
-def _check_settings_value(name, value):
- """Checks that a setting value is correct.
+ def parse_ref_type(self, ref):
+ if ref.startswith('/') or ref.startswith('url:'):
+ if ref.startswith('url:'):
+ ref = ref[4:]
+ return 'url', ref
+ if ref.startswith('script:'):
+ ref = ref.split(':', 1)[1]
+ path = self.app.abspath(ref)
+ with open(path) as fp:
+ first = fp.readline()
+ if first.startswith('#!') and 'python' in first:
+ return 'pyscript', (ref, None)
+ return 'script', ref
+ if ref.endswith('.py') or '.py:' in ref or ref.startswith('pyscript:'):
+ if ref.startswith('pyscript:'):
+ ref = ref[len('pyscript:'):]
+ if ':' in ref:
+ path, extra = ref.split(':', 1)
+ else:
+ path = ref
+ extra = None
+ return 'pyscript', (path, extra)
+ if self._PY_MOD_RE.search(ref) or ref.startswith('py:'):
+ if ref.startswith('py:'):
+ ref = ref[3:]
+ return 'py', (ref, None)
- Settings values can only be JSON-compatible types, i.e., list,
- dict, string, int/float, bool, None.
- """
+ def run(self, *args):
+ """Runs the command, returning (text_output, extra_data), or
+ raising an exception"""
+ return getattr(self, 'run_' + self.ref_type)(*args)
+
+ def run_url(self, *args):
+ obj = self.app.wsgi_application.get_object()
+ if '?' in self.ref:
+ path, query_string = self.ref.split('?', 1)
+ else:
+ path, query_string = self.ref, ''
+ if args:
+ body = []
+ for item in args:
+ if isinstance(item, (int, float, str, unicode)):
+ body.append(urllib.quote(str(item)))
+ else:
+ body.append(urllib.quote(json.dumps(item)))
+ body = '&'.join(body)
+ else:
+ body = ''
+ content_type = 'application/x-www-form-urlencoded'
+ env = {
+ 'wsgi.url_scheme': 'http',
+ 'wsgi.input': StringIO(body),
+ 'SERVER_NAME': 'localhost',
+ 'SERVER_PORT': '0',
+ 'HTTP_HOST': 'localhost:0',
+ 'SCRIPT_NAME': '',
+ 'PATH_INFO': urllib.unquote(path),
+ 'QUERY_STRING': query_string,
+ 'REQUEST_METHOD': 'POST',
+ 'CONTENT_TYPE': content_type,
+ 'CONTENT_LENGTH': str(len(body)),
+ }
+ output = []
+ status_headers = []
+
+ def start_response(status, headers, exc_info=None):
+ if exc_info:
+ raise exc_info[0], exc_info[1], exc_info[2]
+ status_headers[:] = [status, headers]
+ return output.append
+ app_iter = obj(env, start_response)
+ output.extend(app_iter)
+ status, headers = status_headers
+ if int(status.split(None, 1)[0]) >= 300:
+ ## FIXME: do some error thing?
+ raise Exception()
+ output = ''.join(output)
+ metadata = {'headers': headers, 'status': status}
+ return output, metadata
+
+ def run_script(self, *args):
+ cmd = [self.app.abspath(self.ref)] + list(args)
+ ## Completion sketch: the commit breaks off mid-call here; this finishes
+ ## the method in the spirit of run_url (collect output and return it):
+ proc = subprocess.Popen(
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ cwd=self.app.path)
+ stdout, stderr = proc.communicate()
+ if proc.returncode:
+ raise ProcessError(proc.returncode, proc, 'script %s' % self.ref)
+ return stdout, {'stderr': stderr}
+
+
+class Environment(object):
+
+ def __init__(self, app, config=None, env_description=None, env_base=None,
+ base_python_exe=sys.executable, venv_location=None):
+ self.app = app
+ if config and not os.path.isdir(config):
+ raise Exception("The config directory (%r) must exist" % config)
+ self.config = config
+ check_jsonable('env_description', env_description)
+ self.env_description = env_description
+ if env_base and not os.path.isdir(env_base):
+ raise Exception("The env_base (%r) must be an existing dir" % env_base)
+ self.env_base = env_base
+ self.base_python_exe = base_python_exe
+ self.venv_location = venv_location
+
+ def run_command(self, command_path, args=None, env=None):
+ if self.env_base:
+ command_path = os.path.join(self.env_base, command_path)
+ if env is None:
+ env = {}
+ env['_APPPKG_COMMAND_PATH'] = command_path
+ env['_APPPKG_APP_PATH'] = self.app.path
+ env['_APPPKG_PATH'] = here
+ env['_APPPKG_ENV_DESCRIPTION'] = json.dumps(self.env_description)
+ env['_APPPKG_CONFIG'] = self.config or ''
+ env['_APPPKG_VENV_LOCATION'] = self.venv_location or ''
+ full_env = os.environ.copy()
+ full_env.update(env)
+ command = [self.base_python_exe, os.path.join(here, 'run-command.py')]
+ if args:
+ command.extend(args)
+ proc = subprocess.Popen(command, env=full_env, cwd=self.app.path,
+ stdout=subprocess.PIPE, stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ return HandyProcess(proc, 'apppkg calling %s' % command_path)
+
+ def run_func(self, command_path, func, *args, **kw):
+ env = {
+ '_APPPKG_ARGS': json.dumps(args),
+ '_APPPKG_KW': json.dumps(kw),
+ '_APPPKG_FUNC': func,
+ }
+ proc = self.run_command(command_path, env=env)
+ data = json.loads(proc.stdout)
+ if data.get('error'):
+ raise FunctionException(data['error']['class'], data['error']['description'], data['error']['details'])
+ return data['data']
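
A hedged sketch of the calling convention run_func implies; the script path and function name are made-up examples. Arguments travel to the child through the _APPPKG_ARGS/_APPPKG_KW/_APPPKG_FUNC environment variables, and run-command.py is expected to print a JSON object to stdout:

    env = Environment(app, config='/etc/myapp/conf')   # config dir is an assumption
    result = env.run_func('scripts/tasks.py', 'rebuild_index', force=True)
    # The child process is expected to emit one of:
    #   {"data": <return value>}
    #   {"error": {"class": "...", "description": "...", "details": {...}}}
    # run_func then returns data['data'] or raises FunctionException.
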
+
+
+class HandyProcess(object):
+
+ def __init__(self, proc, description):
+ self.proc = proc
+ self._stdout = None
+ self._stderr = None
+ self.description = description
+
+ def send_stdin(self, data=None):
+ self._stdout, self._stderr = self.proc.communicate(data)
+ if self.proc.returncode:
+ raise ProcessError(self.proc.returncode, self, self.description)
+
+ @property
+ def stdout(self):
+ if self._stdout is None:
+ self.send_stdin()
+ return self._stdout
+
+ @property
+ def stderr(self):
+ if self._stderr is None:
+ self.send_stdin()
+ return self._stderr
+
+
+class ProcessError(Exception):
+
+ def __init__(self, code, process, description):
+ self.code = code
+ self.process = process
+ self.description = description
+
+ def __str__(self):
+ return '<ProcessError returncode=%r running %s>' % (self.code, self.description)
+
+
+class FunctionException(Exception):
+
+ def __init__(self, class_name, description, details):
+ self.class_name = class_name
+ self.description = description
+ for name, value in details.items():
+ setattr(self, name, value)
+
+ def __str__(self):
+ return '<FunctionException:%s %s>' % (self.class_name, self.description)
+
+
+def _add_setting(name, value):
+ import appsettings
+ check_jsonable(name, value)
+ setattr(appsettings, name, value)
+
+
+def check_jsonable(name, value):
if isinstance(value, dict):
- for key in value:
- if not isinstance(key, basestring):
- raise ValueError("Setting %s has invalid key (not a string): %r"
- % key)
- _check_settings_value(name + "." + key, value[key])
- elif isinstance(value, list):
- for index, item in enumerate(value):
- _check_settings_value("%s[%r]" % (name, index), item)
- elif isinstance(value, (basestring, int, float, bool)):
- pass
- elif value is None:
+ for n, v in value.items():
+ if not isinstance(n, basestring):
+ raise ValueError("%s is a dict with a non-string key (%r)" % (name, n))
+ n = name + '.' + n
+ check_jsonable(n, v)
+ elif isinstance(value, (list, tuple)):
+ for i, v in enumerate(value):
+ n = '%s[%r]' % (name, i)
+ check_jsonable(n, v)
+ elif isinstance(value, (str, unicode, int, float, bool)) or value is None:
pass
else:
- raise ValueError("Setting %s is not a valid type: %r" % (name, value))
+ raise ValueError("%s (%r) is not a JSONable type (%s)"
+ % (name, value, type(value)))
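
For illustration, a sketch of how the validator behaves (not from the commit):

    check_jsonable('settings', {'debug': True, 'hosts': ['a', 'b']})   # passes silently
    try:
        check_jsonable('settings', {'when': object()})
    except ValueError, e:
        print e   # "settings.when (<object ...>) is not a JSONable type (<type 'object'>)"
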
156 apppkg/init.py
@@ -0,0 +1,156 @@
+"""Creates a simple app layout"""
+
+import argparse
+import os
+
+parser = argparse.ArgumentParser(
+ prog='python -m apppkg.init',
+ description="Create a new apppkg layout",
+ )
+
+parser.add_argument(
+ 'dir', metavar="DIR",
+ help="Directory to write to.")
+
+parser.add_argument(
+ '--name', metavar='NAME',
+ help="Name of the application (defaults to directory name)")
+
+
+TEMPLATE_DIRS = [
+ '.',
+ '%(pkg_name)s/%(pkg_name)s',
+ 'vendor',
+ ]
+
+TEMPLATE_FILES = {
+
+ 'app.yaml': """\
+platform: python wsgi
+name: %(name)s
+add_paths:
+ - %(pkg_name)s
+requires:
+ pip: requirements.txt
+wsgi_application: %(pkg_name)s.entrypoints:make_app()
+wsgi_ping: /.ping
+install: %(pkg_name)s.entrypoints:install
+before_update: %(pkg_name)s.entrypoints:before_update
+update: %(pkg_name)s.entrypoints:update
+health_check: %(pkg_name)s.entrypoints:health_check
+before_delete: %(pkg_name)s.entrypoints:before_delete
+check_environment: %(pkg_name)s.entrypoints:check_environment
+""",
+
+ '%(pkg_name)s/%(pkg_name)s/entrypoints.py': """\
+# Each of the functions here is referred to in app.yaml
+# They start out simply stubbed out
+
+def make_app():
+ # should return a WSGI application
+ from %(pkg_name)s.something import Application
+ return Application()
+
+def install():
+ # You might create db tables or setup files or something here
+ pass
+
+def before_update():
+ # This version of the application is about to be overwritten by
+ # a new version. Do something here, not sure what?
+ pass
+
+def update():
+ # This application has just been updated. You might want to migrate
+ # database tables to a new schema here, for example
+ # The safest thing to do is to treat everything like an install or update:
+ install()
+
+def before_delete():
+ # The application is about to be deleted (not just updated). Do something?
+ pass
+
+def health_check():
+ # You should do checks on data integrity here
+ pass
+
+def check_environment():
+ # If you need a command or utility you should check it here
+ # E.g., check that $PATH/git exists if you need to use git
+ pass
+""",
+
+ '%(pkg_name)s/%(pkg_name)s/__init__.py': """\
+""",
+ '%(pkg_name)s/sitecustomize.py': """\
+# You can put code here that will be run when the process is setup
+""",
+
+ '.pip.conf': """\
+[global]
+sys.path =
+ %%(here)s/vendor
+ %%(here)s/vendor-binary
+
+[install]
+install_option =
+ --install-purelib=%%(here)s/vendor/
+ --install-platlib=%%(here)s/vendor-binary/
+ --install-scripts=%%(here)s/bin/
+""",
+
+ '.gitignore': """\
+vendor-binary
+""",
+
+ 'requirements.txt': """\
+# You MAY put libraries here that you require.
+# You SHOULD instead try to use "pip install" to install things into vendor/
+# You WILL notice some libraries end up in vendor-binary/ : these are libraries
+# that must be built locally. You should put those libraries into this file.
+# You are NOT recommended to use "pip freeze" to generate this file, as it will
+# include libraries that should already be present in vendor/
+"""
+ }
+
+
+def sub(c, vars):
+ return c % vars
+
+
+def make_package_name(name):
+ return name.lower().replace(' ', '_')
+
+
+def main():
+ args = parser.parse_args()
+ if not args.name:
+ args.name = os.path.basename(args.dir).strip('/').strip('\\')
+ vars = dict(
+ name=args.name,
+ dir=args.dir,
+ pkg_name=make_package_name(args.name),
+ )
+ for dir in TEMPLATE_DIRS:
+ dir = os.path.join(args.dir, sub(dir, vars))
+ if not os.path.exists(dir):
+ print 'Creating %s/' % dir
+ os.makedirs(dir)
+ for name, content in TEMPLATE_FILES.items():
+ name = os.path.join(args.dir, sub(name, vars))
+ content = sub(content, vars)
+ if os.path.exists(name):
+ with open(name, 'rb') as fp:
+ existing = fp.read()
+ if existing == content:
+ print 'No changes to %s' % name
+ continue
+ print 'Overwriting %s' % name
+ else:
+ print 'Writing %s' % name
+ with open(name, 'wb') as fp:
+ fp.write(content)
+
+
+if __name__ == '__main__':
+ main()
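
For reference, the layout this writes for, say, --name 'My Blog' into ./myblog (pkg_name becomes my_blog):

    myblog/
        app.yaml
        requirements.txt
        .pip.conf
        .gitignore
        my_blog/
            sitecustomize.py
            my_blog/
                __init__.py
                entrypoints.py
        vendor/
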
1,434 apppkg/paste_httpserver.py
@@ -0,0 +1,1434 @@
+# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
+# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
+# (c) 2005 Clark C. Evans
+# This module is part of the Python Paste Project and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+# This code was written with funding by http://prometheusresearch.com
+"""
+WSGI HTTP Server
+
+This is a minimalistic WSGI server using Python's built-in BaseHTTPServer;
+if pyOpenSSL is installed, it also provides SSL capabilities.
+"""
+
+# @@: add in protection against HTTP/1.0 clients who claim to
+# be 1.1 but do not send a Content-Length
+
+# @@: add support for chunked encoding, this is not a 1.1 server
+# till this is completed.
+
+import atexit
+import traceback
+import socket, sys, threading, urlparse, Queue, urllib
+import posixpath
+import time
+import thread
+import os
+from itertools import count
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from SocketServer import ThreadingMixIn
+#from paste.util import converters
+import logging
+try:
+ from paste.util import killthread
+except ImportError:
+ # Not available, probably no ctypes
+ killthread = None
+
+__all__ = ['WSGIHandlerMixin', 'WSGIServer', 'WSGIHandler', 'serve']
+__version__ = "0.5"
+
+def asbool(obj):
+ if isinstance(obj, (str, unicode)):
+ obj = obj.strip().lower()
+ if obj in ['true', 'yes', 'on', 'y', 't', '1']:
+ return True
+ elif obj in ['false', 'no', 'off', 'n', 'f', '0']:
+ return False
+ else:
+ raise ValueError(
+ "String is not true/false: %r" % obj)
+ return bool(obj)
+
+def aslist(obj, sep=None, strip=True):
+ if isinstance(obj, (str, unicode)):
+ lst = obj.split(sep)
+ if strip:
+ lst = [v.strip() for v in lst]
+ return lst
+ elif isinstance(obj, (list, tuple)):
+ return obj
+ elif obj is None:
+ return []
+ else:
+ return [obj]
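
A quick behaviour sketch for the two converters above:

    asbool('Yes')               # -> True
    asbool('off')               # -> False
    asbool(1)                   # -> True (non-strings fall through to bool())
    aslist('a, b, c', sep=',')  # -> ['a', 'b', 'c'] (items stripped)
    aslist(None)                # -> []
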
+
+class ContinueHook(object):
+ """
+ When a client request includes a 'Expect: 100-continue' header, then
+ it is the responsibility of the server to send 100 Continue when it
+ is ready for the content body. This allows authentication, access
+ levels, and other exceptions to be detected *before* bandwidth is
+ spent on the request body.
+
+ This is a rfile wrapper that implements this functionality by
+ sending 100 Continue to the client immediately after the user
+ requests the content via a read() operation on the rfile stream.
+ After this response is sent, it becomes a pass-through object.
+ """
+
+ def __init__(self, rfile, write):
+ self._ContinueFile_rfile = rfile
+ self._ContinueFile_write = write
+ for attr in ('close', 'closed', 'fileno', 'flush',
+ 'mode', 'bufsize', 'softspace'):
+ if hasattr(rfile, attr):
+ setattr(self, attr, getattr(rfile, attr))
+ for attr in ('read', 'readline', 'readlines'):
+ if hasattr(rfile, attr):
+ setattr(self, attr, getattr(self, '_ContinueFile_' + attr))
+
+ def _ContinueFile_send(self):
+ self._ContinueFile_write("HTTP/1.1 100 Continue\r\n\r\n")
+ rfile = self._ContinueFile_rfile
+ for attr in ('read', 'readline', 'readlines'):
+ if hasattr(rfile, attr):
+ setattr(self, attr, getattr(rfile, attr))
+
+ def _ContinueFile_read(self, size=-1):
+ self._ContinueFile_send()
+ return self._ContinueFile_rfile.read(size)
+
+ def _ContinueFile_readline(self, size=-1):
+ self._ContinueFile_send()
+ return self._ContinueFile_rfile.readline(size)
+
+ def _ContinueFile_readlines(self, sizehint=0):
+ self._ContinueFile_send()
+ return self._ContinueFile_rfile.readlines(sizehint)
+
+class WSGIHandlerMixin:
+ """
+ WSGI mix-in for HTTPRequestHandler
+
+ This class is a mix-in to provide WSGI functionality to any
+ HTTPRequestHandler derivative (as provided in Python's BaseHTTPServer).
+ This assumes a ``wsgi_application`` handler on ``self.server``.
+ """
+ lookup_addresses = True
+
+ def log_request(self, *args, **kwargs):
+ """ disable success request logging
+
+ Logging transactions should not be part of a WSGI server;
+ if you want logging, look at paste.translogger
+ """
+ pass
+
+ def log_message(self, *args, **kwargs):
+ """ disable error message logging
+
+ Logging transactions should not be part of a WSGI server;
+ if you want logging, look at paste.translogger
+ """
+ pass
+
+ def version_string(self):
+ """ behavior that BaseHTTPServer should have had """
+ if not self.sys_version:
+ return self.server_version
+ else:
+ return self.server_version + ' ' + self.sys_version
+
+ def wsgi_write_chunk(self, chunk):
+ """
+ Write a chunk of the output stream; send headers if they
+ have not already been sent.
+ """
+ if not self.wsgi_headers_sent and not self.wsgi_curr_headers:
+ raise RuntimeError(
+ "Content returned before start_response called")
+ if not self.wsgi_headers_sent:
+ self.wsgi_headers_sent = True
+ (status, headers) = self.wsgi_curr_headers
+ code, message = status.split(" ", 1)
+ self.send_response(int(code), message)
+ #
+ # HTTP/1.1 compliance; either send Content-Length or
+ # signal that the connection is being closed.
+ #
+ send_close = True
+ for (k, v) in headers:
+ lk = k.lower()
+ if 'content-length' == lk:
+ send_close = False
+ if 'connection' == lk:
+ if 'close' == v.lower():
+ self.close_connection = 1
+ send_close = False
+ self.send_header(k, v)
+ if send_close:
+ self.close_connection = 1
+ self.send_header('Connection', 'close')
+
+ self.end_headers()
+ self.wfile.write(chunk)
+
+ def wsgi_start_response(self, status, response_headers, exc_info=None):
+ if exc_info:
+ try:
+ if self.wsgi_headers_sent:
+ raise exc_info[0], exc_info[1], exc_info[2]
+ else:
+ # In this case, we're going to assume that the
+ # higher-level code is currently handling the
+ # issue and returning a reasonable response.
+ # self.log_error(repr(exc_info))
+ pass
+ finally:
+ exc_info = None
+ elif self.wsgi_curr_headers:
+ assert 0, "Attempt to set headers a second time w/o an exc_info"
+ self.wsgi_curr_headers = (status, response_headers)
+ return self.wsgi_write_chunk
+
+ def wsgi_setup(self, environ=None):
+ """
+ Setup the member variables used by this WSGI mixin, including
+ the ``environ`` and status member variables.
+
+ After the basic environment is created; the optional ``environ``
+ argument can be used to override any settings.
+ """
+
+ (scheme, netloc, path, query, fragment) = urlparse.urlsplit(self.path)
+ path = urllib.unquote(path)
+ endslash = path.endswith('/')
+ path = posixpath.normpath(path)
+ if endslash and path != '/':
+ # Put the slash back...
+ path += '/'
+ (server_name, server_port) = self.server.server_address[:2]
+
+ rfile = self.rfile
+ # We can put in the protection to keep from over-reading the
+ # file
+ try:
+ content_length = int(self.headers.get('Content-Length', '0'))
+ except ValueError:
+ content_length = 0
+ if '100-continue' == self.headers.get('Expect','').lower():
+ rfile = LimitedLengthFile(ContinueHook(rfile, self.wfile.write), content_length)
+ else:
+ if not hasattr(self.connection, 'get_context'):
+ # @@: LimitedLengthFile is currently broken in connection
+ # with SSL (sporadic errors that are difficult to trace, but
+ # ones that go away when you don't use LimitedLengthFile)
+ rfile = LimitedLengthFile(rfile, content_length)
+
+ remote_address = self.client_address[0]
+ self.wsgi_environ = {
+ 'wsgi.version': (1,0)
+ ,'wsgi.url_scheme': 'http'
+ ,'wsgi.input': rfile
+ ,'wsgi.errors': sys.stderr
+ ,'wsgi.multithread': True
+ ,'wsgi.multiprocess': False
+ ,'wsgi.run_once': False
+ # CGI variables required by PEP-333
+ ,'REQUEST_METHOD': self.command
+ ,'SCRIPT_NAME': '' # application is root of server
+ ,'PATH_INFO': path
+ ,'QUERY_STRING': query
+ ,'CONTENT_TYPE': self.headers.get('Content-Type', '')
+ ,'CONTENT_LENGTH': self.headers.get('Content-Length', '0')
+ ,'SERVER_NAME': server_name
+ ,'SERVER_PORT': str(server_port)
+ ,'SERVER_PROTOCOL': self.request_version
+ # CGI not required by PEP-333
+ ,'REMOTE_ADDR': remote_address
+ }
+ if scheme:
+ self.wsgi_environ['paste.httpserver.proxy.scheme'] = scheme
+ if netloc:
+ self.wsgi_environ['paste.httpserver.proxy.host'] = netloc
+
+ if self.lookup_addresses:
+ # @@: make lookup_addresses actually work; at this point
+ # address_string() is overridden further down in this
+ # file and hence is a no-op
+ if remote_address.startswith("192.168.") \
+ or remote_address.startswith("10.") \
+ or remote_address.startswith("172.16."):
+ pass
+ else:
+ address_string = None # self.address_string()
+ if address_string:
+ self.wsgi_environ['REMOTE_HOST'] = address_string
+
+ if hasattr(self.server, 'thread_pool'):
+ # Now that we know what the request was for, we should
+ # tell the thread pool what its worker is working on
+ self.server.thread_pool.worker_tracker[thread.get_ident()][1] = self.wsgi_environ
+ self.wsgi_environ['paste.httpserver.thread_pool'] = self.server.thread_pool
+
+ for k, v in self.headers.items():
+ key = 'HTTP_' + k.replace("-","_").upper()
+ if key in ('HTTP_CONTENT_TYPE','HTTP_CONTENT_LENGTH'):
+ continue
+ self.wsgi_environ[key] = ','.join(self.headers.getheaders(k))
+
+ if hasattr(self.connection,'get_context'):
+ self.wsgi_environ['wsgi.url_scheme'] = 'https'
+ # @@: extract other SSL parameters from pyOpenSSL at...
+ # http://www.modssl.org/docs/2.8/ssl_reference.html#ToC25
+
+ if environ:
+ assert isinstance(environ, dict)
+ self.wsgi_environ.update(environ)
+ if 'on' == environ.get('HTTPS'):
+ self.wsgi_environ['wsgi.url_scheme'] = 'https'
+
+ self.wsgi_curr_headers = None
+ self.wsgi_headers_sent = False
+
+ def wsgi_connection_drop(self, exce, environ=None):
+ """
+ Override this if you're interested in socket exceptions, such
+ as when the user clicks 'Cancel' during a file download.
+ """
+ pass
+
+ def wsgi_execute(self, environ=None):
+ """
+ Invoke the server's ``wsgi_application``.
+ """
+
+ self.wsgi_setup(environ)
+
+ try:
+ result = self.server.wsgi_application(self.wsgi_environ,
+ self.wsgi_start_response)
+ try:
+ for chunk in result:
+ self.wsgi_write_chunk(chunk)
+ if not self.wsgi_headers_sent:
+ self.wsgi_write_chunk('')
+ finally:
+ if hasattr(result,'close'):
+ result.close()
+ result = None
+ except socket.error, exce:
+ self.wsgi_connection_drop(exce, environ)
+ return
+ except:
+ if not self.wsgi_headers_sent:
+ error_msg = "Internal Server Error\n"
+ self.wsgi_curr_headers = (
+ '500 Internal Server Error',
+ [('Content-type', 'text/plain'),
+ ('Content-length', str(len(error_msg)))])
+ self.wsgi_write_chunk("Internal Server Error\n")
+ raise
+
+#
+# SSL Functionality
+#
+# This implementation was motivated by Sebastien Martini's SSL example
+# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
+#
+try:
+ from OpenSSL import SSL, tsafe
+ SocketErrors = (socket.error, SSL.ZeroReturnError, SSL.SysCallError)
+except ImportError:
+ # Do not require pyOpenSSL to be installed, but disable SSL
+ # functionality in that case.
+ SSL = None
+ SocketErrors = (socket.error,)
+ class SecureHTTPServer(HTTPServer):
+ def __init__(self, server_address, RequestHandlerClass,
+ ssl_context=None, request_queue_size=None):
+ assert not ssl_context, "pyOpenSSL not installed"
+ HTTPServer.__init__(self, server_address, RequestHandlerClass)
+ if request_queue_size:
+ self.socket.listen(request_queue_size)
+else:
+
+ class _ConnFixer(object):
+ """ wraps a socket connection so it implements makefile """
+ def __init__(self, conn):
+ self.__conn = conn
+ def makefile(self, mode, bufsize):
+ return socket._fileobject(self.__conn, mode, bufsize)
+ def __getattr__(self, attrib):
+ return getattr(self.__conn, attrib)
+
+ class SecureHTTPServer(HTTPServer):
+ """
+ Provides SSL server functionality on top of the BaseHTTPServer
+ by overriding _private_ members of Python's standard
+ distribution. The interface for this instance only changes by
+ adding an optional ssl_context attribute to the constructor:
+
+ cntx = SSL.Context(SSL.SSLv23_METHOD)
+ cntx.use_privatekey_file("host.pem")
+ cntx.use_certificate_file("host.pem")
+
+ """
+
+ def __init__(self, server_address, RequestHandlerClass,
+ ssl_context=None, request_queue_size=None):
+ # This overrides the implementation of __init__ in python's
+ # SocketServer.TCPServer (which BaseHTTPServer.HTTPServer
+ # does not override, thankfully).
+ HTTPServer.__init__(self, server_address, RequestHandlerClass)
+ self.socket = socket.socket(self.address_family,
+ self.socket_type)
+ self.ssl_context = ssl_context
+ if ssl_context:
+ class TSafeConnection(tsafe.Connection):
+ def settimeout(self, *args):
+ self._lock.acquire()
+ try:
+ return self._ssl_conn.settimeout(*args)
+ finally:
+ self._lock.release()
+ def gettimeout(self):
+ self._lock.acquire()
+ try:
+ return self._ssl_conn.gettimeout()
+ finally:
+ self._lock.release()
+ self.socket = TSafeConnection(ssl_context, self.socket)
+ self.server_bind()
+ if request_queue_size:
+ self.socket.listen(request_queue_size)
+ self.server_activate()
+
+ def get_request(self):
+ # The default SSL request object does not seem to have a
+ # ``makefile(mode, bufsize)`` method as expected by
+ # Socketserver.StreamRequestHandler.
+ (conn, info) = self.socket.accept()
+ if self.ssl_context:
+ conn = _ConnFixer(conn)
+ return (conn, info)
+
+ def _auto_ssl_context():
+ import OpenSSL, time, random
+ pkey = OpenSSL.crypto.PKey()
+ pkey.generate_key(OpenSSL.crypto.TYPE_RSA, 768)
+
+ cert = OpenSSL.crypto.X509()
+
+ cert.set_serial_number(random.randint(0, sys.maxint))
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(60 * 60 * 24 * 365)
+ cert.get_subject().CN = '*'
+ cert.get_subject().O = 'Dummy Certificate'
+ cert.get_issuer().CN = 'Untrusted Authority'
+ cert.get_issuer().O = 'Self-Signed'
+ cert.set_pubkey(pkey)
+ cert.sign(pkey, 'md5')
+
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ ctx.use_privatekey(pkey)
+ ctx.use_certificate(cert)
+
+ return ctx
+
+class WSGIHandler(WSGIHandlerMixin, BaseHTTPRequestHandler):
+ """
+ A WSGI handler that overrides POST, GET and HEAD to delegate
+ requests to the server's ``wsgi_application``.
+ """
+ server_version = 'PasteWSGIServer/' + __version__
+
+ def handle_one_request(self):
+ """Handle a single HTTP request.
+
+ You normally don't need to override this method; see the class
+ __doc__ string for information on how to handle specific HTTP
+ commands such as GET and POST.
+
+ """
+ self.raw_requestline = self.rfile.readline()
+ if not self.raw_requestline:
+ self.close_connection = 1
+ return
+ if not self.parse_request(): # An error code has been sent, just exit
+ return
+ self.wsgi_execute()
+
+ def handle(self):
+ # don't bother logging disconnects while handling a request
+ try:
+ BaseHTTPRequestHandler.handle(self)
+ except SocketErrors, exce:
+ self.wsgi_connection_drop(exce)
+
+ def address_string(self):
+ """Return the client address formatted for logging.
+
+ This is overridden so that no hostname lookup is done.
+ """
+ return ''
+
+class LimitedLengthFile(object):
+ def __init__(self, file, length):
+ self.file = file
+ self.length = length
+ self._consumed = 0
+ if hasattr(self.file, 'seek'):
+ self.seek = self._seek
+
+ def __repr__(self):
+ base_repr = repr(self.file)
+ return base_repr[:-1] + ' length=%s>' % self.length
+
+ def read(self, length=None):
+ left = self.length - self._consumed
+ if length is None:
+ length = left
+ else:
+ length = min(length, left)
+ # next two lines are necessary only if read(0) blocks
+ if not left:
+ return ''
+ data = self.file.read(length)
+ self._consumed += len(data)
+ return data
+
+ def readline(self, *args):
+ max_read = self.length - self._consumed
+ if len(args):
+ max_read = min(args[0], max_read)
+ data = self.file.readline(max_read)
+ self._consumed += len(data)
+ return data
+
+ def readlines(self, hint=None):
+ data = self.file.readlines(hint)
+ for chunk in data:
+ self._consumed += len(chunk)
+ return data
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self.length - self._consumed <= 0:
+ raise StopIteration
+ return self.readline()
+
+ ## Optional methods ##
+
+ def _seek(self, place):
+ self.file.seek(place)
+ self._consumed = place
+
+ def tell(self):
+ if hasattr(self.file, 'tell'):
+ return self.file.tell()
+ else:
+ return self._consumed
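
A minimal sketch of the wrapper in isolation, using StringIO as the underlying file:

    from StringIO import StringIO
    body = LimitedLengthFile(StringIO('hello world plus extra'), 11)
    body.read()    # -> 'hello world' (reads stop at the declared length)
    body.read()    # -> '' (the limit is exhausted)
    body.tell()    # -> 11
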
+
+class ThreadPool(object):
+ """
+ Generic thread pool with a queue of callables to consume.
+
+ Keeps a notion of the status of its worker threads:
+
+ idle: worker thread with nothing to do
+
+ busy: worker thread doing its job
+
+ hung: worker thread that's been doing a job for too long
+
+ dying: a hung thread that has been killed, but hasn't died quite
+ yet.
+
+ zombie: what was a worker thread that we've tried to kill but
+ isn't dead yet.
+
+ At any time you can call track_threads, to get a dictionary with
+ these keys and lists of thread_ids that fall in that status. All
+ keys will be present, even if they point to empty lists.
+
+ hung threads are threads that have been busy more than
+ hung_thread_limit seconds. Hung threads are killed when they live
+ longer than kill_thread_limit seconds. A thread is then
+ considered dying for dying_limit seconds, if it is still alive
+ after that it is considered a zombie.
+
+ When there are no idle workers and a request comes in, another
+ worker *may* be spawned. If there are less than spawn_if_under
+ threads in the busy state, another thread will be spawned. So if
+ the limit is 5, and there are 4 hung threads and 6 busy threads,
+ no thread will be spawned.
+
+ When there are more than max_zombie_threads_before_die zombie
+ threads, a SystemExit exception will be raised, stopping the
+ server. Use 0 or None to never raise this exception. Zombie
+ threads *should* get cleaned up, but killing threads is not
+ necessarily reliable. This is turned off by default, since it is
+ only a good idea if you've deployed the server with some process
+ watching from above (something similar to daemontools or zdaemon).
+
+ Each worker thread only processes ``max_requests`` tasks before it
+ dies and replaces itself with a new worker thread.
+ """
+
+
+ SHUTDOWN = object()
+
+ def __init__(
+ self, nworkers, name="ThreadPool", daemon=False,
+ max_requests=100, # threads are killed after this many requests
+ hung_thread_limit=30, # when a thread is marked "hung"
+ kill_thread_limit=1800, # when you kill that hung thread
+ dying_limit=300, # seconds that a kill should take to go into effect (longer than this and the thread is a "zombie")
+ spawn_if_under=5, # spawn if there's too many hung threads
+ max_zombie_threads_before_die=0, # when to give up on the process
+ hung_check_period=100, # every 100 requests check for hung workers
+ logger=None, # Place to log messages to
+ error_email=None, # Person(s) to notify if serious problem occurs
+ ):
+ """
+ Create thread pool with `nworkers` worker threads.
+ """
+ self.nworkers = nworkers
+ self.max_requests = max_requests
+ self.name = name
+ self.queue = Queue.Queue()
+ self.workers = []
+ self.daemon = daemon
+ if logger is None:
+ logger = logging.getLogger('paste.httpserver.ThreadPool')
+ if isinstance(logger, basestring):
+ logger = logging.getLogger(logger)
+ self.logger = logger
+ self.error_email = error_email
+ self._worker_count = count()
+
+ assert (not kill_thread_limit
+ or kill_thread_limit >= hung_thread_limit), (
+ "kill_thread_limit (%s) should be higher than hung_thread_limit (%s)"
+ % (kill_thread_limit, hung_thread_limit))
+ if not killthread:
+ kill_thread_limit = 0
+ self.logger.info(
+ "Cannot use kill_thread_limit as ctypes/killthread is not available")
+ self.kill_thread_limit = kill_thread_limit
+ self.dying_limit = dying_limit
+ self.hung_thread_limit = hung_thread_limit
+ assert spawn_if_under <= nworkers, (
+ "spawn_if_under (%s) should be less than nworkers (%s)"
+ % (spawn_if_under, nworkers))
+ self.spawn_if_under = spawn_if_under
+ self.max_zombie_threads_before_die = max_zombie_threads_before_die
+ self.hung_check_period = hung_check_period
+ self.requests_since_last_hung_check = 0
+ # Used to keep track of what worker is doing what:
+ self.worker_tracker = {}
+ # Used to keep track of the workers not doing anything:
+ self.idle_workers = []
+ # Used to keep track of threads that have been killed, but maybe aren't dead yet:
+ self.dying_threads = {}
+ # This is used to track when we last had to add idle workers;
+ # we shouldn't cull extra workers until some time has passed
+ # (hung_thread_limit) since workers were added:
+ self._last_added_new_idle_workers = 0
+ if not daemon:
+ atexit.register(self.shutdown)
+ for i in range(self.nworkers):
+ self.add_worker_thread(message='Initial worker pool')
+
+ def add_task(self, task):
+ """
+ Add a task to the queue
+ """
+ self.logger.debug('Added task (%i tasks queued)', self.queue.qsize())
+ if self.hung_check_period:
+ self.requests_since_last_hung_check += 1
+ if self.requests_since_last_hung_check > self.hung_check_period:
+ self.requests_since_last_hung_check = 0
+ self.kill_hung_threads()
+ if not self.idle_workers and self.spawn_if_under:
+ # spawn_if_under can come into effect...
+ busy = 0
+ now = time.time()
+ self.logger.debug('No idle workers for task; checking if we need to make more workers')
+ for worker in self.workers:
+ if not hasattr(worker, 'thread_id'):
+ # Not initialized
+ continue
+ time_started, info = self.worker_tracker.get(worker.thread_id,
+ (None, None))
+ if time_started is not None:
+ if now - time_started < self.hung_thread_limit:
+ busy += 1
+ if busy < self.spawn_if_under:
+ self.logger.info(
+ 'No idle tasks, and only %s busy tasks; adding %s more '
+ 'workers', busy, self.spawn_if_under-busy)
+ self._last_added_new_idle_workers = time.time()
+ for i in range(self.spawn_if_under - busy):
+ self.add_worker_thread(message='Response to lack of idle workers')
+ else:
+ self.logger.debug(
+ 'No extra workers needed (%s busy workers)',
+ busy)
+ if (len(self.workers) > self.nworkers
+ and len(self.idle_workers) > 3
+ and time.time()-self._last_added_new_idle_workers > self.hung_thread_limit):
+ # We've spawned workers in the past, but they aren't needed
+ # anymore; kill off some
+ self.logger.info(
+ 'Culling %s extra workers (%s idle workers present)',
+ len(self.workers)-self.nworkers, len(self.idle_workers))
+ self.logger.debug(
+ 'Idle workers: %s', self.idle_workers)
+ for i in range(len(self.workers) - self.nworkers):
+ self.queue.put(self.SHUTDOWN)
+ self.queue.put(task)
+
+ def track_threads(self):
+ """
+ Return a dict summarizing the threads in the pool (as
+ described in the ThreadPool docstring).
+ """
+ result = dict(idle=[], busy=[], hung=[], dying=[], zombie=[])
+ now = time.time()
+ for worker in self.workers:
+ if not hasattr(worker, 'thread_id'):
+ # The worker hasn't fully started up, we should just
+ # ignore it
+ continue
+ time_started, info = self.worker_tracker.get(worker.thread_id,
+ (None, None))
+ if time_started is not None:
+ if now - time_started > self.hung_thread_limit:
+ result['hung'].append(worker)
+ else:
+ result['busy'].append(worker)
+ else:
+ result['idle'].append(worker)
+ for thread_id, (time_killed, worker) in self.dying_threads.items():
+ if not self.thread_exists(thread_id):
+ # Cull dying threads that are actually dead and gone
+ self.logger.info('Killed thread %s no longer around',
+ thread_id)
+ try:
+ del self.dying_threads[thread_id]
+ except KeyError:
+ pass
+ continue
+ if now - time_killed > self.dying_limit:
+ result['zombie'].append(worker)
+ else:
+ result['dying'].append(worker)
+ return result
+
+ def kill_worker(self, thread_id):
+ """
+ Removes the worker with the given thread_id from the pool, and
+ replaces it with a new worker thread.
+
+ This should only be done for mis-behaving workers.
+ """
+ if killthread is None:
+ raise RuntimeError(
+ "Cannot kill worker; killthread/ctypes not available")
+ thread_obj = threading._active.get(thread_id)
+ killthread.async_raise(thread_id, SystemExit)
+ try:
+ del self.worker_tracker[thread_id]
+ except KeyError:
+ pass
+ self.logger.info('Killing thread %s', thread_id)
+ if thread_obj in self.workers:
+ self.workers.remove(thread_obj)
+ self.dying_threads[thread_id] = (time.time(), thread_obj)
+ self.add_worker_thread(message='Replacement for killed thread %s' % thread_id)
+
+ def thread_exists(self, thread_id):
+ """
+ Returns true if a thread with this id is still running
+ """
+ return thread_id in threading._active
+
+ def add_worker_thread(self, *args, **kwargs):
+ index = self._worker_count.next()
+ worker = threading.Thread(target=self.worker_thread_callback,
+ args=args, kwargs=kwargs,
+ name=("worker %d" % index))
+ worker.setDaemon(self.daemon)
+ worker.start()
+
+ def kill_hung_threads(self):
+ """
+ Tries to kill any hung threads
+ """
+ if not self.kill_thread_limit:
+ # No killing should occur
+ return
+ now = time.time()
+ max_time = 0
+ total_time = 0
+ idle_workers = 0
+ starting_workers = 0
+ working_workers = 0
+ killed_workers = 0
+ for worker in self.workers:
+ if not hasattr(worker, 'thread_id'):
+ # Not setup yet
+ starting_workers += 1
+ continue
+ time_started, info = self.worker_tracker.get(worker.thread_id,
+ (None, None))
+ if time_started is None:
+ # Must be idle
+ idle_workers += 1
+ continue
+ working_workers += 1
+ max_time = max(max_time, now-time_started)
+ total_time += now-time_started
+ if now - time_started > self.kill_thread_limit:
+ self.logger.warning(
+ 'Thread %s hung (working on task for %i seconds)',
+ worker.thread_id, now - time_started)
+ try:
+ import pprint
+ info_desc = pprint.pformat(info)
+ except:
+ out = StringIO()
+ traceback.print_exc(file=out)
+ info_desc = 'Error:\n%s' % out.getvalue()
+ self.notify_problem(
+ "Killing worker thread (id=%(thread_id)s) because it has been \n"
+ "working on task for %(time)s seconds (limit is %(limit)s)\n"
+ "Info on task:\n"
+ "%(info)s"
+ % dict(thread_id=worker.thread_id,
+ time=now - time_started,
+ limit=self.kill_thread_limit,
+ info=info_desc))
+ self.kill_worker(worker.thread_id)
+ killed_workers += 1
+ if working_workers:
+ ave_time = float(total_time) / working_workers
+ ave_time = '%.2fsec' % ave_time
+ else:
+ ave_time = 'N/A'
+ self.logger.info(
+ "kill_hung_threads status: %s threads (%s working, %s idle, %s starting) "
+ "ave time %s, max time %.2fsec, killed %s workers"
+ % (idle_workers + starting_workers + working_workers,
+ working_workers, idle_workers, starting_workers,
+ ave_time, max_time, killed_workers))
+ self.check_max_zombies()
+
+ def check_max_zombies(self):
+ """
+ Check if we've reached max_zombie_threads_before_die; if so
+ then kill the entire process.
+ """
+ if not self.max_zombie_threads_before_die:
+ return
+ found = []
+ now = time.time()
+ for thread_id, (time_killed, worker) in self.dying_threads.items():
+ if not self.thread_exists(thread_id):
+ # Cull dying threads that are actually dead and gone
+ try:
+ del self.dying_threads[thread_id]
+ except KeyError:
+ pass
+ continue
+ if now - time_killed > self.dying_limit:
+ found.append(thread_id)
+ if found:
+ self.logger.info('Found %s zombie threads', found)
+ if len(found) > self.max_zombie_threads_before_die:
+ self.logger.fatal(
+ 'Exiting process because %s zombie threads is more than %s limit',
+ len(found), self.max_zombie_threads_before_die)
+ self.notify_problem(
+ "Exiting process because %(found)s zombie threads "
+ "(more than limit of %(limit)s)\n"
+ "Bad threads (ids):\n"
+ " %(ids)s\n"
+ % dict(found=len(found),
+ limit=self.max_zombie_threads_before_die,
+ ids="\n ".join(map(str, found))),
+ subject="Process restart (too many zombie threads)")
+ self.shutdown(10)
+ print 'Shutting down', threading.currentThread()
+ raise ServerExit(3)
+
+ def worker_thread_callback(self, message=None):
+ """
+ Worker thread should call this method to get and process queued
+ callables.
+ """
+ thread_obj = threading.currentThread()
+ thread_id = thread_obj.thread_id = thread.get_ident()
+ self.workers.append(thread_obj)
+ self.idle_workers.append(thread_id)
+ requests_processed = 0
+ add_replacement_worker = False
+ self.logger.debug('Started new worker %s: %s', thread_id, message)
+ try:
+ while True:
+ if self.max_requests and self.max_requests < requests_processed:
+ # Replace this thread then die
+ self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread'
+ % (thread_id, requests_processed, self.max_requests))
+ add_replacement_worker = True
+ break
+ runnable = self.queue.get()
+ if runnable is ThreadPool.SHUTDOWN:
+ self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
+ break
+ try:
+ self.idle_workers.remove(thread_id)
+ except ValueError:
+ pass
+ self.worker_tracker[thread_id] = [time.time(), None]
+ requests_processed += 1
+ try:
+ try:
+ runnable()
+ except:
+ # We are later going to call sys.exc_clear(),
+ # removing all remnants of any exception, so
+ # we should log it now. But ideally no
+ # exception should reach this level
+ print >> sys.stderr, (
+ 'Unexpected exception in worker %r' % runnable)
+ traceback.print_exc()
+ if thread_id in self.dying_threads:
+ # That last exception was intended to kill me
+ break
+ finally:
+ try:
+ del self.worker_tracker[thread_id]
+ except KeyError:
+ pass
+ sys.exc_clear()
+ self.idle_workers.append(thread_id)
+ finally:
+ try:
+ del self.worker_tracker[thread_id]
+ except KeyError:
+ pass
+ try:
+ self.idle_workers.remove(thread_id)
+ except ValueError:
+ pass
+ try:
+ self.workers.remove(thread_obj)
+ except ValueError:
+ pass
+ try:
+ del self.dying_threads[thread_id]
+ except KeyError:
+ pass
+ if add_replacement_worker:
+ self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id)
+
+ def shutdown(self, force_quit_timeout=0):
+ """
+ Shutdown the queue (after finishing any pending requests).
+ """
+ self.logger.info('Shutting down threadpool')
+ # Add a shutdown request for every worker
+ for i in range(len(self.workers)):
+ self.queue.put(ThreadPool.SHUTDOWN)
+ # Wait for each thread to terminate
+ hung_workers = []
+ for worker in self.workers:
+ worker.join(0.5)
+ if worker.isAlive():
+ hung_workers.append(worker)
+ zombies = []
+ for thread_id in self.dying_threads:
+ if self.thread_exists(thread_id):
+ zombies.append(thread_id)
+ if hung_workers or zombies:
+ self.logger.info("%s workers didn't stop properly, and %s zombies",
+ len(hung_workers), len(zombies))
+ if hung_workers:
+ for worker in hung_workers:
+ self.kill_worker(worker.thread_id)
+ self.logger.info('Workers killed forcefully')
+ if force_quit_timeout:
+ hung = []
+ timed_out = False
+ need_force_quit = bool(zombies)
+ for worker in self.workers:
+ if not timed_out and worker.isAlive():
+ timed_out = True
+ worker.join(force_quit_timeout)
+ if worker.isAlive():
+ print "Worker %s won't die" % worker
+ need_force_quit = True
+ if need_force_quit:
+ import atexit
+ # Remove the threading atexit callback
+ for callback in list(atexit._exithandlers):
+ func = getattr(callback[0], 'im_func', None)
+ if not func:
+ continue
+ globs = getattr(func, 'func_globals', {})
+ mod = globs.get('__name__')
+ if mod == 'threading':
+ atexit._exithandlers.remove(callback)
+ atexit._run_exitfuncs()
+ print 'Forcefully exiting process'
+ os._exit(3)
+ else:
+ self.logger.info('All workers eventually killed')
+ else:
+ self.logger.info('All workers stopped')
+
+ def notify_problem(self, msg, subject=None, spawn_thread=True):
+ """
+ Called when there's a substantial problem. msg contains the
+ body of the notification, subject the summary.
+
+ If spawn_thread is true, then the email will be send in
+ another thread (so this doesn't block).
+ """
+ if not self.error_email:
+ return
+ if spawn_thread:
+ t = threading.Thread(
+ target=self.notify_problem,
+ args=(msg, subject, False))
+ t.start()
+ return
+ from_address = 'errors@localhost'
+ if not subject:
+ subject = msg.strip().splitlines()[0]
+ subject = subject[:50]
+ subject = '[http threadpool] %s' % subject
+ headers = [
+ "To: %s" % self.error_email,
+ "From: %s" % from_address,
+ "Subject: %s" % subject,
+ ]
+ try:
+ system = ' '.join(os.uname())
+ except:
+ system = '(unknown)'
+ body = (
+ "An error has occurred in the paste.httpserver.ThreadPool\n"
+ "Error:\n"
+ " %(msg)s\n"
+ "Occurred at: %(time)s\n"
+ "PID: %(pid)s\n"
+ "System: %(system)s\n"
+ "Server .py file: %(file)s\n"
+ % dict(msg=msg,
+ time=time.strftime("%c"),
+ pid=os.getpid(),
+ system=system,
+ file=os.path.abspath(__file__),
+ ))
+ message = '\n'.join(headers) + "\n\n" + body
+ import smtplib
+ server = smtplib.SMTP('localhost')
+ error_emails = [
+ e.strip() for e in self.error_email.split(",")
+ if e.strip()]
+ server.sendmail(from_address, error_emails, message)
+ server.quit()
+ print 'email sent to', error_emails, message
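
A hedged sketch of driving the pool directly, outside the HTTP server:

    import sys
    pool = ThreadPool(5, name='demo pool')   # five worker threads
    for i in range(10):
        pool.add_task(lambda i=i: sys.stdout.write('task %d done\n' % i))
    pool.shutdown()   # queues SHUTDOWN sentinels and joins the workers
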
+
+class ThreadPoolMixIn(object):
+ """
+ Mix-in class to process requests from a thread pool
+ """
+ def __init__(self, nworkers, daemon=False, **threadpool_options):
+ # Create and start the workers
+ self.running = True
+ assert nworkers > 0, "ThreadPoolMixIn servers must have at least one worker"
+ self.thread_pool = ThreadPool(
+ nworkers,
+ "ThreadPoolMixIn HTTP server on %s:%d"
+ % (self.server_name, self.server_port),
+ daemon,
+ **threadpool_options)
+
+ def process_request(self, request, client_address):
+ """
+ Queue the request to be processed by one of the thread pool threads
+ """
+ # This sets the socket to blocking mode (and no timeout) since it
+ # may take the thread pool a little while to get back to it. (This
+ # is the default but since we set a timeout on the parent socket so
+ # that we can trap interrupts we need to restore this.)
+ request.setblocking(1)
+ # Queue processing of the request
+ self.thread_pool.add_task(
+ lambda: self.process_request_in_thread(request, client_address))
+
+ def handle_error(self, request, client_address):
+ exc_class, exc, tb = sys.exc_info()
+ if exc_class is ServerExit:
+ # This is actually a request to stop the server
+ raise
+ return super(ThreadPoolMixIn, self).handle_error(request, client_address)
+
+ def process_request_in_thread(self, request, client_address):
+ """
+ The worker thread should call back here to do the rest of the
+ request processing. Error handling normally done in 'handle_request'
+ must be done here.
+ """
+ try:
+ self.finish_request(request, client_address)
+ self.close_request(request)
+ except:
+ self.handle_error(request, client_address)
+ self.close_request(request)
+ exc = sys.exc_info()[1]
+ if isinstance(exc, (MemoryError, KeyboardInterrupt)):
+ raise
+
+ def serve_forever(self):
+ """
+ Overrides `serve_forever` to shut the threadpool down cleanly.
+ """
+ try:
+ while self.running:
+ try:
+ self.handle_request()
+ except socket.timeout:
+ # Timeout is expected, gives interrupts a chance to
+ # propagate, just keep handling
+ pass
+ finally:
+ self.thread_pool.shutdown()
+
+ def server_activate(self):
+ """
+ Overrides server_activate to set timeout on our listener socket.
+ """
+ # We set the timeout here so that we can trap interrupts on windows
+ self.socket.settimeout(1)
+
+ def server_close(self):
+ """
+ Finish pending requests and shutdown the server.
+ """
+ self.running = False
+ self.socket.close()
+ self.thread_pool.shutdown(60)
+
+class WSGIServerBase(SecureHTTPServer):
+ def __init__(self, wsgi_application, server_address,
+ RequestHandlerClass=None, ssl_context=None,
+ request_queue_size=None):
+ SecureHTTPServer.__init__(self, server_address,
+ RequestHandlerClass, ssl_context,
+ request_queue_size=request_queue_size)
+ self.wsgi_application = wsgi_application
+ self.wsgi_socket_timeout = None
+
+ def get_request(self):
+ # If there is a socket_timeout, set it on the accepted
+ (conn,info) = SecureHTTPServer.get_request(self)
+ if self.wsgi_socket_timeout:
+ conn.settimeout(self.wsgi_socket_timeout)
+ return (conn, info)
+
+class WSGIServer(ThreadingMixIn, WSGIServerBase):
+ daemon_threads = False
+
+class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase):
+ def __init__(self, wsgi_application, server_address,
+ RequestHandlerClass=None, ssl_context=None,
+ nworkers=10, daemon_threads=False,
+ threadpool_options=None, request_queue_size=None):
+ WSGIServerBase.__init__(self, wsgi_application, server_address,
+ RequestHandlerClass, ssl_context,
+ request_queue_size=request_queue_size)
+ if threadpool_options is None:
+ threadpool_options = {}
+ ThreadPoolMixIn.__init__(self, nworkers, daemon_threads,
+ **threadpool_options)
+
+class ServerExit(SystemExit):
+ """
+ Raised to tell the server to really exit (SystemExit is normally
+ caught)
+ """
+
+def serve(application, host=None, port=None, handler=None, ssl_pem=None,
+ ssl_context=None, server_version=None, protocol_version=None,
+ start_loop=True, daemon_threads=None, socket_timeout=None,
+ use_threadpool=None, threadpool_workers=10,
+ threadpool_options=None, request_queue_size=5):
+ """
+ Serves your ``application`` over HTTP(S) via WSGI interface
+
+ ``host``
+
+        This is the IP address to bind to (or a hostname if your
+        nameserver is properly configured). This defaults to
+        127.0.0.1, which is not a public interface.
+
+ ``port``
+
+ The port to run on, defaults to 8080 for HTTP, or 4443 for
+ HTTPS. This can be a string or an integer value.
+
+ ``handler``
+
+        This is the HTTP request handler to use; it defaults to
+ ``WSGIHandler`` in this module.
+
+ ``ssl_pem``
+
+        This is an optional SSL certificate file (via OpenSSL). You can
+ supply ``*`` and a development-only certificate will be
+ created for you, or you can generate a self-signed test PEM
+ certificate file as follows::
+
+ $ openssl genrsa 1024 > host.key
+ $ chmod 400 host.key
+ $ openssl req -new -x509 -nodes -sha1 -days 365 \\
+ -key host.key > host.cert
+ $ cat host.cert host.key > host.pem
+ $ chmod 400 host.pem
+
+ ``ssl_context``
+
+        This is an optional SSL context object for the server.  An SSL
+        context will be automatically constructed for you if you supply
+        ``ssl_pem``.  Supply this to use a context of your own
+        construction.
+
+ ``server_version``
+
+        The version of the server as reported in the HTTP response
+        line.  This defaults to something like "PasteWSGIServer/0.5".
+        Many servers hide their code-base identity with a name like
+        'Amnesiac/1.0'.
+
+ ``protocol_version``
+
+        This sets the protocol used by the server; it defaults to
+        ``HTTP/1.0``. There is some support for ``HTTP/1.1``, which
+        enables keep-alive connections by default. This server supports
+        ``100 Continue``, but does not yet support HTTP/1.1 chunked
+        encoding. Hence, if you use HTTP/1.1 you are somewhat out of
+        spec, since chunked coding is a mandatory requirement of an
+        HTTP/1.1 server. If you specify HTTP/1.1, every response *must*
+        have a ``Content-Length``, and you must be careful not to read
+        past the end of the socket.
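+
+        For example, a well-behaved ``HTTP/1.1`` application might look
+        like this (a minimal sketch)::
+
+            def app(environ, start_response):
+                body = 'hello'
+                start_response('200 OK', [
+                    ('Content-Type', 'text/plain'),
+                    ('Content-Length', str(len(body)))])
+                return [body]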
+
+ ``start_loop``
+
+        This specifies whether the server loop (aka
+        ``server.serve_forever()``) should be called; it defaults
+        to ``True``.
+
+ ``daemon_threads``
+
+        This flag specifies whether, when your webserver terminates, all
+        in-progress client connections should be dropped.  It defaults
+        to ``False``.  You might want to set this to ``True`` if you
+        are using ``HTTP/1.1`` and don't set a ``socket_timeout``.
+
+ ``socket_timeout``
+
+ This specifies the maximum amount of time that a connection to a
+ given client will be kept open. At this time, it is a rude
+ disconnect, but at a later time it might follow the RFC a bit
+ more closely.
+
+ ``use_threadpool``
+
+        Serve requests from a pool of worker threads (``threadpool_workers``)
+ rather than creating a new thread for each request. This can
+ substantially reduce latency since there is a high cost associated
+ with thread creation.
+
+ ``threadpool_workers``
+
+ Number of worker threads to create when ``use_threadpool`` is true. This
+ can be a string or an integer value.
+
+ ``threadpool_options``
+
+ A dictionary of options to be used when instantiating the
+ threadpool. See paste.httpserver.ThreadPool for specific
+ options (``threadpool_workers`` is a specific option that can
+ also go here).
+
+ ``request_queue_size``
+
+ The 'backlog' argument to socket.listen(); specifies the
+ maximum number of queued connections.
+
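+    A minimal usage sketch (the application shown is illustrative)::
+
+        def app(environ, start_response):
+            start_response('200 OK', [('Content-Type', 'text/plain')])
+            return ['hello']
+
+        serve(app, host='127.0.0.1', port='8080')
+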
+ """
+ is_ssl = False
+ if ssl_pem or ssl_context:
+ assert SSL, "pyOpenSSL is not installed"
+ is_ssl = True
+ port = int(port or 4443)
+ if not ssl_context:
+ if ssl_pem == '*':
+ ssl_context = _auto_ssl_context()
+ else:
+ ssl_context = SSL.Context(SSL.SSLv23_METHOD)
+ ssl_context.use_privatekey_file(ssl_pem)
+ ssl_context.use_certificate_chain_file(ssl_pem)
+
+ host = host or '127.0.0.1'
+ if port is None:
+ if ':' in host:
+ host, port = host.split(':', 1)
+ else:
+ port = 8080
+ server_address = (host, int(port))
+
+ if not handler:
+ handler = WSGIHandler
+ if server_version:
+ handler.server_version = server_version
+ handler.sys_version = None
+ if protocol_version:
+ assert protocol_version in ('HTTP/0.9', 'HTTP/1.0', 'HTTP/1.1')
+ handler.protocol_version = protocol_version
+
+ if use_threadpool is None:
+ use_threadpool = True
+
+ if asbool(use_threadpool):
+ server = WSGIThreadPoolServer(application, server_address, handler,
+ ssl_context, int(threadpool_workers),
+ daemon_threads,
+ threadpool_options=threadpool_options,
+ request_queue_size=request_queue_size)
+ else:
+ server = WSGIServer(application, server_address, handler, ssl_context,
+ request_queue_size=request_queue_size)
+ if daemon_threads:
+ server.daemon_threads = daemon_threads
+
+ if socket_timeout:
+ server.wsgi_socket_timeout = int(socket_timeout)
+
+ if asbool(start_loop):
+ protocol = is_ssl and 'https' or 'http'
+ host, port = server.server_address[:2]
+ if host == '0.0.0.0':
+ print 'serving on 0.0.0.0:%s view at %s://127.0.0.1:%s' % \
+ (port, protocol, port)
+ else:
+ print "serving on %s://%s:%s" % (protocol, host, port)
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ # allow CTRL+C to shutdown
+ pass
+ return server
+
+# For paste.deploy server instantiation (egg:Paste#http)
+# Note: this gets a separate function because it has to expect string
+# arguments (though that's not much of an issue yet, if ever)
+def server_runner(wsgi_app, global_conf, **kwargs):
+ for name in ['port', 'socket_timeout', 'threadpool_workers',
+ 'threadpool_hung_thread_limit',
+ 'threadpool_kill_thread_limit',
+ 'threadpool_dying_limit', 'threadpool_spawn_if_under',
+ 'threadpool_max_zombie_threads_before_die',
+ 'threadpool_hung_check_period',
+ 'threadpool_max_requests', 'request_queue_size']:
+ if name in kwargs:
+ kwargs[name] = int(kwargs[name])
+ for name in ['use_threadpool', 'daemon_threads']:
+ if name in kwargs:
+ kwargs[name] = asbool(kwargs[name])
+ threadpool_options = {}
+ for name, value in kwargs.items():
+ if name.startswith('threadpool_') and name != 'threadpool_workers':
+ threadpool_options[name[len('threadpool_'):]] = value
+ del kwargs[name]
+ if ('error_email' not in threadpool_options
+ and 'error_email' in global_conf):
+ threadpool_options['error_email'] = global_conf['error_email']
+ kwargs['threadpool_options'] = threadpool_options
+ serve(wsgi_app, **kwargs)
+
+server_runner.__doc__ = (serve.__doc__ or '') + """
+
+ You can also set these threadpool options:
+
+ ``threadpool_max_requests``:
+
+ The maximum number of requests a worker thread will process
+ before dying (and replacing itself with a new worker thread).
+ Default 100.
+
+ ``threadpool_hung_thread_limit``:
+
+ The number of seconds a thread can work on a task before it is
+ considered hung (stuck). Default 30 seconds.
+
+ ``threadpool_kill_thread_limit``:
+
+ The number of seconds a thread can work before you should kill it
+ (assuming it will never finish). Default 600 seconds (10 minutes).
+
+ ``threadpool_dying_limit``:
+
+        The length of time after a thread is killed within which it
+        should actually disappear.  If it lives longer than this, it
+        is considered a "zombie".  Note that even in easy situations
+        killing a thread can be very slow.  Default 300 seconds (5
+        minutes).
+
+ ``threadpool_spawn_if_under``:
+
+        If there are no idle threads and a request comes in, and there are
+        fewer than this number of *busy* threads, then add workers to the
+ pool. Busy threads are threads that have taken less than
+ ``threadpool_hung_thread_limit`` seconds so far. So if you get
+ *lots* of requests but they complete in a reasonable amount of time,
+ the requests will simply queue up (adding more threads probably
+ wouldn't speed them up). But if you have lots of hung threads and
+ one more request comes in, this will add workers to handle it.
+ Default 5.
+
+ ``threadpool_max_zombie_threads_before_die``:
+
+        If there are more zombies than this, just kill the process.
+        This is only useful if you have a monitor that will
+        automatically restart the server; that can clean up the mess.
+        Default 0 (disabled).
+
+    ``threadpool_hung_check_period``:
+
+ Every X requests, check for hung threads that need to be killed,
+ or for zombie threads that should cause a restart. Default 100
+ requests.
+
+ ``threadpool_logger``:
+
+        Logging messages will go to the logger named here.
+
+ ``threadpool_error_email`` (or global ``error_email`` setting):
+
+ When threads are killed or the process restarted, this email
+ address will be contacted (using an SMTP server on localhost).
+
+"""
+
+
+if __name__ == '__main__':
+ from paste.wsgilib import dump_environ
+ #serve(dump_environ, ssl_pem="test.pem")
+ serve(dump_environ, server_version="Wombles/1.0",
+ protocol_version="HTTP/1.1", port="8888")
View
133 apppkg/paste_reloader.py
@@ -0,0 +1,133 @@
+# Copied from paste.reloader:
+"""
+Use this like::
+
+ import reloader
+ reloader.install()
+
+Then make sure your server is installed with a shell script like::
+
+ err=3
+ while test "$err" -eq 3 ; do
+ python server.py
+ err="$?"
+ done
+
+or restart in Python (server.py does this). Use the watch_file(filename)
+function to cause a reload/restart for other non-Python files (e.g.,
+configuration files).
+"""
+
+import os
+import sys
+import time
+import threading
+import atexit
+
+# Copied from paste.util.classinstance:
+class classinstancemethod(object):
+ """
+ Acts like a class method when called from a class, like an
+ instance method when called by an instance. The method should
+ take two arguments, 'self' and 'cls'; one of these will be None
+ depending on how the method was called.
+ """
+
+ def __init__(self, func):
+ self.func = func
+
+ def __get__(self, obj, type=None):
+ return _methodwrapper(self.func, obj=obj, type=type)
+
+class _methodwrapper(object):
+
+ def __init__(self, func, obj, type):
+ self.func = func
+ self.obj = obj
+ self.type = type
+
+ def __call__(self, *args, **kw):
+        assert 'self' not in kw and 'cls' not in kw, (
+ "You cannot use 'self' or 'cls' arguments to a "
+ "classinstancemethod")
+ return self.func(*((self.obj, self.type) + args), **kw)
+
+ def __repr__(self):
+ if self.obj is None:
+ return ('<bound class method %s.%s>'
+ % (self.type.__name__, self.func.func_name))
+ else:
+ return ('<bound method %s.%s of %r>'
+ % (self.type.__name__, self.func.func_name, self.obj))
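+
+# An illustrative sketch of how classinstancemethod behaves (the class
+# below is hypothetical):
+#
+#     class Example(object):
+#         def where(self, cls):
+#             return 'class' if self is None else 'instance'
+#         where = classinstancemethod(where)
+#
+#     Example.where()    # -> 'class'
+#     Example().where()  # -> 'instance'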
+
+def install(poll_interval=1, raise_keyboard_interrupt=True):
+ mon = Monitor(poll_interval=poll_interval,
+ raise_keyboard_interrupt=raise_keyboard_interrupt)
+ t = threading.Thread(target=mon.periodic_reload)
+ t.start()
+
+class Monitor:
+
+ instances = []
+ global_extra_files = []
+
+ def __init__(self, poll_interval, raise_keyboard_interrupt):
+ self.module_mtimes = {}
+ atexit.register(self.atexit)
+ self.keep_running = True
+ self.poll_interval = poll_interval
+ self.raise_keyboard_interrupt = raise_keyboard_interrupt
+ self.extra_files = self.global_extra_files[:]
+ self.instances.append(self)
+
+ def atexit(self):
+ self.keep_running = False
+ if self.raise_keyboard_interrupt:
+            # This exception is somewhat magic: it applies to more
+            # threads and situations (like socket.accept) than a
+            # mere SystemExit does.
+ raise KeyboardInterrupt("Exiting process")
+
+ def periodic_reload(self):
+ while 1:
+ if not self.keep_running:
+ break
+ if not self.check_reload():
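+                # A file changed: exit with status 3 so the restart
+                # loop in the module docstring (or server.py) will
+                # relaunch the process.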
+ os._exit(3)
+ break
+ time.sleep(self.poll_interval)
+
+ def check_reload(self):
+ filenames = self.extra_files[:]
+ for module in sys.modules.values():
+ try:
+ filenames.append(module.__file__)
+ except AttributeError:
+ continue
+ for filename in filenames:
+ try:
+ mtime = os.stat(filename).st_mtime
+ except (OSError, IOError):
+ continue
+ if filename.endswith('.pyc') and os.path.exists(filename[:-1]):
+ mtime = max(os.stat(filename[:-1]).st_mtime, mtime)
+            if filename not in self.module_mtimes:
+ self.module_mtimes[filename] = mtime
+ elif self.module_mtimes[filename] < mtime:
+ print >> sys.stderr, (
+ "%s changed; reloading..." % filename)
+ return False
+ return True
+
+ def watch_file(self, cls, filename):
+ filename = os.path.abspath(filename)
+ if self is None:
+ for instance in cls.instances:
+ instance.watch_file(filename)
+ cls.global_extra_files.append(filename)
+ else:
+ self.extra_files.append(filename)
+
+ watch_file = classinstancemethod(watch_file)
+
+watch_file = Monitor.watch_file
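+
+# A minimal usage sketch (the file name is illustrative): install the
+# monitor early during server startup, then register any extra files
+# to watch:
+#
+#     import paste_reloader as reloader
+#     reloader.install(poll_interval=1)
+#     reloader.watch_file('development.ini')
+#
+# When a watched file changes, the monitor exits the process with
+# status 3 and the wrapper loop shown in the module docstring starts
+# it again.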
View
95 apppkg/run-command.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+import os
+import sys
+try:
+ import simplejson as json
+except ImportError:
+ import json
+import new
+apppkg = None
+
+
+command_path = os.environ['_APPPKG_COMMAND_PATH']
+app_path = os.environ['_APPPKG_APP_PATH']
+apppkg_path = os.environ['_APPPKG_PATH']
+env_description = json.loads(os.environ['_APPPKG_ENV_DESCRIPTION'])
+config_dir = os.environ['_APPPKG_CONFIG'] or None
+venv_location = os.environ['_APPPKG_VENV_LOCATION'] or None
+args = sys.argv[1:]
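+
+# The parent process is expected to prepare these environment variables
+# before invoking this script; a rough sketch of such an invocation
+# (the paths are illustrative):
+#
+#     env = dict(os.environ,
+#                _APPPKG_COMMAND_PATH='/path/to/command.py',
+#                _APPPKG_APP_PATH='/path/to/app',
+#                _APPPKG_PATH='/path/to/apppkg',
+#                _APPPKG_ENV_DESCRIPTION=json.dumps({}),
+#                _APPPKG_CONFIG='',
+#                _APPPKG_VENV_LOCATION='')
+#     subprocess.check_call([sys.executable, 'run-command.py'], env=env)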
+
+
+def setup_apppkg():
+ """Sets up apppkg so it is importable.
+
+ Does it the hard way so we don't have to add the parent directory
+ to the path (since the parent directory might contain a bunch of
+    other modules).
+ """
+ global apppkg
+ if 'apppkg' in sys.modules:
+ if apppkg is None:
+ import apppkg
+ return
+ mod = sys.modules['apppkg'] = new.module('apppkg')
+    mod.__path__ = [apppkg_path]  # __path__ must be a list of directories
+ mod.__file__ = os.path.join(apppkg_path, '__init__.py')
+ execfile(mod.__file__, mod.__dict__)
+ import apppkg
+
+
+def make_app():
+ app = apppkg.AppPackage(app_path, config_dir)
+ return app
+
+
+def setup_settings(app):
+ app.setup_settings()
+ if not env_description:
+ return
+ import appsettings
+ for name, value in env_description.items():
+ setattr(appsettings, name, value)
+ appsettings.config_dir = config_dir
+
+
+def strip_json_attrs(d):
+    """Remove (in place) any values that cannot be serialized to JSON."""
+    for key in list(d):
+        try:
+            json.dumps(d[key])
+        except TypeError:
+            del d[key]