diff --git a/doc/bridge-design.md b/doc/bridge-design.md new file mode 100644 index 000000000000..cf9b2eafb2c4 --- /dev/null +++ b/doc/bridge-design.md @@ -0,0 +1,149 @@ +# Internal documentation of cockpit-bridge + +This document aims to describe the internal design decisions of +`cockpit-bridge` from the standpoint of someone who already has an +understanding of how Cockpit functions at a component level (ie: how the +protocol looks, what channels are, etc.). + +`doc/protocol.md` describes the protocol itself in more detail. + +First: a bit of terminology. When written as "`Channel`", we're talking about +a specific instance of a subclass of the `Channel` type from `channel.py`. When +written as "channel", we're talking about the concept of a given named channel. +In this case, "channel id" refers to the name of that channel. + +## Protocols and transports + +The design of `cockpit-bridge` is based around the `asyncio.Protocol` and +`asyncio.Transport` approach. + +The Cockpit wire protocol is implemented by an `asyncio.Protocol` subclass +named `CockpitProtocol, living in `protocol.py`. It calls virtual methods on +itself in response to incoming messages. Those methods all have names like +`transport_control_received()` or `channel_data_received()`, to match the +flavour of the `data_received()` and `eof_received()` methods on +`asyncio.Protocol`. Similarly, all methods for writing outgoing data are named +like `write_channel_data()` or `write_control()` to match the `write()` method +on `asyncio.Transport`. + +## Router, endpoints, and routing rules. + +The most relevant subclass of `CockpitProtocol` is `Router` — a protocol +implementation that responds to incoming messages by routing them to +`Endpoint`s. + +This relationship between `Router` and `Endpoint` is most important internal +class relationship in the bridge. These two classes would be described as +"friends" in some languages. They both live in `router.py` and reach inside of +each others implementation. Neither of them makes any sense without the other. + +A given `cockpit-bridge` process contains a single `Router` and potentially +many `Endpoint`s. The two main `Endpoint` subclasses are `Channel` and `Peer`. + +All messages sent through `cockpit-bridge` involve either the `Router` +forwarding incoming messages by calling one of the `do_` methods on an +`Endpoint` or a given `Endpoint` sending reply messages back to the client by +calling one of the `write_()` method on the `Router`. + +All `Endpoint` objects refer back to the `Router` (from the instant they're +created) and the `Router` keeps track of all existing `Endpoint` objects. + +`Router` maintains two main sets of state: + + - a mapping between open channel ids and the `Endpoint` responsible for + serving them. This is used to route incoming messages to the correct place. + Entries are added in response to `Endpoint`s being returned from `RoutingRule`s + (see below) and entries are removed in response to `close` messages from the + `Endpoint`. + + - a mapping between all existing `Endpoint` objects and a set of channels ids + which are open on them. The `Endpoint`s are added to this table when + they're created and the channel set is updated in lockstep with the above + mapping. + +These two mappings might seem to be redundant, but they allow for a very +important situation: the second mapping allows the `Router` to hold a reference +to an `Endpoint` even if there are zero open channels. In that case. the set +of channel ids is empty. In this sense, the second mapping contains strictly +more information than the first (even if the first one is the one which is used +on every incoming message). + +An endpoint with zero channels routed to it may exist because it was requested, +but not currently in use by anything (common with `Peer`s) or because it's +currently trying to shutdown and all channels have been closed but there are +still outstanding tasks (happens with both `Peer` and `Channel`). + +Important: in Python you need to take care to ensure that asynchronous +background tasks are properly cleaned up before dropping references to them. +The relationship between `Router` and `Endpoint`, even in the absence of routed +channels, allows `Endpoint`s to finish up their tasks without getting dropped. +The `Router` will hold a reference to each `Endpoint` until everything is +complete, and will even prevent the entire bridge from exiting until everything +has been cleaned up properly. + +## Routing rules + +The third class which is most closely related to `Router` and `Endpoint` is +definitely `RoutingRule`. + +A `RoutingRule` looks at incoming `open` message and has the ability to return +an `Endpoint` to handle the requested channel. Routing rules are responsible +for the creation of `Endpoint`s. The `Router` maintains a (mostly) static list +of `RoutingRule`s which are consulted, in seqeunce, for each incoming `open` +message. + +In general, there are several different routing rules which can return `Peer`s +and one special routing rule — `ChannelRoutingRule` — which creates `Channel` +instances according to the requested `payload` type on the `open` message. + +Once a `RoutingRule` returns a given endpoint to handle a message, the router +adds a mapping from the channel id to the `Endpoint` to its routing table. + +`RoutingRule`s can have a sort of "caching" relationship with their +`Endpoint`s, which is often the case for `Peer`s. For example, once the +superuser bridge is brought online the `SuperuserRoutingRule` will remember the +`SuperuserPeer` instance and return the same one each time a superuser channel +is requested, rather than creating a new one. + +The `RoutingRule` is not responsible for ensuring that the `Endpoint` is +properly shutdown before dropping its reference to it — that's the job of the +`Router`. Indeed, a `RoutingRule` can be completely stateless (as is the case +for the `ChannelRoutingRule`). + +`RoutingRule`s can sometimes close their cached `Peer`s. This can happen, for +example: + + - if a routing rule created for a bridge described in a `manifest.json` file no + longer exists after a reload then the associated bridge will be shutdown + + - if the client calls the `Stop` method on the `/superuser` D-Bus endpoint + + - if the client sends a `kill` message with `host=` set to the name of a given + remote host + +In this case, the `RoutingRule` can request the close and immediately drop its +reference — `Router` is responsible for waiting until the `Peer` is finished. + +## Channels + +A channel is a bi-directional ordered datagram transport between `cockpit.js` +and some piece of backend code (ie: a subclass of `Channel` inside of a bridge +running on some host). + +A channel is only opened in response to an `open` control message from the +client. A channel is closed when the `Channel` subclass sends a `close` +control message. The client sending a `close` message does not close the +channel — it is considered to be a request from the client — although it will +usually be quickly followed by a corresponding `close` message from the +`Channel` implementation. + +The Python bridge will always send either a `close` message (with `problem=`) +or a `ready` message in response to an `open` control message. This is +different from the original implementation of the bridge in C, which would +sometimes open channels without sending any acknowledgement. + +Inside the Python bridge, `Channel` is a subclass of `Endpoint` — it is an +endpoint which is responsible for a single channel. Usually a single channel +is routed to the `Channel` object, but some `Channel`s which require a longer +time to shutdown may temporarily be in a state where no channels are routed to +them before they finish shutting down, as described above. diff --git a/src/cockpit/beiboot.py b/src/cockpit/beiboot.py index c42e249d1e62..bb0b6adcb894 100644 --- a/src/cockpit/beiboot.py +++ b/src/cockpit/beiboot.py @@ -5,7 +5,7 @@ # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# (at your option) any l version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -18,27 +18,29 @@ import argparse import asyncio import base64 -import importlib.resources +import importlib.abc import logging import os import shlex -import sys +import socket from pathlib import Path -from typing import Dict, Iterable, Optional, Sequence +from typing import Iterable, Sequence -from cockpit import polyfills from cockpit._vendor import ferny from cockpit._vendor.bei import bootloader -from cockpit.beipack import BridgeBeibootHelper -from cockpit.bridge import setup_logging -from cockpit.channel import ChannelRoutingRule -from cockpit.channels import PackagesChannel -from cockpit.jsonutil import JsonObject -from cockpit.packages import Packages, PackagesLoader, patch_libexecdir -from cockpit.peer import Peer -from cockpit.protocol import CockpitProblem -from cockpit.router import Router, RoutingRule -from cockpit.transports import StdioTransport +from cockpit._vendor.systemd_ctypes import run_async + +from . import polyfills +from .beipack import BridgeBeibootHelper +from .bridge import setup_logging +from .channel import ChannelRoutingRule +from .channels import PackagesChannel +from .jsonutil import JsonDocument, JsonObject +from .packages import Packages, PackagesLoader, patch_libexecdir +from .peer import Peer +from .protocol import CockpitProblem +from .router import Endpoint, Router, RoutingRule +from .transports import StdioTransport logger = logging.getLogger('cockpit.beiboot') @@ -49,8 +51,9 @@ def ensure_ferny_askpass() -> Path: We need this for the flatpak: ssh and thus the askpass program run on the host (via flatpak-spawn), not the flatpak. Thus we cannot use the shipped cockpit-askpass program. """ - src_path = importlib.resources.files(ferny.__name__) / 'interaction_client.py' - src_data = src_path.read_bytes() + loader = ferny.interaction_client.__loader__ + assert isinstance(loader, importlib.abc.ResourceLoader) + src_data = loader.get_data(ferny.interaction_client.__file__) # Create the file in $XDG_CACHE_HOME, one of the few locations that a flatpak can write to xdg_cache_home = os.environ.get('XDG_CACHE_HOME') @@ -85,7 +88,7 @@ def get_interesting_files() -> Iterable[str]: class ProxyPackagesLoader(PackagesLoader): - file_status: Dict[str, bool] + file_status: 'dict[str, bool]' def check_condition(self, condition: str, value: object) -> bool: assert isinstance(value, str) @@ -98,242 +101,360 @@ def check_condition(self, condition: str, value: object) -> bool: else: raise KeyError - def __init__(self, file_status: Dict[str, bool]): + def __init__(self, file_status: 'dict[str, bool]'): self.file_status = file_status -BEIBOOT_GADGETS = { - "report_exists": r""" - import os - def report_exists(files): - command('cockpit.report-exists', {name: os.path.exists(name) for name in files}) - """, - **ferny.BEIBOOT_GADGETS -} +class ExistsHandler(ferny.InteractionHandler): + commands = ('cockpit.report-exists',) + router: 'ForwarderRouter' + def __init__(self, router: 'ForwarderRouter'): + self.router = router -class DefaultRoutingRule(RoutingRule): - peer: 'Peer | None' + async def run_command(self, command: str, args: tuple, fds: 'list[int]', stderr: str) -> None: + logger.debug('run_command(%r, %r, %r, %r, %r)', self, command, args, fds, stderr) + assert command == 'cockpit.report-exists' - def __init__(self, router: Router): - super().__init__(router) + file_status, = args + packages = Packages(loader=ProxyPackagesLoader(file_status)) + self.router.divert_packages(packages) - def apply_rule(self, options: JsonObject) -> 'Peer | None': - return self.peer - def shutdown(self) -> None: - if self.peer is not None: - self.peer.close() - - -class AuthorizeResponder(ferny.AskpassHandler): - commands = ('ferny.askpass', 'cockpit.report-exists') +class AuthorizeResponder(ferny.SshAskpassResponder): + password_attempts: int = 0 + hostkey_info: 'JsonObject | None' = None router: Router def __init__(self, router: Router): self.router = router - async def do_askpass(self, messages: str, prompt: str, hint: str) -> Optional[str]: - if hint == 'none': - # We have three problems here: - # - # - we have no way to present a message on the login - # screen without presenting a prompt and a button - # - the login screen will not try to repost the login - # request because it doesn't understand that we are not - # waiting on input, which means that it won't notice - # that we've logged in successfully - # - cockpit-ws has an issue where if we retry the request - # again after login succeeded then it won't forward the - # init message to the client, stalling the login. This - # is a race and can't be fixed without -ws changes. - # - # Let's avoid all of that by just showing nothing. - return None - - challenge = 'X-Conversation - ' + base64.b64encode(prompt.encode()).decode() + async def do_fido_user_presence_prompt(self, prompt: ferny.SshFIDOUserPresencePrompt) -> 'str | None': + # We have three problems here: + # + # - we have no way to present a message on the login + # screen without presenting a prompt and a button + # - the login screen will not try to repost the login + # request because it doesn't understand that we are not + # waiting on input, which means that it won't notice + # that we've logged in successfully + # - cockpit-ws has an issue where if we retry the request + # again after login succeeded then it won't forward the + # init message to the client, stalling the login. This + # is a race and can't be fixed without -ws changes. + # + # Let's avoid all of that by just showing nothing. + return None + + async def do_prompt(self, prompt: ferny.AskpassPrompt) -> 'str | None': + # We handle all other prompt types by sending an authorize message. + # The X-Conversation challenge is for the benefit of the cockpitauth + # code in cockpit-ws. The ferny information is interpreted by the + # bridge, in remote.py. + message = (prompt.messages + prompt.prompt) + challenge = 'X-Conversation - ' + base64.b64encode(message.encode()).decode() response = await self.router.request_authorization(challenge, - messages=messages, - prompt=prompt, - hint=hint, - echo=False) + ferny_type=prompt.__class__.__name__, + ferny_attrs=prompt.__dict__) + + # this impacts the error message that we return in case authentication fails + if isinstance(prompt, ferny.SshPasswordPrompt): + self.password_attempts += 1 b64 = response.removeprefix('X-Conversation -').strip() passwd = base64.b64decode(b64.encode()).decode() logger.debug('Returning a %d chars password', len(passwd)) return passwd - async def do_custom_command(self, command: str, args: tuple, fds: list[int], stderr: str) -> None: - logger.debug('Got ferny command %s %s %s', command, args, stderr) + async def do_hostkey(self, reason: str, host: str, algorithm: str, key: str, fingerprint: str) -> bool: + self.hostkey_info = {'host-key': f'{host} {algorithm} {key}', 'host-fingerprint': fingerprint} + return False - if command == 'cockpit.report-exists': - file_status, = args - # FIXME: evil duck typing here -- this is a half-way Bridge - self.router.packages = Packages(loader=ProxyPackagesLoader(file_status)) # type: ignore[attr-defined] - self.router.routing_rules.insert(0, ChannelRoutingRule(self.router, [PackagesChannel])) +class ForwarderPeer(Peer): + stage1: 'str | None' = None # sent on connection_made() + authorize_handler: AuthorizeResponder -class SshPeer(Peer): - always: bool + def __init__(self, rule: RoutingRule): + self.authorize_handler = AuthorizeResponder(rule.router) + super().__init__(rule) - def __init__(self, router: Router, destination: str, args: argparse.Namespace): - self.destination = destination - self.always = args.always - super().__init__(router) + def do_init(self, message: JsonObject) -> None: + assert isinstance(self.router, ForwarderRouter) + self.router.peer_sent_init(message) - async def do_connect_transport(self) -> None: - beiboot_helper = BridgeBeibootHelper(self) + def transport_control_received(self, command: str, message: JsonObject) -> None: + if command == 'authorize': + # We've disabled this for explicit-superuser bridges, but older + # bridges don't support that and will ask us anyway. + return - agent = ferny.InteractionAgent([AuthorizeResponder(self.router), beiboot_helper]) + super().transport_control_received(command, message) - # We want to run a python interpreter somewhere... - cmd: Sequence[str] = ('python3', '-ic', '# cockpit-bridge') - env: Sequence[str] = () + def connection_made(self, transport: asyncio.BaseTransport) -> None: + assert isinstance(transport, asyncio.Transport) + super().connection_made(transport) + if self.stage1 is not None: + transport.write(self.stage1.encode()) - in_flatpak = os.path.exists('/.flatpak-info') + def do_exception(self, exc: Exception) -> None: + if isinstance(exc, socket.gaierror): + raise CockpitProblem('no-host', error='no-host', message=str(exc)) from exc - # Remote host? Wrap command with SSH - if self.destination != 'localhost': - if in_flatpak: - # we run ssh and thus the helper on the host, always use the xdg-cache helper - ssh_askpass = ensure_ferny_askpass() - else: - # outside of the flatpak we expect cockpit-ws and thus an installed helper - askpass = patch_libexecdir('${libexecdir}/cockpit-askpass') - assert isinstance(askpass, str) - ssh_askpass = Path(askpass) - if not ssh_askpass.exists(): - logger.error("Could not find cockpit-askpass helper at %r", askpass) + elif isinstance(exc, ferny.SshHostKeyError): + hostkey_info = self.authorize_handler.hostkey_info or {} - env = ( - f'SSH_ASKPASS={ssh_askpass!s}', - 'DISPLAY=x', - 'SSH_ASKPASS_REQUIRE=force', - ) - host, _, port = self.destination.rpartition(':') - # catch cases like `host:123` but not cases like `[2001:abcd::1] - if port.isdigit(): - host_args = ['-p', port, host] + if isinstance(exc, ferny.SshChangedHostKeyError): + error = 'invalid-hostkey' else: - host_args = [self.destination] + error = 'unknown-hostkey' - cmd = ('ssh', *host_args, shlex.join(cmd)) + raise CockpitProblem(error, hostkey_info, error=error, auth_method_results={}) from exc - # Running in flatpak? Wrap command with flatpak-spawn --host - if in_flatpak: - cmd = ('flatpak-spawn', '--host', - *(f'--env={kv}' for kv in env), - *cmd) - env = () + elif isinstance(exc, ferny.SshAuthenticationError): + logger.debug('authentication to host failed: %s', exc) - logger.debug("Launching command: cmd=%s env=%s", cmd, env) - transport = await self.spawn(cmd, env, stderr=agent, start_new_session=True) + results: JsonObject = {method: 'not-provided' for method in exc.methods} + if 'password' in results: + if self.authorize_handler.password_attempts == 0: + results['password'] = 'not-tried' + else: + results['password'] = 'denied' - if not self.always: - exec_cockpit_bridge_steps = [('try_exec', (['cockpit-bridge'],))] - else: - exec_cockpit_bridge_steps = [] + raise CockpitProblem('authentication-failed', + error='authentication-failed', + auth_method_results=results) from exc - # Send the first-stage bootloader - stage1 = bootloader.make_bootloader([ - *exec_cockpit_bridge_steps, - ('report_exists', [list(get_interesting_files())]), - *beiboot_helper.steps, - ], gadgets=BEIBOOT_GADGETS) - transport.write(stage1.encode()) + elif isinstance(exc, ferny.SshError): + logger.debug('unknown failure connecting to host: %s', exc) + raise CockpitProblem('internal-error', message=str(exc)) from exc - # Wait for "init" or error, handling auth and beiboot requests - await agent.communicate() + super().do_exception(exc) - def transport_control_received(self, command: str, message: JsonObject) -> None: - if command == 'authorize': - # We've disabled this for explicit-superuser bridges, but older - # bridges don't support that and will ask us anyway. - return - super().transport_control_received(command, message) +class ForwarderRoutingRule(RoutingRule): + peer: 'Peer | None' + + def apply_rule(self, options: JsonObject) -> 'Peer | None': + # forward everything to the peer + return self.peer -class SshBridge(Router): - packages: Optional[Packages] = None - ssh_peer: SshPeer +class ForwarderRouter(Router): + packages: 'Packages | None' = None + rule: ForwarderRoutingRule - def __init__(self, args: argparse.Namespace): + def __init__(self): # By default, we route everything to the other host. We add an extra # routing rule for the packages webserver only if we're running the # beipack. - rule = DefaultRoutingRule(self) - super().__init__([rule]) + self.rule = ForwarderRoutingRule(self) + super().__init__([self.rule]) - # This needs to be created after Router.__init__ is called. - self.ssh_peer = SshPeer(self, args.destination, args) - rule.peer = self.ssh_peer + def divert_packages(self, packages: Packages) -> None: + self.packages = packages + self.routing_rules.insert(0, ChannelRoutingRule(self, [PackagesChannel])) + + def set_peer(self, peer: Peer) -> None: + self.rule.peer = peer def do_send_init(self): pass # wait for the peer to do it first - def do_init(self, message): - # https://github.com/cockpit-project/cockpit/issues/18927 - # - # We tell cockpit-ws that we have the explicit-superuser capability and - # handle it ourselves (just below) by sending `superuser-init-done` and - # passing {'superuser': False} on to the actual bridge (Python or C). - if isinstance(message.get('superuser'), dict): - self.write_control(command='superuser-init-done') - message['superuser'] = False - self.ssh_peer.write_control(message) + def shutdown_endpoint(self, endpoint: Endpoint, _msg: 'JsonObject | None' = None, **kwargs: JsonDocument) -> None: + super().shutdown_endpoint(endpoint, _msg, **kwargs) + if isinstance(endpoint, ForwarderPeer) and self.transport is not None: + if not endpoint.saw_init: + self.write_control(_msg, **kwargs, command='init') + self.transport.close() + # First: peer sends its init message. We patch it and pass it along. + def peer_sent_init(self, message: JsonObject) -> None: + # only patch the packages line if we are in beiboot mode + if self.packages is not None: + message = dict(message, packages={p: None for p in self.packages.packages}) -async def run(args) -> None: - logger.debug("Hi. How are you today?") + self.write_control(message) - bridge = SshBridge(args) - StdioTransport(asyncio.get_running_loop(), bridge) + # Only after we sent the init message to stdout do we want to start + # reading from stdin. Next, we expect .do_init() will be called. + assert self.transport is not None + self.transport.resume_reading() - try: - message = await bridge.ssh_peer.start() + # Second: we get the init message from the user. Pass it along. + def do_init(self, message: JsonObject) -> None: + assert self.rule.peer is not None + self.rule.peer.write_control(message) + self.rule.peer.saw_init = True + self.rule.peer.thaw_endpoint() - # See comment in do_init() above: we tell cockpit-ws that we support - # this and then handle it ourselves when we get the init message. - capabilities = message.setdefault('capabilities', {}) - if not isinstance(capabilities, dict): - bridge.write_control(command='init', problem='protocol-error', message='capabilities must be a dict') - return - assert isinstance(capabilities, dict) # convince mypy - capabilities['explicit-superuser'] = True - # only patch the packages line if we are in beiboot mode - if bridge.packages: - message['packages'] = {p: None for p in bridge.packages.packages} - - bridge.write_control(message) - bridge.ssh_peer.thaw_endpoint() - except ferny.InteractionError as exc: - sys.exit(str(exc)) - except CockpitProblem as exc: - bridge.write_control(exc.attrs, command='init') - return - - logger.debug('Startup done. Looping until connection closes.') - try: - await bridge.communicate() - except BrokenPipeError: - # expected if the peer doesn't hang up cleanly - pass +def get_argv_envp(args: argparse.Namespace) -> 'tuple[tuple[str, ...], dict[str, str]]': + # We want to run a python interpreter somewhere... + cmd: tuple[str, ...] = ('python3', '-ic', '# cockpit-bridge') + env: dict[str, str] = {} + + # We now perform a series of transformations based on our execution context + # (in flatpak or not) and what was requested (remote host, or local). + in_flatpak = os.path.exists('/.flatpak-info') + + # We take 'localhost' to mean 'spawn the bridge locally' + if args.destination is not None and args.destination != 'localhost': + if in_flatpak: + # we run ssh and thus the helper on the host, always use the xdg-cache helper + ssh_askpass = ensure_ferny_askpass() + else: + # outside of the flatpak we expect cockpit-ws and thus an installed helper + askpass = patch_libexecdir('${libexecdir}/cockpit-askpass') + assert isinstance(askpass, str) + ssh_askpass = Path(askpass) + + if not ssh_askpass.exists(): + # Last ditch: try finding the in-tree version. + interaction_client_path = ferny.interaction_client.__file__ + if os.access(interaction_client_path, os.X_OK): + ssh_askpass = Path(interaction_client_path) + else: + logger.error("Could not find cockpit-askpass helper at %r", askpass) + + # Forcing DISPLAY to 'x' enables an equivalent heuristic in older + # versions of OpenSSH which don't support SSH_ASKPASS_REQUIRE. + env.update(SSH_ASKPASS=str(ssh_askpass), + DISPLAY='x', + SSH_ASKPASS_REQUIRE='force') + + ssh_cmd = [ + 'ssh', + ] + + if args.password_prompts is not None: + ssh_cmd.extend(('-o', f'NumberOfPasswordPrompts={args.password_prompts}')) + + if args.strict: + ssh_cmd.extend(( + '-o', 'StrictHostKeyChecking=yes', + '-o', f'KnownHostsCommand={ssh_askpass} %I %H %t %K %f', + )) + + # We support `host:port` notation, but SSH doesn't. Split it out. + host, _, port = args.destination.rpartition(':') + # catch cases like `host:123` but not cases like `[2001:abcd::1] + if port.isdigit(): + ssh_cmd.extend(('-p', port, host)) + else: + ssh_cmd.append(args.destination) + + cmd = (*ssh_cmd, ' '.join(shlex.quote(arg) for arg in cmd)) + + # Running in flatpak? Wrap command with flatpak-spawn --host + if in_flatpak: + cmd = ('flatpak-spawn', '--host', + *(f'--env={k}={v}' for k, v in env.items()), + *cmd) + env.clear() + + return cmd, env + + +def create_stage1(beiboot_handler: BridgeBeibootHelper, *, always: bool) -> str: + # Set up the beiboot stage1 bootloader. + beiboot_steps: 'list[tuple[str, Sequence[object]]]' = [] + + # Unless --always was given, first step is to try running /usr/bin/cockpit-bridge + if not always: + beiboot_steps.append(('try_exec', (['cockpit-bridge'],))) + + beiboot_steps.extend([ + # If we didn't run /usr/bin/cockpit-bridge, our first step is to check + # which files exist on the remote in order to make decisions about + # which packages will be served locally. The result of that lands in + # our ExistsHandler, which will also set up the routing rule for local + # packages handling. + ('report_exists', (list(get_interesting_files()),)), + + # This is the main step of requesting and booting the bridge beipack.xz. + *beiboot_handler.steps, + ]) + beiboot_gadgets = { + "report_exists": r""" + import os + def report_exists(files): + command('cockpit.report-exists', {name: os.path.exists(name) for name in files}) + """, + **ferny.BEIBOOT_GADGETS + } + + return bootloader.make_bootloader(beiboot_steps, gadgets=beiboot_gadgets) + + +def create_router(args: argparse.Namespace) -> ForwarderRouter: + # Wire everything together. Doesn't do anything yet. + router = ForwarderRouter() + peer = ForwarderPeer(router.rule) + router.set_peer(peer) + + # We implement our own handler for reporting the existence of files. + exists_handler = ExistsHandler(router) + + # This is our handler to send the beipack over, if it gets requested. + beiboot_handler = BridgeBeibootHelper(peer) + + # Setup the stage1 bootloader — will be sent from Peer.connection_made(). + peer.stage1 = create_stage1(beiboot_handler, always=args.always) + + # This is where we actually fork and spawn the peer... + handlers = [ + peer.authorize_handler, + beiboot_handler, + exists_handler, + ] + + cmd, env = get_argv_envp(args) # broken out because it's complex + transport, peer = ferny.FernyTransport.spawn(lambda: peer, cmd, env=dict(os.environ, **env), + interaction_handlers=handlers, is_ssh=True) + + peer.transport = transport + + return router + + +async def run(args: argparse.Namespace) -> None: + router = create_router(args) + + # Start the router side talking on stdin/stdout. + logger.debug("Hi. How are you today?") + stdio = StdioTransport(asyncio.get_running_loop(), router) + + # Sometimes the client breaks protocol by not waiting for us to send `init` + # first, leading to us to send messages to the peer before it's ready. + # Looking at you, `cockpit.print`. Start off by ignoring the client. + # stdio.pause_reading() XXX + + # From here on out, there are a lot of things that can happen "next" — the + # normal process of exchanging `init` messages, but also various error + # conditions, early exits, etc. Therefore we switch to an asynchronous + # mode of execution that deals with events as they occur. + await router.communicate() def main() -> None: polyfills.install() - parser = argparse.ArgumentParser(description='cockpit-bridge is run automatically inside of a Cockpit session.') - parser.add_argument('--always', action='store_true', help="Never try to run cockpit-bridge from the system") - parser.add_argument('--debug', action='store_true') + parser = argparse.ArgumentParser(description='cockpit-beiboot is run automatically inside of a Cockpit session.') + + group = parser.add_mutually_exclusive_group(required=False) + group.add_argument('--never', action='store_true', help="Never try to beiboot cockpit-bridge") + group.add_argument('--always', action='store_true', help="Never try to run cockpit-bridge from the system") + + parser.add_argument('--password-prompts', type=int, metavar='N', help="Number of times to ask for password") + parser.add_argument('--strict', action='store_true', help="Don't prompt for unknown host keys") + parser.add_argument('--debug', action='store_true', help="Enable all debugging output (warning: loud)") parser.add_argument('destination', help="Name of the remote host to connect to, or 'localhost'") args = parser.parse_args() setup_logging(debug=args.debug) - asyncio.run(run(args), debug=args.debug) + run_async(run(args), debug=args.debug) if __name__ == '__main__': diff --git a/src/cockpit/beipack.py b/src/cockpit/beipack.py index c200195759b0..965934010a20 100644 --- a/src/cockpit/beipack.py +++ b/src/cockpit/beipack.py @@ -23,7 +23,8 @@ from cockpit._vendor.bei import beipack from .data import read_cockpit_data_file -from .peer import Peer, PeerError +from .peer import Peer +from .protocol import CockpitProblem logger = logging.getLogger(__name__) @@ -33,7 +34,7 @@ def get_bridge_beipack_xz() -> Tuple[str, bytes]: bridge_beipack_xz = read_cockpit_data_file('cockpit-bridge.beipack.xz') logger.debug('Got pre-built cockpit-bridge.beipack.xz') except FileNotFoundError: - logger.debug('Pre-built cockpit-bridge.beipack.xz; building our own.') + logger.debug('No pre-built cockpit-bridge.beipack.xz; building our own.') # beipack ourselves cockpit_contents = beipack.collect_module('cockpit', recursive=True) bridge_beipack = beipack.pack(cockpit_contents, entrypoint='cockpit.bridge:main', args='beipack=True') @@ -65,12 +66,12 @@ async def run_command(self, command: str, args: Tuple, fds: List[int], stderr: s size, = args assert size == len(self.payload) except (AssertionError, ValueError) as exc: - raise PeerError('internal-error', message=f'ferny interaction error {exc!s}') from exc + raise CockpitProblem('internal-error', message=f'ferny interaction error {exc!s}') from exc assert self.peer.transport is not None logger.debug('Writing %d bytes of payload', len(self.payload)) self.peer.transport.write(self.payload) elif command == 'beiboot.exc': - raise PeerError('internal-error', message=f'Remote exception: {args[0]}') + raise CockpitProblem('internal-error', message=f'Remote exception: {args[0]}') else: - raise PeerError('internal-error', message=f'Unexpected ferny interaction command {command}') + raise CockpitProblem('internal-error', message=f'Unexpected ferny interaction command {command}') diff --git a/src/cockpit/bridge.py b/src/cockpit/bridge.py index 22517653e871..e7fb3a7e4f96 100644 --- a/src/cockpit/bridge.py +++ b/src/cockpit/bridge.py @@ -40,6 +40,7 @@ from .jsonutil import JsonError, JsonObject, get_dict from .packages import BridgeConfig, Packages, PackagesListener from .peer import PeersRoutingRule +from .polkit import PolkitAgent from .remote import HostRoutingRule from .router import Router from .superuser import SuperuserRoutingRule @@ -139,15 +140,13 @@ def do_init(self, message: JsonObject) -> None: def do_send_init(self) -> None: init_args = { 'capabilities': {'explicit-superuser': True}, - 'command': 'init', 'os-release': self.get_os_release(), - 'version': 1, } if self.packages is not None: init_args['packages'] = {p: None for p in self.packages.packages} - self.write_control(init_args) + self.write_control(command='init', version=1, **init_args) # PackagesListener interface def packages_loaded(self) -> None: @@ -174,11 +173,12 @@ async def run(args) -> None: logger.debug('Startup done. Looping until connection closes.') - try: - await router.communicate() - except (BrokenPipeError, ConnectionResetError): - # not unexpected if the peer doesn't hang up cleanly - pass + async with PolkitAgent(router.superuser_rule): + try: + await router.communicate() + except (BrokenPipeError, ConnectionResetError): + # not unexpected if the peer doesn't hang up cleanly + pass def try_to_receive_stderr(): diff --git a/src/cockpit/channel.py b/src/cockpit/channel.py index dcf0b68409c4..6b0d67ae9591 100644 --- a/src/cockpit/channel.py +++ b/src/cockpit/channel.py @@ -70,7 +70,7 @@ def apply_rule(self, options: JsonObject) -> Optional['Channel']: for cls in self.table.get(payload, []): if self.check_restrictions(cls.restrictions, options): - return cls(self.router) + return cls(self) else: return None diff --git a/src/cockpit/channels/filesystem.py b/src/cockpit/channels/filesystem.py index 58bcb32af5f3..3256d0a683a0 100644 --- a/src/cockpit/channels/filesystem.py +++ b/src/cockpit/channels/filesystem.py @@ -265,6 +265,7 @@ def do_open(self, options): self.ready() def do_close(self): - self._watch.close() - self._watch = None + if self._watch is not None: + self._watch.close() + self._watch = None self.close() diff --git a/src/cockpit/jsonutil.py b/src/cockpit/jsonutil.py index 63370d273dc1..a035c8149680 100644 --- a/src/cockpit/jsonutil.py +++ b/src/cockpit/jsonutil.py @@ -111,10 +111,6 @@ def create_object(message: 'JsonObject | None', kwargs: JsonObject) -> JsonObjec If only message is given, it is returned, unmodified. If message is None, it is equivalent to an empty dictionary. A copy is always made. - - If kwargs are present, then any underscore ('_') present in a key name is - rewritten to a dash ('-'). This is intended to bridge between the required - Python syntax when providing kwargs and idiomatic JSON (which uses '-' for attributes). These values override values in message. The idea is that `message` should be used for passing data along, and diff --git a/src/cockpit/peer.py b/src/cockpit/peer.py index 002388465f7e..501c900bbbe4 100644 --- a/src/cockpit/peer.py +++ b/src/cockpit/peer.py @@ -15,219 +15,118 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import asyncio import logging import os -from typing import Callable, List, Optional, Sequence +from typing import List, Optional, Sequence -from .jsonutil import JsonDocument, JsonObject +from cockpit._vendor import ferny + +from .jsonutil import JsonObject, get_str from .packages import BridgeConfig from .protocol import CockpitProblem, CockpitProtocol, CockpitProtocolError from .router import Endpoint, Router, RoutingRule -from .transports import SubprocessProtocol, SubprocessTransport logger = logging.getLogger(__name__) -class PeerError(CockpitProblem): - pass - - -class PeerExited(Exception): - def __init__(self, exit_code: int): - self.exit_code = exit_code - +class Peer(CockpitProtocol, Endpoint): + saw_init: bool = False -class Peer(CockpitProtocol, SubprocessProtocol, Endpoint): - done_callbacks: List[Callable[[], None]] - init_future: Optional[asyncio.Future] - - def __init__(self, router: Router): - super().__init__(router) - - # All Peers start out frozen — we only unfreeze after we see the first 'init' message + def __init__(self, rule: RoutingRule) -> None: + super().__init__(rule) self.freeze_endpoint() - self.init_future = asyncio.get_running_loop().create_future() - self.done_callbacks = [] - - # Initialization - async def do_connect_transport(self) -> None: - raise NotImplementedError - - async def spawn(self, argv: Sequence[str], env: Sequence[str], **kwargs) -> asyncio.Transport: - # Not actually async... - loop = asyncio.get_running_loop() - user_env = dict(e.split('=', 1) for e in env) - return SubprocessTransport(loop, self, argv, env=dict(os.environ, **user_env), **kwargs) - - async def start(self, init_host: Optional[str] = None, **kwargs: JsonDocument) -> JsonObject: - """Request that the Peer is started and connected to the router. - - Creates the transport, connects it to the protocol, and participates in - exchanging of init messages. If anything goes wrong, the connection - will be closed and an exception will be raised. - - The Peer starts out in a frozen state (ie: attempts to send messages to - it will initially be queued). If init_host is not None then an init - message is sent with the given 'host' field, plus any extra kwargs, and - the queue is thawed. Otherwise, the caller is responsible for sending - the init message and thawing the peer. - - In any case, the return value is the init message from the peer. - """ - assert self.init_future is not None - - def _connect_task_done(task: asyncio.Task) -> None: - assert task is connect_task - try: - task.result() - except asyncio.CancelledError: # we did that (below) - pass # we want to ignore it - except Exception as exc: - self.close(exc) - - connect_task = asyncio.create_task(self.do_connect_transport()) - connect_task.add_done_callback(_connect_task_done) - - try: - # Wait for something to happen: - # - exception from our connection function - # - receiving "init" from the other side - # - receiving EOF from the other side - # - .close() was called - # - other transport exception - init_message = await self.init_future - - except (PeerExited, BrokenPipeError): - # These are fairly generic errors. PeerExited means that we observed the process exiting. - # BrokenPipeError means that we got EPIPE when attempting to write() to it. In both cases, - # the process is gone, but it's not clear why. If the connection process is still running, - # perhaps we'd get a better error message from it. - await connect_task - # Otherwise, re-raise - raise - - finally: - self.init_future = None - - # In any case (failure or success) make sure this is done. - if not connect_task.done(): - connect_task.cancel() - - if init_host is not None: - logger.debug(' sending init message back, host %s', init_host) - # Send "init" back - self.write_control(None, command='init', version=1, host=init_host, **kwargs) - - # Thaw the queued messages - self.thaw_endpoint() - - return init_message - - # Background initialization - def start_in_background(self, init_host: Optional[str] = None, **kwargs: JsonDocument) -> None: - def _start_task_done(task: asyncio.Task) -> None: - assert task is start_task - - try: - task.result() - except (OSError, PeerExited, CockpitProblem, asyncio.CancelledError): - pass # Those are expected. Others will throw. - - start_task = asyncio.create_task(self.start(init_host, **kwargs)) - start_task.add_done_callback(_start_task_done) - - # Shutdown - def add_done_callback(self, callback: Callable[[], None]) -> None: - self.done_callbacks.append(callback) + def do_init_args(self, message: JsonObject) -> JsonObject: + return {} # Handling of interesting events - def do_superuser_init_done(self) -> None: - pass - - def do_authorize(self, message: JsonObject) -> None: - pass + def do_init(self, message: JsonObject) -> None: + logger.debug('do_init(%r, %r)', self, message) + if self.saw_init: + logger.warning('received duplicate "init" control message on %r', self) + return + + self.saw_init = True + + problem = get_str(message, 'problem', None) + if problem is not None: + raise CockpitProblem(problem, message) + + assert self.router.init_host is not None + assert self.transport is not None + args: JsonObject = { + 'command': 'init', + 'host': self.router.init_host, + 'version': 1 + } + args.update(self.do_init_args(message)) + self.write_control(args) + self.thaw_endpoint() def transport_control_received(self, command: str, message: JsonObject) -> None: - if command == 'init' and self.init_future is not None: - logger.debug('Got init message with active init_future. Setting result.') - self.init_future.set_result(message) - elif command == 'authorize': - self.do_authorize(message) - elif command == 'superuser-init-done': - self.do_superuser_init_done() + if command == 'init': + self.do_init(message) else: raise CockpitProtocolError(f'Received unexpected control message {command}') def eof_received(self) -> bool: - # We always expect to be the ones to close the connection, so if we get - # an EOF, then we consider it to be an error. This allows us to - # distinguish close caused by unexpected EOF (but no errno from a - # syscall failure) vs. close caused by calling .close() on our side. - # The process is still running at this point, so keep it and handle - # the error in process_exited(). - logger.debug('Peer %s received unexpected EOF', self.__class__.__name__) - return True - - def do_closed(self, exc: Optional[Exception]) -> None: + logger.debug('eof_received(%r)', self) + return True # wait for more information (exit status, stderr, etc.) + + def do_exception(self, exc: Exception) -> None: + if isinstance(exc, ferny.SubprocessError): + # a common case is that the called peer does not exist + # 127 is the return code from `sh -c` for ENOENT + if exc.returncode == 127: + raise CockpitProblem('no-cockpit') + else: + raise CockpitProblem('terminated', message=exc.stderr or f'Peer exited with status {exc.returncode}') + + def connection_lost(self, exc: 'Exception | None' = None) -> None: + super().connection_lost(exc) + logger.debug('Peer %s connection lost %s %s', self.__class__.__name__, type(exc), exc) + self.rule.endpoint_closed(self) if exc is None: + # No exception — just return 'terminated' self.shutdown_endpoint(problem='terminated') - elif isinstance(exc, PeerExited): - # a common case is that the called peer does not exist - if exc.exit_code == 127: - self.shutdown_endpoint(problem='no-cockpit') - else: - self.shutdown_endpoint(problem='terminated', message=f'Peer exited with status {exc.exit_code}') elif isinstance(exc, CockpitProblem): + # If this is already a CockpitProblem, report it self.shutdown_endpoint(exc.attrs) else: - self.shutdown_endpoint(problem='internal-error', - message=f"[{exc.__class__.__name__}] {exc!s}") - - # If .start() is running, we need to make sure it stops running, - # raising the correct exception. - if self.init_future is not None and not self.init_future.done(): - if exc is not None: - self.init_future.set_exception(exc) + # Otherwise, see if do_exception() waits to raise it as a CockpitProblem + try: + self.do_exception(exc) + except CockpitProblem as problem: + self.shutdown_endpoint(problem.attrs) else: - self.init_future.cancel() - - for callback in self.done_callbacks: - callback() - - def process_exited(self) -> None: - assert isinstance(self.transport, SubprocessTransport) - logger.debug('Peer %s exited, status %d', self.__class__.__name__, self.transport.get_returncode()) - returncode = self.transport.get_returncode() - assert isinstance(returncode, int) - self.close(PeerExited(returncode)) + self.shutdown_endpoint(problem='internal-error', message=f"[{exc.__class__.__name__}] {exc!s}") + raise exc # Forwarding data: from the peer to the router def channel_control_received(self, channel: str, command: str, message: JsonObject) -> None: - if self.init_future is not None: + if not self.saw_init: raise CockpitProtocolError('Received unexpected channel control message before init') self.send_channel_control(channel, command, message) def channel_data_received(self, channel: str, data: bytes) -> None: - if self.init_future is not None: + if not self.saw_init: raise CockpitProtocolError('Received unexpected channel data before init') self.send_channel_data(channel, data) # Forwarding data: from the router to the peer def do_channel_control(self, channel: str, command: str, message: JsonObject) -> None: - assert self.init_future is None + assert self.saw_init self.write_control(message) def do_channel_data(self, channel: str, data: bytes) -> None: - assert self.init_future is None + assert self.saw_init self.write_channel_data(channel, data) def do_kill(self, host: Optional[str], group: Optional[str]) -> None: - assert self.init_future is None - self.write_control(command='kill', host=host, group=group) + assert self.saw_init + self.write_control(None, command='kill', host=host, group=group) def do_close(self) -> None: self.close() @@ -235,17 +134,17 @@ def do_close(self) -> None: class ConfiguredPeer(Peer): config: BridgeConfig - args: Sequence[str] - env: Sequence[str] - def __init__(self, router: Router, config: BridgeConfig): + def __init__(self, rule: RoutingRule, config: BridgeConfig, **kwargs): self.config = config - self.args = config.spawn - self.env = config.environ - super().__init__(router) + super().__init__(rule) - async def do_connect_transport(self) -> None: - await self.spawn(self.args, self.env) + env_overrides = dict(e.split('=', 1) for e in config.environ) + self.transport, myself = ferny.FernyTransport.spawn( + lambda: self, config.spawn, env=dict(os.environ, **env_overrides), + **kwargs + ) + assert myself is self class PeerRoutingRule(RoutingRule): @@ -272,14 +171,12 @@ def apply_rule(self, options: JsonObject) -> Optional[Peer]: # Start the peer if it's not running already if self.peer is None: - self.peer = ConfiguredPeer(self.router, self.config) - self.peer.add_done_callback(self.peer_closed) - assert self.router.init_host - self.peer.start_in_background(init_host=self.router.init_host) + self.peer = ConfiguredPeer(self, self.config) return self.peer - def peer_closed(self): + def endpoint_closed(self, endpoint: Endpoint) -> None: + assert self.peer is endpoint or self.peer is None self.peer = None def shutdown(self): diff --git a/src/cockpit/polkit.py b/src/cockpit/polkit.py index b59a6a2c7e44..16bd41a476de 100644 --- a/src/cockpit/polkit.py +++ b/src/cockpit/polkit.py @@ -22,9 +22,10 @@ import pwd from typing import Dict, List, Sequence, Tuple -from cockpit._vendor.ferny import AskpassHandler from cockpit._vendor.systemd_ctypes import Variant, bus +from .superuser import SuperuserRoutingRule + # that path is valid on at least Debian, Fedora/RHEL, and Arch HELPER_PATH = '/usr/lib/polkit-1/polkit-agent-helper-1' @@ -43,9 +44,9 @@ # mapping, but that method is not available for Python 3.6 yet. class org_freedesktop_PolicyKit1_AuthenticationAgent(bus.Object): - def __init__(self, responder: AskpassHandler): + def __init__(self, superuser_rule: SuperuserRoutingRule): super().__init__() - self.responder = responder + self.superuser_rule = superuser_rule # confusingly named: this actually does the whole authentication dialog, see docs @bus.Interface.Method('', ['s', 's', 's', 'a{ss}', 's', 'a(sa{sv})']) @@ -94,12 +95,19 @@ async def _communicate(self, process: asyncio.subprocess.Process) -> None: if value.startswith('Password'): value = '' - # flush out accumulated info/error messages - passwd = await self.responder.do_askpass('\n'.join(messages), value, '') - messages.clear() + if self.superuser_rule.peer is None: + logger.debug('got PAM_PROMPT %s, but no active superuser peer') + raise asyncio.CancelledError('no active superuser peer') + + passwd = await self.superuser_rule.peer.askpass_handler.do_askpass('\n'.join(messages), value, '') + if passwd is None: logger.debug('got PAM_PROMPT %s, but do_askpass returned None', value) raise asyncio.CancelledError('no password given') + + # flush out accumulated info/error messages + messages.clear() + logger.debug('got PAM_PROMPT %s, do_askpass returned a password', value) process.stdin.write(passwd.encode()) process.stdin.write(b'\n') @@ -125,8 +133,8 @@ class PolkitAgent: Use this as a context manager to ensure that the agent gets unregistered again. """ - def __init__(self, responder: AskpassHandler): - self.responder = responder + def __init__(self, superuser_rule: SuperuserRoutingRule): + self.superuser_rule = superuser_rule self.agent_slot = None async def __aenter__(self): @@ -143,7 +151,7 @@ async def __aenter__(self): logger.debug('XDG_SESSION_ID not set, not registering polkit agent') return self - agent_object = org_freedesktop_PolicyKit1_AuthenticationAgent(self.responder) + agent_object = org_freedesktop_PolicyKit1_AuthenticationAgent(self.superuser_rule) self.agent_slot = self.system_bus.add_object(AGENT_DBUS_PATH, agent_object) # register agent diff --git a/src/cockpit/polyfills.py b/src/cockpit/polyfills.py index 6a85c77c8cb9..ea2c288bfa49 100644 --- a/src/cockpit/polyfills.py +++ b/src/cockpit/polyfills.py @@ -15,7 +15,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import contextlib import socket @@ -37,21 +36,3 @@ def recv_fds(sock, bufsize, maxfds, flags=0): return msg, list(fds), flags, addr socket.recv_fds = recv_fds - - # introduced in 3.7 - if not hasattr(contextlib, 'AsyncExitStack'): - class AsyncExitStack: - async def __aenter__(self): - self.cms = [] - return self - - async def enter_async_context(self, cm): - result = await cm.__aenter__() - self.cms.append(cm) - return result - - async def __aexit__(self, exc_type, exc_value, traceback): - for cm in self.cms: - cm.__aexit__(exc_type, exc_value, traceback) - - contextlib.AsyncExitStack = AsyncExitStack diff --git a/src/cockpit/protocol.py b/src/cockpit/protocol.py index 9ed09e75f187..b1b5ed1b6ff6 100644 --- a/src/cockpit/protocol.py +++ b/src/cockpit/protocol.py @@ -19,8 +19,11 @@ import json import logging import uuid -from typing import Dict, Optional +from typing import ClassVar, Dict, Optional +from cockpit._vendor import ferny, systemd_ctypes + +from . import transports from .jsonutil import JsonDocument, JsonError, JsonObject, create_object, get_str, typechecked logger = logging.getLogger(__name__) @@ -57,7 +60,9 @@ class CockpitProtocol(asyncio.Protocol): We need to use this because Python's SelectorEventLoop doesn't supported buffered protocols. """ + json_encoder: ClassVar[json.JSONEncoder] = systemd_ctypes.JSONEncoder(indent=2) transport: Optional[asyncio.Transport] = None + transport_connected: bool = False buffer = b'' _closed: bool = False @@ -147,41 +152,43 @@ def consume_one_frame(self, view): self.frame_received(view[start:end]) return end - def connection_made(self, transport): - logger.debug('connection_made(%s)', transport) + def connection_made(self, transport: asyncio.BaseTransport) -> None: + logger.debug('connection_made(%r, %r, %r)', self, self.transport, transport) + + # This can only ever be called once. We may already have transport set + # from when it was created, but if so, then it should be equal. + assert not self.transport_connected + assert self.transport is transport or self.transport is None + assert isinstance(transport, asyncio.Transport) + self.transport = transport + self.transport_connected = True self.do_ready() - if self._closed: - logger.debug(' but the protocol already was closed, so closing transport') - transport.close() - - def connection_lost(self, exc): - logger.debug('connection_lost') + def connection_lost(self, exc: 'Exception | None') -> None: + logger.debug('connection_lost(%r, %r)', self, exc) assert self.transport is not None + self.transport_connected = False self.transport = None - self.close(exc) - - def close(self, exc: Optional[Exception] = None) -> None: - if self._closed: - return - self._closed = True - - if self.transport: - self.transport.close() self.do_closed(exc) + def close(self, exc: Optional[Exception] = None) -> None: + if isinstance(self.transport, ferny.FernyTransport): + self.transport.close(exc) # type:ignore[call-arg] + elif isinstance(self.transport, transports.SubprocessTransport): + self.transport.abort(exc) # type: ignore[call-arg] + def write_channel_data(self, channel, payload): """Send a given payload (bytes) on channel (string)""" # Channel is certainly ascii (as enforced by .encode() below) frame_length = len(channel + '\n') + len(payload) header = f'{frame_length}\n{channel}\n'.encode('ascii') - if self.transport is not None: + if self.transport_connected: logger.debug('writing to transport %s', self.transport) self.transport.write(header + payload) else: - logger.debug('cannot write to closed transport') + logger.debug('cannot write to disconnected transport') def write_control(self, _msg: 'JsonObject | None' = None, **kwargs: JsonDocument) -> None: """Write a control message. See jsonutil.create_object() for details.""" @@ -197,7 +204,7 @@ def data_received(self, data): if result <= 0: return self.buffer = self.buffer[result:] - except CockpitProtocolError as exc: + except CockpitProblem as exc: self.close(exc) def eof_received(self) -> Optional[bool]: diff --git a/src/cockpit/remote.py b/src/cockpit/remote.py index 23f947566a09..3759d9aa8729 100644 --- a/src/cockpit/remote.py +++ b/src/cockpit/remote.py @@ -17,168 +17,102 @@ import getpass import logging -import re -import socket -from typing import Dict, List, Optional, Tuple +import sys +from typing import Dict, Optional, Tuple from cockpit._vendor import ferny -from .jsonutil import JsonDocument, JsonObject, get_str, get_str_or_none -from .peer import Peer, PeerError -from .router import Router, RoutingRule +from .jsonutil import JsonObject, get_dict, get_str, get_str_or_none +from .peer import Peer +from .protocol import CockpitProblem +from .router import Endpoint, RoutingRule logger = logging.getLogger(__name__) -class PasswordResponder(ferny.AskpassHandler): - PASSPHRASE_RE = re.compile(r"Enter passphrase for key '(.*)': ") - - password: Optional[str] - - hostkeys_seen: List[Tuple[str, str, str, str, str]] - error_message: Optional[str] - password_attempts: int - - def __init__(self, password: Optional[str]): - self.password = password - - self.hostkeys_seen = [] - self.error_message = None - self.password_attempts = 0 +class RemotePeer(Peer): + host: str + password: 'str | None' + superuser: 'str | bool' - async def do_hostkey(self, reason: str, host: str, algorithm: str, key: str, fingerprint: str) -> bool: - self.hostkeys_seen.append((reason, host, algorithm, key, fingerprint)) - return False + def do_authorize(self, message: JsonObject) -> str: + challenge = get_str(message, 'challenge') + ferny_type = get_str(message, 'ferny-type', None) + ferny_attrs: JsonObject = get_dict(message, 'ferny-attrs', {}) - async def do_askpass(self, messages: str, prompt: str, hint: str) -> Optional[str]: - logger.debug('Got askpass(%s): %s', hint, prompt) + if challenge == '*' and self.password is not None: + # the initial prompt from cockpit-ssh + return '' # TODO - match = PasswordResponder.PASSPHRASE_RE.fullmatch(prompt) - if match is not None: - # We never unlock private keys — we rather need to throw a - # specially-formatted error message which will cause the frontend - # to load the named key into the agent for us and try again. - path = match.group(1) - logger.debug("This is a passphrase request for %s, but we don't do those. Abort.", path) - self.error_message = f'locked identity: {path}' - return None + elif challenge.startswith('plain1:') and self.password is not None: + # superuser-init on the other host — try reusing the user's password for sudo + return self.password - assert self.password is not None - assert self.password_attempts == 0 - self.password_attempts += 1 - return self.password + elif ferny_type == 'SshPasswordPrompt' and self.password is not None: + # ssh asking for the password + return self.password + elif ferny_type == 'SshPassphrasePrompt': + # we don't directly support that — the hosts dialog needs to help us + filename = get_str(ferny_attrs, 'filename') + raise CockpitProblem('authorization-failed', message=f'locked identity: {filename}') -class SshPeer(Peer): - session: Optional[ferny.Session] = None - host: str - user: Optional[str] - password: Optional[str] - private: bool - - async def do_connect_transport(self) -> None: - assert self.session is not None - logger.debug('Starting ssh session user=%s, host=%s, private=%s', self.user, self.host, self.private) - - basename, colon, portstr = self.host.rpartition(':') - if colon and portstr.isdigit(): - host = basename - port = int(portstr) else: - host = self.host - port = None - - responder = PasswordResponder(self.password) - options = {"StrictHostKeyChecking": 'yes'} + raise CockpitProblem('authorization-failed', message) - if self.password is not None: - options.update(NumberOfPasswordPrompts='1') + def transport_control_received(self, command: str, message: JsonObject) -> None: + if command == 'authorize': + cookie = get_str(message, 'cookie') + response = self.do_authorize(message) + self.write_control(command='authorize', cookie=cookie, response=response) + elif command == 'superuser-init-done': + # explicit request to drop credential + self.password = None else: - options.update(PasswordAuthentication="no", KbdInteractiveAuthentication="no") - - try: - await self.session.connect(host, login_name=self.user, port=port, - handle_host_key=self.private, options=options, - interaction_responder=responder) - except (OSError, socket.gaierror) as exc: - logger.debug('connecting to host %s failed: %s', host, exc) - raise PeerError('no-host', error='no-host', message=str(exc)) from exc - - except ferny.SshHostKeyError as exc: - if responder.hostkeys_seen: - # If we saw a hostkey then we can issue a detailed error message - # containing the key that would need to be accepted. That will - # cause the front-end to present a dialog. - _reason, host, algorithm, key, fingerprint = responder.hostkeys_seen[0] - error_args: JsonObject = {'host-key': f'{host} {algorithm} {key}', 'host-fingerprint': fingerprint} - else: - error_args = {} - - if isinstance(exc, ferny.SshChangedHostKeyError): - error = 'invalid-hostkey' - elif self.private: - error = 'unknown-hostkey' - else: - # non-private session case. throw a generic error. - error = 'unknown-host' - - logger.debug('SshPeer got a %s %s; private %s, seen hostkeys %r; raising %s with extra args %r', - type(exc), exc, self.private, responder.hostkeys_seen, error, error_args) - raise PeerError(error, error_args, error=error, auth_method_results={}) from exc - - except ferny.SshAuthenticationError as exc: - logger.debug('authentication to host %s failed: %s', host, exc) - - results: JsonObject = {method: 'not-provided' for method in exc.methods} - if 'password' in results and self.password is not None: - if responder.password_attempts == 0: - results['password'] = 'not-tried' - else: - results['password'] = 'denied' - - raise PeerError('authentication-failed', - error=responder.error_message or 'authentication-failed', - auth_method_results=results) from exc - - except ferny.SshError as exc: - logger.debug('unknown failure connecting to host %s: %s', host, exc) - raise PeerError('internal-error', message=str(exc)) from exc - - args = self.session.wrap_subprocess_args(['cockpit-bridge']) - await self.spawn(args, []) + super().transport_control_received(command, message) def do_kill(self, host: Optional[str], group: Optional[str]) -> None: + # we interpret 'kill' for our host as a request to shut down the connection if host == self.host: self.close() elif host is None: super().do_kill(None, group) - def do_authorize(self, message: JsonObject) -> None: - if get_str(message, 'challenge').startswith('plain1:'): - cookie = get_str(message, 'cookie') - self.write_control(command='authorize', cookie=cookie, response=self.password or '') - self.password = None # once is enough... + def do_init_args(self, message: JsonObject) -> JsonObject: + args: JsonObject = {'host': self.host} - def do_superuser_init_done(self) -> None: - self.password = None + self.explicit_superuser = 'explicit-superuser' in get_dict(message, 'capabilities', {}) + if self.explicit_superuser: + args['superuser'] = self.superuser + + return args + + def __init__( + self, rule: RoutingRule, host: str, user: 'str | None', options: JsonObject, *, private: bool + ) -> None: + super().__init__(rule) - def __init__(self, router: Router, host: str, user: Optional[str], options: JsonObject, *, private: bool) -> None: - super().__init__(router) self.host = host - self.user = user self.password = get_str(options, 'password', None) - self.private = private - self.session = ferny.Session() - - superuser: JsonDocument init_superuser = get_str_or_none(options, 'init-superuser', None) - if init_superuser in (None, 'none'): - superuser = False + if init_superuser is not None and init_superuser != 'none': + self.superuser = init_superuser else: - superuser = {'id': init_superuser} + self.superuser = False + + # TODO: not gonna work with beipack + args = [ + sys.executable, '-m', 'cockpit.beiboot', + '--strict', + f'--password-prompts={int(self.password is not None)}', + self.host + ] + # TODO: do something helpful with private + + # TODO: do something useful with username - self.start_in_background(init_host=host, superuser=superuser) + ferny.FernyTransport.spawn(lambda: self, args) class HostRoutingRule(RoutingRule): @@ -220,14 +154,21 @@ def apply_rule(self, options: JsonObject) -> Optional[Peer]: logger.debug('Request for channel %s is remote.', options) logger.debug('key=%s', key) - if key not in self.remotes: + try: + peer = self.remotes[key] + except KeyError: logger.debug('%s is not among the existing remotes %s. Opening a new connection.', key, self.remotes) - peer = SshPeer(self.router, host, user, options, private=nonce is not None) - peer.add_done_callback(lambda: self.remotes.__delitem__(key)) + peer = RemotePeer(self, host, user, options, private=nonce is not None) self.remotes[key] = peer - return self.remotes[key] + # This is evil, but forwarding the password on to remote hosts is worse. + options.pop('password', None) + + return peer - def shutdown(self): - for peer in set(self.remotes.values()): - peer.close() + def endpoint_closed(self, endpoint: Endpoint) -> None: + # we may have more than one peer — find the correct one + for key, value in self.remotes.items(): + if value is endpoint: + del self.remotes[key] + return diff --git a/src/cockpit/router.py b/src/cockpit/router.py index 73449492a24f..34e64ff240b7 100644 --- a/src/cockpit/router.py +++ b/src/cockpit/router.py @@ -61,9 +61,10 @@ class Endpoint: router: 'Router' __endpoint_frozen_queue: Optional[ExecutionQueue] = None - def __init__(self, router: 'Router'): - router.add_endpoint(self) - self.router = router + def __init__(self, rule: 'RoutingRule'): + self.rule = rule + self.router = rule.router + self.router.add_endpoint(self) def freeze_endpoint(self): assert self.__endpoint_frozen_queue is None @@ -126,8 +127,8 @@ def apply_rule(self, options: JsonObject) -> Optional[Endpoint]: """ raise NotImplementedError - def shutdown(self): - raise NotImplementedError + def endpoint_closed(self, endpoint: Endpoint) -> None: + """Called when an endpoint created from this rule is closed.""" class Router(CockpitProtocolServer): diff --git a/src/cockpit/superuser.py b/src/cockpit/superuser.py index 317ee98896b3..395bdffae270 100644 --- a/src/cockpit/superuser.py +++ b/src/cockpit/superuser.py @@ -17,114 +17,129 @@ import array import asyncio -import contextlib import getpass import logging import os import socket -from tempfile import TemporaryDirectory -from typing import List, Optional, Sequence, Tuple +from typing import Any, Sequence from cockpit._vendor import ferny -from cockpit._vendor.bei.bootloader import make_bootloader from cockpit._vendor.systemd_ctypes import Variant, bus -from .beipack import BridgeBeibootHelper from .jsonutil import JsonObject, get_str from .packages import BridgeConfig -from .peer import ConfiguredPeer, Peer, PeerError -from .polkit import PolkitAgent +from .peer import ConfiguredPeer +from .protocol import CockpitProblem from .router import Router, RoutingError, RoutingRule logger = logging.getLogger(__name__) -class SuperuserPeer(ConfiguredPeer): - responder: ferny.AskpassHandler +# TODO: support temporary ferny-askpass if beipack +# support beibooting remote - def __init__(self, router: Router, config: BridgeConfig, responder: ferny.AskpassHandler): - super().__init__(router, config) - self.responder = responder - async def do_connect_transport(self) -> None: - async with contextlib.AsyncExitStack() as context: - if 'pkexec' in self.args: - logger.debug('connecting polkit superuser peer transport %r', self.args) - await context.enter_async_context(PolkitAgent(self.responder)) - else: - logger.debug('connecting non-polkit superuser peer transport %r', self.args) +class AuthorizeResponder(ferny.AskpassHandler): + def __init__(self, router: Router): + self.router = router - responders: 'list[ferny.InteractionHandler]' = [self.responder] + async def do_askpass(self, messages: str, prompt: str, hint: str) -> str: + hexuser = ''.join(f'{c:02x}' for c in getpass.getuser().encode('ascii')) + return await self.router.request_authorization(f'plain1:{hexuser}') - if '# cockpit-bridge' in self.args: - logger.debug('going to beiboot superuser bridge %r', self.args) - helper = BridgeBeibootHelper(self, ['--privileged']) - responders.append(helper) - stage1 = make_bootloader(helper.steps, gadgets=ferny.BEIBOOT_GADGETS).encode() - else: - stage1 = None - agent = ferny.InteractionAgent(responders) +class StderrInteractionHandler(ferny.InteractionHandler): + commands = ('cockpit.send-stderr',) - if 'SUDO_ASKPASS=ferny-askpass' in self.env: - tmpdir = context.enter_context(TemporaryDirectory()) - ferny_askpass = ferny.write_askpass_to_tmpdir(tmpdir) - env: Sequence[str] = [f'SUDO_ASKPASS={ferny_askpass}'] - else: - env = self.env + async def run_command(self, command: str, args: 'tuple[Any, ...]', fds: 'list[int]', stderr: str) -> None: + assert command == 'cockpit.send-stderr' + with socket.socket(fileno=fds[0]) as sock: + fds.pop(0) + # socket.send_fds(sock, [b'\0'], [2]) # New in Python 3.9 + sock.sendmsg([b'\0'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [2]))]) - transport = await self.spawn(self.args, env, stderr=agent, start_new_session=True) - if stage1 is not None: - transport.write(stage1) +class SuperuserPeer(ConfiguredPeer): + askpass_handler: 'ferny.AskpassHandler' + startup: 'asyncio.Future[None]' - try: - await agent.communicate() - except ferny.InteractionError as exc: - raise PeerError('authentication-failed', message=str(exc)) from exc + def post_result(self, exc: 'Exception | None') -> None: + logger.debug('post_result(%r, %r)', self, exc) + if self.startup.done(): + logger.debug(' but result already posted') + return -class CockpitResponder(ferny.AskpassHandler): - commands = ('ferny.askpass', 'cockpit.send-stderr') + if exc is not None: + logger.debug(' setting exception') + self.startup.set_exception(exc) + else: + logger.debug(' signalling success') + self.startup.set_result(None) # success (ie: not an exception) - async def do_custom_command(self, command: str, args: Tuple, fds: List[int], stderr: str) -> None: - if command == 'cockpit.send-stderr': - with socket.socket(fileno=fds[0]) as sock: - fds.pop(0) - # socket.send_fds(sock, [b'\0'], [2]) # New in Python 3.9 - sock.sendmsg([b'\0'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [2]))]) + def do_init(self, message: JsonObject) -> None: + assert isinstance(self.rule, SuperuserRoutingRule) + super().do_init(message) + self.post_result(None) + self.rule.update() # transition from 'init' → 'sudo' + def connection_lost(self, exc: 'Exception | None' = None) -> None: + assert isinstance(self.rule, SuperuserRoutingRule) + super().connection_lost(exc) + self.rule.set_peer(None) # transition from 'sudo' or 'init' → 'none' + self.post_result(exc or EOFError()) -class AuthorizeResponder(CockpitResponder): - def __init__(self, router: Router): - self.router = router + def __init__(self, rule: 'SuperuserRoutingRule', config: BridgeConfig, askpass: ferny.AskpassHandler) -> None: + self.startup = asyncio.get_running_loop().create_future() + self.askpass_handler = askpass + super().__init__(rule, config, interaction_handlers=[askpass, StderrInteractionHandler()]) - async def do_askpass(self, messages: str, prompt: str, hint: str) -> str: - hexuser = ''.join(f'{c:02x}' for c in getpass.getuser().encode('ascii')) - return await self.router.request_authorization(f'plain1:{hexuser}') +class SuperuserRoutingRule(RoutingRule, ferny.AskpassHandler, bus.Object, interface='cockpit.Superuser'): + privileged: bool # am I root? -class SuperuserRoutingRule(RoutingRule, CockpitResponder, bus.Object, interface='cockpit.Superuser'): + # Configuration state superuser_configs: Sequence[BridgeConfig] = () - pending_prompt: Optional[asyncio.Future] - peer: Optional[SuperuserPeer] + bridges = bus.Interface.Property('as', value=[]) + methods = bus.Interface.Property('a{sv}', value={}) # D-Bus signals prompt = bus.Interface.Signal('s', 's', 's', 'b', 's') # message, prompt, default, echo, error - # D-Bus properties - bridges = bus.Interface.Property('as', value=[]) - current = bus.Interface.Property('s', value='none') - methods = bus.Interface.Property('a{sv}', value={}) + # Current bridge state. 'current' is derived state, mostly from 'peer' + peer: 'SuperuserPeer | None' = None + current = bus.Interface.Property('s') + + @current.getter + def get_current(self) -> str: + if self.privileged: + return 'root' + elif self.peer is None: + return 'none' + elif not self.peer.startup.done(): + return 'init' + else: + return self.peer.config.name + + def update(self): + self.properties_changed('cockpit.Superuser', {'Current': Variant(self.current)}, []) + + # This is the only function which is permitted to modify 'peer' + def set_peer(self, peer: 'SuperuserPeer | None') -> None: + # We never hot-swap peers and we never do anything if we're already root + assert self.peer is None or peer is None or self.peer is peer + assert not self.privileged + self.peer = peer + self.update() # RoutingRule - def apply_rule(self, options: JsonObject) -> Optional[Peer]: + def apply_rule(self, options: JsonObject) -> 'SuperuserPeer | None': superuser = options.get('superuser') - if not superuser or self.current == 'root': + if self.privileged or not superuser: # superuser not requested, or already superuser? Next rule. return None - elif self.peer or superuser == 'try': + elif self.peer is not None or superuser == 'try': # superuser requested and active? Return it. # 'try' requested? Either return the peer, or None. return self.peer @@ -132,40 +147,12 @@ def apply_rule(self, options: JsonObject) -> Optional[Peer]: # superuser requested, but not active? That's an error. raise RoutingError('access-denied') - # ferny.AskpassHandler - async def do_askpass(self, messages: str, prompt: str, hint: str) -> Optional[str]: - assert self.pending_prompt is None - echo = hint == "confirm" - self.pending_prompt = asyncio.get_running_loop().create_future() - try: - logger.debug('prompting for %s', prompt) - # with sudo, all stderr messages are treated as warning/errors by the UI - # (such as the lecture or "wrong password"), so pass them in the "error" field - self.prompt('', prompt, '', echo, messages) - return await self.pending_prompt - finally: - self.pending_prompt = None - def __init__(self, router: Router, *, privileged: bool = False): + self.privileged = privileged or os.getuid() == 0 super().__init__(router) - self.pending_prompt = None - self.peer = None - self.startup = None - - if privileged or os.getuid() == 0: - self.current = 'root' - - def peer_done(self): - self.current = 'none' - self.peer = None - - async def go(self, name: str, responder: ferny.AskpassHandler) -> None: - if self.current != 'none': - raise bus.BusError('cockpit.Superuser.Error', 'Superuser bridge already running') - + def start_peer(self, name: str, responder: ferny.AskpassHandler) -> SuperuserPeer: assert self.peer is None - assert self.startup is None for config in self.superuser_configs: if name in (config.name, 'any'): @@ -173,18 +160,17 @@ async def go(self, name: str, responder: ferny.AskpassHandler) -> None: else: raise bus.BusError('cockpit.Superuser.Error', f'Unknown superuser bridge type "{name}"') - self.current = 'init' - self.peer = SuperuserPeer(self.router, config, responder) - self.peer.add_done_callback(self.peer_done) + peer = SuperuserPeer(self, config, responder) + self.set_peer(peer) - try: - await self.peer.start(init_host=self.router.init_host) - except asyncio.CancelledError: - raise bus.BusError('cockpit.Superuser.Error.Cancelled', 'Operation aborted') from None - except (OSError, PeerError) as exc: - raise bus.BusError('cockpit.Superuser.Error', str(exc)) from exc + return peer - self.current = self.peer.config.name + def shutdown(self, exc: 'Exception | None' = None) -> None: + if self.peer is not None: + self.peer.close(exc) + + # Peer might take a while to come down, so clear this immediately + self.set_peer(None) def set_configs(self, configs: Sequence[BridgeConfig]): logger.debug("set_configs() with %d items", len(configs)) @@ -201,36 +187,34 @@ def set_configs(self, configs: Sequence[BridgeConfig]): logger.debug(" stopping superuser bridge '%s': it disappeared from configs", self.peer.config.name) self.stop() - def cancel_prompt(self): - if self.pending_prompt is not None: - self.pending_prompt.cancel() - self.pending_prompt = None - - def shutdown(self): - self.cancel_prompt() - - if self.peer is not None: - self.peer.close() - - # close() should have disconnected the peer immediately - assert self.peer is None - # Connect-on-startup functionality def init(self, params: JsonObject) -> None: name = get_str(params, 'id', 'any') - responder = AuthorizeResponder(self.router) - self._init_task = asyncio.create_task(self.go(name, responder)) - self._init_task.add_done_callback(self._init_done) + peer = self.start_peer(name, AuthorizeResponder(self.router)) + peer.startup.add_done_callback(self._init_done) - def _init_done(self, task: 'asyncio.Task[None]') -> None: - logger.debug('superuser init done! %s', task.exception()) + def _init_done(self, future: 'asyncio.Future[None]') -> None: + logger.debug('superuser init done! %s', future.exception()) self.router.write_control(command='superuser-init-done') - del self._init_task # D-Bus methods @bus.Interface.Method(in_types=['s']) async def start(self, name: str) -> None: - await self.go(name, self) + if self.peer is not None or self.privileged: + raise bus.BusError('cockpit.Superuser.Error', 'Superuser bridge already running') + + try: + await self.start_peer(name, self).startup + except asyncio.CancelledError: + raise bus.BusError('cockpit.Superuser.Error.Cancelled', 'Operation aborted') from None + except EOFError: + raise bus.BusError('cockpit.Superuser.Error', 'Unexpected EOF from peer') from None + except OSError as exc: + raise bus.BusError('cockpit.Superuser.Error', str(exc)) from exc + except ferny.SubprocessError as exc: + raise bus.BusError('cockpit.Superuser.Error', exc.stderr) from exc + except CockpitProblem as exc: + raise bus.BusError('cockpit.Superuser.Error', str(exc)) from exc @bus.Interface.Method() def stop(self) -> None: @@ -243,3 +227,19 @@ def answer(self, reply: str) -> None: self.pending_prompt.set_result(reply) else: logger.debug('got Answer, but no prompt pending') + + # ferny.AskpassHandler + pending_prompt: 'asyncio.Future[str] | None' = None + + async def do_askpass(self, messages: str, prompt: str, hint: str) -> 'str | None': + assert self.pending_prompt is None + echo = hint == "confirm" + self.pending_prompt = asyncio.get_running_loop().create_future() + try: + logger.debug('prompting for %s', prompt) + # with sudo, all stderr messages are treated as warning/errors by the UI + # (such as the lecture or "wrong password"), so pass them in the "error" field + self.prompt('', prompt, '', echo, messages) + return await self.pending_prompt + finally: + self.pending_prompt = None diff --git a/test/pytest/mocktransport.py b/test/pytest/mocktransport.py index c2fb53441c5d..f9612a197dab 100644 --- a/test/pytest/mocktransport.py +++ b/test/pytest/mocktransport.py @@ -90,6 +90,15 @@ def write(self, data: bytes) -> None: _, channel, data = data.split(b'\n', 2) self.queue.put_nowait((channel.decode('ascii'), data)) + async def await_stopped(self) -> None: + keep_open = self.protocol.eof_received() + if keep_open: + self.close_future = asyncio.get_running_loop().create_future() + try: + await self.close_future + finally: + self.close_future = None + def stop(self, event_loop: Optional[asyncio.AbstractEventLoop] = None) -> None: keep_open = self.protocol.eof_received() if keep_open: @@ -162,7 +171,10 @@ async def assert_bus_error(self, tag: str, code: str, message: str, bus: Optiona assert 'id' in reply, reply assert reply['id'] == tag, reply assert 'error' in reply, reply - assert reply['error'] == [code, [message]], reply['error'] + print(f'{reply!r} {code!r} {message!r}') + print(f'{reply["error"]!r}') + print(f'{[code, [message]]!r}') + assert reply['error'] == [code, [message]] async def check_bus_call( self, @@ -208,7 +220,7 @@ async def assert_bus_notify( if bus is None: bus = await self.ensure_internal_bus() notify = await self.next_msg(bus) - assert 'notify' in notify + assert 'notify' in notify, notify assert notify['notify'][path][iface] == expected async def watch_bus(self, path: str, iface: str, expected: JsonObject, bus: Optional[str] = None) -> None: @@ -238,3 +250,9 @@ async def add_bus_match(self, path: str, iface: str, bus: Optional[str] = None) if bus is None: bus = await self.ensure_internal_bus() self.send_json(bus, add_match={'path': path, 'interface': iface}) + + def pause_reading(self): + pass + + def resume_reading(self): + pass diff --git a/test/pytest/test_beiboot.py b/test/pytest/test_beiboot.py index 8571c02501a5..ad22bbbd84ad 100644 --- a/test/pytest/test_beiboot.py +++ b/test/pytest/test_beiboot.py @@ -1,28 +1,16 @@ -import sys +import argparse import pytest -from cockpit._vendor import ferny -from cockpit._vendor.bei import bootloader -from cockpit.beipack import BridgeBeibootHelper -from cockpit.peer import Peer -from cockpit.router import Router +from cockpit.beiboot import create_router - -class BeibootPeer(Peer): - async def do_connect_transport(self) -> None: - helper = BridgeBeibootHelper(self) - agent = ferny.InteractionAgent([helper]) - transport = await self.spawn([sys.executable, '-iq'], env=[], stderr=agent) - transport.write(bootloader.make_bootloader(helper.steps, gadgets=ferny.BEIBOOT_GADGETS).encode()) - await agent.communicate() +from .mocktransport import MockTransport @pytest.mark.asyncio async def test_bridge_beiboot(): - # Try to beiboot a copy of the bridge and read its init message - peer = BeibootPeer(Router([])) - init_msg = await peer.start() + router = create_router(argparse.Namespace(always=True, destination=None)) + transport = MockTransport(router) + init_msg = await transport.next_msg('') assert init_msg['version'] == 1 - assert 'packages' not in init_msg - peer.close() + await transport.await_stopped() diff --git a/test/pytest/test_bridge.py b/test/pytest/test_bridge.py index fbfb11ddb902..2065a551ff5d 100644 --- a/test/pytest/test_bridge.py +++ b/test/pytest/test_bridge.py @@ -192,16 +192,12 @@ async def test_superuser_dbus(bridge, transport): root_null = await transport.check_open('null', superuser=True) # stop the bridge - stop = transport.send_bus_call(transport.internal_bus, '/superuser', - 'cockpit.Superuser', 'Stop', []) + () = await transport.check_bus_call('/superuser', 'cockpit.Superuser', 'Stop', []) # that should have implicitly closed the open channel await transport.assert_msg('', command='close', channel=root_null) assert root_null not in bridge.open_channels - # The Stop method call is done now - await transport.assert_msg(transport.internal_bus, reply=[[]], id=stop) - def format_methods(methods: Dict[str, str]): return {name: {'t': 'a{sv}', 'v': {'label': {'t': 's', 'v': label}}} for name, label in methods.items()} @@ -234,7 +230,6 @@ async def test_superuser_dbus_pw(bridge, transport, monkeypatch): await transport.check_bus_call('/superuser', 'cockpit.Superuser', 'Answer', ['p4ssw0rd']) # and now the bridge should be running await transport.assert_bus_notify('/superuser', 'cockpit.Superuser', {'Current': 'pseudo'}) - # Start call is now done await transport.assert_bus_reply(start, []) @@ -272,7 +267,7 @@ async def test_superuser_dbus_wrong_pw(bridge, transport, monkeypatch): await transport.assert_bus_notify('/superuser', 'cockpit.Superuser', {'Current': 'none'}) # Start call is now done and returned failure - await transport.assert_bus_error(start, 'cockpit.Superuser.Error', 'pseudo says: Bad password') + await transport.assert_bus_error(start, 'cockpit.Superuser.Error', 'pseudo says: Bad password\n') # double-check await verify_root_bridge_not_running(bridge, transport) diff --git a/test/pytest/test_peer.py b/test/pytest/test_peer.py index a4ae02b24f86..8b4161a8b871 100644 --- a/test/pytest/test_peer.py +++ b/test/pytest/test_peer.py @@ -1,16 +1,12 @@ import asyncio import os import sys -import time import pytest -from cockpit.channel import ChannelError from cockpit.packages import BridgeConfig -from cockpit.peer import ConfiguredPeer, PeerRoutingRule -from cockpit.protocol import CockpitProtocolError +from cockpit.peer import PeerRoutingRule from cockpit.router import Router -from cockpit.transports import SubprocessTransport from . import mockpeer from .mocktransport import MockTransport @@ -34,8 +30,11 @@ def do_send_init(self): @pytest.fixture -def bridge(): - return Bridge() +def bridge(event_loop): + bridge = Bridge() + yield bridge + while bridge.endpoints: + event_loop.run_until_complete(asyncio.sleep(0.1)) @pytest.fixture @@ -74,7 +73,7 @@ async def test_init_failure(rule, init_type, monkeypatch, transport): async def test_immediate_shutdown(rule): peer = rule.apply_rule({'payload': 'test'}) assert peer is not None - peer.close() + rule.shutdown() @pytest.mark.asyncio @@ -82,8 +81,8 @@ async def test_shutdown_before_init(monkeypatch, transport, rule): monkeypatch.setenv('INIT_TYPE', 'silence') channel = transport.send_open('test') assert rule.peer is not None - assert rule.peer.transport is None - while rule.peer.transport is None: + assert not rule.peer.transport_connected + while not rule.peer.transport_connected: await asyncio.sleep(0) rule.peer.close() await transport.assert_msg('', command='close', channel=channel, problem='terminated') @@ -104,111 +103,5 @@ async def test_exit_not_found(monkeypatch, transport): @pytest.mark.asyncio async def test_killed(monkeypatch, transport, rule): channel = await transport.check_open('test') - os.kill(rule.peer.transport._process.pid, 9) + os.kill(rule.peer.transport.get_pid(), 9) await transport.assert_msg('', command='close', channel=channel, problem='terminated') - - -@pytest.mark.asyncio -@pytest.mark.parametrize('init_type', ['wrong-command', 'channel-control', 'data', 'break-protocol']) -async def test_await_failure(init_type, monkeypatch, bridge): - monkeypatch.setenv('INIT_TYPE', init_type) - peer = ConfiguredPeer(bridge, PEER_CONFIG) - with pytest.raises(CockpitProtocolError): - await peer.start() - peer.close() - - -@pytest.mark.asyncio -async def test_await_broken_connect(bridge): - class BrokenConnect(ConfiguredPeer): - async def do_connect_transport(self): - _ = 42 / 0 - - peer = BrokenConnect(bridge, PEER_CONFIG) - with pytest.raises(ZeroDivisionError): - await peer.start() - peer.close() - - -@pytest.mark.asyncio -async def test_await_broken_after_connect(bridge): - class BrokenConnect(ConfiguredPeer): - async def do_connect_transport(self): - await super().do_connect_transport() - _ = 42 / 0 - - peer = BrokenConnect(bridge, PEER_CONFIG) - with pytest.raises(ZeroDivisionError): - await peer.start() - peer.close() - - -class CancellableConnect(ConfiguredPeer): - was_cancelled = False - - async def do_connect_transport(self): - await super().do_connect_transport() - try: - # We should get cancelled here when the mockpeer sends "init" - await asyncio.sleep(10000) - except asyncio.CancelledError: - self.was_cancelled = True - raise - - -@pytest.mark.asyncio -async def test_await_cancellable_connect_init(bridge): - peer = CancellableConnect(bridge, PEER_CONFIG) - await peer.start() - peer.close() - while len(asyncio.all_tasks()) > 1: - await asyncio.sleep(0.1) - assert peer.was_cancelled - - -@pytest.mark.asyncio -async def test_await_cancellable_connect_close(monkeypatch, event_loop, bridge): - monkeypatch.setenv('INIT_TYPE', 'silence') # make sure we never get "init" - peer = CancellableConnect(bridge, PEER_CONFIG) - event_loop.call_later(0.1, peer.close) # call peer.close() after .start() is running - with pytest.raises(asyncio.CancelledError): - await peer.start() - # we already called .close() - while len(asyncio.all_tasks()) > 1: - await asyncio.sleep(0.1) - assert peer.was_cancelled - - -@pytest.mark.asyncio -async def test_spawn_broken_pipe(bridge): - class BrokenPipePeer(ConfiguredPeer): - def __init__(self, *, specific_error=False): - super().__init__(bridge, PEER_CONFIG) - self.specific_error = specific_error - - async def do_connect_transport(self) -> None: - transport = await self.spawn(['sh', '-c', 'read a; exit 9'], ()) - assert isinstance(transport, SubprocessTransport) - # Make the process exit by writing a newline (causing `read` to finish) - transport.write(b'\n') - # The process will exit soon — try writing to it until a write fails. - while not transport.is_closing(): - transport.write(b'x') - time.sleep(0.1) - while transport.get_returncode() is None: - await asyncio.sleep(0.1) - if self.specific_error: - raise ChannelError('not-supported', message='kaputt') - - # BrokenPipe bubbles up without an error returned by do_connect_transport - peer = BrokenPipePeer(specific_error=False) - with pytest.raises(BrokenPipeError): - await peer.start() - peer.close() - - # BrokenPipe gets trumped by specific error returned by do_connect_transport - peer = BrokenPipePeer(specific_error=True) - with pytest.raises(ChannelError) as raises: - await peer.start() - assert raises.value.attrs == {'message': 'kaputt', 'problem': 'not-supported'} - peer.close() diff --git a/vendor/ferny b/vendor/ferny index c26d44452e9c..84950227aacb 160000 --- a/vendor/ferny +++ b/vendor/ferny @@ -1 +1 @@ -Subproject commit c26d44452e9cb82dcafa71f88032bffcd7b9fa41 +Subproject commit 84950227aacbc0fec28c2300db0312f1a96a1f7f