Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for socket groups, so different sockets can be connected to different watchers #1076

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 17 additions & 4 deletions circus/arbiter.py
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import gc
import operator
from circus.fixed_threading import Thread, get_ident
import sys
import select
Expand Down Expand Up @@ -473,6 +474,16 @@ def load_from_config(cls, config_file, loop=None):
def iter_watchers(self, reverse=True):
return sorted(self.watchers, key=lambda a: a.priority, reverse=reverse)

def iter_active_watchers(self, fd, reverse=True):
a_watchers = []
for watcher in self.watchers:
wanted_sockets = [name for name, sock in watcher.sockets.items()
if sock.fileno() == fd]
if len(set(watcher.sockets).intersection(set(wanted_sockets))) > 0:
a_watchers.append(watcher)
return sorted(a_watchers, key=operator.attrgetter('priority'),
reverse=reverse)

@debuglog
def initialize(self):
# set process title
Expand Down Expand Up @@ -647,9 +658,9 @@ def manage_watchers(self):
if need_on_demand:
sockets = [x.fileno() for x in self.sockets.values()]
rlist, wlist, xlist = select.select(sockets, [], [], 0)
if rlist:
for r in rlist:
self.socket_event = True
self._start_watchers()
self._start_watchers(watcher_fd=r)
self.socket_event = False

@synchronized("arbiter_reload")
Expand Down Expand Up @@ -743,8 +754,10 @@ def start_watchers(self, watcher_iter_func=None):
yield self._start_watchers(watcher_iter_func=watcher_iter_func)

@gen.coroutine
def _start_watchers(self, watcher_iter_func=None):
if watcher_iter_func is None:
def _start_watchers(self, watcher_iter_func=None, watcher_fd=None):
if watcher_fd is not None:
watchers = self.iter_active_watchers(watcher_fd)
elif watcher_iter_func is None:
watchers = self.iter_watchers()
else:
watchers = watcher_iter_func()
Expand Down
11 changes: 8 additions & 3 deletions circus/sockets.py
Expand Up @@ -37,7 +37,8 @@ class PapaSocketProxy(object):
def __init__(self, name='', host=None, port=None,
family=None, type=None,
proto=None, backlog=None, path=None, umask=None, replace=None,
interface=None, so_reuseport=False, blocking=False):
interface=None, so_reuseport=False, blocking=False,
group=None):
if path is not None:
if not hasattr(socket, 'AF_UNIX'):
raise NotImplementedError("AF_UNIX not supported on this"
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(self, name='', host=None, port=None,
self.so_reuseport = papa_socket.get('so_reuseport', False)
self._fileno = papa_socket.get('fileno')
self.use_papa = True
self.group = group
if log_differences:
differences = []
if host != self.host:
Expand Down Expand Up @@ -122,7 +124,8 @@ class CircusSocket(socket.socket):
def __init__(self, name='', host='localhost', port=8080,
family=socket.AF_INET, type=socket.SOCK_STREAM,
proto=0, backlog=2048, path=None, umask=None, replace=False,
interface=None, so_reuseport=False, blocking=False):
interface=None, so_reuseport=False, blocking=False,
group=None):
if path is not None:
if not hasattr(socket, 'AF_UNIX'):
raise NotImplementedError("AF_UNIX not supported on this"
Expand All @@ -138,6 +141,7 @@ def __init__(self, name='', host='localhost', port=8080,
self.umask = umask
self.replace = replace
self.use_papa = False
self.group = group

if hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
self.host = self.port = None
Expand Down Expand Up @@ -246,7 +250,8 @@ def load_from_config(cls, config):
'so_reuseport': to_bool(config.get('so_reuseport')),
'umask': int(config.get('umask', 8)),
'replace': config.get('replace'),
'blocking': to_bool(config.get('blocking'))}
'blocking': to_bool(config.get('blocking')),
'group': config.get('group')}
use_papa = to_bool(config.get('use_papa')) and papa is not None
proto_name = config.get('proto')
if proto_name is not None:
Expand Down
17 changes: 13 additions & 4 deletions circus/watcher.py
Expand Up @@ -138,6 +138,10 @@ class Watcher(object):
descriptors, thus can reuse the sockets opened by circusd.
(default: False)

- **socket_group** -- Name of the group that sockets need to be in, to be
attached to this watcher. If unset, all sockets can be used.
(default: None)

- **on_demand** -- If True, the processes will be started only
at the first connection to the socket
(default: False)
Expand Down Expand Up @@ -203,15 +207,16 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
graceful_timeout=30.0, prereload_fn=None, rlimits=None,
executable=None, stdout_stream=None, stderr_stream=None,
priority=0, loop=None, singleton=False, use_sockets=False,
copy_env=False, copy_path=False, max_age=0,
max_age_variance=30, hooks=None, respawn=True,
socket_group=None, copy_env=False, copy_path=False,
max_age=0, max_age_variance=30, hooks=None, respawn=True,
autostart=True, on_demand=False, virtualenv=None,
stdin_socket=None, close_child_stdin=True,
close_child_stdout=False,
close_child_stderr=False, virtualenv_py_ver=None,
use_papa=False, **options):
self.name = name
self.use_sockets = use_sockets
self.socket_group = socket_group
self.on_demand = on_demand
self.res_name = name.lower().replace(" ", "_")
self.numprocesses = int(numprocesses)
Expand Down Expand Up @@ -271,7 +276,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
"stop_children", "shell", "shell_args",
"env", "max_retry", "cmd", "args", "respawn",
"graceful_timeout", "executable", "use_sockets",
"priority", "copy_env", "singleton",
"socket_group", "priority", "copy_env", "singleton",
"stdout_stream_conf", "on_demand",
"stderr_stream_conf", "max_age", "max_age_variance",
"close_child_stdin", "close_child_stdout",
Expand Down Expand Up @@ -418,7 +423,11 @@ def load_from_config(cls, config):
@util.debuglog
def initialize(self, evpub_socket, sockets, arbiter):
self.evpub_socket = evpub_socket
self.sockets = sockets
if self.socket_group is None:
self.sockets = sockets
else:
self.sockets = {name: socket for (name, socket) in sockets.items()
if socket.group == self.socket_group}
self.arbiter = arbiter

def __len__(self):
Expand Down