Skip to content

Commit

Permalink
Add plugin hooks required for the encrypted links plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
adisbladis committed Apr 23, 2020
1 parent 9962fe4 commit 9dda74f
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 92 deletions.
22 changes: 18 additions & 4 deletions nixops/backends/__init__.py
Expand Up @@ -3,10 +3,11 @@
import os
import re
import subprocess
from typing import Mapping, Any, List, Optional, Union, Set, Sequence
from typing import Mapping, Any, List, Optional, Union, Set, Sequence, Tuple, Dict
import nixops.util
import nixops.resources
import nixops.ssh_util
from typing_extensions import Protocol


class KeyOptions(nixops.resources.ResourceOptions):
Expand Down Expand Up @@ -50,14 +51,20 @@ def __init__(self, name: str, config: nixops.resources.ResourceEval):
class MachineState(nixops.resources.ResourceState):
"""Base class for NixOps machine state objects."""

defn: Optional[MachineDefinition]

vm_id: Optional[str] = nixops.util.attr_property("vmId", None)
has_fast_connection: bool = nixops.util.attr_property(
"hasFastConnection", False, bool
)
ssh_pinged: bool = nixops.util.attr_property("sshPinged", False, bool)
ssh_port: int = nixops.util.attr_property("targetPort", 22, int)

# Legacy attribute used by the encrypted links plugin
# Do _not_ remove this attribute
public_vpn_key: Optional[str] = nixops.util.attr_property("publicVpnKey", None)
keys: Mapping[str, str] = nixops.util.attr_property("keys", {}, "json")

keys: Mapping[str, KeyOptions] = nixops.util.attr_property("keys", {}, "json")
owners: List[str] = nixops.util.attr_property("owners", [], "json")

# Nix store path of the last global configuration deployed to this
Expand All @@ -77,7 +84,7 @@ class MachineState(nixops.resources.ResourceState):
state_version: Optional[str] = nixops.util.attr_property("stateVersion", None, str)

def __init__(self, depl, name: str, id: int) -> None:
nixops.resources.ResourceState.__init__(self, depl, name, id)
super().__init__(depl, name, id)
self._ssh_pinged_this_time = False
self.ssh = nixops.ssh_util.SSH(self.logger)
self.ssh.register_flag_fun(self.get_ssh_flags)
Expand All @@ -94,7 +101,8 @@ def started(self) -> bool:
state = self.state
return state == self.STARTING or state == self.UP

def set_common_state(self, defn) -> None:
def set_common_state(self, defn: MachineDefinition) -> None:
self.defn = defn
self.keys = defn.keys
self.ssh_port = defn.ssh_port
self.has_fast_connection = defn.has_fast_connection
Expand Down Expand Up @@ -476,3 +484,9 @@ def __init__(self) -> None:

# FIXME: add a check whether the active NixOS config on the
# machine is correct.


class MachinePlugin(Protocol):

def post_wait(self, m: MachineState) -> None:
pass
88 changes: 37 additions & 51 deletions nixops/deployment.py
Expand Up @@ -43,11 +43,14 @@
from nixops.util import ansi_success
from nixops.plugins import get_plugin_manager

from typing_extensions import Protocol

import nixops.backends
import nixops.logger
import nixops.parallel
from nixops.nix_expr import RawValue, Function, Call, nixmerge, py2nix
from nixops.util import ansi_success, Undefined
from nixops.util import ImmutableValidatedObject
from nixops.plugins import get_plugin_manager

Definitions = Dict[str, nixops.resources.ResourceDefinition]
Expand Down Expand Up @@ -139,7 +142,7 @@ def machines(self) -> Dict[str, nixops.backends.MachineState]:
}

@property
def active(
def active_machines(
self,
) -> Dict[str, nixops.backends.MachineState]: # FIXME: rename to "active_machines"
return {
Expand Down Expand Up @@ -551,21 +554,16 @@ def get_arguments(self) -> Any:
def get_physical_spec(self) -> Any:
"""Compute the contents of the Nix expression specifying the computed physical deployment attributes"""

active_machines = self.active
active_machines = self.active_machines
active_resources = self.active_resources

attrs_per_resource: Dict[str, List[Dict[Tuple[str, ...], Any]]] = {
m.name: [] for m in active_resources.values()
}
authorized_keys: Dict[str, List[str]] = {
m.name: [] for m in active_machines.values()
}
kernel_modules: Dict[str, Set[str]] = {
m.name: set() for m in active_machines.values()
}
trusted_interfaces: Dict[str, Set[str]] = {
m.name: set() for m in active_machines.values()
}

for p in get_plugin_manager().hook.deployment_hook():
for name, attrs in p.physical_spec(self).items():
attrs_per_resource[name].extend(attrs)

# Hostnames should be accumulated like this:
#
Expand All @@ -583,11 +581,6 @@ def get_physical_spec(self) -> Any:
lambda: defaultdict(list)
)

def index_to_private_ip(index: int) -> str:
n = 105 + index / 256
assert n <= 255
return "192.168.{0}.{1}".format(n, index % 256)

def do_machine(m: nixops.backends.MachineState) -> None:
defn = self._machine_definition_for_required(m.name)

Expand Down Expand Up @@ -625,7 +618,7 @@ def do_machine(m: nixops.backends.MachineState) -> None:
do_machine(m)

def emit_resource(r: nixops.resources.ResourceState) -> Any:
config = []
config: List = []
config.extend(attrs_per_resource[r.name])
if is_machine(r):
# Sort the hosts by its canonical host names.
Expand All @@ -638,28 +631,8 @@ def emit_resource(r: nixops.resources.ResourceState) -> Any:
"{0} {1}".format(ip, " ".join(names)) for ip, names in sorted_hosts
]

