Skip to content

Commit

Permalink
make advisory lock files for each service while start/stop sequences …
Browse files Browse the repository at this point in the history
…do run #24
  • Loading branch information
gdraheim committed Apr 21, 2018
1 parent bcc5a50 commit 01be1fd
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 42 deletions.
95 changes: 74 additions & 21 deletions files/docker/systemctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from __future__ import print_function

__copyright__ = "(C) 2016-2018 Guido U. Draheim, licensed under the EUPL"
__version__ = "1.1.2116"
__version__ = "1.2.2116"

import logging
logg = logging.getLogger("systemctl")
Expand All @@ -20,9 +20,11 @@
import socket
import tempfile
import datetime
import fcntl

if sys.version[0] == '2':
string_types = basestring
BlockingIOError = IOError
else:
string_types = str
xrange = range
Expand Down Expand Up @@ -71,7 +73,9 @@
DefaultMaximumTimeout = 200
InitLoopSleep = 5
ProcMaxDepth = 100
MaxLockWait = None # equals DefaultMaximumTimeout

_systemctl_lockfile = "/var/run/systemd/system"
_notify_socket_folder = "/var/run/systemd" # alias /run/systemd
_notify_socket_name = "notify" # NOTIFY_SOCKET="/var/run/systemd/notify"
_pid_file_folder = "/var/run"
Expand Down Expand Up @@ -446,6 +450,46 @@ def get_preset(self, unit):
return status
return None

## with waitlock(unit): self.start()
class waitlock:
def __init__(self, unit):
self.unit = unit # currently unused
self.opened = None
self.lockfolder = _notify_socket_folder
try:
folder = self.lockfolder
if not os.path.isdir(folder):
os.mkdir(folder)
except Exception as e:
logg.warning("oops, %s", e)
def __enter__(self):
try:
lockfile = os.path.join(self.lockfolder, str(self.unit or "global") + ".lock")
self.opened = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o600)
for attempt in xrange(int(MaxLockWait or DefaultMaximumTimeout)):
try:
fcntl.flock(self.opened, fcntl.LOCK_EX | fcntl.LOCK_NB)
os.write(self.opened, "{ 'systemctl': %s, 'unit': '%s' }\n" % (os.getpid(), self.unit))
logg.debug("holding %s", lockfile)
break
except BlockingIOError as e:
whom = os.read(os.opened, 4096)
os.lseek(os.openened, 0, os.SEEK_SET)
logg.info("(%s) systemctl locked by %s", attempt, whom)
time.sleep(1)
continue
except Exception as e:
logg.warning("oops %s, %s", str(type(e)), e)
def __exit__(self, type, value, traceback):
try:
os.lseek(self.opened, 0, os.SEEK_SET)
os.ftruncate(self.opened, 0)
fcntl.flock(self.opened, fcntl.LOCK_UN)
os.close(self.opened)
self.opened = None
except Exception as e:
logg.warning("oops, %s", e)

