From 7fca067d5f4b75a6d52111bd4a37401fc3285ba8 Mon Sep 17 00:00:00 2001 From: Sander Hoentjen Date: Thu, 5 Jul 2018 16:42:16 +0200 Subject: [PATCH] Add support for socket groups, so different sockets can be connected to different watchers This can be useful if you use circus for socket activation of multiple daemons, connected to different sockets. --- circus/arbiter.py | 21 +++++++++++++++++---- circus/sockets.py | 11 ++++++++--- circus/watcher.py | 17 +++++++++++++---- 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/circus/arbiter.py b/circus/arbiter.py index 1b98f9869..4d7d548d7 100644 --- a/circus/arbiter.py +++ b/circus/arbiter.py @@ -2,6 +2,7 @@ import logging import os import gc +import operator from circus.fixed_threading import Thread, get_ident import sys import select @@ -473,6 +474,16 @@ def load_from_config(cls, config_file, loop=None): def iter_watchers(self, reverse=True): return sorted(self.watchers, key=lambda a: a.priority, reverse=reverse) + def iter_active_watchers(self, fd, reverse=True): + a_watchers = [] + for watcher in self.watchers: + wanted_sockets = [name for name, sock in watcher.sockets.items() + if sock.fileno() == fd] + if len(set(watcher.sockets).intersection(set(wanted_sockets))) > 0: + a_watchers.append(watcher) + return sorted(a_watchers, key=operator.attrgetter('priority'), + reverse=reverse) + @debuglog def initialize(self): # set process title @@ -647,9 +658,9 @@ def manage_watchers(self): if need_on_demand: sockets = [x.fileno() for x in self.sockets.values()] rlist, wlist, xlist = select.select(sockets, [], [], 0) - if rlist: + for r in rlist: self.socket_event = True - self._start_watchers() + self._start_watchers(watcher_fd=r) self.socket_event = False @synchronized("arbiter_reload") @@ -743,8 +754,10 @@ def start_watchers(self, watcher_iter_func=None): yield self._start_watchers(watcher_iter_func=watcher_iter_func) @gen.coroutine - def _start_watchers(self, watcher_iter_func=None): - if watcher_iter_func is None: + def _start_watchers(self, watcher_iter_func=None, watcher_fd=None): + if watcher_fd is not None: + watchers = self.iter_active_watchers(watcher_fd) + elif watcher_iter_func is None: watchers = self.iter_watchers() else: watchers = watcher_iter_func() diff --git a/circus/sockets.py b/circus/sockets.py index d80c53179..880db0c8a 100644 --- a/circus/sockets.py +++ b/circus/sockets.py @@ -37,7 +37,8 @@ class PapaSocketProxy(object): def __init__(self, name='', host=None, port=None, family=None, type=None, proto=None, backlog=None, path=None, umask=None, replace=None, - interface=None, so_reuseport=False, blocking=False): + interface=None, so_reuseport=False, blocking=False, + group=None): if path is not None: if not hasattr(socket, 'AF_UNIX'): raise NotImplementedError("AF_UNIX not supported on this" @@ -73,6 +74,7 @@ def __init__(self, name='', host=None, port=None, self.so_reuseport = papa_socket.get('so_reuseport', False) self._fileno = papa_socket.get('fileno') self.use_papa = True + self.group = group if log_differences: differences = [] if host != self.host: @@ -122,7 +124,8 @@ class CircusSocket(socket.socket): def __init__(self, name='', host='localhost', port=8080, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, backlog=2048, path=None, umask=None, replace=False, - interface=None, so_reuseport=False, blocking=False): + interface=None, so_reuseport=False, blocking=False, + group=None): if path is not None: if not hasattr(socket, 'AF_UNIX'): raise NotImplementedError("AF_UNIX not supported on this" @@ -138,6 +141,7 @@ def __init__(self, name='', host='localhost', port=8080, self.umask = umask self.replace = replace self.use_papa = False + self.group = group if hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: self.host = self.port = None @@ -246,7 +250,8 @@ def load_from_config(cls, config): 'so_reuseport': to_bool(config.get('so_reuseport')), 'umask': int(config.get('umask', 8)), 'replace': config.get('replace'), - 'blocking': to_bool(config.get('blocking'))} + 'blocking': to_bool(config.get('blocking')), + 'group': config.get('group')} use_papa = to_bool(config.get('use_papa')) and papa is not None proto_name = config.get('proto') if proto_name is not None: diff --git a/circus/watcher.py b/circus/watcher.py index 7a810f308..8c6c8e273 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -138,6 +138,10 @@ class Watcher(object): descriptors, thus can reuse the sockets opened by circusd. (default: False) + - **socket_group** -- Name of the group that sockets need to be in, to be + attached to this watcher. If unset, all sockets can be used. + (default: None) + - **on_demand** -- If True, the processes will be started only at the first connection to the socket (default: False) @@ -203,8 +207,8 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., graceful_timeout=30.0, prereload_fn=None, rlimits=None, executable=None, stdout_stream=None, stderr_stream=None, priority=0, loop=None, singleton=False, use_sockets=False, - copy_env=False, copy_path=False, max_age=0, - max_age_variance=30, hooks=None, respawn=True, + socket_group=None, copy_env=False, copy_path=False, + max_age=0, max_age_variance=30, hooks=None, respawn=True, autostart=True, on_demand=False, virtualenv=None, stdin_socket=None, close_child_stdin=True, close_child_stdout=False, @@ -212,6 +216,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., use_papa=False, **options): self.name = name self.use_sockets = use_sockets + self.socket_group = socket_group self.on_demand = on_demand self.res_name = name.lower().replace(" ", "_") self.numprocesses = int(numprocesses) @@ -271,7 +276,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., "stop_children", "shell", "shell_args", "env", "max_retry", "cmd", "args", "respawn", "graceful_timeout", "executable", "use_sockets", - "priority", "copy_env", "singleton", + "socket_group", "priority", "copy_env", "singleton", "stdout_stream_conf", "on_demand", "stderr_stream_conf", "max_age", "max_age_variance", "close_child_stdin", "close_child_stdout", @@ -418,7 +423,11 @@ def load_from_config(cls, config): @util.debuglog def initialize(self, evpub_socket, sockets, arbiter): self.evpub_socket = evpub_socket - self.sockets = sockets + if self.socket_group is None: + self.sockets = sockets + else: + self.sockets = {name: socket for (name, socket) in sockets.items() + if socket.group == self.socket_group} self.arbiter = arbiter def __len__(self):