Skip to content

Commit

Permalink
[forwarder] parallelism and endpoint errors tests
Browse files Browse the repository at this point in the history
  • Loading branch information
degemer committed Jun 15, 2016
1 parent 6808e6e commit bcefdc0
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 10 deletions.
118 changes: 110 additions & 8 deletions tests/core/test_transaction.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# stdlib
from datetime import datetime, timedelta
import threading
import time
import unittest

# 3rd party
Expand All @@ -26,6 +28,8 @@ def __init__(self, size, manager):
self._trManager = manager
self._size = size
self._flush_count = 0
self._endpoint = 'https://example.com'
self._api_key = 'a' * 32

self.is_flushable = False

Expand All @@ -39,6 +43,31 @@ def flush(self):
self._trManager.flush_next()


class SleepingTransaction(Transaction):
def __init__(self, manager, delay=0.5):
Transaction.__init__(self)
self._trManager = manager
self._size = 1
self._flush_count = 0
self._endpoint = 'https://example.com'
self._api_key = 'a' * 32
self.delay = delay

self.is_flushable = False

def flush(self):
threading.Timer(self.delay, self.post_flush).start()

def post_flush(self):
self._flush_count = self._flush_count + 1
if self.is_flushable:
self._trManager.tr_success(self)
else:
self._trManager.tr_error(self)

self._trManager.flush_next()


@attr(requires='core_integration')
class TestTransaction(unittest.TestCase):

Expand All @@ -49,13 +78,13 @@ def testMemoryLimit(self):
"""Test memory limit as well as simple flush"""

# No throttling, no delay for replay
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, timedelta(seconds=0))
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE,
timedelta(seconds=0), max_endpoint_errors=100)

step = 10
oneTrSize = (MAX_QUEUE_SIZE / step) - 1
for i in xrange(step):
tr = memTransaction(oneTrSize, trManager)
trManager.append(tr)
trManager.append(memTransaction(oneTrSize, trManager))

trManager.flush()

Expand All @@ -66,8 +95,7 @@ def testMemoryLimit(self):
self.assertEqual(tr._flush_count, 1)

# Try to add one more
tr = memTransaction(oneTrSize + 10, trManager)
trManager.append(tr)
trManager.append(memTransaction(oneTrSize + 10, trManager))

# At this point, transaction one (the oldest) should have been removed from the list
self.assertEqual(len(trManager._transactions), step)
Expand All @@ -92,7 +120,8 @@ def testThrottling(self):
"""Test throttling while flushing"""

# No throttling, no delay for replay
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, THROTTLING_DELAY)
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE,
THROTTLING_DELAY, max_endpoint_errors=100)
trManager._flush_without_ioloop = True # Use blocking API to emulate tornado ioloop

# Add 3 transactions, make sure no memory limit is in the way
Expand Down Expand Up @@ -123,7 +152,8 @@ def testCustomEndpoint(self):
app._agentConfig = config
app.use_simple_http_client = True

trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, THROTTLING_DELAY)
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE,
THROTTLING_DELAY, max_endpoint_errors=100)
trManager._flush_without_ioloop = True # Use blocking API to emulate tornado ioloop
MetricTransaction._trManager = trManager
MetricTransaction.set_application(app)
Expand Down Expand Up @@ -156,7 +186,8 @@ def testEndpoints(self):
app._agentConfig = config
app.use_simple_http_client = True

trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, THROTTLING_DELAY)
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE,
THROTTLING_DELAY, max_endpoint_errors=100)
trManager._flush_without_ioloop = True # Use blocking API to emulate tornado ioloop
MetricTransaction._trManager = trManager
MetricTransaction.set_application(app)
Expand Down Expand Up @@ -209,3 +240,74 @@ def testEndpoints(self):
r = requests.post(url, data=json.dumps({'check': 'test', 'status': 0}),
headers={'Content-Type': "application/json"})
r.raise_for_status()

def test_endpoint_error(self):
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE,
timedelta(seconds=0), max_endpoint_errors=2)

step = 10
oneTrSize = (MAX_QUEUE_SIZE / step) - 1
for i in xrange(step):
trManager.append(memTransaction(oneTrSize, trManager))

trManager.flush()

# There should be exactly step transaction in the list,
# and only 2 of them with a flush count of 1
self.assertEqual(len(trManager._transactions), step)
flush_count = 0
for tr in trManager._transactions:
flush_count += tr._flush_count
self.assertEqual(flush_count, 2)

# If we retry to flush, two OTHER transactions should be tried
trManager.flush()

self.assertEqual(len(trManager._transactions), step)
flush_count = 0
for tr in trManager._transactions:
flush_count += tr._flush_count
self.assertIn(tr._flush_count, [0, 1])
self.assertEqual(flush_count, 4)

# Finally when it's possible to flush, everything should go smoothly
for tr in trManager._transactions:
tr.is_flushable = True

trManager.flush()
self.assertEqual(len(trManager._transactions), 0)

def test_parallelism(self):
step = 4
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE,
timedelta(seconds=0), max_parallelism=step,
max_endpoint_errors=100)
for i in xrange(step):
trManager.append(SleepingTransaction(trManager))

trManager.flush()
self.assertEqual(trManager._running_flushes, step)
self.assertEqual(trManager._finished_flushes, 0)
# If _trs_to_flush != None, it means that it's still running as it should be
self.assertEqual(trManager._trs_to_flush, [])
time.sleep(1)

# It should be finished
self.assertEqual(trManager._running_flushes, 0)
self.assertEqual(trManager._finished_flushes, step)
self.assertIs(trManager._trs_to_flush, None)

def test_no_parallelism(self):
step = 2
trManager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE,
timedelta(seconds=0), max_parallelism=1,
max_endpoint_errors=100)
for i in xrange(step):
trManager.append(SleepingTransaction(trManager, delay=1))
trManager.flush()
# Flushes should be sequential
for i in xrange(step):
self.assertEqual(trManager._running_flushes, 1)
self.assertEqual(trManager._finished_flushes, i)
self.assertEqual(len(trManager._trs_to_flush), step - (i + 1))
time.sleep(1)
5 changes: 3 additions & 2 deletions transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ class TransactionManager(object):
"""Holds any transaction derived object list and make sure they
are all commited, without exceeding parameters (throttling, memory consumption) """

def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay, max_parallelism=1):
def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay,
max_parallelism=1, max_endpoint_errors=4):
self._MAX_WAIT_FOR_REPLAY = max_wait_for_replay
self._MAX_QUEUE_SIZE = max_queue_size
self._THROTTLING_DELAY = throttling_delay
self._MAX_PARALLELISM = max_parallelism
self._MAX_ENDPOINT_ERRORS = 4
self._MAX_ENDPOINT_ERRORS = max_endpoint_errors

self._flush_without_ioloop = False # useful for tests

Expand Down

0 comments on commit bcefdc0

Please sign in to comment.