Skip to content

Commit

Permalink
use processors for transactions, and test that it actually happens (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
beniwohli committed Nov 2, 2017
1 parent 0163ad1 commit 2bb7282
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/apm-agent-python/compare/v1.0.0.dev2\...master[Check

* added a background thread to process the transactions queue every 60 seconds (configurable) ({pull}68[#68])
* adapted trace context for SQL traces to new API ({pull}77[#77])
* ensured that transaction data is also passed through processors ({pull}84[#84])

[[release-v1.0.0.dev2]]
[float]
Expand Down
15 changes: 10 additions & 5 deletions elasticapm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ def begin_transaction(self, transaction_type):
def end_transaction(self, name, status_code=None):
self.instrumentation_store.end_transaction(status_code, name)
if self.instrumentation_store.should_collect():
self._traces_collect()
self._collect_transactions()
if not self._send_timer:
# send first batch of data after config._wait_to_first_send
self._start_send_timer(timeout=min(self.config._wait_to_first_send, self.config.traces_send_frequency))
Expand All @@ -461,7 +461,7 @@ def set_transaction_extra_data(self, data, _key=None):
transaction._context[_key] = data

def close(self):
self._traces_collect()
self._collect_transactions()
if self._send_timer:
self._stop_send_timer()
for url, transport in self._transports.items():
Expand Down Expand Up @@ -492,23 +492,28 @@ def handle_transport_fail(self, exception=None, **kwargs):
)
self.state.set_fail()

def _traces_collect(self):
def _collect_transactions(self):
self._stop_send_timer()
transactions = self.instrumentation_store.get_all()
transactions = []
for transaction in self.instrumentation_store.get_all():
for processor in self.processors:
transaction = processor(self, transaction)
transactions.append(transaction)
if not transactions:
return

data = self.build_msg({
'transactions': transactions,
})

api_path = defaults.TRANSACTIONS_API_PATH

self.send(server=self.config.server + api_path, **data)
self._start_send_timer()

def _start_send_timer(self, timeout=None):
timeout = timeout or self.config.traces_send_frequency
self._send_timer = threading.Timer(timeout, self._traces_collect)
self._send_timer = threading.Timer(timeout, self._collect_transactions)
self._send_timer.start()

def _stop_send_timer(self):
Expand Down
2 changes: 1 addition & 1 deletion elasticapm/contrib/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def _send_remote(self, url, data, headers=None):

def _start_send_timer(self, timeout=None):
timeout = timeout or self.config.traces_send_frequency
self._send_timer = AsyncTimer(timeout, self._traces_collect)
self._send_timer = AsyncTimer(timeout, self._collect_transactions)

def _stop_send_timer(self):
if self._send_timer:
Expand Down
4 changes: 2 additions & 2 deletions tests/client/client_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def test_send_with_auth_header(time, send_remote):

@mock.patch('elasticapm.transport.http_urllib3.Urllib3Transport.send')
@mock.patch('elasticapm.transport.http_urllib3.Urllib3Transport.close')
@mock.patch('elasticapm.base.Client._traces_collect')
@mock.patch('elasticapm.base.Client._collect_transactions')
def test_client_shutdown_sync(mock_traces_collect, mock_close, mock_send):
client = Client(
server='http://example.com',
Expand All @@ -307,7 +307,7 @@ def test_client_shutdown_sync(mock_traces_collect, mock_close, mock_send):


@mock.patch('elasticapm.transport.http_urllib3.Urllib3Transport.send')
@mock.patch('elasticapm.base.Client._traces_collect')
@mock.patch('elasticapm.base.Client._collect_transactions')
def test_client_shutdown_async(mock_traces_collect, mock_send):
client = Client(
server='http://example.com',
Expand Down
32 changes: 32 additions & 0 deletions tests/processors/tests.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import mock
import pytest

from elasticapm import Client, processors
from elasticapm.utils import compat
from tests.fixtures import elasticapm_client


@pytest.fixture()
Expand Down Expand Up @@ -222,3 +224,33 @@ def test_mark_in_app_frames():
assert not frames[3]['in_app']
assert frames[4]['in_app']
assert not frames[5]['in_app']


def dummy_processor(client, data):
data['processed'] = True
return data


@pytest.mark.parametrize('elasticapm_client', [{'processors': 'tests.processors.tests.dummy_processor'}], indirect=True)
def test_transactions_processing(elasticapm_client):
for i in range(5):
elasticapm_client.begin_transaction('dummy')
elasticapm_client.end_transaction('dummy_transaction', 'success')
elasticapm_client._collect_transactions()
for transaction in elasticapm_client.events[0]['transactions']:
assert transaction['processed'] is True


@pytest.mark.parametrize('elasticapm_client', [{'processors': 'tests.processors.tests.dummy_processor'}], indirect=True)
def test_exception_processing(elasticapm_client):
try:
1 / 0
except ZeroDivisionError:
elasticapm_client.capture_exception()
assert elasticapm_client.events[0]['errors'][0]['processed'] is True


@pytest.mark.parametrize('elasticapm_client', [{'processors': 'tests.processors.tests.dummy_processor'}], indirect=True)
def test_message_processing(elasticapm_client):
elasticapm_client.capture_message('foo')
assert elasticapm_client.events[0]['errors'][0]['processed'] is True

0 comments on commit 2bb7282

Please sign in to comment.