Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fire and forget processes #162

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions circus/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
reload,
restart,
rmwatcher,
run,
sendsignal,
set,
start,
Expand Down
53 changes: 53 additions & 0 deletions circus/commands/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from circus.commands.base import Command
from circus.exc import ArgumentError


class Run(Command):
"""\
Run a watcher
==============================

This command runs the process in a watcher
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's worth mentioning here that this is a "one shot" thing. this is me nitpicking, but mainly this would be something in the lines of "This command runs one watcher's process and does not try to control it in any way". Like you said, this is a fire-n-forget thing, that's worth mentioning here. :-)



ZMQ Message
-----------

::

{
"command": "run",
"properties": {
"name": '<name>",
}
}

The response return the status "ok".

If the property name is present, the watcher will be run.

Command line
------------

::

$ circusctl run <name>

Options
+++++++

- <name>: name of the watcher

"""
name = "run"

def message(self, *args, **opts):
if len(args) != 1:
raise ArgumentError("invalid number of arguments")

return self.make_message(name=args[0])

def execute(self, arbiter, props):
if 'name' in props:
watcher = self._get_watcher(arbiter, props['name'])
watcher.run()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that should probably mentioned somewhere, but having a look at other commands you'll find out that we have the validate method which takes care of validation of the input. You should put what you put in the message method in there instead. But in your specific case, you don't even need to do this: You can use the "property" class attribute (like here: https://github.com/mozilla-services/circus/blob/31ab93047859588b2d94a5ff71d877d7a99715d2/circus/commands/sendsignal.py#L103) which makes the properties required.

Also, if you do that, there's no real need to check that there is "name" in props, and you can just do "get_watcher" + run on it.

The message function is used to return information about what happened after the command. Since this is a fire and forget thing, you don't need this so you can just get rid of this message method.

76 changes: 57 additions & 19 deletions circus/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
self.args = args
self._process_counter = 0
self.stopped = stopped
self.running = False
self.graceful_timeout = graceful_timeout
self.prereload_fn = prereload_fn
self.executable = None
Expand Down Expand Up @@ -267,35 +268,45 @@ def manage_processes(self):
if self.stopped:
return

if len(self.processes) < self.numprocesses:
self.spawn_processes()

processes = self.processes.values()
processes.sort()
while len(processes) > self.numprocesses:
process = processes.pop(0)
if process.status == STATUS_DEAD:
self.processes.pop(process.pid)
else:
self.processes.pop(process.pid)
self.kill_process(process)

self.shutdown_excess_processes()
self.spawn_needed_processes()

@util.debuglog
def shutdown_excess_processes(self):
""" If there are more running processes than numprocesses, kill the excess
"""
processes = self.processes.values()
processes.sort()
while len(processes) > self.numprocesses:
process = processes.pop(0)
if process.status == STATUS_DEAD:
self.processes.pop(process.pid)
else:
self.processes.pop(process.pid)
self.kill_process(process)

@util.debuglog
def reap_and_manage_processes(self):
"""Reap & manage processes.
"""
if self.stopped:
return

self.reap_processes()
self.manage_processes()

@util.debuglog
def spawn_processes(self):
def spawn_needed_processes(self):
"""Spawn processes.
"""
for i in range(self.numprocesses - len(self.processes)):
self.spawn_process()
time.sleep(self.warmup_delay)

if self.running:
return

if len(self.processes) < self.numprocesses:
for i in range(self.numprocesses - len(self.processes)):
self.spawn_process()
time.sleep(self.warmup_delay)

def spawn_process(self):
"""Spawn process.
Expand Down Expand Up @@ -419,7 +430,10 @@ def send_signal_children(self, pid, signum):
def status(self):
if self.stopped:
return "stopped"
return "active"
elif self.running:
return "running"
else:
return "active"

@util.debuglog
def process_info(self, pid):
Expand Down Expand Up @@ -472,10 +486,11 @@ def get_active_pids(self):
def start(self):
"""Start.
"""
if not self.stopped:
if not self.stopped and not self.running:
return

self.stopped = False
self.running = False
self._create_redirectors()
self.reap_processes()
self.manage_processes()
Expand All @@ -489,6 +504,29 @@ def start(self):
logger.info('%s started' % self.name)
self.notify_event("start", {"time": time.time()})

@util.debuglog
def run(self):
"""Run.
"""
if not self.stopped and not self.running:
return

self.stopped = False
self.running = False
self._create_redirectors()
self.reap_processes()
self.manage_processes()
self.running = True

if self.stdout_redirector is not None:
self.stdout_redirector.start()

if self.stderr_redirector is not None:
self.stderr_redirector.start()

logger.info('%s run' % self.name)
self.notify_event("run", {"time": time.time()})

@util.debuglog
def restart(self):
"""Restart.
Expand Down