Skip to content

Commit

Permalink
Add ability to send compressed payloads for metrics and distribution (D…
Browse files Browse the repository at this point in the history
  • Loading branch information
zippolyte authored and David Bouchare committed Nov 28, 2019
1 parent 17e067c commit bf398d7
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 29 deletions.
11 changes: 10 additions & 1 deletion datadog/api/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import time
import zlib

# datadog
from datadog.api import _api_version, _max_timeouts, _backoff_period
Expand Down Expand Up @@ -46,7 +47,8 @@ def _get_http_client(cls):

@classmethod
def submit(cls, method, path, api_version=None, body=None, attach_host_name=False,
response_formatter=None, error_formatter=None, suppress_response_errors_on_codes=None, **params):
response_formatter=None, error_formatter=None, suppress_response_errors_on_codes=None,
compress_payload=False, **params):
"""
Make an HTTP API request
Expand Down Expand Up @@ -74,6 +76,9 @@ def submit(cls, method, path, api_version=None, body=None, attach_host_name=Fals
status codes
:type suppress_response_errors_on_codes: None|list(int)
:param compress_payload: compress the payload using zlib
:type compress_payload: bool
:param params: dictionary to be sent in the query string of the request
:type params: dictionary
Expand Down Expand Up @@ -135,6 +140,10 @@ def submit(cls, method, path, api_version=None, body=None, attach_host_name=Fals
body = json.dumps(body)
headers['Content-Type'] = 'application/json'

if compress_payload:
body = zlib.compress(body.encode("utf-8"))
headers["Content-Encoding"] = "deflate"

# Construct the URL
url = construct_url(_api_host, api_version, path)

Expand Down
9 changes: 7 additions & 2 deletions datadog/api/distributions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ class Distribution(SendableAPIResource):
_resource_name = 'distribution_points'

@classmethod
def send(cls, distributions=None, attach_host_name=True, **distribution):
def send(cls, distributions=None, attach_host_name=True, compress_payload=False, **distribution):
"""
Submit a distribution metric or a list of distribution metrics to the distribution metric
API
:param compress_payload: compress the payload using zlib
:type compress_payload: bool
:param metric: the name of the time series
:type metric: string
:param points: a (timestamp, [list of values]) pair or
Expand All @@ -33,4 +36,6 @@ def send(cls, distributions=None, attach_host_name=True, **distribution):
# One distribution is sent
distribution['points'] = format_points(distribution['points'])
series_dict = {"series": [distribution]}
return super(Distribution, cls).send(attach_host_name=attach_host_name, **series_dict)
return super(Distribution, cls).send(
attach_host_name=attach_host_name, compress_payload=compress_payload, **series_dict
)
9 changes: 7 additions & 2 deletions datadog/api/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _rename_metric_type(metric):
metric['type'] = metric.pop('metric_type')

@classmethod
def send(cls, metrics=None, attach_host_name=True, **single_metric):
def send(cls, metrics=None, attach_host_name=True, compress_payload=False, **single_metric):
"""
Submit a metric or a list of metrics to the metric API
A metric dictionary should consist of 5 keys: metric, points, host, tags, type (some of which optional),
Expand All @@ -56,6 +56,9 @@ def send(cls, metrics=None, attach_host_name=True, **single_metric):
:param metric: the name of the time series
:type metric: string
:param compress_payload: compress the payload using zlib
:type compress_payload: bool
:param metrics: a list of dictionaries, each item being a metric to send
:type metrics: list
Expand Down Expand Up @@ -99,7 +102,9 @@ def send(cls, metrics=None, attach_host_name=True, **single_metric):
except KeyError:
raise KeyError("'points' parameter is required")

return super(Metric, cls).send(attach_host_name=attach_host_name, **metrics_dict)
return super(Metric, cls).send(
attach_host_name=attach_host_name, compress_payload=compress_payload, **metrics_dict
)

@classmethod
def query(cls, **params):
Expand Down
20 changes: 15 additions & 5 deletions datadog/api/resources.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""
Datadog API resources.
"""
# datadog

from datadog.api.api_client import APIClient

Expand Down Expand Up @@ -56,7 +55,7 @@ class SendableAPIResource(object):
Fork of CreateableAPIResource class with different method names
"""
@classmethod
def send(cls, attach_host_name=False, id=None, **body):
def send(cls, attach_host_name=False, id=None, compress_payload=False, **body):
"""
Create an API resource object
Expand All @@ -66,6 +65,9 @@ def send(cls, attach_host_name=False, id=None, **body):
:param id: create a new resource object as a child of the given object
:type id: id
:param compress_payload: compress the payload using zlib
:type compress_payload: bool
:param body: new resource object attributes
:type body: dictionary
Expand All @@ -74,14 +76,22 @@ def send(cls, attach_host_name=False, id=None, **body):
api_version = getattr(cls, '_api_version', None)

