New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Decouple workload, configuration management from charm.py #9
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
"""Grafana agent config builder.""" | ||
|
||
from typing import Any, Dict, List, Optional | ||
|
||
from charms.observability_libs.v0.juju_topology import JujuTopology | ||
|
||
|
||
class Config:
    """A 'config builder' for grafana agent.

    Holds the scrape jobs, remote-write endpoints and Loki endpoints handed in
    by the caller and renders them, together with the Juju topology labels,
    into the nested dict returned by :meth:`build`.
    """

    def __init__(
        self,
        *,
        topology: "JujuTopology",
        scrape_configs: Optional[list] = None,
        remote_write: Optional[List[Dict[str, Any]]] = None,
        loki_endpoints: Optional[List[dict]] = None,
        positions_dir: str = "/run",
        insecure_skip_verify: bool = False,
        http_listen_port: int = 3500,
        grpc_listen_port: int = 3600,
    ):
        """Store the inputs needed to render the agent config.

        Args:
            topology: source of the model, model_uuid, application, unit and
                charm_name values used for labels and names.
            scrape_configs: scrape jobs placed under the metrics section.
            remote_write: endpoint dicts for metrics remote-write.
            loki_endpoints: endpoint dicts used as log clients.
            positions_dir: directory under which "grafana-agent-positions" is
                placed; trailing slashes are ignored.
            insecure_skip_verify: value written into every endpoint's
                ``tls_config``.
            http_listen_port: HTTP port of the loki push-api server.
            grpc_listen_port: gRPC port of the loki push-api server.
        """
        self._topology = topology
        self.scrape_configs = list(scrape_configs or [])

        def _with_tls(endpoints: Optional[List[dict]]) -> List[dict]:
            # Build NEW endpoint dicts: a shallow copy of the list would still
            # share the dicts with the caller, and writing "tls_config" into
            # them would silently modify the caller's data.
            return [
                {**endpoint, "tls_config": {"insecure_skip_verify": insecure_skip_verify}}
                for endpoint in endpoints or []
            ]

        self.remote_write = _with_tls(remote_write)
        self.loki_endpoints = _with_tls(loki_endpoints)

        self.positions_dir = f"{positions_dir.rstrip('/')}/grafana-agent-positions"
        self.http_port = http_listen_port
        self.grpc_port = grpc_listen_port

    def build(self) -> dict:
        """Build the full config dict.

        Returns:
            A dict with "server", "integrations", "metrics" and "logs"
            sections. "logs" is an empty dict when no loki endpoints are set.
        """
        metrics_section = {
            "wal_directory": "/tmp/agent/data",  # should match metadata
            "configs": [
                {
                    "name": "agent_scraper",
                    "scrape_configs": self.scrape_configs,
                    "remote_write": self.remote_write,
                }
            ],
        }

        # Seems like we cannot have an empty "configs" section, so the whole
        # logs section is left empty when there are no endpoints.
        logs_section: dict = {}
        if self.loki_endpoints:
            logs_section = {
                "positions_directory": self.positions_dir,
                "configs": [
                    {
                        "name": "push_api_server",
                        "clients": self.loki_endpoints,
                        "scrape_configs": [
                            {
                                "job_name": "loki",
                                "loki_push_api": {
                                    "server": {
                                        "http_listen_port": self.http_port,
                                        "grpc_listen_port": self.grpc_port,
                                    },
                                },
                            }
                        ],
                    }
                ],  # TODO: capture `_additional_log_configs` logic for the machine charm
            }

        # TODO add a roundtrip check that the config is valid (dump then load), and set status
        #  accordingly

        return {
            "server": {"log_level": "info"},
            "integrations": self._integrations_config,
            "metrics": metrics_section,
            "logs": logs_section,
        }

    @property
    def _instance_name(self) -> str:
        """Return model, model_uuid, application and unit joined with underscores."""
        parts = [
            self._topology.model,
            self._topology.model_uuid,
            self._topology.application,
            self._topology.unit,
        ]
        return "_".join(parts)  # TODO do we also need to `replace("/", "_")` ?

    @property
    def _integrations_config(self) -> dict:
        """Return the integrations section of the config.

        Returns:
            The dict representing the config
        """
        topology = self._topology

        # Align the "job" name with those of prometheus_scrape
        job_name = f"juju_{topology.model}_{topology.model_uuid}_{topology.application}_self-monitoring"

        # To add a label, we create a relabelling that replaces a built-in
        # (`__address__`). Insertion order fixes the order of the rules.
        topology_labels = {
            "juju_charm": topology.charm_name,
            "juju_model": topology.model,
            "juju_model_uuid": topology.model_uuid,
            "juju_application": topology.application,
            "juju_unit": topology.unit,
        }

        relabel_configs = [
            {"target_label": "job", "regex": "(.*)", "replacement": job_name},
            # Align the "instance" label with the rest of the Juju-collected metrics
            {"target_label": "instance", "regex": "(.*)", "replacement": self._instance_name},
        ]
        relabel_configs.extend(
            {"source_labels": ["__address__"], "target_label": label, "replacement": value}
            for label, value in topology_labels.items()
        )

        return {
            "agent": {
                "enabled": True,
                "relabel_configs": relabel_configs,
            },
            "prometheus_remote_write": self.remote_write,
            # TODO capture `_additional_integrations` logic for the machine charm
        }
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
"""Workload manager for grafana agent.""" | ||
|
||
import logging | ||
import pathlib | ||
import re | ||
from typing import Any, Callable, Optional, Union | ||
|
||
import yaml | ||
from ops.framework import Object | ||
from ops.model import ActiveStatus, StatusBase, UnknownStatus, WaitingStatus | ||
from ops.pebble import APIError, PathError | ||
from yaml.constructor import ConstructorError | ||
from yaml.parser import ParserError | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class Status: | ||
"""Helping with centralized status setting.""" | ||
|
||
def __init__(self, callback: Callable[[StatusBase], None] = lambda _: None):
    """Start with an unknown config status and remember the change callback.

    Args:
        callback: called with the combined status each time a component
            status is assigned; the default does nothing.
    """
    self._callback = callback
    self._config: StatusBase = UnknownStatus()
