Skip to content

Commit

Permalink
Fixes from code review for supervisor process manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ondj committed Feb 11, 2019
1 parent 99157a9 commit 0233cf1
Showing 1 changed file with 54 additions and 23 deletions.
77 changes: 54 additions & 23 deletions intelmq/bin/intelmqctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import xmlrpc.client
import http.client
import socket
import distutils.version
import getpass

import pkg_resources
import psutil
Expand Down Expand Up @@ -372,14 +374,21 @@ class RpcFaults:

class ProcessState:
STOPPED = 0
STARTING = 10 # running
RUNNING = 20 # running
BACKOFF = 30 # running
STARTING = 10
RUNNING = 20
BACKOFF = 30
STOPPING = 40
EXITED = 100
FATAL = 200
UNKNOWN = 1000

@staticmethod
def is_running(state: int) -> bool:
return state in (
SupervisorProcessManager.ProcessState.STARTING,
SupervisorProcessManager.ProcessState.RUNNING,
SupervisorProcessManager.ProcessState.BACKOFF)

DEFAULT_SOCKET_PATH = "/var/run/supervisor.sock"
SUPERVISOR_GROUP = "intelmq"
__supervisor_xmlrpc = None
Expand Down Expand Up @@ -420,14 +429,14 @@ def bot_run(self, bot_id, run_subcommand=None, console_type=None, message_action

return retval

def bot_start(self, bot_id: str, getstatus=True):
def bot_start(self, bot_id: str, getstatus: bool = True):
state = self._get_process_state(bot_id)
if state is not None:
if state == self.ProcessState.RUNNING:
log_bot_message("running", bot_id)
return "running"

elif state not in (self.ProcessState.STARTING, self.ProcessState.BACKOFF):
elif not self.ProcessState.is_running(state):
self._remove_bot(bot_id)

log_bot_message("starting", bot_id)
Expand All @@ -436,7 +445,7 @@ def bot_start(self, bot_id: str, getstatus=True):
if getstatus:
return self.bot_status(bot_id)

def bot_stop(self, bot_id: str, getstatus=True):
def bot_stop(self, bot_id: str, getstatus: bool = True):
state = self._get_process_state(bot_id)
if state is None:
if not self.__controller._is_enabled(bot_id):
Expand All @@ -446,7 +455,7 @@ def bot_stop(self, bot_id: str, getstatus=True):
log_bot_error("stopped", bot_id)
return "stopped"

if state not in (self.ProcessState.RUNNING, self.ProcessState.STARTING, self.ProcessState.BACKOFF):
if not self.ProcessState.is_running(state):
self._remove_bot(bot_id)
log_bot_error("stopped", bot_id)
return "stopped"
Expand All @@ -459,7 +468,7 @@ def bot_stop(self, bot_id: str, getstatus=True):
if getstatus:
return self.bot_status(bot_id)

def bot_reload(self, bot_id: str, getstatus=True):
def bot_reload(self, bot_id: str, getstatus: bool = True):
state = self._get_process_state(bot_id)
if state is None:
if not self.__controller._is_enabled(bot_id):
Expand All @@ -469,13 +478,21 @@ def bot_reload(self, bot_id: str, getstatus=True):
log_bot_error("stopped", bot_id)
return "stopped"

if state not in (self.ProcessState.RUNNING, self.ProcessState.STARTING, self.ProcessState.BACKOFF):
if not self.ProcessState.is_running(state):
self._remove_bot(bot_id)
log_bot_error("stopped", bot_id)
return "stopped"

log_bot_message("reloading", bot_id)
self._get_supervisor().supervisor.signalProcess(self._process_name(bot_id), "HUP")

try:
self._get_supervisor().supervisor.signalProcess(self._process_name(bot_id), "HUP")
except xmlrpc.client.Fault as e:
if e.faultCode == self.RpcFaults.UNKNOWN_METHOD:
self._abort("Supervisor does not support signalProcess method, that was added in supervisor 3.2.0. "
"Reloading bots will not work.")
else:
raise e

if getstatus:
return self.bot_status(bot_id)
Expand Down Expand Up @@ -507,23 +524,23 @@ def bot_status(self, bot_id: str) -> str:
log_bot_message("stopped", bot_id)
return "stopped"

def _create_and_start_bot(self, bot_id: str):
def _create_and_start_bot(self, bot_id: str) -> None:
module = self.__runtime_configuration[bot_id]["module"]
cmdargs = [module, bot_id]
cmdargs = (module, bot_id)

self._get_supervisor().twiddler.addProgramToGroup(self.SUPERVISOR_GROUP, bot_id, {
"command": " ".join(cmdargs),
"stopsignal": "INT",
})

def _remove_bot(self, bot_id: str):
def _remove_bot(self, bot_id: str) -> None:
self._get_supervisor().twiddler.removeProcessFromGroup(self.SUPERVISOR_GROUP, bot_id)

def _get_process_state(self, bot_id: str):
try:
return self._get_supervisor().supervisor.getProcessInfo(self._process_name(bot_id))["state"]
except xmlrpc.client.Fault as e:
if e.faultCode == self.RpcFaults.BAD_NAME:
if e.faultCode == self.RpcFaults.BAD_NAME: # Process does not exists
return None
raise

Expand All @@ -545,39 +562,53 @@ def make_connection(self, host):
socket_path = os.environ.get("SUPERVISOR_SOCKET", self.DEFAULT_SOCKET_PATH)

if not os.path.exists(socket_path):
raise Exception("Socket '{}' does not exists. Is supervisor running?".format(socket_path))
self._abort("Socket '{}' does not exists. Is supervisor running?".format(socket_path))

if not os.access(socket_path, os.W_OK):
raise Exception("Socket '{}' is not accessible. Do you have write permission?".format(socket_path))
current_user = getpass.getuser()
self._abort("Socket '{}' is not writable. "
"Has user '{}' write permission?".format(socket_path, current_user))

self.__supervisor_xmlrpc = xmlrpc.client.ServerProxy(
"http://none",
transport=UnixStreamTransport(socket_path)
)

self.__logger.debug("Connected to supervisor '{}' (API version {})".format(
supervisor_version = self.__supervisor_xmlrpc.supervisor.getSupervisorVersion()
self.__logger.debug("Connected to supervisor {} named '{}' (API version {})".format(
supervisor_version,
self.__supervisor_xmlrpc.supervisor.getIdentification(),
self.__supervisor_xmlrpc.supervisor.getAPIVersion()
))

supervisor_state = self.__supervisor_xmlrpc.supervisor.getState()
if distutils.version.StrictVersion(supervisor_version) < distutils.version.StrictVersion("3.2.0"):
self.__logger.warning("Current supervisor version is supported, but reloading bots will not work. "
"Please upgrade to version 3.2.0 or higher.")

if supervisor_state["statename"] != "RUNNING":
raise Exception("Unexpected supervisor state {}".format(supervisor_state["statename"]))
supervisor_state = self.__supervisor_xmlrpc.supervisor.getState()["statename"]
if supervisor_state != "RUNNING":
raise Exception("Unexpected supervisor state {}".format(supervisor_state))

try:
self.__supervisor_xmlrpc.twiddler.getAPIVersion()
except xmlrpc.client.Fault:
raise Exception("Twiddler is not supported. Is Twiddler for supervisor installed and enabled?")
except xmlrpc.client.Fault as e:
if e.faultCode == self.RpcFaults.UNKNOWN_METHOD:
self._abort("Twiddler is not supported. Is Twiddler for supervisor installed and enabled?")
else:
raise e

if self.SUPERVISOR_GROUP not in self.__supervisor_xmlrpc.twiddler.getGroupNames():
raise Exception("Supervisor does not contains process group {}. It must be created manually.")
self._abort("Supervisor`s process group '{}' is not defined. "
"It must be created manually in supervisor config.".format(self.SUPERVISOR_GROUP))

return self.__supervisor_xmlrpc

def _process_name(self, bot_id: str) -> str:
return "{}:{}".format(self.SUPERVISOR_GROUP, bot_id)

def _abort(self, message: str):
self.__controller.abort(message)


PROCESS_MANAGER = {'intelmq': IntelMQProcessManager, 'supervisor': SupervisorProcessManager}

Expand Down

0 comments on commit 0233cf1

Please sign in to comment.