Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion howfast_apm/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import os

# Constants
# Where to send the performance data
HOWFAST_APM_COLLECTOR_URL = os.environ.get(
'HOWFAST_APM_COLLECTOR_URL',
"https://api.howfast.tech/v1.1/apm-collector/store",
)

# Record interactions of the API with external sources (HTTP requests, etc)
HOWFAST_APM_RECORD_INTERACTIONS = os.environ.get(
'HOWFAST_APM_RECORD_INTERACTIONS',
False,
)
63 changes: 54 additions & 9 deletions howfast_apm/core.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
import logging
from typing import Optional
from typing import Optional, List
from datetime import datetime
from queue import Full, Empty

from .config import HOWFAST_APM_RECORD_INTERACTIONS
from .queue import queue, Runner
from .hook_requests import install_hooks, Interaction

logger = logging.getLogger('howfast_apm')

Expand All @@ -14,10 +16,19 @@ class CoreAPM(object):
Base class that provides the shared code for:
* starting the background thread
* pushing points to the queue
* storing external interactions
"""

app_id: Optional[str]

record_interactions: bool
interactions: List[Interaction] = []

def __init__(self, record_interactions=HOWFAST_APM_RECORD_INTERACTIONS):
self.record_interactions = bool(record_interactions)
logger.debug("Interactions will %s", 'be enabled' if self.record_interactions else 'NOT be enabled')
self.interactions = []

def setup(
self,
# The HowFast app ID to use
Expand All @@ -31,6 +42,8 @@ def setup(
if self.app_id:
logger.info(f"HowFast APM configured with DSN {self.app_id}")
self.start_background_thread()
if self.record_interactions:
self.setup_hooks()
else:
logger.warning(f"HowFast APM initialized with no DSN, reporting will be disabled.")

Expand All @@ -40,22 +53,54 @@ def start_background_thread(self):
self.runner.start()
# TODO: stop the thread at some point?

@staticmethod
def setup_hooks(self) -> None:
""" Install hooks to register what is slow """
install_hooks(self.record_interaction)

def record_interaction(self, interaction: Interaction) -> None:
""" Save the interaction """
self.interactions.append(interaction)

def reset_interactions(self):
self.interactions = []

def save_point(
self,
time_request_started: datetime,
time_elapsed: float, # seconds
method: str,
uri: str,
endpoint: str = None, # function name handling the request
) -> None:
""" Save a request/response performance information """
item = (
time_request_started,
time_elapsed,
method,
uri,
endpoint,
"""
Save a request/response performance information.