Comment on lines
+18
to
+23
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems to me like this status class should be defined globally in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It for sure would need to be combined, but the idea I'm proposing here is:
This seems more flexible than introducing additional assumptions on the structure/hierarchy of one global status object. Wdyt? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm thinking we should wait on compound status to land in ops, inventing our own here seems like reinventing the wheel |
||
|
||
def __call__(self) -> StatusBase:
    """Return the 'total' status: one status that represents all component statuses."""
    total = self._combined()
    return total
|
||
@property
def config(self):
    """Status for the grafana agent config file."""
    current = self._config
    return current


@config.setter
def config(self, value: StatusBase):
    # Store first, then trigger the side effect, so that the side effect
    # already observes the freshly assigned status.
    self._config = value
    self._side_effect()
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit]: |
||
|
||
def _side_effect(self):
    """Log the combined status and pass it to the registered callback."""
    combined = self._combined()
    logger.debug("Status updated to: %s", combined)
    self._callback(combined)
|
||
def _combined(self) -> StatusBase:
    """Reduce all component statuses to a single status."""
    # Currently there's only one status component, so the combined status is just it.
    only_component = self.config
    return only_component
|
||
|
||
class WorkloadManager(Object): | ||
"""Workload manager for grafana agent.""" | ||
|
||
CONFIG_PATH = "/etc/grafana-agent.yaml" | ||
|
||
def __init__(
    self,
    charm,
    *,
    container_name: str,
    config_getter: Callable[[], Any],
    status_changed_callback: Callable[[StatusBase], None],
):
    """Wire the manager to a charm, a container and a config source.

    Args:
        charm: the owning charm; its unit, framework and events are used.
        container_name: name of the workload container; also used as the
            service name.
        config_getter: zero-argument callable returning the config to write.
        status_changed_callback: passed to `Status` as its change callback.
    """
    # Must inherit from ops 'Object' to be able to register events.
    handle_key = f"{self.__class__.__name__}-{container_name}"
    super().__init__(charm, handle_key)

    self._unit = charm.unit
    self._container_name = container_name
    self._service_name = container_name
    self._container = self._unit.get_container(container_name)
    self._render_config = config_getter

    # Property to facilitate centralized status update
    self.status = Status(callback=status_changed_callback)  # pyright: ignore

    # Event attributes are Python identifiers, so dashes in the container
    # name become underscores.
    event_name = f"{container_name.replace('-', '_')}_pebble_ready"
    charm.framework.observe(getattr(charm.on, event_name), self._on_pebble_ready)
Comment on lines
+63
to
+80
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we call super, I think we should be able to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, but this seems more explicit in a good way? |
||
|
||
def _cli_args(self) -> str:
    """Return the command-line arguments to pass to the agent.

    Returns:
        The arguments as a single string pointing at ``CONFIG_PATH``.
    """
    return "-config.file={}".format(self.CONFIG_PATH)
