Feature: Prometheus Middleware to collect metrics #1665

Closed
roma-frolov opened this issue Aug 11, 2024 · 7 comments · Fixed by #1791
Labels: Core (issues related to core FastStream functionality that affect all brokers), enhancement (new feature or request)

Comments

@roma-frolov
Contributor

roma-frolov commented Aug 11, 2024

To improve Observability in FastStream, Prometheus Middleware support is needed.

Suggested metrics:

  • number of messages received;
  • ack quantity;
  • nack quantity;
  • reject quantity;
  • number of unprocessed messages (unhandled errors);
  • number of messages sent.

I'm guessing the usage would look something like this:

from prometheus_client import make_asgi_app, CollectorRegistry
from faststream.asgi import AsgiFastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.prometheus import RabbitPrometheusMiddleware


registry = CollectorRegistry()


broker = RabbitBroker(middlewares=[RabbitPrometheusMiddleware(registry=registry)])


app = AsgiFastStream(
    broker,
    asyncapi_path="/docs",
    asgi_routes=[
        ("/metrics", make_asgi_app(registry=registry))
    ]
)

Solution pseudocode:

from types import TracebackType
from typing import TYPE_CHECKING, Any, Optional, Type

from faststream import BaseMiddleware
from prometheus_client import CollectorRegistry, Counter

if TYPE_CHECKING:
    # Import path as of FastStream 0.5.x; used for type hints only.
    from faststream.broker.message import StreamMessage


class BasePrometheusMiddleware:
    # Owns the counters and builds one PrometheusMiddleware per message.
    def __init__(self, registry: CollectorRegistry) -> None:
        self._registry = registry
        # registry= registers each counter in the passed registry instead of
        # the global default one, so no separate register() call is needed.
        self._received_messages_counter = Counter(
            name="received_messages",
            documentation="Received messages",
            labelnames=["broker"],
            registry=registry,
        )
        self._processed_messages_with_error_counter = Counter(
            name="processed_messages_with_error",
            documentation="Processed messages with error",
            labelnames=["broker"],
            registry=registry,
        )
        self._processed_messages_without_error_counter = Counter(
            name="processed_messages_without_error",
            documentation="Processed messages without error",
            labelnames=["broker"],
            registry=registry,
        )

    @staticmethod
    def get_broker_name() -> str:
        # Broker-specific subclasses return the value of the "broker" label.
        raise NotImplementedError

    def __call__(self, msg: Optional[Any]) -> "BaseMiddleware":
        # FastStream calls the middleware factory once per incoming message.
        return PrometheusMiddleware(
            msg=msg,
            broker_name=self.get_broker_name(),
            received_messages_counter=self._received_messages_counter,
            processed_messages_with_error_counter=self._processed_messages_with_error_counter,
            processed_messages_without_error_counter=self._processed_messages_without_error_counter,
        )


# Defined after the base class so that the base name exists at class creation.
class RabbitPrometheusMiddleware(BasePrometheusMiddleware):
    @staticmethod
    def get_broker_name() -> str:
        return "rabbit"


class PrometheusMiddleware(BaseMiddleware):
    def __init__(
        self,
        msg: Optional[Any] = None,
        *,
        broker_name: str,
        received_messages_counter: Counter,
        processed_messages_with_error_counter: Counter,
        processed_messages_without_error_counter: Counter,
    ) -> None:
        self._broker_name = broker_name
        self._received_messages_counter = received_messages_counter
        self._processed_messages_with_error_counter = processed_messages_with_error_counter
        self._processed_messages_without_error_counter = processed_messages_without_error_counter
        super().__init__(msg)

    async def on_consume(
        self,
        msg: "StreamMessage[Any]",
    ) -> "StreamMessage[Any]":
        # Count every message as soon as it is consumed.
        self._received_messages_counter.labels(broker=self._broker_name).inc()
        return await super().on_consume(msg)

    async def after_processed(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exc_tb: Optional[TracebackType] = None,
    ) -> Optional[bool]:
        # exc_val is None when the handler finished without raising.
        if exc_val is None:
            self._processed_messages_without_error_counter.labels(broker=self._broker_name).inc()
        else:
            self._processed_messages_with_error_counter.labels(broker=self._broker_name).inc()

        return await super().after_processed(exc_type, exc_val, exc_tb)

This example produces metrics like this:

# HELP received_messages_total Received messages
# TYPE received_messages_total counter
received_messages_total{broker="rabbit"} 314560.0
# HELP received_messages_created Received messages
# TYPE received_messages_created gauge
received_messages_created{broker="rabbit"} 1.723374866369715e+09
# HELP processed_messages_with_error_total Processed messages with error
# TYPE processed_messages_with_error_total counter
processed_messages_with_error_total{broker="rabbit"} 155.0
# HELP processed_messages_with_error_created Processed messages with error
# TYPE processed_messages_with_error_created gauge
processed_messages_with_error_created{broker="rabbit"} 1.723374866552031e+09
# HELP processed_messages_without_error_total Processed messages without error
# TYPE processed_messages_without_error_total counter
processed_messages_without_error_total{broker="rabbit"} 314559.0
# HELP processed_messages_without_error_created Processed messages without error
# TYPE processed_messages_without_error_created gauge
processed_messages_without_error_created{broker="rabbit"} 1.723374866372083e+09
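For readers who want to reproduce this kind of output without a broker, here is a minimal sketch that uses only `prometheus_client`. It assumes the counter is bound to a custom registry and incremented by hand; the increment of 3 is an arbitrary illustration, not data from the run above.

```python
from prometheus_client import CollectorRegistry, Counter, generate_latest

# A private registry, so nothing leaks into the global default one.
registry = CollectorRegistry()

received = Counter(
    "received_messages",
    "Received messages",
    ["broker"],
    registry=registry,
)

# Simulate three consumed messages (arbitrary number for the demo).
received.labels(broker="rabbit").inc(3)

# generate_latest renders the registry in the Prometheus text format.
text = generate_latest(registry).decode("utf-8")
print(text)
```

The client appends the `_total` suffix to counter names itself, which is why the metric declared as `received_messages` shows up as `received_messages_total`.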

It would also be a good idea to add a “handler” label to the metrics, but I haven’t yet figured out how to get meta information about the handler. Is there a better way to do this, for example by getting it from the msg object?

I would be glad to hear any comments and suggestions on this implementation.

@roma-frolov added the enhancement label Aug 11, 2024
@roma-frolov
Contributor Author

I would be happy to implement this feature.

@roma-frolov
Contributor Author

and what is the best type for metrics? counter or histogram?

@Lancetnik
Member

> and what is the best type for metrics? counter or histogram?

I think it should be the counter type.
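To make the counter-versus-histogram trade-off concrete, here is a pure-Python sketch of what each type tracks. The `SketchCounter` and `SketchHistogram` classes and the bucket bounds are hypothetical stand-ins written for illustration; they are not the `prometheus_client` implementations.

```python
import math
from dataclasses import dataclass, field


@dataclass
class SketchCounter:
    """Monotonic count: answers "how many?"."""

    value: float = 0.0

    def inc(self, amount: float = 1.0) -> None:
        if amount < 0:
            raise ValueError("counters can only increase")
        self.value += amount


@dataclass
class SketchHistogram:
    """Cumulative buckets plus sum and count: answers "how is it distributed?"."""

    upper_bounds: tuple = (0.1, 0.5, 1.0, math.inf)  # hypothetical bounds
    bucket_counts: list = field(default_factory=list)
    total: float = 0.0
    count: int = 0

    def __post_init__(self) -> None:
        self.bucket_counts = [0] * len(self.upper_bounds)

    def observe(self, value: float) -> None:
        # Buckets are cumulative: a value counts toward every bucket
        # whose upper bound is greater than or equal to it.
        for i, bound in enumerate(self.upper_bounds):
            if value <= bound:
                self.bucket_counts[i] += 1
        self.total += value
        self.count += 1


received = SketchCounter()
duration = SketchHistogram()
for seconds in (0.05, 0.3, 2.0):
    received.inc()
    duration.observe(seconds)

print(received.value)          # 3.0
print(duration.bucket_counts)  # [1, 2, 2, 3]
```

A counter fits quantities such as "messages received", while a histogram fits measurements such as durations or sizes, where the distribution matters more than the running total.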

@Lancetnik added the Core label Aug 11, 2024
@Lancetnik mentioned this issue Aug 11, 2024
@gaby

gaby commented Aug 13, 2024

It'd be cool if there was also an option for publishing the metrics to a Prometheus Pushgateway. This would benefit short-lived and ephemeral jobs. It's already supported by the Prometheus Python client.

https://prometheus.github.io/client_python/exporting/pushgateway/

@Lancetnik
Member

> It'd be cool if there was also an option for publishing the metrics to a Prometheus Pushgateway. This would benefit short-lived and ephemeral jobs. It's already supported by the Prometheus Python client.
>
> https://prometheus.github.io/client_python/exporting/pushgateway/

The main idea is that you pass a CollectorRegistry() object to the FastStream middlewares, and they write metrics into it. You are then still able to use this registry object in any way that the Prometheus Python client allows. So I don't think FastStream requires any additional options to support Pushgateway.
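A minimal sketch of that idea, assuming a Pushgateway reachable at `localhost:9091`. The gateway address, the job name, the `demo_jobs_done` counter, and the `run_job_and_push` helper are all hypothetical; the counter stands in for one that a FastStream middleware would update.

```python
from prometheus_client import CollectorRegistry, Counter, push_to_gateway

# The same registry object that would be passed to the middleware.
registry = CollectorRegistry()

# Stand-in for a counter maintained by the middleware (hypothetical name).
jobs_done = Counter("demo_jobs_done", "Finished jobs", registry=registry)


def run_job_and_push(
    gateway: str = "localhost:9091",  # assumed Pushgateway address
    job: str = "demo-batch-job",      # assumed job name
    push: bool = True,
) -> None:
    """Do one unit of work, then push the whole registry to the gateway."""
    jobs_done.inc()
    if push:
        # push_to_gateway sends every metric in the registry in one request.
        push_to_gateway(gateway, job=job, registry=registry)
```

Calling `run_job_and_push()` at the end of a short-lived job pushes everything collected so far; `push=False` only updates the counter, which is handy for a dry run without a gateway.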

@gaby

gaby commented Aug 13, 2024

@Lancetnik Yeah, that would work 💪

@davorrunje davorrunje assigned Lancetnik and roma-frolov and unassigned Lancetnik Sep 2, 2024
@roma-frolov
Contributor Author

roma-frolov commented Sep 2, 2024

I did some research and am adding information about the planned metrics.

Metrics that I plan to implement:

  • number of received messages (Counter);
  • size of received messages (Histogram);
  • duration of message processing (Histogram);
  • number of messages in processing (Gauge);
  • number of processed messages (Counter);
  • number of errors during message processing (Counter);
  • number of sent messages (Counter);
  • duration of message sending (Histogram);
  • size of sent messages (Histogram);
  • number of errors during message sending (Counter).

Each metric will have the labels broker, handler (for receiving metrics), and destination (for sending metrics).

Message processing metrics will also have a status label.

Status values for processing received messages:

  • acked
  • nacked
  • rejected
  • skipped
  • error

Status values for processing sent messages:

  • success
  • error

The error count metric will have an error_type label whose values will be type(exc).__name__.
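The receiving side of this plan could be sketched roughly as follows. This covers only a subset of the listed metrics; the metric names, the `process` helper, and the choice to report every successful call as `acked` are assumptions made for illustration, not the final FastStream implementation.

```python
import time
from typing import Any, Callable

from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram

registry = CollectorRegistry()
base = ["broker", "handler"]

# All metric names below are hypothetical placeholders.
RECEIVED = Counter("demo_received_messages", "Received messages", base, registry=registry)
IN_PROCESS = Gauge("demo_messages_in_process", "Messages in processing", base, registry=registry)
DURATION = Histogram("demo_processing_duration_seconds", "Processing duration", base, registry=registry)
PROCESSED = Counter("demo_processed_messages", "Processed messages", base + ["status"], registry=registry)
ERRORS = Counter("demo_processing_errors", "Processing errors", base + ["error_type"], registry=registry)


def process(broker: str, handler: str, func: Callable[[Any], Any], payload: Any) -> Any:
    """Run func(payload) and record the planned receiving-side metrics."""
    labels = {"broker": broker, "handler": handler}
    RECEIVED.labels(**labels).inc()
    IN_PROCESS.labels(**labels).inc()
    start = time.perf_counter()
    status = "acked"  # simplification: nacked/rejected/skipped are not modeled
    try:
        return func(payload)
    except Exception as exc:
        status = "error"
        # error_type is the exception class name, as described in the plan.
        ERRORS.labels(error_type=type(exc).__name__, **labels).inc()
        raise
    finally:
        DURATION.labels(**labels).observe(time.perf_counter() - start)
        IN_PROCESS.labels(**labels).dec()
        PROCESSED.labels(status=status, **labels).inc()
```

Using the exception class name as a label keeps cardinality bounded by the number of exception types, which is far safer than labeling with the error message.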
