Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to send compressed payloads for metrics and distribution #466

Merged
merged 4 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion datadog/api/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import time
import zlib

# datadog
from datadog.api import _api_version, _max_timeouts, _backoff_period
Expand Down Expand Up @@ -46,7 +47,8 @@ def _get_http_client(cls):

@classmethod
def submit(cls, method, path, api_version=None, body=None, attach_host_name=False,
response_formatter=None, error_formatter=None, suppress_response_errors_on_codes=None, **params):
response_formatter=None, error_formatter=None, suppress_response_errors_on_codes=None,
compress_payload=False, **params):
"""
Make an HTTP API request

Expand Down Expand Up @@ -74,6 +76,9 @@ def submit(cls, method, path, api_version=None, body=None, attach_host_name=Fals
status codes
:type suppress_response_errors_on_codes: None|list(int)

:param compress_payload: compress the payload using zlib
:type compress_payload: bool

:param params: dictionary to be sent in the query string of the request
:type params: dictionary

Expand Down Expand Up @@ -135,6 +140,10 @@ def submit(cls, method, path, api_version=None, body=None, attach_host_name=Fals
body = json.dumps(body)
headers['Content-Type'] = 'application/json'

if compress_payload:
body = zlib.compress(body.encode("utf-8"))
headers["Content-Encoding"] = "deflate"

# Construct the URL
url = construct_url(_api_host, api_version, path)

Expand Down
9 changes: 7 additions & 2 deletions datadog/api/distributions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ class Distribution(SendableAPIResource):
_resource_name = 'distribution_points'

@classmethod
def send(cls, distributions=None, attach_host_name=True, **distribution):
def send(cls, distributions=None, attach_host_name=True, compress_payload=False, **distribution):
"""
Submit a distribution metric or a list of distribution metrics to the distribution metric
API

:param compress_payload: compress the payload using zlib
:type compress_payload: bool
:param metric: the name of the time series
:type metric: string
:param points: a (timestamp, [list of values]) pair or
Expand All @@ -33,4 +36,6 @@ def send(cls, distributions=None, attach_host_name=True, **distribution):
# One distribution is sent
distribution['points'] = format_points(distribution['points'])
series_dict = {"series": [distribution]}
return super(Distribution, cls).send(attach_host_name=attach_host_name, **series_dict)
return super(Distribution, cls).send(
attach_host_name=attach_host_name, compress_payload=compress_payload, **series_dict
)
9 changes: 7 additions & 2 deletions datadog/api/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _rename_metric_type(metric):
metric['type'] = metric.pop('metric_type')

@classmethod
def send(cls, metrics=None, attach_host_name=True, **single_metric):
def send(cls, metrics=None, attach_host_name=True, compress_payload=False, **single_metric):
"""
Submit a metric or a list of metrics to the metric API
A metric dictionary should consist of 5 keys: metric, points, host, tags, type (some of which optional),
Expand All @@ -56,6 +56,9 @@ def send(cls, metrics=None, attach_host_name=True, **single_metric):
:param metric: the name of the time series
:type metric: string

:param compress_payload: compress the payload using zlib
:type compress_payload: bool

:param metrics: a list of dictionaries, each item being a metric to send
:type metrics: list

Expand Down Expand Up @@ -99,7 +102,9 @@ def send(cls, metrics=None, attach_host_name=True, **single_metric):
except KeyError:
raise KeyError("'points' parameter is required")

return super(Metric, cls).send(attach_host_name=attach_host_name, **metrics_dict)
return super(Metric, cls).send(
attach_host_name=attach_host_name, compress_payload=compress_payload, **metrics_dict
)

@classmethod
def query(cls, **params):
Expand Down
20 changes: 15 additions & 5 deletions datadog/api/resources.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""
Datadog API resources.
"""
# datadog

from datadog.api.api_client import APIClient

Expand Down Expand Up @@ -56,7 +55,7 @@ class SendableAPIResource(object):
Fork of CreateableAPIResource class with different method names
"""
@classmethod
def send(cls, attach_host_name=False, id=None, **body):
def send(cls, attach_host_name=False, id=None, compress_payload=False, **body):
"""
Create an API resource object

Expand All @@ -66,6 +65,9 @@ def send(cls, attach_host_name=False, id=None, **body):
:param id: create a new resource object as a child of the given object
:type id: id

:param compress_payload: compress the payload using zlib
:type compress_payload: bool

:param body: new resource object attributes
:type body: dictionary

Expand All @@ -74,14 +76,22 @@ def send(cls, attach_host_name=False, id=None, **body):
api_version = getattr(cls, '_api_version', None)

if id is None:
return APIClient.submit('POST', cls._resource_name, api_version, body,
attach_host_name=attach_host_name)
return APIClient.submit(
'POST',
cls._resource_name,
api_version,
body,
attach_host_name=attach_host_name,
compress_payload=compress_payload
)