|
||
def _on_pebble_ready(self, _):
    """Write the config, register the agent pebble layer and start it.

    Afterwards the workload version is set on the unit when one could be
    determined; otherwise a debug message is logged.
    """
    self._update_config()

    agent_service = {
        "override": "replace",
        "summary": "agent",
        "command": "/bin/agent " + self._cli_args(),
        "startup": "enabled",
    }
    layer = {
        "summary": "agent layer",
        "description": "pebble config layer for Grafana Agent",
        "services": {"agent": agent_service},
    }
    self._container.add_layer(self._service_name, layer, combine=True)
    self._container.autostart()

    # NOTE(review): raised as a blocking issue in code review - this manager
    # lives in a Mimir charm, so the displayed workload version should
    # arguably not be the grafana-agent version. Unresolved here.
    detected_version = self.version
    if not detected_version:
        logger.debug(
            "Cannot set workload version at this time: could not get grafana-agent version."
        )
        return
    self._unit.set_workload_version(detected_version)
|
||
def is_ready(self):
    """Return what the workload container's `can_connect()` reports.

    This is a regular method, not a property: callers must invoke it.
    """
    reachable = self._container.can_connect()
    return reachable
|
||
@property
def version(self) -> Optional[str]:
    """Return the version of the agent running in the workload container.

    Returns:
        The version number without the leading "v" (e.g. "0.26.1"), or None
        when the container is not ready or the output has no version.
    """
    # `is_ready` is a method and must be called: the bare attribute is a
    # bound method, which is always truthy, so `not self.is_ready` never
    # triggered and the exec below ran against an unreachable container.
    if not self.is_ready():
        return None

    # Output looks like this:
    # agent, version v0.26.1 (branch: HEAD, revision: 2b88be37)
    version_output, _ = self._container.exec(["/bin/agent", "-version"]).wait_output()
    result = re.search(r"v(\d*\.\d*\.\d*)", version_output)
    return result.group(1) if result else None
|
||
def read_file(self, filepath: Union[str, pathlib.Path]):
    """Pull a file from the workload container and return its contents.

    Args:
        filepath: path of the file inside the container.

    Returns:
        Whatever `.read()` on the pulled file object yields (presumably a
        string with the file's contents - confirm against the pull encoding).
    """
    pulled = self._container.pull(filepath)
    return pulled.read()
|
||
def write_file(self, path: Union[str, pathlib.Path], text: str) -> None:
    """Push text to a file in the workload container.

    Parent directories are requested via ``make_dirs=True``.

    Args:
        path: file path to write to
        text: text to write to the file
    """
    push = self._container.push
    push(path, text, make_dirs=True)
|
||
def restart(self) -> None:
    """Restart grafana agent.

    NOTE(review): this restarts `self._service_name` (set from the container
    name), while the pebble layer in `_on_pebble_ready` declares a service
    called "agent" - confirm the two always match.
    """
    service = self._service_name
    self._container.restart(service)
|
||
def _update_config(self) -> None:
    """Render the agent config and write it to the workload if it changed.

    Sets `self.status.config` to:
      - WaitingStatus when the container is not ready or the write fails;
      - ActiveStatus when the config is unchanged or was written.
    """
    # `is_ready` is a method and must be called: the bare attribute is a
    # bound method, which is always truthy, so this guard never fired.
    if not self.is_ready():
        # Workload is not yet available so no need to update config
        self.status.config = WaitingStatus("Workload is not yet available")
        return

    config = self._render_config()  # TODO: Must not be None
    assert config is not None

    try:
        old_config = self.read_file(self.CONFIG_PATH)
        logger.info("Old config: %s", old_config)
        old_config = yaml.safe_load(old_config)
    except (FileNotFoundError, PathError, yaml.YAMLError):
        # The file does not yet exist, or its contents are not loadable YAML.
        # `yaml.YAMLError` is the common base of ParserError and
        # ConstructorError and also covers e.g. ScannerError, so a corrupt
        # file is overwritten below instead of crashing the hook.
        old_config = None

    if config == old_config:
        # Nothing changed, possibly new installation. Move on.
        self.status.config = ActiveStatus()
        return

    try:
        self.write_file(self.CONFIG_PATH, yaml.dump(config))
    except APIError as e:
        logger.warning(str(e))
        # NOTE(review): code review asked how the charm learns about this
        # failure and retries, and concluded WaitingStatus is not a good fit
        # (BlockedStatus, or surfacing the exception, was suggested).
        # Behaviour is kept as-is here.
        self.status.config = WaitingStatus(str(e))
        return

    # NOTE(review): open question from code review - should grafana-agent be
    # restarted or reloaded after a new config file has been written?
    self.status.config = ActiveStatus()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not look like it necessarily belongs in the init function.