diff --git a/intelmq/bin/intelmqctl.py b/intelmq/bin/intelmqctl.py index 26b7fa406..d69c0fe38 100644 --- a/intelmq/bin/intelmqctl.py +++ b/intelmq/bin/intelmqctl.py @@ -12,6 +12,11 @@ import sys import time from collections import OrderedDict +import xmlrpc.client +import http.client +import socket +import distutils.version +import getpass import pkg_resources import psutil @@ -349,7 +354,266 @@ def __status_process(self, pid, module, bot_id): raise -PROCESS_MANAGER = {'intelmq': IntelMQProcessManager} +class SupervisorProcessManager: + class RpcFaults: + UNKNOWN_METHOD = 1 + INCORRECT_PARAMETERS = 2 + BAD_ARGUMENTS = 3 + SIGNATURE_UNSUPPORTED = 4 + SHUTDOWN_STATE = 6 + BAD_NAME = 10 + BAD_SIGNAL = 11 + NO_FILE = 20 + NOT_EXECUTABLE = 21 + FAILED = 30 + ABNORMAL_TERMINATION = 40 + SPAWN_ERROR = 50 + ALREADY_STARTED = 60 + NOT_RUNNING = 70 + SUCCESS = 80 + ALREADY_ADDED = 90 + STILL_RUNNING = 91 + CANT_REREAD = 92 + + class ProcessState: + STOPPED = 0 + STARTING = 10 + RUNNING = 20 + BACKOFF = 30 + STOPPING = 40 + EXITED = 100 + FATAL = 200 + UNKNOWN = 1000 + + @staticmethod + def is_running(state: int) -> bool: + return state in ( + SupervisorProcessManager.ProcessState.STARTING, + SupervisorProcessManager.ProcessState.RUNNING, + SupervisorProcessManager.ProcessState.BACKOFF) + + DEFAULT_SOCKET_PATH = "/var/run/supervisor.sock" + SUPERVISOR_GROUP = "intelmq" + __supervisor_xmlrpc = None + + def __init__(self, runtime_configuration: dict, logger: logging.Logger, controller): + self.__runtime_configuration = runtime_configuration + self.__logger = logger + self.__controller = controller + + def bot_run(self, bot_id, run_subcommand=None, console_type=None, message_action_kind=None, dryrun=None, msg=None, + show_sent=None, loglevel=None): + paused = False + state = self._get_process_state(bot_id) + if state in (self.ProcessState.STARTING, self.ProcessState.RUNNING, self.ProcessState.BACKOFF): + self.__logger.warning("Main instance of the bot is running in the background and will be stopped; " + "when finished, we try to relaunch it again. " + "You may want to launch: 'intelmqctl stop {}' to prevent this message." + .format(bot_id)) + paused = True + self.bot_stop(bot_id) + + log_bot_message("starting", bot_id) + + try: + BotDebugger(self.__runtime_configuration[bot_id], bot_id, run_subcommand, + console_type, message_action_kind, dryrun, msg, show_sent, + loglevel=loglevel) + retval = 0 + except KeyboardInterrupt: + print("Keyboard interrupt.") + retval = 0 + except SystemExit as exc: + print("Bot exited with code %s." % exc.code) + retval = exc.code + + if paused: + self.bot_start(bot_id) + + return retval + + def bot_start(self, bot_id: str, getstatus: bool = True): + state = self._get_process_state(bot_id) + if state is not None: + if state == self.ProcessState.RUNNING: + log_bot_message("running", bot_id) + return "running" + + elif not self.ProcessState.is_running(state): + self._remove_bot(bot_id) + + log_bot_message("starting", bot_id) + self._create_and_start_bot(bot_id) + + if getstatus: + return self.bot_status(bot_id) + + def bot_stop(self, bot_id: str, getstatus: bool = True): + state = self._get_process_state(bot_id) + if state is None: + if not self.__controller._is_enabled(bot_id): + log_bot_message("disabled", bot_id) + return "disabled" + else: + log_bot_error("stopped", bot_id) + return "stopped" + + if not self.ProcessState.is_running(state): + self._remove_bot(bot_id) + log_bot_error("stopped", bot_id) + return "stopped" + + log_bot_message("stopping", bot_id) + + self._get_supervisor().supervisor.stopProcess(self._process_name(bot_id)) + self._remove_bot(bot_id) + + if getstatus: + return self.bot_status(bot_id) + + def bot_reload(self, bot_id: str, getstatus: bool = True): + state = self._get_process_state(bot_id) + if state is None: + if not self.__controller._is_enabled(bot_id): + log_bot_message("disabled", bot_id) + return "disabled" + else: + log_bot_error("stopped", bot_id) + return "stopped" + + if not self.ProcessState.is_running(state): + self._remove_bot(bot_id) + log_bot_error("stopped", bot_id) + return "stopped" + + log_bot_message("reloading", bot_id) + + try: + self._get_supervisor().supervisor.signalProcess(self._process_name(bot_id), "HUP") + except xmlrpc.client.Fault as e: + if e.faultCode == self.RpcFaults.UNKNOWN_METHOD: + self._abort("Supervisor does not support signalProcess method, that was added in supervisor 3.2.0. " + "Reloading bots will not work.") + else: + raise e + + if getstatus: + return self.bot_status(bot_id) + + def bot_status(self, bot_id: str) -> str: + state = self._get_process_state(bot_id) + if state is None: + if not self.__controller._is_enabled(bot_id): + log_bot_message("disabled", bot_id) + return "disabled" + else: + log_bot_message("stopped", bot_id) + return "stopped" + + if state == self.ProcessState.STARTING: + # If process is still starting, try check it later + time.sleep(0.1) + return self.bot_status(bot_id) + + elif state == self.ProcessState.RUNNING: + log_bot_message("running", bot_id) + return "running" + + elif state == self.ProcessState.STOPPING: + log_bot_error("stopping", bot_id) + return "stopping" + + else: + log_bot_message("stopped", bot_id) + return "stopped" + + def _create_and_start_bot(self, bot_id: str) -> None: + module = self.__runtime_configuration[bot_id]["module"] + cmdargs = (module, bot_id) + + self._get_supervisor().twiddler.addProgramToGroup(self.SUPERVISOR_GROUP, bot_id, { + "command": " ".join(cmdargs), + "stopsignal": "INT", + }) + + def _remove_bot(self, bot_id: str) -> None: + self._get_supervisor().twiddler.removeProcessFromGroup(self.SUPERVISOR_GROUP, bot_id) + + def _get_process_state(self, bot_id: str): + try: + return self._get_supervisor().supervisor.getProcessInfo(self._process_name(bot_id))["state"] + except xmlrpc.client.Fault as e: + if e.faultCode == self.RpcFaults.BAD_NAME: # Process does not exists + return None + raise + + def _get_supervisor(self) -> xmlrpc.client.ServerProxy: + class UnixStreamHTTPConnection(http.client.HTTPConnection): + def connect(self): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.connect(self.host) + + class UnixStreamTransport(xmlrpc.client.Transport, object): + def __init__(self, socket_path): + self.socket_path = socket_path + super(UnixStreamTransport, self).__init__() + + def make_connection(self, host): + return UnixStreamHTTPConnection(self.socket_path) + + if not self.__supervisor_xmlrpc: + socket_path = os.environ.get("SUPERVISOR_SOCKET", self.DEFAULT_SOCKET_PATH) + + if not os.path.exists(socket_path): + self._abort("Socket '{}' does not exists. Is supervisor running?".format(socket_path)) + + if not os.access(socket_path, os.W_OK): + current_user = getpass.getuser() + self._abort("Socket '{}' is not writable. " + "Has user '{}' write permission?".format(socket_path, current_user)) + + self.__supervisor_xmlrpc = xmlrpc.client.ServerProxy( + "http://none", + transport=UnixStreamTransport(socket_path) + ) + + supervisor_version = self.__supervisor_xmlrpc.supervisor.getSupervisorVersion() + self.__logger.debug("Connected to supervisor {} named '{}' (API version {})".format( + supervisor_version, + self.__supervisor_xmlrpc.supervisor.getIdentification(), + self.__supervisor_xmlrpc.supervisor.getAPIVersion() + )) + + if distutils.version.StrictVersion(supervisor_version) < distutils.version.StrictVersion("3.2.0"): + self.__logger.warning("Current supervisor version is supported, but reloading bots will not work. " + "Please upgrade to version 3.2.0 or higher.") + + supervisor_state = self.__supervisor_xmlrpc.supervisor.getState()["statename"] + if supervisor_state != "RUNNING": + raise Exception("Unexpected supervisor state {}".format(supervisor_state)) + + try: + self.__supervisor_xmlrpc.twiddler.getAPIVersion() + except xmlrpc.client.Fault as e: + if e.faultCode == self.RpcFaults.UNKNOWN_METHOD: + self._abort("Twiddler is not supported. Is Twiddler for supervisor installed and enabled?") + else: + raise e + + if self.SUPERVISOR_GROUP not in self.__supervisor_xmlrpc.twiddler.getGroupNames(): + self._abort("Supervisor`s process group '{}' is not defined. " + "It must be created manually in supervisor config.".format(self.SUPERVISOR_GROUP)) + + return self.__supervisor_xmlrpc + + def _process_name(self, bot_id: str) -> str: + return "{}:{}".format(self.SUPERVISOR_GROUP, bot_id) + + def _abort(self, message: str): + self.__controller.abort(message) + + +PROCESS_MANAGER = {'intelmq': IntelMQProcessManager, 'supervisor': SupervisorProcessManager} class IntelMQController():