This method is called by subclasses with their framework-specific information. We then add
the core-level collected performance data (interactions) and call self._save_point().
"""
self._save_point(
time_request_started=time_request_started,
time_elapsed=time_elapsed,
method=method,
uri=uri,
endpoint=endpoint,
interactions=self.interactions,
)
# Reset the list of interactions, since it's specific to a request/point
self.interactions = []

@staticmethod
def _save_point(**kwargs) -> None:
""" Save a request/response performance information """

interaction_list = []
interactions = kwargs.get('interactions', [])
while interactions:
interaction_list.append(interactions.pop().serialize())

# Forward the arguments to the queue
item = kwargs

# Capped queue
pushed = False
while pushed is False:
Expand Down
4 changes: 4 additions & 0 deletions howfast_apm/flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ def __init__(
app_id: str = None,
# Endpoints not to monitor
endpoints_blacklist: List[str] = None,
# Other configuration parameters passed to the CoreAPM constructor
**kwargs,
):
super().__init__(**kwargs)

self.app = app
self.wsgi_app = app.wsgi_app
# Overwrite the WSGI application
Expand Down
117 changes: 117 additions & 0 deletions howfast_apm/hook_requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import sys
import logging

from typing import Callable, Any
from timeit import default_timer as timer

logger = logging.getLogger('howfast_apm')


class Interaction(object):
""" An external interaction with other services """
# Can be "request"
type: str
# Name holds the URL if type is "request"
name: str
elapsed: float
extra: dict

def __init__(self, type, name, elapsed, extra=None):
self.type = type
self.name = name
self.elapsed = elapsed
self.extra = extra or {}

def serialize(self):
""" JSON-serialize the Interaction """
return {
"type": self.type,
"name": self.name,
"elapsed": self.elapsed,
"extra": self.extra,
}


def install_hooks(record_interaction: Callable[[Interaction], Any]) -> None:
""" Install the HTTP hooks """
patch_requests_module = True
try:
# Try to import the module to see if it's available
import requests # noqa
except ModuleNotFoundError:
# Maybe requests is not installed / available in the instrumented code
patch_requests_module = False

tmp_urllib = sys.modules['urllib']
if patch_requests_module:
tmp_requests = sys.modules['requests']

def get_patched(func, meta_extractor: Callable):

def patched_request(*args, **kwargs):
start = timer()
resp = func(*args, **kwargs)
elapsed = timer() - start

try:
method, name = meta_extractor(*args, **kwargs)

record_interaction(
Interaction(
type="request",
name=name,
elapsed=elapsed,
extra={'method': method.lower()},
))
# Catch any exception because we don't want it to bubble up to the real app
except Exception:
logger.error("Unable to record interaction", exc_info=True) # pragma: nocover

return resp

return patched_request

# TODO: build method extractor for urlopen
tmp_urllib.request.urlopen = get_patched(
tmp_urllib.request.urlopen,
lambda *args, **kwargs: [None, None],
)
sys.modules['urllib'] = tmp_urllib

if patch_requests_module:
tmp_requests.request = get_patched(
tmp_requests.request,
lambda *args, **kwargs: [
args[0] if len(args) > 0 else kwargs.get('method'),
args[1] if len(args) > 1 else kwargs.get('url'),
],
)

def request_alias_extractor(method):
return lambda *args, **kwargs: [
method,
args[0] if len(args) > 0 else kwargs.get('url'),
]

tmp_requests.get = get_patched(
tmp_requests.get,
meta_extractor=request_alias_extractor('get'),
)
tmp_requests.post = get_patched(
tmp_requests.post,
meta_extractor=request_alias_extractor('post'),
)
tmp_requests.head = get_patched(
tmp_requests.head,
meta_extractor=request_alias_extractor('head'),
)
tmp_requests.put = get_patched(
tmp_requests.put,
meta_extractor=request_alias_extractor('put'),
)
tmp_requests.delete = get_patched(
tmp_requests.delete,
meta_extractor=request_alias_extractor('delete'),
)

sys.modules['requests'] = tmp_requests
22 changes: 11 additions & 11 deletions howfast_apm/queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import requests
from logging import getLogger
from threading import Thread
from typing import List
from typing import List, Dict, Any

from queue import Queue, Empty
from .config import HOWFAST_APM_COLLECTOR_URL
Expand All @@ -26,7 +26,7 @@ class Runner(Thread):
batch_size = 100

# Local list of the points to be sent to the API
current_batch: List[tuple]
current_batch: List[Dict[str, Any]]

def __init__(self, queue: Queue, app_id: str):
self.queue = queue
Expand Down Expand Up @@ -76,16 +76,16 @@ def run_once(self):
return

@staticmethod
def serialize_point(point: tuple) -> tuple:
def serialize_point(point: dict) -> tuple:
""" Prepare the point to be sent to the API """
(
time_request_started,
time_elapsed,
method,
uri,
endpoint,
) = point
return (method, uri, time_request_started.isoformat(), time_elapsed, endpoint)
return (
point['method'],
point['uri'],
point['time_request_started'].isoformat(),
point['time_elapsed'],
point['endpoint'],
point['interactions'],
)

def send_batch(self):
""" Process one performance point """
Expand Down
Loading