From 99157a92aacd4e71807ad49840297fe08f6886b5 Mon Sep 17 00:00:00 2001 From: ondj <19149300+ondj@users.noreply.github.com> Date: Tue, 4 Dec 2018 14:07:55 +0100 Subject: [PATCH 1/2] Added support for supervisor as process manager --- intelmq/bin/intelmqctl.py | 235 +++++++++++++++++++++++++++++++++++++- 1 file changed, 234 insertions(+), 1 deletion(-) diff --git a/intelmq/bin/intelmqctl.py b/intelmq/bin/intelmqctl.py index baa1c2573..fc7c6705d 100644 --- a/intelmq/bin/intelmqctl.py +++ b/intelmq/bin/intelmqctl.py @@ -12,6 +12,9 @@ import sys import time from collections import OrderedDict +import xmlrpc.client +import http.client +import socket import pkg_resources import psutil @@ -346,7 +349,237 @@ def __status_process(self, pid, module, bot_id): raise -PROCESS_MANAGER = {'intelmq': IntelMQProcessManager} +class SupervisorProcessManager: + class RpcFaults: + UNKNOWN_METHOD = 1 + INCORRECT_PARAMETERS = 2 + BAD_ARGUMENTS = 3 + SIGNATURE_UNSUPPORTED = 4 + SHUTDOWN_STATE = 6 + BAD_NAME = 10 + BAD_SIGNAL = 11 + NO_FILE = 20 + NOT_EXECUTABLE = 21 + FAILED = 30 + ABNORMAL_TERMINATION = 40 + SPAWN_ERROR = 50 + ALREADY_STARTED = 60 + NOT_RUNNING = 70 + SUCCESS = 80 + ALREADY_ADDED = 90 + STILL_RUNNING = 91 + CANT_REREAD = 92 + + class ProcessState: + STOPPED = 0 + STARTING = 10 # running + RUNNING = 20 # running + BACKOFF = 30 # running + STOPPING = 40 + EXITED = 100 + FATAL = 200 + UNKNOWN = 1000 + + DEFAULT_SOCKET_PATH = "/var/run/supervisor.sock" + SUPERVISOR_GROUP = "intelmq" + __supervisor_xmlrpc = None + + def __init__(self, runtime_configuration: dict, logger: logging.Logger, controller): + self.__runtime_configuration = runtime_configuration + self.__logger = logger + self.__controller = controller + + def bot_run(self, bot_id, run_subcommand=None, console_type=None, message_action_kind=None, dryrun=None, msg=None, + show_sent=None, loglevel=None): + paused = False + state = self._get_process_state(bot_id) + if state in (self.ProcessState.STARTING, self.ProcessState.RUNNING, self.ProcessState.BACKOFF): + self.__logger.warning("Main instance of the bot is running in the background and will be stopped; " + "when finished, we try to relaunch it again. " + "You may want to launch: 'intelmqctl stop {}' to prevent this message." + .format(bot_id)) + paused = True + self.bot_stop(bot_id) + + log_bot_message("starting", bot_id) + + try: + BotDebugger(self.__runtime_configuration[bot_id], bot_id, run_subcommand, + console_type, message_action_kind, dryrun, msg, show_sent, + loglevel=loglevel) + retval = 0 + except KeyboardInterrupt: + print("Keyboard interrupt.") + retval = 0 + except SystemExit as exc: + print("Bot exited with code %s." % exc.code) + retval = exc.code + + if paused: + self.bot_start(bot_id) + + return retval + + def bot_start(self, bot_id: str, getstatus=True): + state = self._get_process_state(bot_id) + if state is not None: + if state == self.ProcessState.RUNNING: + log_bot_message("running", bot_id) + return "running" + + elif state not in (self.ProcessState.STARTING, self.ProcessState.BACKOFF): + self._remove_bot(bot_id) + + log_bot_message("starting", bot_id) + self._create_and_start_bot(bot_id) + + if getstatus: + return self.bot_status(bot_id) + + def bot_stop(self, bot_id: str, getstatus=True): + state = self._get_process_state(bot_id) + if state is None: + if not self.__controller._is_enabled(bot_id): + log_bot_message("disabled", bot_id) + return "disabled" + else: + log_bot_error("stopped", bot_id) + return "stopped" + + if state not in (self.ProcessState.RUNNING, self.ProcessState.STARTING, self.ProcessState.BACKOFF): + self._remove_bot(bot_id) + log_bot_error("stopped", bot_id) + return "stopped" + + log_bot_message("stopping", bot_id) + + self._get_supervisor().supervisor.stopProcess(self._process_name(bot_id)) + self._remove_bot(bot_id) + + if getstatus: + return self.bot_status(bot_id) + + def bot_reload(self, bot_id: str, getstatus=True): + state = self._get_process_state(bot_id) + if state is None: + if not self.__controller._is_enabled(bot_id): + log_bot_message("disabled", bot_id) + return "disabled" + else: + log_bot_error("stopped", bot_id) + return "stopped" + + if state not in (self.ProcessState.RUNNING, self.ProcessState.STARTING, self.ProcessState.BACKOFF): + self._remove_bot(bot_id) + log_bot_error("stopped", bot_id) + return "stopped" + + log_bot_message("reloading", bot_id) + self._get_supervisor().supervisor.signalProcess(self._process_name(bot_id), "HUP") + + if getstatus: + return self.bot_status(bot_id) + + def bot_status(self, bot_id: str) -> str: + state = self._get_process_state(bot_id) + if state is None: + if not self.__controller._is_enabled(bot_id): + log_bot_message("disabled", bot_id) + return "disabled" + else: + log_bot_message("stopped", bot_id) + return "stopped" + + if state == self.ProcessState.STARTING: + # If process is still starting, try check it later + time.sleep(0.1) + return self.bot_status(bot_id) + + elif state == self.ProcessState.RUNNING: + log_bot_message("running", bot_id) + return "running" + + elif state == self.ProcessState.STOPPING: + log_bot_error("stopping", bot_id) + return "stopping" + + else: + log_bot_message("stopped", bot_id) + return "stopped" + + def _create_and_start_bot(self, bot_id: str): + module = self.__runtime_configuration[bot_id]["module"] + cmdargs = [module, bot_id] + + self._get_supervisor().twiddler.addProgramToGroup(self.SUPERVISOR_GROUP, bot_id, { + "command": " ".join(cmdargs), + "stopsignal": "INT", + }) + + def _remove_bot(self, bot_id: str): + self._get_supervisor().twiddler.removeProcessFromGroup(self.SUPERVISOR_GROUP, bot_id) + + def _get_process_state(self, bot_id: str): + try: + return self._get_supervisor().supervisor.getProcessInfo(self._process_name(bot_id))["state"] + except xmlrpc.client.Fault as e: + if e.faultCode == self.RpcFaults.BAD_NAME: + return None + raise + + def _get_supervisor(self) -> xmlrpc.client.ServerProxy: + class UnixStreamHTTPConnection(http.client.HTTPConnection): + def connect(self): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.connect(self.host) + + class UnixStreamTransport(xmlrpc.client.Transport, object): + def __init__(self, socket_path): + self.socket_path = socket_path + super(UnixStreamTransport, self).__init__() + + def make_connection(self, host): + return UnixStreamHTTPConnection(self.socket_path) + + if not self.__supervisor_xmlrpc: + socket_path = os.environ.get("SUPERVISOR_SOCKET", self.DEFAULT_SOCKET_PATH) + + if not os.path.exists(socket_path): + raise Exception("Socket '{}' does not exists. Is supervisor running?".format(socket_path)) + + if not os.access(socket_path, os.W_OK): + raise Exception("Socket '{}' is not accessible. Do you have write permission?".format(socket_path)) + + self.__supervisor_xmlrpc = xmlrpc.client.ServerProxy( + "http://none", + transport=UnixStreamTransport(socket_path) + ) + + self.__logger.debug("Connected to supervisor '{}' (API version {})".format( + self.__supervisor_xmlrpc.supervisor.getIdentification(), + self.__supervisor_xmlrpc.supervisor.getAPIVersion() + )) + + supervisor_state = self.__supervisor_xmlrpc.supervisor.getState() + + if supervisor_state["statename"] != "RUNNING": + raise Exception("Unexpected supervisor state {}".format(supervisor_state["statename"])) + + try: + self.__supervisor_xmlrpc.twiddler.getAPIVersion() + except xmlrpc.client.Fault: + raise Exception("Twiddler is not supported. Is Twiddler for supervisor installed and enabled?") + + if self.SUPERVISOR_GROUP not in self.__supervisor_xmlrpc.twiddler.getGroupNames(): + raise Exception("Supervisor does not contains process group {}. It must be created manually.") + + return self.__supervisor_xmlrpc + + def _process_name(self, bot_id: str) -> str: + return "{}:{}".format(self.SUPERVISOR_GROUP, bot_id) + + +PROCESS_MANAGER = {'intelmq': IntelMQProcessManager, 'supervisor': SupervisorProcessManager} class IntelMQController(): From 0233cf1cd516f50d388ba6d6c1e897730af771e1 Mon Sep 17 00:00:00 2001 From: ondj <19149300+ondj@users.noreply.github.com> Date: Mon, 11 Feb 2019 14:49:49 +0100 Subject: [PATCH 2/2] Fixes from code review for supervisor process manager --- intelmq/bin/intelmqctl.py | 77 +++++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 23 deletions(-) diff --git a/intelmq/bin/intelmqctl.py b/intelmq/bin/intelmqctl.py index fc7c6705d..d28cd29ae 100644 --- a/intelmq/bin/intelmqctl.py +++ b/intelmq/bin/intelmqctl.py @@ -15,6 +15,8 @@ import xmlrpc.client import http.client import socket +import distutils.version +import getpass import pkg_resources import psutil @@ -372,14 +374,21 @@ class RpcFaults: class ProcessState: STOPPED = 0 - STARTING = 10 # running - RUNNING = 20 # running - BACKOFF = 30 # running + STARTING = 10 + RUNNING = 20 + BACKOFF = 30 STOPPING = 40 EXITED = 100 FATAL = 200 UNKNOWN = 1000 + @staticmethod + def is_running(state: int) -> bool: + return state in ( + SupervisorProcessManager.ProcessState.STARTING, + SupervisorProcessManager.ProcessState.RUNNING, + SupervisorProcessManager.ProcessState.BACKOFF) + DEFAULT_SOCKET_PATH = "/var/run/supervisor.sock" SUPERVISOR_GROUP = "intelmq" __supervisor_xmlrpc = None @@ -420,14 +429,14 @@ def bot_run(self, bot_id, run_subcommand=None, console_type=None, message_action return retval - def bot_start(self, bot_id: str, getstatus=True): + def bot_start(self, bot_id: str, getstatus: bool = True): state = self._get_process_state(bot_id) if state is not None: if state == self.ProcessState.RUNNING: log_bot_message("running", bot_id) return "running" - elif state not in (self.ProcessState.STARTING, self.ProcessState.BACKOFF): + elif not self.ProcessState.is_running(state): self._remove_bot(bot_id) log_bot_message("starting", bot_id) @@ -436,7 +445,7 @@ def bot_start(self, bot_id: str, getstatus=True): if getstatus: return self.bot_status(bot_id) - def bot_stop(self, bot_id: str, getstatus=True): + def bot_stop(self, bot_id: str, getstatus: bool = True): state = self._get_process_state(bot_id) if state is None: if not self.__controller._is_enabled(bot_id): @@ -446,7 +455,7 @@ def bot_stop(self, bot_id: str, getstatus=True): log_bot_error("stopped", bot_id) return "stopped" - if state not in (self.ProcessState.RUNNING, self.ProcessState.STARTING, self.ProcessState.BACKOFF): + if not self.ProcessState.is_running(state): self._remove_bot(bot_id) log_bot_error("stopped", bot_id) return "stopped" @@ -459,7 +468,7 @@ def bot_stop(self, bot_id: str, getstatus=True): if getstatus: return self.bot_status(bot_id) - def bot_reload(self, bot_id: str, getstatus=True): + def bot_reload(self, bot_id: str, getstatus: bool = True): state = self._get_process_state(bot_id) if state is None: if not self.__controller._is_enabled(bot_id): @@ -469,13 +478,21 @@ def bot_reload(self, bot_id: str, getstatus=True): log_bot_error("stopped", bot_id) return "stopped" - if state not in (self.ProcessState.RUNNING, self.ProcessState.STARTING, self.ProcessState.BACKOFF): + if not self.ProcessState.is_running(state): self._remove_bot(bot_id) log_bot_error("stopped", bot_id) return "stopped" log_bot_message("reloading", bot_id) - self._get_supervisor().supervisor.signalProcess(self._process_name(bot_id), "HUP") + + try: + self._get_supervisor().supervisor.signalProcess(self._process_name(bot_id), "HUP") + except xmlrpc.client.Fault as e: + if e.faultCode == self.RpcFaults.UNKNOWN_METHOD: + self._abort("Supervisor does not support signalProcess method, that was added in supervisor 3.2.0. " + "Reloading bots will not work.") + else: + raise e if getstatus: return self.bot_status(bot_id) @@ -507,23 +524,23 @@ def bot_status(self, bot_id: str) -> str: log_bot_message("stopped", bot_id) return "stopped" - def _create_and_start_bot(self, bot_id: str): + def _create_and_start_bot(self, bot_id: str) -> None: module = self.__runtime_configuration[bot_id]["module"] - cmdargs = [module, bot_id] + cmdargs = (module, bot_id) self._get_supervisor().twiddler.addProgramToGroup(self.SUPERVISOR_GROUP, bot_id, { "command": " ".join(cmdargs), "stopsignal": "INT", }) - def _remove_bot(self, bot_id: str): + def _remove_bot(self, bot_id: str) -> None: self._get_supervisor().twiddler.removeProcessFromGroup(self.SUPERVISOR_GROUP, bot_id) def _get_process_state(self, bot_id: str): try: return self._get_supervisor().supervisor.getProcessInfo(self._process_name(bot_id))["state"] except xmlrpc.client.Fault as e: - if e.faultCode == self.RpcFaults.BAD_NAME: + if e.faultCode == self.RpcFaults.BAD_NAME: # Process does not exists return None raise @@ -545,39 +562,53 @@ def make_connection(self, host): socket_path = os.environ.get("SUPERVISOR_SOCKET", self.DEFAULT_SOCKET_PATH) if not os.path.exists(socket_path): - raise Exception("Socket '{}' does not exists. Is supervisor running?".format(socket_path)) + self._abort("Socket '{}' does not exists. Is supervisor running?".format(socket_path)) if not os.access(socket_path, os.W_OK): - raise Exception("Socket '{}' is not accessible. Do you have write permission?".format(socket_path)) + current_user = getpass.getuser() + self._abort("Socket '{}' is not writable. " + "Has user '{}' write permission?".format(socket_path, current_user)) self.__supervisor_xmlrpc = xmlrpc.client.ServerProxy( "http://none", transport=UnixStreamTransport(socket_path) ) - self.__logger.debug("Connected to supervisor '{}' (API version {})".format( + supervisor_version = self.__supervisor_xmlrpc.supervisor.getSupervisorVersion() + self.__logger.debug("Connected to supervisor {} named '{}' (API version {})".format( + supervisor_version, self.__supervisor_xmlrpc.supervisor.getIdentification(), self.__supervisor_xmlrpc.supervisor.getAPIVersion() )) - supervisor_state = self.__supervisor_xmlrpc.supervisor.getState() + if distutils.version.StrictVersion(supervisor_version) < distutils.version.StrictVersion("3.2.0"): + self.__logger.warning("Current supervisor version is supported, but reloading bots will not work. " + "Please upgrade to version 3.2.0 or higher.") - if supervisor_state["statename"] != "RUNNING": - raise Exception("Unexpected supervisor state {}".format(supervisor_state["statename"])) + supervisor_state = self.__supervisor_xmlrpc.supervisor.getState()["statename"] + if supervisor_state != "RUNNING": + raise Exception("Unexpected supervisor state {}".format(supervisor_state)) try: self.__supervisor_xmlrpc.twiddler.getAPIVersion() - except xmlrpc.client.Fault: - raise Exception("Twiddler is not supported. Is Twiddler for supervisor installed and enabled?") + except xmlrpc.client.Fault as e: + if e.faultCode == self.RpcFaults.UNKNOWN_METHOD: + self._abort("Twiddler is not supported. Is Twiddler for supervisor installed and enabled?") + else: + raise e if self.SUPERVISOR_GROUP not in self.__supervisor_xmlrpc.twiddler.getGroupNames(): - raise Exception("Supervisor does not contains process group {}. It must be created manually.") + self._abort("Supervisor`s process group '{}' is not defined. " + "It must be created manually in supervisor config.".format(self.SUPERVISOR_GROUP)) return self.__supervisor_xmlrpc def _process_name(self, bot_id: str) -> str: return "{}:{}".format(self.SUPERVISOR_GROUP, bot_id) + def _abort(self, message: str): + self.__controller.abort(message) + PROCESS_MANAGER = {'intelmq': IntelMQProcessManager, 'supervisor': SupervisorProcessManager}