Skip to content

Commit

Permalink
add process commit method
Browse files Browse the repository at this point in the history
Like spawn but the process won't be kept alive at the end. It is also not
handled during scaling or reaping.
  • Loading branch information
benoitc committed Mar 1, 2013
1 parent 7d237b5 commit 13db684
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 28 deletions.
9 changes: 9 additions & 0 deletions gaffer/controller.py
Expand Up @@ -23,6 +23,7 @@
"stats": "stats",
"stopall": "stopall",
"killall": "killall",
"commit": "commit",
# process commands
"process_info": "process_info",
"process_stats": "process_stats",
Expand Down Expand Up @@ -144,6 +145,14 @@ def stop_job(self, cmd):
self.manager.stop_job(cmd.args[0])
cmd.reply({"ok": True})

def commit(self, cmd):
    """Handle the ``commit`` command.

    The first command argument is the job name; any remaining arguments
    are forwarded positionally to ``self.manager.commit``. The pid it
    returns is sent back in the reply as ``{"ok": True, "pid": pid}``.

    Raises ``CommandError`` when the command carries no arguments.
    """
    arguments = cmd.args
    if not arguments:
        raise CommandError()

    job_name, extra = arguments[0], arguments[1:]
    new_pid = self.manager.commit(job_name, *extra)
    cmd.reply({"ok": True, "pid": new_pid})


def scale(self, cmd):
if len(cmd.args) < 2:
raise CommandError()
Expand Down
1 change: 1 addition & 0 deletions gaffer/gafferd/http.py
Expand Up @@ -40,6 +40,7 @@
(r'/jobs/([^/]+)/([^/]+)/signal$', http_handlers.SignalJobHandler),
(r'/jobs/([^/]+)/([^/]+)/state$', http_handlers.StateJobHandler),
(r'/jobs/([^/]+)/([^/]+)/pids$', http_handlers.PidsJobHandler),
(r'/jobs/([^/]+)/([^/]+)/commit$', http_handlers.CommitJobHandler),
(r'/watch', http_handlers.WatcherHandler),
(r'/watch/([^/]+)$', http_handlers.WatcherHandler),
(r'/watch/([^/]+)/([^/]+)$', http_handlers.WatcherHandler),
Expand Down
2 changes: 1 addition & 1 deletion gaffer/gafferd/http_handlers/__init__.py
Expand Up @@ -11,4 +11,4 @@
from .stream import StreamHandler, WStreamHandler
from .jobs import (SessionsHandler, AllJobsHandler, JobsHandler,
JobHandler, JobStatsHandler, ScaleJobHandler,
PidsJobHandler, SignalJobHandler, StateJobHandler)
PidsJobHandler, SignalJobHandler, StateJobHandler, CommitJobHandler)
34 changes: 34 additions & 0 deletions gaffer/gafferd/http_handlers/jobs.py
Expand Up @@ -328,3 +328,37 @@ def get_action(self, m):
else:
raise ValueError("invalid state")
return do


class CommitJobHandler(CorsHandler):
    """ /jobs/<sessionid>/<label>/commit """

    def post(self, *args):
        """Commit one process for the job ``<sessionid>.<label>``.

        ``args`` holds the two URL captures (session id, then label).
        The request body is a JSON object that may contain
        ``graceful_timeout`` and ``env``. On success the response is
        ``{"pid": <pid>}``. A malformed body yields a 400 with
        ``{"error": "bad_request"}``; a ``ProcessError`` raised by the
        manager is reported using its own ``errno`` as the HTTP status.
        """
        # preflight() is inherited from CorsHandler (not visible here).
        self.preflight()
        self.set_header('Content-Type', 'application/json')
        m = self.settings.get('manager')

        try:
            graceful_timeout, env = self.get_params()
        except ValueError:
            self.set_status(400)
            return self.write({"error": "bad_request"})

        try:
            pid = m.commit("%s.%s" % (args[0], args[1]),
                           graceful_timeout=graceful_timeout, env=env)
        except ProcessError as e:
            self.set_status(e.errno)
            return self.write(e.to_dict())

        self.write({"pid": pid})

    def get_params(self):
        """Parse the request body and return ``(graceful_timeout, env)``.

        Raises ``ValueError`` for anything the caller should answer with
        a 400: undecodable or invalid JSON, a body that is not a JSON
        object, or a ``graceful_timeout`` that cannot be turned into an
        integer.
        """
        obj = json.loads(self.request.body.decode('utf-8'))

        # A JSON array/string/number has no .get(); reject it explicitly
        # instead of letting an AttributeError escape as a server error.
        if not isinstance(obj, dict):
            raise ValueError("request body must be a JSON object")

        env = obj.get('env')
        try:
            graceful_timeout = int(obj.get('graceful_timeout', 10.0))
        except (TypeError, OverflowError) as e:
            # TypeError: null or a container; OverflowError: json.loads
            # accepts ``Infinity``, which int() refuses to convert.
            raise ValueError(str(e))
        return graceful_timeout, env