if id is None:
return APIClient.submit('POST', cls._resource_name, api_version, body,
attach_host_name=attach_host_name)
return APIClient.submit(
'POST',
cls._resource_name,
api_version,
body,
attach_host_name=attach_host_name,
compress_payload=compress_payload
)

path = '{resource_name}/{resource_id}'.format(
resource_name=cls._resource_name,
resource_id=id
)
return APIClient.submit('POST', path, api_version, body, attach_host_name=attach_host_name)
return APIClient.submit(
'POST', path, api_version, body, attach_host_name=attach_host_name, compress_payload=compress_payload
)


class UpdatableAPIResource(object):
Expand Down
8 changes: 6 additions & 2 deletions datadog/threadstats/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

class ThreadStats(object):

def __init__(self, namespace="", constant_tags=None):
def __init__(self, namespace="", constant_tags=None, compress_payload=False):
"""
Initialize a threadstats object.
Expand All @@ -36,6 +36,9 @@ def __init__(self, namespace="", constant_tags=None):
:param constant_tags: Tags to attach to every metric reported by this client
:type constant_tags: list of strings
:param compress_payload: compress the payload using zlib
:type compress_payload: bool
:envvar DATADOG_TAGS: Tags to attach to every metric reported by ThreadStats client
:type DATADOG_TAGS: list of strings
"""
Expand All @@ -48,6 +51,7 @@ def __init__(self, namespace="", constant_tags=None):

# State
self._disabled = True
self.compress_payload = compress_payload

