diff --git a/transaction.py b/transaction.py index 5e71cbe9e0..e01f938fa8 100644 --- a/transaction.py +++ b/transaction.py @@ -68,11 +68,13 @@ def flush(self): class TransactionManager(object): """Holds any transaction derived object list and make sure they are all commited, without exceeding parameters (throttling, memory consumption) """ + DEFAULT_MAX_PARALLELISM = 5 - def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay): + def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay, max_parallelism=None): self._MAX_WAIT_FOR_REPLAY = max_wait_for_replay self._MAX_QUEUE_SIZE = max_queue_size self._THROTTLING_DELAY = throttling_delay + self._MAX_PARALLELISM = max_parallelism or self.DEFAULT_MAX_PARALLELISM self._flush_without_ioloop = False # useful for tests @@ -80,6 +82,7 @@ def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay): self._total_count = 0 # Maintain size/count not to recompute it everytime self._total_size = 0 self._flush_count = 0 + self._running_flushes = 0 self._transactions_received = 0 self._transactions_flushed = 0 @@ -180,25 +183,23 @@ def flush(self): def flush_next(self): - if len(self._trs_to_flush) > 0: - + if self._trs_to_flush is not None and \ + len(self._trs_to_flush) > 0 and \ + self._running_flushes < self._MAX_PARALLELISM: td = self._last_flush + self._THROTTLING_DELAY - datetime.utcnow() - # Python 2.7 has this built in, python < 2.7 don't... - if hasattr(td,'total_seconds'): - delay = td.total_seconds() - else: - delay = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10.0**6 + delay = td.total_seconds() if delay <= 0: tr = self._trs_to_flush.pop() self._last_flush = datetime.utcnow() log.debug("Flushing transaction %d" % tr.get_id()) try: + self._running_flushes += 1 tr.flush() - except Exception,e : + except Exception as e: log.exception(e) self.tr_error(tr) - self.flush_next() + self.flush_next() else: # Wait a little bit more tornado_ioloop = get_tornado_ioloop() @@ -213,6 +214,7 @@ def flush_next(self): self._trs_to_flush = None def tr_error(self,tr): + self._running_flushes -= 1 tr.inc_error_count() tr.compute_next_flush(self._MAX_WAIT_FOR_REPLAY) log.warn("Transaction %d in error (%s error%s), it will be replayed after %s" % @@ -220,6 +222,7 @@ def tr_error(self,tr): tr.get_next_flush())) def tr_error_too_big(self,tr): + self._running_flushes -= 1 tr.inc_error_count() log.warn("Transaction %d is %sKB, it has been rejected as too large. \ It will not be replayed." % (tr.get_id(), tr.get_size() / 1024)) @@ -239,6 +242,7 @@ def tr_error_too_big(self,tr): def tr_success(self,tr): log.debug("Transaction %d completed" % tr.get_id()) + self._running_flushes -= 1 self._transactions.remove(tr) self._total_count -= 1 self._total_size -= tr.get_size()