Skip to content

Commit

Permalink
Add app_name to prometheus metrics (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kuzyashin committed Mar 12, 2021
1 parent 1499300 commit e0010f7
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 69 deletions.
72 changes: 42 additions & 30 deletions faust/sensors/prometheus.py
Expand Up @@ -40,14 +40,17 @@


def setup_prometheus_sensors(
app: AppT, pattern: str = "/metrics", registry: CollectorRegistry = REGISTRY
app: AppT,
pattern: str = "/metrics",
registry: CollectorRegistry = REGISTRY,
name_prefix: str = None,
) -> None:
if prometheus_client is None:
raise ImproperlyConfigured(
"prometheus_client requires `pip install prometheus_client`."
)

faust_metrics = FaustMetrics.create(registry)
faust_metrics = FaustMetrics.create(registry, name_prefix or app.conf.name)
app.monitor = PrometheusMonitor(metrics=faust_metrics)

@app.page(pattern)
Expand Down Expand Up @@ -105,119 +108,128 @@ class FaustMetrics(NamedTuple):
consumer_commit_latency: Histogram

@classmethod
def create(cls, registry: CollectorRegistry) -> "FaustMetrics":
def create(cls, registry: CollectorRegistry, app_name: str) -> "FaustMetrics":
messages_received = Counter(
"messages_received", "Total messages received", registry=registry
f"{app_name}_messages_received",
"Total messages received",
registry=registry,
)
active_messages = Gauge(
"active_messages", "Total active messages", registry=registry
f"{app_name}_active_messages", "Total active messages", registry=registry
)
messages_received_per_topics = Counter(
"messages_received_per_topic",
f"{app_name}_messages_received_per_topic",
"Messages received per topic",
["topic"],
registry=registry,
)
messages_received_per_topics_partition = Gauge(
"messages_received_per_topics_partition",
f"{app_name}_messages_received_per_topics_partition",
"Messages received per topic/partition",
["topic", "partition"],
registry=registry,
)
events_runtime_latency = Histogram(
"events_runtime_ms", "Events runtime in ms", registry=registry
f"{app_name}_events_runtime_ms", "Events runtime in ms", registry=registry
)
total_events = Counter(
"total_events", "Total events received", registry=registry
f"{app_name}_total_events", "Total events received", registry=registry
)
total_active_events = Gauge(
"total_active_events", "Total active events", registry=registry
f"{app_name}_total_active_events", "Total active events", registry=registry
)
total_events_per_stream = Counter(
"total_events_per_stream",
f"{app_name}_total_events_per_stream",
"Events received per Stream",
["stream"],
registry=registry,
)
table_operations = Counter(
"table_operations",
f"{app_name}_table_operations",
"Total table operations",
["table", "operation"],
registry=registry,
)
topic_messages_sent = Counter(
"topic_messages_sent",
f"{app_name}_topic_messages_sent",
"Total messages sent per topic",
["topic"],
registry=registry,
)
total_sent_messages = Counter(
"total_sent_messages", "Total messages sent", registry=registry
f"{app_name}_total_sent_messages", "Total messages sent", registry=registry
)
producer_send_latency = Histogram(
"producer_send_latency", "Producer send latency in ms", registry=registry
f"{app_name}_producer_send_latency",
"Producer send latency in ms",
registry=registry,
)
total_error_messages_sent = Counter(
"total_error_messages_sent", "Total error messages sent", registry=registry
f"{app_name}_total_error_messages_sent",
"Total error messages sent",
registry=registry,
)
producer_error_send_latency = Histogram(
"producer_error_send_latency",
f"{app_name}_producer_error_send_latency",
"Producer error send latency in ms",
registry=registry,
)
assignment_operations = Counter(
"assignment_operations",
f"{app_name}_assignment_operations",
"Total assigment operations (completed/error)",
["operation"],
registry=registry,
)
assign_latency = Histogram(
"assign_latency", "Assignment latency in ms", registry=registry
f"{app_name}_assign_latency", "Assignment latency in ms", registry=registry
)
total_rebalances = Gauge(
"total_rebalances", "Total rebalances", registry=registry
f"{app_name}_total_rebalances", "Total rebalances", registry=registry
)
total_rebalances_recovering = Gauge(
"total_rebalances_recovering",
f"{app_name}_total_rebalances_recovering",
"Total rebalances recovering",
registry=registry,
)
rebalance_done_consumer_latency = Histogram(
"rebalance_done_consumer_latency",
f"{app_name}_rebalance_done_consumer_latency",
"Consumer replying that rebalance is done to broker in ms",
registry=registry,
)
rebalance_done_latency = Histogram(
"rebalance_done_latency",
f"{app_name}_rebalance_done_latency",
"Rebalance finished latency in ms",
registry=registry,
)
count_metrics_by_name = Gauge(
"metrics_by_name", "Total metrics by name", ["metric"], registry=registry
f"{app_name}_metrics_by_name",
"Total metrics by name",
["metric"],
registry=registry,
)
http_status_codes = Counter(
"http_status_codes",
f"{app_name}_http_status_codes",
"Total http_status code",
["status_code"],
registry=registry,
)
http_latency = Histogram(
"http_latency", "Http response latency in ms", registry=registry
f"{app_name}_http_latency", "Http response latency in ms", registry=registry
)
topic_partition_end_offset = Gauge(
"topic_partition_end_offset",
f"{app_name}_topic_partition_end_offset",
"Offset ends per topic/partition",
["topic", "partition"],
registry=registry,
)
topic_partition_offset_commited = Gauge(
"topic_partition_offset_commited",
f"{app_name}_topic_partition_offset_commited",
"Offset commited per topic/partition",
["topic", "partition"],
registry=registry,
)
consumer_commit_latency = Histogram(
"consumer_commit_latency",
f"{app_name}_consumer_commit_latency",
"Consumer commit latency in ms",
registry=registry,
)
Expand Down Expand Up @@ -295,7 +307,7 @@ class PrometheusMonitor(Monitor):
from faust.sensors.prometheus import setup_prometheus_sensors
app = faust.App('example', broker='kafka://')
setup_prometheus_sensors(app, pattern='/metrics')
setup_prometheus_sensors(app, pattern='/metrics', 'example_app_name')
"""

ERROR = "error"
Expand Down

0 comments on commit e0010f7

Please sign in to comment.