Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added timer to trigger send if no incoming transactions trigger it #68

Merged
merged 1 commit into from
Oct 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@
[[changelog]]
== Changelog

[[release-next]]
[float]
=== Unreleased

* added a background thread to process the transactions queue every 60 seconds (configurable) ({pull}68[#68])

https://github.com/elastic/apm-agent-python/compare/v1.0.0.dev1\...master[Check the HEAD diff]

[[release-v1.0.0.dev2]]
[float]
=== v1.0.0.dev2
Expand Down
20 changes: 20 additions & 0 deletions elasticapm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import platform
import socket
import sys
import threading
import time
import warnings
import zlib
Expand Down Expand Up @@ -112,6 +113,8 @@ def __init__(self, config=None, **defaults):
self.config.disable_send = True
return

self._send_timer = None

self._transport_class = import_string(self.config.transport_class)
self._transports = {}

Expand Down Expand Up @@ -439,6 +442,9 @@ def end_transaction(self, name, status_code=None):
self.instrumentation_store.end_transaction(status_code, name)
if self.instrumentation_store.should_collect():
self._traces_collect()
if not self._send_timer:
# send first batch of data after config._wait_to_first_send
self._start_send_timer(timeout=min(self.config._wait_to_first_send, self.config.traces_send_frequency))

def set_transaction_name(self, name):
transaction = get_transaction()
Expand All @@ -456,6 +462,8 @@ def set_transaction_extra_data(self, data, _key=None):

def close(self):
self._traces_collect()
if self._send_timer:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that a new transaction is started during the time that close is processing? If so, it looks like it could happen that the end_transaction could trigger a new _start_send_timer. This is probably an edge case that shouldn't be too bad though, because I assume when close() is called the whole application is going to be restarted..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly. Outside of test scenarios, .close() is only called by application shutdown logic. At this point, the application shouldn't accept any new transactions.

self._stop_send_timer()
for url, transport in self._transports.items():
transport.close()

Expand Down Expand Up @@ -485,6 +493,7 @@ def handle_transport_fail(self, exception=None, **kwargs):
self.state.set_fail()

def _traces_collect(self):
self._stop_send_timer()
transactions = self.instrumentation_store.get_all()
if not transactions:
return
Expand All @@ -495,6 +504,17 @@ def _traces_collect(self):
api_path = defaults.TRANSACTIONS_API_PATH

self.send(server=self.config.server + api_path, **data)
self._start_send_timer()

def _start_send_timer(self, timeout=None):
timeout = timeout or self.config.traces_send_frequency
self._send_timer = threading.Timer(timeout, self._traces_collect)
self._send_timer.start()

def _stop_send_timer(self):
if self._send_timer and self._send_timer.is_alive() and not self._send_timer == threading.current_thread():
self._send_timer.cancel()
self._send_timer.join()

def get_app_info(self):
language_version = platform.python_version()
Expand Down
3 changes: 3 additions & 0 deletions elasticapm/conf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ class Config(_ConfigBase):
disable_send = _BoolConfigValue('DISABLE_SEND', default=False)
disable_instrumentation = _BoolConfigValue('DISABLE_INSTRUMENTATION', default=False)

# undocumented configuration
_wait_to_first_send = _ConfigValue('_WAIT_TO_FIRST_SEND', type=int, default=5)


def setup_logging(handler, exclude=['elasticapm',
'gunicorn',
Expand Down
25 changes: 25 additions & 0 deletions elasticapm/contrib/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,28 @@ def _send_remote(self, url, data, headers=None):
task = loop.create_task(
transport.send(data, headers, timeout=self.config.timeout))
task.add_done_callback(self.handle_transport_response)

def _start_send_timer(self, timeout=None):
timeout = timeout or self.config.traces_send_frequency
self._send_timer = AsyncTimer(timeout, self._traces_collect)

def _stop_send_timer(self):
if self._send_timer:
self._send_timer.cancel()


class AsyncTimer:
def __init__(self, interval, callback):
self.interval = interval
self.callback = callback
self.task = asyncio.ensure_future(self._job())
self._done = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What for are you using this self._done information?


async def _job(self):
await asyncio.sleep(self.interval)
await self._callback()
self._done = True

def cancel(self):
if not self._done:
self.task.cancel()
4 changes: 4 additions & 0 deletions elasticapm/transport/http_urllib3.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import logging
import os

import certifi
Expand All @@ -10,6 +11,8 @@
from elasticapm.transport.http import AsyncHTTPTransport, HTTPTransport
from elasticapm.utils import compat

logger = logging.getLogger(__name__)


class Urllib3Transport(HTTPTransport):

Expand Down Expand Up @@ -46,6 +49,7 @@ def send(self, data, headers, timeout=None):
response = self.http.urlopen(
'POST', url, body=data, headers=headers, timeout=timeout, preload_content=False
)
logger.info('Sent request, url=%s size=%.2fkb status=%s', url, len(data) / 1024.0, response.status)
except Exception as e:
print_trace = True
if isinstance(e, MaxRetryError) and isinstance(e.reason, TimeoutError):
Expand Down
32 changes: 32 additions & 0 deletions tests/asyncio/test_asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ async def send(self, data, headers, timeout):
await asyncio.sleep(0.0001)


class DummyTransport:
async_mode = False

def __init__(self, *args, **kwargs):
pass

async def send(self, data, headers, timeout):
return

def close(self):
pass


@pytest.mark.asyncio
async def test_client_success():
from elasticapm.contrib.asyncio import Client
Expand Down Expand Up @@ -86,3 +99,22 @@ async def test_client_failure_stdlib_exception(mocker):
with pytest.raises(TransportException):
await task
assert client.state.status == 0


@pytest.mark.asyncio
async def test_client_send_timer():
from elasticapm.contrib.asyncio.client import Client, AsyncTimer

client = Client(
transport_class='tests.asyncio.test_asyncio_client.DummyTransport'
)

assert client._send_timer is None

client.begin_transaction('test_type')
client.end_transaction('test')

assert isinstance(client._send_timer, AsyncTimer)
assert client._send_timer.interval == 5

client.close()
17 changes: 17 additions & 0 deletions tests/client/client_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time

import mock
import pytest

import elasticapm
from elasticapm.base import Client, ClientState
Expand Down Expand Up @@ -503,3 +504,19 @@ def test_empty_transport_disables_send():
assert 'TRANSPORT_CLASS' in client.config.errors

assert client.config.disable_send


@pytest.mark.parametrize('elasticapm_client', [{'traces_send_frequency': 2}], indirect=True)
def test_send_timer(elasticapm_client):
assert elasticapm_client._send_timer is None
assert elasticapm_client.config.traces_send_frequency == 2
elasticapm_client.begin_transaction('test_type')
elasticapm_client.end_transaction('test')

assert elasticapm_client._send_timer is not None
assert elasticapm_client._send_timer.interval == 2
assert elasticapm_client._send_timer.is_alive()

elasticapm_client.close()

assert not elasticapm_client._send_timer.is_alive()
6 changes: 6 additions & 0 deletions tests/contrib/django/django_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,7 @@ def test_stacktrace_filtered_for_elasticapm(client, elasticapm_client):
assert traces[1]['stacktrace'][0]['module'].startswith('django.template')


@pytest.mark.parametrize('elasticapm_client', [{'_wait_to_first_send': 100}], indirect=True)
def test_perf_template_render(benchmark, client, elasticapm_client):
responses = []
with mock.patch("elasticapm.traces.TransactionsStore.should_collect") as should_collect:
Expand All @@ -971,6 +972,7 @@ def test_perf_template_render(benchmark, client, elasticapm_client):
assert len(transaction['traces']) == 2


@pytest.mark.parametrize('elasticapm_client', [{'_wait_to_first_send': 100}], indirect=True)
def test_perf_template_render_no_middleware(benchmark, client, elasticapm_client):
responses = []
with mock.patch(
Expand All @@ -986,6 +988,7 @@ def test_perf_template_render_no_middleware(benchmark, client, elasticapm_client
assert len(transactions) == 0


@pytest.mark.parametrize('elasticapm_client', [{'_wait_to_first_send': 100}], indirect=True)
@pytest.mark.django_db(transaction=True)
def test_perf_database_render(benchmark, client, elasticapm_client):
responses = []
Expand All @@ -1010,6 +1013,7 @@ def test_perf_database_render(benchmark, client, elasticapm_client):


@pytest.mark.django_db
@pytest.mark.parametrize('elasticapm_client', [{'_wait_to_first_send': 100}], indirect=True)
def test_perf_database_render_no_instrumentation(benchmark, elasticapm_client):
elasticapm_client.instrumentation_store.get_all()
responses = []
Expand All @@ -1029,6 +1033,7 @@ def test_perf_database_render_no_instrumentation(benchmark, elasticapm_client):


@pytest.mark.django_db
@pytest.mark.parametrize('elasticapm_client', [{'_wait_to_first_send': 100}], indirect=True)
def test_perf_transaction_with_collection(benchmark, elasticapm_client):
elasticapm_client.instrumentation_store.get_all()
with mock.patch("elasticapm.traces.TransactionsStore.should_collect") as should_collect:
Expand Down Expand Up @@ -1058,6 +1063,7 @@ def result():


@pytest.mark.django_db
@pytest.mark.parametrize('elasticapm_client', [{'_wait_to_first_send': 100}], indirect=True)
def test_perf_transaction_without_middleware(benchmark, elasticapm_client):
elasticapm_client.instrumentation_store.get_all()
with mock.patch("elasticapm.traces.TransactionsStore.should_collect") as should_collect:
Expand Down
20 changes: 17 additions & 3 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
import pytest

from tests.helpers import get_tempstoreclient
from elasticapm.base import Client


@pytest.fixture()
def elasticapm_client():
return get_tempstoreclient()
def elasticapm_client(request):
client_config = getattr(request, 'param', {})
app_name = client_config.pop('app_name', 'myapp')
secret_token = client_config.pop('secret_token', 'test_key')
client = TempStoreClient(app_name=app_name, secret_token=secret_token, **client_config)
yield client
client.close()


class TempStoreClient(Client):
def __init__(self, **defaults):
self.events = []
super(TempStoreClient, self).__init__(**defaults)

def send(self, **kwargs):
self.events.append(kwargs)
15 changes: 0 additions & 15 deletions tests/helpers.py

This file was deleted.