Skip to content

Commit

Permalink
Merge pull request #37 from NextThought/issue31
Browse files Browse the repository at this point in the history
Make the transaction loop emit more metrics. Fixes #31.
  • Loading branch information
jamadden committed Nov 29, 2019
2 parents 5bff928 + 734e019 commit 7ac78a7
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 22 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
- Reduce the default number of attempts to 4 (one attempt and 3
retries). See `issue 35 <https://github.com/NextThought/nti.transactions/issues/35>`_.

- Make the transaction loop emit more metrics. See `issue 31
<https://github.com/NextThought/nti.transactions/issues/31>`_.

3.0.0 (2019-09-06)
==================

Expand Down
110 changes: 91 additions & 19 deletions src/nti/transactions/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def exc_clear():

import six

from perfmetrics import statsd_client as _statsd_client

from zope.exceptions.exceptionformatter import format_exception
from zope.exceptions.exceptionformatter import print_exception
from zope.cachedescriptors.property import Lazy
Expand All @@ -46,7 +48,6 @@ def exc_clear():
'TransactionLoop',
]


def _do_commit(tx, description, long_commit_duration, perf_counter=getattr(time,
'perf_counter',
time.time)):
Expand Down Expand Up @@ -93,20 +94,42 @@ class TransactionLoop(object):
commit is vetoed by :meth:`should_abort_due_to_no_side_effects` or
:meth:`should_veto_commit` is never retried.
Running the loop will increment these :mod:`perfmetrics` counters:
transaction.retry
The number of retries required in any given loop. Note that
if the handler raises non-retryable exceptions, this number will
be inflated.
transaction.side_effect_free
How many side-effect free transactions there have been.
transaction.vetoed
How many transactions were vetoed.
transaction.doomed
The number of doomed transactions.
transaction.successful
The number of transactions that successfully returned.
transaction.failed
The number of transactions that did not successfully return.
.. important::
Instances of this class must be thread safe, and running the loop should
not mutate the state of this object.
.. versionchanged:: 3.0
When this object is called, the thread-local default or global
:class:`transaction.TransactionManager` is placed into explicit
mode (if it wasn't already). The handler callable is forbidden
from beginning, aborting or committing the transaction. Explicit
transactions can be faster in ZODB, and are easier to reason
about.
If the application begins, commits or aborts the transaction, it
can expect this object to raise
:exc:`transaction.interfaces.NoTransaction`,
:exc:`transaction.interfaces.AlreadyInTransaction` or
:exc:`nti.transactions.interfaces.TransactionLifecycleError`.
When this object is called, the thread-local default or global
:class:`transaction.TransactionManager` is placed into explicit
mode (if it wasn't already). The handler callable is forbidden
from beginning, aborting or committing the transaction. Explicit
transactions can be faster in ZODB, and are easier to reason
about.
If the application begins, commits or aborts the transaction, it
can expect this object to raise
:exc:`transaction.interfaces.NoTransaction`,
:exc:`transaction.interfaces.AlreadyInTransaction` or
:exc:`nti.transactions.interfaces.TransactionLifecycleError`.
"""

class AbortAndReturn(Exception):
Expand Down Expand Up @@ -146,6 +169,8 @@ def __init__(self, response, reason):
#: This can be set in a subclass if not passed to the constructor.
long_commit_duration = DEFAULT_LONG_RUNNING_COMMIT_IN_SECS # seconds

transaction_sample_rate = 0.2

#: The default return value from :meth:`should_abort_due_to_no_side_effects`.
#: If you are not subclassing, or you do not need access to the arguments
#: of the called function to make this determination, you may set this
Expand Down Expand Up @@ -173,14 +198,14 @@ def __init__(
self.sleep = sleep
self.random = random.SystemRandom()

def prep_for_retry(self, number, *args, **kwargs):
def prep_for_retry(self, attempts_remaining, *args, **kwargs):
"""
Called just after a transaction begins if there will be
more than one attempt possible. Do any preparation
needed to cleanup or prepare reattempts, or raise
:class:`AbortAndReturn` if that's not possible.
:param int number: How many attempts remain. Will always be
:param int attempts_remaining: How many attempts remain. Will always be
at least 1.
"""

Expand Down Expand Up @@ -332,6 +357,37 @@ def tearDown():
.. versionadded:: 3.0
"""

_statsd_client = _statsd_client

class _StatCollector(object):
# Encapsulates the calls we make to send stats.
__slots__ = ('client', 'rate', 'buf')
def __init__(self, client, rate):
self.client = client
self.rate = rate
# We will always send at least two stats, so we can save some network traffic
# using a buffer. We shouldn't ever send enough to overflow a UDP packet
# (based on napkin math)
self.buf = []

def __call__(self, name, count=1):
self.client.incr(name, count=count, buf=self.buf, rate=self.rate)

def flush(self):
self.client.sendbuf(self.buf)
del self.buf

class _NullStatCollector(object):
@staticmethod
def __call__(name, count=1):
"Does nothing"

@staticmethod
def flush():
"Does nothing"

_null_stat_collector = _NullStatCollector()

def __call__(self, *args, **kwargs):
note = self.describe_transaction(*args, **kwargs)

Expand All @@ -345,15 +401,27 @@ def __call__(self, *args, **kwargs):
# our local Transaction object and the global state.
was_explicit = txm.explicit
txm.explicit = True
client = self._statsd_client()
stats = (
self._StatCollector(client, self.transaction_sample_rate)
if client
else self._null_stat_collector
)

try:
self.setUp()
return self.__loop(txm, note, args, kwargs)
return self.__loop(txm, note, stats, args, kwargs)
except Exception:
stats('transaction.failed')
stats('transaction.retry', self.attempts - 1)
raise
finally:
txm.explicit = was_explicit
self.tearDown()
stats.flush()

def __loop(self, txm, note, args, kwargs): # pylint:disable=too-many-branches
def __loop(self, txm, note, stats, args, kwargs):
# pylint:disable=too-many-branches,too-many-statements
attempts_remaining = self.attempts
need_retry = self.attempts > 1
begin = txm.begin
Expand Down Expand Up @@ -411,13 +479,17 @@ def __loop(self, txm, note, args, kwargs): # pylint:disable=too-many-branches
# NOTE: We raise these as an exception instead
# of aborting in the loop so that we don't
# retry if something goes wrong aborting
stats('transaction.side_effect_free')
raise self.AbortAndReturn(result, "side-effect free")

if tx.isDoomed() or self.should_veto_commit(result, *args, **kwargs):
stats('transaction.doomed' if tx.isDoomed() else 'transaction.vetoed')
raise self.AbortAndReturn(result, "doomed or vetoed")

_do_commit(tx, note, self.long_commit_duration)

stats('transaction.successful')
if attempts_remaining != self.attempts - 1:
stats('transaction.retry', self.attempts - 1 - attempts_remaining)
return result
except ForeignTransactionError:
# They left a transaction hanging around. If it's
Expand Down Expand Up @@ -506,7 +578,7 @@ def __handle_generic_exception(self, tx, attempts_remaining,
retryable = self._retryable(tx, exc_info)
self._abort_on_exception(exc_info, retryable, attempts_remaining, tx)

if attempts_remaining <= 0 or not retryable: # AFTER the abort
if attempts_remaining <= 0 or not retryable:
_reraise(*exc_info)
finally:
exc_info = None
Expand Down
5 changes: 2 additions & 3 deletions src/nti/transactions/pyramid_tween.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class TransactionTween(TransactionLoop):
# delay backoff time/multiplier, long_commit_duration and
# side_effect_free as config params. Maybe model on pyramid_tm?

def prep_for_retry(self, number, request): # pylint:disable=arguments-differ
def prep_for_retry(self, attempts_remaining, request): # pylint:disable=arguments-differ
"""
Prepares the request for possible retries.
Expand Down Expand Up @@ -178,8 +178,7 @@ def prep_for_retry(self, number, request): # pylint:disable=arguments-differ

# We attempt to fix that here. (This is the best place because
# we are now sure the body is seekable.)
# TODO: Document this.
if number == (self.attempts - 1) \
if attempts_remaining == (self.attempts - 1) \
and request.method in ('POST', 'PUT') \
and request.content_type == 'application/x-www-form-urlencoded':
# This needs tested.
Expand Down
44 changes: 44 additions & 0 deletions src/nti/transactions/tests/test_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
from hamcrest import raises
from hamcrest import has_property
from hamcrest import none
from hamcrest import has_items
from hamcrest import greater_than_or_equal_to

import fudge

from nti.testing.matchers import is_true
from nti.testing.matchers import is_false
from nti.testing.matchers import has_length

from ..interfaces import CommitFailedError
from ..interfaces import AbortFailedError
Expand All @@ -33,6 +36,10 @@
from transaction.interfaces import NoTransaction
from transaction.interfaces import AlreadyInTransaction

from perfmetrics.testing import FakeStatsDClient
from perfmetrics.testing.matchers import is_counter
from perfmetrics import statsd_client_stack


class TestCommit(unittest.TestCase):
class RaisingCommit(object):
Expand Down Expand Up @@ -90,17 +97,33 @@ def test_commit_clean_but_long(self, fake_logger):
fake_logger.expects_call()
_do_commit(self.RaisingCommit(None), '', -1)

class TrueStatsDClient(FakeStatsDClient):
# https://github.com/zodb/perfmetrics/issues/23
def __bool__(self):
return True
__nonzero__ = __bool__

class TestLoop(unittest.TestCase):

def setUp(self):
try:
transaction.abort()
except NoTransaction:
pass
self.statsd_client = TrueStatsDClient()
self.statsd_client.random = lambda: 0 # Ignore rate, capture all packets
statsd_client_stack.push(self.statsd_client)

def tearDown(self):
statsd_client_stack.pop()

def test_trivial(self):
result = TransactionLoop(lambda a: a, retries=1, long_commit_duration=1, sleep=1)(1)
assert_that(result, is_(1))
# May or may not get a transaction.commit stat first, depending on random
assert_that(self.statsd_client.packets, has_length(greater_than_or_equal_to(1)))
assert_that(self.statsd_client.observations[-1],
is_counter(name='transaction.successful', value=1))

def test_explicit(self):
assert_that(transaction.manager, has_property('explicit', is_false()))
Expand Down Expand Up @@ -189,6 +212,11 @@ def handler():
result = loop()
assert_that(result, is_("hi"))
assert_that(calls, is_([1] * raise_count))
observations = self.statsd_client.observations
assert_that(observations,
has_items(
is_counter(name='transaction.successful', value=1),
is_counter(name='transaction.retry', value=raise_count)))
return loop

def test_custom_retriable(self):
Expand All @@ -210,6 +238,10 @@ def handler():
raise MyError()
loop = TransactionLoop(handler, sleep=0.01, retries=100000000)
assert_that(calling(loop), raises(MyError))
observations = self.statsd_client.observations
assert_that(observations,
has_items(
is_counter(name='transaction.failed', value=1)))

def test_isRetryableError_exception(self):
# If the transaction.isRetryableError() raises, for some reason,
Expand Down Expand Up @@ -282,6 +314,10 @@ class Loop(TransactionLoop):

result = Loop(lambda: 42)()
assert_that(result, is_(42))
observations = self.statsd_client.observations
assert_that(observations,
has_items(
is_counter(name='transaction.side_effect_free', value=1)))

@fudge.patch('transaction._transaction.Transaction.nti_abort')
def test_abort_doomed(self, fake_abort):
Expand All @@ -294,6 +330,10 @@ def handler():

result = TransactionLoop(handler)()
assert_that(result, is_(42))
observations = self.statsd_client.observations
assert_that(observations,
has_items(
is_counter(name='transaction.doomed', value=1)))

@fudge.patch('transaction._manager.TransactionManager.begin',
'transaction._manager.TransactionManager.get')
Expand All @@ -312,6 +352,10 @@ def should_veto_commit(self, result, *args, **kwargs):

result = Loop(lambda: 42)()
assert_that(result, is_(42))
observations = self.statsd_client.observations
assert_that(observations,
has_items(
is_counter(name='transaction.vetoed', value=1)))

@fudge.patch('transaction._manager.TransactionManager.begin')
def test_abort_systemexit(self, fake_begin):
Expand Down

0 comments on commit 7ac78a7

Please sign in to comment.