Skip to content

Commit

Permalink
deploy system user assertion to device (#212)
Browse files Browse the repository at this point in the history
  • Loading branch information
st3v3nmw committed Feb 16, 2024
1 parent b807923 commit 7e5fd76
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 13 deletions.
5 changes: 2 additions & 3 deletions landscape/client/manager/snapmanager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import logging
from collections import deque

Expand Down Expand Up @@ -112,7 +111,7 @@ def _handle_batch_snap_task(self, message):
)
queue.append((response.change, "BATCH"))
except SnapdHttpException as e:
result = json.loads(e.args[0])["result"]
result = e.json["result"]
logging.error(
f"Error in {message_type}: {message}",
)
Expand Down Expand Up @@ -156,7 +155,7 @@ def _handle_multiple_snap_tasks(self, message):
)
queue.append((response.change, name))
except SnapdHttpException as e:
result = json.loads(e.args[0])["result"]
result = e.json["result"]
logging.error(
f"Error in {message_type} for '{name}': {message}",
)
Expand Down
41 changes: 38 additions & 3 deletions landscape/client/snap_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,45 @@
from copy import deepcopy

import yaml

from landscape.client import snap_http
from landscape.client.snap_http import SnapdHttpException


class IgnoreYamlAliasesLoader(yaml.SafeLoader):
"""Patch `yaml.SafeLoader` to ignore aliases like *alias when loading.
For instance, a system-user assertion can have the following json:
{
[...]
"system-user-authority": "*",
[...]
}
which after signing gets converted to yaml that looks like:
[...]
system-user-authority: *
[...]
pyyaml tries to parse the * as the start of an alias leading to errors.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.yaml_implicit_resolvers = deepcopy(
super().yaml_implicit_resolvers,
)
self.yaml_implicit_resolvers.pop("*", None)

def fetch_alias(self):
return super().fetch_plain()


def parse_assertion(headers, signature):
"""Parse an assertion."""
assertion = yaml.load(headers, IgnoreYamlAliasesLoader)
assertion["signature"] = signature
return assertion


def get_assertions(assertion_type: str):
"""Get and parse assertions."""
try:
Expand Down Expand Up @@ -31,9 +67,8 @@ def get_assertions(assertion_type: str):
rest = sections
while rest:
headers, signature, *rest = rest
assertion = yaml.safe_load(headers)
assertion["signature"] = signature
assertions.append(assertion)
parsed = parse_assertion(headers, signature)
assertions.append(parsed)

return assertions

Expand Down
38 changes: 32 additions & 6 deletions landscape/client/user/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
# API, with thorough usage of exceptions and such, instead of pipes to
# subprocesses. liboobs (i.e. System Tools) is a possibility, and has
# documentation now in the 2.17 series, but is not wrapped to Python.
import json
import logging
import subprocess

from landscape.client import snap_http
from landscape.client.snap_http import SnapdHttpException
from landscape.client.snap_utils import parse_assertion
from landscape.client.user.provider import UserManagementError
from landscape.client.user.provider import UserProvider

Expand Down Expand Up @@ -301,9 +301,23 @@ def __init__(self, provider=None):
)

def add_user(self, message):
"""Add a user via the Snapd API."""
username = message["username"]
email = message["email"]
"""Add a user via the Snapd API.
Message formats can be in two forms:
- SSO User (username, email, sudoer, force-managed)
- System User (assertion, sudoer, force-managed)
"""
if "assertion" in message:
assertion = self._add_system_user_assertion(message["assertion"])
username = assertion["username"]
email = assertion["email"]
known = True
else:
# Ubuntu One SSO User
username = message["username"]
email = message["email"]
known = False

sudoer = message.get("sudoer", False)
force_managed = message.get("force-managed", False)

Expand All @@ -313,13 +327,25 @@ def add_user(self, message):
email,
sudoer=sudoer,
force_managed=force_managed,
known=known,
)
except SnapdHttpException as e:
result = json.loads(e.args[0])["result"]
result = e.json["result"]
raise UserManagementError(result)

return response.result

def _add_system_user_assertion(self, assertion):
"""Add a system user assertion."""
try:
# adding an assertion is idempotent
snap_http.add_assertion(assertion)
except SnapdHttpException as e:
result = e.json["result"]
raise UserManagementError(result)

return parse_assertion(*assertion.split("\n\n"))

def set_user_details(self, *_):
"""Update a user's details."""

Expand All @@ -334,7 +360,7 @@ def remove_user(self, message):
try:
response = snap_http.remove_user(message["username"])
except SnapdHttpException as e:
result = json.loads(e.args[0])["result"]
result = e.json["result"]
raise UserManagementError(result)

return response.result
Expand Down
80 changes: 79 additions & 1 deletion landscape/client/user/tests/test_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,16 +884,70 @@ def test_add_user(self):
},
)

