In [None]:
#| default_exp core

# API
- Implementation of fastcloudinit

In [None]:
#| export
from fastcore.utils import *
from textwrap import dedent
from jsonschema import validate
from httpx import get as xget

import fastcore.xtras, yaml, json

In [None]:
from fastcore.test import test_eq,test_fail
from jsonschema import ValidationError

In [None]:
#| export
def ufw(logging="off", def_incoming="deny", def_outgoing="allow", internal=None, **allows):
    """Return the ordered list of `ufw` shell commands for a basic firewall setup.

    The list sets the default incoming/outgoing policies and the logging level,
    adds one `ufw allow <port>/tcp` rule per value in `allows` (the keyword names
    are labels only and do not appear in the commands), optionally adds an
    `allow in on <internal>` rule, and finishes with `ufw --force enable`.
    """
    allow_rules = [f"ufw allow {port}/tcp" for port in allows.values()]
    if internal: allow_rules.append(f"ufw allow in on {internal}")
    return [
        f"ufw default {def_incoming} incoming",
        f"ufw default {def_outgoing} outgoing",
        f"ufw logging {logging}",
        *allow_rules,
        "ufw --force enable",
    ]

In [None]:
# Example: default policies, an SSH allowance on 22/tcp, and an `allow in on enp7s0` rule
ufw(internal="enp7s0", OpenSSH=22)

['ufw default deny incoming',
 'ufw default allow outgoing',
 'ufw logging off',
 'ufw allow 22/tcp',
 'ufw allow in on enp7s0',
 'ufw --force enable']

In [None]:
#| export
def user(name, pub_keys, groups=None, shell="/bin/bash", sudo=True):
    """Build one entry for the cloud-config `users` list.

    `pub_keys` and `groups` may each be a single value or a list. When `sudo` is
    true the user is added to the `sudo` group (if not already listed) and gets a
    passwordless `ALL=(ALL) NOPASSWD:ALL` rule; otherwise the `sudo` entry is an
    empty list. The returned dict never shares list objects with the arguments.
    """
    # `listify` hands a list argument back unchanged, so copy it before appending;
    # otherwise the caller's own `groups` list would grow a 'sudo' entry.
    groups = list(listify(groups))
    if sudo and 'sudo' not in groups: groups.append('sudo')
    sudo_rules = ["ALL=(ALL) NOPASSWD:ALL"] if sudo else []
    # Copy the keys as well so the result does not alias the caller's list
    keys = list(listify(pub_keys))
    return dict(name=name, groups=groups, shell=shell, sudo=sudo_rules, ssh_authorized_keys=keys)

In [None]:
# Example: `sudo` defaults to True, so 'sudo' is appended after the requested 'docker' group
user('jph', 'mykey', 'docker')

{'name': 'jph',
 'groups': ['docker', 'sudo'],
 'shell': '/bin/bash',
 'sudo': ['ALL=(ALL) NOPASSWD:ALL'],
 'ssh_authorized_keys': ['mykey']}

In [None]:
#| export
def source(url, keyid, keyserver):
    """Describe one apt source: a `deb` line for `url` plus the key id and key server.

    The `$KEY_FILE` and `$RELEASE` placeholders are emitted verbatim in the
    returned `source` string; they are not expanded here.
    """
    deb_line = "deb [signed-by=$KEY_FILE] {} $RELEASE stable".format(url)
    return {'source': deb_line, 'keyid': keyid, 'keyserver': keyserver}

In [None]:
# Example: an apt source entry for Docker's Ubuntu repository
dsource = source("https://download.docker.com/linux/ubuntu",
                 keyid="9DC858229FC7DD38854AE2D88D81803C0EBFCD88",
                 keyserver="https://download.docker.com/linux/ubuntu/gpg")
dsource

{'source': 'deb [signed-by=$KEY_FILE] https://download.docker.com/linux/ubuntu $RELEASE stable',
 'keyid': '9DC858229FC7DD38854AE2D88D81803C0EBFCD88',
 'keyserver': 'https://download.docker.com/linux/ubuntu/gpg'}

In [None]:
#| export
def apt(unattended=False, autoclean=30, email='', auto_reboot=False, **sources):
    """Build the `apt` section of a cloud-config document.

    unattended:  truthy sets `APT::Periodic::Unattended-Upgrade` to "1", else "0".
    autoclean:   value written to `APT::Periodic::AutocleanInterval`.
    email:       if non-empty, adds an `Unattended-Upgrade::Mail` line with this address.
    auto_reboot: truthy sets `Unattended-Upgrade::Automatic-Reboot` to "true", else "false".
    sources:     extra apt sources keyed by name (e.g. from `source()`); stored under
                 `sources` only when at least one is given.

    Returns `{'apt': {'conf': <apt.conf text>[, 'sources': {...}]}}`.
    """
    settings = [
        ('APT::Periodic::Update-Package-Lists', "1"),
        ('APT::Periodic::Download-Upgradeable-Packages', "1"),
        # Use the caller's `autoclean` value rather than a fixed interval
        ('APT::Periodic::AutocleanInterval', autoclean),
        ('APT::Periodic::Unattended-Upgrade', "1" if unattended else "0"),
        ('Unattended-Upgrade::Automatic-Reboot', "true" if auto_reboot else "false"),
    ]
    # The address must be interpolated into the line, not left as a literal placeholder
    if email: settings.append(('Unattended-Upgrade::Mail', email))
    apt_conf = "".join(f'{key} "{val}";\n' for key, val in settings)
    res = dict(conf=apt_conf)
    if sources: res['sources'] = sources
    return dict(apt=res)

