Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions codeguru_profiler_agent/local_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(self, reporter, environment=dict()):
self.memory_limit_bytes = environment["memory_limit_bytes"]
self.last_report_attempted = current_milli_time(clock=self.clock)

self._reset()
self.reset()

def add(self, sample):
"""
Expand All @@ -69,7 +69,7 @@ def _check_memory_limit(self):
"Profiler memory usage limit has been reached")
self.flush(force=True)

def _reset(self):
def reset(self):
self.profile = self.profile_factory(
profiling_group_name=self.profiling_group_name,
sampling_interval_seconds=AgentConfiguration.get().sampling_interval.total_seconds(),
Expand All @@ -80,7 +80,7 @@ def _reset(self):
self.timer.reset()

@with_timer("flush")
def flush(self, force=False):
def flush(self, force=False, reset=True):
now = current_milli_time(clock=self.clock)
reported = False
if not force and not self._is_over_reporting_interval(now):
Expand All @@ -92,8 +92,8 @@ def flush(self, force=False):
self._report_profile(now)
reported = True

if force or reported:
self._reset()
if force or (reset and reported):
self.reset()
return reported

def refresh_configuration(self):
Expand Down
50 changes: 40 additions & 10 deletions codeguru_profiler_agent/profiler_disabler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import time
import logging
from codeguru_profiler_agent.reporter.agent_configuration import AgentConfiguration
from codeguru_profiler_agent.utils.time import current_milli_time

logger = logging.getLogger(__name__)
CHECK_KILLSWITCH_FILE_INTERVAL_SECONDS = 60
Expand All @@ -20,13 +19,18 @@ def __init__(self, environment, clock=time.time):
self.killswitch = KillSwitch(environment['killswitch_filepath'], clock)
self.memory_limit_bytes = environment['memory_limit_bytes']

def should_stop_sampling(self, profile=None):
return (self.killswitch.is_killswitch_on()
or self.cpu_usage_check.is_sampling_cpu_usage_limit_reached(profile)
or self._is_memory_limit_reached(profile))

def should_stop_profiling(self, profile=None):
return (self.killswitch.is_killswitch_on()
or self.cpu_usage_check.is_cpu_usage_limit_reached(profile)
or profile is not None and self._is_memory_limit_reached(profile))
or self.cpu_usage_check.is_overall_cpu_usage_limit_reached(profile)
or self._is_memory_limit_reached(profile))

def _is_memory_limit_reached(self, profile):
return profile.get_memory_usage_bytes() > self.memory_limit_bytes
return False if profile is None else profile.get_memory_usage_bytes() > self.memory_limit_bytes


class CpuUsageCheck:
Expand All @@ -38,19 +42,45 @@ class CpuUsageCheck:
def __init__(self, timer):
self.timer = timer

def is_cpu_usage_limit_reached(self, profile=None):
def is_overall_cpu_usage_limit_reached(self, profile=None):
"""
This function carries out an overall cpu limit check that covers the cpu overhead caused for the full
sampling cycle: refresh config -> (sample -> aggregate) * n -> profile submission. We expect this function to
be called after profile submission.
"""
profiler_metric = self.timer.metrics.get("runProfiler")
if not profiler_metric or profiler_metric.counter < MINIMUM_MEASURES_IN_DURATION_METRICS:
if not profile or not profiler_metric or profiler_metric.counter < MINIMUM_MEASURES_IN_DURATION_METRICS:
return False

used_time_percentage = 100 * profiler_metric.total/(profile.get_active_millis_since_start()/1000)

cpu_limit_percentage = AgentConfiguration.get().cpu_limit_percentage

if used_time_percentage >= cpu_limit_percentage:
logger.debug(self.timer.metrics)
logger.info(
"Profiler overall cpu usage limit reached: {:.2f} % (limit: {:.2f} %), will stop CodeGuru Profiler."
.format(used_time_percentage, cpu_limit_percentage))
return True
else:
return False

def is_sampling_cpu_usage_limit_reached(self, profile=None):
sample_and_aggregate_metric = self.timer.metrics.get("sampleAndAggregate")
if not sample_and_aggregate_metric or \
sample_and_aggregate_metric.counter < MINIMUM_MEASURES_IN_DURATION_METRICS:
return False

sampling_interval_seconds = self._get_average_sampling_interval_seconds(profile)
used_time_percentage = 100 * profiler_metric.average() / sampling_interval_seconds
used_time_percentage = 100 * sample_and_aggregate_metric.average() / sampling_interval_seconds

cpu_limit_percentage = AgentConfiguration.get().cpu_limit_percentage

if used_time_percentage >= AgentConfiguration.get().cpu_limit_percentage:
if used_time_percentage >= cpu_limit_percentage:
logger.debug(self.timer.metrics)
logger.info(
"Profiler cpu usage limit reached: {:.2f} % (limit: {:.2f} %), will stop CodeGuru Profiler.".format(
used_time_percentage, AgentConfiguration.get().cpu_limit_percentage))
"Profiler sampling cpu usage limit reached: {:.2f} % (limit: {:.2f} %), will stop CodeGuru Profiler."
.format(used_time_percentage, cpu_limit_percentage))
return True
else:
return False
Expand Down
32 changes: 24 additions & 8 deletions codeguru_profiler_agent/profiler_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,37 @@ def _profiling_command(self):
if self._first_execution:
self.collector.setup()
self._first_execution = False
return self._run_profiler()
sample_result = self._run_profiler()
if sample_result.success and sample_result.is_end_of_cycle:
if self.profiler_disabler.should_stop_profiling(profile=self.collector.profile):
return False
self.collector.reset()
return True
return sample_result.success
except:
logger.info("An unexpected issue caused the profiling command to terminate.", exc_info=True)
return False

@with_timer("runProfiler")
def _run_profiler(self):
if self.profiler_disabler.should_stop_profiling(self.collector.profile):
return False
if self.profiler_disabler.should_stop_sampling(self.collector.profile):
return RunProfilerStatus(success=False, is_end_of_cycle=False)

if not self.is_profiling_in_progress:
self._refresh_configuration()

# after the refresh we may be working on a profile
if self.is_profiling_in_progress:
if self.collector.flush():
if self.collector.flush(reset=False):
self.is_profiling_in_progress = False
return True
sample = self.sampler.sample()
self.collector.add(sample)
return True
return RunProfilerStatus(success=True, is_end_of_cycle=True)
self._sample_and_aggregate()
return RunProfilerStatus(success=True, is_end_of_cycle=False)

@with_timer("sampleAndAggregate")
def _sample_and_aggregate(self):
sample = self.sampler.sample()
self.collector.add(sample)

def is_running(self):
return self.scheduler.is_running()
Expand Down Expand Up @@ -125,3 +135,9 @@ def pause(self, block=False):
"""
self.scheduler.pause(block)
self.collector.profile.pause()


