Skip to content

Commit

Permalink
[AIRFLOW-6530] Allow Custom Statsd Client (#7227)
Browse files Browse the repository at this point in the history
Co-authored-by: Usman Arshad <usman.arshad@skyscanner.net>
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Craig Rosie <craigrosie7@gmail.com>
  • Loading branch information
4 people committed Mar 15, 2020
1 parent 0740daf commit 8bf6a90
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 2 deletions.
9 changes: 9 additions & 0 deletions airflow/config_templates/config.yml
Expand Up @@ -1452,6 +1452,15 @@
type: string
example: ~
default: ""
- name: statsd_custom_client_path
description: |
If you want to utilise your own custom Statsd client set the relevant
module path below.
Note: The module path must exist on your PYTHONPATH for Airflow to pick it up
version_added: ~
type: string
example: ~
default: ~
- name: max_threads
description: |
The scheduler can run multiple threads in parallel to schedule dags.
Expand Down
5 changes: 5 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Expand Up @@ -703,6 +703,11 @@ statsd_datadog_enabled = False
# List of datadog tags attached to all metrics(e.g: key1:value1,key2:value2)
statsd_datadog_tags =

# If you want to utilise your own custom Statsd client set the relevant
# module path below.
# Note: The module path must exist on your PYTHONPATH for Airflow to pick it up
# statsd_custom_client_path =

# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
max_threads = 2
Expand Down
26 changes: 24 additions & 2 deletions airflow/stats.py
Expand Up @@ -200,10 +200,32 @@ def __init__(self, *args, **kwargs):
self.__class__.instance = DummyStatsLogger()
except (socket.gaierror, ImportError) as e:
log.warning("Could not configure StatsClient: %s, using DummyStatsLogger instead.", e)
self.__class__.instance = DummyStatsLogger()

def get_statsd_logger(self):
from statsd import StatsClient
statsd = StatsClient(
if conf.getboolean('scheduler', 'statsd_on'):
from statsd import StatsClient

if conf.has_option('scheduler', 'statsd_custom_client_path'):
custom_statsd_module_path = conf.get('scheduler', 'statsd_custom_client_path')

try:
stats_class = import_string(custom_statsd_module_path)
if not issubclass(stats_class, StatsClient):
raise Exception(
"""Your custom Statsd client must extend the statsd.StatsClient in order to ensure backwards
compatibility.""")
else:
log.info("Successfully loaded custom Statsd client "
f"from {custom_statsd_module_path}")

except Exception as err:
raise ImportError('Unable to load custom Statsd client from '
f'{custom_statsd_module_path} due to {err}')
else:
stats_class = StatsClient

statsd = stats_class(
host=conf.get('scheduler', 'statsd_host'),
port=conf.getint('scheduler', 'statsd_port'),
prefix=conf.get('scheduler', 'statsd_prefix'))
Expand Down
8 changes: 8 additions & 0 deletions docs/metrics.rst
Expand Up @@ -58,6 +58,14 @@ to the stat name if necessary and return the transformed stat name. The function
def my_custom_stat_name_handler(stat_name: str) -> str:
return stat_name.lower()[:32]
If you want to use a custom Statsd client outwith the default one provided by Airflow the following key must be added
to the configuration file alongside the module path of your custom Statsd client. This module must be available on
your PYTHONPATH.

.. code-block:: ini
[scheduler]
statsd_custom_client_path = x.y.customclient
Counters
--------
Expand Down
62 changes: 62 additions & 0 deletions tests/test_stats.py
Expand Up @@ -20,17 +20,53 @@
from unittest import mock
from unittest.mock import Mock

import statsd

import airflow
from airflow.exceptions import InvalidStatsNameException
from airflow.stats import AllowListValidator, SafeDogStatsdLogger, SafeStatsdLogger
from tests.test_utils.config import conf_vars


class CustomStatsd(statsd.StatsClient):
incr_calls = 0

def __init__(self, host=None, port=None, prefix=None):
pass

def incr(self, stat, count=1, rate=1): # pylint: disable=unused-argument
CustomStatsd.incr_calls += 1

@classmethod
def _reset(cls):
cls.incr_calls = 0


class InvalidCustomStatsd:
"""
This custom Statsd class is invalid because it does not subclass
statsd.StatsClient.
"""
incr_calls = 0

def __init__(self, host=None, port=None, prefix=None):
pass

def incr(self, stat, count=1, rate=1): # pylint: disable=unused-argument
InvalidCustomStatsd.incr_calls += 1

@classmethod
def _reset(cls):
cls.incr_calls = 0


class TestStats(unittest.TestCase):

def setUp(self):
self.statsd_client = Mock()
self.stats = SafeStatsdLogger(self.statsd_client)
CustomStatsd._reset()
InvalidCustomStatsd._reset()

def test_increment_counter_with_valid_name(self):
self.stats.incr('test_stats_run')
Expand Down Expand Up @@ -66,6 +102,32 @@ def test_does_not_send_stats_using_dogstatsd(self, mock_dogstatsd):
airflow.stats.Stats.incr("dummy_key")
mock_dogstatsd.return_value.assert_not_called()

@conf_vars({
("scheduler", "statsd_on"): "True",
("scheduler", "statsd_custom_client_path"): "tests.test_stats.CustomStatsd",
})
def test_load_custom_statsd_client(self):
importlib.reload(airflow.stats)
assert isinstance(airflow.stats.Stats.statsd, CustomStatsd)

@conf_vars({
("scheduler", "statsd_on"): "True",
("scheduler", "statsd_custom_client_path"): "tests.test_stats.CustomStatsd",
})
def test_does_use_custom_statsd_client(self):
importlib.reload(airflow.stats)
airflow.stats.Stats.incr("dummy_key")
assert CustomStatsd.incr_calls == 1

@conf_vars({
("scheduler", "statsd_on"): "True",
("scheduler", "statsd_custom_client_path"): "tests.test_stats.InvalidCustomStatsd",
})
def test_load_invalid_custom_stats_client(self):
importlib.reload(airflow.stats)
airflow.stats.Stats.incr("dummy_key")
assert InvalidCustomStatsd.incr_calls == 0

def tearDown(self) -> None:
# To avoid side-effect
importlib.reload(airflow.stats)
Expand Down

0 comments on commit 8bf6a90

Please sign in to comment.