In [None]:
# Example: default apt settings plus one extra source registered under the name `docker`
apt(docker=dsource)

{'apt': {'conf': 'APT::Periodic::Update-Package-Lists "1";\nAPT::Periodic::Download-Upgradeable-Packages "1";\nAPT::Periodic::AutocleanInterval "7";\nAPT::Periodic::Unattended-Upgrade "0";\nUnattended-Upgrade::Automatic-Reboot "false";\n',
  'sources': {'docker': {'source': 'deb [signed-by=$KEY_FILE] https://download.docker.com/linux/ubuntu $RELEASE stable',
    'keyid': '9DC858229FC7DD38854AE2D88D81803C0EBFCD88',
    'keyserver': 'https://download.docker.com/linux/ubuntu/gpg'}}}}

In [None]:
#| export
def systemd(items):
    """Turn a `{unit_name: content}` mapping into a list of file entries.

    Each entry places `content` at `/etc/systemd/system/<unit_name>.d/override.conf`,
    owned by `root:root` with permissions `0644`.
    """
    drop_ins = []
    for unit, body in items.items():
        drop_ins.append(dict(
            path=f"/etc/systemd/system/{unit}.d/override.conf",
            owner='root:root', permissions='0644', content=body))
    return drop_ins

In [None]:
# Example: an override for systemd-journald.service carrying a [Journal] SystemMaxUse=50M setting
systemd({"systemd-journald.service":"[Journal]\nSystemMaxUse=50M\n"})

[{'path': '/etc/systemd/system/systemd-journald.service.d/override.conf',
  'owner': 'root:root',
  'permissions': '0644',
  'content': '[Journal]\nSystemMaxUse=50M\n'}]

In [None]:
#| export
def log_rotate(freq="weekly", num_keep=7, fname="00-cloud-init-global"):
    """Build a file entry holding a logrotate stanza for `/var/log/*.log`.

    `freq` is the rotation frequency directive, `num_keep` the `rotate` count, and
    `fname` the file name used under `/etc/logrotate.d/`. The entry is owned by
    `root:root` with permissions `0644`.
    """
    directives = [freq, f"rotate {num_keep}", "compress", "su root adm", "create", "missingok"]
    # Each directive sits on its own line, indented four spaces inside the braces
    body = "".join(f"    {directive}\n" for directive in directives)
    snippet = "/var/log/*.log {\n" + body + "}\n"
    return {
        'path': f"/etc/logrotate.d/{fname}",
        'owner': "root:root",
        'permissions': "0644",
        'content': snippet,
    }

In [None]:
#| export
def phone_home(url):
    """Return a `phone_home` section posting `instance_id` and `hostname` to `url` (5 tries).

    A falsy `url` yields an empty dict, so the result can always be `**`-expanded.
    """
    if url:
        return {'phone_home': {'url': url, 'post': ["instance_id", "hostname"], 'tries': 5}}
    return {}

In [None]:
#| export
def reboot(enable=True, message="Rebooting"):
    """Return a `power_state` section requesting a reboot with `message`.

    A falsy `enable` yields an empty dict, so the result can always be `**`-expanded.
    """
    if enable:
        return {'power_state': {'mode': "reboot", 'message': message, 'timeout': 1, 'condition': True}}
    return {}

In [None]:
#| export
def mounts(devices):
    """Wrap `devices` in a `mounts` section, or return an empty dict when `devices` is falsy."""
    return {'mounts': devices} if devices else {}

In [None]:
#| export
def runcmd(cmds):
    """Wrap `cmds` in a `runcmd` section, or return an empty dict when `cmds` is falsy."""
    return {'runcmd': cmds} if cmds else {}

In [None]:
#| export
def cc_validate(d):
    """Validate the cloud-config dict `d` against cloud-init's published JSON schema.

    The schema is downloaded from the cloud-init `main` branch on GitHub on every
    call, so network access is required. Returns None when `d` conforms.

    Raises `jsonschema.ValidationError` when `d` does not match the schema, and
    the HTTP client's status error when the schema download returns a 4xx/5xx reply.
    """
    schema_url = 'https://raw.githubusercontent.com/canonical/cloud-init/main/cloudinit/config/schemas/versions.schema.cloud-config.json'
    resp = xget(schema_url)
    # Fail on an error reply instead of trying to parse an error page as the schema
    resp.raise_for_status()
    validate(d, schema=json.loads(resp.text))

