Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Profiler to finish uploading data in the background when stopped #1322

Merged
merged 6 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
94 changes: 85 additions & 9 deletions ddtrace/profiling/_periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
import sys
import threading

from ddtrace.profiling import _service
from ddtrace.vendor import attr


PERIODIC_THREAD_IDS = set()


Expand All @@ -13,10 +17,17 @@ class PeriodicThread(threading.Thread):

"""

def __init__(self, interval, target, name=None):
"""Create the periodic thread."""
def __init__(self, interval, target, name=None, on_shutdown=None):
"""Create a periodic thread.

:param interval: The interval in seconds to wait between execution of the periodic function.
:param target: The periodic function to execute every interval.
:param name: The name of the thread.
:param on_shutdown: The function to call when the thread shuts down.
"""
super(PeriodicThread, self).__init__(name=name)
self._target = target
self._on_shutdown = on_shutdown
self.interval = interval
self.quit = threading.Event()
self.daemon = True
Expand All @@ -32,9 +43,13 @@ def stop(self):

def run(self):
"""Run the target function periodically."""
while not self.quit.wait(self.interval):
self._target()
PERIODIC_THREAD_IDS.remove(self.ident)
try:
while not self.quit.wait(self.interval):
self._target()
if self._on_shutdown is not None:
self._on_shutdown()
finally:
PERIODIC_THREAD_IDS.remove(self.ident)


class _GeventPeriodicThread(PeriodicThread):
Expand All @@ -48,9 +63,15 @@ class _GeventPeriodicThread(PeriodicThread):
# That's the value Python 2 uses in its `threading` module
SLEEP_INTERVAL = 0.005

def __init__(self, interval, target, name=None):
"""Create the periodic thread."""
super(_GeventPeriodicThread, self).__init__(interval, target, name)
def __init__(self, interval, target, name=None, on_shutdown=None):
"""Create a periodic thread.