self.snap_http.add_assertion.assert_not_called()
self.snap_http.add_user.assert_called_once_with(
"john-doe",
"john.doe@example.com",
sudoer=False,
force_managed=True,
known=False,
)

def test_add_system_user(self):
"""L{SnapdUserManagement.add_user} should add a user."""
groups = [("users", "x", 1001, [])]
provider = FakeUserProvider(
groups=groups,
popen=MockPopen(""),
shadow_file=self.shadow_file,
)
management = SnapdUserManagement(provider=provider)

assertion = """
type: system-user
authority-id: f22PSauKuNkwQTM9Wz67ZCjNACuSjjhN
brand-id: f22PSauKuNkwQTM9Wz67ZCjNACuSjjhN
email: jane@example.com
models:
- ubuntu-core-22-amd64
name: Jane Doe
password: >
$6$goodsoup$XerhNYrEA8Qe5nmUaEQ4F./n2FHmjgs6qc2s513bnpv6IFqDjMxLktRfitQF
jGiNXfqJWscy7His.QG1l8G7W1
series:
- 16
since: 2024-02-08T09:14:00+00:00
system-user-authority: *
until: 2038-01-19T03:14:07+00:00
username: jane
sign-key-sha3-384: >
ncl1u5VJlEitiCt_XeGJ0FMLwWeXLw35r8VvWifR_5Vxeu2q1hYdkGcQZ6DURx1S
AcLBcwQAAQoAHRYhBOt0YHzas+IX2LgSYemrEyH88NiPBQJlxKCMAAoJEOmrEyH88NiPn+gP/3im
+YdXT+A6ZYh5gDaBhvogTB4b57LslWTwBBBoFaYvhkYzKZkFuiDvQvOUTGn3ZKBd23kvYqFXODXH
7lCjCBdOr10j24Yn+wpHelDPwzaGLNHc+2epFFiPHMu6sQyKBAC7Mvnn7LRa/hnDiJ+n5yLmnBWx
VmG+KqOGJa6UclW0nZqBBnmAoaPDRwoa6XCxK0jmhpCVtFRP/ZOh1I/N7/a2VYCNPSwgx9WeMtm1
adr+0unBRt4lsB1/BFoLQozRZF9klZsWDs3o8IxO9FPFEmPaSNeCWj5haS5GO55n5OI6s2nOl8ro
dFiGt+f6eQRSolhR7+pNZBQVGT95S8Cd2LCfThU3Pn1tM6oo56haLTx8uDhUyUlwRZLXyMK689jJ
701ChRvT7QYDksqdDwrKB/2/dxvDkuoRwuuGO0SowwdO5Dil9DFluVL0aq4BPs6CHjlbrngbVFfN
fINbBjAgZvYbcsY2AyDPX4nAXHZIRvXxDFcPTuYDmAP4zLlt0R3wiTMkpQq8c3dEKDq3Cd2UgwLb
s4ZW2IIyxYQzCe8L2ZXXy7aBsB9qMturxA9i2FizeTfO7OU1baHVgdxF8uSgF28F2T3xtA1ReciH
nzAQUNSvsvHSKb6REWEz0+blJQqFA46td/rwlTe7AKk+SlM4GWiI7lXYUZ5/iYTfM8TPzzG2
"""
management.add_user({"assertion": assertion, "force-managed": True})

self.snap_http.add_assertion.assert_called_once_with(assertion)
self.snap_http.add_user.assert_called_once_with(
"jane",
"jane@example.com",
sudoer=False,
force_managed=True,
known=True,
)

def test_add_user_exception(self):
"""
L{SnapdUserManagement.add_user} should raise C{SnapdHttpException}.
L{SnapdUserManagement.add_user} should raise C{UserManagementError}.
"""
self.snap_http.add_user.side_effect = SnapdHttpException(
'{"type":"error","status-code":400,"status":"Bad Request","result"'
Expand Down Expand Up @@ -922,6 +976,30 @@ def test_add_user_exception(self):
"john.doe@example.com",
sudoer=True,
force_managed=False,
known=False,
)

def test_add_user_system_user_assertion_exception(self):
"""
L{SnapdUserManagement.add_user} should raise C{UserManagementError}.
"""
self.snap_http.add_assertion.side_effect = SnapdHttpException(
'{"type":"error","status-code":400,"status":"Bad Request",'
'"result":{"message":"cannot decode request body into assertions: '
'unexpected EOF"}}',
)

provider = FakeUserProvider(
popen=MockPopen(""),
shadow_file=self.shadow_file,
)
management = SnapdUserManagement(provider=provider)

with self.assertRaises(UserManagementError):
management.add_user({"assertion": "not an assertion"})

self.snap_http.add_assertion.assert_called_once_with(
"not an assertion",
)

def test_remove_user(self):
Expand Down

0 comments on commit 7e5fd76

Please sign in to comment.