Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/pr/1360' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Wagner committed Apr 9, 2019
2 parents 9eed982 + 0233cf1 commit 6e44b35
Showing 1 changed file with 265 additions and 1 deletion.
266 changes: 265 additions & 1 deletion intelmq/bin/intelmqctl.py
Expand Up @@ -12,6 +12,11 @@
import sys
import time
from collections import OrderedDict
import xmlrpc.client
import http.client
import socket
import distutils.version
import getpass

import pkg_resources
import psutil
Expand Down Expand Up @@ -349,7 +354,266 @@ def __status_process(self, pid, module, bot_id):
raise


PROCESS_MANAGER = {'intelmq': IntelMQProcessManager}
class SupervisorProcessManager:
class RpcFaults:
UNKNOWN_METHOD = 1
INCORRECT_PARAMETERS = 2
BAD_ARGUMENTS = 3
SIGNATURE_UNSUPPORTED = 4
SHUTDOWN_STATE = 6
BAD_NAME = 10
BAD_SIGNAL = 11
NO_FILE = 20
NOT_EXECUTABLE = 21
FAILED = 30
ABNORMAL_TERMINATION = 40
SPAWN_ERROR = 50
ALREADY_STARTED = 60
NOT_RUNNING = 70
SUCCESS = 80
ALREADY_ADDED = 90
STILL_RUNNING = 91
CANT_REREAD = 92

class ProcessState:
STOPPED = 0
STARTING = 10
RUNNING = 20
BACKOFF = 30
STOPPING = 40
EXITED = 100
FATAL = 200
UNKNOWN = 1000

@staticmethod
def is_running(state: int) -> bool:
return state in (
SupervisorProcessManager.ProcessState.STARTING,
SupervisorProcessManager.ProcessState.RUNNING,
SupervisorProcessManager.ProcessState.BACKOFF)

DEFAULT_SOCKET_PATH = "/var/run/supervisor.sock"
SUPERVISOR_GROUP = "intelmq"
__supervisor_xmlrpc = None

def __init__(self, runtime_configuration: dict, logger: logging.Logger, controller):
self.__runtime_configuration = runtime_configuration
self.__logger = logger
self.__controller = controller

def bot_run(self, bot_id, run_subcommand=None, console_type=None, message_action_kind=None, dryrun=None, msg=None,
show_sent=None, loglevel=None):
paused = False
state = self._get_process_state(bot_id)
if state in (self.ProcessState.STARTING, self.ProcessState.RUNNING, self.ProcessState.BACKOFF):
self.__logger.warning("Main instance of the bot is running in the background and will be stopped; "
"when finished, we try to relaunch it again. "
"You may want to launch: 'intelmqctl stop {}' to prevent this message."
.format(bot_id))
paused = True
self.bot_stop(bot_id)

log_bot_message("starting", bot_id)

try:
BotDebugger(self.__runtime_configuration[bot_id], bot_id, run_subcommand,
console_type, message_action_kind, dryrun, msg, show_sent,
loglevel=loglevel)
retval = 0
except KeyboardInterrupt:
print("Keyboard interrupt.")
retval = 0
except SystemExit as exc:
print("Bot exited with code %s." % exc.code)
retval = exc.code

if paused:
self.bot_start(bot_id)

return retval

def bot_start(self, bot_id: str, getstatus: bool = True):
state = self._get_process_state(bot_id)
if state is not None:
if state == self.ProcessState.RUNNING:
log_bot_message("running", bot_id)
return "running"

elif not self.ProcessState.is_running(state):
self._remove_bot(bot_id)

log_bot_message("starting", bot_id)
self._create_and_start_bot(bot_id)

if getstatus:
return self.bot_status(bot_id)

def bot_stop(self, bot_id: str, getstatus: bool = True):
state = self._get_process_state(bot_id)
if state is None:
if not self.__controller._is_enabled(bot_id):
log_bot_message("disabled", bot_id)
return "disabled"
else:
log_bot_error("stopped", bot_id)
return "stopped"

if not self.ProcessState.is_running(state):
self._remove_bot(bot_id)
log_bot_error("stopped", bot_id)
return "stopped"

log_bot_message("stopping", bot_id)

self._get_supervisor().supervisor.stopProcess(self._process_name(bot_id))
self._remove_bot(bot_id)

if getstatus:
return self.bot_status(bot_id)

def bot_reload(self, bot_id: str, getstatus: bool = True):
state = self._get_process_state(bot_id)
if state is None:
if not self.__controller._is_enabled(bot_id):
log_bot_message("disabled", bot_id)
return "disabled"
else:
log_bot_error("stopped", bot_id)
return "stopped"

if not self.ProcessState.is_running(state):
self._remove_bot(bot_id)
log_bot_error("stopped", bot_id)
return "stopped"

log_bot_message("reloading", bot_id)

