Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -14,5 +14,6 @@ and **Merged pull requests**. Critical items to know are:
The versions coincide with releases on pip. Only major versions will be released as tags on Github.

## [0.0.x](https://github.com/converged-computing/flux-metrics-api/tree/main) (0.0.x)
- On the fly metric (from a custom file) support, and job queue counts (0.0.11)
- Support for certificates for uvicorn and change default port to 8443 (0.0.1)
- Skeleton release (0.0.0)
56 changes: 56 additions & 0 deletions README.md
@@ -56,6 +56,8 @@ $ flux-metrics-api start
$ flux-metrics-api start --port 9000 --host 127.0.0.1
```

#### SSL

If you want SSL (e.g., serving on port 443) you can provide paths to a certificate and keyfile:

```bash
@@ -68,6 +70,52 @@ An example of a full command we might run from within a pod:
$ flux-metrics-api start --port 8443 --ssl-certfile /etc/certs/tls.crt --ssl-keyfile /etc/certs/tls.key --namespace flux-operator --service-name custom-metrics-apiserver
```

#### On the fly custom metrics!

If you want to provide custom metrics, you can write functions in an external file that we will read and add to the server.
As a general rule:

- The name of the function will be the name of the custom metric
- You can expect the only argument to be the flux handle
- You'll need to do any imports within your function so they are in scope
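The rules above can be sketched as a minimal metric file. This is a hedged sketch: the function name `free_node_count` is illustrative, and the `flux.resource` calls mirror the example file shipped with this change.

```python
# A hypothetical custom-metrics.py: each public function becomes a metric,
# named after the function, and receives the active Flux handle.


def free_node_count(handle):
    """Report the number of free nodes in the MiniCluster."""
    # Imports must live inside the function body so they are in scope
    # when the server invokes the metric.
    import flux.resource

    rpc = flux.resource.list.resource_list(handle)
    return len(rpc.get().free.nodelist)
```

The server would then expose a metric named `free_node_count`, calling the function with its Flux handle on each request.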

This can likely be improved upon, but it's a start! We provide an [example file](example/custom-metrics.py). To use it:

```bash
$ flux-metrics-api start --custom-metric ./example/custom-metrics.py
```

And then test it:

```bash
$ curl -s http://localhost:8443/apis/custom.metrics.k8s.io/v1beta2/namespaces/flux-operator/metrics/my_custom_metric_name | jq
```
```console
{
  "items": [
    {
      "metric": {
        "name": "my_custom_metric_name"
      },
      "value": 4,
      "timestamp": "2023-06-01T01:39:08+00:00",
      "windowSeconds": 0,
      "describedObject": {
        "kind": "Service",
        "namespace": "flux-operator",
        "name": "custom-metrics-apiserver",
        "apiVersion": "v1beta2"
      }
    }
  ],
  "apiVersion": "custom.metrics.k8s.io/v1beta2",
  "kind": "MetricValueList",
  "metadata": {
    "selfLink": "/apis/custom.metrics.k8s.io/v1beta2"
  }
}
```

See `--help` for other available options.

### Endpoints
@@ -113,6 +161,14 @@ The following metrics are supported:
- **node_free_count**: number of nodes free in the MiniCluster
- **node_cores_free_count**: number of node cores free in the MiniCluster
- **node_cores_up_count**: number of node cores up in the MiniCluster
- **job_queue_state_new_count**: number of new jobs in the queue
- **job_queue_state_depend_count**: number of jobs in the queue in state "depend"
- **job_queue_state_priority_count**: number of jobs in the queue in state "priority"
- **job_queue_state_sched_count**: number of jobs in the queue in state "sched"
- **job_queue_state_run_count**: number of jobs in the queue in state "run"
- **job_queue_state_cleanup_count**: number of jobs in the queue in state "cleanup"
- **job_queue_state_inactive_count**: number of jobs in the queue in state "inactive"


### Docker

25 changes: 25 additions & 0 deletions example/custom-metrics.py
@@ -0,0 +1,25 @@
# Copyright 2023 Lawrence Livermore National Security, LLC and other
# HPCIC DevTools Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (MIT)

# This is an example of a custom metrics file you can provide on the command line, e.g.,
# flux-metrics-api start --custom-metric ./custom-metrics.py

# The default format for a custom metric is the following:


def my_custom_metric_name(handle):
    """
    All custom metrics will be passed the active flux handle.

    - The name of the function is the name of the metric.
    - You'll need to import what you need.
    """
    # You'll need to import what you need again from Flux
    # or other places.
    import flux.resource

    rpc = flux.resource.list.resource_list(handle)
    listing = rpc.get()
    return listing.free.ncores
116 changes: 109 additions & 7 deletions flux_metrics_api/metrics.py
@@ -4,7 +4,13 @@
# SPDX-License-Identifier: (MIT)

import collections
import importlib.util
import inspect
import os
import shutil
import sys

import flux_metrics_api.utils as utils
from flux_metrics_api.logger import logger

try:
@@ -57,29 +63,125 @@ def node_free_count():
    return len(listing.free.nodelist)


def get_queue_metrics():
    """
    Update metrics for counts of jobs in the queue

    See https://github.com/flux-framework/flux-core/blob/master/src/common/libjob/job.h#L45-L53
    for identifiers.
    """
    jobs = flux.job.job_list(handle)
    listing = jobs.get()

    # Organize based on states
    states = [x["state"] for x in listing["jobs"]]
    counter = collections.Counter(states)

    # Lookup of state name to integer
    lookup = {
        "new": 1,
        "depend": 2,
        "priority": 4,
        "sched": 8,
        "run": 16,
        "cleanup": 32,
        "inactive": 64,
    }

    counts = {}
    for stateint, count in counter.items():
        state = flux.job.info.statetostr(stateint)
        counts[state] = count
    for state in lookup:
        if state not in counts:
            counts[state] = 0
    return counts


# Queue states


def job_queue_state_new_count():
    return get_queue_metrics()["new"]


def job_queue_state_depend_count():
    return get_queue_metrics()["depend"]


def job_queue_state_priority_count():
    return get_queue_metrics()["priority"]


def job_queue_state_sched_count():
    return get_queue_metrics()["sched"]


def job_queue_state_run_count():
    return get_queue_metrics()["run"]


def job_queue_state_cleanup_count():
    return get_queue_metrics()["cleanup"]


def job_queue_state_inactive_count():
    return get_queue_metrics()["inactive"]


def add_custom_metrics(metric_file):
    """
    Add custom metrics to the server
    """
    global metrics
    tmpdir = utils.get_tmpdir()

    # Copy our metrics file there and do a relative import
    custom_metrics_file = os.path.join(tmpdir, "custom_metrics.py")
    shutil.copyfile(metric_file, custom_metrics_file)
    spec = importlib.util.spec_from_file_location("custom_metrics", custom_metrics_file)
    cm = importlib.util.module_from_spec(spec)
    sys.modules["cm"] = cm
    spec.loader.exec_module(cm)

    # Discover the names, and add the functions!
    for contender in dir(cm):
        if contender.startswith("_"):
            continue
        func = getattr(cm, contender)

        # We only care about functions
        if func.__class__.__name__ == "function":
            args = inspect.signature(func)

            # Must have at least one argument (the handle)
            # We could be more strict here, but this is probably OK
            if len(args.parameters) == 0:
                sys.exit(f"{contender} is not a valid function - has no arguments")
            print(f"Adding custom function {contender} to metrics.")
            custom_metrics[contender] = func

    # Cleanup
    shutil.rmtree(tmpdir)


# Organize metrics by name
metrics = {
    # Node resources
    "node_cores_free_count": node_core_free_count,
    "node_cores_up_count": node_core_up_count,
    "node_free_count": node_free_count,
    "node_up_count": node_up_count,
    # Queue states
    "job_queue_state_new_count": job_queue_state_new_count,
    "job_queue_state_depend_count": job_queue_state_depend_count,
    "job_queue_state_priority_count": job_queue_state_priority_count,
    "job_queue_state_sched_count": job_queue_state_sched_count,
    "job_queue_state_run_count": job_queue_state_run_count,
    "job_queue_state_cleanup_count": job_queue_state_cleanup_count,
    "job_queue_state_inactive_count": job_queue_state_inactive_count,
}

# Custom metrics defined by the user (have the handle provided)
custom_metrics = {}
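The dynamic loading in `add_custom_metrics` is standard `importlib` machinery. A self-contained sketch of the same pattern (writing a stand-in metrics file, loading it by path, and collecting its public functions) might look like the following — the file contents and metric name here are made up for illustration:

```python
import importlib.util
import inspect
import os
import tempfile

# Write a tiny stand-in "metrics" file to load, in place of a user's file.
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "custom_metrics.py")
with open(path, "w") as fd:
    fd.write("def my_metric(handle):\n    return 42\n")

# Load the module directly from its file path.
spec = importlib.util.spec_from_file_location("custom_metrics", path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)

# Collect public functions that accept at least one argument (the handle).
custom_metrics = {}
for name, func in vars(module).items():
    if name.startswith("_") or not inspect.isfunction(func):
        continue
    if len(inspect.signature(func).parameters) == 0:
        raise SystemExit(f"{name} is not a valid metric: it takes no arguments")
    custom_metrics[name] = func

print(custom_metrics["my_metric"](None))  # -> 42
```

Registering modules by path rather than import name is what lets the server accept an arbitrary `--custom-metric` file outside its package.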
9 changes: 6 additions & 3 deletions flux_metrics_api/routes.py
@@ -12,7 +12,7 @@
import flux_metrics_api.defaults as defaults
import flux_metrics_api.types as types
import flux_metrics_api.version as version
from flux_metrics_api.metrics import custom_metrics, handle, metrics

schemas = APISpecSchemaGenerator(
APISpec(
@@ -55,7 +55,7 @@ def get_metric(request):
    # TODO we don't do anything with namespace currently, we assume we won't
    # be able to hit this if running in the wrong one
    # Unknown metric
    if metric_name not in metrics and metric_name not in custom_metrics:
        print(f"Unknown metric requested {metric_name}")
        return JSONResponse(
            {"detail": "This metric is not known to the server."}, status_code=404
@@ -65,7 +65,10 @@
    metric = types.new_identifier(metric_name)

    # Get the value from Flux, assemble into listing
    if metric_name in custom_metrics:
        value = custom_metrics[metric_name](handle)
    else:
        value = metrics[metric_name]()
    metric_value = types.new_metric(metric, value=value)

    # Give the endpoint for the service as metadata
5 changes: 5 additions & 0 deletions flux_metrics_api/server.py
@@ -14,6 +14,7 @@

import flux_metrics_api
import flux_metrics_api.defaults as defaults
import flux_metrics_api.metrics as metrics
from flux_metrics_api.logger import setup_logger
from flux_metrics_api.routes import routes

@@ -73,6 +74,7 @@ def get_parser():
    start.add_argument(
        "--service-name", help="Service name the metrics service is running from"
    )
    start.add_argument("--custom-metric", help="A Python file with custom metrics")
    start.add_argument(
        "--api-path",
        dest="api_path",
@@ -111,6 +113,9 @@ def start(args):
    if args.ssl_certfile and not args.ssl_keyfile:
        sys.exit("A --ssl-certfile was provided without a --ssl-keyfile.")

    # The user wants to add a file with custom metrics
    if args.custom_metric:
        metrics.add_custom_metrics(args.custom_metric)
    app = Starlette(debug=args.debug, routes=routes)
    uvicorn.run(
        app,
35 changes: 35 additions & 0 deletions flux_metrics_api/utils.py
@@ -3,6 +3,10 @@
#
# SPDX-License-Identifier: (MIT)

import os
import tempfile
from contextlib import contextmanager


def read_file(path):
"""
@@ -11,3 +15,34 @@ def read_file(path):
    with open(path, "r") as fd:
        content = fd.read()
    return content


@contextmanager
def workdir(dirname):
    """
    Provide context for a working directory, e.g.,

    with workdir(name):
        # do stuff
    """
    here = os.getcwd()
    os.chdir(dirname)
    try:
        yield
    finally:
        os.chdir(here)


def get_tmpdir(tmpdir=None, prefix="", create=True):
    """
    Get a temporary directory for an operation.
    """
    tmpdir = tmpdir or tempfile.gettempdir()
    prefix = prefix or "flux-metrics-api-tmp"
    prefix = "%s.%s" % (prefix, next(tempfile._get_candidate_names()))
    tmpdir = os.path.join(tmpdir, prefix)

    if not os.path.exists(tmpdir) and create is True:
        os.mkdir(tmpdir)

    return tmpdir
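`get_tmpdir` above relies on `tempfile._get_candidate_names()`, a private CPython helper that could change between releases. A variant using the public `tempfile.mkdtemp` API (a suggested alternative, not what this change implements) avoids that and picks the name and creates the directory in one atomic step:

```python
import tempfile


def get_tmpdir(prefix="flux-metrics-api-tmp"):
    """Create and return a unique temporary directory.

    mkdtemp generates the unique suffix and creates the directory
    atomically, so there is no window where the chosen name exists
    but the directory does not.
    """
    return tempfile.mkdtemp(prefix=prefix + ".")


path = get_tmpdir()
```

The caller is still responsible for removing the directory (e.g., `shutil.rmtree(path)`), just as `add_custom_metrics` does with its tmpdir.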
2 changes: 1 addition & 1 deletion flux_metrics_api/version.py
@@ -3,7 +3,7 @@
#
# SPDX-License-Identifier: (MIT)

__version__ = "0.0.11"
AUTHOR = "Vanessa Sochat"
EMAIL = "vsoch@users.noreply.github.com"
NAME = "flux-metrics-api"