diff --git a/aries_cloudagent/config/argparse.py b/aries_cloudagent/config/argparse.py index 028d2a7d35..0fce72c3bf 100644 --- a/aries_cloudagent/config/argparse.py +++ b/aries_cloudagent/config/argparse.py @@ -622,14 +622,17 @@ def get_settings(self, args: Namespace) -> dict: settings["trace.label"] = args.label else: settings["trace.label"] = "aca-py.agent" - if settings.get("trace.enabled"): + if settings.get("trace.enabled") or settings.get("trace.target"): + # make sure we can trace to the configured target + # (target can be set even if tracing is off) try: trace_event( settings, None, handler="ArgParse", - outcome="Successfully configured aca-py", - raise_errors=True + outcome="Successfully_configured_aca-py", + raise_errors=True, + force_trace=True ) except Exception as e: raise ArgsParseError( diff --git a/aries_cloudagent/messaging/agent_message.py b/aries_cloudagent/messaging/agent_message.py index c18a94df0b..e8e339fb5b 100644 --- a/aries_cloudagent/messaging/agent_message.py +++ b/aries_cloudagent/messaging/agent_message.py @@ -19,6 +19,10 @@ from .decorators.default import DecoratorSet from .decorators.signature_decorator import SignatureDecorator from .decorators.thread_decorator import ThreadDecorator +from .decorators.trace_decorator import ( + TraceDecorator, TraceReport, + TRACE_MESSAGE_TARGET, TRACE_LOG_TARGET +) from .models.base import ( BaseModel, BaseModelError, @@ -291,6 +295,84 @@ def assign_thread_id(self, thid: str, pthid: str = None): """ self._thread = ThreadDecorator(thid=thid, pthid=pthid) + @property + def _trace(self) -> TraceDecorator: + """ + Accessor for the message's trace decorator. + + Returns: + The TraceDecorator for this message + + """ + return self._decorators.get("trace") + + @_trace.setter + def _trace(self, val: Union[TraceDecorator, dict]): + """ + Setter for the message's trace decorator. + + Args: + val: TraceDecorator or dict to set as the trace + """ + self._decorators["trace"] = val + + def assign_trace_from(self, msg: "AgentMessage"): + """ + Copy trace information from a previous message. + + Args: + msg: The received message containing optional trace information + """ + if msg and msg._trace: + # ignore if not a valid type + if (isinstance(msg._trace, TraceDecorator) or + isinstance(msg._trace, dict)): + self._trace = msg._trace + + def assign_trace_decorator(self, context, trace): + """ + Copy trace from a json structure. + + Args: + trace: string containing trace json stucture + """ + if trace: + self.add_trace_decorator( + target=context.get("trace.target") if context else TRACE_LOG_TARGET, + full_thread=True, + ) + + def add_trace_decorator( + self, + target: str = TRACE_LOG_TARGET, + full_thread: bool = True + ): + """ + Create a new trace decorator. + + Args: + target: The trace target + full_thread: Full thread flag + """ + if self._trace: + # don't replace if there is already a trace decorator + # (potentially holding trace reports already) + self._trace._target = target + self._trace._full_thread = full_thread + else: + self._trace = TraceDecorator(target=target, full_thread=full_thread) + + def add_trace_report(self, val: Union[TraceReport, dict]): + """ + Append a new trace report. + + Args: + val: The trace target + """ + if not self._trace: + self.add_trace_decorator(target=TRACE_MESSAGE_TARGET, full_thread=True) + self._trace.append_trace_report(val) + class AgentMessageSchema(BaseModelSchema): """AgentMessage schema.""" diff --git a/aries_cloudagent/messaging/decorators/default.py b/aries_cloudagent/messaging/decorators/default.py index 2061032bf8..0d6d9f27c1 100644 --- a/aries_cloudagent/messaging/decorators/default.py +++ b/aries_cloudagent/messaging/decorators/default.py @@ -5,6 +5,7 @@ from .localization_decorator import LocalizationDecorator from .signature_decorator import SignatureDecorator from .thread_decorator import ThreadDecorator +from .trace_decorator import TraceDecorator from .timing_decorator import TimingDecorator from .transport_decorator import TransportDecorator @@ -12,6 +13,7 @@ "l10n": LocalizationDecorator, "sig": SignatureDecorator, "thread": ThreadDecorator, + "trace": TraceDecorator, "timing": TimingDecorator, "transport": TransportDecorator, } diff --git a/aries_cloudagent/messaging/decorators/tests/test_trace_decorator.py b/aries_cloudagent/messaging/decorators/tests/test_trace_decorator.py new file mode 100644 index 0000000000..b6591f5bd7 --- /dev/null +++ b/aries_cloudagent/messaging/decorators/tests/test_trace_decorator.py @@ -0,0 +1,158 @@ +from ..trace_decorator import TraceDecorator, TraceReport, TRACE_MESSAGE_TARGET + +from unittest import TestCase + + +class TestTraceDecorator(TestCase): + + target_api = "http://example.com/api/trace/" + full_thread_api = False + target_msg = TRACE_MESSAGE_TARGET + full_thread_msg = True + + msg_id = "msg-001" + thread_id = "thid-001" + traced_type = "msg-001/my_type" + timestamp = "123456789.123456" + str_time = "2018-03-27 18:23:45.123Z" + handler = "agent name" + ellapsed_milli = 27 + outcome = "OK ..." + + + def test_init_api(self): + + decorator = TraceDecorator( + target=self.target_api, + full_thread=self.full_thread_api, + ) + assert decorator.target == self.target_api + assert decorator.full_thread == self.full_thread_api + + def test_init_message(self): + + x_msg_id = self.msg_id + x_thread_id = self.thread_id + x_trace_report = TraceReport( + msg_id=x_msg_id, + thread_id=x_thread_id, + traced_type=self.traced_type, + timestamp=self.timestamp, + str_time=self.str_time, + handler=self.handler, + ellapsed_milli=self.ellapsed_milli, + outcome=self.outcome, + ) + + decorator = TraceDecorator( + target=self.target_msg, + full_thread=self.full_thread_msg, + trace_reports=[x_trace_report,], + ) + assert decorator.target == self.target_msg + assert decorator.full_thread == self.full_thread_msg + assert len(decorator.trace_reports) == 1 + trace_report = decorator.trace_reports[0] + assert trace_report.msg_id == self.msg_id + assert trace_report.thread_id == self.thread_id + assert trace_report.traced_type == self.traced_type + assert trace_report.timestamp == self.timestamp + assert trace_report.str_time == self.str_time + assert trace_report.handler == self.handler + assert trace_report.ellapsed_milli == self.ellapsed_milli + assert trace_report.outcome == self.outcome + + def test_serialize_load(self): + + x_msg_id = self.msg_id + x_thread_id = self.thread_id + x_trace_report = TraceReport( + msg_id=x_msg_id, + thread_id=x_thread_id, + traced_type=self.traced_type, + timestamp=self.timestamp, + str_time=self.str_time, + handler=self.handler, + ellapsed_milli=self.ellapsed_milli, + outcome=self.outcome, + ) + + decorator = TraceDecorator( + target=self.target_msg, + full_thread=self.full_thread_msg, + trace_reports=[x_trace_report,x_trace_report,], + ) + + dumped = decorator.serialize() + loaded = TraceDecorator.deserialize(dumped) + + assert loaded.target == decorator.target + assert loaded.full_thread == decorator.full_thread + assert len(loaded.trace_reports) == 2 + trace_report = loaded.trace_reports[0] + assert trace_report.msg_id == x_trace_report.msg_id + assert trace_report.thread_id == x_trace_report.thread_id + assert trace_report.traced_type == x_trace_report.traced_type + assert trace_report.timestamp == x_trace_report.timestamp + assert trace_report.str_time == x_trace_report.str_time + assert trace_report.handler == x_trace_report.handler + assert trace_report.ellapsed_milli == x_trace_report.ellapsed_milli + assert trace_report.outcome == x_trace_report.outcome + + def test_trace_reports(self): + decorator = TraceDecorator( + target=self.target_msg, + full_thread=self.full_thread_msg, + ) + assert len(decorator.trace_reports) == 0 + + x_msg_id = self.msg_id + x_thread_id = self.thread_id + x_trace_report = TraceReport( + msg_id=x_msg_id, + thread_id=x_thread_id, + traced_type=self.traced_type, + timestamp=self.timestamp, + str_time=self.str_time, + handler=self.handler, + ellapsed_milli=self.ellapsed_milli, + outcome=self.outcome, + ) + decorator.append_trace_report(x_trace_report) + assert len(decorator.trace_reports) == 1 + + y_msg_id = self.msg_id + y_thread_id = self.thread_id + y_trace_report = TraceReport( + msg_id=y_msg_id, + thread_id=y_thread_id, + traced_type=self.traced_type, + timestamp=self.timestamp, + str_time=self.str_time, + handler=self.handler, + ellapsed_milli=self.ellapsed_milli, + outcome=self.outcome, + ) + decorator.append_trace_report(y_trace_report) + assert len(decorator.trace_reports) == 2 + trace_report = decorator.trace_reports[1] + assert trace_report.msg_id == x_trace_report.msg_id + assert trace_report.thread_id == x_trace_report.thread_id + + z_msg_id = self.msg_id+"-z" + z_thread_id = self.thread_id+"-z" + z_trace_report = TraceReport( + msg_id=z_msg_id, + thread_id=z_thread_id, + traced_type=self.traced_type, + timestamp=self.timestamp, + str_time=self.str_time, + handler=self.handler, + ellapsed_milli=self.ellapsed_milli, + outcome=self.outcome, + ) + decorator.append_trace_report(z_trace_report) + assert len(decorator.trace_reports) == 3 + trace_report = decorator.trace_reports[2] + assert trace_report.msg_id == self.msg_id+"-z" + assert trace_report.thread_id == self.thread_id+"-z" diff --git a/aries_cloudagent/messaging/decorators/trace_decorator.py b/aries_cloudagent/messaging/decorators/trace_decorator.py new file mode 100644 index 0000000000..af36bf7970 --- /dev/null +++ b/aries_cloudagent/messaging/decorators/trace_decorator.py @@ -0,0 +1,315 @@ +""" +A message decorator for trace events. + +A trace decorator identifies a responsibility on the processor +to record information on message processing events. +""" + +from typing import Sequence + +from marshmallow import fields + +from ..models.base import BaseModel, BaseModelSchema +from ..valid import UUIDFour + + +TRACE_MESSAGE_TARGET = "message" +TRACE_LOG_TARGET = "log" + + +class TraceReport(BaseModel): + """Class representing a Trace Report.""" + + class Meta: + """TraceReport metadata.""" + + schema_class = "TraceReport" + + def __init__( + self, + *, + msg_id: str = None, + thread_id: str = None, + traced_type: str = None, + timestamp: str = None, + str_time: str = None, + handler: str = None, + ellapsed_milli: int = None, + outcome: str = None, + ): + """ + Initialize a TraceReport instance. + + Args: + msg_id: ... + thread_id: ... + traced_type: ... + timestamp: ... + str_time: ... + handler: ... + ellapsed_milli: ... + outcome: ... + """ + super(TraceReport, self).__init__() + self._msg_id = msg_id + self._thread_id = thread_id + self._traced_type = traced_type + self._timestamp = timestamp + self._str_time = str_time + self._handler = handler + self._ellapsed_milli = ellapsed_milli + self._outcome = outcome + + @property + def msg_id(self): + """ + Accessor for msg_id. + + Returns: + The msg_id + + """ + return self._msg_id + + @property + def thread_id(self): + """ + Accessor for thread_id. + + Returns: + The thread_id + + """ + return self._thread_id + + @property + def traced_type(self): + """ + Accessor for traced_type. + + Returns: + The sender traced_type + + """ + return self._traced_type + + @property + def timestamp(self): + """ + Accessor for timestamp. + + Returns: + The sender timestamp + + """ + return self._timestamp + + @property + def str_time(self): + """ + Accessor for str_time. + + Returns: + Formatted representation of the sender timestamp + + """ + return self._str_time + + @property + def handler(self): + """ + Accessor for handler. + + Returns: + The sender handler + + """ + return self._handler + + @property + def ellapsed_milli(self): + """ + Accessor for ellapsed_milli. + + Returns: + The sender ellapsed_milli + + """ + return self._ellapsed_milli + + @property + def outcome(self): + """ + Accessor for outcome. + + Returns: + The sender outcome + + """ + return self._outcome + + +class TraceDecorator(BaseModel): + """Class representing trace decorator.""" + + class Meta: + """TraceDecorator metadata.""" + + schema_class = "TraceDecoratorSchema" + + def __init__( + self, + *, + target: str = None, + full_thread: bool = True, + trace_reports: Sequence = None, + ): + """ + Initialize a TraceDecorator instance. + + Args: + target: The "target" can refer to a url (as above) or the term + "message", which is a request to append trace information + to the message itself. + full_thread: An optional flag to indicate tracing should be included + on all subsequent messages in the thread (on by default). + trace_reports: Trace reports contain information about a message + processing at a specific point in time, along with a timestamp. + Trace reports can be used to identify steps in the processing + of a message or thread, and support troubleshooting and + performance issues. + """ + super(TraceDecorator, self).__init__() + self._target = target + self._full_thread = full_thread + self._trace_reports = trace_reports and list(trace_reports) or None + + @property + def target(self): + """ + Accessor for trace target. + + Returns: + The target for tracing messages + + """ + return self._target + + @property + def full_thread(self): + """ + Accessor for full_thread flag. + + Returns: + The full_thread flag + + """ + return self._full_thread + + @property + def trace_reports(self): + """ + Set of trace reports for this message. + + Returns: + The trace reports that have been logged on this message/thread + so far. (Only for target="message".) + + """ + if not self._trace_reports: + return [] + return self._trace_reports + + def append_trace_report(self, trace_report: TraceReport): + """Append a trace report to this decorator.""" + if not self._trace_reports: + self._trace_reports = [] + self._trace_reports.append(trace_report) + + +class TraceReportSchema(BaseModelSchema): + """Trace report schema.""" + + class Meta: + """TraceReportSchema metadata.""" + + model_class = TraceReport + + msg_id = fields.Str( + required=True, + allow_none=False, + description="Message Id", + example=UUIDFour.EXAMPLE, # typically a UUID4 but not necessarily + ) + thread_id = fields.Str( + required=True, + allow_none=False, + description="Message Id", + example=UUIDFour.EXAMPLE, # typically a UUID4 but not necessarily + ) + traced_type = fields.Str( + required=False, + allow_none=True, + description="Type of traced message", + example="TODO", + ) + timestamp = fields.Str( + required=True, + allow_none=False, + description="Timestamp of traced event", + example="123456789.123456", + ) + str_time = fields.Str( + required=True, + allow_none=False, + description="Formatted timestamp of traced event", + example="2018-03-27 18:23:45.123Z", + ) + handler = fields.Str( + required=False, + allow_none=True, + description="Description of the message handler", + example="TODO", + ) + ellapsed_milli = fields.Int( + required=False, + allow_none=True, + description="Elapsed milliseconds processing time", + example=27, + ) + outcome = fields.Str( + required=False, + allow_none=True, + description="Outcome description", + example="TODO", + ) + + +class TraceDecoratorSchema(BaseModelSchema): + """Trace decorator schema used in serialization/deserialization.""" + + class Meta: + """TraceDecoratorSchema metadata.""" + + model_class = TraceDecorator + + target = fields.Str( + required=True, + allow_none=False, + description="Trace report target", + example="'http://example.com/tracer', or 'message'", + ) + full_thread = fields.Boolean( + required=False, + allow_none=True, + description="Parent thread identifier", + example="True", + ) + trace_reports = fields.List( + fields.Nested(TraceReportSchema), + required=False, + allow_none=True, + description=( + "The set of reports collected so far for this message or thread" + ), + ) diff --git a/aries_cloudagent/messaging/models/base_record.py b/aries_cloudagent/messaging/models/base_record.py index ed78fb564d..6ac5dff1ce 100644 --- a/aries_cloudagent/messaging/models/base_record.py +++ b/aries_cloudagent/messaging/models/base_record.py @@ -440,6 +440,30 @@ def __eq__(self, other: Any) -> bool: return False +class BaseExchangeRecord(BaseRecord): + """Represents a base record with event tracing capability.""" + + def __init__( + self, + id: str = None, + state: str = None, + *, + trace: bool = False, + **kwargs, + ): + """Initialize a new V10CredentialExchange.""" + super().__init__(id, state, **kwargs) + self.trace = trace + + def __eq__(self, other: Any) -> bool: + """Comparison between records.""" + if type(other) is type(self): + return (self.value == other.value + and self.tags == other.tags + and self.trace == other.trace) + return False + + class BaseRecordSchema(BaseModelSchema): """Schema to allow serialization/deserialization of base records.""" @@ -457,3 +481,18 @@ class Meta: description="Time of last record update", **INDY_ISO8601_DATETIME, ) + + +class BaseExchangeSchema(BaseRecordSchema): + """Base schema for exchange records.""" + + class Meta: + """BaseExchangeSchema metadata.""" + + model_class = BaseExchangeRecord + + trace = fields.Boolean( + description="Record trace information, based on agent configuration", + required=False, + default=False, + ) diff --git a/aries_cloudagent/messaging/tests/test_agent_message.py b/aries_cloudagent/messaging/tests/test_agent_message.py index e86f029af8..85536d7f75 100644 --- a/aries_cloudagent/messaging/tests/test_agent_message.py +++ b/aries_cloudagent/messaging/tests/test_agent_message.py @@ -1,8 +1,10 @@ from asynctest import TestCase as AsyncTestCase from marshmallow import fields +import json from ..agent_message import AgentMessage, AgentMessageSchema from ..decorators.signature_decorator import SignatureDecorator +from ..decorators.trace_decorator import TraceReport, TRACE_LOG_TARGET from ...wallet.basic import BasicWallet @@ -86,3 +88,63 @@ async def test_assign_thread(self): reply.assign_thread_from(msg) assert reply._thread_id == msg._thread_id assert reply._thread_id != reply._id + + async def test_add_tracing(self): + msg = BasicAgentMessage() + msg.add_trace_decorator() + tracer = msg._trace + assert tracer.target == TRACE_LOG_TARGET + assert tracer.full_thread == True + + trace_report = TraceReport( + msg_id=msg._id, + thread_id=msg._thread_id, + traced_type=msg._type, + timestamp="123456789.123456", + str_time="2019-01-01 12:34:56.7", + handler="function.START", + ellapsed_milli=27, + outcome="OK! ...", + ) + msg.add_trace_report(trace_report) + tracer = msg._trace + trace_reports = tracer.trace_reports + assert len(trace_reports) == 1 + msg_trace_report = trace_reports[0] + assert msg_trace_report.msg_id == msg._id + assert msg_trace_report.thread_id == msg._thread_id + assert msg_trace_report.handler == trace_report.handler + assert msg_trace_report.ellapsed_milli == trace_report.ellapsed_milli + assert msg_trace_report.traced_type == msg._type + assert msg_trace_report.outcome == trace_report.outcome + + msg2 = BasicAgentMessage() + msg2.assign_thread_from(msg) + msg2.assign_trace_from(msg) + tracer = msg2._trace + trace_reports = tracer.trace_reports + assert len(trace_reports) == 1 + + trace_report2 = TraceReport( + msg_id=msg2._id, + thread_id=msg2._thread_id, + traced_type=msg2._type, + timestamp="123456789.123456", + str_time="2019-01-01 12:34:56.7", + handler="function.END", + ellapsed_milli=72, + outcome="A OK! ...", + ) + msg2.add_trace_report(trace_report2) + tracer = msg2._trace + trace_reports = tracer.trace_reports + assert len(trace_reports) == 2 + msg_trace_report = trace_reports[1] + assert msg_trace_report.msg_id == msg2._id + assert msg_trace_report.thread_id == msg2._thread_id + assert msg_trace_report.handler == trace_report2.handler + assert msg_trace_report.ellapsed_milli == trace_report2.ellapsed_milli + assert msg_trace_report.traced_type == msg2._type + assert msg_trace_report.outcome == trace_report2.outcome + + print("tracer:", tracer.serialize()) diff --git a/aries_cloudagent/protocols/basicmessage/handlers/basicmessage_handler.py b/aries_cloudagent/protocols/basicmessage/handlers/basicmessage_handler.py index 21697bd0cf..bd44fad82f 100644 --- a/aries_cloudagent/protocols/basicmessage/handlers/basicmessage_handler.py +++ b/aries_cloudagent/protocols/basicmessage/handlers/basicmessage_handler.py @@ -51,6 +51,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder): if reply: reply_msg = BasicMessage(content=reply) reply_msg.assign_thread_from(context.message) + reply_msg.assign_trace_from(context.message) if "l10n" in context.message._decorators: reply_msg._decorators["l10n"] = context.message._decorators["l10n"] await responder.send_reply(reply_msg) diff --git a/aries_cloudagent/protocols/connections/manager.py b/aries_cloudagent/protocols/connections/manager.py index 70d5657412..cdb363214a 100644 --- a/aries_cloudagent/protocols/connections/manager.py +++ b/aries_cloudagent/protocols/connections/manager.py @@ -463,6 +463,7 @@ async def create_response( ) # Assign thread information response.assign_thread_from(request) + response.assign_trace_from(request) # Sign connection field using the invitation key wallet: BaseWallet = await self.context.inject(BaseWallet) await response.sign_field("connection", connection.invitation_key, wallet) diff --git a/aries_cloudagent/protocols/credentials/routes.py b/aries_cloudagent/protocols/credentials/routes.py index 91352a5a21..ec38a8c6d3 100644 --- a/aries_cloudagent/protocols/credentials/routes.py +++ b/aries_cloudagent/protocols/credentials/routes.py @@ -167,7 +167,7 @@ async def credentials_get(request: web.BaseRequest): except WalletNotFoundError: raise web.HTTPNotFound() - return web.json_response(credential) + return web.json_response(json.loads(credential)) @docs(tags=["credentials"], summary="Remove a credential from the wallet by id") diff --git a/aries_cloudagent/protocols/discovery/handlers/query_handler.py b/aries_cloudagent/protocols/discovery/handlers/query_handler.py index 5e48f79c09..5bf4b9d746 100644 --- a/aries_cloudagent/protocols/discovery/handlers/query_handler.py +++ b/aries_cloudagent/protocols/discovery/handlers/query_handler.py @@ -20,4 +20,5 @@ async def handle(self, context: RequestContext, responder: BaseResponder): result = await registry.prepare_disclosed(context, protocols) reply = Disclose(protocols=result) reply.assign_thread_from(context.message) + reply.assign_trace_from(context.message) await responder.send_reply(reply) diff --git a/aries_cloudagent/protocols/introduction/demo_service.py b/aries_cloudagent/protocols/introduction/demo_service.py index 0d116af28e..00c4935ce7 100644 --- a/aries_cloudagent/protocols/introduction/demo_service.py +++ b/aries_cloudagent/protocols/introduction/demo_service.py @@ -100,6 +100,7 @@ async def return_invitation( invitation=invitation.invitation, message=invitation.message ) msg.assign_thread_from(invitation) + msg.assign_trace_from(invitation) value["state"] = "complete" await storage.update_record_value(row, json.dumps(value)) diff --git a/aries_cloudagent/protocols/introduction/handlers/invitation_request_handler.py b/aries_cloudagent/protocols/introduction/handlers/invitation_request_handler.py index 69f84701c4..dab13c3108 100644 --- a/aries_cloudagent/protocols/introduction/handlers/invitation_request_handler.py +++ b/aries_cloudagent/protocols/introduction/handlers/invitation_request_handler.py @@ -32,4 +32,5 @@ async def handle(self, context: RequestContext, responder: BaseResponder): _connection, invite = await connection_mgr.create_invitation() response = Invitation(invitation=invite) response.assign_thread_from(context.message) + response.assign_trace_from(context.message) await responder.send_reply(response) diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/manager.py b/aries_cloudagent/protocols/issue_credential/v1_0/manager.py index a39e33ee5e..3ddc0be089 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/manager.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/manager.py @@ -100,6 +100,7 @@ async def prepare_send( initiator=V10CredentialExchange.INITIATOR_SELF, role=V10CredentialExchange.ROLE_ISSUER, credential_proposal_dict=credential_proposal.serialize(), + trace=(credential_proposal._trace is not None) ) (credential_exchange, credential_offer) = await self.create_offer( credential_exchange_record=credential_exchange, @@ -121,6 +122,7 @@ async def create_proposal( schema_version: str = None, cred_def_id: str = None, issuer_did: str = None, + trace: bool = False, ) -> V10CredentialExchange: """ Create a credential proposal. @@ -154,6 +156,10 @@ async def create_proposal( cred_def_id=cred_def_id, issuer_did=issuer_did, ) + credential_proposal_message.assign_trace_decorator( + self.context.settings, + trace, + ) if auto_remove is None: auto_remove = not self.context.settings.get("preserve_exchange_records") @@ -166,6 +172,7 @@ async def create_proposal( credential_proposal_dict=credential_proposal_message.serialize(), auto_offer=auto_offer, auto_remove=auto_remove, + trace=trace ) await credential_exchange_record.save( self.context, reason="create credential proposal" @@ -198,6 +205,7 @@ async def receive_proposal(self) -> V10CredentialExchange: auto_issue=self.context.settings.get( "debug.auto_respond_credential_request" ), + trace=(credential_proposal_message._trace is not None) ) await credential_exchange_record.save( self.context, reason="receive credential proposal" @@ -223,6 +231,10 @@ async def create_offer( credential_proposal_message = CredentialProposal.deserialize( credential_exchange_record.credential_proposal_dict ) + credential_proposal_message.assign_trace_decorator( + self.context.settings, + credential_exchange_record.trace, + ) cred_def_id = await self._match_sent_cred_def_id( { t: getattr(credential_proposal_message, t) @@ -263,6 +275,10 @@ async def _create(cred_def_id): credential_offer_message._thread = { "thid": credential_exchange_record.thread_id } + credential_offer_message.assign_trace_decorator( + self.context.settings, + credential_exchange_record.trace, + ) credential_exchange_record.thread_id = credential_offer_message._thread_id credential_exchange_record.schema_id = credential_offer["schema_id"] @@ -321,6 +337,7 @@ async def receive_offer(self) -> V10CredentialExchange: initiator=V10CredentialExchange.INITIATOR_EXTERNAL, role=V10CredentialExchange.ROLE_HOLDER, credential_proposal_dict=credential_proposal_dict, + trace=(credential_offer_message._trace is not None) ) credential_exchange_record.credential_offer = indy_offer @@ -407,6 +424,10 @@ async def _create(): credential_request_message._thread = { "thid": credential_exchange_record.thread_id } + credential_request_message.assign_trace_decorator( + self.context.settings, + credential_exchange_record.trace, + ) credential_exchange_record.state = V10CredentialExchange.STATE_REQUEST_SENT await credential_exchange_record.save( @@ -544,6 +565,10 @@ async def issue_credential( ], ) credential_message._thread = {"thid": credential_exchange_record.thread_id} + credential_message.assign_trace_decorator( + self.context.settings, + credential_exchange_record.trace, + ) return (credential_exchange_record, credential_message) @@ -657,6 +682,10 @@ async def store_credential( credential_exchange_record.thread_id, credential_exchange_record.parent_thread_id, ) + credential_ack_message.assign_trace_decorator( + self.context.settings, + credential_exchange_record.trace, + ) if credential_exchange_record.auto_remove: # Delete the exchange record since we're done with it diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py index cf215d6b1b..dc5daf2791 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/models/credential_exchange.py @@ -6,11 +6,13 @@ from marshmallow.validate import OneOf from .....config.injection_context import InjectionContext -from .....messaging.models.base_record import BaseRecord, BaseRecordSchema +from .....messaging.models.base_record import ( + BaseExchangeRecord, BaseExchangeSchema +) from .....messaging.valid import INDY_CRED_DEF_ID, INDY_SCHEMA_ID, UUIDFour -class V10CredentialExchange(BaseRecord): +class V10CredentialExchange(BaseExchangeRecord): """Represents an Aries#0036 credential exchange.""" class Meta: @@ -63,10 +65,16 @@ def __init__( auto_issue: bool = False, auto_remove: bool = True, error_msg: str = None, + trace: bool = False, **kwargs, ): """Initialize a new V10CredentialExchange.""" - super().__init__(credential_exchange_id, state, **kwargs) + super().__init__( + credential_exchange_id, + state, + trace=trace, + **kwargs + ) self._id = credential_exchange_id self.connection_id = connection_id self.thread_id = thread_id @@ -89,6 +97,7 @@ def __init__( self.auto_issue = auto_issue self.auto_remove = auto_remove self.error_msg = error_msg + self.trace = trace @property def credential_exchange_id(self) -> str: @@ -121,6 +130,7 @@ def record_value(self) -> dict: "revocation_id", "role", "state", + "trace", ) } @@ -145,7 +155,7 @@ def __eq__(self, other: Any) -> bool: return super().__eq__(other) -class V10CredentialExchangeSchema(BaseRecordSchema): +class V10CredentialExchangeSchema(BaseExchangeSchema): """Schema to allow serialization/deserialization of credential exchange records.""" class Meta: diff --git a/aries_cloudagent/protocols/issue_credential/v1_0/routes.py b/aries_cloudagent/protocols/issue_credential/v1_0/routes.py index 0836d4ab68..555f45e8fe 100644 --- a/aries_cloudagent/protocols/issue_credential/v1_0/routes.py +++ b/aries_cloudagent/protocols/issue_credential/v1_0/routes.py @@ -34,7 +34,7 @@ V10CredentialExchangeSchema, ) -from ....utils.tracing import trace_event, get_timer +from ....utils.tracing import trace_event, get_timer, AdminAPIMessageTracingSchema class V10AttributeMimeTypesResultSchema(Schema): @@ -56,7 +56,7 @@ class V10CredentialStoreRequestSchema(Schema): credential_id = fields.Str(required=False) -class V10CredentialProposalRequestSchemaBase(Schema): +class V10CredentialProposalRequestSchemaBase(AdminAPIMessageTracingSchema): """Base class for request schema for sending credential proposal admin message.""" connection_id = fields.UUID( @@ -106,7 +106,7 @@ class V10CredentialProposalRequestMandSchema(V10CredentialProposalRequestSchemaB credential_proposal = fields.Nested(CredentialPreviewSchema, required=True) -class V10CredentialOfferRequestSchema(Schema): +class V10CredentialOfferRequestSchema(AdminAPIMessageTracingSchema): """Request schema for sending credential offer admin message.""" connection_id = fields.UUID( @@ -266,6 +266,7 @@ async def credential_exchange_send(request: web.BaseRequest): if not preview_spec: raise web.HTTPBadRequest(reason="credential_proposal must be provided") auto_remove = body.get("auto_remove") + trace_msg = body.get("trace") preview = CredentialPreview.deserialize(preview_spec) try: @@ -283,6 +284,10 @@ async def credential_exchange_send(request: web.BaseRequest): credential_proposal=preview, **{t: body.get(t) for t in CRED_DEF_TAGS if body.get(t)}, ) + credential_proposal.assign_trace_decorator( + context.settings, + trace_msg, + ) trace_event( context.settings, @@ -340,6 +345,7 @@ async def credential_exchange_send_proposal(request: web.BaseRequest): preview_spec = body.get("credential_proposal") preview = CredentialPreview.deserialize(preview_spec) if preview_spec else None auto_remove = body.get("auto_remove") + trace_msg = body.get("trace") try: connection_record = await ConnectionRecord.retrieve_by_id( @@ -358,6 +364,7 @@ async def credential_exchange_send_proposal(request: web.BaseRequest): comment=comment, credential_preview=preview, auto_remove=auto_remove, + trace=trace_msg, **{t: body.get(t) for t in CRED_DEF_TAGS if body.get(t)}, ) @@ -414,6 +421,7 @@ async def credential_exchange_send_free_offer(request: web.BaseRequest): auto_remove = body.get("auto_remove") comment = body.get("comment") preview_spec = body.get("credential_preview") + trace_msg = body.get("trace") if not cred_def_id: raise web.HTTPBadRequest(reason="cred_def_id is required") @@ -442,6 +450,10 @@ async def credential_exchange_send_free_offer(request: web.BaseRequest): credential_proposal=credential_preview, cred_def_id=cred_def_id, ) + credential_proposal.assign_trace_decorator( + context.settings, + trace_msg, + ) credential_proposal_dict = credential_proposal.serialize() else: credential_proposal_dict = None @@ -453,6 +465,7 @@ async def credential_exchange_send_free_offer(request: web.BaseRequest): credential_proposal_dict=credential_proposal_dict, auto_issue=auto_issue, auto_remove=auto_remove, + trace=trace_msg, ) credential_manager = CredentialManager(context) diff --git a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_ack_handler.py b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_ack_handler.py index 4b6fa30daa..633a3d55de 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_ack_handler.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_ack_handler.py @@ -10,6 +10,8 @@ from ..manager import PresentationManager from ..messages.presentation_ack import PresentationAck +from .....utils.tracing import trace_event, get_timer + class PresentationAckHandler(BaseHandler): """Message handler class for presentation acks.""" @@ -22,6 +24,8 @@ async def handle(self, context: RequestContext, responder: BaseResponder): context: request context responder: responder callback """ + r_time = get_timer() + self._logger.debug("PresentationAckHandler called with context %s", context) assert isinstance(context.message, PresentationAck) self._logger.info( @@ -34,3 +38,10 @@ async def handle(self, context: RequestContext, responder: BaseResponder): presentation_manager = PresentationManager(context) await presentation_manager.receive_presentation_ack() + + trace_event( + context.settings, + context.message, + outcome="PresentationAckHandler.handle.END", + perf_counter=r_time + ) diff --git a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_handler.py b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_handler.py index 7d477faab9..e62247be31 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_handler.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_handler.py @@ -9,6 +9,8 @@ from ..manager import PresentationManager from ..messages.presentation import Presentation +from .....utils.tracing import trace_event, get_timer + class PresentationHandler(BaseHandler): """Message handler class for presentations.""" @@ -22,6 +24,8 @@ async def handle(self, context: RequestContext, responder: BaseResponder): responder: responder callback """ + r_time = get_timer() + self._logger.debug("PresentationHandler called with context %s", context) assert isinstance(context.message, Presentation) self._logger.info( @@ -33,5 +37,19 @@ async def handle(self, context: RequestContext, responder: BaseResponder): presentation_exchange_record = await presentation_manager.receive_presentation() + r_time = trace_event( + context.settings, + context.message, + outcome="PresentationHandler.handle.END", + perf_counter=r_time + ) + if context.settings.get("debug.auto_verify_presentation"): await presentation_manager.verify_presentation(presentation_exchange_record) + + trace_event( + context.settings, + presentation_exchange_record, + outcome="PresentationHandler.handle.VERIFY", + perf_counter=r_time + ) diff --git a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_proposal_handler.py b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_proposal_handler.py index ef1fa2f4fe..b25978cd4b 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_proposal_handler.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_proposal_handler.py @@ -10,6 +10,8 @@ from ..manager import PresentationManager from ..messages.presentation_proposal import PresentationProposal +from .....utils.tracing import trace_event, get_timer + class PresentationProposalHandler(BaseHandler): """Message handler class for presentation proposals.""" @@ -23,6 +25,8 @@ async def handle(self, context: RequestContext, responder: BaseResponder): responder: responder callback """ + r_time = get_timer() + self._logger.debug( "PresentationProposalHandler called with context %s", context ) @@ -40,6 +44,13 @@ async def handle(self, context: RequestContext, responder: BaseResponder): presentation_manager = PresentationManager(context) presentation_exchange_record = await presentation_manager.receive_proposal() + r_time = trace_event( + context.settings, + context.message, + outcome="PresentationProposalHandler.handle.END", + perf_counter=r_time + ) + # If auto_respond_presentation_proposal is set, reply with proof req if context.settings.get("debug.auto_respond_presentation_proposal"): ( @@ -51,3 +62,10 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ) await responder.send_reply(presentation_request_message) + + trace_event( + context.settings, + presentation_request_message, + outcome="PresentationProposalHandler.handle.PRESENT", + perf_counter=r_time + ) diff --git a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_request_handler.py b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_request_handler.py index 23b861743a..1f4bbf9731 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_request_handler.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/handlers/presentation_request_handler.py @@ -15,6 +15,8 @@ from ..models.presentation_exchange import V10PresentationExchange from ..util.indy import indy_proof_req_preview2indy_requested_creds +from .....utils.tracing import trace_event, get_timer + class PresentationRequestHandler(BaseHandler): """Message handler class for Aries#0037 v1.0 presentation requests.""" @@ -28,6 +30,8 @@ async def handle(self, context: RequestContext, responder: BaseResponder): responder: responder callback """ + r_time = get_timer() + self._logger.debug("PresentationRequestHandler called with context %s", context) assert isinstance(context.message, PresentationRequest) self._logger.info( @@ -62,6 +66,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder): auto_present=context.settings.get( "debug.auto_respond_presentation_request" ), + trace=(context.message._trace is not None), ) presentation_exchange_record.presentation_request = indy_proof_request @@ -69,6 +74,13 @@ async def handle(self, context: RequestContext, responder: BaseResponder): presentation_exchange_record ) + r_time = trace_event( + context.settings, + context.message, + outcome="PresentationRequestHandler.handle.END", + perf_counter=r_time + ) + # If auto_present is enabled, respond immediately with presentation if presentation_exchange_record.auto_present: presentation_preview = None @@ -100,3 +112,10 @@ async def handle(self, context: RequestContext, responder: BaseResponder): ) await responder.send_reply(presentation_message) + + trace_event( + context.settings, + presentation_message, + outcome="PresentationRequestHandler.handle.PRESENT", + perf_counter=r_time + ) diff --git a/aries_cloudagent/protocols/present_proof/v1_0/manager.py b/aries_cloudagent/protocols/present_proof/v1_0/manager.py index 7ccae587c0..7bdbbec815 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/manager.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/manager.py @@ -80,6 +80,7 @@ async def create_exchange_for_proposal( state=V10PresentationExchange.STATE_PROPOSAL_SENT, presentation_proposal_dict=presentation_proposal_message.serialize(), auto_present=auto_present, + trace=(presentation_proposal_message._trace is not None), ) await presentation_exchange_record.save( self.context, reason="create presentation proposal" @@ -103,6 +104,7 @@ async def receive_proposal(self): role=V10PresentationExchange.ROLE_VERIFIER, state=V10PresentationExchange.STATE_PROPOSAL_RECEIVED, presentation_proposal_dict=presentation_proposal_message.serialize(), + trace=(presentation_proposal_message._trace is not None) ) await presentation_exchange_record.save( self.context, reason="receive presentation request" @@ -155,6 +157,10 @@ async def create_bound_request( presentation_request_message._thread = { "thid": presentation_exchange_record.thread_id } + presentation_request_message.assign_trace_decorator( + self.context.settings, + presentation_exchange_record.trace, + ) presentation_exchange_record.thread_id = presentation_request_message._thread_id presentation_exchange_record.state = V10PresentationExchange.STATE_REQUEST_SENT @@ -187,6 +193,7 @@ async def create_exchange_for_request( role=V10PresentationExchange.ROLE_VERIFIER, state=V10PresentationExchange.STATE_REQUEST_SENT, presentation_request=presentation_request_message.indy_proof_request(), + trace=(presentation_request_message._trace is not None) ) await presentation_exchange_record.save( self.context, reason="create (free) presentation request" @@ -420,6 +427,10 @@ async def create_presentation( ) presentation_message._thread = {"thid": presentation_exchange_record.thread_id} + presentation_message.assign_trace_decorator( + self.context.settings, + presentation_exchange_record.trace, + ) # save presentation exchange state presentation_exchange_record.state = ( @@ -594,6 +605,10 @@ async def send_presentation_ack( presentation_ack_message._thread = { "thid": presentation_exchange_record.thread_id } + presentation_ack_message.assign_trace_decorator( + self.context.settings, + presentation_exchange_record.trace, + ) await responder.send_reply(presentation_ack_message) else: diff --git a/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py b/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py index 811b965958..058bfd2ca2 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/models/presentation_exchange.py @@ -5,11 +5,13 @@ from marshmallow import fields from marshmallow.validate import OneOf -from .....messaging.models.base_record import BaseRecord, BaseRecordSchema +from .....messaging.models.base_record import ( + BaseExchangeRecord, BaseExchangeSchema +) from .....messaging.valid import UUIDFour -class V10PresentationExchange(BaseRecord): +class V10PresentationExchange(BaseExchangeRecord): """Represents an Aries#0037 v1.0 presentation exchange.""" class Meta: @@ -52,10 +54,16 @@ def __init__( verified: str = None, auto_present: bool = False, error_msg: str = None, + trace: bool = False, **kwargs ): """Initialize a new PresentationExchange.""" - super().__init__(presentation_exchange_id, state, **kwargs) + super().__init__( + presentation_exchange_id, + state, + trace=trace, + **kwargs + ) self.connection_id = connection_id self.thread_id = thread_id self.initiator = initiator @@ -67,6 +75,7 @@ def __init__( self.verified = verified self.auto_present = auto_present self.error_msg = error_msg + self.trace = trace @property def presentation_exchange_id(self) -> str: @@ -89,6 +98,7 @@ def record_value(self) -> dict: "auto_present", "error_msg", "verified", + "trace", ) } @@ -97,7 +107,7 @@ def __eq__(self, other: Any) -> bool: return super().__eq__(other) -class V10PresentationExchangeSchema(BaseRecordSchema): +class V10PresentationExchangeSchema(BaseExchangeSchema): """Schema for de/serialization of v1.0 presentation exchange records.""" class Meta: diff --git a/aries_cloudagent/protocols/present_proof/v1_0/models/tests/test_record.py b/aries_cloudagent/protocols/present_proof/v1_0/models/tests/test_record.py index 20fc822da2..96f952da1d 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/models/tests/test_record.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/models/tests/test_record.py @@ -33,4 +33,5 @@ def test_record(self): "auto_present": True, "error_msg": "error", "verified": False, + "trace": False, } diff --git a/aries_cloudagent/protocols/present_proof/v1_0/routes.py b/aries_cloudagent/protocols/present_proof/v1_0/routes.py index 7ed2355e1d..ede5bd57ca 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/routes.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/routes.py @@ -35,6 +35,8 @@ from .message_types import ATTACH_DECO_IDS, PRESENTATION_REQUEST +from ....utils.tracing import trace_event, get_timer, AdminAPIMessageTracingSchema + class V10PresentationExchangeListSchema(Schema): """Result schema for an Aries#0037 v1.0 presentation exchange query.""" @@ -45,7 +47,7 @@ class V10PresentationExchangeListSchema(Schema): ) -class V10PresentationProposalRequestSchema(Schema): +class V10PresentationProposalRequestSchema(AdminAPIMessageTracingSchema): """Request schema for sending a presentation proposal admin message.""" connection_id = fields.UUID( @@ -172,7 +174,7 @@ class IndyProofRequestSchema(Schema): ) -class V10PresentationRequestRequestSchema(Schema): +class V10PresentationRequestRequestSchema(AdminAPIMessageTracingSchema): """Request schema for sending a proof request.""" connection_id = fields.UUID( @@ -207,7 +209,7 @@ class IndyRequestedCredsRequestedPredSchema(Schema): ) -class V10PresentationRequestSchema(Schema): +class V10PresentationRequestSchema(AdminAPIMessageTracingSchema): """Request schema for sending a presentation.""" self_attested_attributes = fields.Dict( @@ -386,6 +388,8 @@ async def presentation_exchange_send_proposal(request: web.BaseRequest): The presentation exchange details """ + r_time = get_timer() + context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] @@ -409,6 +413,11 @@ async def presentation_exchange_send_proposal(request: web.BaseRequest): comment=comment, presentation_proposal=PresentationPreview.deserialize(presentation_preview), ) + trace_msg = body.get("trace") + presentation_proposal_message.assign_trace_decorator( + context.settings, + trace_msg, + ) auto_present = body.get( "auto_present", context.settings.get("debug.auto_respond_presentation_request") ) @@ -422,8 +431,16 @@ async def presentation_exchange_send_proposal(request: web.BaseRequest): presentation_proposal_message=presentation_proposal_message, auto_present=auto_present, ) + await outbound_handler(presentation_proposal_message, connection_id=connection_id) + trace_event( + context.settings, + presentation_proposal_message, + outcome="presentation_exchange_propose.END", + perf_counter=r_time + ) + return web.json_response(presentation_exchange_record.serialize()) @@ -449,6 +466,8 @@ async def presentation_exchange_create_request(request: web.BaseRequest): The presentation exchange details """ + r_time = get_timer() + context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] @@ -468,6 +487,11 @@ async def presentation_exchange_create_request(request: web.BaseRequest): ) ], ) + trace_msg = body.get("trace") + presentation_request_message.assign_trace_decorator( + context.settings, + trace_msg, + ) presentation_manager = PresentationManager(context) @@ -479,6 +503,13 @@ async def presentation_exchange_create_request(request: web.BaseRequest): await outbound_handler(presentation_request_message, connection_id=None) + trace_event( + context.settings, + presentation_request_message, + outcome="presentation_exchange_create_request.END", + perf_counter=r_time + ) + return web.json_response(presentation_exchange_record.serialize()) @@ -499,6 +530,8 @@ async def presentation_exchange_send_free_request(request: web.BaseRequest): The presentation exchange details """ + r_time = get_timer() + context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] @@ -529,6 +562,11 @@ async def presentation_exchange_send_free_request(request: web.BaseRequest): ) ], ) + trace_msg = body.get("trace") + presentation_request_message.assign_trace_decorator( + context.settings, + trace_msg, + ) presentation_manager = PresentationManager(context) @@ -541,6 +579,13 @@ async def presentation_exchange_send_free_request(request: web.BaseRequest): await outbound_handler(presentation_request_message, connection_id=connection_id) + trace_event( + context.settings, + presentation_request_message, + outcome="presentation_exchange_send_request.END", + perf_counter=r_time + ) + return web.json_response(presentation_exchange_record.serialize()) @@ -561,6 +606,8 @@ async def presentation_exchange_send_bound_request(request: web.BaseRequest): The presentation exchange details """ + r_time = get_timer() + context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] @@ -590,9 +637,21 @@ async def presentation_exchange_send_bound_request(request: web.BaseRequest): presentation_exchange_record, presentation_request_message, ) = await presentation_manager.create_bound_request(presentation_exchange_record) + trace_msg = body.get("trace") + presentation_request_message.assign_trace_decorator( + context.settings, + trace_msg, + ) await outbound_handler(presentation_request_message, connection_id=connection_id) + trace_event( + context.settings, + presentation_request_message, + outcome="presentation_exchange_send_request.END", + perf_counter=r_time + ) + return web.json_response(presentation_exchange_record.serialize()) @@ -610,6 +669,7 @@ async def presentation_exchange_send_presentation(request: web.BaseRequest): The presentation exchange details """ + r_time = get_timer() context = request.app["request_context"] outbound_handler = request.app["outbound_message_router"] @@ -649,8 +709,21 @@ async def presentation_exchange_send_presentation(request: web.BaseRequest): }, comment=body.get("comment"), ) + trace_msg = body.get("trace") + presentation_message.assign_trace_decorator( + context.settings, + trace_msg, + ) await outbound_handler(presentation_message, connection_id=connection_id) + + trace_event( + context.settings, + presentation_message, + outcome="presentation_exchange_send_request.END", + perf_counter=r_time + ) + return web.json_response(presentation_exchange_record.serialize()) @@ -667,6 +740,8 @@ async def presentation_exchange_verify_presentation(request: web.BaseRequest): The presentation exchange details """ + r_time = get_timer() + context = request.app["request_context"] presentation_exchange_id = request.match_info["pres_ex_id"] @@ -695,6 +770,13 @@ async def presentation_exchange_verify_presentation(request: web.BaseRequest): presentation_exchange_record ) + trace_event( + context.settings, + presentation_exchange_record, + outcome="presentation_exchange_verify.END", + perf_counter=r_time + ) + return web.json_response(presentation_exchange_record.serialize()) diff --git a/aries_cloudagent/protocols/present_proof/v1_0/tests/test_routes.py b/aries_cloudagent/protocols/present_proof/v1_0/tests/test_routes.py index 1114d6fead..7ce8ab6639 100644 --- a/aries_cloudagent/protocols/present_proof/v1_0/tests/test_routes.py +++ b/aries_cloudagent/protocols/present_proof/v1_0/tests/test_routes.py @@ -649,6 +649,7 @@ async def test_presentation_exchange_verify_presentation(self): return_value=mock_presentation_exchange ) mock_presentation_exchange.connection_id = "dummy" + mock_presentation_exchange.thread_id = "dummy" mock_presentation_exchange.serialize = async_mock.MagicMock() mock_presentation_exchange.serialize.return_value = {"hello": "world"} diff --git a/aries_cloudagent/protocols/trustping/handlers/ping_handler.py b/aries_cloudagent/protocols/trustping/handlers/ping_handler.py index fe00b4dd3d..585fef688c 100644 --- a/aries_cloudagent/protocols/trustping/handlers/ping_handler.py +++ b/aries_cloudagent/protocols/trustping/handlers/ping_handler.py @@ -35,6 +35,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder): if context.message.response_requested: reply = PingResponse() reply.assign_thread_from(context.message) + reply.assign_trace_from(context.message) await responder.send_reply(reply) if context.settings.get("debug.monitor_ping"): diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index 2b5ef9ed5e..0cb9df3b3d 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -335,12 +335,20 @@ async def _process_loop(self): if deliver: queued.state = QueuedOutboundMessage.STATE_DELIVER - trace_event( + p_time = trace_event( self.context.settings, - queued.message, - outcome="OutboundTransportManager._process_loop.DELIVER", + queued.message if queued.message else queued.payload, + outcome="OutboundTransportManager.DELIVER.START." + + queued.endpoint, ) self.deliver_queued_message(queued) + trace_event( + self.context.settings, + queued.message if queued.message else queued.payload, + outcome="OutboundTransportManager.DELIVER.END." + + queued.endpoint, + perf_counter=p_time, + ) upd_buffer.append(queued) @@ -356,12 +364,18 @@ async def _process_loop(self): new_pending += 1 else: queued.state = QueuedOutboundMessage.STATE_ENCODE - trace_event( + p_time = trace_event( self.context.settings, - queued.message, - outcome="OutboundTransportManager._process_loop.ENCODE", + queued.message if queued.message else queued.payload, + outcome="OutboundTransportManager.ENCODE.START", ) self.encode_queued_message(queued) + trace_event( + self.context.settings, + queued.message if queued.message else queued.payload, + outcome="OutboundTransportManager.ENCODE.END", + perf_counter=p_time, + ) else: new_pending += 1 diff --git a/aries_cloudagent/utils/tests/test_tracing.py b/aries_cloudagent/utils/tests/test_tracing.py index ff7a2945c7..c4d918bf11 100644 --- a/aries_cloudagent/utils/tests/test_tracing.py +++ b/aries_cloudagent/utils/tests/test_tracing.py @@ -5,6 +5,7 @@ from ...transport.inbound.message import InboundMessage from ...transport.outbound.message import OutboundMessage from ...messaging.agent_message import AgentMessage +from ...messaging.decorators.trace_decorator import TRACE_MESSAGE_TARGET from ...protocols.trustping.messages.ping import Ping from .. import tracing as test_module @@ -70,3 +71,28 @@ async def test_post_event_with_error(self): assert False except requests.exceptions.ConnectionError as e: pass + + def test_post_msg_decorator_event(self): + message = Ping() + message._thread = {"thid": "dummy_thread_id_12345"} + assert message._trace is None + context = { + "trace.enabled": True, + "trace.target": TRACE_MESSAGE_TARGET, + "trace.tag": "acapy.trace", + } + test_module.trace_event( + context, + message, + handler="message_handler", + perf_counter=None, + outcome="processed OK", + ) + trace = message._trace + assert trace is not None + assert trace.target == TRACE_MESSAGE_TARGET + assert trace.full_thread == True + trace_reports = trace.trace_reports + assert len(trace_reports) == 1 + trace_report = trace_reports[0] + assert trace_report.thread_id == message._thread.thid diff --git a/aries_cloudagent/utils/tracing.py b/aries_cloudagent/utils/tracing.py index aa6719e27e..aeba80685a 100644 --- a/aries_cloudagent/utils/tracing.py +++ b/aries_cloudagent/utils/tracing.py @@ -6,20 +6,105 @@ import datetime import requests +from marshmallow import fields, Schema + from ..transport.inbound.message import InboundMessage from ..transport.outbound.message import OutboundMessage from ..messaging.agent_message import AgentMessage +from ..messaging.decorators.trace_decorator import ( + TraceReport, TRACE_MESSAGE_TARGET, TRACE_LOG_TARGET +) +from ..messaging.models.base_record import BaseExchangeRecord LOGGER = logging.getLogger(__name__) DT_FMT = '%Y-%m-%d %H:%M:%S.%f%z' +class AdminAPIMessageTracingSchema(Schema): + """ + Request/result schema including agent message tracing. + + This is to be used as a superclass for aca-py admin input/output + messages that need to support tracing. + """ + + trace = fields.Boolean( + description="Record trace information, based on agent configuration", + required=False, + default=False, + ) + + def get_timer() -> float: """Return a timer.""" return time.perf_counter() +def tracing_enabled(context, message) -> bool: + """Determine whether to log trace messages or not.""" + # check if tracing is explicitely on + if context.get("trace.enabled"): + return True + + if message: + if isinstance(message, AgentMessage): + # if there is a trace decorator on the messages then continue to trace + if message._trace: + return True + elif isinstance(message, BaseExchangeRecord): + if message.trace: + return True + elif isinstance(message, dict): + # if there is a trace decorator on the messages then continue to trace + if message.get("~trace"): + return True + if message.get("trace"): + return message.get("trace") + elif isinstance(message, str): + if "~trace" in message: + return True + if "trace" in message: + msg = json.loads(message) + return msg.get("trace") + elif isinstance(message, OutboundMessage): + if message.payload and isinstance(message.payload, AgentMessage): + if message.payload._trace: + return True + elif message.payload and isinstance(message.payload, dict): + if message.payload.get("~trace") or message.payload.get("trace"): + return True + elif message.payload and isinstance(message.payload, str): + if "~trace" in message.payload or "trace" in message.payload: + return True + + # default off + return False + + +def decode_inbound_message(message): + """Return bundled message if appropriate.""" + + if message and isinstance(message, OutboundMessage): + if message.payload and isinstance(message.payload, AgentMessage): + return message.payload + elif message.payload and isinstance(message.payload, dict): + return message.payload + elif message.payload and isinstance(message.payload, str): + try: + return json.loads(message.payload) + except Exception: + pass + elif message and isinstance(message, str): + try: + return json.loads(message) + except Exception: + pass + + # default is the provided message + return message + + def trace_event( context, message, @@ -39,13 +124,15 @@ def trace_event( ("log", "message" or an http endpoint) context["trace.tag"]: Tag to be included in trace output message: the current message, can be an AgentMessage, - InboundMessage or OutboundMessage + InboundMessage, OutboundMessage or Exchange record event: Dict that will be converted to json and posted to the target """ ret = time.perf_counter() - if force_trace or context.get("trace.enabled"): + if force_trace or tracing_enabled(context, message): + message = decode_inbound_message(message) + # build the event to log # TODO check instance type of message to determine how to # get message and thread id's @@ -58,43 +145,77 @@ def trace_event( thread_id = "" msg_type = "" if message and isinstance(message, AgentMessage): - msg_id = message._id - thread_id = message._thread.thid if message._thread else message._id - msg_type = message._type + msg_id = str(message._id) + if message._thread and message._thread.thid: + thread_id = str(message._thread.thid) + else: + thread_id = msg_id + msg_type = str(message._type) elif message and isinstance(message, InboundMessage): # TODO not sure if we can log an InboundMessage before it's "handled" - msg_id = message.session_id if message.session_id else "N/A" - thread_id = message.session_id if message.session_id else "N/A" - msg_type = "InboundMessage" + msg_id = str(message.session_id) if message.session_id else "N/A" + thread_id = str(message.session_id) if message.session_id else "N/A" + msg_type = str(message.__class__.__name__) elif message and isinstance(message, OutboundMessage): - msg_id = message.reply_thread_id if message.reply_thread_id else "N/A" - thread_id = message.reply_thread_id if message.reply_thread_id else "N/A" - msg_type = "OutboundMessage" + msg_id = str(message.reply_thread_id) if message.reply_thread_id else "N/A" + thread_id = msg_id + msg_type = str(message.__class__.__name__) elif message and isinstance(message, dict): - msg_id = message["msg_id"] - thread_id = message["thread_id"] - msg_type = message["type"] + msg_id = str(message["@id"]) if message.get("@id") else "N/A" + if message.get("~thread") and message["~thread"].get("thid"): + thread_id = str(message["~thread"]["thid"]) + elif message.get("thread_id"): + thread_id = str(message["thread_id"]) + else: + thread_id = msg_id + if message.get("@type"): + msg_type = str(message["@type"]) + else: + if message.get("~thread"): + msg_type = "dict:Message" + elif message.get("thread_id"): + msg_type = "dict:Exchange" + else: + msg_type = "dict" + elif isinstance(message, BaseExchangeRecord): + msg_id = "N/A" + thread_id = str(message.thread_id) + msg_type = str(message.__class__.__name__) + else: + msg_id = "N/A" + thread_id = "N/A" + msg_type = str(message.__class__.__name__) ep_time = time.time() str_time = datetime.datetime.utcfromtimestamp(ep_time).strftime(DT_FMT) event = { - "message_id": msg_id, + "msg_id": msg_id, "thread_id": thread_id if thread_id else msg_id, "traced_type": msg_type, "timestamp": ep_time, "str_time": str_time, - "handler": handler, + "handler": str(handler), "ellapsed_milli": int(1000 * (ret - perf_counter)) if perf_counter else 0, - "outcome": outcome, + "outcome": str(outcome), } event_str = json.dumps(event) try: - # check our target - if context["trace.target"] == "message": - # TODO, just log for now - LOGGER.setLevel(logging.INFO) - LOGGER.info(" %s %s", context["trace.tag"], event_str) - elif context["trace.target"] == "log": + # check our target - if we get this far we know we are logging the event + if (context["trace.target"] == TRACE_MESSAGE_TARGET + and isinstance(message, AgentMessage)): + # add a trace report to the existing message + trace_report = TraceReport( + msg_id=event["msg_id"], + thread_id=event["thread_id"], + traced_type=event["traced_type"], + timestamp=event["timestamp"], + str_time=event["str_time"], + handler=event["handler"], + ellapsed_milli=event["ellapsed_milli"], + outcome=event["outcome"], + ) + message.add_trace_report(trace_report) + elif context["trace.target"] == TRACE_LOG_TARGET: # write to standard log file LOGGER.setLevel(logging.INFO) LOGGER.info(" %s %s", context["trace.tag"], event_str) @@ -110,9 +231,9 @@ def trace_event( if raise_errors: raise LOGGER.error( - "Error logging trace %s %s %s", - context["trace.target"], - context["trace.tag"], + "Error logging trace target: %s tag: %s event: %s", + context.get("trace.target"), + context.get("trace.tag"), event_str ) LOGGER.exception(e) diff --git a/demo/EFK-stack/docker-compose.yml b/demo/EFK-stack/docker-compose.yml index 8200a87b4f..9434ff5ada 100644 --- a/demo/EFK-stack/docker-compose.yml +++ b/demo/EFK-stack/docker-compose.yml @@ -24,6 +24,8 @@ services: image: docker.elastic.co/kibana/kibana:7.4.2 environment: ELASTICSEARCH_URL: http://elasticsearch:9200 + volumes: + - ./kibana/conf:/usr/share/kibana/config ports: - 5601:5601 networks: diff --git a/demo/EFK-stack/kibana/conf/kibana.yml b/demo/EFK-stack/kibana/conf/kibana.yml new file mode 100644 index 0000000000..cf19a76416 --- /dev/null +++ b/demo/EFK-stack/kibana/conf/kibana.yml @@ -0,0 +1,7 @@ +# Default Kibana configuration for docker target +server.name: kibana +server.host: "0" +elasticsearch.hosts: [ "http://elasticsearch:9200" ] +xpack.monitoring.ui.container.elasticsearch.enabled: true +xpack.reporting.enabled: true +xpack.reporting.csv.maxSizeBytes: 10485760 diff --git a/demo/EFK-stack/requirements.txt b/demo/EFK-stack/requirements.txt new file mode 100644 index 0000000000..78f00d2cf5 --- /dev/null +++ b/demo/EFK-stack/requirements.txt @@ -0,0 +1,5 @@ +# Elasticsearch 7.x +elasticsearch>=7.0.0,<8.0.0 + +# Elasticsearch 7.x +elasticsearch-dsl>=7.0.0,<8.0.0 diff --git a/demo/EFK-stack/search.py b/demo/EFK-stack/search.py new file mode 100644 index 0000000000..a4e3921e8e --- /dev/null +++ b/demo/EFK-stack/search.py @@ -0,0 +1,58 @@ + +import csv + +from elasticsearch_dsl import connections +from elasticsearch_dsl import Search + + +connections.create_connection(hosts=['localhost'], timeout=20) + +s = Search(index='fluentd-*') +# only return the selected fields +s = s.source(['str_time', 'timestamp', 'handler', 'ellapsed_milli', 'thread_id', 'msg_id', 'outcome', 'traced_type']) +s = s.sort("timestamp") +events = [] +for x in s.scan(): + events.append({ + "str_time": x.str_time, + "timestamp": x.timestamp, + "handler": x.handler, + "ellapsed_milli": x.ellapsed_milli, + "thread_id": x.thread_id, + "msg_id": x.msg_id, + "outcome": x.outcome, + "traced_type": x.traced_type + }) +sorted_events = sorted(events, key = lambda i: i['timestamp']) + +threads = {} +thread_count = 0 +agents = {} +with open('agent-events.csv', 'w', newline='') as csvfile: + spamwriter = csv.writer(csvfile) + i = 0 + spamwriter.writerow( + ["idx", "str_time", "timestamp", "handler", "ellapsed_milli", "thread_id", "msg_id", "outcome", "traced_type", "delta_agent", "delta_thread"] + ) + for x in sorted_events: + if x["handler"] in agents: + delta_agent = x["timestamp"] - agents[x["handler"]] + if delta_agent < 0: + print(i, delta_agent) + else: + delta_agent = 0 + agents[x["handler"]] = x["timestamp"] + if x["thread_id"] in threads: + delta_thread = x["timestamp"] - threads[x["thread_id"]] + if delta_thread < 0: + print(i, delta_thread) + else: + delta_thread = 0 + thread_count = thread_count + 1 + threads[x["thread_id"]] = x["timestamp"] + i = i + 1 + spamwriter.writerow( + [i, x["str_time"], x["timestamp"], x["handler"], x["ellapsed_milli"], x["thread_id"], x["msg_id"], x["outcome"], x["traced_type"], delta_agent, delta_thread] + ) + +print("Total threads=", thread_count) diff --git a/demo/run_demo b/demo/run_demo index 8c76b4f031..6a60bee033 100755 --- a/demo/run_demo +++ b/demo/run_demo @@ -8,6 +8,17 @@ AGENT="$1" shift ARGS="" +TRACE_ENABLED="" +TRACE_TAG=acapy.events +if ! [ -z "$TRACE_TARGET_URL" ]; then + TRACE_TARGET=http://${TRACE_TARGET_URL}/ +else + TRACE_TARGET=log +fi +if [ -z "$DOCKER_NET" ]; then + DOCKER_NET="bridge" +fi + for i in "$@" do if [ ! -z "$SKIP" ]; then @@ -28,15 +39,15 @@ do continue ;; --trace-log) + TRACE_ENABLED=1 TRACE_TARGET=log TRACE_TAG=acapy.events - SKIP=1 continue ;; --trace-http) - TRACE_TARGET=http://${TRACE_TARGET_URL} + TRACE_ENABLED=1 + TRACE_TARGET=http://${TRACE_TARGET_URL}/ TRACE_TAG=acapy.events - SKIP=1 continue ;; --debug-ptvsd) @@ -171,6 +182,7 @@ fi if ! [ -z "$TRACE_TARGET" ]; then DOCKER_ENV="${DOCKER_ENV} -e TRACE_TARGET=${TRACE_TARGET}" DOCKER_ENV="${DOCKER_ENV} -e TRACE_TAG=${TRACE_TAG}" + DOCKER_ENV="${DOCKER_ENV} -e TRACE_ENABLED=${TRACE_ENABLED}" fi if ! [ -z "$PUBLIC_TAILS_URL" ]; then DOCKER_ENV="${DOCKER_ENV} -e PUBLIC_TAILS_URL=${PUBLIC_TAILS_URL}" @@ -191,6 +203,7 @@ DOCKER=${DOCKER:-docker} echo "Starting $AGENT..." $DOCKER run --name $AGENT --rm -it ${DOCKER_OPTS} \ + --network=${DOCKER_NET} \ -p 0.0.0.0:$AGENT_PORT_RANGE:$AGENT_PORT_RANGE \ -v "/$(pwd)/../logs:/home/indy/logs" \ $DOCKER_ENV \ diff --git a/demo/runners/alice.py b/demo/runners/alice.py index d4453c1a48..9de513441d 100644 --- a/demo/runners/alice.py +++ b/demo/runners/alice.py @@ -29,7 +29,7 @@ def __init__( self, http_port: int, admin_port: int, no_auto: bool = False, **kwargs ): super().__init__( - "Alice Agent", + "Alice.Agent", http_port, admin_port, prefix="Alice", diff --git a/demo/runners/faber.py b/demo/runners/faber.py index 01c6c7d098..7858391fc7 100644 --- a/demo/runners/faber.py +++ b/demo/runners/faber.py @@ -36,7 +36,7 @@ def __init__( self, http_port: int, admin_port: int, no_auto: bool = False, **kwargs ): super().__init__( - "Faber Agent", + "Faber.Agent", http_port, admin_port, prefix="Faber", @@ -222,6 +222,7 @@ async def main( log_msg("Waiting for connection...") await agent.detect_connection() + exchange_tracing = False options = ( " (1) Issue Credential\n" " (2) Send Proof Request\n" @@ -233,11 +234,15 @@ async def main( " (5) Publish Revocations\n" " (6) Add Revocation Registry\n" ) - options += " (X) Exit?\n[1/2/3/{}X] ".format("4/5/6/" if revocation else "") + options += " (T) Toggle tracing on credential/proof exchange\n" + options += " (X) Exit?\n[1/2/3/{}T/X] ".format("4/5/6/" if revocation else "") async for option in prompt_loop(options): if option is None or option in "xX": break + elif option in "tT": + exchange_tracing = not exchange_tracing + log_msg(">>> Credential/Proof Exchange Tracing is {}".format("ON" if exchange_tracing else "OFF")) elif option == "1": log_status("#13 Issue credential offer to X") @@ -264,6 +269,7 @@ async def main( "auto_remove": False, "credential_preview": cred_preview, "revoc_reg_id": revocation_registry_id, + "trace": exchange_tracing, } await agent.admin_POST("/issue-credential/send-offer", offer_request) @@ -315,6 +321,7 @@ async def main( proof_request_web_request = { "connection_id": agent.connection_id, "proof_request": indy_proof_request, + "trace": exchange_tracing, } await agent.admin_POST( "/present-proof/send-request", proof_request_web_request diff --git a/demo/runners/support/agent.py b/demo/runners/support/agent.py index 09a4fab00b..3fd98c655c 100644 --- a/demo/runners/support/agent.py +++ b/demo/runners/support/agent.py @@ -34,7 +34,7 @@ TRACE_TARGET = os.getenv("TRACE_TARGET") TRACE_TAG = os.getenv("TRACE_TAG") -TRACE_ENABLED = True if TRACE_TARGET else False +TRACE_ENABLED = os.getenv("TRACE_ENABLED") DEFAULT_POSTGRES = bool(os.getenv("POSTGRES")) DEFAULT_INTERNAL_HOST = "127.0.0.1" @@ -266,6 +266,7 @@ def get_agent_args(self): ("--wallet-type", self.wallet_type), ("--wallet-name", self.wallet_name), ("--wallet-key", self.wallet_key), + "--preserve-exchange-records", ] if self.genesis_data: result.append(("--genesis-transactions", self.genesis_data)) @@ -296,6 +297,15 @@ def get_agent_args(self): ("--trace-label", self.label+".trace"), ] ) + else: + # set the tracing parameters but don't enable tracing + result.extend( + [ + ("--trace-target", self.trace_target if self.trace_target else "log"), + ("--trace-tag", self.trace_tag if self.trace_tag else "acapy.events"), + ("--trace-label", self.label+".trace"), + ] + ) if self.extra_args: result.extend(self.extra_args) diff --git a/scripts/run_tests_indy b/scripts/run_tests_indy index b6b783deb9..208166ebc2 100755 --- a/scripts/run_tests_indy +++ b/scripts/run_tests_indy @@ -12,6 +12,9 @@ if [ "$OSTYPE" == "msys" ]; then else DOCKER="docker" fi +if [ -z "$DOCKER_NET" ]; then + DOCKER_NET="bridge" +fi if [ -z "$POSTGRES_URL" ]; then if [ ! -z $(docker ps --filter name=indy-demo-postgres --quiet) ]; then @@ -24,6 +27,7 @@ if [ ! -z "$POSTGRES_URL" ]; then fi $DOCKER run --rm -ti --name aries-cloudagent-runner \ + --network=${DOCKER_NET} \ -v "/$(pwd)/../test-reports:/home/indy/src/app/test-reports" \ $DOCKER_ARGS \ aries-cloudagent-test "$@"