
Start working on custom metrics for Kafka #404

Merged: 6 commits from kafka_metrics into main, Mar 12, 2024

Conversation

@whoahbot (Contributor) commented Feb 22, 2024:

Overview

This PR aims to give our users the ability to add custom instrumentation to dataflows, which will then be reported from our /metrics endpoint.

I've tried a few different approaches, and none of them felt like a clear winner, so I'm opening this PR as a strawman to solicit ideas on how to proceed.

I'll start with a list of general issues I encountered:

Use of the Python Prometheus library or OpenTelemetry client

I looked briefly into using the Python OpenTelemetry library to collect metrics on the Python side and register them as a Collector with the Rust-side registry, but I didn't end up prototyping this approach as it felt overly complicated.

Which Rust crates to use for metrics?

Our use of the opentelemetry crate's metrics feature was initially a little problematic, as that crate does not yet offer a synchronous Gauge type. The callback pattern of metrics collection used by the confluent_kafka Python library doesn't seem amenable to an async Gauge.
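For concreteness, a minimal sketch of that callback pattern (not code from this PR — the prometheus_client Gauge is just a stand-in for whatever synchronous gauge we end up exposing, and the lag field is illustrative). confluent_kafka invokes stats_cb synchronously with a JSON document from librdkafka, so the natural thing to do inside it is a synchronous set():

import json

from confluent_kafka import Consumer
from prometheus_client import Gauge

# Illustrative gauge; a synchronous set() is what the callback needs.
CONSUMER_LAG = Gauge(
    "kafka_consumer_lag",
    "Consumer lag reported by librdkafka statistics",
    ["topic", "partition"],
)

def stats_cb(stats_json: str) -> None:
    # librdkafka delivers a JSON document describing client state.
    stats = json.loads(stats_json)
    for topic, tstats in stats.get("topics", {}).items():
        for partition, pstats in tstats.get("partitions", {}).items():
            CONSUMER_LAG.labels(topic=topic, partition=partition).set(
                pstats.get("consumer_lag", 0)
            )

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "example",
        "statistics.interval.ms": 1000,
        "stats_cb": stats_cb,
    }
)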

The OpenTelemetry Rust crate also requires the use of both the prometheus crate and the opentelemetry_prometheus crate. The opentelemetry_prometheus crate depends on the prometheus crate to provide the registry and the encoder used to generate the text format for metrics scraped by Prometheus.

Since we are already depending on the prometheus crate, this strawman PR uses the Gauge, Counter, and Histogram implementations it provides.

I also enabled the prometheus crate's process feature, which adds some useful metrics around process memory usage and thread counts.

A smaller issue: where should custom metrics get the step_id to use as a label?

In the KafkaSource example in this PR, there wasn't an obvious way to reference the step_id to use as a label for metrics without changing the signature for FixedPartitionSource. Since this approach will probably change, I didn't make any changes to the API of input sources.

Comment on lines 171 to 176
pub(crate) fn register(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<Counter>()?;
m.add_class::<Gauge>()?;
m.add_class::<Histogram>()?;
Ok(())
}
Contributor:

Hmmm. This feels like committing to an uphill battle. Writing a Python metrics API could be an entire startup unto itself, and although, yes, we are piggy-backing on the great work of OpenTelemetry, just maintaining the binding API seems not fun. A theme of my comments recently has been "re-use APIs". I do not think this is a reasonable long-term solution.

["topic", "partition"],
)
# Labels to use when recording metrics
# TODO: It would be nice to have the step_id available here
Contributor:

I think I would update build_part to include a step_id: str argument which the input operator injects. If the partition itself wants to store it as an instance variable and use it, great; if not, it can ignore it. This would parallel the way we inject it into UnaryLogic by closing over it in the builder.
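For illustration, a minimal sketch of that suggestion, with hypothetical class names (not the real connector base classes): the input operator passes step_id into build_part, and the partition either stores it or ignores it.

class ExamplePartition:
    def __init__(self, step_id: str, for_part: str):
        # Keep the injected step_id around, e.g. for use as a metric label.
        self._metric_labels = {"step_id": step_id, "partition": for_part}

    def next_batch(self):
        return []

class ExampleSource:
    def list_parts(self):
        return ["0"]

    def build_part(self, step_id: str, for_part: str, resume_state):
        # step_id is injected by the input operator; a partition that
        # doesn't need it can simply ignore the argument.
        return ExamplePartition(step_id, for_part)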

@@ -46,3 +167,10 @@ pub(crate) fn initialize_metrics() -> PyResult<()> {
global::set_meter_provider(provider);
Ok(())
}

pub(crate) fn register(_py: Python, m: &PyModule) -> PyResult<()> {
Contributor:

Although it might be more intricate, would writing the "collector bridge" require us to shoulder the same problem of maintaining a binding API surface? Or would it be generic enough that we could write it once?

My guess is it'd be a little better, but might not be the lowest cost.

@@ -46,3 +167,10 @@ pub(crate) fn initialize_metrics() -> PyResult<()> {
global::set_meter_provider(provider);
Ok(())
}

pub(crate) fn register(_py: Python, m: &PyModule) -> PyResult<()> {
Contributor:

Another option? Could the Python side metrics be entirely separate for now while it's difficult to bridge? Is there a big penalty to setting up two metrics endpoints that external collectors have to hit on different ports?
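For reference, the second endpoint could be as small as this (a sketch assuming the prometheus-client library; the metric name and port are arbitrary):

from prometheus_client import Counter, start_http_server

ITEMS_EMITTED = Counter("items_emitted", "Items emitted by the source")

# Serves the default Python registry on http://localhost:9100/metrics,
# independent of the Rust-side endpoint; collectors scrape both ports.
start_http_server(9100)

ITEMS_EMITTED.inc()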

@Psykopear (Contributor):

Sorry, I somehow missed this PR.

I agree with David that the way it's set up right now means we'll need to update and maintain the bindings just to offer a small subset of what the underlying library offers, depending on what we think might be useful to us or to users.

But, being able to expose custom metrics in our own connectors is surely useful, and to do that we need to access the metrics backend from python.

Doing a second, separate setup of the metrics system in Python did introduce some problems when parallelizing, as most of the libraries I've worked with tend to rely on a global object, but maybe that's less of a problem now that the multiprocess shenanigans are relegated to the testing runner, so it could be something to explore again.

Or maybe we could try to expose a less specific API, something like a def stats(self, **possibly_some_params) -> dict[str, list[float]] method that can be defined in sources and sinks and is called after every next_batch? It's less flexible, but this way we only have to deal with the actual data for the metrics, rather than the entire metrics system.
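A rough sketch of what that could look like (method and key names are hypothetical): the partition only hands back raw values, and the runtime owns the metrics machinery.

from typing import Dict, List

class ExamplePartition:
    def __init__(self):
        self._batch_sizes: List[float] = []

    def next_batch(self):
        batch = ["item"]
        self._batch_sizes.append(float(len(batch)))
        return batch

    def stats(self) -> Dict[str, List[float]]:
        # Called by the runtime after every next_batch; the runtime decides
        # how to turn these values into gauges, counters, or histograms.
        values, self._batch_sizes = self._batch_sizes, []
        return {"batch_size": values}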

@whoahbot force-pushed the kafka_metrics branch 2 times, most recently from 5f95edd to 9e9b8d0 on February 29, 2024 00:46
@whoahbot marked this pull request as ready for review on February 29, 2024 16:43
@whoahbot (Author):

@davidselassie @Psykopear I reworked this PR to use the Python side exporter to generate metrics from the prometheus-client library, and combine that with the output from the Rust side exporter.

@Psykopear (Contributor):

> @davidselassie @Psykopear I reworked this PR to use the Python side exporter to generate metrics from the prometheus-client library, and combine that with the output from the Rust side exporter.

I like it: it keeps the nice user-facing API without us having to map it manually.
There's one source of problems though: we need to make sure to use unique names for time series in each worker. If you try to run the custom_metrics example as it is with multiple workers, it crashes due to a duplicate time series (each worker uses the same name). It works if you add the worker_index to each Gauge's name:

class PeriodicPartition(StatelessSourcePartition):
    def __init__(self, step_id: str, worker_index: int, frequency: timedelta):
        self.frequency = frequency
        self._next_awake = datetime.now(timezone.utc)
        self._gauge = Gauge(
            f"next_batch_delay_{worker_index}",
            "Calculated delay of when next batch was called in seconds",
            ["step_id", "partition"],
            unit="seconds",
        )
        self._counter = 0
        self._metric_labels = {"step_id": step_id, "partition": "0"}

But it could be enough to warn users of this caveat.

@whoahbot (Author) commented Mar 2, 2024:

> @davidselassie @Psykopear I reworked this PR to use the Python side exporter to generate metrics from the prometheus-client library, and combine that with the output from the Rust side exporter.

> I like it, it keeps the nice user-facing API without us having to map it manually. There's a source of problems though: we need to make sure to use unique names for time series in each worker. If you try to run the custom_metrics example as it is with multiple workers, it crashes due to a duplicate time series (each worker uses the same name). It works if you add the worker_index to each Gauge's name:

Great catch!

I need to think carefully about the right way to do this. I think that the worker_id in some cases is more properly a label, so that you can aggregate the data.
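For example, a sketch of the label-based variant (assuming the prometheus-client Gauge from the example above; names are illustrative): one module-level metric, with worker_index as a label rather than part of the name, so values can still be aggregated across workers.

from prometheus_client import Gauge

# Declared once at module level so multiple partitions/workers in the same
# process don't try to register the same time series twice.
NEXT_BATCH_DELAY = Gauge(
    "next_batch_delay_seconds",
    "Calculated delay of when next batch was called in seconds",
    ["step_id", "partition", "worker_index"],
)

class PeriodicPartition:
    def __init__(self, step_id: str, worker_index: int):
        self._labels = {
            "step_id": step_id,
            "partition": "0",
            "worker_index": str(worker_index),
        }

    def record_delay(self, delay_s: float) -> None:
        NEXT_BATCH_DELAY.labels(**self._labels).set(delay_s)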

@davidselassie (Contributor) left a comment:

I think this is good enough for now, but it'd be good to re-visit if there are issues in the future.

CHANGELOG.md (review comment resolved)
// Remove trailing newline
rust_metrics.pop();

format!("{rust_metrics}{py_metrics}")
Contributor:

The most brilliant hack of all: string concatenation.
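For context on what gets concatenated, the Python half of that string can be produced along these lines (a sketch assuming the prometheus-client library mentioned above):

from prometheus_client import REGISTRY, generate_latest

# Render the Python-side registry in the Prometheus text exposition format;
# the Rust side appends this to its own encoded output so a single /metrics
# response carries both sets of time series.
py_metrics = generate_latest(REGISTRY).decode("utf-8")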

@whoahbot force-pushed the kafka_metrics branch 4 times, most recently from 14aacd8 to 4c3516c on March 11, 2024 21:07
Commit messages:

- Removes Rust side wrappers.
- Adds a Python dependency on prometheus-client.
- Cleans up some old examples.
- The Prometheus client library's REGISTRY is a global. When using a Gauge for Kafka, registering the same Gauge twice will cause an error. This change makes the declaration of Gauges used between multiple partitions global as well. Labels should be used to collect metrics for different partitions or workers.

codspeed-hq bot commented Mar 11, 2024

CodSpeed Performance Report

Merging #404 will create unknown performance changes

Comparing kafka_metrics (b064437) with main (0f9cf2d)

Summary

⚠️ No benchmarks were detected in either the base of the PR or the PR itself.

@Psykopear (Contributor) left a comment:

Looks good to me 👍 I added a couple of comments, but nothing major.

@@ -92,7 +92,9 @@ def list_parts(self) -> List[str]:
         return [self._metric_name]
 
     @override
-    def build_part(self, for_part: str, resume_state: Optional[_RandomMetricState]):
+    def build_part(
+        self, _step_id: str, for_part: str, resume_state: Optional[_RandomMetricState]
Contributor:

When overriding methods we should probably keep the original name of the kwarg, so step_id instead of _step_id, even if it's not used, so that if we pass explicitly named arguments to the subclass it still works. Same for all the other overrides in this PR
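A hypothetical illustration of the concern (not code from this PR): if a caller ever passes the arguments by keyword, an override that renamed step_id to _step_id breaks.

class Base:
    def build_part(self, step_id, for_part, resume_state):
        raise NotImplementedError()

class Renamed(Base):
    # The override renamed the first parameter to _step_id.
    def build_part(self, _step_id, for_part, resume_state):
        return None

try:
    Renamed().build_part(step_id="in", for_part="0", resume_state=None)
except TypeError as err:
    print(err)  # unexpected keyword argument 'step_id'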

    ):
        self._offset = starting_offset if resume_state is None else resume_state
        print(f"starting offset: {starting_offset}")
Contributor:

Maybe log rather than print here?

@whoahbot (Author):

Forgot to remove this. Thanks for pointing it out!

- Remove print of offsets
- Change unused `_step_id` back to `step_id` to fix overrides.
@whoahbot merged commit f3341f5 into main on Mar 12, 2024
29 checks passed
@whoahbot deleted the kafka_metrics branch on March 12, 2024 16:31