Skip to content

Commit

Permalink
Merge pull request #4 from Netuitive/0.0.8
Browse files Browse the repository at this point in the history
0.0.8
  • Loading branch information
shawnbutts committed Jun 8, 2016
2 parents 0578ec5 + f2c66fb commit f6488e4
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 11 deletions.
5 changes: 5 additions & 0 deletions HISTORY.md
@@ -1,6 +1,11 @@
History
-------

Version 0.0.8 - Jun 08 2016
---------------------------
* protect against memory starvation
* improve tests

Version 0.0.7 - Jun 07 2016
---------------------------
* fix samples not being cleared properly
Expand Down
5 changes: 5 additions & 0 deletions libs/elements.py
Expand Up @@ -61,6 +61,11 @@ def add(self, metricId, ts, val, metricType, sign=None, rate=None,
def delete(self, elementId):
del self.elements[elementId]

def delete_all(self):
self.elements = {}
self.element.metrics = {}
self.elements[self.hostname] = self.element

def clear_samples(self, elementId=None, everything=False):
logger.debug('Element.clear_samples for ' + str(elementId))
try:
Expand Down
23 changes: 22 additions & 1 deletion libs/poster.py
Expand Up @@ -41,6 +41,9 @@ def __init__(self, config, element, version='develop'):
self.events = []
self.elements = Elements(self.hostname, element)

self.flush_error_count = 0
self.flush_error_max = max(self.interval * 15, 900)

def stop(self):
logger.debug("Poster Shutting down")
self.runner.set()
Expand All @@ -54,7 +57,11 @@ def run(self):
logger.debug('Waiting {0} seconds'.format(self.interval))
self.runner.wait(self.interval)
logger.debug('Flushing')
self.flush()
if self.flush():
logger.debug('Flush sucessful')

else:
logger.error('Error during flush')

def flush(self):
"""
Expand All @@ -63,6 +70,16 @@ def flush(self):

try:
with self.lock:
if self.flush_error_count >= self.flush_error_max:
logger.error(
'failed to post for at least {0} seconds. '.format(
self.flush_error_max) +
'dropping data to prevent memory starvation.'
)

self.elements.delete_all()
return(False)

timestamp = int(time.time())

# add some of our internal metric samples
Expand Down Expand Up @@ -161,10 +178,14 @@ def flush(self):
self.packet_count = 0
self.event_count = 0
self.events = []
return(True)

except Exception as e:
logger.error(e, exc_info=True)

self.flush_error_count += self.interval
return(False)

def submit(self, message, ts):
"""
process incoming messages
Expand Down
2 changes: 1 addition & 1 deletion netuitive-statsd
Expand Up @@ -64,7 +64,7 @@ from setproctitle import setproctitle
import libs

# Constants
__version__ = "0.0.7"
__version__ = "0.0.8"
__author__ = "Netuitive, Inc."
__license__ = "Apache 2.0"

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -18,7 +18,7 @@

setup(
name='netuitive_statsd',
version='0.0.7',
version='0.0.8',
description="Netuitive StatsD server",
long_description='Netuitive StatsD server\n',
author="Netuitive",
Expand Down
155 changes: 147 additions & 8 deletions tests/test_poster.py
Expand Up @@ -103,6 +103,11 @@ def setUp(self):
self.poster4 = libs.Poster(self.config, self.myElement3)
self.poster4.start()

self.myElement4 = libs.Element(
self.config['hostname'], self.config['element_type'])
self.poster5 = libs.Poster(self.config, self.myElement4)
self.poster5.start()

self.lock = threading.Lock()

def test_Poster_config(self):
Expand Down Expand Up @@ -916,8 +921,7 @@ def test_element_type(self):
self.assertEqual(j, f)

@mock.patch('netuitive.client.urllib2.urlopen')
@mock.patch('netuitive.client.logging')
def test_sample_cleared(self, mock_logging, mock_post):
def test_sample_cleared(self, mock_post):

mock_post.return_value = MockResponse(code=200)

Expand All @@ -936,9 +940,7 @@ def test_sample_cleared(self, mock_logging, mock_post):
e = self.poster4.elements.elements[ename]
e.prepare()

j = json.loads(json.dumps(
self.poster4.elements, default=lambda o: o.__dict__,
sort_keys=True))
j = self.poster4.elements

# everything should have 1 sample
self.assertEqual(
Expand Down Expand Up @@ -968,9 +970,7 @@ def test_sample_cleared(self, mock_logging, mock_post):

self.poster4.flush()

j = json.loads(json.dumps(
self.poster4.elements, default=lambda o: o.__dict__,
sort_keys=True))
j = self.poster4.elements

self.assertEqual(
len(j['elements']['testelement']['element']['samples']), 0)
Expand Down Expand Up @@ -1002,11 +1002,150 @@ def test_sample_cleared(self, mock_logging, mock_post):
self.poster4.elements.delete('host2')
self.poster4.elements.delete('host3')

@mock.patch('netuitive.client.urllib2.urlopen')
def test_sample_cleared(self, mock_post):

mock_post.return_value = MockResponse(code=200)

with self.lock:
self.poster4.submit('counter:1|c', self.timestamp)
self.poster4.submit('counter:1|c|#h:host1', self.timestamp)
self.poster4.submit('counter:1|c|#h:host2', self.timestamp)
self.poster4.submit('counter:1|c|#h:host3', self.timestamp)

self.poster4.submit('counter:1|c', self.timestamp)
self.poster4.submit('counter:1|c|#h:host1', self.timestamp)
self.poster4.submit('counter:1|c|#h:host2', self.timestamp)
self.poster4.submit('counter:1|c|#h:host3', self.timestamp)

for ename in self.poster4.elements.elements:
e = self.poster4.elements.elements[ename]
e.prepare()

j = self.poster4.elements

# everything should have 1 sample
self.assertEqual(
len(j.elements['testelement'].element.samples), 1)

self.assertEqual(
len(j.elements['host1'].element.samples), 1)

self.assertEqual(
len(j.elements['host2'].element.samples), 1)

self.assertEqual(
len(j.elements['host3'].element.samples), 1)

# everything should have 1 metric
self.assertEqual(
len(j.elements['testelement'].element.metrics), 1)

self.assertEqual(
len(j.elements['host1'].element.metrics), 1)

self.assertEqual(
len(j.elements['host2'].element.metrics), 1)

self.assertEqual(
len(j.elements['host3'].element.metrics), 1)

self.poster4.flush()

j = self.poster4.elements

self.assertEqual(
len(j.elements['testelement'].element.samples), 0)

self.assertEqual(len(j.elements['host1'].element.samples), 0)

self.assertEqual(len(j.elements['host2'].element.samples), 0)

self.assertEqual(len(j.elements['host3'].element.samples), 0)

# everything should have 0 metric
self.assertEqual(
len(j.elements['testelement'].element.metrics), 0)

self.assertEqual(
len(j.elements['host1'].element.metrics), 0)

self.assertEqual(
len(j.elements['host2'].element.metrics), 0)

self.assertEqual(
len(j.elements['host3'].element.metrics), 0)

self.poster4.elements.delete('testelement')
self.poster4.elements.delete('host1')
self.poster4.elements.delete('host2')
self.poster4.elements.delete('host3')

@mock.patch('libs.poster.logger')
def test_memory_safety(self, mock_logging):

with self.lock:
self.poster5.submit('counter:1|c', self.timestamp)
self.poster5.submit('counter:1|c|#h:host1', self.timestamp)
self.poster5.submit('counter:1|c|#h:host2', self.timestamp)
self.poster5.submit('counter:1|c|#h:host3', self.timestamp)

self.poster5.submit('counter:1|c', self.timestamp)
self.poster5.submit('counter:1|c|#h:host1', self.timestamp)
self.poster5.submit('counter:1|c|#h:host2', self.timestamp)
self.poster5.submit('counter:1|c|#h:host3', self.timestamp)

for ename in self.poster5.elements.elements:
e = self.poster5.elements.elements[ename]
e.prepare()

j = self.poster5.elements

# everything should have 1 sample
self.assertEqual(
len(j.elements['testelement'].element.samples), 1)

self.assertEqual(
len(j.elements['host1'].element.samples), 1)

self.assertEqual(
len(j.elements['host2'].element.samples), 1)

self.assertEqual(
len(j.elements['host3'].element.samples), 1)

# everything should have 1 metric
self.assertEqual(
len(j.elements['testelement'].element.metrics), 1)

self.assertEqual(
len(j.elements['host1'].element.metrics), 1)

self.assertEqual(
len(j.elements['host2'].element.metrics), 1)

self.assertEqual(
len(j.elements['host3'].element.metrics), 1)

self.poster5.flush_error_count = 901
self.poster5.flush()

self.assertEqual(mock_logging.error.call_args_list[0][0][0],
"failed to post for at least 900 seconds. dropping data to prevent memory starvation.")

j = self.poster5.elements

self.assertEqual(len(j.elements), 1)
self.assertEqual(len(j.elements['testelement'].metrics), 0)

# self.poster5.elements.delete_all()

def tearDown(self):
self.poster.stop()
self.poster2.stop()
self.poster3.stop()
self.poster4.stop()
self.poster5.stop()

if __name__ == '__main__':
unittest.main()

0 comments on commit f6488e4

Please sign in to comment.