Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

New method to post events from UDP packets to payloads #852

Merged
merged 12 commits into from
Mar 10, 2014
8 changes: 4 additions & 4 deletions aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,13 +416,13 @@ def submit_metric(self, name, value, mtype, tags=None, hostname=None,

def event(self, title, text, date_happened=None, alert_type=None, aggregation_key=None, source_type_name=None, priority=None, tags=None, hostname=None):
event = {
'title': title,
'text': text,
'msg_title': title,
'msg_text': text,
}
if date_happened is not None:
event['date_happened'] = date_happened
event['timestamp'] = date_happened
else:
event['date_happened'] = int(time())
event['timestamp'] = int(time())
if alert_type is not None:
event['alert_type'] = alert_type
if aggregation_key is not None:
Expand Down
48 changes: 31 additions & 17 deletions dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import sys
from time import time
import threading
import math
from urllib import urlencode

# project
from aggregator import MetricsBucketAggregator
from checks.check_status import DogstatsdStatus
from config import get_config
from daemon import Daemon, AgentSupervisor
from util import json, PidFile, get_hostname, plural
from util import json, PidFile, get_hostname, plural, get_uuid, chunks

log = logging.getLogger('dogstatsd')

Expand All @@ -40,6 +41,7 @@
FLUSH_LOGGING_PERIOD = 70
FLUSH_LOGGING_INITIAL = 10
FLUSH_LOGGING_COUNT = 5
EVENT_CHUNK_SIZE = 50

def serialize_metrics(metrics):
return json.dumps({"series" : metrics})
Expand All @@ -53,7 +55,7 @@ class Reporter(threading.Thread):
server.
"""

def __init__(self, interval, metrics_aggregator, api_host, api_key=None, use_watchdog=False):
def __init__(self, interval, metrics_aggregator, api_host, api_key=None, use_watchdog=False, event_chunk_size=None):
threading.Thread.__init__(self)
self.interval = int(interval)
self.finished = threading.Event()
Expand All @@ -68,6 +70,7 @@ def __init__(self, interval, metrics_aggregator, api_host, api_key=None, use_wat

self.api_key = api_key
self.api_host = api_host
self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE

self.http_conn_cls = http_client.HTTPSConnection

Expand Down Expand Up @@ -175,28 +178,38 @@ def submit_events(self, events):
headers = {'Content-Type':'application/json'}
method = 'POST'

params = {}
if self.api_key:
params['api_key'] = self.api_key
url = '/api/v1/events?%s' % urlencode(params)

status = None
conn = self.http_conn_cls(self.api_host)
try:
for event in events:
events_len = len(events)
event_chunk_size = self.event_chunk_size

for chunk in chunks(events, event_chunk_size):
payload = {
'apiKey': self.api_key,
'events': {
'api': chunk
},
'uuid': get_uuid(),
'internalHostname': get_hostname()
}
params = {}
if self.api_key:
params['api_key'] = self.api_key
url = '/intake?%s' % urlencode(params)

status = None
conn = self.http_conn_cls(self.api_host)
try:
start_time = time()
body = serialize_event(event)
log.debug('Sending event: %s' % body)
conn.request(method, url, body, headers)
conn.request(method, url, json.dumps(payload), headers)

response = conn.getresponse()
status = response.status
response.close()
duration = round((time() - start_time) * 1000.0, 4)
log.debug("%s %s %s%s (%sms)" % (
status, method, self.api_host, url, duration))
finally:
conn.close()

finally:
conn.close()

class Server(object):
"""
Expand Down Expand Up @@ -336,6 +349,7 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False):
non_local_traffic = c['non_local_traffic']
forward_to_host = c.get('statsd_forward_host')
forward_to_port = c.get('statsd_forward_port')
event_chunk_size = c.get('event_chunk_size')

target = c['dd_url']
if use_forwarder:
Expand All @@ -350,7 +364,7 @@ def init(config_path=None, use_watchdog=False, use_forwarder=False):
aggregator = MetricsBucketAggregator(hostname, aggregator_interval, recent_point_threshold=c.get('recent_point_threshold', None))