def start(self, flush_interval=10, roll_up_interval=10, device=None,
flush_in_thread=True, flush_in_greenlet=False, disabled=False):
Expand Down Expand Up @@ -108,7 +112,7 @@ def start(self, flush_interval=10, roll_up_interval=10, device=None,
# The reporter is responsible for sending metrics off to their final destination.
# It's abstracted to support easy unit testing and in the near future, forwarding
# to the datadog agent.
self.reporter = HttpReporter()
self.reporter = HttpReporter(compress_payload=self.compress_payload)

self._is_flush_in_progress = False
self.flush_count = 0
Expand Down
7 changes: 5 additions & 2 deletions datadog/threadstats/reporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ def flush(self, metrics):

class HttpReporter(Reporter):

def __init__(self, compress_payload=False):
self.compress_payload = compress_payload

def flush_distributions(self, distributions):
api.Distribution.send(distributions)
api.Distribution.send(distributions, compress_payload=self.compress_payload)

def flush_metrics(self, metrics):
api.Metric.send(metrics)
api.Metric.send(metrics, compress_payload=self.compress_payload)

def flush_events(self, events):
for event in events:
Expand Down
6 changes: 4 additions & 2 deletions tests/integration/api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,14 @@ def test_metrics(self):
def retry_condition(r):
return not r["series"]

# Send metrics with single and multi points
# Send metrics with single and multi points, and with compression
assert dog.Metric.send(metric=metric_name_single, points=1, host=host_name)["status"] == "ok"
points = [(now_ts - 60, 1), (now_ts, 2)]
assert dog.Metric.send(metric=metric_name_list, points=points, host=host_name)["status"] == "ok"
points = (now_ts - 60, 1)
assert dog.Metric.send(metric=metric_name_tuple, points=points, host=host_name)["status"] == "ok"
assert dog.Metric.send(
metric=metric_name_tuple, points=points, host=host_name, compress_payload=True
)["status"] == "ok"

metric_query_single = get_with_retry(
"Metric",
Expand Down
72 changes: 65 additions & 7 deletions tests/unit/api/test_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# stdlib
from copy import deepcopy
from functools import wraps
import json
import os
import tempfile
from time import time
import zlib

# 3p
import mock, pytest
Expand Down Expand Up @@ -205,7 +206,6 @@ def test_initialize_options(self):
initialize(api_key=API_KEY, mute=False)
self.assertRaises(ApiError, MyCreatable.create)


def test_return_raw_response(self):
# Test default initialization sets return_raw_response to False
initialize()
Expand All @@ -217,7 +217,6 @@ def test_return_raw_response(self):
initialize(api_key="aaaaaaaaaa", app_key="123456", return_raw_response=True)
data, raw = api.Monitor.get_all()


def test_default_values(self):
with EnvVars(ignore=[
"DATADOG_API_KEY",
Expand Down Expand Up @@ -442,7 +441,7 @@ def test_actionable(self):
)
self.request_called_with(
'POST',
API_HOST +'/api/v1/actionables/{0}/actionname'.format(str(actionable_object_id)),
API_HOST + '/api/v1/actionables/{0}/actionname'.format(str(actionable_object_id)),
params={},
data={'mydata': 'val', 'mydata2': 'val2'}
)
Expand Down Expand Up @@ -633,14 +632,73 @@ def test_data_type_support(self):
m_long = int(1) # long in Python 3.x

if not is_p3k():
m_long = long(1)
m_long = long(1) # noqa: F821

supported_data_types = [1, 1.0, m_long, Decimal(1), Fraction(1, 2)]

for point in supported_data_types:
serie = dict(metric='metric.numerical', points=point)
self.submit_and_assess_metric_payload(serie)

def test_compression(self):
"""
Metric and Distribution support zlib compression
"""

# By default, there is no compression
# Metrics
series = dict(metric="metric.1", points=[(time(), 13.)])
Metric.send(attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" not in headers
assert req_data == json.dumps({"series": [series]})
# Same result when explicitely False
Metric.send(compress_payload=False, attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" not in headers
assert req_data == json.dumps({"series": [series]})
# Distributions
series = dict(metric="metric.1", points=[(time(), 13.)])
Distribution.send(attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" not in headers
assert req_data == json.dumps({"series": [series]})
# Same result when explicitely False
Distribution.send(compress_payload=False, attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" not in headers
assert req_data == json.dumps({"series": [series]})

# Enabling compression
# Metrics
series = dict(metric="metric.1", points=[(time(), 13.)])
compressed_series = zlib.compress(json.dumps({"series": [series]}).encode("utf-8"))
Metric.send(compress_payload=True, attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" in headers
assert headers["Content-Encoding"] == "deflate"
assert req_data == compressed_series
# Distributions
series = dict(metric='metric.1', points=[(time(), 13.)])
compressed_series = zlib.compress(json.dumps({"series": [series]}).encode("utf-8"))
Distribution.send(compress_payload=True, attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" in headers
assert headers["Content-Encoding"] == "deflate"
assert req_data == compressed_series


class TestServiceCheckResource(DatadogAPIWithInitialization):

Expand All @@ -664,7 +722,7 @@ class TestUserResource(DatadogAPIWithInitialization):
def test_create_user(self):
User.create(handle="handle", name="name", access_role="ro")
self.request_called_with(
"POST", "https://example.com/api/v1/user", data={"handle": "handle", "name": "name", "access_role":"ro"}
"POST", "https://example.com/api/v1/user", data={"handle": "handle", "name": "name", "access_role": "ro"}
)

def test_get_user(self):
Expand All @@ -676,7 +734,7 @@ def test_update_user(self):
self.request_called_with(
"PUT",
"https://example.com/api/v1/user/handle",
data={"name": "name", "access_role":"ro", "email": "email", "disabled": "disabled"}
data={"name": "name", "access_role": "ro", "email": "email", "disabled": "disabled"}
)

def test_delete_user(self):
Expand Down
20 changes: 14 additions & 6 deletions tests/unit/threadstats/test_threadstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ def assertMetric(self, name=None, value=None, tags=None, count=None):
)
)

def test_init(self):
# Test compress_payload setting
t = ThreadStats(compress_payload=True)
t.start()
assert t.reporter.compress_payload is True
t.stop()
# Default value
t = ThreadStats()
t.start()
assert t.reporter.compress_payload is False
t.stop()

def test_timed_decorator(self):
dog = ThreadStats()
dog.start(roll_up_interval=1, flush_in_thread=False)
Expand Down Expand Up @@ -415,7 +427,6 @@ def test_distribution(self):
dog.flush(150.0)
assert_equal(len(reporter.distributions), 0)


def test_default_host_and_device(self):
dog = ThreadStats()
dog.start(roll_up_interval=1, flush_in_thread=False)
Expand Down Expand Up @@ -731,10 +742,9 @@ def test_metric_type(self):
dog.histogram('histogram.1', 20, 100.0)
dog.flush(200.0)

(first, second, p75, p85, p95, p99, avg, cnt,
max_, min_) = self.sort_metrics(reporter.metrics)
(first, second, p75, p85, p95, p99, avg, cnt, max_, min_) = self.sort_metrics(reporter.metrics)

# Assert Metric type
# Assert Metric type
assert_equal(first['type'], 'rate')
assert_equal(second['type'], 'gauge')
assert_equal(p75['type'], 'gauge')
Expand All @@ -746,9 +756,7 @@ def test_metric_type(self):
assert_equal(max_['type'], 'gauge')
assert_equal(min_['type'], 'gauge')


# Test lambda_wrapper (uses ThreadStats under the hood)

def test_basic_lambda_decorator(self):

@datadog_lambda_wrapper
Expand Down

0 comments on commit bf398d7

Please sign in to comment.