27 changes: 25 additions & 2 deletions gaffer/httpclient.py
Expand Up @@ -147,8 +147,11 @@ def request(self, method, path, headers=None, body=None, **params):
return resp

def json_body(self, resp):
    """Return the decoded payload of ``resp``.

    The body is decoded as UTF-8 and parsed as JSON. When it is not
    valid JSON the decoded text itself is returned, so callers
    can still inspect plain-text answers.
    """
    respbody = resp.body.decode('utf-8')
    try:
        return json.loads(respbody)
    except ValueError:
        # not JSON: hand back the raw text rather than failing
        return respbody


class Server(BaseClient):
Expand All @@ -165,6 +168,8 @@ def running(self):
resp = self.request("get", "/pids")
return self.json_body(resp)['pids']

pids = running

def ping(self):
resp = self.request("get", "/ping")
return resp.body == b'OK'
Expand Down Expand Up @@ -383,6 +388,12 @@ def running(self):
info = self.info()
return info['running']

@property
def running_out(self):
    """ return the ``running_out`` entry of this template's info,
    presumably the number of processes started through ``commit`` --
    verify against the server's info payload """
    return self.info()['running_out']

@property
def numprocesses(self):
""" return the maximum number of processes that can be launched
Expand Down Expand Up @@ -437,6 +448,18 @@ def scale(self, num=1):
result = self.server.json_body(resp)
return result['numprocesses']


def commit(self, graceful_timeout=10.0, env=None):
    """ Like ``scale(1)`` but the process won't be kept alive at the
    end. It is also not handled during scaling or reaping.

    Posts ``graceful_timeout`` and ``env`` (an empty dict when not
    given) as JSON to the job's ``commit`` resource and returns the
    ``pid`` found in the answer. """
    payload = {"graceful_timeout": graceful_timeout, "env": env or {}}
    url = "/jobs/%s/%s/commit" % (self.sessionid, self.name)
    resp = self.server.request("post", url, body=json.dumps(payload))
    return self.server.json_body(resp)['pid']

def kill(self, sig):
""" send a signal to all processes of this template """

Expand Down
106 changes: 86 additions & 20 deletions gaffer/manager.py
Expand Up @@ -383,6 +383,20 @@ def stop_job(self, name):

self._stopall(state)

def commit(self, name, graceful_timeout=10.0, env=None):
    """ Like ``scale(1)`` but the process won't be kept alive at the
    end. It is also not handled during scaling or reaping.

    ``name`` is split with ``_parse_name`` into a session id and a job
    name used to look up the job state. ``graceful_timeout`` and
    ``env`` are passed through to ``_commit_process``, whose return
    value (the pid of the new process) is returned. """

    sessionid, name = self._parse_name(name)

    with self._lock:
        state = self._get_state(sessionid, name)

        # commit the job and return the pid
        return self._commit_process(state,
                                    graceful_timeout=graceful_timeout,
                                    env=env)

def scale(self, name, n):
""" Scale the number of processes in for a job. By using this
function you can increase, decrease or set the number of processes in
Expand Down Expand Up @@ -445,10 +459,12 @@ def info(self, name):
state = self._get_state(sessionid, name)

processes = list(state.running)
processes.extend(list(state.running_out))

info = {"name": pname,
"active": state.active,
"running": len(state.running),
"running": len(processes),
"running_out": len(state.running_out),
"max_processes": state.numprocesses,
"processes": [p.pid for p in processes]}

Expand All @@ -472,11 +488,13 @@ def stats(self, name):

with self._lock:
state = self._get_state(sessionid, name)
processes = list(state.running)
processes.extend(list(state.running_out))

stats = []
lmem = []
lcpu = []
for p in state.running:
for p in processes:
pstats = p.stats
pstats['pid'] = p.pid
pstats['os_pid'] = p.os_pid
Expand Down Expand Up @@ -523,7 +541,13 @@ def stop_process(self, pid):
# remove the process from the state
sessionid, name = self._parse_name(p.name)
state = self._get_state(sessionid, name)
state.remove(p)

# if the process is marked once it means the job has been
# committed and the process shouldn't be restarted
if p.once:
state.running_out.remove(p)
else:
state.remove(p)

# notify we stop this pid
self._publish("stop_process", pid=p.pid, name=p.name)
Expand All @@ -533,7 +557,8 @@ def stop_process(self, pid):

# track this process to make sure it's killed after the
# graceful time
self._tracker.check(p, state.graceful_timeout)
graceful_timeout = p.graceful_timeout or state.graceful_timeout
self._tracker.check(p, graceful_timeout)

