diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4a0b3b9..ea93ca5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,5 +14,6 @@ and **Merged pull requests**. Critical items to know are:
 The versions coincide with releases on pip. Only major versions will be released as tags on GitHub.
 
 ## [0.0.x](https://github.com/converged-computing/flux-metrics-api/tree/main) (0.0.x)
+ - On the fly metric (from a custom file) support, and job queue counts (0.0.11)
  - Support for certificates for uvicorn and change default port to 8443 (0.0.1)
  - Skeleton release (0.0.0)
diff --git a/README.md b/README.md
index c0ec61c..d3a9ca7 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,8 @@ $ flux-metrics-api start
 $ flux-metrics-api start --port 9000 --host 127.0.0.1
 ```
 
+#### SSL
+
 If you want SSL (port 443), you can provide the path to a certificate and keyfile:
 
 ```bash
@@ -68,6 +70,66 @@ An example of a full command we might run from within a pod:
 $ flux-metrics-api start --port 8443 --ssl-certfile /etc/certs/tls.crt --ssl-keyfile /etc/certs/tls.key --namespace flux-operator --service-name custom-metrics-apiserver
 ```
 
+#### On the fly custom metrics!
+
+If you want to provide custom metrics, you can write functions in an external file that we will read and add to the server.
+As a general rule:
+
+ - The name of the function will be the name of the custom metric
+ - You can expect the only argument to be the Flux handle
+ - You'll need to do imports within your function to get them in scope
+
+This likely can be improved upon, but it is a start for now! We provide an [example file](example/custom-metrics.py), and a minimal sketch is shown below.
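+
+For instance, mirroring the example file, a metric that reports the number of free cores might look like this:
+
+```python
+def my_custom_metric_name(handle):
+    # Imports must happen inside the function to be in scope
+    import flux.resource
+
+    # Ask Flux for the current resource listing and report free cores
+    listing = flux.resource.list.resource_list(handle).get()
+    return listing.free.ncores
+```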
+
+Then start the server, pointing it at your file:
+
+```bash
+$ flux-metrics-api start --custom-metric ./example/custom-metrics.py
+```
+
+And then test it:
+
+```bash
+$ curl -s http://localhost:8443/apis/custom.metrics.k8s.io/v1beta2/namespaces/flux-operator/metrics/my_custom_metric_name | jq
+```
+```console
+{
+  "items": [
+    {
+      "metric": {
+        "name": "my_custom_metric_name"
+      },
+      "value": 4,
+      "timestamp": "2023-06-01T01:39:08+00:00",
+      "windowSeconds": 0,
+      "describedObject": {
+        "kind": "Service",
+        "namespace": "flux-operator",
+        "name": "custom-metrics-apiserver",
+        "apiVersion": "v1beta2"
+      }
+    }
+  ],
+  "apiVersion": "custom.metrics.k8s.io/v1beta2",
+  "kind": "MetricValueList",
+  "metadata": {
+    "selfLink": "/apis/custom.metrics.k8s.io/v1beta2"
+  }
+}
+```
+
 See `--help` to see other options available.
 
 ### Endpoints
@@ -113,6 +161,20 @@ The following metrics are supported:
 
 - **node_free_count**: number of nodes free in the MiniCluster
 - **node_cores_free_count**: number of node cores free in the MiniCluster
 - **node_cores_up_count**: number of node cores up in the MiniCluster
+- **job_queue_state_new_count**: number of new jobs in the queue
+- **job_queue_state_depend_count**: number of jobs in the queue in state "depend"
+- **job_queue_state_priority_count**: number of jobs in the queue in state "priority"
+- **job_queue_state_sched_count**: number of jobs in the queue in state "sched"
+- **job_queue_state_run_count**: number of jobs in the queue in state "run"
+- **job_queue_state_cleanup_count**: number of jobs in the queue in state "cleanup"
+- **job_queue_state_inactive_count**: number of jobs in the queue in state "inactive"
+
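+For example, to query one of the job queue metrics (assuming the same host, port, and namespace as above):
+
+```bash
+$ curl -s http://localhost:8443/apis/custom.metrics.k8s.io/v1beta2/namespaces/flux-operator/metrics/job_queue_state_run_count | jq
+```
+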
 
 ### Docker
diff --git a/example/custom-metrics.py b/example/custom-metrics.py
new file mode 100644
index 0000000..87ff50f
--- /dev/null
+++ b/example/custom-metrics.py
@@ -0,0 +1,34 @@
+# Copyright 2023 Lawrence Livermore National Security, LLC and other
+# HPCIC DevTools Developers. See the top-level COPYRIGHT file for details.
+#
+# SPDX-License-Identifier: (MIT)
+
+# This is an example of a custom metrics file you can provide on the command line, e.g.,
+# flux-metrics-api start --custom-metric ./custom-metrics.py
+
+# The default format for a custom metric is the following:
+
+
+def my_custom_metric_name(handle):
+    """
+    All custom metrics will be passed the active flux handle.
+
+    - The name of the function is the name of the metric.
+    - You'll need to import what you need.
+    """
+    # You'll need to import what you need again from Flux
+    # or other places.
+    import flux.resource
+
+    rpc = flux.resource.list.resource_list(handle)
+    listing = rpc.get()
+    return listing.free.ncores
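+
+
+# A second illustrative sketch (the function name is arbitrary; it becomes the
+# metric name): custom metrics can also use the job listing. This one counts
+# jobs in the "run" state (16, per flux-core's job.h state identifiers).
+def running_job_count(handle):
+    import flux.job
+
+    listing = flux.job.job_list(handle).get()
+    return len([job for job in listing["jobs"] if job["state"] == 16])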
diff --git a/flux_metrics_api/metrics.py b/flux_metrics_api/metrics.py
index 71dcbc3..51ee5ed 100644
--- a/flux_metrics_api/metrics.py
+++ b/flux_metrics_api/metrics.py
@@ -4,7 +4,13 @@
 # SPDX-License-Identifier: (MIT)
 
 import collections
+import importlib.util
+import inspect
+import os
+import shutil
+import sys
 
+import flux_metrics_api.utils as utils
 from flux_metrics_api.logger import logger
 
 try:
@@ -57,29 +63,125 @@ def node_free_count():
     return len(listing.free.nodelist)
 
 
-def update_queue_metrics():
+def get_queue_metrics():
     """
-    Update metrics for counts of jobs in the queue
+    Get counts of jobs in the queue, keyed by state name.
+
+    See https://github.com/flux-framework/flux-core/blob/master/src/common/libjob/job.h#L45-L53
+    for identifiers.
     """
     jobs = flux.job.job_list(handle)
     listing = jobs.get()
 
     # Organize based on states
     states = [x["state"] for x in listing["jobs"]]
-    print(states)
     counter = collections.Counter(states)
 
+    # Lookup of state name to integer
+    lookup = {
+        "new": 1,
+        "depend": 2,
+        "priority": 4,
+        "sched": 8,
+        "run": 16,
+        "cleanup": 32,
+        "inactive": 64,
+    }
+
     # This is how to get states
-    # TODO make an endpoint for each, if this works at all :/
-    for stateint, _ in counter.items():
-        flux.job.info.statetostr(stateint)
+    counts = {}
+    for stateint, count in counter.items():
+        state = flux.job.info.statetostr(stateint)
+        counts[state] = count
+
+    # Zero-fill any state that has no jobs
+    for state in lookup:
+        if state not in counts:
+            counts[state] = 0
+    return counts
+
+
+# Queue states
+
+
+def job_queue_state_new_count():
+    return get_queue_metrics()["new"]
+
+
+def job_queue_state_depend_count():
+    return get_queue_metrics()["depend"]
+
+
+def job_queue_state_priority_count():
+    return get_queue_metrics()["priority"]
+
+
+def job_queue_state_sched_count():
+    return get_queue_metrics()["sched"]
+
+
+def job_queue_state_run_count():
+    return get_queue_metrics()["run"]
+
+
+def job_queue_state_cleanup_count():
+    return get_queue_metrics()["cleanup"]
+
+
+def job_queue_state_inactive_count():
+    return get_queue_metrics()["inactive"]
+
+
+def add_custom_metrics(metric_file):
+    """
+    Add custom metrics to the server from a user-provided Python file.
+    """
+    tmpdir = utils.get_tmpdir()
+
+    # Copy the metrics file there and import it as a module
+    custom_metrics_file = os.path.join(tmpdir, "custom_metrics.py")
+    shutil.copyfile(metric_file, custom_metrics_file)
+    spec = importlib.util.spec_from_file_location("custom_metrics", custom_metrics_file)
+    cm = importlib.util.module_from_spec(spec)
+    sys.modules["custom_metrics"] = cm
+    spec.loader.exec_module(cm)
+
+    # Discover the names, and add the functions!
+    for contender in dir(cm):
+        if contender.startswith("_"):
+            continue
+        func = getattr(cm, contender)
+
+        # We only care about functions
+        if inspect.isfunction(func):
+            sig = inspect.signature(func)
+
+            # Must have at least one argument (the handle)
+            # We could be more strict here, but this is probably OK
+            if len(sig.parameters) == 0:
+                sys.exit(f"{contender} is not a valid metric function - it takes no arguments")
+            print(f"Adding custom function {contender} to metrics.")
+            custom_metrics[contender] = func
+
+    # Cleanup
+    shutil.rmtree(tmpdir)
 
 
-# Organize metrics by name so we can eventually support export of custom set (if needed)
+# Organize metrics by name
 metrics = {
+    # Node resources
     "node_cores_free_count": node_core_free_count,
     "node_cores_up_count": node_core_up_count,
     "node_free_count": node_free_count,
     "node_up_count": node_up_count,
-    # TODO add shared function to get queue stats
+    # Queue states
+    "job_queue_state_new_count": job_queue_state_new_count,
+    "job_queue_state_depend_count": job_queue_state_depend_count,
+    "job_queue_state_priority_count": job_queue_state_priority_count,
+    "job_queue_state_sched_count": job_queue_state_sched_count,
+    "job_queue_state_run_count": job_queue_state_run_count,
+    "job_queue_state_cleanup_count": job_queue_state_cleanup_count,
+    "job_queue_state_inactive_count": job_queue_state_inactive_count,
 }
+
+# Custom metrics defined by the user (these are passed the Flux handle)
+custom_metrics = {}
diff --git a/flux_metrics_api/routes.py b/flux_metrics_api/routes.py
index 45ca6c8..5a70bf3 100644
--- a/flux_metrics_api/routes.py
+++ b/flux_metrics_api/routes.py
@@ -12,7 +12,7 @@ import flux_metrics_api.defaults as defaults
 import flux_metrics_api.types as types
 import flux_metrics_api.version as version
-from flux_metrics_api.metrics import metrics
+from flux_metrics_api.metrics import custom_metrics, handle, metrics
 
 schemas = APISpecSchemaGenerator(
     APISpec(
@@ -55,7 +55,7 @@ def get_metric(request):
     # TODO we don't do anything with namespace currently, we assume we won't
     # be able to hit this if running in the wrong one
     # Unknown metric
-    if metric_name not in metrics:
+    if metric_name not in metrics and metric_name not in custom_metrics:
         print(f"Unknown metric requested {metric_name}")
         return JSONResponse(
             {"detail": "This metric is not known to the server."}, status_code=404
@@ -65,7 +65,10 @@ def get_metric(request):
     metric = types.new_identifier(metric_name)
 
     # Get the value from Flux, assemble into listing
-    value = metrics[metric_name]()
+    if metric_name in custom_metrics:
+        value = custom_metrics[metric_name](handle)
+    else:
+        value = metrics[metric_name]()
     metric_value = types.new_metric(metric, value=value)
 
     # Give the endpoint for the service as metadata
diff --git a/flux_metrics_api/server.py b/flux_metrics_api/server.py
index 02a8581..2202001 100644
--- a/flux_metrics_api/server.py
+++ b/flux_metrics_api/server.py
@@ -14,6 +14,7 @@
 import flux_metrics_api
 import flux_metrics_api.defaults as defaults
+import flux_metrics_api.metrics as metrics
 from flux_metrics_api.logger import setup_logger
 from flux_metrics_api.routes import routes
 
@@ -73,6 +74,7 @@ def get_parser():
     start.add_argument(
         "--service-name", help="Service name the metrics service is running from"
     )
+    start.add_argument("--custom-metric", help="A Python file with custom metrics")
     start.add_argument(
         "--api-path",
         dest="api_path",
@@ -111,6 +113,9 @@ def start(args):
     if args.ssl_certfile and not args.ssl_keyfile:
         sys.exit("A --ssl-certfile was provided without a --ssl-keyfile.")
 
+    # The user wants to add a file with custom metrics
+    if args.custom_metric:
+        metrics.add_custom_metrics(args.custom_metric)
     app = Starlette(debug=args.debug, routes=routes)
     uvicorn.run(
         app,
diff --git a/flux_metrics_api/utils.py b/flux_metrics_api/utils.py
index 199596e..5a6f81d 100644
--- a/flux_metrics_api/utils.py
+++ b/flux_metrics_api/utils.py
@@ -3,6 +3,10 @@
 #
 # SPDX-License-Identifier: (MIT)
 
+import os
+import tempfile
+from contextlib import contextmanager
+
 
 def read_file(path):
     """
@@ -11,3 +15,34 @@ def read_file(path):
     with open(path, "r") as fd:
         content = fd.read()
     return content
+
+
+@contextmanager
+def workdir(dirname):
+    """
+    Provide context for a working directory, e.g.,
+
+    with workdir(name):
+        # do stuff
+    """
+    here = os.getcwd()
+    os.chdir(dirname)
+    try:
+        yield
+    finally:
+        os.chdir(here)
+
+
+def get_tmpdir(tmpdir=None, prefix="", create=True):
+    """
+    Get a temporary directory for an operation.
+    """
+    tmpdir = tmpdir or tempfile.gettempdir()
+    prefix = prefix or "flux-metrics-api-tmp"
+    prefix = "%s.%s" % (prefix, next(tempfile._get_candidate_names()))
+    tmpdir = os.path.join(tmpdir, prefix)
+
+    if not os.path.exists(tmpdir) and create is True:
+        os.mkdir(tmpdir)
+
+    return tmpdir
diff --git a/flux_metrics_api/version.py b/flux_metrics_api/version.py
index 9d0a650..7a49500 100644
--- a/flux_metrics_api/version.py
+++ b/flux_metrics_api/version.py
@@ -3,7 +3,7 @@
 #
 # SPDX-License-Identifier: (MIT)
 
-__version__ = "0.0.1"
+__version__ = "0.0.11"
 AUTHOR = "Vanessa Sochat"
 EMAIL = "vsoch@users.noreply.github.com"
 NAME = "flux-metrics-api"