Skip to content

Commit

Permalink
refactoring shared logic to be served from here
Browse files Browse the repository at this point in the history
I do not like the idea of additional dependencies, but I like the
idea of duplicate code worse!

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Jun 30, 2023
1 parent dab74c3 commit 3c4e476
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 7 deletions.
5 changes: 5 additions & 0 deletions fluxburst/kubernetes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Kubernetes

These are shared functions and classes for burst plugins that use Kubernetes.
They do not add additional install dependencies, as they are expected to be installed
with their respective plugins.
1 change: 1 addition & 0 deletions fluxburst/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .plugins import KubernetesBurstPlugin
165 changes: 165 additions & 0 deletions fluxburst/kubernetes/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright 2023 Lawrence Livermore National Security, LLC and other
# HPCIC DevTools Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (MIT)

import base64
import os

import fluxburst_eks.defaults as defaults
import requests
from kubernetes import client as kubernetes_client

import fluxburst.utils as utils


def get_minicluster(
command,
size=None,
tasks=None, # nodes * cpu per node, where cpu per node is vCPU / 2
cpu_limit=None,
memory_limit=None,
flags=None,
name=None,
namespace=None,
image=None,
wrap=None,
log_level=7,
flux_user=None,
lead_host=None,
lead_port=None,
broker_toml=None,
munge_secret_name=None,
curve_cert_secret_name=None,
curve_cert=None,
lead_size=None,
lead_jobname=None,
zeromq=False,
quiet=False,
strict=False,
):
"""
Get a MiniCluster CRD as a dictionary
Limits should be slightly below actual pod resources. The curve cert and broker config
are required, since we need this external cluster to connect to ours!
"""
flags = flags or "-ompi=openmpi@5 -c 1 -o cpu-affinity=per-task"
image = image or "ghcr.io/flux-framework/flux-restful-api"
container = {"image": image, "command": command, "resources": {}}

if cpu_limit is None and memory_limit is None:
del container["resources"]
elif cpu_limit is not None or memory_limit is not None:
container["resources"] = {"limits": {}, "requests": {}}
if cpu_limit is not None:
container["resources"]["limits"]["cpu"] = cpu_limit
container["resources"]["requests"]["cpu"] = cpu_limit
if memory_limit is not None:
container["resources"]["limits"]["memory"] = memory_limit
container["resources"]["requests"]["memory"] = memory_limit

# Do we have a custom flux user for the container?
if flux_user:
container["flux_user"] = {"name": flux_user}

# The MiniCluster has the added name and namespace
mc = {
"size": size,
"namespace": namespace,
"name": name,
"interactive": False,
"logging": {"zeromq": zeromq, "quiet": quiet, "strict": strict},
"flux": {
"option_flags": flags,
"connect_timeout": "5s",
"log_level": log_level,
# Providing the lead broker and port points back to the parent
"bursting": {
"lead_broker": {
"address": lead_host,
"port": int(lead_port),
"name": lead_jobname,
"size": int(lead_size),
},
"clusters": [{"size": size, "name": name}],
},
},
}

if tasks is not None:
mc["tasks"] = tasks

# This is text directly in config
if curve_cert:
mc["flux"]["curve_cert"] = curve_cert

# A provided secret will take precedence
if curve_cert_secret_name:
mc["flux"]["curve_cert_secret"] = curve_cert_secret_name

# This is just the secret name
if munge_secret_name:
mc["flux"]["munge_secret"] = munge_secret_name
if broker_toml:
mc["flux"]["broker_config"] = broker_toml

# eg., this would require strace "strace,-e,network,-tt"
if wrap is not None:
mc["flux"]["wrap"] = wrap
return mc, container


def ensure_curve_cert(curve_cert):
"""
Ensure we are provided with an existing curve certificate we can load.
"""
if not curve_cert or not os.path.exists(curve_cert):
raise ValueError(
f"Curve cert (provided as {curve_cert}) needs to be defined and exist."
)
return utils.read_file(curve_cert)


def ensure_flux_operator_yaml(flux_operator_yaml):
"""
Ensure we are provided with the installation yaml and it exists!
"""
# flux operator yaml default is current from main
if not flux_operator_yaml:
flux_operator_yaml = utils.get_tmpfile(prefix="flux-operator") + ".yaml"
r = requests.get(defaults.flux_operator_yaml, allow_redirects=True)
utils.write_file(r.content, flux_operator_yaml)

# Ensure it really really exists
flux_operator_yaml = os.path.abspath(flux_operator_yaml)
if not os.path.exists(flux_operator_yaml):
raise ValueError(f"{flux_operator_yaml} does not exist.")
return flux_operator_yaml


def create_secret(path, secret_path, name, namespace, mode="r"):
"""
Create a secret
"""
# Configureate ConfigMap metadata
metadata = kubernetes_client.V1ObjectMeta(
name=name,
namespace=namespace,
)
# Get File Content
with open(path, mode) as f:
content = f.read()

# base64 encoded string
if mode == "rb":
content = base64.b64encode(content).decode("utf-8")

# Instantiate the configmap object
return kubernetes_client.V1Secret(
api_version="v1",
kind="Secret",
type="opaque",
string_data={secret_path: content},
metadata=metadata,
)
183 changes: 183 additions & 0 deletions fluxburst/kubernetes/plugins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# Copyright 2023 Lawrence Livermore National Security, LLC and other
# HPCIC DevTools Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (MIT)


import os
import socket

from fluxoperator.client import FluxMiniCluster
from kubernetes import client as kubernetes_client
from kubernetes import utils as k8sutils
from kubernetes.client.rest import ApiException

import fluxburst.kubernetes.clusters as helpers
from fluxburst.logger import logger
from fluxburst.plugins import BurstPlugin