def subprocess_wait(cmd, env=None, check = False, shell=False):
# logg.warning("running = %s", cmd)
run = subprocess.Popen(cmd, shell=shell, env=env)
Expand Down Expand Up @@ -1192,7 +1236,7 @@ def get_env(self, conf):
for env_file in conf.data.getlist("Service", "EnvironmentFile", []):
for name, value in self.read_env_file(env_file):
env[name] = self.expand_env(value, env)
logg.info("extra-vars %s", self.extra_vars())
logg.debug("extra-vars %s", self.extra_vars())
for extra in self.extra_vars():
if extra.startswith("@"):
for name, value in self.read_env_file(extra[1:]):
Expand Down Expand Up @@ -1446,8 +1490,9 @@ def start_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.debug(" start unit %s => %s", unit, conf.filename())
return self.start_unit_from(conf)
with waitlock(unit):
logg.debug(" start unit %s => %s", unit, conf.filename())
return self.start_unit_from(conf)
def get_TimeoutStartSec(self, conf):
timeout = conf.data.get("Service", "TimeoutSec", DefaultTimeoutStartSec)
timeout = conf.data.get("Service", "TimeoutStartSec", timeout)
Expand Down Expand Up @@ -1682,8 +1727,9 @@ def stop_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" stop unit %s => %s", unit, conf.filename())
return self.stop_unit_from(conf)
with waitlock(unit):
logg.info(" stop unit %s => %s", unit, conf.filename())
return self.stop_unit_from(conf)
def get_TimeoutStopSec(self, conf):
timeout = conf.data.get("Service", "TimeoutSec", DefaultTimeoutStartSec)
timeout = conf.data.get("Service", "TimeoutStopSec", timeout)
Expand Down Expand Up @@ -1850,8 +1896,9 @@ def reload_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" reload unit %s => %s", unit, conf.filename())
return self.reload_unit_from(conf)
with waitlock(unit):
logg.info(" reload unit %s => %s", unit, conf.filename())
return self.reload_unit_from(conf)
def reload_unit_from(self, conf):
if not conf: return
if self.syntax_check(conf) > 100: return False
Expand Down Expand Up @@ -1919,11 +1966,12 @@ def restart_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" restart unit %s => %s", unit, conf.filename())
if not self.is_active_from(conf):
return self.start_unit_from(conf)
else:
return self.restart_unit_from(conf)
with waitlock(unit):
logg.info(" restart unit %s => %s", unit, conf.filename())
if not self.is_active_from(conf):
return self.start_unit_from(conf)
else:
return self.restart_unit_from(conf)
def restart_unit_from(self, conf):
if not conf: return
if self.syntax_check(conf) > 100: return False
Expand Down Expand Up @@ -1956,8 +2004,10 @@ def try_restart_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
if self.is_active_from(conf):
return self.restart_unit_from(conf)
with waitlock(unit):
logg.info(" try-restart unit %s => %s", unit, conf.filename())
if self.is_active_from(conf):
return self.restart_unit_from(conf)
return True
def reload_or_restart_modules(self, *modules):
""" [UNIT]... -- reload-or-restart these units """
Expand Down Expand Up @@ -1985,8 +2035,9 @@ def reload_or_restart_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" reload-or-restart unit %s => %s", unit, conf.filename())
return self.reload_or_restart_unit_from(conf)
with waitlock(unit):
logg.info(" reload-or-restart unit %s => %s", unit, conf.filename())
return self.reload_or_restart_unit_from(conf)
def reload_or_restart_unit_from(self, conf):
if not self.is_active_from(conf):
# try: self.stop_unit_from(conf)
Expand Down Expand Up @@ -2024,8 +2075,9 @@ def reload_or_try_restart_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" reload-or-try-restart unit %s => %s", unit, conf.filename())
return self.reload_or_try_restart_unit_from(conf)
with waitlock(unit):
logg.info(" reload-or-try-restart unit %s => %s", unit, conf.filename())
return self.reload_or_try_restart_unit_from(conf)
def reload_or_try_restart_unit_from(self, conf):
if conf.data.getlist("Service", "ExecReload", []):
return self.reload_unit_from(conf)
Expand Down Expand Up @@ -2059,8 +2111,9 @@ def kill_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" kill unit %s => %s", unit, conf.filename())
return self.kill_unit_from(conf)
with waitlock(unit):
logg.info(" kill unit %s => %s", unit, conf.filename())
return self.kill_unit_from(conf)
def kill_stopped_unit_from(self, conf, mainpid = None):
if not mainpid:
return True
Expand Down
95 changes: 74 additions & 21 deletions files/docker/systemctl3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from __future__ import print_function

__copyright__ = "(C) 2016-2018 Guido U. Draheim, licensed under the EUPL"
__version__ = "1.1.2116"
__version__ = "1.2.2116"

import logging
logg = logging.getLogger("systemctl")
Expand All @@ -20,9 +20,11 @@
import socket
import tempfile
import datetime
import fcntl

if sys.version[0] == '2':
string_types = basestring
BlockingIOError = IOError
else:
string_types = str
xrange = range
Expand Down Expand Up @@ -71,7 +73,9 @@
DefaultMaximumTimeout = 200
InitLoopSleep = 5
ProcMaxDepth = 100
MaxLockWait = None # equals DefaultMaximumTimeout

_systemctl_lockfile = "/var/run/systemd/system"
_notify_socket_folder = "/var/run/systemd" # alias /run/systemd
_notify_socket_name = "notify" # NOTIFY_SOCKET="/var/run/systemd/notify"
_pid_file_folder = "/var/run"
Expand Down Expand Up @@ -446,6 +450,46 @@ def get_preset(self, unit):
return status
return None

## with waitlock(unit): self.start()
class waitlock:
def __init__(self, unit):
self.unit = unit # currently unused
self.opened = None
self.lockfolder = _notify_socket_folder
try:
folder = self.lockfolder
if not os.path.isdir(folder):
os.mkdir(folder)
except Exception as e:
logg.warning("oops, %s", e)
def __enter__(self):
try:
lockfile = os.path.join(self.lockfolder, str(self.unit or "global") + ".lock")
self.opened = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o600)
for attempt in xrange(int(MaxLockWait or DefaultMaximumTimeout)):
try:
fcntl.flock(self.opened, fcntl.LOCK_EX | fcntl.LOCK_NB)
os.write(self.opened, "{ 'systemctl': %s, 'unit': '%s' }\n" % (os.getpid(), self.unit))
logg.debug("holding %s", lockfile)
break
except BlockingIOError as e:
whom = os.read(os.opened, 4096)
os.lseek(os.openened, 0, os.SEEK_SET)
logg.info("(%s) systemctl locked by %s", attempt, whom)
time.sleep(1)
continue
except Exception as e:
logg.warning("oops %s, %s", str(type(e)), e)
def __exit__(self, type, value, traceback):
try:
os.lseek(self.opened, 0, os.SEEK_SET)
os.ftruncate(self.opened, 0)
fcntl.flock(self.opened, fcntl.LOCK_UN)
os.close(self.opened)
self.opened = None
except Exception as e:
logg.warning("oops, %s", e)

