
initial release

commit 7cb1ce35102598807b06ef630b7608987439d1e3 0 parents
@benoitc authored
24 .gitignore
@@ -0,0 +1,24 @@
+*.gem
+*.swp
+*.pyc
+*#*
+build
+dist
+setuptools-*
+.svn/*
+.DS_Store
+*.so
+.Python
+distribute-0.6.8-py2.6.egg
+distribute-0.6.8.tar.gz
+*.egg-info
+nohup.out
+.coverage
+doc/.sass-cache
+bin/
+lib/
+man/
+include/
+html/
+.tox
+htmlcov
22 LICENSE
@@ -0,0 +1,22 @@
+2012 (c) Benoît Chesneau <benoitc@e-engura.org>
+
+Permission is hereby granted, free of charge, to any person
+obtaining a copy of this software and associated documentation
+files (the "Software"), to deal in the Software without
+restriction, including without limitation the rights to use,
+copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
7 MANIFEST.in
@@ -0,0 +1,7 @@
+include NOTICE
+include LICENSE
+include README.rst
+include THANKS
+include UNLICENSE
+recursive-include examples *
+recursive-include tests *
8 NOTICE
@@ -0,0 +1,8 @@
+gaffer
+------
+
+2012 (c) Benoît Chesneau <benoitc@e-engura.org>
+
+gaffer is available in the public domain (see UNLICENSE). gaffer
+is also optionally available under the MIT License (see LICENSE), meant
+especially for jurisdictions that do not recognize public domain works.
0  README.rst
No changes.
0  THANKS
No changes.
24 UNLICENSE
@@ -0,0 +1,24 @@
+This is free and unencumbered software released into the public domain.
+
+Anyone is free to copy, modify, publish, use, compile, sell, or
+distribute this software, either in source code form or as a compiled
+binary, for any purpose, commercial or non-commercial, and by any
+means.
+
+In jurisdictions that recognize copyright laws, the author or authors
+of this software dedicate any and all copyright interest in the
+software to the public domain. We make this dedication for the benefit
+of the public at large and to the detriment of our heirs and
+successors. We intend this dedication to be an overt act of
+relinquishment in perpetuity of all present and future rights to this
+software under copyright law.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
+
+For more information, please refer to <http://unlicense.org/>
32 examples/dummy.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+import os
+import signal
+import sys
+import time
+
+
+class Dummy(object):
+
+ def __init__(self):
+ # init signal handling
+ signal.signal(signal.SIGQUIT, self.handle_quit)
+ signal.signal(signal.SIGTERM, self.handle_quit)
+ signal.signal(signal.SIGINT, self.handle_quit)
+ signal.signal(signal.SIGCHLD, self.handle_chld)
+ self.alive = True
+
+ def handle_quit(self, *args):
+ self.alive = False
+ sys.exit(0)
+
+ def handle_chld(self, *args):
+ return
+
+ def run(self):
+ print("hello, dummy (pid: %s) is alive" % os.getpid())
+
+ while self.alive:
+ time.sleep(0.1)
+
+if __name__ == "__main__":
+ Dummy().run()
5 gaffer/__init__.py
@@ -0,0 +1,5 @@
+# -*- coding: utf-8 -
+#
+# This file is part of gaffer. See the NOTICE for more information.
+
+from gaffer.manager import Manager, ManagerThread, get_manager
13 gaffer/arbiter.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -
+#
+# This file is part of gaffer. See the NOTICE for more information.
+
+import pyuv
+
+class Manager(object):
+
+ def __init__(self, loop=None):
+
+ self.loop = loop or pyuv.Loop.default_loop()
+
+
429 gaffer/manager.py
@@ -0,0 +1,429 @@
+# -*- coding: utf-8 -
+#
+# This file is part of gaffer. See the NOTICE for more information.
+
+from collections import deque
+from functools import partial
+import sys
+from threading import Thread, RLock
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+
+import pyuv
+import six
+
+from .process import Process
+from .sync import increment, add, sub, atomic_read
+
+class bomb(object):
+ def __init__(self, exp_type=None, exp_value=None, exp_traceback=None):
+ self.type = exp_type
+ self.value = exp_value
+ self.traceback = exp_traceback
+
+ def raise_(self):
+ six.reraise(self.type, self.value, self.traceback)
+
+
+class ProcessState(object):
+ """ object used by the manager to maintain the process state """
+
+ DEFAULT_PARAMS = {
+ "group": None,
+ "args": None,
+ "uid": None,
+ "gid": None,
+ "cwd": None,
+ "detach": False}
+
+ def __init__(self, name, cmd, **settings):
+ self.name = name
+ self.cmd = cmd
+ self.settings = settings
+ self._numprocesses = self.settings.get('numprocesses', 1)
+ self.running = deque()
+ self.stopped = False
+
+ @property
+ def active(self):
+ return len(self.running) > 0
+
+ def __str__(self):
+ return "settings: %s" % self.name
+
+ def make_process(self, loop, id, on_exit):
+ params = {}
+ for name, default in self.DEFAULT_PARAMS.items():
+ params[name] = self.settings.get(name, default)
+
+ params['on_exit_cb'] = on_exit
+
+ return Process(loop, id, self.name, self.cmd, **params)
+
+ @property
+ def group(self):
+ return self.settings.get('group')
+
+ @property
+ def numprocesses(self):
+ return atomic_read(self._numprocesses)
+
+ @property
+ def hup(self):
+ return self.settings.get('hup', False)
+
+ def reset(self):
+ self._numprocesses = self.settings.get('numprocesses', 1)
+
+ def ttin(self, i=1):
+ self._numprocesses = add(self._numprocesses, i)
+ return self._numprocesses
+
+ def ttou(self, i=1):
+ self._numprocesses = sub(self._numprocesses, i)
+ return self._numprocesses
+
+ def queue(self, process):
+ self.running.append(process)
+
+ def dequeue(self):
+ return self.running.popleft()
+
+ def remove(self, process):
+ try:
+ self.running.remove(process)
+ except ValueError:
+ pass
+
+class Manager(object):
+
+ def __init__(self, loop=None, controllers=[], on_error_cb=None):
+ self.loop = loop
+ self.controllers = controllers
+ self.on_error_cb = on_error_cb
+
+ self.started = False
+ self._stop_ev = None
+ self.max_process_id = 0
+ self.processes = {}
+ self.running = {}
+ self.channel = deque()
+ self._controllers = []
+ self._lock = RLock()
+
+ def start(self):
+ self.loop = self.loop or pyuv.Loop.default_loop()
+ self._stop_ev = pyuv.Async(self.loop, self._on_stop)
+ self._wakeup_ev = pyuv.Async(self.loop, self._on_wakeup)
+
+ # start controllers
+ for controller in self.controllers:
+ ctl = controller(self.loop)
+ ctl.start()
+ self._controllers.append(ctl)
+
+ self.started = True
+
+ def run(self):
+ if not self.started:
+ self.start()
+ self.loop.run()
+
+ def stop(self):
+ self._stop_ev.send()
+
+ def restart(self):
+ with self._lock:
+ for name, _ in self.processes.items():
+ self.stop_process(name)
+ self.start_process(name)
+
+ def send(self, cmd, *args, **kwargs):
+ cmd_type, func_name = cmd
+
+ c = None
+ if cmd_type == "call":
+ c = Queue()
+
+ self.channel.append((func_name, args, kwargs, c))
+ self._wakeup_ev.send()
+ if c is not None:
+ res = c.get()
+ if isinstance(res, bomb):
+ res.raise_()
+ return res
+
+ def add_process(self, name, cmd, **kwargs):
+ """ add a process to the manager. all process should be added
+ using this function """
+ with self._lock:
+ if name in self.processes:
+ raise KeyError("a process named %r is already managed" % name)
+
+ if 'start' in kwargs:
+ start = kwargs.pop('start')
+ else:
+ start = True
+
+ state = ProcessState(name, cmd, **kwargs)
+ self.processes[name] = state
+ if start:
+ self._spawn_processes(state)
+
+ def stop_process(self, name_or_id):
+ """ stop a process by name or id
+ if a name is given all processes associated to this name will be
+ removed and the process is marked at stopped. If the internal
+ process id is givien, only the process with this id will be
+ stopped """
+
+ if isinstance(name_or_id, six.string_types):
+ stop_func = self._stop_byname_unlocked
+ else:
+ stop_func = self._stop_byid_unlocked
+
+ # really stop the process
+ with self._lock:
+ stop_func(name_or_id)
+
+ def stop_processes(self):
+ with self._lock:
+ for name, _ in self.processes.items():
+ self.stop_process(name)
+
+
+ def remove_process(self, name):
+ """ remove the process and its config from the manager """
+
+ with self._lock:
+ if name not in self.processes:
+ return
+
+ self._stop_byname_unlocked(name)
+ del self.processes[name]
+
+
+ def manage_process(self, name):
+ with self._lock:
+ self._manage_processes(self.get_process_state(name))
+
+ start_process = manage_process
+
+ def reap_process(self, name):
+ with self._lock:
+ self._reap_processes(self.get_process_state(name))
+
+ def restart_process(self, name):
+ """ restart a process """
+ with self._lock:
+ state = self.get_process_state(name)
+ state.reset()
+ while True:
+ try:
+ p = state.dequeue()
+ except IndexError:
+ break
+ p.stop()
+
+ def ttin(self, name, i=1):
+ """ increase the number of system processes for a state. Change
+ is handled once the event loop is idling """
+
+ with self._lock:
+ state = self.get_process_state(name)
+ ret = state.ttin(i)
+ self.update_state(name)
+ return ret
+
+ def ttou(self, name, i=1):
+ """ decrease the number of system processes for a state. Change
+ is handled once the event loop is idling """
+
+ with self._lock:
+ state = self.get_process_state(name)
+ ret = state.ttou(i)
+ self.update_state(name)
+ return ret
+
+ def update_state(self, name):
+ """ update the state. When the event loop is idle, the state is
+ read and the processes in the state are managed """
+
+ def _cb(handle):
+ self._on_state_change(handle, name)
+ h = pyuv.Idle(self.loop)
+ h.start(_cb)
+
+ def get_process_state(self, name):
+ if name not in self.processes:
+ return
+ return self.processes[name]
+
+ def get_process_id(self):
+ """ generate a process id """
+ self.max_process_id = increment(self.max_process_id)
+ return self.max_process_id
+
+
+
+ # ------------- private functions
+
+ def _stop(self):
+ # stop all processes
+ self.stop_processes()
+
+ # stop wakeup_event
+ with self._lock:
+ self._wakeup_ev.close()
+ self.started = False
+
+ def _on_stop(self, handle):
+ handle.close()
+ self._stop()
+
+ def _spawn_process(self, state):
+ """ spawn a new process and add it to the state """
+ # get internal process id
+ pid = self.get_process_id()
+
+ # start process
+ p = state.make_process(self.loop, pid, self._on_exit)
+ p.spawn()
+
+ # add the process to the running state
+ state.queue(p)
+
+ # we keep a list of all running process by id here
+ self.running[pid] = p
+
+ def _stop_byname_unlocked(self, name):
+ """ stop a process by name """
+ if name not in self.processes:
+ return
+
+ state = self.processes[name]
+ state.stopped = True
+
+ if not state.active:
+ return
+
+ while True:
+ try:
+ p = state.dequeue()
+ except IndexError:
+ break
+
+ p.stop()
+
+ def _stop_byid_unlocked(self, pid):
+ """ stop a process bby id """
+ if pid not in self.running:
+ return
+
+ # remove the process from the running processes
+ state = self.processes[p.name]
+ state.remove(p)
+
+ # finally stop the process
+ p.stop()
+
+ def _spawn_processes(self, state):
+ """ spawn all processes for a state """
+ num_to_start = state.numprocesses - len(state.running)
+ for i in range(num_to_start):
+ self._spawn_process(state)
+
+ def _reap_processes(self, state):
+ diff = len(state.running) - state.numprocesses
+
+ if diff > 0:
+ for i in range(diff):
+ p = state.dequeue()
+ p.stop()
+
+ def _manage_processes(self, state):
+ if len(state.running) < state.numprocesses:
+ print("spawn")
+ self._spawn_processes(state)
+
+ self._reap_processes(state)
+
+ def _on_state_change(self, handle, name):
+ handle.stop()
+ self.manage_process(name)
+
+ def _on_wakeup(self, handle):
+ func_name, args, kwargs, c = self.channel.popleft()
+ func = getattr(self, func_name)
+
+ try:
+ res = func(*args, **kwargs)
+ except:
+ exc_info = sys.exc_info()
+ res = bomb(exc_info[0], exc_info[1], exc_info[2])
+
+ if c is not None:
+ c.put(res)
+
+ def _on_exit(self, process, exit_status, term_signal):
+ """ exit callback returned when a process exit """
+ with self._lock:
+ if process.id in self.running:
+ del self.running[process.id]
+
+ state = self.get_process_state(process.name)
+ state.remove(process)
+ if not state.stopped:
+ self._manage_processes(state)
+
+
+class ManagerThread(Thread):
+ """ A threaded manager class
+
+ The manager is started in a thread; all manager methods are proxied
+ and handled asynchronously """
+
+ def __init__(self, controllers=[], on_error_cb=None, daemon=True):
+ Thread.__init__(self)
+ self.daemon = daemon
+ self.manager = Manager(controllers=controllers,
+ on_error_cb=on_error_cb)
+
+ def run(self):
+ self.manager.run()
+
+ def stop(self):
+ self.manager.stop()
+ self.join()
+
+ def __getattr__(self, name):
+ if name in self.__dict__:
+ return self.__dict__[name]
+
+ if not hasattr(self.manager, name):
+ raise AttributeError("%r is not a manager method" % name)
+
+ attr = getattr(self.manager, name)
+ if six.callable(attr):
+ return partial(self.call, name)
+ return attr
+
+ def cast(self, name, *args, **kwargs):
+ """ call a manager method asynchronously """
+ self.manager.send(("cast", name), *args, **kwargs)
+
+ def call(self, name, *args, **kwargs):
+ """ call a manager method and wait for the result """
+ res = self.manager.send(("call", name), *args, **kwargs)
+ return res
+
+
+def get_manager(controllers=[], background=False):
+ """ return a manager """
+
+ if background:
+ return ManagerThread(controllers=controllers)
+
+ return Manager(controllers=controllers)
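A short usage sketch of the API above, in the spirit of the tests further down in this commit; the process name, command, and counts are only illustrative:

    from gaffer.manager import get_manager

    m = get_manager(background=True)   # ManagerThread running its own loop
    m.start()

    # "sleep" is just an example command
    m.add_process("sleeper", "sleep", args=["10"], numprocesses=2)
    state = m.get_process_state("sleeper")
    print(state.numprocesses)          # 2

    m.stop_process("sleeper")          # stops every process under that name
    m.stop()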
318 gaffer/process.py
@@ -0,0 +1,318 @@
+# -*- coding: utf-8 -
+#
+# This file is part of gaffer. See the NOTICE for more information.
+
+from collections import deque
+from datetime import timedelta
+import json
+import os
+import signal
+import shlex
+
+import pyuv
+import psutil
+import six
+
+from .util import bytestring, getcwd, check_uid, check_gid, bytes2human
+from .sync import atomic_read, increment, decrement
+
+def get_process_info(process=None, interval=0):
+
+ """Return information about a process. (can be an pid or a Process object)
+
+ If process is None, will return the information about the current process.
+ """
+ if process is None:
+ process = psutil.Process(os.getpid())
+ info = {}
+ try:
+ mem_info = process.get_memory_info()
+ info['mem_info1'] = bytes2human(mem_info[0])
+ info['mem_info2'] = bytes2human(mem_info[1])
+ except psutil.AccessDenied:
+ info['mem_info1'] = info['mem_info2'] = "N/A"
+
+ try:
+ info['cpu'] = process.get_cpu_percent(interval=interval)
+ except psutil.AccessDenied:
+ info['cpu'] = "N/A"
+
+ try:
+ info['mem'] = round(process.get_memory_percent(), 1)
+ except psutil.AccessDenied:
+ info['mem'] = "N/A"
+
+ try:
+ cpu_times = process.get_cpu_times()
+ ctime = timedelta(seconds=sum(cpu_times))
+ ctime = "%s:%s.%s" % (ctime.seconds // 60 % 60,
+ str((ctime.seconds % 60)).zfill(2),
+ str(ctime.microseconds)[:2])
+ except psutil.AccessDenied:
+ ctime = "N/A"
+
+ info['ctime'] = ctime
+
+ try:
+ info['pid'] = process.pid
+ except psutil.AccessDenied:
+ info['pid'] = 'N/A'
+
+ try:
+ info['username'] = process.username
+ except psutil.AccessDenied:
+ info['username'] = 'N/A'
+
+ try:
+ info['nice'] = process.nice
+ except psutil.AccessDenied:
+ info['nice'] = 'N/A'
+ except psutil.NoSuchProcess:
+ info['nice'] = 'Zombie'
+
+ try:
+ cmdline = os.path.basename(shlex.split(process.cmdline[0])[0])
+ except (psutil.AccessDenied, IndexError):
+ cmdline = "N/A"
+
+ info['cmdline'] = cmdline
+
+ info['children'] = []
+ for child in process.get_children():
+ info['children'].append(get_process_info(child,
+ interval=interval))
+
+ return info
+
+
+class RedirectIO(object):
+
+ def __init__(self, loop, label, stream, process):
+ self.loop = loop
+ self.label = label
+ self.process = process
+
+ if stream is None:
+ self._stdio = pyuv.StdIO(flags=pyuv.UV_IGNORE)
+ else:
+ self.channel = pyuv.Pipe(loop, True)
+ self._stdio = pyuv.StdIO(stream=self.channel,
+ flags=pyuv.UV_CREATE_PIPE | \
+ pyuv.UV_READABLE_PIPE | \
+ pyuv.UV_WRITABLE_PIPE)
+ self.stream = stream
+
+ @property
+ def stdio(self):
+ return self._stdio
+
+ def start(self):
+ self.channel.start_read(self._on_read)
+
+ def _on_read(self, handle, data, error):
+ # redirect the message to the main pipe
+ if data:
+ msg = { "name": self.process.name,
+ "pid": self.process_type,
+ "label": self.label,
+ "data": data,
+ "msg_type": "redirect"}
+ handle.write2(json.dumps(msg), self.stream)
@saghul (Collaborator) added a note:

This will write back on the pipe, which is probably not what we want, is it? If the intention is that the child inherits the given stream then it should be taken from the arguments passed, not by creating a new channel. What is the intention here?

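A minimal sketch of the behaviour the note seems to suggest, assuming the stream handed to RedirectIO is a writable pyuv handle: forward the framed message to that stream instead of writing back on the read channel (this is only an illustration, not part of the commit; json is imported at the top of this module):

    def _on_read(self, handle, data, error):
        if not data:
            return
        msg = {"name": self.process.name,
               "pid": self.process.pid,
               "label": self.label,
               # decode so the payload is JSON-serializable on Python 3
               "data": data.decode("utf-8", "replace"),
               "msg_type": "redirect"}
        # write to the caller-provided stream rather than the child's pipe
        self.stream.write(json.dumps(msg))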
+
+
+class ProcessWatcher(object):
+ """ object to retrieve process stats """
+
+ def __init__(self, loop, pid, on_refresh_cb=None):
+ self.loop = loop
+ self.pid = pid
+ self.on_refresh_cb = on_refresh_cb
+ self._process = psutil.Process(pid)
+ self._last_info = None
+ self.active = True
+ self._timer = pyuv.Timer(loop)
+ self._timer.start(self._async_refresh, 0.1, 0.1)
+
+ def _async_refresh(self, handle):
+ self._last_info = self.refresh()
+ if self.on_refresh_cb is not None:
+ self.on_refresh_cb(self, self._last_info)
+
+ def get_infos(self):
+ if not self._last_info:
+ self._last_info = self.refresh(0.1)
+ return self._last_info
+
+ def refresh(self, interval=0):
+ return get_process_info(self._process, interval=interval)
+
+ def stop(self):
+ self.active = decrement(self.active)
+ self._timer.stop()
+
+class Process(object):
+ """ class wrapping a process
+
+ Args:
+
+ - **loop**: main application loop (a pyuv Loop instance)
+ - **name**: name of the process
+ - **cmd**: the program command (a string)
+ - **args**: the arguments for the command to run. Can be a list or
+ a string. If **args** is a string, it is split using
+ :func:`shlex.split`. Defaults to None.
+ - **env**: a mapping containing the environment variables the command
+ will run with. Optional
+
+ """
+
+
+ def __init__(self, loop, id, name, cmd, group=None, args=None, env=None,
+ uid=None, gid=None, cwd=None, detach=False, redirect_stream=[],
+ monitor=False, monitor_cb=None, on_exit_cb=None):
+ self.loop = loop
+ self.id = id
+ self.name = name
+ self.group = group
+ self.cmd = cmd
+
+ # set command
+ self.cmd = bytestring(cmd)
+ if args is not None:
+ if isinstance(args, six.string_types):
+ self.args = shlex.split(bytestring(args))
+ else:
+ self.args = [bytestring(arg) for arg in args]
+
+ else:
+ args_ = shlex.split(self.cmd)
+ if len(args_) == 1:
+ self.args = []
+ else:
+ self.cmd = args_[0]
+ self.args = args_[1:]
+
+ self.uid = uid
+ if self.uid is not None:
+ self.uid = check_uid(uid)
+
+ self.gid = gid
+ if self.gid is not None:
+ self.gid = check_gid(gid)
+
+ self.cwd = cwd or getcwd()
+ self.env = env or {}
+ self.redirect_stream = redirect_stream
+ self.stdio = self._setup_stdio()
+ self.detach = detach
+ self.monitor = monitor
+ self.monitor_cb = monitor_cb
+ self.on_exit_cb = on_exit_cb
+ self._process = None
+ self._pprocess = None
+ self._process_watcher = None
+ self.stopped = False
+
+ def _setup_stdio(self):
+ stdio = []
+
+ if not self.redirect_stream:
+ # no redirect streams, ignore all
+ for i in range(3):
+ stdio.append(pyuv.StdIO(flags=pyuv.UV_IGNORE))
+ else:
+ # for now we ignore all stdin
+ stdio.append(pyuv.StdIO(flags=pyuv.UV_IGNORE))
+
+ # setup redirections
+ for stream in self.redirect_stream:
+ stdio.append(RedirectIO(self.loop, "stdout", stream, self))
+ stdio.append(RedirectIO(self.loop, "stderr", stream, self))
+
+ return stdio
+
+ def spawn(self):
+ """ spawn the process """
+ kwargs = dict(
+ file = self.cmd,
+ exit_callback = self._exit_cb,
+ args = self.args,
+ env = self.env,
+ cwd = self.cwd,
+ stdio = self.stdio)
+
+ flags = 0
+ if self.uid is not None:
+ kwargs['uid'] = self.uid
+ flags = pyuv.UV_PROCESS_SETUID
+
+ if self.gid is not None:
+ kwargs['gid'] = self.gid
+ flags = flags | pyuv.UV_PROCESS_SETGID
+
+ if self.detach:
+ flags = flags | pyuv.UV_PROCESS_DETACHED
+
+ kwargs['flags'] = flags
+ self.running = True
+ self._process = pyuv.Process(self.loop)
+ self._process.disable_stdio_inheritance()
@saghul (Collaborator) added a note:

This is a classmethod which you probably want to call very early on, at import time.

@benoitc (Owner) added a note:

fixed in 7fef594 thanks!

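A sketch of the suggestion: in pyuv this is exposed as a class-level call, so it can be made once, as early as possible (for example at import time), rather than on each Process instance:

    import pyuv

    # call once at startup, before any handles or processes are created
    pyuv.Process.disable_stdio_inheritance()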
+
+ # spawn the process
+ self._process.spawn(**kwargs)
+
+ # start redirection
+ for stream in self.redirect_stream:
+ stream.start()
+
+ if self.monitor:
+ self.start_monitor()
+
+ @property
+ def pid(self):
+ """ return the process pid """
+ return self._process.pid
+
+ @property
+ def info(self):
+ """ return the process info. If the process is monitored it
+ return the last informations stored asynchronously by the watcher"""
+
+ if not self._process_watcher:
+ if not self._pprocess:
+ self._pprocess = psutil.Process(self.pid)
+ return get_process_info(self._pprocess, 0.1)
+ else:
+ return self._process_watcher.get_infos()
+
+ @property
+ def monitored(self):
+ """ return True if the process is monitored """
+ return self._process_watcher is not None
+
+ def start_monitor(self, monitor_cb=None):
+ """ start to monitor the process """
+ on_refresh_cb = monitor_cb or self.monitor_cb
+ self._process_watcher = ProcessWatcher(self.loop, self.pid,
+ on_refresh_cb=on_refresh_cb)
+
+ def stop_monitor(self):
+ """ stop to moonitor the process """
+ if atomic_read(self.monitored):
+ self._process_watcher.stop()
+ self._process_watcher = None
+
+ def stop(self):
+ """ stop the process """
+ self.kill(signal.SIGTERM)
+
+ def kill(self, signum):
+ """ send a signal to the process """
+ self._process.kill(signum)
+
+ def _exit_cb(self, handle, exit_status, term_signal):
+ # stop monitoring
+ self.stop_monitor()
+ self._process.close()
+
+ if not self.on_exit_cb:
+ return
+
+ self.on_exit_cb(self, exit_status, term_signal)
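A minimal sketch of driving Process directly on a pyuv loop, which is roughly what the Manager does internally; the command and id are illustrative:

    import pyuv
    from gaffer.process import Process

    loop = pyuv.Loop.default_loop()

    def on_exit(process, exit_status, term_signal):
        print("%s exited with status %s" % (process.name, exit_status))

    # cmd is split with shlex, so "echo hello" becomes file "echo", args ["hello"]
    p = Process(loop, 1, "echo", "echo hello", on_exit_cb=on_exit)
    p.spawn()
    loop.run()   # returns once the child has exited and its handle is closed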
156 gaffer/sync.c
@@ -0,0 +1,156 @@
+/* -*- coding: utf-8 -
+ *
+ * This file is part of gaffer. See the NOTICE for more information. */
+
+#include <pthread.h>
+#if defined( __ia64__ ) && defined( __INTEL_COMPILER )
+# include <ia64intrin.h>
+#endif
+#include <stdio.h>
+
+#include "Python.h"
+
+struct module_state {
+ PyObject *error;
+};
+
+#if PY_MAJOR_VERSION >= 3
+#define GETSTATE(m) ((struct module_state*)PyModule_GetState(m))
+#else
+#define GETSTATE(m) (&_state)
+static struct module_state _state;
+#endif
+
+static
+PyObject* compare_and_swap(PyObject *self, PyObject *args)
+{
+ int oldval, newval, r;
+ if (!PyArg_ParseTuple(args, "ii", &oldval, &newval))
+ return NULL;
+ r = __sync_bool_compare_and_swap(&oldval, oldval, newval);
+
+ return Py_BuildValue("i", oldval);
+}
+
+static
+PyObject* increment(PyObject *self, PyObject *args)
+{
+ int val;
+ if (!PyArg_ParseTuple(args, "i", &val))
+ return NULL;
+
+ return Py_BuildValue("i", __sync_add_and_fetch(&val, 1));
+}
+
+static
+PyObject* decrement(PyObject *self, PyObject *args)
+{
+ int val;
+ if (!PyArg_ParseTuple(args, "i", &val))
+ return NULL;
+
+ return Py_BuildValue("i", __sync_sub_and_fetch(&val, 1));
+}
+
+static
+PyObject* add(PyObject *self, PyObject *args)
+{
+ int val, inc;
+ if (!PyArg_ParseTuple(args, "ii", &val, &inc))
+ return NULL;
+
+ return Py_BuildValue("i", __sync_add_and_fetch(&val, inc));
+}
+
+static
+PyObject* sub(PyObject *self, PyObject *args)
+{
+ int val, inc;
+ if (!PyArg_ParseTuple(args, "ii", &val, &inc))
+ return NULL;
+
+ return Py_BuildValue("i", __sync_sub_and_fetch(&val, inc));
+}
+
+
+
+static
+PyObject* atomic_read(PyObject *self, PyObject *args)
+{
+ int val;
+ if (!PyArg_ParseTuple(args, "i", &val))
+ return NULL;
+
+ return Py_BuildValue("i", __sync_add_and_fetch(&val, 0));
+}
+
+
+static PyMethodDef
+sync_methods[] = {
+ {"compare_and_swap", compare_and_swap, METH_VARARGS,
+ "Atomically compare and swap 2 integers"},
+ {"increment", increment, METH_VARARGS, "Atomically increment an integer"},
+ {"decrement", decrement, METH_VARARGS, "Atomically decrement an integer"},
+ {"add", add, METH_VARARGS, "Atomically increment an integer with a value"},
+ {"sub", add, METH_VARARGS, "Atomically decrement an integer with a value"},
+ {"atomic_read", atomic_read, METH_VARARGS, "Atomically read an integer"},
+ {NULL, NULL, 0, NULL}
+};
+
+#if PY_MAJOR_VERSION >= 3
+
+static int sync_traverse(PyObject *m, visitproc visit, void *arg) {
+ Py_VISIT(GETSTATE(m)->error);
+ return 0;
+}
+
+static int sync_clear(PyObject *m) {
+ Py_CLEAR(GETSTATE(m)->error);
+ return 0;
+}
+
+
+static struct PyModuleDef moduledef = {
+ PyModuleDef_HEAD_INIT,
+ "sync",
+ NULL,
+ sizeof(struct module_state),
+ sync_methods,
+ NULL,
+ sync_traverse,
+ sync_clear,
+ NULL
+};
+
+#define INITERROR return NULL
+
+PyObject *
+PyInit_sync(void)
+
+#else
+#define INITERROR return
+
+void
+initsync(void)
+#endif
+{
+#if PY_MAJOR_VERSION >= 3
+ PyObject *module = PyModule_Create(&moduledef);
+#else
+ PyObject *module = Py_InitModule("sync", sync_methods);
+#endif
+
+ if (module == NULL)
+ INITERROR;
+ struct module_state *st = GETSTATE(module);
+
+ st->error = PyErr_NewException("sync.Error", NULL, NULL);
+ if (st->error == NULL) {
+ Py_DECREF(module);
+ INITERROR;
+ }
+
+#if PY_MAJOR_VERSION >= 3
+ return module;
+#endif
+}
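How these C helpers are used from Python (see gaffer.manager): each call works on a copy of its integer argument and returns the new value, which the caller stores back. A small sketch, assuming the extension has been built:

    from gaffer.sync import increment, decrement, add, sub, atomic_read

    n = 0
    n = increment(n)        # 1
    n = add(n, 4)           # 5
    n = sub(n, 2)           # 3
    n = decrement(n)        # 2
    print(atomic_read(n))   # 2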
90 gaffer/util.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -
+#
+# This file is part of gaffer. See the NOTICE for more information.
+
+import grp
+import os
+import pwd
+
+import six
+
+if six.PY3:
+ def bytestring(s):
+ return s
+else:
+ def bytestring(s):
+ if isinstance(s, unicode):
+ return s.encode('utf-8')
+ return s
+
+_SYMBOLS = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
+
+def getcwd():
+ """Returns current path, try to use PWD env first"""
+ try:
+ a = os.stat(os.environ['PWD'])
+ b = os.stat(os.getcwd())
+ if a.st_ino == b.st_ino and a.st_dev == b.st_dev:
+ working_dir = os.environ['PWD']
+ else:
+ working_dir = os.getcwd()
+ except:
+ working_dir = os.getcwd()
+ return working_dir
+
+def check_uid(val):
+ """Return an uid, given a user value.
+ If the value is an integer, make sure it's an existing uid.
+
+ If the user value is unknown, raises a ValueError.
+ """
+ if isinstance(val, int):
+ try:
+ pwd.getpwuid(val)
+ return val
+ except (KeyError, OverflowError):
+ raise ValueError("%r isn't a valid user id" % val)
+
+ if not isinstance(val, str):
+ raise TypeError(val)
+
+ try:
+ return pwd.getpwnam(val).pw_uid
+ except KeyError:
+ raise ValueError("%r isn't a valid user val" % name)
+
+
+def check_gid(val):
+ """Return a gid, given a group value
+
+ If the group value is unknown, raises a ValueError.
+ """
+ if isinstance(val, int):
+ try:
+ grp.getgrgid(val)
+ return val
+ except (KeyError, OverflowError):
+ raise ValueError("No such group: %r" % val)
+
+ if not isinstance(val, str):
+ raise TypeError(val)
+ try:
+ return grp.getgrnam(val).gr_gid
+ except KeyError:
+ raise ValueError("No such group: %r" % val)
+
+
+def bytes2human(n):
+ """Translates bytes into a human repr.
+ """
+ if not isinstance(n, int):
+ raise TypeError(n)
+
+ prefix = {}
+ for i, s in enumerate(_SYMBOLS):
+ prefix[s] = 1 << (i + 1) * 10
+
+ for s in reversed(_SYMBOLS):
+ if n >= prefix[s]:
+ value = int(float(n) / prefix[s])
+ return '%s%s' % (value, s)
+ return "%sB" % n
60 setup.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -
+#
+# This file is part of gaffer. See the NOTICE for more information.
+
+import os
+from setuptools import setup, find_packages, Extension
+
+import sys
+
+py_version = sys.version_info[:2]
+
+if py_version < (2, 6):
+ raise RuntimeError('On Python 2, gaffer requires Python 2.6 or newer')
+
+
+CLASSIFIERS = [
+ 'Development Status :: 4 - Beta',
+ 'Environment :: Web Environment',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: MIT License',
+ 'Operating System :: POSIX',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2',
+ 'Programming Language :: Python :: 2.6',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.0',
+ 'Programming Language :: Python :: 3.1',
+ 'Programming Language :: Python :: 3.2',
+ 'Topic :: System :: Boot',
+ 'Topic :: System :: Monitoring',
+ 'Topic :: System :: Systems Administration',
+ 'Topic :: Software Development :: Libraries']
+
+
+# read long description
+with open(os.path.join(os.path.dirname(__file__), 'README.rst')) as f:
+ long_description = f.read()
+
+DATA_FILES = [
+ ('circus', ["LICENSE", "MANIFEST.in", "NOTICE", "README.rst",
+ "THANKS", "UNLICENSE"])
+ ]
+
+
+setup(name='gaffer',
+ version='0.1.0',
+ description = 'simple system process manager',
+ long_description = long_description,
+ classifiers = CLASSIFIERS,
+ license = 'MIT',
+ url = 'http://github.com/benoitc/gaffer',
+ author = 'Benoit Chesneau',
+ author_email = 'benoitc@e-engura.org',
+ packages=find_packages(),
+ ext_modules = [
+ Extension("gaffer.sync", ["gaffer/sync.c"])
+ ],
+ install_requires = ['pyuv', 'six', 'psutil'],
+ data_files = DATA_FILES)
0  tests/__init__.py
No changes.
50 tests/generic.py
@@ -0,0 +1,50 @@
+import sys
+
+
+def resolve_name(name):
+ ret = None
+ parts = name.split('.')
+ cursor = len(parts)
+ module_name = parts[:cursor]
+ last_exc = None
+
+ while cursor > 0:
+ try:
+ ret = __import__('.'.join(module_name))
+ break
+ except ImportError as exc:
+ last_exc = exc
+ if cursor == 0:
+ raise
+ cursor -= 1
+ module_name = parts[:cursor]
+
+ for part in parts[1:]:
+ try:
+ ret = getattr(ret, part)
+ except AttributeError:
+ if last_exc is not None:
+ raise last_exc
+ raise ImportError(name)
+
+ if ret is None:
+ if last_exc is not None:
+ raise last_exc
+ raise ImportError(name)
+
+ return ret
+
+
+if __name__ == '__main__':
+ cb = resolve_name(sys.argv[1])
+
+ print(cb)
+ try:
+ if len(sys.argv) > 2:
+ test_file = sys.argv[2]
+ print(test_file)
+ sys.exit(cb(test_file))
+ else:
+ sys.exit(cb())
+ except:
+ sys.exit(1)
137 tests/test_manager.py
@@ -0,0 +1,137 @@
+# -*- coding: utf-8 -
+#
+# This file is part of gaffer. See the NOTICE for more information.
+
+import os
+import signal
+import sys
+import time
+from tempfile import mkstemp
+
+from gaffer.manager import get_manager
+
+class DummyProcess(object):
+
+ def __init__(self, testfile):
+ self.alive = True
+ self.testfile = testfile
+ import signal
+ signal.signal(signal.SIGQUIT, self.handle_quit)
+ signal.signal(signal.SIGTERM, self.handle_quit)
+ signal.signal(signal.SIGINT, self.handle_quit)
+ signal.signal(signal.SIGCHLD, self.handle_chld)
+
+ def _write(self, msg):
+ with open(self.testfile, 'a+') as f:
+ f.write(msg)
+
+ def handle_quit(self, *args):
+ self._write('QUIT')
+ self.alive = False
+
+ def handle_chld(self, *args):
+ self._write('CHLD')
+
+ def run(self):
+ self._write('START')
+ while self.alive:
+ time.sleep(0.1)
+ self._write('STOP')
+
+
+def run_dummy(test_file):
+ dummy = DummyProcess(test_file)
+ dummy.run()
+ return 1
+
+def tmpfile():
+ fd, testfile = mkstemp()
+ os.close(fd)
+ return testfile
+
+def dummy_cmd():
+ fd, testfile = mkstemp()
+ os.close(fd)
+
+ cmd = sys.executable
+ args = ['generic.py', "test_manager.run_dummy", testfile]
+
+ wdir = os.path.dirname(__file__)
+ return (testfile, cmd, args, wdir)
+
+def test_simple():
+ m = get_manager(background=True)
+ m.start()
+ assert m.manager.started == True
+ m.stop()
+ assert m.manager.started == False
+
+def test_simple_process():
+ m = get_manager(background=True)
+ m.start()
+
+ testfile, cmd, args, wdir = dummy_cmd()
+
+ m.add_process("dummy", cmd, args=args, cwd=wdir, start=False)
+ state = m.get_process_state("dummy")
+
+ assert state.numprocesses == 1
+ assert state.name == "dummy"
+ assert state.cmd == cmd
+ assert state.settings['args'] == args
+ assert state.settings['cwd'] == wdir
+
+ m.remove_process("dummy")
+ assert m.get_process_state("dummy") == None
+
+ m.stop()
+
+def test_start_stop_process():
+ m = get_manager(background=True)
+ m.start()
+
+ testfile, cmd, args, wdir = dummy_cmd()
+
+ m.add_process("dummy", cmd, args=args, cwd=wdir)
+ state = m.get_process_state("dummy")
+
+ assert len(state.running) == 1
+
+ m.stop_process("dummy")
+ assert len(state.running) == 0
+
+ m.stop()
+
+
+def test_start_multiple():
+ m = get_manager(background=True)
+ m.start()
+
+ testfile, cmd, args, wdir = dummy_cmd()
+
+ m.add_process("dummy", cmd, args=args, cwd=wdir, numprocesses=2)
+ state = m.get_process_state("dummy")
+
+ assert len(state.running) == 2
+
+ m.stop()
+
+def test_ttin():
+
+ m = get_manager(background=True)
+ m.start()
+
+ testfile, cmd, args, wdir = dummy_cmd()
+
+ m.add_process("dummy", cmd, args=args, cwd=wdir, numprocesses=1)
+ state = m.get_process_state("dummy")
+
+ assert len(state.running) == 1
+ #ret = m.ttin("dummy", 1)
+ #assert ret == 2
+
+ time.sleep(0.2)
+ #assert len(state.running) == 3
+
+
+ m.stop()