Skip to content

Commit

Permalink
Add initial support for a Beaker provider.
Browse files Browse the repository at this point in the history
This change adds a Beaker provider and its bind.
For now, this relies on the beaker-client cli and config through that.
Currently the only supported checkout/execute method is via job xml or
and existing job id.

Also, I removed the custom FileLock class since we have moved away from
multiprocessing. Instead, we will just use threading locks.
  • Loading branch information
JacobCallahan committed Jun 30, 2023
1 parent 8668f35 commit 636ed1d
Show file tree
Hide file tree
Showing 22 changed files with 876 additions and 197 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Copy the example settings file to `broker_settings.yaml` and edit it.

(optional) If you are using the Container provider, install the extra dependency based on your container runtime of choice with either `pip install broker[podman]` or `pip install broker[docker]`.

(optional) If you are using the Beaker provider, install the extra dependency with `dnf install krb5-devel` and then `pip install broker[beaker]`.

To run Broker outside of its base directory, specify the directory with the `BROKER_DIRECTORY` environment variable.

Configure the `broker_settings.yaml` file to set configuration values for broker's interaction with its providers.
Expand Down
224 changes: 224 additions & 0 deletions broker/binds/beaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import json
import subprocess
import time
from logzero import logger
from pathlib import Path
from xml.etree import ElementTree as ET
from broker import helpers
from broker.exceptions import BeakerBindError


def _elementree_to_dict(etree):
"""Converts an ElementTree object to a dictionary"""
data = {}
if etree.attrib:
data.update(etree.attrib)
if etree.text:
data["text"] = etree.text
for child in etree:
child_data = _elementree_to_dict(child)
if (tag := child.tag) in data:
if not isinstance(data[tag], list):
data[tag] = [data[tag]]
data[tag].append(child_data)
else:
data[tag] = child_data
return data


def _curate_job_info(job_info_dict):
curated_info = {
"job_id": "id",
# "reservation_id": "current_reservation/recipe_id",
"whiteboard": "whiteboard/text",
"hostname": "recipeSet/recipe/system",
"distro": "recipeSet/recipe/distro"
}
return helpers.dict_from_paths(job_info_dict, curated_info)


class BeakerBind:
def __init__(self, hub_url, auth="krbv", **kwargs):
self.hub_url = hub_url
self._base_args = ["--insecure", f"--hub={self.hub_url}"]
if auth == "basic":
# If we're not using system kerberos auth, add in explicit basic auth
self.username = kwargs.pop("username", None)
self.password = kwargs.pop("password", None)
self._base_args.extend(
[
f"--username {self.username}",
f"--password {self.password}",
]
)
self.__dict__.update(kwargs)