:param interval: The interval in seconds to wait between execution of the periodic function.
:param target: The periodic function to execute every interval.
:param name: The name of the thread.
:param on_shutdown: The function to call when the thread shuts down.
"""
super(_GeventPeriodicThread, self).__init__(interval, target, name, on_shutdown)
import gevent.monkey

self._sleep = gevent.monkey.get_original("time", "sleep")
Expand All @@ -73,7 +94,8 @@ def start(self):
del threading._limbo[self]
PERIODIC_THREAD_IDS.add(self._tident)

def join(self):
def join(self, timeout=None):
# FIXME: handle the timeout argument
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of the scope of this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, but it was already broken before; I'm just not fixing it here… yet.

while not self.has_quit:
self._sleep(self.SLEEP_INTERVAL)

Expand All @@ -93,6 +115,8 @@ def run(self):
while self.quit is False and slept < self.interval:
self._sleep(self.SLEEP_INTERVAL)
slept += self.SLEEP_INTERVAL
if self._on_shutdown is not None:
self._on_shutdown()
except Exception:
# Exceptions might happen during interpreter shutdown.
# We're mimicking what `threading.Thread` does in daemon mode, we ignore them.
Expand Down Expand Up @@ -125,3 +149,55 @@ def PeriodicRealThread(*args, **kwargs):
if gevent.monkey.is_module_patched("threading"):
return _GeventPeriodicThread(*args, **kwargs)
return PeriodicThread(*args, **kwargs)


@attr.s
class PeriodicService(_service.Service):
    """A service whose work is driven by a periodic worker thread.

    The worker is created in :meth:`start`. It is handed :meth:`periodic` as its target and
    :meth:`on_shutdown` as its shutdown callback; subclasses override those two hooks.
    """

    _interval = attr.ib()
    _worker = attr.ib(default=None, init=False, repr=False)

    # Class variable to override if the service should run in a real OS thread.
    _real_thread = False

    @property
    def interval(self):
        """The interval given to the worker thread."""
        return self._interval

    @interval.setter
    def interval(self, value):
        self._interval = value
        worker = self._worker
        if worker:
            # A worker already exists: keep its interval in sync with ours.
            worker.interval = value

    def start(self):
        """Start the service and its periodic worker thread."""
        super(PeriodicService, self).start()
        service_class = self.__class__
        # `PeriodicRealThread` is only used when the class asks for a real OS thread.
        if self._real_thread:
            thread_factory = PeriodicRealThread
        else:
            thread_factory = PeriodicThread
        worker = thread_factory(
            self.interval,
            target=self.periodic,
            name="%s:%s" % (service_class.__module__, service_class.__name__),
            on_shutdown=self.on_shutdown,
        )
        self._worker = worker
        worker.start()

    def join(self, timeout=None):
        """Join the worker thread, if one was created.

        :param timeout: Forwarded as-is to the worker's ``join``.
        """
        worker = self._worker
        if not worker:
            return
        worker.join(timeout)

    def stop(self):
        """Ask the worker thread (if any) to stop, then mark the service as stopped.

        The worker is not joined here; use :meth:`join` for that.
        """
        worker = self._worker
        if worker:
            worker.stop()
        super(PeriodicService, self).stop()

    @staticmethod
    def periodic():
        """Hook passed to the worker as its periodic target; does nothing by default."""

    @staticmethod
    def on_shutdown():
        """Hook passed to the worker as its shutdown callback; does nothing by default."""
38 changes: 38 additions & 0 deletions ddtrace/profiling/_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import enum

from ddtrace.vendor import attr


class ServiceStatus(enum.Enum):
    """The possible states of a service."""

    # The value a service is given by default and set back to by `stop()`.
    STOPPED = "stopped"
    # The value set by `start()`.
    RUNNING = "running"


@attr.s
class Service(object):
    """Base class for objects that can be started and stopped.

    Instances can be used as context managers: entering calls :meth:`start` and exiting
    calls :meth:`stop`.
    """

    status = attr.ib(default=ServiceStatus.STOPPED, type=ServiceStatus, init=False)

    def start(self):
        """Mark the service as running.

        :raises RuntimeError: If the service is already running.
        """
        already_running = self.status == ServiceStatus.RUNNING
        if already_running:
            raise RuntimeError("%s is already running" % self.__class__.__name__)
        self.status = ServiceStatus.RUNNING

    def stop(self):
        """Mark the service as stopped."""
        self.status = ServiceStatus.STOPPED

    @staticmethod
    def join(timeout=None):
        """Join the service once stopped.

        The base implementation does nothing.

        :param timeout: Unused by the base implementation.
        """

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Return whatever `stop()` returns.
        return self.stop()
91 changes: 7 additions & 84 deletions ddtrace/profiling/collector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,105 +1,28 @@
# -*- encoding: utf-8 -*-
import abc

from ddtrace.vendor import six

from ddtrace.profiling import _attr
from ddtrace.profiling import _periodic
from ddtrace.profiling import _service
from ddtrace.vendor import attr


# This ought to use `enum.Enum`, but since it's not available in Python 2, we just use a dumb class.
@attr.s(repr=False)
class CollectorStatus(object):
"""A Collector status."""

status = attr.ib()

def __repr__(self):
return self.status.upper()


CollectorStatus.STOPPED = CollectorStatus("stopped")
CollectorStatus.RUNNING = CollectorStatus("running")


@six.add_metaclass(abc.ABCMeta)
@attr.s(slots=True)
class Collector(object):
@attr.s
class Collector(_service.Service):
"""A profile collector."""

recorder = attr.ib()
status = attr.ib(default=CollectorStatus.STOPPED, type=CollectorStatus, repr=False, init=False)

def __enter__(self):
self.start()
return self

def __exit__(self, exc_type, exc_value, traceback):
return self.stop()

@staticmethod
def _init():
pass

@abc.abstractmethod
def start(self):
"""Start collecting profiles."""
if self.status == CollectorStatus.RUNNING:
raise RuntimeError("Collector is already running")
self.status = CollectorStatus.RUNNING
self._init()

@abc.abstractmethod
def stop(self):
"""Stop collecting profiles."""
self.status = CollectorStatus.STOPPED


@attr.s(slots=True)
class PeriodicCollector(Collector):
class PeriodicCollector(Collector, _periodic.PeriodicService):
"""A collector that needs to run periodically."""

_real_thread = False

_interval = attr.ib(repr=False)
_worker = attr.ib(default=None, init=False, repr=False)

def start(self):
"""Start the periodic collector."""
super(PeriodicCollector, self).start()
periodic_thread_class = _periodic.PeriodicRealThread if self._real_thread else _periodic.PeriodicThread
self._worker = periodic_thread_class(
self.interval, target=self.collect, name="%s:%s" % (__name__, self.__class__.__name__)
)
self._worker.start()

@property
def interval(self):
return self._interval

@interval.setter
def interval(self, value):
self._interval = value
# Update the interval of the PeriodicThread based on ours
if self._worker:
self._worker.interval = value

def stop(self):
"""Stop the periodic collector."""
if self._worker:
self._worker.stop()
self._worker.join()
self._worker = None
super(PeriodicCollector, self).stop()

def collect(self):
def periodic(self):
"""Collect events and push them into the recorder."""
for events in self._collect():
for events in self.collect():
self.recorder.push_events(events)

@staticmethod
def _collect():
def collect():
"""Collect the actual data.

:return: A list of sample list to push in the recorder.
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/profiling/collector/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ def start(self):
def stop(self):
if tracemalloc is not None:
tracemalloc.stop()
super(MemoryCollector, self).stop()
super(MemoryCollector, self).stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


def _collect(self):
def collect(self):
try:
snapshot = tracemalloc.take_snapshot()
except RuntimeError:
Expand Down
5 changes: 3 additions & 2 deletions ddtrace/profiling/collector/stack.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,16 @@ class StackCollector(collector.PeriodicCollector):
if value <= 0 or value > 100:
raise ValueError("Max time usage percent must be greater than 0 and smaller or equal to 100")

def _init(self):
def start(self):
self._thread_time = ThreadTime()
self._last_wall_time = compat.monotonic_ns()
super(StackCollector, self).start()

def _compute_new_interval(self, used_wall_time_ns):
interval = (used_wall_time_ns / (self.max_time_usage_pct / 100.0)) - used_wall_time_ns
return max(interval / 1e9, self.MIN_INTERVAL_TIME)

def _collect(self):
def collect(self):
# Compute wall time
now = compat.monotonic_ns()
wall_time = now - self._last_wall_time
Expand Down
11 changes: 9 additions & 2 deletions ddtrace/profiling/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,19 @@ def stop(self, flush=True):

This stops all the collectors and schedulers, waiting for them to finish their operations.

:param flush: Flush the event before stopping.
:param flush: Wait for the flush of the remaining events before stopping.
"""
for col in reversed(self.collectors):
col.stop()

for col in reversed(self.collectors):
col.join()

for s in reversed(self.schedulers):
s.stop(flush=flush)
s.stop()

if flush:
for s in reversed(self.schedulers):
s.join()

self.status = ProfilerStatus.STOPPED