Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.5 Release #292

Merged
merged 12 commits into from
May 29, 2024
Merged
1 change: 1 addition & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ jobs:
env:
BROKER_DIRECTORY: "${{ github.workspace }}/broker_dir"
run: |
cp broker_settings.yaml.example ${BROKER_DIRECTORY}/broker_settings.yaml
pip install uv
uv pip install --system "broker[dev,docker] @ ."
ls -l "$BROKER_DIRECTORY"
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,4 @@ ENV/
*settings.yaml
inventory.yaml
*.bak
/bin/*
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ broker --output-file inventory.json inventory

**Run Broker in the background**

Certain Broker actions can be run in the background, these currently are: checkout, checkin, duplicate, and execute. When running a command in this mode, it will spin up a new Broker process and no longer log to stderr. To check progress, you can still follow broker's log file.
Certain Broker actions can be run in the background, these currently are: checkout, checkin, and execute. When running a command in this mode, it will spin up a new Broker process and no longer log to stderr. To check progress, you can still follow broker's log file.
Note that background mode will interfere with output options for execute since it won't be able to print to stdout. Those should kept in log mode.
```
broker checkout --background --nick rhel7
Expand Down
47 changes: 47 additions & 0 deletions broker/binds/containers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
"""A collection of classes to ease interaction with Docker and Podman libraries."""

HEADER_SIZE = 8
STDOUT = 1
STDERR = 2


def demux_output(data_bytes):
"""Demuxes the output of a container stream into stdout and stderr streams.

Stream data is expected to be in the following format:
- 1 byte: stream type (1=stdout, 2=stderr)
- 3 bytes: padding
- 4 bytes: payload size (big-endian)
- N bytes: payload data
ref: https://docs.podman.io/en/latest/_static/api.html?version=v5.0#tag/containers/operation/ContainerAttachLibpod

Args:
data_bytes: Bytes object containing the combined stream data.

Returns:
A tuple containing two bytes objects: (stdout, stderr).
"""
stdout = b""
stderr = b""
while len(data_bytes) >= HEADER_SIZE:
# Extract header information
header, data_bytes = data_bytes[:HEADER_SIZE], data_bytes[HEADER_SIZE:]
stream_type = header[0]
payload_size = int.from_bytes(header[4:HEADER_SIZE], "big")
# Check if data is sufficient for payload
if len(data_bytes) < payload_size:
break # Incomplete frame, wait for more data

# Extract and process payload
payload = data_bytes[:payload_size]
if stream_type == STDOUT:
stdout += payload
elif stream_type == STDERR:
stderr += payload
else:
# todo: Handle unexpected stream types
pass

# Update data for next frame
data_bytes = data_bytes[payload_size:]

return stdout, stderr


class ContainerBind:
"""A base class that provides common functionality for Docker and Podman containers."""
Expand Down
162 changes: 162 additions & 0 deletions broker/binds/foreman.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Foreman provider implementation."""
import time

from logzero import logger
import requests

from broker import exceptions
from broker.settings import settings


class ForemanBind:
"""Default runtime to query Foreman."""

headers = {
"Content-Type": "application/json",
}

def __init__(self, **kwargs):
self.foreman_username = settings.foreman.foreman_username
self.foreman_password = settings.foreman.foreman_password
self.url = settings.foreman.foreman_url
self.prefix = settings.foreman.name_prefix
self.verify = settings.foreman.verify
self.session = requests.session()

def _interpret_response(self, response):
"""Handle responses from Foreman, in particular catch errors."""
if "error" in response:
if "Unable to authenticate user" in response["error"]["message"]:
raise exceptions.AuthenticationError(response["error"]["message"])
raise exceptions.ForemanBindError(
provider=self.__class__.__name__,
message=" ".join(response["error"]["full_messages"]),
)
if "errors" in response:
raise exceptions.ForemanBindError(
provider=self.__class__.__name__, message=" ".join(response["errors"]["base"])
)
return response

def _get(self, endpoint):
"""Send GET request to Foreman API."""
response = self.session.get(
self.url + endpoint,
auth=(self.foreman_username, self.foreman_password),
headers=self.headers,
verify=self.verify,
).json()
return self._interpret_response(response)

def _post(self, endpoint, **kwargs):
"""Send POST request to Foreman API."""
response = self.session.post(
self.url + endpoint,
auth=(self.foreman_username, self.foreman_password),
headers=self.headers,
verify=self.verify,
**kwargs,
).json()
return self._interpret_response(response)

def _delete(self, endpoint, **kwargs):
"""Send DELETE request to Foreman API."""
response = self.session.delete(
self.url + endpoint,
auth=(self.foreman_username, self.foreman_password),
headers=self.headers,
verify=self.verify,
**kwargs,
)
return self._interpret_response(response)

def obtain_id_from_name(self, resource_type, resource_name):
"""Obtain id for resource with given name.

:param resource_type: Resource type, like hostgroups, hosts, ...

:param resource_name: String-like identifier of the resource

:return: ID of the found object
"""
response = self._get(
f"/api/{resource_type}?per_page=200",
)
try:
result = response["results"]
resource = next(
x
for x in result
if x.get("title") == resource_name or x.get("name") == resource_name
)
id_ = resource["id"]
except KeyError:
logger.error(f"Could not find {resource_type} {resource_name}")
raise
except StopIteration:
raise exceptions.ForemanBindError(
provider=self.__class__.__name__,
message=f"Could not find {resource_name} in {resource_type}",
)
return id_

def create_job_invocation(self, data):
"""Run a job from the provided data."""
return self._post(
"/api/job_invocations",
json=data,
)["id"]

def job_output(self, job_id):
"""Return output of job."""
return self._get(f"/api/job_invocations/{job_id}/outputs")["outputs"][0]["output"]

def wait_for_job_to_finish(self, job_id):
"""Poll API for job status until it is finished.

:param job_id: id of the job to poll
"""
still_running = True
while still_running:
response = self._get(f"/api/job_invocations/{job_id}")
still_running = response["status_label"] == "running"
time.sleep(1)

def hostgroups(self):
"""Return list of available hostgroups."""
return self._get("/api/hostgroups")

def hostgroup(self, name):
"""Return list of available hostgroups."""
hostgroup_id = self.obtain_id_from_name("hostgroups", name)
return self._get(f"/api/hostgroups/{hostgroup_id}")

def hosts(self):
"""Return list of hosts deployed using this prefix."""
return self._get(f"/api/hosts?search={self.prefix}")["results"]

def image_uuid(self, compute_resource_id, image_name):
"""Return the uuid of a VM image on a specific compute resource."""
try:
return self._get(
"/api/compute_resources/"
f"{compute_resource_id}"
f"/images/?search=name={image_name}"
)["results"][0]["uuid"]
except IndexError:
raise exceptions.ForemanBindError(f"Could not find {image_name} in VM images")

def create_host(self, data):
"""Create a host from the provided data."""
return self._post("/api/hosts", json=data)

def wait_for_host_to_install(self, hostname):
"""Poll API for host build status until it is built.

:param hostname: name of the host which is currently being built
"""
building = True
while building:
host_status = self._get(f"/api/hosts/{hostname}")
building = host_status["build_status"] != 0
time.sleep(1)
153 changes: 153 additions & 0 deletions broker/binds/hussh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""Module providing classes to establish ssh or ssh-like connections to hosts.

Classes:
Session - Wrapper around hussh's auth/connection system.

Note: You typically want to use a Host object instance to create sessions,
not these classes directly.
"""
from contextlib import contextmanager
from pathlib import Path

from hussh import Connection

from broker import exceptions, helpers


class Session:
"""Wrapper around hussh's auth/connection system."""

def __init__(self, **kwargs):
"""Initialize a Session object.

kwargs:
hostname (str): The hostname or IP address of the remote host. Defaults to 'localhost'.
username (str): The username to authenticate with. Defaults to 'root'.
timeout (float): The timeout for the connection in seconds. Defaults to 60.
port (int): The port number to connect to. Defaults to 22.
key_filename (str): The path to the private key file to use for authentication.
password (str): The password to use for authentication.
ipv6 (bool): Whether or not to use IPv6. Defaults to False.
ipv4_fallback (bool): Whether or not to fallback to IPv4 if IPv6 fails. Defaults to True.

Raises:
AuthException: If no password or key file is provided.
ConnectionError: If the connection fails.
FileNotFoundError: If the key file is not found.
"""
host = kwargs.get("hostname", "localhost")
user = kwargs.get("username", "root")
port = kwargs.get("port", 22)
timeout = kwargs.get("timeout", 60) * 1000

key_filename = kwargs.get("key_filename")
password = kwargs.get("password")

# TODO Create and use socket if hussh allows user to specify one
self.session = None

conn_kwargs = {"username": user, "port": port, "timeout": timeout}
try:
if key_filename:
auth_type = "Key"
if not Path(key_filename).exists():
raise FileNotFoundError(f"Key not found in '{key_filename}'")
conn_kwargs["private_key"] = key_filename
elif password:
auth_type = "Password"
conn_kwargs["password"] = password
elif user:
auth_type = "Session"
else:
raise exceptions.AuthenticationError("No password or key file provided.")

self.session = Connection(host, **conn_kwargs)

except Exception as err: # noqa: BLE001
raise exceptions.AuthenticationError(
f"{auth_type}-based authentication failed."
) from err

@staticmethod
def _set_destination(source, destination):
dest = destination or source
if dest.endswith("/"):
dest = dest + Path(source).name
return dest

def disconnect(self):
"""Disconnect session."""

def remote_copy(self, source, dest_host, dest_path=None, ensure_dir=True):
"""Copy a file from this host to another."""
dest_path = dest_path or source
if ensure_dir:
dest_host.session.run(f"mkdir -p {Path(dest_path).absolute().parent}")

# Copy from this host to destination host
self.session.remote_copy(
source_path=source, dest_conn=dest_host.session.session, dest_path=dest_path
)

def run(self, command, timeout=0):
"""Run a command on the host and return the results."""
# TODO support timeout parameter
result = self.session.execute(command)

# Create broker Result from hussh SSHResult
return helpers.Result(
status=result.status,
stderr=result.stderr,
stdout=result.stdout,
)

def scp_read(self, source, destination=None, return_data=False):
"""SCP read a remote file into a local destination or return a bytes object if return_data is True."""
destination = self._set_destination(source, destination)
if return_data:
return self.session.scp_read(remote_path=source)
self.session.scp_read(remote_path=source, local_path=destination)

def scp_write(self, source, destination=None, ensure_dir=True):
"""SCP write a local file to a remote destination."""
destination = self._set_destination(source, destination)
if ensure_dir:
self.run(f"mkdir -p {Path(destination).absolute().parent}")
self.session.scp_write(source, destination)

def sftp_read(self, source, destination=None, return_data=False):
"""Read a remote file into a local destination or return a bytes object if return_data is True."""
if return_data:
return self.session.sftp_read(remote_path=source).encode("utf-8")

destination = self._set_destination(source, destination)

# Create the destination path if it doesn't exist
Path(destination).parent.mkdir(parents=True, exist_ok=True)

self.session.sftp_read(remote_path=source, local_path=destination)

def sftp_write(self, source, destination=None, ensure_dir=True):
"""Sftp write a local file to a remote destination."""
destination = self._set_destination(source, destination)
if ensure_dir:
self.run(f"mkdir -p {Path(destination).absolute().parent}")
self.session.sftp_write(local_path=source, remote_path=destination)

def shell(self, pty=False):
"""Create and return an interactive shell instance."""
return self.session.shell(pty=pty)

@contextmanager
def tail_file(self, filename):
"""Tail a file on the remote host."""
with self.session.tail(filename) as _tailer:
yield (tailer := FileTailer(tailer=_tailer))
tailer.contents = _tailer.contents


class FileTailer:
"""Wrapper for hussh's FileTailer class."""

def __init__(self, **kwargs):
self.tailer = kwargs.get("tailer")
Loading