124 changes: 124 additions & 0 deletions codeguru_profiler_agent/agent_metadata/agent_debug_info.py
@@ -0,0 +1,124 @@
import logging
import os

from codeguru_profiler_agent.utils.synchronization import synchronized
from codeguru_profiler_agent.utils.time import to_iso

logger = logging.getLogger(__name__)


class ErrorsMetadata:
def __init__(self):
self.reset()

def reset(self):
"""
We want to differentiate API call errors at a more granular level. In particular, we gather ResourceNotFoundException
errors because this exception is expected with the auto-create feature, and we want to monitor how often the agent
fails to create the profiling group and therefore hits subsequent ResourceNotFoundException errors.
"""
self.errors_count = 0
self.sdk_client_errors = 0
self.configure_agent_errors = 0
self.configure_agent_rnfe_auto_create_enabled_errors = 0
self.create_profiling_group_errors = 0
self.post_agent_profile_errors = 0
self.post_agent_profile_rnfe_auto_create_enabled_errors = 0

def serialize_to_json(self):
"""
This needs to be compliant with the errors count schema.
"""
return {
"errorsCount": self.errors_count,
"sdkClientErrors": self.sdk_client_errors,
"configureAgentErrors": self.configure_agent_errors,
"configureAgentRnfeAutoCreateEnabledErrors": self.configure_agent_rnfe_auto_create_enabled_errors,
"createProfilingGroupErrors": self.create_profiling_group_errors,
"postAgentProfileErrors": self.post_agent_profile_errors,
"postAgentProfileRnfeAutoCreateEnabledErrors": self.post_agent_profile_rnfe_auto_create_enabled_errors
}

@synchronized
def increment_sdk_error(self, error_type):
"""
ErrorsCount is the umbrella for all the kinds of errors we want to capture. Currently it contains only SdkClientErrors.
SdkClientErrors comprises the different API-level errors, such as ConfigureAgentErrors,
PostAgentProfileErrors and CreateProfilingGroupErrors.
:param error_type: The type of API level error that we want to capture.
"""
self.errors_count += 1
self.sdk_client_errors += 1

"""
Special handling for ResourceNotFoundException errors.
For example configureAgentRnfeAutoCreateEnabledErrors is also a configureAgentErrors.
"""
if error_type == "configureAgentErrors":
self.configure_agent_errors += 1
elif error_type == "configureAgentRnfeAutoCreateEnabledErrors":
self.configure_agent_errors += 1
self.configure_agent_rnfe_auto_create_enabled_errors += 1
elif error_type == "createProfilingGroupErrors":
self.create_profiling_group_errors += 1
elif error_type == "postAgentProfileErrors":
self.post_agent_profile_errors += 1
elif error_type == "postAgentProfileRnfeAutoCreateEnabledErrors":
self.post_agent_profile_errors += 1
self.post_agent_profile_rnfe_auto_create_enabled_errors += 1

def record_sdk_error(self, error_type):
self.increment_sdk_error(error_type)


class AgentDebugInfo:
def __init__(self, errors_metadata=None, agent_start_time=None, timer=None):
self.process_id = get_process_id()
self.errors_metadata = errors_metadata
self.agent_start_time = agent_start_time
self.timer = timer

def serialize_to_json(self):
"""
This needs to be compliant with the agent debug info schema.
"""
json = {}

self.add_agent_start_time(json)
self.add_process_id(json)
self.add_errors_metadata(json)
self.add_generic_metrics(json)

return json

def add_agent_start_time(self, json):
if self.agent_start_time is not None:
json["agentStartTime"] = to_iso(self.agent_start_time)

def add_errors_metadata(self, json):
if self.errors_metadata is not None:
json["errorsCount"] = self.errors_metadata.serialize_to_json()

def add_process_id(self, json):
if self.process_id is not None:
json["processId"] = self.process_id

def add_generic_metrics(self, json):
if self.timer is not None and self.timer.metrics:
generic_metrics = {}

for metric, metric_value in self.timer.metrics.items():
generic_metrics[metric + "_timings_max"] = metric_value.max
generic_metrics[metric + "_timings_average"] = metric_value.average()

if generic_metrics:
json["genericMetrics"] = generic_metrics


def get_process_id():
try:
return os.getpid()
except Exception:
logger.info("Failed to get the process id", exc_info=True)
return None
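For reference, a minimal usage sketch (not part of the change set) of how the counters above combine; the printed output is illustrative:

from codeguru_profiler_agent.agent_metadata.agent_debug_info import AgentDebugInfo, ErrorsMetadata

errors_metadata = ErrorsMetadata()
errors_metadata.record_sdk_error("configureAgentRnfeAutoCreateEnabledErrors")

# The RNFE-specific counter also bumps its parent API-level counter and the umbrella counters.
assert errors_metadata.errors_count == 1
assert errors_metadata.sdk_client_errors == 1
assert errors_metadata.configure_agent_errors == 1
assert errors_metadata.configure_agent_rnfe_auto_create_enabled_errors == 1

debug_info = AgentDebugInfo(errors_metadata)
print(debug_info.serialize_to_json())
# e.g. {'processId': 12345, 'errorsCount': {'errorsCount': 1, 'sdkClientErrors': 1, ...}}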

8 changes: 7 additions & 1 deletion codeguru_profiler_agent/local_aggregator.py
@@ -2,6 +2,7 @@
import time
import datetime

from codeguru_profiler_agent.agent_metadata.agent_debug_info import AgentDebugInfo
from codeguru_profiler_agent.reporter.agent_configuration import AgentConfiguration
from codeguru_profiler_agent.metrics.with_timer import with_timer
from codeguru_profiler_agent.model.profile import Profile
@@ -30,20 +31,23 @@ def __init__(self, reporter, environment=dict()):
:param host_weight: (required inside environment) scale factor used to rescale the profile collected on this
host so that it is representative of the whole fleet
:param timer: (required inside environment) timer to be used for metrics
:param errors_metadata: (required inside environment) metadata capturing errors in the current profile.
:param profile_factory: (inside environment) the factory used to create the profile; default Profile.
:param clock: (inside environment) clock to be used; default is time.time
"""
self.reporter = reporter
self.profiling_group_name = environment["profiling_group_name"]
self.host_weight = environment["host_weight"]
self.timer = environment["timer"]
self.errors_metadata = environment["errors_metadata"]

self.profile_factory = environment.get("profile_factory") or Profile
self.clock = environment.get("clock") or time.time

self.profile = None
self.memory_limit_bytes = environment["memory_limit_bytes"]
self.last_report_attempted = current_milli_time(clock=self.clock)
self.agent_start_time = current_milli_time(clock=self.clock)

self.reset()

@@ -71,14 +75,16 @@ def _check_memory_limit(self):
self.flush(force=True)

def reset(self):
self.errors_metadata.reset()
self.timer.reset()
self.profile = self.profile_factory(
profiling_group_name=self.profiling_group_name,
sampling_interval_seconds=AgentConfiguration.get().sampling_interval.total_seconds(),
host_weight=self.host_weight,
start=current_milli_time(clock=self.clock),
agent_debug_info=AgentDebugInfo(self.errors_metadata, self.agent_start_time, self.timer),
clock=self.clock
)
self.timer.reset()

@with_timer("flush")
def flush(self, force=False, reset=True):
6 changes: 5 additions & 1 deletion codeguru_profiler_agent/model/profile.py
@@ -11,7 +11,7 @@


class Profile:
def __init__(self, profiling_group_name, sampling_interval_seconds, host_weight, start, clock=time.time):
def __init__(self, profiling_group_name, sampling_interval_seconds, host_weight, start, agent_debug_info, clock=time.time):
"""
A profile holds the root node of the call graph and the metadata related to the profile
"""
@@ -35,6 +35,7 @@ def __init__(self, profiling_group_name, sampling_interval_seconds, host_weight,
self.host_weight = int(host_weight)
self._start_process_time = time.process_time() # provides process time in fractional seconds as float.
self.overhead_ms = 0
self.agent_debug_info = agent_debug_info

@property
def end(self):
@@ -97,6 +98,9 @@ def _insert_stack(self, stack, runnable_count_increase=1):
def get_memory_usage_bytes(self):
return self.memory_counter.get_memory_usage_bytes()

def serialize_agent_debug_info_to_json(self):
return self.agent_debug_info.serialize_to_json()

def pause(self):
if self.last_pause is not None:
# pause gets called when profile is paused
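As a quick illustration of the new constructor argument (a sketch, assuming Profile can be constructed standalone as in the integration test further down; the group name and timestamp are made up):

from codeguru_profiler_agent.agent_metadata.agent_debug_info import AgentDebugInfo, ErrorsMetadata
from codeguru_profiler_agent.model.profile import Profile

profile = Profile("my-profiling-group", 1.0, 1.0, 1614600000000, AgentDebugInfo(ErrorsMetadata()))
print(profile.serialize_agent_debug_info_to_json())  # the payload the encoder emits under "debugInfo"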
3 changes: 3 additions & 0 deletions codeguru_profiler_agent/profiler.py
@@ -7,6 +7,8 @@
from datetime import timedelta
from random import SystemRandom
from types import MappingProxyType as UnmodifiableDict

from codeguru_profiler_agent.agent_metadata.agent_debug_info import ErrorsMetadata
from codeguru_profiler_agent.agent_metadata.agent_metadata import AgentMetadata
from codeguru_profiler_agent.profiler_disabler import ProfilerDisabler
from codeguru_profiler_agent.reporter.agent_configuration import AgentConfiguration, AgentConfigurationMerger
@@ -167,6 +169,7 @@ def _setup_final_environment(self, environment, environment_override):
frozenset({environment['profiler_thread_name']}.union(environment['excluded_threads']))
# TODO delay metadata lookup until we need it
environment['agent_metadata'] = environment.get('agent_metadata') or AgentMetadata()
environment['errors_metadata'] = environment.get('errors_metadata') or ErrorsMetadata()
environment['collector'] = environment.get('collector') or self._select_collector(environment)
environment["profiler_disabler"] = environment.get('profiler_disabler') or ProfilerDisabler(environment)
return UnmodifiableDict(environment)
6 changes: 5 additions & 1 deletion codeguru_profiler_agent/sdk_reporter/profile_encoder.py
@@ -99,11 +99,15 @@ def encode_content(self):
"start": int(self._profile.start),
"end": int(self._profile.end),
"agentMetadata": self._encode_agent_metadata(),
"callgraph": self._encode_call_graph(self._profile.callgraph)
"callgraph": self._encode_call_graph(self._profile.callgraph),
"debugInfo": self._encode_debug_info()
}

return json.dumps(profile_in_map)

def _encode_debug_info(self):
return self._profile.serialize_agent_debug_info_to_json()

def _encode_agent_metadata(self):
profile_duration_seconds = self._profile.get_active_millis_since_start() / 1000.0
sample_weight = 1.0 if (profile_duration_seconds == 0) else self._profile.total_sample_count / profile_duration_seconds
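Putting the serializers together, the new "debugInfo" entry in the encoded profile would look roughly like the following sketch (all values and the generic-metric name are illustrative):

debug_info_example = {
    "agentStartTime": "2021-03-01T12:00:00.000Z",
    "processId": 12345,
    "errorsCount": {
        "errorsCount": 2,
        "sdkClientErrors": 2,
        "configureAgentErrors": 1,
        "configureAgentRnfeAutoCreateEnabledErrors": 1,
        "createProfilingGroupErrors": 0,
        "postAgentProfileErrors": 1,
        "postAgentProfileRnfeAutoCreateEnabledErrors": 0
    },
    "genericMetrics": {
        "sampleAndAggregate_timings_max": 0.012,      # hypothetical timer metric name
        "sampleAndAggregate_timings_average": 0.004
    }
}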
17 changes: 16 additions & 1 deletion codeguru_profiler_agent/sdk_reporter/sdk_reporter.py
@@ -15,12 +15,14 @@
logger = logging.getLogger(__name__)
AWS_EXECUTION_ENV_KEY = "AWS_EXECUTION_ENV"


class SdkReporter(Reporter):
"""
Handles communication with the CodeGuru Profiler Service backend.
Encodes profiles using the ProfilerEncoder and reports them using the CodeGuru profiler SDK.
"""
is_create_pg_called_during_submit_profile = False

def __init__(self, environment):
"""
:param environment: dependency container dictionary for the current profiler.
@@ -35,6 +37,7 @@ def __init__(self, environment):
self.timer = environment.get("timer")
self.metadata = environment["agent_metadata"]
self.agent_config_merger = environment["agent_config_merger"]
self.errors_metadata = environment["errors_metadata"]

def _encode_profile(self, profile):
output_profile_stream = io.BytesIO()
@@ -76,18 +79,23 @@ def refresh_configuration(self):
# We handle service exceptions like this in boto3
# see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
if error.response['Error']['Code'] == 'ValidationException':
self.errors_metadata.record_sdk_error("configureAgentErrors")
self.agent_config_merger.disable_profiling()
self._log_request_failed(operation="configure_agent", exception=error)
if error.response['Error']['Code'] == 'ResourceNotFoundException':
elif error.response['Error']['Code'] == 'ResourceNotFoundException':
if self.should_auto_create_profiling_group():
self.errors_metadata.record_sdk_error("configureAgentRnfeAutoCreateEnabledErrors")
logger.info(
"Profiling group not found. Will try to create a profiling group "
"with name = {} and compute platform = {} and retry calling configure agent after 5 minutes. "
"Make sure that Lambda's execution role has AmazonCodeGuruProfilerAgentAccess policy added."
.format(self.profiling_group_name, 'AWSLambda'))
self.create_profiling_group()
else:
self.errors_metadata.record_sdk_error("configureAgentErrors")
self.agent_config_merger.disable_profiling()
else:
self.errors_metadata.record_sdk_error("configureAgentErrors")
except Exception as e:
self._log_request_failed(operation="configure_agent", exception=e)

@@ -117,12 +125,17 @@ def report(self, profile):
if error.response['Error']['Code'] == 'ResourceNotFoundException':
if self.should_auto_create_profiling_group():
self.__class__.is_create_pg_called_during_submit_profile = True
self.errors_metadata.record_sdk_error("postAgentProfileRnfeAutoCreateEnabledErrors")
logger.info(
"Profiling group not found. Will try to create a profiling group "
"with name = {} and compute platform = {} and retry reporting during next invocation. "
"Make sure that Lambda's execution role has AmazonCodeGuruProfilerAgentAccess policy added."
.format(self.profiling_group_name, 'AWSLambda'))
self.create_profiling_group()
else:
self.errors_metadata.record_sdk_error("postAgentProfileErrors")
else:
self.errors_metadata.record_sdk_error("postAgentProfileErrors")
return False
except Exception as e:
self._log_request_failed(operation="post_agent_profile", exception=e)
@@ -143,6 +156,8 @@ def create_profiling_group(self):
if error.response['Error']['Code'] == 'ConflictException':
logger.info("Profiling Group with name {} already exists. Please use a different name."
.format(self.profiling_group_name))
else:
self.errors_metadata.record_sdk_error("createProfilingGroupErrors")
except Exception as e:
self._log_request_failed(operation="create_profiling_group", exception=e)

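A small sketch (the helper name is made up) of how the ResourceNotFoundException branch in report() above classifies a failed post_agent_profile call depending on whether auto-create is enabled:

from codeguru_profiler_agent.agent_metadata.agent_debug_info import ErrorsMetadata

def classify_post_agent_profile_rnfe(errors_metadata, auto_create_enabled):
    # Mirrors the ResourceNotFoundException branch in SdkReporter.report().
    if auto_create_enabled:
        errors_metadata.record_sdk_error("postAgentProfileRnfeAutoCreateEnabledErrors")
    else:
        errors_metadata.record_sdk_error("postAgentProfileErrors")

errors_metadata = ErrorsMetadata()
classify_post_agent_profile_rnfe(errors_metadata, auto_create_enabled=True)
# The RNFE-specific counter and the generic post_agent_profile counter move together.
assert errors_metadata.post_agent_profile_errors == 1
assert errors_metadata.post_agent_profile_rnfe_auto_create_enabled_errors == 1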
15 changes: 15 additions & 0 deletions codeguru_profiler_agent/utils/synchronization.py
@@ -0,0 +1,15 @@
import functools
import threading


def synchronized(wrapped):
"""The missing @synchronized decorator

https://git.io/vydTA"""
_lock = threading.RLock()

@functools.wraps(wrapped)
def _wrapper(*args, **kwargs):
with _lock:
return wrapped(*args, **kwargs)
return _wrapper
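A minimal usage sketch of the decorator (the counter here is hypothetical; in this change the decorator guards ErrorsMetadata.increment_sdk_error):

import threading

from codeguru_profiler_agent.utils.synchronization import synchronized

counts = {"n": 0}

@synchronized
def bump():
    # The read-modify-write below is guarded by a single re-entrant lock shared by all callers.
    counts["n"] += 1

threads = [threading.Thread(target=bump) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert counts["n"] == 10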
5 changes: 4 additions & 1 deletion codeguru_profiler_agent/utils/time.py
@@ -3,11 +3,14 @@
import time
from datetime import datetime


def to_iso(epoch_milli):
try:
return datetime.fromtimestamp(epoch_milli / 1000).isoformat()
return datetime.utcfromtimestamp(epoch_milli / 1000).isoformat(
timespec='milliseconds') + "Z" # ISO 8601 date-time format
except ValueError:
return str(epoch_milli)


def current_milli_time(clock=time.time):
return int(clock() * 1000)
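A quick check of the new formatting (the epoch value is illustrative):

from codeguru_profiler_agent.utils.time import current_milli_time, to_iso

print(to_iso(1614600000123))  # "2021-03-01T12:00:00.123Z", UTC with millisecond precision
print(current_milli_time())   # current epoch time as an integer number of milliseconds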
8 changes: 6 additions & 2 deletions test/integration/test_live_backend_reporting.py
@@ -5,6 +5,7 @@

from datetime import timedelta

from codeguru_profiler_agent.agent_metadata.agent_debug_info import ErrorsMetadata, AgentDebugInfo
from test.help_utils import MY_PROFILING_GROUP_NAME_FOR_INTEG_TESTS
from test.pytestutils import before

@@ -32,8 +33,10 @@ def before(self):
stacks=[[Frame(MY_PROFILING_GROUP_NAME_FOR_INTEG_TESTS)]],
attempted_sample_threads_count=1,
seen_threads_count=1)
errors_metadata = ErrorsMetadata()

self.profile = Profile(MY_PROFILING_GROUP_NAME_FOR_INTEG_TESTS, 1.0, 1.0, five_minutes_ago_millis)
self.profile = Profile(MY_PROFILING_GROUP_NAME_FOR_INTEG_TESTS, 1.0, 1.0, five_minutes_ago_millis,
AgentDebugInfo(errors_metadata))
# FIXME: Remove setting the end time manually below once the feature is fully supported.
self.profile.end = now_millis
self.profile.add(sample)
@@ -47,7 +50,8 @@ def before(self):
"minimum_time_reporting": timedelta(minutes=6),
"max_stack_depth": 2345,
"cpu_limit_percentage": 29,
"agent_metadata": AgentMetadata(fleet_info=DefaultFleetInfo())
"agent_metadata": AgentMetadata(fleet_info=DefaultFleetInfo()),
"errors_metadata": errors_metadata
}
self.environment["codeguru_profiler_builder"] = CodeGuruClientBuilder(self.environment)
self.agent_config = AgentConfiguration(