diff --git a/codeguru_profiler_agent/local_aggregator.py b/codeguru_profiler_agent/local_aggregator.py index fff2205..a04c304 100644 --- a/codeguru_profiler_agent/local_aggregator.py +++ b/codeguru_profiler_agent/local_aggregator.py @@ -44,7 +44,7 @@ def __init__(self, reporter, environment=dict()): self.memory_limit_bytes = environment["memory_limit_bytes"] self.last_report_attempted = current_milli_time(clock=self.clock) - self._reset() + self.reset() def add(self, sample): """ @@ -69,7 +69,7 @@ def _check_memory_limit(self): "Profiler memory usage limit has been reached") self.flush(force=True) - def _reset(self): + def reset(self): self.profile = self.profile_factory( profiling_group_name=self.profiling_group_name, sampling_interval_seconds=AgentConfiguration.get().sampling_interval.total_seconds(), @@ -80,7 +80,7 @@ def _reset(self): self.timer.reset() @with_timer("flush") - def flush(self, force=False): + def flush(self, force=False, reset=True): now = current_milli_time(clock=self.clock) reported = False if not force and not self._is_over_reporting_interval(now): @@ -92,8 +92,8 @@ def flush(self, force=False): self._report_profile(now) reported = True - if force or reported: - self._reset() + if force or (reset and reported): + self.reset() return reported def refresh_configuration(self): diff --git a/codeguru_profiler_agent/profiler_disabler.py b/codeguru_profiler_agent/profiler_disabler.py index c912e3d..906ec7c 100644 --- a/codeguru_profiler_agent/profiler_disabler.py +++ b/codeguru_profiler_agent/profiler_disabler.py @@ -2,7 +2,6 @@ import time import logging from codeguru_profiler_agent.reporter.agent_configuration import AgentConfiguration -from codeguru_profiler_agent.utils.time import current_milli_time logger = logging.getLogger(__name__) CHECK_KILLSWITCH_FILE_INTERVAL_SECONDS = 60 @@ -20,13 +19,18 @@ def __init__(self, environment, clock=time.time): self.killswitch = KillSwitch(environment['killswitch_filepath'], clock) self.memory_limit_bytes = environment['memory_limit_bytes'] + def should_stop_sampling(self, profile=None): + return (self.killswitch.is_killswitch_on() + or self.cpu_usage_check.is_sampling_cpu_usage_limit_reached(profile) + or self._is_memory_limit_reached(profile)) + def should_stop_profiling(self, profile=None): return (self.killswitch.is_killswitch_on() - or self.cpu_usage_check.is_cpu_usage_limit_reached(profile) - or profile is not None and self._is_memory_limit_reached(profile)) + or self.cpu_usage_check.is_overall_cpu_usage_limit_reached(profile) + or self._is_memory_limit_reached(profile)) def _is_memory_limit_reached(self, profile): - return profile.get_memory_usage_bytes() > self.memory_limit_bytes + return False if profile is None else profile.get_memory_usage_bytes() > self.memory_limit_bytes class CpuUsageCheck: @@ -38,19 +42,45 @@ class CpuUsageCheck: def __init__(self, timer): self.timer = timer - def is_cpu_usage_limit_reached(self, profile=None): + def is_overall_cpu_usage_limit_reached(self, profile=None): + """ + This function carries out an overall cpu limit check that covers the cpu overhead caused for the full + sampling cycle: refresh config -> (sample -> aggregate) * n -> profile submission. We expect this function to + be called after profile submission. + """ profiler_metric = self.timer.metrics.get("runProfiler") - if not profiler_metric or profiler_metric.counter < MINIMUM_MEASURES_IN_DURATION_METRICS: + if not profile or not profiler_metric or profiler_metric.counter < MINIMUM_MEASURES_IN_DURATION_METRICS: + return False + + used_time_percentage = 100 * profiler_metric.total/(profile.get_active_millis_since_start()/1000) + + cpu_limit_percentage = AgentConfiguration.get().cpu_limit_percentage + + if used_time_percentage >= cpu_limit_percentage: + logger.debug(self.timer.metrics) + logger.info( + "Profiler overall cpu usage limit reached: {:.2f} % (limit: {:.2f} %), will stop CodeGuru Profiler." + .format(used_time_percentage, cpu_limit_percentage)) + return True + else: + return False + + def is_sampling_cpu_usage_limit_reached(self, profile=None): + sample_and_aggregate_metric = self.timer.metrics.get("sampleAndAggregate") + if not sample_and_aggregate_metric or \ + sample_and_aggregate_metric.counter < MINIMUM_MEASURES_IN_DURATION_METRICS: return False sampling_interval_seconds = self._get_average_sampling_interval_seconds(profile) - used_time_percentage = 100 * profiler_metric.average() / sampling_interval_seconds + used_time_percentage = 100 * sample_and_aggregate_metric.average() / sampling_interval_seconds + + cpu_limit_percentage = AgentConfiguration.get().cpu_limit_percentage - if used_time_percentage >= AgentConfiguration.get().cpu_limit_percentage: + if used_time_percentage >= cpu_limit_percentage: logger.debug(self.timer.metrics) logger.info( - "Profiler cpu usage limit reached: {:.2f} % (limit: {:.2f} %), will stop CodeGuru Profiler.".format( - used_time_percentage, AgentConfiguration.get().cpu_limit_percentage)) + "Profiler sampling cpu usage limit reached: {:.2f} % (limit: {:.2f} %), will stop CodeGuru Profiler." + .format(used_time_percentage, cpu_limit_percentage)) return True else: return False diff --git a/codeguru_profiler_agent/profiler_runner.py b/codeguru_profiler_agent/profiler_runner.py index d2c433f..ae36290 100644 --- a/codeguru_profiler_agent/profiler_runner.py +++ b/codeguru_profiler_agent/profiler_runner.py @@ -71,27 +71,37 @@ def _profiling_command(self): if self._first_execution: self.collector.setup() self._first_execution = False - return self._run_profiler() + sample_result = self._run_profiler() + if sample_result.success and sample_result.is_end_of_cycle: + if self.profiler_disabler.should_stop_profiling(profile=self.collector.profile): + return False + self.collector.reset() + return True + return sample_result.success except: logger.info("An unexpected issue caused the profiling command to terminate.", exc_info=True) return False @with_timer("runProfiler") def _run_profiler(self): - if self.profiler_disabler.should_stop_profiling(self.collector.profile): - return False + if self.profiler_disabler.should_stop_sampling(self.collector.profile): + return RunProfilerStatus(success=False, is_end_of_cycle=False) if not self.is_profiling_in_progress: self._refresh_configuration() # after the refresh we may be working on a profile if self.is_profiling_in_progress: - if self.collector.flush(): + if self.collector.flush(reset=False): self.is_profiling_in_progress = False - return True - sample = self.sampler.sample() - self.collector.add(sample) - return True + return RunProfilerStatus(success=True, is_end_of_cycle=True) + self._sample_and_aggregate() + return RunProfilerStatus(success=True, is_end_of_cycle=False) + + @with_timer("sampleAndAggregate") + def _sample_and_aggregate(self): + sample = self.sampler.sample() + self.collector.add(sample) def is_running(self): return self.scheduler.is_running() @@ -125,3 +135,9 @@ def pause(self, block=False): """ self.scheduler.pause(block) self.collector.profile.pause() + + +class RunProfilerStatus: + def __init__(self, success, is_end_of_cycle): + self.success = success + self.is_end_of_cycle = is_end_of_cycle diff --git a/test/acceptance/test_cpu_limit.py b/test/acceptance/test_cpu_limit.py index e413a21..d73cf23 100644 --- a/test/acceptance/test_cpu_limit.py +++ b/test/acceptance/test_cpu_limit.py @@ -31,6 +31,6 @@ def test_profiler_terminates(self): # With sampling_interval to be 0.01 seconds, having runProfiler as 0.5 seconds should breach # the cpu limit. We need to sample 20 times before we check the CPU limit for i in range(20): - self.timer.record('runProfiler', 0.5) + self.timer.record('sampleAndAggregate', 0.5) assert wait_for(lambda: not self.profiler.is_running(), timeout_seconds=5) diff --git a/test/unit/test_profiler_disabler.py b/test/unit/test_profiler_disabler.py index 638aa68..4d3cacd 100644 --- a/test/unit/test_profiler_disabler.py +++ b/test/unit/test_profiler_disabler.py @@ -28,10 +28,10 @@ def set_agent_config(sampling_interval_seconds=1, cpu_limit_percentage=DEFAULT_C def assert_config_sampling_interval_used(process_duration_check, profile): - assert process_duration_check.is_cpu_usage_limit_reached(profile) + assert process_duration_check.is_sampling_cpu_usage_limit_reached(profile) set_agent_config(sampling_interval_seconds=42, cpu_limit_percentage=80) - assert not process_duration_check.is_cpu_usage_limit_reached(profile) + assert not process_duration_check.is_sampling_cpu_usage_limit_reached(profile) class TestProfilerDisabler: @@ -59,28 +59,57 @@ def test_it_sets_all_parameters(self): assert AgentConfiguration.get().cpu_limit_percentage == DEFAULT_CPU_LIMIT_PERCENTAGE -class TestWhenAnyFails(TestProfilerDisabler): - @before - def before(self): - super().before() - self.profiler = Mock() - self.disabler.killswitch = Mock() - self.disabler.cpu_usage_check = Mock() - self.disabler._is_memory_limit_reached = Mock(return_value=False) - self.disabler.killswitch.is_killswitch_on = Mock(return_value=False) - self.disabler.killswitch.is_process_duration_limit_reached = Mock(return_value=False) +class TestShouldStopSampling: + class TestWhenAnyFails(TestProfilerDisabler): + @before + def before(self): + super().before() + self.disabler.killswitch = Mock() + self.disabler.cpu_usage_check = Mock() + self.disabler.cpu_usage_check.is_sampling_cpu_usage_limit_reached = Mock(return_value=False) + self.disabler._is_memory_limit_reached = Mock(return_value=False) + self.disabler.killswitch.is_killswitch_on = Mock(return_value=False) + self.disabler.killswitch.is_process_duration_limit_reached = Mock(return_value=False) + assert not self.disabler.should_stop_sampling() + + def test_it_stops_profiling_if_killswitch_is_on(self): + self.disabler.killswitch.is_killswitch_on = Mock(return_value=True) + assert self.disabler.should_stop_sampling() + + def test_it_stops_profiling_if_memory_limit_is_reached(self): + self.disabler._is_memory_limit_reached = Mock(return_value=True) + assert self.disabler.should_stop_sampling() + + def test_it_stops_profiling_if_process_duration_is_reached(self): + self.disabler.cpu_usage_check.is_sampling_cpu_usage_limit_reached = Mock(return_value=True) + assert self.disabler.should_stop_sampling() + + +class TestShouldStopProfiling: + class TestWhenAnyFails(TestProfilerDisabler): + @before + def before(self): + super().before() + self.profiler = Mock() + self.disabler.killswitch = Mock() + self.disabler.cpu_usage_check = Mock() + self.disabler.cpu_usage_check.is_overall_cpu_usage_limit_reached = Mock(return_value=False) + self.disabler._is_memory_limit_reached = Mock(return_value=False) + self.disabler.killswitch.is_killswitch_on = Mock(return_value=False) + self.disabler.killswitch.is_process_duration_limit_reached = Mock(return_value=False) + assert not self.disabler.should_stop_profiling() - def test_it_stops_profiling_if_killswitch_is_on(self): - self.disabler.killswitch.is_killswitch_on = Mock(return_value=True) - assert self.disabler.should_stop_profiling(self.profiler) + def test_it_stops_profiling_if_killswitch_is_on(self): + self.disabler.killswitch.is_killswitch_on = Mock(return_value=True) + assert self.disabler.should_stop_profiling() - def test_it_stops_profiling_if_memory_limit_is_reached(self): - self.disabler._is_memory_limit_reached = Mock(return_value=True) - assert self.disabler.should_stop_profiling(self.profiler) + def test_it_stops_profiling_if_memory_limit_is_reached(self): + self.disabler._is_memory_limit_reached = Mock(return_value=True) + assert self.disabler.should_stop_profiling() - def test_it_stops_profiling_if_process_duration_is_reached(self): - self.disabler.cpu_usage_check.is_cpu_usage_limit_reached = Mock(return_value=True) - assert self.disabler.should_stop_profiling(self.profiler) + def test_it_stops_profiling_if_process_duration_is_reached(self): + self.disabler.cpu_usage_check.is_overall_cpu_usage_limit_reached = Mock(return_value=True) + assert self.disabler.should_stop_profiling() class TestKillSwitch: @@ -145,17 +174,17 @@ def test_it_returns_false_after_a_minute(self): assert not self.killswitch.is_killswitch_on() -class TestCpuUsageCheck: +class TestSamplingCpuUsageCheck: def before(self): self.timer = Timer() self.profile = Mock(spec=Profile) for i in range(20): - self.timer.record('runProfiler', 0.5) + self.timer.record('sampleAndAggregate', 0.5) set_agent_config(sampling_interval_seconds=1, cpu_limit_percentage=10) self.process_duration_check = CpuUsageCheck(self.timer) -class TestGetAverageSamplingIntervalSeconds(TestCpuUsageCheck): +class TestGetAverageSamplingIntervalSeconds(TestSamplingCpuUsageCheck): @before def before(self): super().before() @@ -176,7 +205,7 @@ def test_when_profiler_sample_count_less_than_min_samples_in_profile_it_returns_ assert CpuUsageCheck._get_average_sampling_interval_seconds(self.profile) == 23 -class TestIsCpuUsageLimitReached(TestCpuUsageCheck): +class TestIsSamplingCpuUsageLimitReached(TestSamplingCpuUsageCheck): @before def before(self): super().before() @@ -187,43 +216,70 @@ def before(self): yield def test_it_calls_get_average_sampling_interval_with_profile(self): - self.process_duration_check.is_cpu_usage_limit_reached(self.profile) + self.process_duration_check.is_sampling_cpu_usage_limit_reached(self.profile) self.get_average_sampling_interval_mock.assert_called_once_with(self.profile) def test_when_average_duration_exceeds_limit_it_returns_true(self): # timer: (0.5/4) * 100= 12.5% - assert self.process_duration_check.is_cpu_usage_limit_reached() + assert self.process_duration_check.is_sampling_cpu_usage_limit_reached() - def test_when_average_duragtion_is_below_limit_it_returns_false(self): + def test_when_average_duration_is_below_limit_it_returns_false(self): # timer: (0.5/4) * 100= 12.5% set_agent_config(cpu_limit_percentage=13) - assert not self.process_duration_check.is_cpu_usage_limit_reached() + assert not self.process_duration_check.is_sampling_cpu_usage_limit_reached() def test_when_profile_is_none_it_calls_get_average_sampling_interval_without_profile(self): - self.process_duration_check.is_cpu_usage_limit_reached() + self.process_duration_check.is_sampling_cpu_usage_limit_reached() self.get_average_sampling_interval_mock.assert_called_once_with(None) -class TestWhenTimerDoesNotHaveTheKey(TestCpuUsageCheck): +class TestIsOverallCpuUsageLimitReached(): + @before + def before(self): + self.timer = Timer() + self.profile = Mock(spec=Profile) + for i in range(20): + self.timer.record('runProfiler', 0.5) + set_agent_config(cpu_limit_percentage=9) + self.process_duration_check = CpuUsageCheck(self.timer) + self.profile.get_active_millis_since_start = Mock(return_value=100*1000) + + def test_when_average_duration_exceeds_limit_it_returns_true(self): + # timer: (0.5*20/100) * 100= 10% + assert self.process_duration_check.is_overall_cpu_usage_limit_reached(self.profile) + + def test_when_average_duration_is_below_limit_it_returns_false(self): + # timer: (0.5*20/100) * 100= 10% + set_agent_config(cpu_limit_percentage=11) + assert not self.process_duration_check.is_overall_cpu_usage_limit_reached(self.profile) + + def test_when_profile_is_none_it_returns_false(self): + assert not self.process_duration_check.is_overall_cpu_usage_limit_reached() + + +class TestWhenTimerDoesNotHaveTheKey(TestSamplingCpuUsageCheck): @before def before(self): super().before() def test_it_returns_false(self): self.process_duration_check.timer = Timer() - assert not self.process_duration_check.is_cpu_usage_limit_reached() + assert not self.process_duration_check.is_sampling_cpu_usage_limit_reached() -class TestWhenTimerDoesNotHaveEnoughMeasures(TestCpuUsageCheck): +class TestWhenTimerDoesNotHaveEnoughMeasures(TestSamplingCpuUsageCheck): @before def before(self): super().before() - - def test_it_returns_false(self): self.timer.reset() for i in range(4): - self.timer.record('runProfiler', 0.5) - assert not self.process_duration_check.is_cpu_usage_limit_reached() + self.timer.record('sampleAndAggregate', 0.5) + + def test_sampling_cpu_usage_limit_reached_returns_false(self): + assert not self.process_duration_check.is_sampling_cpu_usage_limit_reached() + + def test_overall_cpu_usage_limit_reached_returns_false(self): + assert not self.process_duration_check.is_overall_cpu_usage_limit_reached() class TestMemoryLimitCheck: diff --git a/test/unit/test_profiler_runner.py b/test/unit/test_profiler_runner.py index a40127f..946b843 100644 --- a/test/unit/test_profiler_runner.py +++ b/test/unit/test_profiler_runner.py @@ -17,6 +17,7 @@ def before(self): self.mock_collector = MagicMock(name="collector", spec=LocalAggregator) self.mock_disabler = MagicMock(name="profile", spec=ProfilerDisabler) self.mock_disabler.should_stop_profiling.return_value = False + self.mock_disabler.should_stop_sampling.return_value = False self.mock_sampler = MagicMock(name="sampler", spec=Sampler) self.environment = { @@ -80,11 +81,21 @@ def test_when_it_reports_it_does_not_sample(self): self.mock_collector.flush.assert_called_once() self.mock_collector.add.assert_not_called() - def test_when_disabler_say_to_stop(self): + def test_when_disabler_says_to_stop_profiling_it_does_not_start(self): self.mock_disabler.should_stop_profiling.return_value = True + + assert not self.profiler_runner.start() + + self.mock_collector.refresh_configuration.assert_not_called() + self.mock_collector.add.assert_not_called() + + def test_when_disabler_says_to_stop_sampling_it_does_not_do_anything(self): + self.mock_disabler.should_stop_sampling.return_value = True self.profiler_runner._profiling_command() + # As disabler.stop_sampling() returns True, _profiling_command() should not attempt to carry out any action. self.mock_collector.refresh_configuration.assert_not_called() + self.mock_sampler.sample.assert_not_called() self.mock_collector.add.assert_not_called() def test_when_orchestrator_says_no_to_profiler(self):