class KubernetesBurstPlugin(BurstPlugin):
"""
An additional wrapper to the plugin that adds support for the Flux Operator
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.clusters = {}

def ensure_namespace(self, kubectl):
"""
Use the instantiated kubectl to ensure the cluster namespace exists.
"""
try:
kubectl.create_namespace(
kubernetes_client.V1Namespace(
metadata=kubernetes_client.V1ObjectMeta(name=self.params.namespace)
)
)
except Exception:
logger.warning(
f"🥵️ Issue creating namespace {self.params.namespace}, assuming already exists."
)

def check_configs(self):
"""
Ensure we have required config files.
"""
message = "does not exist or you don't have permissions to see it"
if not os.path.exists(self.params.munge_key):
raise ValueError(f"Provided munge key {self.params.munge_key} {message}.")

if not os.path.exists(self.params.curve_cert):
raise ValueError(f"Provided curve cert {self.params.munge_key} {message}.")

def install_flux_operator(self, kubectl, flux_operator_yaml):
"""
Install the flux operator yaml
"""
try:
k8sutils.create_from_yaml(kubectl.api_client, flux_operator_yaml)
logger.info("Installed the operator.")
except Exception as exc:
logger.warning(
f"Issue installing the operator: {exc}, assuming already exists"
)

def run(self):
"""
Given some set of scheduled jobs, run bursting.
"""
# Exit early if no jobs to burst
if not self.jobs:
logger.info(f"Plugin {self.name} has no jobs to burst.")
return

# Ensure we have a flux operator yaml file, fosho, foyaml!
foyaml = helpers.ensure_flux_operator_yaml(self.params.flux_operator_yaml)

# lead host / port / size / are required in the dataclass
# We check munge paths here, because could be permissions issue
self.check_configs()

cli = self.create_cluster()
kubectl = cli.get_k8s_client()

# Install the operator!
self.install_flux_operator(kubectl, foyaml)

# Create a MiniCluster for each job
for _, job in self.jobs.items():
self.create_minicluster(kubectl, job)

def create_minicluster(self, kubectl, job):
"""
Create the MiniCluster
"""
command = " ".join(job["spec"]["tasks"][0]["command"])
logger.info(f"Preparing MiniCluster for {job['id']}: {command}")

# The plugin is assumed to be running from the lead broker
# of the cluster it is bursting from, this we get info about it
podname = socket.gethostname()
hostname = podname.rsplit("-", 1)[0]

# TODO: we are using defaults for now, but will update this to be likely
# configured based on the algorithm that chooses the best spec
minicluster, container = helpers.get_minicluster(
command,
name=self.params.name,
memory_limit=self.params.memory_limit,
cpu_limit=self.params.cpu_limit,
namespace=self.params.namespace,
broker_toml=self.params.broker_toml,
tasks=job["ntasks"],
size=job["nnodes"],
image=self.params.image,
wrap=self.params.wrap,
log_level=self.params.log_level,
flux_user=self.params.flux_user,
lead_host=self.params.lead_host,
lead_port=self.params.lead_port,
munge_secret_name=self.params.munge_secret_name,
curve_cert_secret_name=self.params.curve_cert_secret_name,
lead_jobname=hostname,
lead_size=self.params.lead_size,
)
# Create the namespace
self.ensure_namespace(kubectl)

# Let's assume there could be bugs applying this differently
crd_api = kubernetes_client.CustomObjectsApi(kubectl.api_client)

self.ensure_secrets(kubectl)

# Create the MiniCluster! This also waits for it to be ready
print(
f"⭐️ Creating the minicluster {self.params.name} in {self.params.namespace}..."
)

# Make sure we provide the core_v1_api we've created
operator = FluxMiniCluster(core_v1_api=kubectl)
return operator.create(**minicluster, container=container, crd_api=crd_api)

def ensure_secrets(self, kubectl):
"""
Ensure secrets (munge.key and curve.cert) are ready for a job
"""
secrets = []

# kubectl create secret --namespace flux-operator munge-key --from-file=/etc/munge/munge.key
if self.params.curve_cert:
secrets.append(
helpers.create_secret(
self.params.curve_cert,
"curve.cert",
self.params.curve_cert_secret_name,
self.params.namespace,
)
)

if self.params.munge_key:
secrets.append(
helpers.create_secret(
self.params.munge_key,
"munge.key",
self.params.munge_secret_name,
self.params.namespace,
mode="rb",
)
)

for secret in secrets:
try:
logger.debug(f"Creating secret {secret.metadata.name}")
kubectl.create_namespaced_secret(
namespace=self.params.namespace,
body=secret,
)
except ApiException as e:
print(
"Exception when calling CoreV1Api->create_namespaced_config_map: %s\n"
% e
)
7 changes: 6 additions & 1 deletion fluxburst/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
("pick", {"min_version": None}),
)

KUBERNETES_REQUIRES = (
("kubernetes", {"min_version": None}),
("fluxoperator", {"min_version": None}),
)

TESTS_REQUIRES = (("pytest", {"min_version": "4.6.2"}),)

################################################################################
# Submodule Requirements (versions that include database)

INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES
INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + KUBERNETES_REQUIRES + TESTS_REQUIRES
6 changes: 1 addition & 5 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,4 @@ max-line-length = 100
ignore = E1 E2 E5 W5
per-file-ignores =
fluxburst/utils/__init__.py:F401
fluxburst/main/__init__.py:F401
fluxburst/main/cloud/__init__.py:F401
fluxburst/main/cloud/aws/__init__.py:F401
fluxburst/main/cloud/google/__init__.py:F401
fluxburst/main/solve/__init__.py:F401
fluxburst/kubernetes/__init__.py:F401

0 comments on commit 3c4e476

Please sign in to comment.