# Start the reporting thread.
reporter = Reporter(interval, aggregator, target, api_key, use_watchdog)
reporter = Reporter(interval, aggregator, target, api_key, use_watchdog, event_chunk_size)

# Start the server on an IPv4 stack
# Default to loopback
Expand Down
36 changes: 18 additions & 18 deletions tests/test_bucket_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def sort_by(m):
@staticmethod
def sort_events(metrics):
def sort_by(m):
return (m['title'], m['text'], ','.join(m.get('tags', None) or []))
return (m['msg_title'], m['msg_text'], ','.join(m.get('tags', None) or []))
return sorted(metrics, key=sort_by)

def sleep_for_interval_length(self, interval=None):
Expand Down Expand Up @@ -883,19 +883,19 @@ def test_event_tags(self):
assert True
else:
assert False, "event['tags'] shouldn't be defined when no tags aren't explicited in the packet"
nt.assert_equal(first['title'], 'title1')
nt.assert_equal(first['text'], 'text')
nt.assert_equal(first['msg_title'], 'title1')
nt.assert_equal(first['msg_text'], 'text')

nt.assert_equal(second['title'], 'title2')
nt.assert_equal(second['text'], 'text')
nt.assert_equal(second['msg_title'], 'title2')
nt.assert_equal(second['msg_text'], 'text')
nt.assert_equal(second['tags'], sorted(['t1']))

nt.assert_equal(third['title'], 'title3')
nt.assert_equal(third['text'], 'text')
nt.assert_equal(third['msg_title'], 'title3')
nt.assert_equal(third['msg_text'], 'text')
nt.assert_equal(third['tags'], sorted(['t1', 't2:v2', 't3', 't4']))

nt.assert_equal(fourth['title'], 'title4')
nt.assert_equal(fourth['text'], 'text')
nt.assert_equal(fourth['msg_title'], 'title4')
nt.assert_equal(fourth['msg_text'], 'text')
nt.assert_equal(fourth['aggregation_key'], 'key')
nt.assert_equal(fourth['priority'], 'normal')
nt.assert_equal(fourth['tags'], sorted(['t1', 't2']))
Expand All @@ -913,11 +913,11 @@ def test_event_title(self):
assert len(events) == 5
first, second, third, fourth, fifth = events

nt.assert_equal(first['title'], '')
nt.assert_equal(second['title'], u'2intitulé')
nt.assert_equal(third['title'], '3title content')
nt.assert_equal(fourth['title'], '4title|content')
nt.assert_equal(fifth['title'], '5title\\ntitle')
nt.assert_equal(first['msg_title'], '')
nt.assert_equal(second['msg_title'], u'2intitulé')
nt.assert_equal(third['msg_title'], '3title content')
nt.assert_equal(fourth['msg_title'], '4title|content')
nt.assert_equal(fifth['msg_title'], '5title\\ntitle')

def test_event_text(self):
stats = MetricsBucketAggregator('myhost', interval=self.interval)
Expand All @@ -931,10 +931,10 @@ def test_event_text(self):
assert len(events) == 4
first, second, third, fourth = events