In [None]:
# A well-formed cloud-config document: validation is expected to complete without raising
samp = '''#cloud-config
hostname: example-host
fqdn: example-host.example.com
# User setup configuration
users:
  - name: exampleuser
    gecos: Example User
    sudo: ['ALL=(ALL) NOPASSWD:ALL']'''
cc_validate(yaml.safe_load(samp))

This example has an intentional key error ("hostna" instead of "hostname"):

In [None]:
samp = '''#cloud-config
hostna: example-host
fqdn: example-host.example.com'''
# The misspelled key must be rejected: a ValidationError is swallowed as the expected
# outcome, while the `else` branch fails the cell if validation unexpectedly passes.
try: cc_validate(yaml.safe_load(samp))
except ValidationError: pass
else: raise Exception('Expected validation error')

In [None]:
#| export
def cloud_init_base(hostname, packages=None, check=True, **kw):
    """Render a cloud-config document as text: a `#cloud-config` line followed by YAML.

    The document sets `hostname`, the package list, and fixed defaults
    (`preserve_hostname=False`, package update/upgrade on, `disable_root=True`,
    `ssh_pwauth=False`). `kw` supplies additional top-level keys, emitted after
    the built-in ones; it must not repeat a key that is already set here.
    When `check` is true the config is validated with `cc_validate` before dumping.
    """
    pkg_list = listify(packages)
    config = dict(
        hostname=hostname, preserve_hostname=False,
        packages=pkg_list, package_update=True, package_upgrade=True,
        disable_root=True, ssh_pwauth=False,
        **kw)
    if check: cc_validate(config)
    # Keep insertion order and use a very large width so long values stay on one line
    body = yaml.safe_dump(config, sort_keys=False, width=1_000_000)
    return "#cloud-config\n" + body

In [None]:
#| export
def cloud_init_config(hostname, username, pub_keys, email='', groups=None, internal=None, cmds=None,
                      devices=None, ping_host=None, packages=None, dropins=None, **sources):
    """Render a complete cloud-config document for a single-user host.

    hostname:  host name passed to `cloud_init_base`.
    username, pub_keys, groups: describe the one user created via `user()`.
    email:     address forwarded to `apt()` for the `Unattended-Upgrade::Mail` setting.
    internal:  optional interface name given an `allow in on` firewall rule.
    cmds:      extra shell command(s) to run; the `ufw()` commands (with SSH on 22/tcp
               allowed) are appended after them. The caller's list is not modified.
    devices:   mount entries passed to `mounts()`.
    ping_host: URL passed to `phone_home()`.
    packages:  package name(s) to install.
    dropins:   `{unit_name: content}` mapping passed to `systemd()`.
    sources:   remaining keyword arguments, forwarded to `apt()`.

    Returns the rendered document text produced by `cloud_init_base`.
    """
    # Build a fresh list: `+=` on the listified value would extend the caller's own list in place
    all_cmds = [*listify(cmds), *ufw(internal=internal, OpenSSH=22)]
    return cloud_init_base(
        hostname, packages=packages,
        users=[user(username, pub_keys, groups=groups)],
        **runcmd(all_cmds),
        # Forward `email` so the mail setting is actually emitted
        **apt(email=email, **sources),
        write_files=[ log_rotate(), *systemd(dropins or {}) ],
        **mounts(devices),
        **phone_home(ping_host),
        **reboot(),
    )

In [None]:
# End-to-end example: packages to install, including the Docker packages served by the extra apt source below
packages = [ "unattended-upgrades", "vim-nox", "python3", "rsync", "ubuntu-drivers-common", "python3-pip", "ack", "lsyncd", "wget", "bzip2", "ca-certificates", "git", "build-essential", "software-properties-common", "curl", "grep", "sed", "dpkg", "tmux", "less", "htop", "openssh-client", "python-is-python3", "python3-dev", "cron", "gnupg", "docker-ce", "docker-ce-cli", "containerd.io", "docker-buildx-plugin", "docker-compose-plugin" ]

dsource = source(
    "https://download.docker.com/linux/ubuntu", 
    keyid="9DC858229FC7DD38854AE2D88D81803C0EBFCD88", 
    keyserver="https://download.docker.com/linux/ubuntu/gpg")
# One mount entry; presumably fstab-style fields (device, mount point, type, options, dump, pass)
devices = [ ['mydevice', "/data", "ext4", "defaults,nofail", "0", "0"] ]
# Drop-in override content keyed by systemd unit name
dropins = {"systemd-journald.service":"[Journal]\nSystemMaxUse=250M\n"}

res = cloud_init_config('myhost', 'jph', 'mykey', 'j@answer.ai', dropins=dropins, groups="docker", internal="enp7s0",
                        devices=devices, ping_host='https://ping.me', packages=packages, docker=dsource)
# Show only the first 50 characters of the rendered document
print(res[:50])

#cloud-config
hostname: myhost
preserve_hostname: 


## export -

In [None]:
#|hide
from nbdev import nbdev_export
nbdev_export()