Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
tree: 5ce55e3afe
Fetching contributors…

Cannot retrieve contributors at this time

735 lines (575 sloc) 29.895 kB
# -*- coding: utf-8 -*-
import sys
import os.path
import subprocess
import json
import string
import tempfile
import atexit
import shutil
import threading
import fcntl
import exceptions
import errno
from xml.etree import ElementTree
import charon.backends
import charon.parallel
import re
from datetime import datetime
import getpass
class Deployment:
    """Charon top-level deployment manager.

    A deployment is backed by a JSON state file (guarded by a sibling
    ``.lock`` file) and a set of Nix network expressions.  Use it as a
    context manager: entering takes an exclusive lock on the state file
    and loads (or initialises) the state; leaving releases the lock.
    """

    def __init__(self, state_file, create=False, nix_exprs=[], nix_path=[], log_file=sys.stderr):
        """Set up an (as yet unlocked, unloaded) deployment.

        state_file -- path of the JSON state file (resolved with realpath).
        create     -- if true, ``__enter__`` may create a new deployment
                      instead of requiring an existing state file.
        nix_exprs  -- network expressions; only used when ``create`` is true.
        nix_path   -- extra Nix search path entries; only used when
                      ``create`` is true.
        log_file   -- file-like object that receives all log output.

        The list defaults are never mutated by this class, so sharing them
        between calls is harmless.
        """
        self.state_file = os.path.realpath(state_file)
        self.machines = {}          # all known machine states, by name
        self._machine_state = {}    # serialised form of each machine, by name
        self.active = {}            # subset of machines that is not obsolete
        self.configs_path = None
        self.description = "Unnamed Charon network"
        self.enable_rollback = False
        self._last_log_prefix = None
        self.auto_response = None
        self.extra_nix_path = []
        self._args = {}
        self._state_lock = threading.Lock()
        self._log_lock = threading.Lock()

        # Prefer the installed location of the Nix expressions and fall
        # back to the path relative to this source file.
        self.expr_path = os.path.dirname(__file__) + "/../../../../share/nix/charon"
        if not os.path.exists(self.expr_path):
            self.expr_path = os.path.dirname(__file__) + "/../nix"

        self._create = create
        self._nix_exprs = nix_exprs
        self._nix_path = nix_path

        # Scratch directory, removed again when the interpreter exits.
        self.tempdir = tempfile.mkdtemp(prefix="charon-tmp")
        atexit.register(lambda: shutil.rmtree(self.tempdir))

        self._log_file = log_file

    def __enter__(self):
        """Lock the state file and load or initialise the deployment.

        Returns the deployment itself so that ``with Deployment(...) as d``
        binds a usable object.  Raises Exception if ``create`` is false and
        the state file does not exist.
        """
        if self._create:
            self._create_state_lock()
            if os.path.exists(self.state_file):
                self.load_state()
            else:
                import uuid
                self.uuid = str(uuid.uuid1())
            # Expressions of the form <...> are looked up in the Nix search
            # path and must therefore not be made absolute.
            self.nix_exprs = [os.path.abspath(x) if x[0:1] != '<' else x for x in self._nix_exprs]
            self.nix_path = [_abs_nix_path(x) for x in self._nix_path]
        else:
            if not os.path.isfile(self.state_file):
                raise Exception("state file ‘{0}’ does not exist".format(self.state_file))
            self._create_state_lock()
            self.load_state()
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        """Release the state file lock; exceptions are not suppressed."""
        try:
            fcntl.lockf(self._state_file_lock, fcntl.LOCK_UN)
        finally:
            self._state_file_lock.close()

    def _create_state_lock(self):
        """Take an exclusive lock on ``<state_file>.lock``, waiting if needed."""
        self._state_file_lock = open(self.state_file + ".lock", "w+")
        try:
            fcntl.lockf(self._state_file_lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as e:
            # Depending on the operating system a held lock is reported as
            # either EAGAIN or EACCES; anything else is a real error.
            if e.errno not in (errno.EAGAIN, errno.EACCES):
                raise
            self.log("waiting for exclusive lock on ‘{0}’...".format(self.state_file))
            fcntl.lockf(self._state_file_lock, fcntl.LOCK_EX)

    def load_state(self):
        """Read the current deployment state from the state file."""
        with open(self.state_file, 'r') as f:
            state = json.load(f)
        self.nix_exprs = state['networkExprs']
        self.nix_path = state.get('nixPath', [])
        self.uuid = state['uuid']
        self.description = state.get('description', self.description)
        self.enable_rollback = state.get('enableRollback', False)
        self.machines = {}
        self._machine_state = {}
        self.active = {}
        self.configs_path = state.get('vmsPath', None)
        for name, serialised in state['machines'].items():
            m = charon.backends.create_state(self, serialised['targetEnv'], name, self._log_file)
            self.machines[name] = m
            m.deserialise(serialised)
            self._machine_state[name] = serialised
            if not m.obsolete:
                self.active[m.name] = m
        self._args = state.get('args', {})
        self.set_log_prefixes()

    def write_state(self):
        """Write the current deployment state to the state file in JSON format."""
        state = {'networkExprs': self.nix_exprs,
                 'nixPath': self.nix_path,
                 'uuid': self.uuid,
                 'description': self.description,
                 'enableRollback': self.enable_rollback,
                 'machines': self._machine_state,
                 'args': self._args}
        if self.configs_path:
            state['vmsPath'] = self.configs_path
        # Write to a temporary file first and rename it over the real one
        # so that the state file is replaced in a single step.
        tmp = self.state_file + ".tmp"
        with open(tmp, 'w') as f:
            json.dump(state, f, indent=2)
        os.rename(tmp, self.state_file)

    def update_machine_state(self, m):
        """Re-serialise machine ``m`` and persist the state file."""
        with self._state_lock:
            self._machine_state[m.name] = m.serialise()
            self.write_state()

    def delete_machine(self, m):
        """Forget machine ``m`` entirely and persist the state file."""
        with self._state_lock:
            del self.machines[m.name]
            self._machine_state.pop(m.name, None)
            self.active.pop(m.name, None)
            self.write_state()

    def log(self, msg):
        """Write ``msg`` as a complete line, terminating any open line first."""
        with self._log_lock:
            if self._last_log_prefix is not None:
                self._log_file.write("\n")
                self._last_log_prefix = None
            self._log_file.write(msg + "\n")

    def log_start(self, prefix, msg):
        """Write ``msg`` without a newline, starting a new prefixed line if
        the currently open line (if any) belongs to a different prefix."""
        with self._log_lock:
            if self._last_log_prefix != prefix:
                if self._last_log_prefix is not None:
                    self._log_file.write("\n")
                self._log_file.write(prefix)
            self._log_file.write(msg)
            self._last_log_prefix = prefix

    def log_end(self, prefix, msg):
        """Finish a line begun with ``log_start``.

        If the open line belongs to another prefix it is terminated and a
        new prefixed line is written, unless ``msg`` is empty, in which
        case nothing more is written.
        """
        with self._log_lock:
            last = self._last_log_prefix
            self._last_log_prefix = None
            if last != prefix:
                if last is not None:
                    self._log_file.write("\n")
                if msg == "":
                    return
                self._log_file.write(prefix)
            self._log_file.write(msg + "\n")

    def set_log_prefixes(self):
        """Tell every machine the length of the longest machine name."""
        max_len = max([len(m.name) for m in self.machines.values()] or [0])
        for m in self.machines.values():
            m.set_log_prefix(max_len)

    def confirm(self, question):
        """Ask a yes/no question on the log file and return the answer.

        If ``auto_response`` is set it is echoed and used instead of reading
        standard input.  End of input, an empty line and "n" all mean no;
        any other answer than "y" repeats the question.
        """
        while True:
            with self._log_lock:
                if self._last_log_prefix is not None:
                    self._log_file.write("\n")
                    self._last_log_prefix = None
                self._log_file.write(charon.util.ansi_warn("warning: {0} (y/N) ".format(question), outfile=self._log_file))
                if self.auto_response is not None:
                    self._log_file.write("{0}\n".format(self.auto_response))
                    return self.auto_response == "y"
                response = sys.stdin.readline()
                if response == "":
                    # End of file on standard input.
                    return False
                response = response.rstrip().lower()
                if response == "y":
                    return True
                if response == "n" or response == "":
                    return False

    def _eval_flags(self):
        """Return the ``-I <path>`` arguments for the Nix tools as a flat list."""
        return [arg for x in (self.extra_nix_path + self.nix_path) for arg in ("-I", x)]

    def set_arg(self, name, value):
        """Set a persistent argument to the deployment specification.

        ``value`` is stored verbatim and later spliced into a Nix attribute
        set, so it must already be a valid Nix expression.
        """
        assert isinstance(name, str)
        assert isinstance(value, str)
        self._args[name] = value

    def set_argstr(self, name, value):
        """Set a persistent string argument to the deployment specification.

        ``value`` is turned into a double-quoted Nix string literal with
        ``"``, ``\\`` and ``$`` escaped.
        """
        assert isinstance(value, str)
        escapes = {'"': '\\"', '\\': '\\\\', '$': '\\$'}
        escaped = "".join([escapes.get(c, c) for c in value])
        self.set_arg(name, '"' + escaped + '"')

    def unset_arg(self, name):
        """Unset a persistent argument to the deployment specification."""
        assert isinstance(name, str)
        self._args.pop(name, None)

    def _args_to_attrs(self):
        """Render the persistent arguments as a Nix attribute set expression."""
        return "{ " + " ".join([n + " = " + v + "; " for n, v in self._args.items()]) + "}"

    def evaluate(self):
        """Evaluate the Nix expressions belonging to this deployment into a deployment specification.

        Fills ``self.definitions`` and updates ``description`` and
        ``enable_rollback`` from the ``network`` attribute.  Raises
        NixEvalError if ``nix-instantiate`` fails.
        """
        self.definitions = {}

        try:
            xml = subprocess.check_output(
                ["nix-instantiate", "-I", "charon=" + self.expr_path]
                + self._eval_flags() +
                ["--eval-only", "--show-trace", "--xml", "--strict",
                 "<charon/eval-machine-info.nix>",
                 "--arg", "checkConfigurationOptions", "false",
                 "--arg", "networkExprs", "[ " + " ".join(self.nix_exprs) + " ]",
                 "--arg", "args", self._args_to_attrs(),
                 "-A", "info"], stderr=self._log_file)
        except subprocess.CalledProcessError:
            raise NixEvalError

        tree = ElementTree.fromstring(xml)

        # Extract global deployment attributes.
        info = tree.find("attrs/attr[@name='network']")
        assert info is not None
        elem = info.find("attrs/attr[@name='description']/string")
        if elem is not None:
            self.description = elem.get("value")
        elem = info.find("attrs/attr[@name='enableRollback']/bool")
        if elem is not None:
            self.enable_rollback = elem.get("value") == "true"

        # Extract machine information.
        machines = tree.find("attrs/attr[@name='machines']/attrs")
        for m in machines.findall("attr"):
            defn = charon.backends.create_definition(m)
            self.definitions[defn.name] = defn

    def evaluate_option_value(self, machine_name, option_name, xml=False):
        """Evaluate a single option of a single machine in the deployment specification.

        Returns the raw output of ``nix-instantiate`` (XML if ``xml`` is
        true).  Raises NixEvalError if the evaluation fails.
        """
        try:
            return subprocess.check_output(
                ["nix-instantiate", "-I", "charon=" + self.expr_path]
                + self._eval_flags() +
                ["--eval-only", "--show-trace", "--strict",
                 "<charon/eval-machine-info.nix>",
                 "--arg", "networkExprs", "[ " + " ".join(self.nix_exprs) + " ]",
                 "--arg", "args", self._args_to_attrs(),
                 "-A", "nodes.{0}.config.{1}".format(machine_name, option_name)]
                + (["--xml"] if xml else []),
                stderr=self._log_file)
        except subprocess.CalledProcessError:
            raise NixEvalError

    def get_physical_spec(self):
        """Compute the contents of the Nix expression specifying the computed physical deployment attributes"""
        active = list(self.active.values())

        lines_per_machine = {m.name: [] for m in active}
        authorized_keys = {m.name: [] for m in active}
        kernel_modules = {m.name: set() for m in active}
        hosts = {}

        # Start with the plain (unencrypted) address of every other machine.
        for m in active:
            hosts[m.name] = {}
            for m2 in active:
                if m == m2:
                    continue
                ip = m.address_to(m2)
                if ip:
                    hosts[m.name][m2.name] = hosts[m.name][m2.name + "-unencrypted"] = ip

        def do_machine(m):
            defn = self.definitions[m.name]
            lines = lines_per_machine[m.name]

            lines.extend(m.get_physical_spec(self.active))

            # Emit configuration to realise encrypted peer-to-peer links.
            for m2_name in defn.encrypted_links_to:

                if m2_name not in self.active:
                    raise Exception("‘deployment.encryptedLinksTo’ in machine ‘{0}’ refers to an unknown machine ‘{1}’"
                                    .format(m.name, m2_name))
                m2 = self.active[m2_name]

                # Don't create two tunnels between a pair of machines.
                if m.name in self.definitions[m2.name].encrypted_links_to and m.name >= m2.name:
                    continue

                local_ipv4 = "192.168.105.{0}".format(m.index)
                remote_ipv4 = "192.168.105.{0}".format(m2.index)
                lines.append(' networking.p2pTunnels.{0} ='.format(m2.name))
                lines.append(' {{ target = "{0}-unencrypted";'.format(m2.name))
                lines.append(' localTunnel = {0};'.format(10000 + m2.index))
                lines.append(' remoteTunnel = {0};'.format(10000 + m.index))
                lines.append(' localIPv4 = "{0}";'.format(local_ipv4))
                lines.append(' remoteIPv4 = "{0}";'.format(remote_ipv4))
                lines.append(' privateKey = "/root/.ssh/id_charon_vpn";')
                lines.append(' };')

                # FIXME: set up the authorized_key file such that ‘m’
                # can do nothing more than create a tunnel.
                authorized_keys[m2.name].append('"' + m._public_vpn_key + '"')
                kernel_modules[m.name].add('"tun"')
                kernel_modules[m2.name].add('"tun"')
                # The tunnel addresses take over the plain host names.
                hosts[m.name][m2.name] = hosts[m.name][m2.name + "-encrypted"] = remote_ipv4
                hosts[m2.name][m.name] = hosts[m2.name][m.name + "-encrypted"] = local_ipv4

            private_ipv4 = m.private_ipv4
            if private_ipv4:
                lines.append(' networking.privateIPv4 = "{0}";'.format(private_ipv4))
            public_ipv4 = m.public_ipv4
            if public_ipv4:
                lines.append(' networking.publicIPv4 = "{0}";'.format(public_ipv4))

        # All machines must be processed before anything is emitted, since
        # a tunnel adds keys, modules and hosts to *both* of its endpoints.
        for m in active:
            do_machine(m)

        def emit_machine(m):
            lines = []
            lines.append(" \"" + m.name + "\" = { config, pkgs, ... }: {")
            lines.extend(lines_per_machine[m.name])
            if authorized_keys[m.name]:
                lines.append(' users.extraUsers.root.openssh.authorizedKeys.keys = [ {0} ];'.format(" ".join(authorized_keys[m.name])))
                lines.append(' services.openssh.extraConfig = "PermitTunnel yes\\n";')
            lines.append(' boot.kernelModules = [ {0} ];'.format(" ".join(kernel_modules[m.name])))
            lines.append(' networking.extraHosts = "{0}\\n";'.format('\\n'.join([hosts[m.name][m2] + " " + m2 for m2 in hosts[m.name]])))
            lines.append(" };\n")
            return "\n".join(lines)

        return "".join(["{\n"] + [emit_machine(m) for m in active] + ["}\n"])

    def get_profile(self):
        """Return the path of the per-user Nix profile of this deployment."""
        return "/nix/var/nix/profiles/per-user/{0}/charon/{1}".format(getpass.getuser(), self.uuid)

    def build_configs(self, include, exclude, dry_run=False):
        """Build the machine configurations in the Nix store.

        Returns the output of ``nix-build`` with trailing whitespace
        removed.  If rollback is enabled the result is also registered as a
        new generation of the deployment profile.  Raises Exception if the
        build or the profile update fails.
        """
        self.log("building all machine configurations...")

        phys_expr = self.tempdir + "/physical.nix"
        with open(phys_expr, "w") as f:
            f.write(self.get_physical_spec())

        names = ['"' + m.name + '"' for m in self.active.values() if should_do(m, include, exclude)]

        try:
            configs_path = subprocess.check_output(
                ["nix-build", "-I", "charon=" + self.expr_path, "--show-trace"]
                + self._eval_flags() +
                ["<charon/eval-machine-info.nix>",
                 "--arg", "networkExprs", "[ " + " ".join(self.nix_exprs + [phys_expr]) + " ]",
                 "--arg", "args", self._args_to_attrs(),
                 "--arg", "names", "[ " + " ".join(names) + " ]",
                 "-A", "machines", "-o", self.tempdir + "/configs"]
                + (["--dry-run"] if dry_run else []), stderr=self._log_file).rstrip()
        except subprocess.CalledProcessError:
            raise Exception("unable to build all machine configurations")

        if self.enable_rollback:
            profile = self.get_profile()
            profile_dir = os.path.dirname(profile)
            if not os.path.exists(profile_dir):
                os.makedirs(profile_dir, 0o755)
            if subprocess.call(["nix-env", "-p", profile, "--set", configs_path]) != 0:
                raise Exception("cannot update profile ‘{0}’".format(profile))

        return configs_path

    def copy_closures(self, configs_path, include, exclude, max_concurrent_copy):
        """Copy the closure of each machine configuration to the corresponding machine."""

        def worker(m):
            if not should_do(m, include, exclude):
                return
            m.log("copying closure...")
            m.new_toplevel = os.path.realpath(configs_path + "/" + m.name)
            if not os.path.exists(m.new_toplevel):
                raise Exception("can't find closure of machine ‘{0}’".format(m.name))
            m.copy_closure_to(m.new_toplevel)

        charon.parallel.run_tasks(
            nr_workers=max_concurrent_copy,
            tasks=list(self.active.values()), worker_fun=worker)

    def activate_configs(self, configs_path, include, exclude, allow_reboot, check):
        """Activate the new configuration on a machine.

        Runs on all selected active machines in parallel.  A failure on one
        machine is logged and does not interrupt the others; afterwards an
        Exception naming all failed machines is raised.
        """
        switch = "/nix/var/nix/profiles/system/bin/switch-to-configuration switch"

        def worker(m):
            if not should_do(m, include, exclude):
                return

            if check:
                switch_cmd = switch
            else:
                # For performance, skip the switch if the new config is
                # already current.
                switch_cmd = ("cur=$(readlink /run/current-system); " +
                              'if [ "$cur" != ' + m.new_toplevel + " ]; then " + switch + "; fi")

            try:
                res = m.run_command(
                    # Set the system profile to the new configuration.
                    "set -e; nix-env -p /nix/var/nix/profiles/system --set " + m.new_toplevel + "; " +
                    # In case the switch crashes the system, do a sync.
                    "sync; " +
                    # Run the switch script. This will also update the
                    # GRUB boot loader.
                    switch_cmd,
                    check=False)
                # Exit status 100 is treated as "a reboot is required".
                if res != 0 and res != 100:
                    raise Exception("unable to activate new configuration")
                if res == 100:
                    if not allow_reboot:
                        raise Exception("the new configuration requires a reboot to take effect (hint: use ‘--allow-reboot’)")
                    m.reboot_sync()
                    # FIXME: should check which systemd services
                    # failed to start after the reboot.

                # Record that we switched this machine to the new
                # configuration.
                m.cur_configs_path = configs_path
                m.cur_toplevel = m.new_toplevel
                self.update_machine_state(m)

            except Exception as e:
                # This thread shouldn't throw an exception because
                # that will cause Charon to exit and interrupt
                # activation on the other machines.
                m.log(str(e))
                return m.name
            return None

        res = self._run_on_active(worker)
        failed = [x for x in res if x is not None]
        if failed:
            raise Exception("activation of {0} of {1} machines failed (namely on {2})"
                            .format(len(failed), len(res), ", ".join(["{0}".format(x) for x in failed])))

    def _run_on_active(self, worker):
        """Run ``worker`` on every active machine, one worker per machine."""
        machines = list(self.active.values())
        return charon.parallel.run_tasks(
            nr_workers=len(machines), tasks=machines, worker_fun=worker)

    def _get_free_machine_index(self):
        """Return one more than the highest machine index in use (0 if none)."""
        index = 0
        for m in self.machines.values():
            if m.index is not None and index <= m.index:
                index = m.index + 1
        return index

    def get_backups(self, include=[], exclude=[]):
        """Return the backups of the selected machines, merged per backup id.

        The result maps each backup id to a dict with the per-machine
        backups (``machines``), the concatenated ``info`` lists and an
        overall ``status``.
        """
        self.evaluate_active(include, exclude) # unnecessary?
        selected = [m for m in self.active.values() if should_do(m, include, exclude)]
        machine_backups = {m.name: m.get_backups() for m in selected}

        # merging machine backups into network backups
        backups = {}
        for per_machine in machine_backups.values():
            for backup_id in per_machine.keys():
                if backup_id in backups:
                    # Already merged when seen on another machine.
                    continue
                backup = backups[backup_id] = {'machines': {}, 'info': [], 'status': 'complete'}
                for m in selected:
                    # NOTE(review): raises KeyError if a selected machine has
                    # no backup with this id -- confirm that cannot happen.
                    machine_backup = machine_backups[m.name][backup_id]
                    backup['machines'][m.name] = machine_backup
                    backup['info'].extend(machine_backup['info'])
                    # status is always running when one of the backups is still running
                    if machine_backup['status'] != "complete" and backup['status'] != "running":
                        backup['status'] = machine_backup['status']
        return backups

    def backup(self, include=[], exclude=[]):
        """Back up the selected active machines under a timestamp-based id."""
        self.evaluate_active(include, exclude) # unnecessary?
        backup_id = datetime.now().strftime("%Y%m%d%H%M%S")

        def worker(m):
            if not should_do(m, include, exclude):
                return
            ssh_name = m.get_ssh_name()
            res = subprocess.call(["ssh", "root@" + ssh_name] + m.get_ssh_flags() + ["sync"])
            if res != 0:
                # A failed sync is only logged; the backup is still made.
                m.log("Running sync failed on {0}.".format(m.name))
            m.backup(backup_id)

        self._run_on_active(worker)
        self.write_state()

    def restore(self, include=[], exclude=[], backup_id=None):
        """Restore the selected machines from ``backup_id`` and redeploy them."""
        self.evaluate_active(include, exclude)

        def worker(m):
            if not should_do(m, include, exclude):
                return
            m.restore(self.definitions[m.name], backup_id)

        self._run_on_active(worker)
        self.deploy(include=include, exclude=exclude, check=True)

    def _mark_active(self, m):
        """Add ``m`` to the active set, clearing its obsolete flag if set."""
        self.active[m.name] = m
        if m.obsolete:
            self.log("machine ‘{0}’ is no longer obsolete".format(m.name))
            m.obsolete = False
            m.write()

    def _mark_obsolete(self, m):
        """Report ``m`` as obsolete, setting its obsolete flag if not yet set."""
        self.log("machine ‘{0}’ is obsolete".format(m.name))
        if not m.obsolete:
            m.obsolete = True
            m.write()

    def evaluate_active(self, include=[], exclude=[], kill_obsolete=False):
        """Evaluate the specification and recompute the set of active machines.

        Machines without a definition become obsolete; if ``kill_obsolete``
        is true the selected obsolete machines are destroyed and forgotten.
        """
        self.evaluate()

        # Create state objects for all defined machines.
        for defn in self.definitions.values():
            if defn.name not in self.machines:
                self.machines[defn.name] = charon.backends.create_state(self, defn.get_type(), defn.name, self._log_file)

        self.set_log_prefixes()

        # Determine the set of active machines. (We can't just delete
        # obsolete machines from ‘self.machines’ because they contain
        # important state that we don't want to forget about.)
        self.active = {}
        # Iterate over a snapshot: delete_machine() modifies self.machines.
        for m in list(self.machines.values()):
            if m.name in self.definitions:
                self._mark_active(m)
            else:
                self._mark_obsolete(m)
                if not should_do(m, include, exclude):
                    continue
                if kill_obsolete and m.destroy():
                    self.delete_machine(m)

    def deploy(self, dry_run=False, build_only=False, create_only=False, copy_only=False,
               include=[], exclude=[], check=False, kill_obsolete=False,
               allow_reboot=False, max_concurrent_copy=5):
        """Perform the deployment defined by the deployment specification.

        The ``*_only`` and ``dry_run`` flags stop the process after the
        corresponding stage (create, build, copy).
        """
        self.evaluate_active(include, exclude, kill_obsolete)

        # Assign each machine an index if it doesn't have one.
        for m in self.active.values():
            if m.index is None:
                m.index = self._get_free_machine_index()

        self.set_log_prefixes()

        # Start or update the active machines.
        if not dry_run and not build_only:
            def worker(m):
                if not should_do(m, include, exclude):
                    return
                defn = self.definitions[m.name]
                if m.get_type() != defn.get_type():
                    raise Exception("the type of machine ‘{0}’ changed from ‘{1}’ to ‘{2}’, which is currently unsupported"
                                    .format(m.name, m.get_type(), defn.get_type()))
                m.create(defn, check=check, allow_reboot=allow_reboot)
                m.wait_for_ssh(check=check)
                m.send_keys()
                m.generate_vpn_key()
            self._run_on_active(worker)

        if create_only:
            return

        # Build the machine configurations.
        if dry_run:
            self.build_configs(dry_run=True, include=include, exclude=exclude)
            return

        self.configs_path = self.build_configs(include=include, exclude=exclude)

        # Record configs_path in the state so that the ‘info’ command
        # can show whether machines have an outdated configuration.
        self.write_state()

        if build_only:
            return

        # Copy the closures of the machine configurations to the
        # target machines.
        self.copy_closures(self.configs_path, include=include, exclude=exclude,
                           max_concurrent_copy=max_concurrent_copy)

        if copy_only:
            return

        # Activate the configurations.
        self.activate_configs(self.configs_path, include=include, exclude=exclude,
                              allow_reboot=allow_reboot, check=check)

    def rollback(self, generation, include=[], exclude=[], check=False,
                 allow_reboot=False, max_concurrent_copy=5):
        """Switch the deployment profile to ``generation`` and activate it.

        Raises Exception if rollback is not enabled, if ``nix-env`` fails
        or if a selected machine of that generation is no longer known.
        """
        if not self.enable_rollback:
            raise Exception("rollback is not enabled for this network; please set ‘network.enableRollback’ to ‘true’ and redeploy"
                            )

        profile = self.get_profile()
        if subprocess.call(["nix-env", "-p", profile, "--switch-generation", str(generation)]) != 0:
            raise Exception("nix-env --switch-generation failed")

        self.configs_path = os.path.realpath(profile)
        assert os.path.isdir(self.configs_path)
        self.write_state()

        # Every symlink in the profile is taken to be one machine.
        names = set()
        for filename in os.listdir(self.configs_path):
            if not os.path.islink(self.configs_path + "/" + filename):
                continue
            if should_do_n(filename, include, exclude) and filename not in self.machines:
                raise Exception("cannot roll back machine ‘{0}’ which no longer exists".format(filename))
            names.add(filename)

        # Update the set of active machines.
        self.active = {}
        for m in list(self.machines.values()):
            if m.name in names:
                self._mark_active(m)
            else:
                self._mark_obsolete(m)

        self.copy_closures(self.configs_path, include=include, exclude=exclude,
                           max_concurrent_copy=max_concurrent_copy)

        self.activate_configs(self.configs_path, include=include, exclude=exclude,
                              allow_reboot=allow_reboot, check=check)

    def destroy_vms(self, include=[], exclude=[]):
        """Destroy all active or obsolete VMs."""

        def worker(m):
            if not should_do(m, include, exclude):
                return
            if m.destroy():
                self.delete_machine(m)

        # Snapshot the machines: the workers remove entries from self.machines.
        machines = list(self.machines.values())
        charon.parallel.run_tasks(nr_workers=len(machines), tasks=machines, worker_fun=worker)

    def reboot_machines(self, include=[], exclude=[], wait=False):
        """Reboot all active machines."""

        def worker(m):
            if not should_do(m, include, exclude):
                return
            if wait:
                m.reboot_sync()
            else:
                m.reboot()

        self._run_on_active(worker)

    def stop_machines(self, include=[], exclude=[]):
        """Stop all active machines."""

        def worker(m):
            if not should_do(m, include, exclude):
                return
            m.stop()

        self._run_on_active(worker)

    def start_machines(self, include=[], exclude=[]):
        """Start all active machines."""

        def worker(m):
            if not should_do(m, include, exclude):
                return
            m.start()

        self._run_on_active(worker)

    def is_valid_machine_name(self, name):
        """Return True if ``name`` is non-empty and consists only of word characters."""
        # \Z rather than $: ‘$’ would also accept a trailing newline.
        return re.match(r'^\w+\Z', name) is not None

    def rename(self, name, new_name):
        """Rename machine ``name`` to ``new_name`` in the state file.

        Only the serialised state entry is moved; the in-memory machine
        objects in ``self.machines`` are not updated.
        """
        if name not in self.machines:
            raise Exception("Machine {0} not found.".format(name))
        if new_name in self.machines:
            raise Exception("Machine with {0} already exists.".format(new_name))
        if not self.is_valid_machine_name(new_name):
            raise Exception("{0} is not a valid machine identifier.".format(new_name))

        self.log("Renaming machine ‘{0}’ to ‘{1}’...".format(name, new_name))

        self._machine_state[new_name] = self._machine_state.pop(name)
        self.write_state()

    def send_keys(self, include=[], exclude=[]):
        """Send LUKS encryption keys to machines."""

        def worker(m):
            if not should_do(m, include, exclude):
                return
            m.send_keys()

        self._run_on_active(worker)
class NixEvalError(Exception):
    """Raised when ``nix-instantiate`` fails to evaluate the deployment."""
def should_do(m, include, exclude):
    """Tell whether machine ``m`` is selected by the include/exclude lists.

    The decision is made on the machine's name; see ``should_do_n``.
    """
    machine_name = m.name
    return should_do_n(machine_name, include, exclude)
def should_do_n(name, include, exclude):
    """Tell whether ``name`` is selected by the include/exclude lists.

    A name in ``exclude`` is never selected.  Otherwise an empty
    ``include`` selects everything and a non-empty one selects only its
    members.  ``None`` or any empty collection counts as "empty" for both
    arguments, not just the literal ``[]``.
    """
    if exclude and name in exclude:
        return False
    if not include:
        return True
    return name in include
def _abs_nix_path(x):
xs = x.split('=', 1)
if len(xs) == 1: return os.path.abspath(x)
return xs[0] + '=' + os.path.abspath(xs[1])
Jump to Line
Something went wrong with that request. Please try again.