def subprocess_wait(cmd, env=None, check = False, shell=False):
# logg.warning("running = %s", cmd)
run = subprocess.Popen(cmd, shell=shell, env=env)
Expand Down Expand Up @@ -1192,7 +1236,7 @@ def get_env(self, conf):
for env_file in conf.data.getlist("Service", "EnvironmentFile", []):
for name, value in self.read_env_file(env_file):
env[name] = self.expand_env(value, env)
logg.info("extra-vars %s", self.extra_vars())
logg.debug("extra-vars %s", self.extra_vars())
for extra in self.extra_vars():
if extra.startswith("@"):
for name, value in self.read_env_file(extra[1:]):
Expand Down Expand Up @@ -1446,8 +1490,9 @@ def start_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.debug(" start unit %s => %s", unit, conf.filename())
return self.start_unit_from(conf)
with waitlock(unit):
logg.debug(" start unit %s => %s", unit, conf.filename())
return self.start_unit_from(conf)
def get_TimeoutStartSec(self, conf):
timeout = conf.data.get("Service", "TimeoutSec", DefaultTimeoutStartSec)
timeout = conf.data.get("Service", "TimeoutStartSec", timeout)
Expand Down Expand Up @@ -1682,8 +1727,9 @@ def stop_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" stop unit %s => %s", unit, conf.filename())
return self.stop_unit_from(conf)
with waitlock(unit):
logg.info(" stop unit %s => %s", unit, conf.filename())
return self.stop_unit_from(conf)
def get_TimeoutStopSec(self, conf):
timeout = conf.data.get("Service", "TimeoutSec", DefaultTimeoutStartSec)
timeout = conf.data.get("Service", "TimeoutStopSec", timeout)
Expand Down Expand Up @@ -1850,8 +1896,9 @@ def reload_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" reload unit %s => %s", unit, conf.filename())
return self.reload_unit_from(conf)
with waitlock(unit):
logg.info(" reload unit %s => %s", unit, conf.filename())
return self.reload_unit_from(conf)
def reload_unit_from(self, conf):
if not conf: return
if self.syntax_check(conf) > 100: return False
Expand Down Expand Up @@ -1919,11 +1966,12 @@ def restart_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" restart unit %s => %s", unit, conf.filename())
if not self.is_active_from(conf):
return self.start_unit_from(conf)
else:
return self.restart_unit_from(conf)
with waitlock(unit):
logg.info(" restart unit %s => %s", unit, conf.filename())
if not self.is_active_from(conf):
return self.start_unit_from(conf)
else:
return self.restart_unit_from(conf)
def restart_unit_from(self, conf):
if not conf: return
if self.syntax_check(conf) > 100: return False
Expand Down Expand Up @@ -1956,8 +2004,10 @@ def try_restart_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
if self.is_active_from(conf):
return self.restart_unit_from(conf)
with waitlock(unit):
logg.info(" try-restart unit %s => %s", unit, conf.filename())
if self.is_active_from(conf):
return self.restart_unit_from(conf)
return True
def reload_or_restart_modules(self, *modules):
""" [UNIT]... -- reload-or-restart these units """
Expand Down Expand Up @@ -1985,8 +2035,9 @@ def reload_or_restart_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" reload-or-restart unit %s => %s", unit, conf.filename())
return self.reload_or_restart_unit_from(conf)
with waitlock(unit):
logg.info(" reload-or-restart unit %s => %s", unit, conf.filename())
return self.reload_or_restart_unit_from(conf)
def reload_or_restart_unit_from(self, conf):
if not self.is_active_from(conf):
# try: self.stop_unit_from(conf)
Expand Down Expand Up @@ -2024,8 +2075,9 @@ def reload_or_try_restart_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" reload-or-try-restart unit %s => %s", unit, conf.filename())
return self.reload_or_try_restart_unit_from(conf)
with waitlock(unit):
logg.info(" reload-or-try-restart unit %s => %s", unit, conf.filename())
return self.reload_or_try_restart_unit_from(conf)
def reload_or_try_restart_unit_from(self, conf):
if conf.data.getlist("Service", "ExecReload", []):
return self.reload_unit_from(conf)
Expand Down Expand Up @@ -2059,8 +2111,9 @@ def kill_unit(self, unit):
if conf is None:
logg.error("Unit %s could not be found.", unit)
return False
logg.info(" kill unit %s => %s", unit, conf.filename())
return self.kill_unit_from(conf)
with waitlock(unit):
logg.info(" kill unit %s => %s", unit, conf.filename())
return self.kill_unit_from(conf)
def kill_stopped_unit_from(self, conf, mainpid = None):
if not mainpid:
return True
Expand Down

0 comments on commit 01be1fd

Please sign in to comment.