Skip to content

Commit

Permalink
Merge pull request #237 from andrewwhitehead/trace-http
Browse files Browse the repository at this point in the history
Add statistics on HTTP requests to timing output
  • Loading branch information
andrewwhitehead committed Oct 28, 2019
2 parents e675216 + 6fb6897 commit f5447cd
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 165 deletions.
9 changes: 7 additions & 2 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..stats import Collector
from ..task_processor import TaskProcessor
from ..transport.outbound.queue.base import BaseOutboundMessageQueue
from ..transport.stats import StatsTracer

from .base_server import BaseAdminServer
from .error import AdminSetupError
Expand Down Expand Up @@ -386,7 +387,11 @@ async def send_webhook(self, topic: str, payload: dict):

async def _process_webhooks(self):
"""Continuously poll webhook queue and dispatch to targets."""
self.webhook_session = ClientSession()
session_args = {}
collector: Collector = await self.context.inject(Collector, required=False)
if collector:
session_args["trace_configs"] = [StatsTracer(collector, "webhook-http:")]
self.webhook_session = ClientSession(**session_args)
self.webhook_processor = TaskProcessor(max_pending=5)
async for topic, payload in self.webhook_queue:
for queue in self.websocket_queues.values():
Expand Down Expand Up @@ -434,6 +439,6 @@ async def complete_webhooks(self):
"""Wait for all pending webhooks to be dispatched, used in testing."""
if self.webhook_queue:
await self.webhook_queue.join()
self.webhook_queue.stop()
self.webhook_queue.reset()
if self.webhook_processor:
await self.webhook_processor.wait_done()
140 changes: 112 additions & 28 deletions aries_cloudagent/admin/tests/test_admin_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,99 @@

from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop, unused_port
from aiohttp import web
from asynctest import TestCase as AsyncTestCase
from asynctest.mock import patch

from ...config.injection_context import InjectionContext
from ...config.provider import ClassProvider
from ...messaging.outbound_message import OutboundMessage
from ...transport.outbound.queue.base import BaseOutboundMessageQueue
from ...transport.outbound.queue.basic import BasicOutboundMessageQueue

from ..server import AdminServer


class TestAdminServerApp(AioHTTPTestCase):
async def outbound_message_router(self, *args):
pass
class TestAdminServerBasic(AsyncTestCase):
async def setUp(self):
self.message_results = []