def stopall(self, name):
""" stop all processes of a job. Processes are just exiting and will
Expand Down Expand Up @@ -567,7 +592,11 @@ def killall(self, name, sig):
with self._lock:
state = self._get_state(sessionid, name)
self._publish("job.%s.kill" % pname, name=pname, signum=signum)
for p in state.running:

processes = list(state.running)
processes.extend(list(state.running_out))
print(processes)
for p in processes:
# notify we stop this job
self._publish("proc.%s.kill" % p.pid, pid=p.pid, name=p.name)
# effectively send the signal
Expand Down Expand Up @@ -749,19 +778,10 @@ def _restart(self):

# ------------- process type private functions


def _stopall(self, state):
""" stop all processes of a job """

# start the flapping detection before killing the process to prevent
# any race condition

if state.flapping_timer is not None:
state.flapping_timer.stop()

def _stop_group(self, state, group):
while True:
try:
p = state.dequeue()
p = group.popleft()
except IndexError:
break

Expand All @@ -778,14 +798,54 @@ def _stopall(self, state):

# track this process to make sure it's killed after the
# graceful time
self._tracker.check(p, state.graceful_timeout)
graceful_timeout = p.graceful_timeout or state.graceful_timeout
self._tracker.check(p, graceful_timeout)

def _stopall(self, state):
    """ stop every process attached to a job state, both the
    keepalived ones and the committed ones """

    # pause the flapping detection while the processes are being
    # stopped to prevent any race condition
    if state.flapping_timer is not None:
        state.flapping_timer.stop()

    # keepalived processes first, then the committed ones
    for attr in ("running", "running_out"):
        group = getattr(state, attr)
        if group:
            self._stop_group(state, group)

    # only a job that isn't stopped gets its flapping detection back
    if state.stopped or state.flapping_timer is None:
        return
    state.flapping_timer.start()

# ------------- functions that manage the process

def _commit_process(self, state, graceful_timeout=10.0, env=None):
    """ spawn a one-shot process for ``state`` and return its internal
    pid.

    The process is spawned with ``once=True`` and recorded in
    ``state.running_out`` and in ``self.running``; a ``spawn`` event is
    then published. """
    internal_id = self.get_process_id()

    # build the process from the job state and launch it right away
    proc = state.make_process(self.loop, internal_id,
                              self._on_process_exit)
    proc.spawn(once=True, graceful_timeout=graceful_timeout, env=env)

    # committed processes are tracked apart from the job's regular ones
    state.running_out.append(proc)

    # index of every running process by internal id
    self.running[internal_id] = proc

    self._publish("spawn", name=proc.name, pid=internal_id,
                  os_pid=proc.os_pid)

    # the pid is handed back immediately so the caller can use it
    return internal_id


def _spawn_process(self, state):
""" spawn a new process and add it to the state """
# get internal process id
Expand Down Expand Up @@ -905,6 +965,7 @@ def _wakeup(self, handle):

def _on_exit(self, evtype, msg):
sessionid, name = self._parse_name(msg['name'])
once = msg.get('once', False)

with self._lock:
try:
Expand All @@ -914,7 +975,7 @@ def _on_exit(self, evtype, msg):
return

# eventually restart the process
if not state.stopped:
if not state.stopped and not once:
# manage the template, eventually restart a new one.
if self._check_flapping(state):
self._manage_processes(state)
Expand All @@ -929,18 +990,23 @@ def _on_process_exit(self, process, exit_status, term_signal):
if process.pid in self.running:
self.running.pop(process.pid)

print("exit %s" % process.pid)

sessionid, name = self._parse_name(process.name)
try:
state = self._get_state(sessionid, name)
# remove the process from the state if needed
state.remove(process)
if process.once:
state.running_out.remove(process)
else:
state.remove(process)
except (ProcessNotFound, KeyError):
pass

# notify other that the process exited
ev_details = dict(name=process.name, pid=process.pid,
exit_status=exit_status, term_signal=term_signal,
os_pid=process.os_pid)
os_pid=process.os_pid, once=process.once)

self._publish("exit", **ev_details)
self._publish("job.%s.exit" % process.name, **ev_details)
13 changes: 11 additions & 2 deletions gaffer/process.py
Expand Up @@ -497,6 +497,8 @@ def __init__(self, loop, pid, name, cmd, args=None, env=None, uid=None,
self._info = None
self.stopped = False
self.graceful_time = 0
self.graceful_timeout = None
self.once = False

self._setup_stdio()

Expand All @@ -523,8 +525,15 @@ def _setup_stdio(self):
self._stdio.append(pyuv.StdIO(stream=channel,
flags=pyuv.UV_INHERIT_STREAM))

def spawn(self):
def spawn(self, once=False, graceful_timeout=None, env=None):
""" spawn the process """

self.once = once
self.graceful_timeout = graceful_timeout

if env is not None:
self.env.update(env)

kwargs = dict(
file = self.cmd,
exit_callback = self._exit_cb,
Expand Down Expand Up @@ -591,7 +600,7 @@ def info(self):
if self._info is None:
self._info = dict(pid=self.pid, name=self.name, cmd=self.cmd,
args=self.args, env=self.env, uid=self.uid, gid=self.gid,
os_pid=None, create_time=None)
os_pid=None, create_time=None, commited=self.once)

if (self._info.get('create_time') is None and
self._pprocess is not None):
Expand Down

0 comments on commit 13db684

Please sign in to comment.