if authorized_keys[r.name]:
config.append(
{
("users", "extraUsers", "root"): {
("openssh", "authorizedKeys", "keys"): authorized_keys[
r.name
]
},
("services", "openssh"): {
"extraConfig": "PermitTunnel yes\n"
},
}
)

config.append(
{
("boot", "kernelModules"): list(kernel_modules[r.name]),
("networking", "firewall"): {
"trustedInterfaces": list(trusted_interfaces[r.name])
},
("networking", "extraHosts"): "\n".join(extra_hosts) + "\n",
}
{("networking", "extraHosts"): "\n".join(extra_hosts) + "\n",}
)

# Add SSH public host keys for all machines in network.
Expand Down Expand Up @@ -748,7 +721,9 @@ def build_configs(
if DEBUG:
print("generated physical spec:\n" + p, file=sys.stderr)

selected = [m for m in self.active.values() if should_do(m, include, exclude)]
selected = [
m for m in self.active_machines.values() if should_do(m, include, exclude)
]

names = [m.name for m in selected]

Expand Down Expand Up @@ -845,7 +820,7 @@ def worker(m: nixops.backends.MachineState) -> None:

nixops.parallel.run_tasks(
nr_workers=max_concurrent_copy,
tasks=iter(self.active.values()),
tasks=iter(self.active_machines.values()),
worker_fun=worker,
)
self.logger.log(
Expand Down Expand Up @@ -966,7 +941,7 @@ def set_profile():

res = nixops.parallel.run_tasks(
nr_workers=max_concurrent_activate,
tasks=iter(self.active.values()),
tasks=iter(self.active_machines.values()),
worker_fun=worker,
)
failed = [x for x in res if x != None]
Expand All @@ -991,7 +966,7 @@ def get_backups(
) -> Dict[str, Dict[str, Any]]:
self.evaluate_active(include, exclude) # unnecessary?
machine_backups = {}
for m in self.active.values():
for m in self.active_machines.values():
if should_do(m, include, exclude):
machine_backups[m.name] = m.get_backups()

Expand All @@ -1005,7 +980,7 @@ def get_backups(
backups[backup_id]["info"] = []
backups[backup_id]["status"] = "complete"
backup = backups[backup_id]
for m in self.active.values():
for m in self.active_machines.values():
if should_do(m, include, exclude):
if backup_id in machine_backups[m.name].keys():
backup["machines"][m.name] = machine_backups[m.name][backup_id]
Expand Down Expand Up @@ -1052,7 +1027,7 @@ def worker(m: nixops.backends.MachineState) -> None:
m.remove_backup(backup_id, keep_physical)

nixops.parallel.run_tasks(
nr_workers=len(self.active),
nr_workers=len(self.active_machines),
tasks=iter(self.machines.values()),
worker_fun=worker,
)
Expand All @@ -1076,7 +1051,7 @@ def worker(m: nixops.backends.MachineState) -> None:
m.backup(self._definition_for_required(m.name), backup_id, devices)

nixops.parallel.run_tasks(
nr_workers=5, tasks=iter(self.active.values()), worker_fun=worker
nr_workers=5, tasks=iter(self.active_machines.values()), worker_fun=worker
)

return backup_id
Expand All @@ -1098,7 +1073,9 @@ def worker(m: nixops.backends.MachineState) -> None:
m.restore(self._definition_for_required(m.name), backup_id, devices)

nixops.parallel.run_tasks(
nr_workers=-1, tasks=iter(self.active.values()), worker_fun=worker
nr_workers=-1,
tasks=iter(self.active_machines.values()),
worker_fun=worker,
)
self.start_machines(include=include, exclude=exclude)
self.logger.warn(
Expand Down Expand Up @@ -1274,6 +1251,9 @@ def worker(r: nixops.resources.ResourceState):

m.wait_for_ssh(check=check)

for p in get_plugin_manager().hook.machine_hook():
p.post_wait(m)

except:
r._errored = True
raise
Expand Down Expand Up @@ -1514,7 +1494,7 @@ def destroy_resources(
profile = self.create_profile()
attrs = {
m.name: Call(RawValue("builtins.storePath"), m.cur_toplevel)
for m in self.active.values()
for m in self.active_machines.values()
if m.cur_toplevel
}
if (
Expand Down Expand Up @@ -1574,7 +1554,7 @@ def worker(m: nixops.backends.MachineState) -> None:
m.reboot(hard=hard)

nixops.parallel.run_tasks(
nr_workers=-1, tasks=iter(self.active.values()), worker_fun=worker
nr_workers=-1, tasks=iter(self.active_machines.values()), worker_fun=worker
)

def stop_machines(self, include: List[str] = [], exclude: List[str] = []) -> None:
Expand All @@ -1586,7 +1566,7 @@ def worker(m: nixops.backends.MachineState) -> None:
m.stop()

nixops.parallel.run_tasks(
nr_workers=-1, tasks=iter(self.active.values()), worker_fun=worker
nr_workers=-1, tasks=iter(self.active_machines.values()), worker_fun=worker
)

def start_machines(self, include: List[str] = [], exclude: List[str] = []) -> None:
Expand All @@ -1598,7 +1578,7 @@ def worker(m: nixops.backends.MachineState) -> None:
m.start()

nixops.parallel.run_tasks(
nr_workers=-1, tasks=iter(self.active.values()), worker_fun=worker
nr_workers=-1, tasks=iter(self.active_machines.values()), worker_fun=worker
)

def is_valid_resource_name(self, name: str) -> bool:
Expand Down Expand Up @@ -1633,7 +1613,7 @@ def worker(m: nixops.backends.MachineState) -> None:
m.send_keys()

nixops.parallel.run_tasks(
nr_workers=-1, tasks=iter(self.active.values()), worker_fun=worker
nr_workers=-1, tasks=iter(self.active_machines.values()), worker_fun=worker
)


Expand Down Expand Up @@ -1699,5 +1679,11 @@ def _load_modules_from(dir: str) -> None:
importlib.import_module("nixops." + dir + "." + module[:-3])


class DeploymentPlugin(Protocol):

def physical_spec(self, d: Deployment) -> Dict[str, List[Dict[Tuple[str, ...], Any]]]:
return {}


_load_modules_from("backends")
_load_modules_from("resources")
26 changes: 26 additions & 0 deletions nixops/plugins/enums.py
@@ -0,0 +1,26 @@
from __future__ import annotations

from typing import Optional, Any, Callable, Tuple
from typing import TypeVar, Dict, Generic, Generator, List
from nixops.backends import MachineState
from nixops.deployment import Deployment
from enum import Enum


HookReturn = TypeVar("HookReturn")
HookParam = TypeVar("HookParam")


class HookOption(Generic[HookParam, HookReturn]):
id: int

def __init__(self, id: int):
self.id = id
self._hooks: List[Callable[[HookParam], HookReturn]] = []

def add_hook(self, fn: Callable[[HookParam], HookReturn]):
self._hooks.append(fn)

def run_hooks(self, param: HookParam) -> Generator[HookReturn, None, None]:
for fn in self._hooks:
yield fn(param)
10 changes: 10 additions & 0 deletions nixops/plugins/hookspecs.py
Expand Up @@ -11,6 +11,16 @@ def load():
"""


@hookspec
def deployment_hook():
"""Load hooks at early NixOps startup"""


@hookspec
def machine_hook():
"""Load hooks at early NixOps startup"""


@hookspec
def nixexprs():
""" Get all the Nix expressions to load
Expand Down
2 changes: 2 additions & 0 deletions nixops/resources/__init__.py
Expand Up @@ -21,6 +21,7 @@ class ResourceDefinition:
"""Base class for NixOps resource definitions."""

config: ResourceOptions
resource_eval: ResourceEval

@classmethod
def get_type(cls) -> str:
Expand All @@ -34,6 +35,7 @@ def get_resource_type(cls):

def __init__(self, name: str, config: ResourceEval):
config_type = self.__annotations__.get("config", ResourceOptions)
self.resource_eval = config
if not issubclass(config_type, ResourceOptions):
raise TypeError(
'"config" type annotation must be a ResourceOptions subclass'
Expand Down
4 changes: 2 additions & 2 deletions nixops/script_defs.py
Expand Up @@ -756,7 +756,7 @@ def op_import(args):
sys.stderr.write("added deployment ‘{0}’\n".format(uuid))

if args.include_keys:
for m in depl.active.values():
for m in depl.active_machines.values():
if nixops.deployment.is_machine(m) and hasattr(
m, "public_host_key"
):
Expand Down Expand Up @@ -806,7 +806,7 @@ def worker(m: nixops.backends.MachineState) -> Optional[int]:

results = results + nixops.parallel.run_tasks(
nr_workers=len(depl.machines) if args.parallel else 1,
tasks=iter(depl.active.values()),
tasks=iter(depl.active_machines.values()),
worker_fun=worker,
)

Expand Down

0 comments on commit 9dda74f

Please sign in to comment.