def _exec_command(self, *cmd_args, **cmd_kwargs):
raise_on_error = cmd_kwargs.pop("raise_on_error", True)
exec_cmd, cmd_args = ["bkr"], list(cmd_args)
# check through kwargs and if any are True add to cmd_args
del_keys = []
for k, v in cmd_kwargs.items():
if isinstance(v, bool) or v is None:
del_keys.append(k)
if v is True:
cmd_args.append(f"--{k}" if not k.startswith("--") else k)
for k in del_keys:
del cmd_kwargs[k]
exec_cmd.extend(cmd_args)
exec_cmd.extend(self._base_args)
exec_cmd.extend([f"--{k.replace('_', '-')}={v}" for k, v in cmd_kwargs.items()])
logger.debug(f"Executing beaker command: {exec_cmd}")
proc = subprocess.Popen(
exec_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = proc.communicate()
result = helpers.Result(
stdout=stdout.decode(),
stderr=stderr.decode(),
status=proc.returncode,
)
if result.status != 0 and raise_on_error:
raise BeakerBindError(
f"Beaker command failed:\n"
f"Command={' '.join(exec_cmd)}\n"
f"Result={result}",
)
logger.debug(f"Beaker command result: {result.stdout}")
return result

def job_submit(self, job_xml, wait=False):
# wait behavior seems buggy to me, so best to avoid it
if not Path(job_xml).exists():
raise FileNotFoundError(f"Job XML file {job_xml} not found")
result = self._exec_command("job-submit", job_xml, wait=wait)
if not wait:
# get the job id from the output
# format is "Submitted: ['J:7849837'] where the number is the job id
for line in result.stdout.splitlines():
if line.startswith("Submitted:"):
return line.split("'")[1].replace("J:", "")

def job_watch(self, job_id):
job_id = f"J:{job_id}" if not job_id.startswith("J:") else job_id
return self._exec_command("job-watch", job_id)

def job_results(self, job_id, format="beaker-results-xml", pretty=False):
job_id = f"J:{job_id}" if not job_id.startswith("J:") else job_id
return self._exec_command(
"job-results", job_id, format=format, prettyxml=pretty
)

def job_clone(self, job_id, wait=False, **kwargs):
job_id = f"J:{job_id}" if not job_id.startswith("J:") else job_id
return self._exec_command("job-clone", job_id, wait=wait, **kwargs)

def job_list(self, *args, **kwargs):
return self._exec_command("job-list", *args, **kwargs)

def job_cancel(self, job_id):
if not job_id.startswith("J:") and not job_id.startswith("RS:"):
job_id = f"J:{job_id}"
return self._exec_command("job-cancel", job_id)

def job_delete(self, job_id):
job_id = f"J:{job_id}" if not job_id.startswith("J:") else job_id
return self._exec_command("job-delete", job_id)

def system_release(self, system_id):
return self._exec_command("system-release", system_id)

def system_list(self, **kwargs):
"""Due to the number of arguments, we will not validate before submitting
Accepted arguments are:
available available to be used by this user
free available to this user and not currently being used
removed which have been removed
mine owned by this user
type=TYPE of TYPE
status=STATUS with STATUS
pool=POOL in POOL
arch=ARCH with ARCH
dev-vendor-id=VENDOR-ID with a device that has VENDOR-ID
dev-device-id=DEVICE-ID with a device that has DEVICE-ID
dev-sub-vendor-id=SUBVENDOR-ID with a device that has SUBVENDOR-ID
dev-sub-device-id=SUBDEVICE-ID with a device that has SUBDEVICE-ID
dev-driver=DRIVER with a device that has DRIVER
dev-description=DESCRIPTION with a device that has DESCRIPTION
xml-filter=XML matching the given XML filter
host-filter=NAME matching pre-defined host filter
"""
# convert the flags passed in kwargs to arguments
args = []
for key in {"available", "free", "removed", "mine"}:
if kwargs.pop(key, False):
args.append(f"--{key}")
return self._exec_command("system-list", *args, **kwargs)

def user_systems(self): # to be used for inventory sync
result = self.system_list(mine=True, raise_on_error=False)
if result.status != 0:
return []
else:
return result.stdout.splitlines()

def system_details(self, system_id, format="json"):
return self._exec_command("system-details", system_id, format=format)

def execute_job(self, job, max_wait="24h"):
"""Submit a job, periodically checking the status until it completes
then return a dictionary of the results.
"""
if Path(job).exists(): # job xml path passed in
job_id = self.job_submit(job, wait=False)
else: # using a job id
job_id = self.job_clone(job)
logger.info(f"Submitted job: {job_id}")
_max_wait = time.time() + helpers.translate_timeout(max_wait or "24h")
while time.time() < _max_wait:
time.sleep(60)
result = self.job_results(job_id, pretty=True)
if 'result="Pass"' in result.stdout:
return _curate_job_info(_elementree_to_dict(ET.fromstring(result.stdout)))
elif 'result="Fail"' in result.stdout or "Exception: " in result.stdout:
raise BeakerBindError(f"Job {job_id} failed:\n{result}")
elif 'result="Warn"' in result.stdout:
res_dict = _elementree_to_dict(ET.fromstring(result.stdout))
raise BeakerBindError(f"Job {job_id} was resulted in a warning. Status: {res_dict['status']}")
raise BeakerBindError(f"Job {job_id} did not complete within {max_wait}")

def system_details_curated(self, system_id):
full_details = json.loads(self.system_details(system_id).stdout)
curated_details = {
"hostname": full_details["fqdn"],
"mac_address": full_details["mac_address"],
"owner": "{display_name} <{email_address}>".format(
display_name=full_details["owner"]["display_name"],
email_address=full_details["owner"]["email_address"],
),
"id": full_details["id"],
}
if current_res := full_details.get("current_reservation"):
curated_details.update(
{
"reservation_id": current_res["recipe_id"],
"reserved_on": current_res.get("start_time"),
"expires_on": current_res.get("finish_time"),
"reserved_for": "{display_name} <{email_address}>".format(
display_name=current_res["user"]["display_name"],
email_address=current_res["user"]["email_address"],
),
}
)
return curated_details

def jobid_from_system(self, system_hostname):
"""Return the job id for the current reservation on the system"""
for job_id in json.loads(self.job_list(mine=True).stdout):
job_result = self.job_results(job_id, pretty=True)
job_detail = _curate_job_info(_elementree_to_dict(ET.fromstring(job_result.stdout)))
if job_detail["hostname"] == system_hostname:
return job_id
15 changes: 7 additions & 8 deletions broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def checkout(self):
self._hosts.extend(hosts)
helpers.update_inventory([host.to_dict() for host in hosts])
if err:
raise err
raise self.BrokerError(f"Error during checkout from {self}") from err
return hosts if not len(hosts) == 1 else hosts[0]

def execute(self, **kwargs):
Expand All @@ -169,12 +169,11 @@ def execute(self, **kwargs):
logger.info(f"Using provider {provider.__name__} for execution")
return self._act(provider, method)

def nick_help(self):
"""Use a provider's nick_help method to get argument information"""
if self._provider_actions:
provider, _ = PROVIDER_ACTIONS[[*self._provider_actions.keys()][0]]
logger.info(f"Querying provider {provider.__name__}")
self._act(provider, "nick_help", checkout=False)
def provider_help(self, provider_name):
"""Use a provider's provider_help method to get argument information"""
provider = PROVIDERS[provider_name]
logger.info(f"Querying provider {provider.__name__}")
self._act(provider, "provider_help", checkout=False)

def _checkin(self, host):
logger.info(f"Checking in {host.hostname or host.name}")
Expand Down Expand Up @@ -298,7 +297,7 @@ def sync_inventory(provider):
prov_inventory = PROVIDERS[provider](**instance).get_inventory(additional_arg)
curr_inventory = [
hostname if (hostname := host.get("hostname")) else host.get("name")
for host in helpers.load_inventory(filter=f"_broker_provider={provider}")
for host in helpers.load_inventory(filter=f'@inv._broker_provider == "{provider}"')
]
helpers.update_inventory(add=prov_inventory, remove=curr_inventory)

Expand Down
Loading

0 comments on commit 636ed1d

Please sign in to comment.