Commit

Merge 7881f09 into 17b825a
HazardDede committed Nov 19, 2019
2 parents 17b825a + 7881f09 commit 510bfaf
Showing 13 changed files with 439 additions and 31 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -63,6 +63,7 @@ pylint:
mypy:
	mypy \
	$(SOURCE_PATH)/logging.py \
	$(SOURCE_PATH)/metrics.py \
	$(SOURCE_PATH)/models.py \
	$(SOURCE_PATH)/selector.py \
	$(SOURCE_PATH)/utils.py \
20 changes: 17 additions & 3 deletions README.md
@@ -27,6 +27,7 @@
5.4\. [Advanced selector expressions (0.12.0+)](#advancedselectorexpressions0.12.0+)
5.5\. [UDF Throttle (0.15.0+)](#udfthrottle0.15.0+)
5.6\. [Docker images](#dockerimages)
5.7\. [Prometheus metrics](#prometheusmetrics)
6\. [Plugins](#plugins)
7\. [Changelog](#changelog)

@@ -93,15 +94,18 @@ Tip: You can validate your config without actually executing it with
Pull 'n' Push
Usage:
pnp [(-c | --check)] [(-v | --verbose)] [--log=<log_conf>] <configuration>
pnp [(--engine=<engine>)] [(-v | --verbose)] [--log=<log_conf>] [--metrics=<port>] <configuration>
pnp (-c | --check) <configuration>
pnp (-h | --help)
pnp --version
Options:
-c --check Only check configuration and do not run it.
-v --verbose Switches log level to debug.
--log=<log_conf> Specify logging configuration to load.
--engine=<engine> Override engine from configuration file (thread, process, sequential, async).
-h --help Show this screen.
--log=<log_conf> Specify logging configuration to load.
--metrics=<port> Enable prometheus metrics server on the specified port.
-v --verbose Switches log level to debug.
--version Show version.
```

@@ -603,6 +607,16 @@ docker run --rm \
hazard/pnp:latest
```

<a name="prometheusmetrics"></a>

### 5.7\. Prometheus metrics

By passing `--metrics=<port>` to `pnp` on the command line or by setting the environment variable `PNP_METRICS=<port>`,
pull 'n' push will start a prometheus http server that serves various metrics to a prometheus scraper.

Please see [the prometheus documentation](https://prometheus.io/docs/prometheus/latest/configuration/configuration/) for how to
configure prometheus correctly.
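
For a quick check that the exporter is serving data you can fetch the endpoint yourself. The snippet below is an
illustrative sketch only; it assumes `pnp` was started on the local machine with `--metrics=9999`:

```python
# Illustrative sketch: fetch the prometheus exposition output served by pnp.
# Assumes pnp runs on this machine and was started with `--metrics=9999`.
import urllib.request

with urllib.request.urlopen("http://localhost:9999/metrics") as response:
    text = response.read().decode("utf-8")

# Show only the pnp specific series (pull_*, push_*, udf_*).
for line in text.splitlines():
    if line.startswith(("pull_", "push_", "udf_")):
        print(line)
```

The series names correspond to the metrics defined in `pnp/metrics.py`, e.g. `pull_events_emitted_total` or
`push_execution_time_seconds`.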

<a name="plugins"></a>

## 6\. Plugins
17 changes: 14 additions & 3 deletions README.mdpp
@@ -58,15 +58,18 @@ Tip: You can validate your config without actually executing it with
Pull 'n' Push

Usage:
pnp [(-c | --check)] [(-v | --verbose)] [--log=<log_conf>] <configuration>
pnp [(--engine=<engine>)] [(-v | --verbose)] [--log=<log_conf>] [--metrics=<port>] <configuration>
pnp (-c | --check) <configuration>
pnp (-h | --help)
pnp --version

Options:
-c --check Only check configuration and do not run it.
-v --verbose Switches log level to debug.
--log=<log_conf> Specify logging configuration to load.
--engine=<engine> Override engine from configuration file (thread, process, sequential, async).
-h --help Show this screen.
--log=<log_conf> Specify logging configuration to load.
--metrics=<port> Enable prometheus metrics server on the specified port.
-v --verbose Switches log level to debug.
--version Show version.
```

@@ -306,6 +309,14 @@ docker run --rm \
hazard/pnp:latest
```

### Prometheus metrics

By passing `--metrics=<port>` to `pnp` on the command line or by setting the environment variable `PNP_METRICS=<port>`,
pull 'n' push will start a prometheus http server that serves various metrics to a prometheus scraper.

Please see [the prometheus documentation](https://prometheus.io/docs/prometheus/latest/configuration/configuration/) for how to
configure prometheus correctly.

## Plugins

A complete list of plugins is available [here](https://github.com/HazardDede/pnp/blob/master/docs/plugins/README.md)
234 changes: 234 additions & 0 deletions pnp/metrics.py
@@ -0,0 +1,234 @@
"""Metric related helper methods and classes."""
import functools
import inspect
from typing import Optional, Callable, Any

import prometheus_client as metric # type: ignore

from .typing import Component, Payload
from .utils import CallbackTimer, Loggable, class_fullname

PULL_LABELS = ['pull', 'classname']

PULL_EVENTS_EMITTED_TOTAL = metric.Counter(
    name='pull_events_emitted_total',
    documentation="Number of events emitted from the pull",
    labelnames=PULL_LABELS
)

PULL_EXITS_TOTAL = metric.Counter(
    name='pull_exits_total',
    documentation="Number of times the pull was restarted",
    labelnames=PULL_LABELS
)

PULL_FAILS_TOTAL = metric.Counter(
    name='pull_fails_total',
    documentation="Number of times the pull has failed",
    labelnames=PULL_LABELS
)

PULL_POLL_EXECUTION_SECONDS = metric.Histogram(
    name='pull_poll_execution_time_seconds',
    documentation="Measures execution time of a pull poll operation",
    labelnames=PULL_LABELS
)

PULL_STARTS_TOTAL = metric.Counter(
    name='pull_starts_total',
    documentation="Number of times the pull has started",
    labelnames=PULL_LABELS
)


PUSH_LABELS = ['push', 'classname']

PUSH_EVENTS_PROCESSED_TOTAL = metric.Counter(
    name='push_events_processed_total',
    documentation="Number of events the push has processed",
    labelnames=PUSH_LABELS
)

PUSH_EVENTS_FAILED_TOTAL = metric.Counter(
    name='push_events_failed_total',
    documentation="Number of events the push has failed to process",
    labelnames=PUSH_LABELS
)

PUSH_EXECUTION_TIME_SECONDS = metric.Histogram(
    name='push_execution_time_seconds',
    documentation="Measures execution time of a push operation",
    labelnames=PUSH_LABELS
)

UDF_LABELS = ['udf', 'classname']

UDF_EVENTS_PROCESSED_TOTAL = metric.Counter(
    name='udf_events_processed_total',
    documentation="Number of events the udf has processed",
    labelnames=UDF_LABELS
)

UDF_EVENTS_FAILED_TOTAL = metric.Counter(
    name='udf_events_failed_total',
    documentation="Number of events the udf has failed to process",
    labelnames=UDF_LABELS
)

UDF_EXECUTION_TIME_SECONDS = metric.Histogram(
    name='udf_execution_time_seconds',
    documentation="Measures execution time of a udf operation",
    labelnames=UDF_LABELS
)
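
# Metric naming above follows the usual prometheus conventions: counters end in
# `_total`, histograms that measure durations end in `_seconds`.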


def start_httpd(port: int) -> None:
    """Start the metrics server."""
    # The start_http_server actually spawns a daemon thread
    metric.start_http_server(port)
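
# Note (illustrative, not part of the original module): pnp is expected to call
# start_httpd(<port>) once at startup when `--metrics=<port>` or the PNP_METRICS
# environment variable is given; prometheus_client then serves the text
# exposition format on port <port> under the /metrics path from a daemon thread.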


class ComponentMetrics(Loggable):
    """Generic class for common metric operations. All metric components should subclass
    from this."""

    EVENTS_TOTAL_METRIC = None  # type: Optional[metric.Counter]
    EXECUTION_TIME_METRIC = None  # type: Optional[metric.Histogram]
    FAILS_TOTAL_METRIC = None  # type: Optional[metric.Counter]

    def __init__(self, **labels: str):
        self._labels = labels
        self._reset_metrics()

    def _reset_metrics(self) -> None:
        if self.EVENTS_TOTAL_METRIC:
            self.EVENTS_TOTAL_METRIC.labels(**self._labels).inc(0)
        if self.FAILS_TOTAL_METRIC:
            self.FAILS_TOTAL_METRIC.labels(**self._labels).inc(0)

    def track_event(self, amount: int = 1) -> None:
        """Tracks if one or more events passed this component."""
        if not self.EVENTS_TOTAL_METRIC:
            return

        amount = int(amount)
        if amount <= 0:
            self.logger.warning("Method `track_event` was called with an amount of 0 "
                                "or less. This seems to be unintended.")
            return

        self.EVENTS_TOTAL_METRIC.labels(**self._labels).inc(amount)

    def track_fail(self) -> None:
        """Tracks if the component has failed for some reason."""
        if not self.FAILS_TOTAL_METRIC:
            return

        self.FAILS_TOTAL_METRIC.labels(**self._labels).inc()

    def track_execution_time(self, elapsed: float) -> None:
        """Tracks the execution time of the component's primary operation."""
        if not self.EXECUTION_TIME_METRIC:
            return
        self.EXECUTION_TIME_METRIC.labels(**self._labels).observe(elapsed)


class PullMetrics(ComponentMetrics):
    """Pull related metrics."""

    EVENTS_TOTAL_METRIC = PULL_EVENTS_EMITTED_TOTAL
    EXECUTION_TIME_METRIC = PULL_POLL_EXECUTION_SECONDS
    FAILS_TOTAL_METRIC = PULL_FAILS_TOTAL

    def __init__(self, pull: Component):
        super().__init__(pull=pull.name, classname=class_fullname(pull))

    def _reset_metrics(self) -> None:
        super()._reset_metrics()
        PULL_EXITS_TOTAL.labels(**self._labels).inc(0)
        PULL_STARTS_TOTAL.labels(**self._labels).inc(0)

    def track_start(self) -> None:
        """Track the start of a pull."""
        PULL_STARTS_TOTAL.labels(**self._labels).inc()

    def track_exit(self) -> None:
        """Track the exit of a pull."""
        PULL_EXITS_TOTAL.labels(**self._labels).inc()


class PushMetrics(ComponentMetrics):
    """Push related metrics."""

    EVENTS_TOTAL_METRIC = PUSH_EVENTS_PROCESSED_TOTAL
    EXECUTION_TIME_METRIC = PUSH_EXECUTION_TIME_SECONDS
    FAILS_TOTAL_METRIC = PUSH_EVENTS_FAILED_TOTAL

    def __init__(self, push: Component):
        super().__init__(push=push.name, classname=class_fullname(push))


class UDFMetrics(ComponentMetrics):
    """User-defined-functions related metrics."""

    EVENTS_TOTAL_METRIC = UDF_EVENTS_PROCESSED_TOTAL
    EXECUTION_TIME_METRIC = UDF_EXECUTION_TIME_SECONDS
    FAILS_TOTAL_METRIC = UDF_EVENTS_FAILED_TOTAL

    def __init__(self, udf: Component):
        super().__init__(udf=udf.name, classname=class_fullname(udf))
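
# Illustrative label sets produced by the metric classes above (`classname`
# comes from utils.class_fullname):
#   PullMetrics(pull) -> {'pull': pull.name, 'classname': class_fullname(pull)}
#   PushMetrics(push) -> {'push': push.name, 'classname': class_fullname(push)}
#   UDFMetrics(udf)   -> {'udf': udf.name, 'classname': class_fullname(udf)}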


def _track_call(fun_enter: Callable[[], None], fun_exit: Callable[[], None],
                fun_runtime: Callable[[float], None]) -> Callable[..., Payload]:
    """Decorator factory to track metrics around a method call.
    Calls `fun_enter` before the wrapped function runs, `fun_exit` when it raises
    and reports the measured execution time to `fun_runtime`."""
    def _inner(fun: Callable[..., Payload]) -> Callable[..., Payload]:
        def _observe(elapsed: float) -> None:
            fun_runtime(elapsed)

        @functools.wraps(fun)
        def _wrapper(*args: Any, **kwargs: Any) -> Any:
            fun_enter()
            try:
                with CallbackTimer(_observe):
                    return fun(*args, **kwargs)
            except BaseException:
                fun_exit()
                raise

        @functools.wraps(fun)
        async def _async_wrapper(*args: Any, **kwargs: Any) -> Any:
            fun_enter()
            try:
                with CallbackTimer(_observe):
                    return await fun(*args, **kwargs)
            except BaseException:
                fun_exit()
                raise

        wrap_fun = _wrapper
        if inspect.iscoroutinefunction(fun):
            wrap_fun = _async_wrapper

        # Keep a loophole to bypass this decorator
        wrap_fun.original_ = fun  # type: ignore
        return wrap_fun

    return _inner


def track_pull(metrics: PullMetrics) -> Callable[..., Payload]:
    """Decorator to track pull related metrics when running a pull.
    Will track the number of starts (maybe multiple due to errors) and the number of exits.
    """
    return _track_call(metrics.track_start, metrics.track_exit, lambda elapsed: None)


def track_event(metrics: ComponentMetrics) -> Callable[..., Payload]:
    """Decorator to track related metrics when calling a method.
    Will increase the event counter, track the error count (if any) and observe the
    operation execution time."""

    return _track_call(metrics.track_event, metrics.track_fail, metrics.track_execution_time)
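

if __name__ == '__main__':
    # Illustrative sketch only (not part of the original module): wire a
    # PullMetrics instance and the module level track_event decorator around a
    # dummy component. `DummyPull` and the sleep are stand-ins made up for this
    # demo; any object with a `name` attribute would do.
    import time
    from collections import namedtuple

    DummyPull = namedtuple('DummyPull', ['name'])
    pull_metrics = PullMetrics(DummyPull(name='example_pull'))

    @track_event(pull_metrics)
    def poll() -> None:
        """Pretend to poll some device."""
        time.sleep(0.1)

    for _ in range(3):
        poll()

    # Dump the current exposition output; pull_events_emitted_total for the
    # 'example_pull' labels should now read 3.
    print(metric.generate_latest().decode())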
