Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Submit metrics for transform tasks
Browse files Browse the repository at this point in the history
This submits coarse statsd metrics for transform tasks:
* Counter for task dispatches
* Timer for running tasks
* Task error counter.
  • Loading branch information
mattrco committed Feb 5, 2015
1 parent 607ed5f commit afbc8f2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
21 changes: 21 additions & 0 deletions backdrop/core/errors.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from functools import wraps


class BackdropError(StandardError):
pass

Expand All @@ -23,3 +26,21 @@ class InvalidOperationError(ValueError):
"""Raised if an invalid collect function is provided, or if an error
is raised from a collect function"""
pass


def incr_on_error(stats_client, metric_name):
"""
Increments metric_name if func() raises an exception, then
re-raises the exception.
"""

def decorator(func):
@wraps(func)
def wrapped(*args, **kwargs):
try:
func(*args, **kwargs)
except:
stats_client.incr(metric_name)
raise
return wrapped
return decorator
11 changes: 11 additions & 0 deletions backdrop/transformers/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

from os import getenv

from statsd import StatsClient

from backdrop.core.log_handler import get_log_file_handler
from backdrop.core.errors import incr_on_error

from worker import app, config

Expand All @@ -15,6 +18,9 @@
logger.addHandler(
get_log_file_handler("log/{}.log".format(GOVUK_ENV), logging.DEBUG))

stats_client = StatsClient(prefix=getenv("GOVUK_STATSD_PREFIX",
"pp.apps.backdrop.transformers.worker"))


@app.task(ignore_result=True)
def entrypoint(dataset_id, earliest, latest):
Expand All @@ -36,6 +42,7 @@ def entrypoint(dataset_id, earliest, latest):
'backdrop.transformers.dispatch.run_transform',
args=(data_set_config, transform, earliest, latest)
)
stats_client.incr('dispatch')


def get_query_parameters(transform, earliest, latest):
Expand Down Expand Up @@ -93,6 +100,8 @@ def get_or_get_and_create_output_dataset(transform, input_dataset):


@app.task(ignore_result=True)
@stats_client.timer('run_transform')
@incr_on_error(stats_client, 'run_transform.error')
def run_transform(data_set_config, transform, earliest, latest):
data_set = DataSet.from_group_and_type(
config.BACKDROP_READ_URL,
Expand All @@ -113,3 +122,5 @@ def run_transform(data_set_config, transform, earliest, latest):
transform,
data_set_config)
output_data_set.post(transformed_data)

stats_client.incr('run_transform.success')

0 comments on commit afbc8f2

Please sign in to comment.