class RunProfilerStatus:
def __init__(self, success, is_end_of_cycle):
self.success = success
self.is_end_of_cycle = is_end_of_cycle
2 changes: 1 addition & 1 deletion test/acceptance/test_cpu_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ def test_profiler_terminates(self):
# With sampling_interval to be 0.01 seconds, having runProfiler as 0.5 seconds should breach
# the cpu limit. We need to sample 20 times before we check the CPU limit
for i in range(20):
self.timer.record('runProfiler', 0.5)
self.timer.record('sampleAndAggregate', 0.5)

assert wait_for(lambda: not self.profiler.is_running(), timeout_seconds=5)
130 changes: 93 additions & 37 deletions test/unit/test_profiler_disabler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ def set_agent_config(sampling_interval_seconds=1, cpu_limit_percentage=DEFAULT_C


def assert_config_sampling_interval_used(process_duration_check, profile):
assert process_duration_check.is_cpu_usage_limit_reached(profile)
assert process_duration_check.is_sampling_cpu_usage_limit_reached(profile)

set_agent_config(sampling_interval_seconds=42, cpu_limit_percentage=80)
assert not process_duration_check.is_cpu_usage_limit_reached(profile)
assert not process_duration_check.is_sampling_cpu_usage_limit_reached(profile)


class TestProfilerDisabler:
Expand Down Expand Up @@ -59,28 +59,57 @@ def test_it_sets_all_parameters(self):
assert AgentConfiguration.get().cpu_limit_percentage == DEFAULT_CPU_LIMIT_PERCENTAGE


class TestWhenAnyFails(TestProfilerDisabler):
@before
def before(self):
super().before()
self.profiler = Mock()
self.disabler.killswitch = Mock()
self.disabler.cpu_usage_check = Mock()
self.disabler._is_memory_limit_reached = Mock(return_value=False)
self.disabler.killswitch.is_killswitch_on = Mock(return_value=False)
self.disabler.killswitch.is_process_duration_limit_reached = Mock(return_value=False)
class TestShouldStopSampling:
class TestWhenAnyFails(TestProfilerDisabler):
@before
def before(self):
super().before()
self.disabler.killswitch = Mock()
self.disabler.cpu_usage_check = Mock()
self.disabler.cpu_usage_check.is_sampling_cpu_usage_limit_reached = Mock(return_value=False)
self.disabler._is_memory_limit_reached = Mock(return_value=False)
self.disabler.killswitch.is_killswitch_on = Mock(return_value=False)
self.disabler.killswitch.is_process_duration_limit_reached = Mock(return_value=False)
assert not self.disabler.should_stop_sampling()

def test_it_stops_profiling_if_killswitch_is_on(self):
self.disabler.killswitch.is_killswitch_on = Mock(return_value=True)
assert self.disabler.should_stop_sampling()

def test_it_stops_profiling_if_memory_limit_is_reached(self):
self.disabler._is_memory_limit_reached = Mock(return_value=True)
assert self.disabler.should_stop_sampling()

def test_it_stops_profiling_if_process_duration_is_reached(self):
self.disabler.cpu_usage_check.is_sampling_cpu_usage_limit_reached = Mock(return_value=True)
assert self.disabler.should_stop_sampling()


class TestShouldStopProfiling:
class TestWhenAnyFails(TestProfilerDisabler):
@before
def before(self):
super().before()
self.profiler = Mock()
self.disabler.killswitch = Mock()
self.disabler.cpu_usage_check = Mock()
self.disabler.cpu_usage_check.is_overall_cpu_usage_limit_reached = Mock(return_value=False)
self.disabler._is_memory_limit_reached = Mock(return_value=False)
self.disabler.killswitch.is_killswitch_on = Mock(return_value=False)
self.disabler.killswitch.is_process_duration_limit_reached = Mock(return_value=False)
assert not self.disabler.should_stop_profiling()

def test_it_stops_profiling_if_killswitch_is_on(self):
self.disabler.killswitch.is_killswitch_on = Mock(return_value=True)
assert self.disabler.should_stop_profiling(self.profiler)
def test_it_stops_profiling_if_killswitch_is_on(self):
self.disabler.killswitch.is_killswitch_on = Mock(return_value=True)
assert self.disabler.should_stop_profiling()

def test_it_stops_profiling_if_memory_limit_is_reached(self):
self.disabler._is_memory_limit_reached = Mock(return_value=True)
assert self.disabler.should_stop_profiling(self.profiler)
def test_it_stops_profiling_if_memory_limit_is_reached(self):
self.disabler._is_memory_limit_reached = Mock(return_value=True)
assert self.disabler.should_stop_profiling()

def test_it_stops_profiling_if_process_duration_is_reached(self):
self.disabler.cpu_usage_check.is_cpu_usage_limit_reached = Mock(return_value=True)
assert self.disabler.should_stop_profiling(self.profiler)
def test_it_stops_profiling_if_process_duration_is_reached(self):
self.disabler.cpu_usage_check.is_overall_cpu_usage_limit_reached = Mock(return_value=True)
assert self.disabler.should_stop_profiling()


class TestKillSwitch:
Expand Down Expand Up @@ -145,17 +174,17 @@ def test_it_returns_false_after_a_minute(self):
assert not self.killswitch.is_killswitch_on()


class TestCpuUsageCheck:
class TestSamplingCpuUsageCheck:
def before(self):
self.timer = Timer()
self.profile = Mock(spec=Profile)
for i in range(20):
self.timer.record('runProfiler', 0.5)
self.timer.record('sampleAndAggregate', 0.5)
set_agent_config(sampling_interval_seconds=1, cpu_limit_percentage=10)
self.process_duration_check = CpuUsageCheck(self.timer)


class TestGetAverageSamplingIntervalSeconds(TestCpuUsageCheck):
class TestGetAverageSamplingIntervalSeconds(TestSamplingCpuUsageCheck):
@before
def before(self):
super().before()
Expand All @@ -176,7 +205,7 @@ def test_when_profiler_sample_count_less_than_min_samples_in_profile_it_returns_
assert CpuUsageCheck._get_average_sampling_interval_seconds(self.profile) == 23


class TestIsCpuUsageLimitReached(TestCpuUsageCheck):
class TestIsSamplingCpuUsageLimitReached(TestSamplingCpuUsageCheck):
@before
def before(self):
super().before()
Expand All @@ -187,43 +216,70 @@ def before(self):
yield

def test_it_calls_get_average_sampling_interval_with_profile(self):
self.process_duration_check.is_cpu_usage_limit_reached(self.profile)
self.process_duration_check.is_sampling_cpu_usage_limit_reached(self.profile)
self.get_average_sampling_interval_mock.assert_called_once_with(self.profile)

def test_when_average_duration_exceeds_limit_it_returns_true(self):
# timer: (0.5/4) * 100= 12.5%
assert self.process_duration_check.is_cpu_usage_limit_reached()
assert self.process_duration_check.is_sampling_cpu_usage_limit_reached()

def test_when_average_duragtion_is_below_limit_it_returns_false(self):
def test_when_average_duration_is_below_limit_it_returns_false(self):
# timer: (0.5/4) * 100= 12.5%
set_agent_config(cpu_limit_percentage=13)
assert not self.process_duration_check.is_cpu_usage_limit_reached()
assert not self.process_duration_check.is_sampling_cpu_usage_limit_reached()

def test_when_profile_is_none_it_calls_get_average_sampling_interval_without_profile(self):
self.process_duration_check.is_cpu_usage_limit_reached()
self.process_duration_check.is_sampling_cpu_usage_limit_reached()
self.get_average_sampling_interval_mock.assert_called_once_with(None)


class TestWhenTimerDoesNotHaveTheKey(TestCpuUsageCheck):
class TestIsOverallCpuUsageLimitReached():
@before
def before(self):
self.timer = Timer()
self.profile = Mock(spec=Profile)
for i in range(20):
self.timer.record('runProfiler', 0.5)
set_agent_config(cpu_limit_percentage=9)
self.process_duration_check = CpuUsageCheck(self.timer)
self.profile.get_active_millis_since_start = Mock(return_value=100*1000)

def test_when_average_duration_exceeds_limit_it_returns_true(self):
# timer: (0.5*20/100) * 100= 10%
assert self.process_duration_check.is_overall_cpu_usage_limit_reached(self.profile)

def test_when_average_duration_is_below_limit_it_returns_false(self):
# timer: (0.5*20/100) * 100= 10%
set_agent_config(cpu_limit_percentage=11)
assert not self.process_duration_check.is_overall_cpu_usage_limit_reached(self.profile)

def test_when_profile_is_none_it_returns_false(self):
assert not self.process_duration_check.is_overall_cpu_usage_limit_reached()


class TestWhenTimerDoesNotHaveTheKey(TestSamplingCpuUsageCheck):
@before
def before(self):
super().before()

def test_it_returns_false(self):
self.process_duration_check.timer = Timer()
assert not self.process_duration_check.is_cpu_usage_limit_reached()
assert not self.process_duration_check.is_sampling_cpu_usage_limit_reached()


class TestWhenTimerDoesNotHaveEnoughMeasures(TestCpuUsageCheck):
class TestWhenTimerDoesNotHaveEnoughMeasures(TestSamplingCpuUsageCheck):
@before
def before(self):
super().before()

def test_it_returns_false(self):
self.timer.reset()
for i in range(4):
self.timer.record('runProfiler', 0.5)
assert not self.process_duration_check.is_cpu_usage_limit_reached()
self.timer.record('sampleAndAggregate', 0.5)

def test_sampling_cpu_usage_limit_reached_returns_false(self):
assert not self.process_duration_check.is_sampling_cpu_usage_limit_reached()

def test_overall_cpu_usage_limit_reached_returns_false(self):
assert not self.process_duration_check.is_overall_cpu_usage_limit_reached()


class TestMemoryLimitCheck:
Expand Down
Loading