try:
self._get_supervisor().supervisor.signalProcess(self._process_name(bot_id), "HUP")
except xmlrpc.client.Fault as e:
if e.faultCode == self.RpcFaults.UNKNOWN_METHOD:
self._abort("Supervisor does not support signalProcess method, that was added in supervisor 3.2.0. "
"Reloading bots will not work.")
else:
raise e

if getstatus:
return self.bot_status(bot_id)

def bot_status(self, bot_id: str) -> str:
state = self._get_process_state(bot_id)
if state is None:
if not self.__controller._is_enabled(bot_id):
log_bot_message("disabled", bot_id)
return "disabled"
else:
log_bot_message("stopped", bot_id)
return "stopped"

if state == self.ProcessState.STARTING:
# If process is still starting, try check it later
time.sleep(0.1)
return self.bot_status(bot_id)

elif state == self.ProcessState.RUNNING:
log_bot_message("running", bot_id)
return "running"

elif state == self.ProcessState.STOPPING:
log_bot_error("stopping", bot_id)
return "stopping"

else:
log_bot_message("stopped", bot_id)
return "stopped"

def _create_and_start_bot(self, bot_id: str) -> None:
module = self.__runtime_configuration[bot_id]["module"]
cmdargs = (module, bot_id)

self._get_supervisor().twiddler.addProgramToGroup(self.SUPERVISOR_GROUP, bot_id, {
"command": " ".join(cmdargs),
"stopsignal": "INT",
})

def _remove_bot(self, bot_id: str) -> None:
self._get_supervisor().twiddler.removeProcessFromGroup(self.SUPERVISOR_GROUP, bot_id)

def _get_process_state(self, bot_id: str):
try:
return self._get_supervisor().supervisor.getProcessInfo(self._process_name(bot_id))["state"]
except xmlrpc.client.Fault as e:
if e.faultCode == self.RpcFaults.BAD_NAME: # Process does not exists
return None
raise

def _get_supervisor(self) -> xmlrpc.client.ServerProxy:
class UnixStreamHTTPConnection(http.client.HTTPConnection):
def connect(self):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(self.host)

class UnixStreamTransport(xmlrpc.client.Transport, object):
def __init__(self, socket_path):
self.socket_path = socket_path
super(UnixStreamTransport, self).__init__()

def make_connection(self, host):
return UnixStreamHTTPConnection(self.socket_path)

if not self.__supervisor_xmlrpc:
socket_path = os.environ.get("SUPERVISOR_SOCKET", self.DEFAULT_SOCKET_PATH)

if not os.path.exists(socket_path):
self._abort("Socket '{}' does not exists. Is supervisor running?".format(socket_path))

if not os.access(socket_path, os.W_OK):
current_user = getpass.getuser()
self._abort("Socket '{}' is not writable. "
"Has user '{}' write permission?".format(socket_path, current_user))

self.__supervisor_xmlrpc = xmlrpc.client.ServerProxy(
"http://none",
transport=UnixStreamTransport(socket_path)
)

supervisor_version = self.__supervisor_xmlrpc.supervisor.getSupervisorVersion()
self.__logger.debug("Connected to supervisor {} named '{}' (API version {})".format(
supervisor_version,
self.__supervisor_xmlrpc.supervisor.getIdentification(),
self.__supervisor_xmlrpc.supervisor.getAPIVersion()
))

if distutils.version.StrictVersion(supervisor_version) < distutils.version.StrictVersion("3.2.0"):
self.__logger.warning("Current supervisor version is supported, but reloading bots will not work. "
"Please upgrade to version 3.2.0 or higher.")

supervisor_state = self.__supervisor_xmlrpc.supervisor.getState()["statename"]
if supervisor_state != "RUNNING":
raise Exception("Unexpected supervisor state {}".format(supervisor_state))

try:
self.__supervisor_xmlrpc.twiddler.getAPIVersion()
except xmlrpc.client.Fault as e:
if e.faultCode == self.RpcFaults.UNKNOWN_METHOD:
self._abort("Twiddler is not supported. Is Twiddler for supervisor installed and enabled?")
else:
raise e

if self.SUPERVISOR_GROUP not in self.__supervisor_xmlrpc.twiddler.getGroupNames():
self._abort("Supervisor`s process group '{}' is not defined. "
"It must be created manually in supervisor config.".format(self.SUPERVISOR_GROUP))

return self.__supervisor_xmlrpc

def _process_name(self, bot_id: str) -> str:
return "{}:{}".format(self.SUPERVISOR_GROUP, bot_id)

def _abort(self, message: str):
self.__controller.abort(message)


PROCESS_MANAGER = {'intelmq': IntelMQProcessManager, 'supervisor': SupervisorProcessManager}


class IntelMQController():
Expand Down

0 comments on commit 6e44b35

Please sign in to comment.