Skip to content

Commit

Permalink
[forwarder] enable multiple parallel requests
Browse files Browse the repository at this point in the history
Throttling is still there, so this should only improve the situation where
there is a long-running request (especially for multiple endpoints: if one of
them is timing out, this should improve the situation for the other endpoint)
  • Loading branch information
degemer committed Jun 1, 2016
1 parent 55e47b4 commit fa0f09f
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,21 @@ def flush(self):
class TransactionManager(object):
"""Holds any transaction derived object list and make sure they
are all commited, without exceeding parameters (throttling, memory consumption) """
DEFAULT_MAX_PARALLELISM = 5

def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay):
def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay, max_parallelism=None):
self._MAX_WAIT_FOR_REPLAY = max_wait_for_replay
self._MAX_QUEUE_SIZE = max_queue_size
self._THROTTLING_DELAY = throttling_delay
self._MAX_PARALLELISM = max_parallelism or self.DEFAULT_MAX_PARALLELISM

self._flush_without_ioloop = False # useful for tests

self._transactions = [] # List of all non commited transactions
self._total_count = 0 # Maintain size/count not to recompute it everytime
self._total_size = 0
self._flush_count = 0
self._running_flushes = 0
self._transactions_received = 0
self._transactions_flushed = 0

Expand Down Expand Up @@ -180,25 +183,23 @@ def flush(self):

def flush_next(self):

if len(self._trs_to_flush) > 0:

if self._trs_to_flush is not None and \
len(self._trs_to_flush) > 0 and \
self._running_flushes < self._MAX_PARALLELISM:
td = self._last_flush + self._THROTTLING_DELAY - datetime.utcnow()
# Python 2.7 has this built in, python < 2.7 don't...
if hasattr(td,'total_seconds'):
delay = td.total_seconds()
else:
delay = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10.0**6
delay = td.total_seconds()

if delay <= 0:
tr = self._trs_to_flush.pop()
self._last_flush = datetime.utcnow()
log.debug("Flushing transaction %d" % tr.get_id())
try:
self._running_flushes += 1
tr.flush()
except Exception,e :
except Exception as e:
log.exception(e)
self.tr_error(tr)
self.flush_next()
self.flush_next()
else:
# Wait a little bit more
tornado_ioloop = get_tornado_ioloop()
Expand All @@ -213,13 +214,15 @@ def flush_next(self):
self._trs_to_flush = None

def tr_error(self,tr):
    """Handle a failed flush of *tr*: free its parallelism slot, bump its
    error count, and schedule when it should be replayed."""
    # This flush attempt is over, so release the slot counted against
    # _MAX_PARALLELISM.
    self._running_flushes -= 1
    tr.inc_error_count()
    tr.compute_next_flush(self._MAX_WAIT_FOR_REPLAY)
    # Hoist the count so it is read once for both the message and plural().
    error_count = tr.get_error_count()
    log.warn("Transaction %d in error (%s error%s), it will be replayed after %s" %
             (tr.get_id(), error_count, plural(error_count),
              tr.get_next_flush()))

def tr_error_too_big(self,tr):
self._running_flushes -= 1
tr.inc_error_count()
log.warn("Transaction %d is %sKB, it has been rejected as too large. \
It will not be replayed." % (tr.get_id(), tr.get_size() / 1024))
Expand All @@ -239,6 +242,7 @@ def tr_error_too_big(self,tr):

def tr_success(self,tr):
log.debug("Transaction %d completed" % tr.get_id())
self._running_flushes -= 1
self._transactions.remove(tr)
self._total_count -= 1
self._total_size -= tr.get_size()
Expand Down

0 comments on commit fa0f09f

Please sign in to comment.