def get_admin_server(self) -> AdminServer:
def get_admin_server(self, settings: dict = None) -> AdminServer:
context = InjectionContext()
context.injector.bind_provider(
BaseOutboundMessageQueue, ClassProvider(BasicOutboundMessageQueue)
)
context.settings["admin.admin_insecure_mode"] = True
server = AdminServer(
if settings:
context.update_settings(settings)
return AdminServer(
"0.0.0.0", unused_port(), context, self.outbound_message_router
)
return server

@unittest_run_loop
async def test_start_bad_settings(self):
server = self.get_admin_server()
server.context.settings["admin.admin_insecure_mode"] = None

try:
await server.start()
except AssertionError:
return True

raise Exception
async def outbound_message_router(self, *args):
self.message_results.append(args)

@unittest_run_loop
async def test_start_stop(self):
server = self.get_admin_server()
with self.assertRaises(AssertionError):
await self.get_admin_server().start()

settings = {"admin.admin_insecure_mode": False}
with self.assertRaises(AssertionError):
await self.get_admin_server(settings).start()

settings = {
"admin.admin_insecure_mode": True,
"admin.admin_api_key": "test-api-key",
}
with self.assertRaises(AssertionError):
await self.get_admin_server(settings).start()

settings = {
"admin.admin_insecure_mode": False,
"admin.admin_api_key": "test-api-key",
}
server = self.get_admin_server(settings)
await server.start()
await server.stop()

async def test_responder_send(self):
message = OutboundMessage("{}")
admin_server = self.get_admin_server()
await admin_server.responder.send_outbound(message)
assert self.message_results == [(message,)]

@unittest_run_loop
async def test_responder_webhook(self):
with patch.object(AdminServer, "send_webhook", autospec=True) as sender:
admin_server = self.get_admin_server()
test_topic = "test_topic"
test_payload = {"test": "TEST"}
await admin_server.responder.send_webhook(test_topic, test_payload)
sender.assert_awaited_once_with(admin_server, test_topic, test_payload)


class TestAdminServerClient(AioHTTPTestCase):
async def setUpAsync(self):
self.message_results = []

async def get_application(self):
"""
Override the get_app method to return your application.
"""
return await self.get_admin_server().make_application()

async def outbound_message_router(self, *args):
self.message_results.append(args)

def get_admin_server(self) -> AdminServer:
context = InjectionContext()
context.injector.bind_provider(
BaseOutboundMessageQueue, ClassProvider(BasicOutboundMessageQueue)
)
context.settings["admin.admin_insecure_mode"] = True
server = AdminServer(
"0.0.0.0", unused_port(), context, self.outbound_message_router
)
return server

# the unittest_run_loop decorator can be used in tandem with
# the AioHTTPTestCase to simplify running
# tests that are asynchronous
Expand All @@ -62,7 +108,6 @@ async def test_swagger(self):
resp = await self.client.request("GET", "/api/doc")
assert resp.status == 200
text = await resp.text()
print(text)
assert "Swagger UI" in text

@unittest_run_loop
Expand All @@ -73,19 +118,50 @@ async def test_status(self):
resp = await self.client.request("POST", "/status/reset")
assert resp.status == 200

@unittest_run_loop
async def test_status(self):
resp = await self.client.request("GET", "/status")
result = await resp.json()
assert isinstance(result, dict)

@unittest_run_loop
async def test_websocket(self):
async with self.client.ws_connect("/ws") as ws:
result = await ws.receive_json()
assert result["topic"] == "settings"


class TestAdminServerSecure(AioHTTPTestCase):
TEST_API_KEY = "test-api-key"

async def get_application(self):
"""
Override the get_app method to return your application.
"""
return await self.get_admin_server().make_application()

async def outbound_message_router(self, *args):
self.message_results.append(args)

def get_admin_server(self) -> AdminServer:
context = InjectionContext()
context.injector.bind_provider(
BaseOutboundMessageQueue, ClassProvider(BasicOutboundMessageQueue)
)
context.settings["admin.admin_api_key"] = self.TEST_API_KEY
self.server = AdminServer(
"0.0.0.0", unused_port(), context, self.outbound_message_router
)
return self.server

@unittest_run_loop
async def test_status_insecure(self):
resp = await self.client.request("GET", "/status")
assert resp.status == 401

@unittest_run_loop
async def test_status_secure(self):
resp = await self.client.request(
"GET", "/status", headers={"x-api-key": self.TEST_API_KEY}
)
result = await resp.json()
assert isinstance(result, dict)


class TestAdminServerWebhook(AioHTTPTestCase):
async def setUpAsync(self):
self.hook_results = []
Expand All @@ -104,8 +180,9 @@ def get_admin_server(self) -> AdminServer:
context.injector.bind_provider(
BaseOutboundMessageQueue, ClassProvider(BasicOutboundMessageQueue)
)
context.settings["admin.admin_insecure_mode"] = True
server = AdminServer(
"localhost", unused_port(), context, self.outbound_message_router
"0.0.0.0", unused_port(), context, self.outbound_message_router
)
return server

Expand All @@ -121,9 +198,16 @@ async def get_application(self):
async def test_webhook(self):
server_addr = f"http://localhost:{self.server.port}"
admin_server = self.get_admin_server()
await admin_server.start()

admin_server.add_webhook_target(server_addr)
test_topic = "test_topic"
test_payload = {"test": "TEST"}
await admin_server.send_webhook(test_topic, test_payload)
await asyncio.wait_for(admin_server.complete_webhooks(), 5.0)
assert self.hook_results == [(test_topic, test_payload)]

admin_server.remove_webhook_target(server_addr)
assert admin_server.webhook_targets == {}

await admin_server.stop()
8 changes: 6 additions & 2 deletions aries_cloudagent/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,14 @@ async def setup(self):
self.logger.exception("Unable to register inbound transport")
raise

# Fetch stats collector, if any
collector = await context.inject(Collector, required=False)

# Register all outbound transports
outbound_queue = await context.inject(BaseOutboundMessageQueue)
self.outbound_transport_manager = OutboundTransportManager(outbound_queue)
self.outbound_transport_manager = OutboundTransportManager(
outbound_queue, collector
)
outbound_transports = context.settings.get("transport.outbound_configs") or []
for outbound_transport in outbound_transports:
try:
Expand Down Expand Up @@ -124,7 +129,6 @@ async def setup(self):
self.context = context
self.dispatcher = Dispatcher(self.context)

collector = await context.inject(Collector, required=False)
if collector:
# add stats to our own methods
collector.wrap(
Expand Down
26 changes: 18 additions & 8 deletions aries_cloudagent/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,33 @@ def __init__(self, collector: "Collector", groups: Sequence[str]):
"""Initialize the Timer instance."""
self.collector = collector
self.groups = groups
self.start = None
self.start_time = None

@classmethod
def now(cls):
"""Fetch a standard timer value."""
return time.perf_counter()

def start(self) -> "Timer":
"""Start the timer."""
self.start_time = self.now()
return self

def stop(self):
"""Stop the timer."""
if self.start_time:
dur = self.now() - self.start_time
for grp in self.groups:
self.collector.log(grp, dur)
self.start_time = None

def __enter__(self):
"""Enter the context manager."""
self.start = self.now()
return self
return self.start()

def __exit__(self, type, value, tb):
"""Exit the context manager."""
dur = self.now() - self.start
for grp in self.groups:
self.collector.log(grp, dur)
self.stop()


class Collector:
Expand Down Expand Up @@ -126,13 +136,13 @@ def wrap(
):
"""Wrap a method on a class or class instance."""
if not prop_name:
return
raise ValueError("missing prop_name")
if isinstance(prop_name, str):
method = getattr(obj, prop_name, None)
if method:
setattr(obj, prop_name, self(method, groups))
elif not ignore_missing:
raise KeyError(prop_name)
raise AttributeError(prop_name)
else:
for prop in prop_name:
self.wrap(obj, prop, groups)
Expand Down
7 changes: 7 additions & 0 deletions aries_cloudagent/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,15 @@ def test_wrap(self):
"TestStats.test_method_decorator.<locals>.TestClass.test_wrap",
}

with self.assertRaises(AttributeError):
stats.wrap(instance, "test_missing")

with self.assertRaises(ValueError):
stats.wrap(instance, "")

async def test_disable(self):
stats = Collector()
assert stats.enabled
stats.enabled = False

stats.log("test", 1.0)
Expand Down
12 changes: 12 additions & 0 deletions aries_cloudagent/transport/outbound/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@

from ...error import BaseError
from ...messaging.outbound_message import OutboundMessage
from ...stats import Collector


class BaseOutboundTransport(ABC):
"""Base outbound transport class."""

def __init__(self) -> None:
"""Initialize a `BaseOutboundTransport` instance."""
self._collector = None

@property
def collector(self) -> Collector:
"""Accessor for the stats collector instance."""
return self._collector

@collector.setter
def collector(self, coll: Collector):
"""Assign a new stats collector instance."""
self._collector = coll

async def __aenter__(self):
"""Async context manager enter."""
Expand Down
8 changes: 7 additions & 1 deletion aries_cloudagent/transport/outbound/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from aiohttp import ClientSession

from ...messaging.outbound_message import OutboundMessage
from ..stats import StatsTracer

from .base import BaseOutboundTransport

Expand All @@ -21,7 +22,12 @@ def __init__(self) -> None:

async def start(self):
"""Start the transport."""
self.client_session = ClientSession()
session_args = {}
if self.collector:
session_args["trace_configs"] = [
StatsTracer(self.collector, "outbound-http:")
]
self.client_session = ClientSession(**session_args)
return self

async def stop(self):
Expand Down

0 comments on commit f5447cd

Please sign in to comment.