nt.assert_equal(first['text'], '')
nt.assert_equal(second['text'], 'text|content')
nt.assert_equal(third['text'], 'First line\nSecond line')
nt.assert_equal(fourth['text'], u'♬ †øU †øU ¥ºu T0µ ♪')
nt.assert_equal(first['msg_text'], '')
nt.assert_equal(second['msg_text'], 'text|content')
nt.assert_equal(third['msg_text'], 'First line\nSecond line')
nt.assert_equal(fourth['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪')

def test_recent_point_threshold(self):
ag_interval = 1
Expand Down
36 changes: 18 additions & 18 deletions tests/test_dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def sort_by(m):
@staticmethod
def sort_events(metrics):
def sort_by(m):
return (m['title'], m['text'], ','.join(m.get('tags', None) or []))
return (m['msg_title'], m['msg_text'], ','.join(m.get('tags', None) or []))
return sorted(metrics, key=sort_by)

@staticmethod
Expand Down Expand Up @@ -466,19 +466,19 @@ def test_event_tags(self):
assert True
else:
assert False, "event['tags'] shouldn't be defined when no tags aren't explicited in the packet"
nt.assert_equal(first['title'], 'title1')
nt.assert_equal(first['text'], 'text')
nt.assert_equal(first['msg_title'], 'title1')
nt.assert_equal(first['msg_text'], 'text')

nt.assert_equal(second['title'], 'title2')
nt.assert_equal(second['text'], 'text')
nt.assert_equal(second['msg_title'], 'title2')
nt.assert_equal(second['msg_text'], 'text')
nt.assert_equal(second['tags'], sorted(['t1']))

nt.assert_equal(third['title'], 'title3')
nt.assert_equal(third['text'], 'text')
nt.assert_equal(third['msg_title'], 'title3')
nt.assert_equal(third['msg_text'], 'text')
nt.assert_equal(third['tags'], sorted(['t1', 't2:v2', 't3', 't4']))

nt.assert_equal(fourth['title'], 'title4')
nt.assert_equal(fourth['text'], 'text')
nt.assert_equal(fourth['msg_title'], 'title4')
nt.assert_equal(fourth['msg_text'], 'text')
nt.assert_equal(fourth['aggregation_key'], 'key')
nt.assert_equal(fourth['priority'], 'normal')
nt.assert_equal(fourth['tags'], sorted(['t1', 't2']))
Expand All @@ -496,11 +496,11 @@ def test_event_title(self):
assert len(events) == 5
first, second, third, fourth, fifth = events

nt.assert_equal(first['title'], '')
nt.assert_equal(second['title'], u'2intitulé')
nt.assert_equal(third['title'], '3title content')
nt.assert_equal(fourth['title'], '4title|content')
nt.assert_equal(fifth['title'], '5title\\ntitle')
nt.assert_equal(first['msg_title'], '')
nt.assert_equal(second['msg_title'], u'2intitulé')
nt.assert_equal(third['msg_title'], '3title content')
nt.assert_equal(fourth['msg_title'], '4title|content')
nt.assert_equal(fifth['msg_title'], '5title\\ntitle')

def test_event_text(self):
stats = MetricsAggregator('myhost')
Expand All @@ -514,10 +514,10 @@ def test_event_text(self):
assert len(events) == 4
first, second, third, fourth = events

nt.assert_equal(first['text'], '')
nt.assert_equal(second['text'], 'text|content')
nt.assert_equal(third['text'], 'First line\nSecond line')
nt.assert_equal(fourth['text'], u'♬ †øU †øU ¥ºu T0µ ♪')
nt.assert_equal(first['msg_text'], '')
nt.assert_equal(second['msg_text'], 'text|content')
nt.assert_equal(third['msg_text'], 'First line\nSecond line')
nt.assert_equal(fourth['msg_text'], u'♬ †øU †øU ¥ºu T0µ ♪')

def test_recent_point_threshold(self):
threshold = 100
Expand Down
21 changes: 21 additions & 0 deletions util.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
except ImportError:
from md5 import md5


VALID_HOSTNAME_RFC_1123_PATTERN = re.compile(r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$")
MAX_HOSTNAME_LEN = 255
# Import json for the agent. Try simplejson first, then the stdlib version and
Expand Down Expand Up @@ -590,3 +591,23 @@ def is_unix(name=None):
def is_win32(name=None):
    """Return True if `name` (defaulting to sys.platform) is "win32"."""
    platform_name = name if name else sys.platform
    return platform_name == "win32"

"""
Iterable Recipes
"""

def chunks(iterable, chunk_size):
    """Generate lists of at most `chunk_size` elements from `iterable`.

    Every yielded chunk has exactly `chunk_size` elements except possibly
    the last one, which holds the remainder. Nothing is yielded for an
    empty iterable.

    Fixes over the previous implementation:
    - `iterable.next()` was Python-2-only; use iteration directly.
    - Letting StopIteration escape inside a generator is a RuntimeError
      under PEP 479 (Python 3.7+); avoid it entirely.
    - `chunk_size <= 0` previously looped forever yielding `[]`.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    # Flush the final partial chunk, if any.
    if chunk:
        yield chunk