path = '{resource_name}/{resource_id}'.format(
resource_name=cls._resource_name,
resource_id=id
)
return APIClient.submit('POST', path, api_version, body, attach_host_name=attach_host_name)
return APIClient.submit(
'POST', path, api_version, body, attach_host_name=attach_host_name, compress_payload=compress_payload
)


class UpdatableAPIResource(object):
Expand Down
8 changes: 6 additions & 2 deletions datadog/threadstats/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

class ThreadStats(object):

def __init__(self, namespace="", constant_tags=None):
def __init__(self, namespace="", constant_tags=None, compress_payload=False):
"""
Initialize a threadstats object.

Expand All @@ -36,6 +36,9 @@ def __init__(self, namespace="", constant_tags=None):
:param constant_tags: Tags to attach to every metric reported by this client
:type constant_tags: list of strings

:param compress_payload: compress the payload using zlib
:type compress_payload: bool

:envvar DATADOG_TAGS: Tags to attach to every metric reported by ThreadStats client
:type DATADOG_TAGS: list of strings
"""
Expand All @@ -48,6 +51,7 @@ def __init__(self, namespace="", constant_tags=None):

# State
self._disabled = True
self.compress_payload = compress_payload

def start(self, flush_interval=10, roll_up_interval=10, device=None,
flush_in_thread=True, flush_in_greenlet=False, disabled=False):
Expand Down Expand Up @@ -108,7 +112,7 @@ def start(self, flush_interval=10, roll_up_interval=10, device=None,
# The reporter is responsible for sending metrics off to their final destination.
# It's abstracted to support easy unit testing and in the near future, forwarding
# to the datadog agent.
self.reporter = HttpReporter()
self.reporter = HttpReporter(compress_payload=self.compress_payload)

self._is_flush_in_progress = False
self.flush_count = 0
Expand Down
7 changes: 5 additions & 2 deletions datadog/threadstats/reporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ def flush(self, metrics):

class HttpReporter(Reporter):

def __init__(self, compress_payload=False):
    """Reporter that forwards flushed metrics to the Datadog HTTP API.

    :param compress_payload: if True, payloads flushed by this reporter are
        submitted with zlib compression (the flag is forwarded as
        ``compress_payload`` to the ``api.*.send`` calls)
    :type compress_payload: bool
    """
    self.compress_payload = compress_payload

def flush_distributions(self, distributions):
api.Distribution.send(distributions)
api.Distribution.send(distributions, compress_payload=self.compress_payload)

def flush_metrics(self, metrics):
api.Metric.send(metrics)
api.Metric.send(metrics, compress_payload=self.compress_payload)

def flush_events(self, events):
for event in events:
Expand Down
6 changes: 4 additions & 2 deletions tests/integration/api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,14 @@ def test_metrics(self):
def retry_condition(r):
return not r["series"]

# Send metrics with single and multi points
# Send metrics with single and multi points, and with compression
assert dog.Metric.send(metric=metric_name_single, points=1, host=host_name)["status"] == "ok"
points = [(now_ts - 60, 1), (now_ts, 2)]
assert dog.Metric.send(metric=metric_name_list, points=points, host=host_name)["status"] == "ok"
points = (now_ts - 60, 1)
assert dog.Metric.send(metric=metric_name_tuple, points=points, host=host_name)["status"] == "ok"
assert dog.Metric.send(
metric=metric_name_tuple, points=points, host=host_name, compress_payload=True
)["status"] == "ok"

metric_query_single = get_with_retry(
"Metric",
Expand Down
72 changes: 65 additions & 7 deletions tests/unit/api/test_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# stdlib
from copy import deepcopy
from functools import wraps
import json
import os
import tempfile
from time import time
import zlib

# 3p
import mock, pytest
Expand Down Expand Up @@ -205,7 +206,6 @@ def test_initialize_options(self):
initialize(api_key=API_KEY, mute=False)
self.assertRaises(ApiError, MyCreatable.create)


def test_return_raw_response(self):
# Test default initialization sets return_raw_response to False
initialize()
Expand All @@ -217,7 +217,6 @@ def test_return_raw_response(self):
initialize(api_key="aaaaaaaaaa", app_key="123456", return_raw_response=True)
data, raw = api.Monitor.get_all()


def test_default_values(self):
with EnvVars(ignore=[
"DATADOG_API_KEY",
Expand Down Expand Up @@ -442,7 +441,7 @@ def test_actionable(self):
)
self.request_called_with(
'POST',
API_HOST +'/api/v1/actionables/{0}/actionname'.format(str(actionable_object_id)),
API_HOST + '/api/v1/actionables/{0}/actionname'.format(str(actionable_object_id)),
params={},
data={'mydata': 'val', 'mydata2': 'val2'}
)
Expand Down Expand Up @@ -633,14 +632,73 @@ def test_data_type_support(self):
m_long = int(1) # long in Python 3.x

if not is_p3k():
m_long = long(1)
m_long = long(1) # noqa: F821

supported_data_types = [1, 1.0, m_long, Decimal(1), Fraction(1, 2)]

for point in supported_data_types:
serie = dict(metric='metric.numerical', points=point)
self.submit_and_assess_metric_payload(serie)

def test_compression(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💜 While we're here can we explicitly test for compression being False/the default to protect against regressions/the default behavior changing?

"""
Metric and Distribution support zlib compression
"""

# By default, there is no compression
# Metrics
series = dict(metric="metric.1", points=[(time(), 13.)])
Metric.send(attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" not in headers
assert req_data == json.dumps({"series": [series]})
# Same result when explicitly False
Metric.send(compress_payload=False, attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" not in headers
assert req_data == json.dumps({"series": [series]})
# Distributions
series = dict(metric="metric.1", points=[(time(), 13.)])
Distribution.send(attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" not in headers
assert req_data == json.dumps({"series": [series]})
# Same result when explicitly False
Distribution.send(compress_payload=False, attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" not in headers
assert req_data == json.dumps({"series": [series]})

# Enabling compression
# Metrics
series = dict(metric="metric.1", points=[(time(), 13.)])
compressed_series = zlib.compress(json.dumps({"series": [series]}).encode("utf-8"))
Metric.send(compress_payload=True, attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" in headers
assert headers["Content-Encoding"] == "deflate"
assert req_data == compressed_series
# Distributions
series = dict(metric='metric.1', points=[(time(), 13.)])
compressed_series = zlib.compress(json.dumps({"series": [series]}).encode("utf-8"))
Distribution.send(compress_payload=True, attach_host_name=False, **series)
_, kwargs = self.request_mock.call_args()
req_data = kwargs["data"]
headers = kwargs["headers"]
assert "Content-Encoding" in headers
assert headers["Content-Encoding"] == "deflate"
assert req_data == compressed_series


class TestServiceCheckResource(DatadogAPIWithInitialization):

Expand All @@ -664,7 +722,7 @@ class TestUserResource(DatadogAPIWithInitialization):
def test_create_user(self):
    # User.create must issue a POST to /api/v1/user carrying the given
    # attributes verbatim in the request body.
    User.create(handle="handle", name="name", access_role="ro")
    self.request_called_with(
        "POST", "https://example.com/api/v1/user", data={"handle": "handle", "name": "name", "access_role": "ro"}
    )

def test_get_user(self):
Expand All @@ -676,7 +734,7 @@ def test_update_user(self):
self.request_called_with(
"PUT",
"https://example.com/api/v1/user/handle",
data={"name": "name", "access_role":"ro", "email": "email", "disabled": "disabled"}
data={"name": "name", "access_role": "ro", "email": "email", "disabled": "disabled"}
)

def test_delete_user(self):
Expand Down
20 changes: 14 additions & 6 deletions tests/unit/threadstats/test_threadstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ def assertMetric(self, name=None, value=None, tags=None, count=None):
)
)

def test_init(self):
    """compress_payload set on ThreadStats propagates to its reporter."""
    # Explicitly enabled: the reporter created by start() must carry the flag.
    stats = ThreadStats(compress_payload=True)
    stats.start()
    assert stats.reporter.compress_payload is True
    stats.stop()
    # Omitted: compression must default to off.
    stats = ThreadStats()
    stats.start()
    assert stats.reporter.compress_payload is False
    stats.stop()

def test_timed_decorator(self):
dog = ThreadStats()
dog.start(roll_up_interval=1, flush_in_thread=False)
Expand Down Expand Up @@ -415,7 +427,6 @@ def test_distribution(self):
dog.flush(150.0)
assert_equal(len(reporter.distributions), 0)


def test_default_host_and_device(self):
dog = ThreadStats()
dog.start(roll_up_interval=1, flush_in_thread=False)
Expand Down Expand Up @@ -731,10 +742,9 @@ def test_metric_type(self):
dog.histogram('histogram.1', 20, 100.0)
dog.flush(200.0)

(first, second, p75, p85, p95, p99, avg, cnt,
max_, min_) = self.sort_metrics(reporter.metrics)
(first, second, p75, p85, p95, p99, avg, cnt, max_, min_) = self.sort_metrics(reporter.metrics)

# Assert Metric type
# Assert Metric type
assert_equal(first['type'], 'rate')
assert_equal(second['type'], 'gauge')
assert_equal(p75['type'], 'gauge')
Expand All @@ -746,9 +756,7 @@ def test_metric_type(self):
assert_equal(max_['type'], 'gauge')
assert_equal(min_['type'], 'gauge')


# Test lambda_wrapper (uses ThreadStats under the hood)

def test_basic_lambda_decorator(self):

@datadog_lambda_wrapper
Expand Down