Skip to content

Commit

Permalink
feature #27321 [Messenger][Profiler] Trace middleware execution (ogiz…
Browse files Browse the repository at this point in the history
…anagi)

This PR was merged into the 4.2-dev branch.

Discussion
----------

[Messenger][Profiler] Trace middleware execution

| Q             | A
| ------------- | ---
| Branch?       | master <!-- see below -->
| Bug fix?      | no
| New feature?  | yes <!-- don't forget to update src/**/CHANGELOG.md files -->
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets | part of #27262   <!-- #-prefixed issue number(s), if any -->
| License       | MIT
| Doc PR        | N/A

This is a start for #27262 with:
- traceable Messenger middlewares
- ~~a dedicated category for http kernel controller args resolvers~~ => See #28387

<img width="1071" alt="screenshot 2018-05-20 a 12 23 55" src="https://user-images.githubusercontent.com/2211145/40278071-98c04924-5c2a-11e8-9770-d78ac62d2c16.PNG">

Messenger middleware are traced, with bus info (if not shared accros buses):

<img width="1069" alt="screenshot 2018-05-20 a 12 28 15" src="https://user-images.githubusercontent.com/2211145/40278073-9e6979f4-5c2a-11e8-9657-ee3aa057a5be.PNG">

Another possibility is to use the middleware id instead of the class (with or without extra bus info?):

<img width="1074" alt="screenshot 2018-05-20 a 12 32 24" src="https://user-images.githubusercontent.com/2211145/40278074-9e85f43a-5c2a-11e8-9f13-ad41de342079.PNG">

(_of course, collected times are faked here using `usleep` in the traceable middleware_)

Commits
-------

e974f67 [Messenger][Profiler] Trace middleware execution
  • Loading branch information
fabpot committed Sep 8, 2018
2 parents 9af885c + e974f67 commit cd39b51
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 2 deletions.
Expand Up @@ -9,6 +9,7 @@
'event_listener': '#00B8F5',
'template': '#66CC00',
'doctrine': '#FF6633',
'messenger.middleware': '#BDB81E',
'controller.argument_value_resolver': '#8c5de6',
} %}
{% endif %}
Expand Down
Expand Up @@ -22,6 +22,7 @@
use Symfony\Component\Messenger\Handler\ChainHandler;
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\Middleware\Enhancers\TraceableMiddleware;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
Expand All @@ -37,13 +38,15 @@ class MessengerPass implements CompilerPassInterface
private $busTag;
private $senderTag;
private $receiverTag;
private $debugStopwatchId;

public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver')
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver', string $debugStopwatchId = 'debug.stopwatch')
{
$this->handlerTag = $handlerTag;
$this->busTag = $busTag;
$this->senderTag = $senderTag;
$this->receiverTag = $receiverTag;
$this->debugStopwatchId = $debugStopwatchId;
}

/**
Expand Down Expand Up @@ -303,6 +306,7 @@ private function registerBusToCollector(ContainerBuilder $container, string $bus

private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middlewareCollection)
{
$debug = $container->getParameter('kernel.debug') && $container->has($this->debugStopwatchId);
$middlewareReferences = array();
foreach ($middlewareCollection as $middlewareItem) {
$id = $middlewareItem['id'];
Expand All @@ -315,7 +319,7 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $id));
}

if (($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
if ($isDefinitionAbstract = ($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
$childDefinition = new ChildDefinition($messengerMiddlewareId);
$count = \count($definition->getArguments());
foreach (array_values($arguments ?? array()) as $key => $argument) {
Expand All @@ -329,6 +333,21 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
}

if ($debug) {
$container->register($debugMiddlewareId = '.messenger.debug.traced.'.$messengerMiddlewareId, TraceableMiddleware::class)
// Decorates with a high priority so it's applied the earliest:
->setDecoratedService($messengerMiddlewareId, null, 100)
->setArguments(array(
new Reference($debugMiddlewareId.'.inner'),
new Reference($this->debugStopwatchId),
// In case the definition isn't abstract,
// we cannot be sure the service instance is used by one bus only.
// So we only inject the bus name when the original definition is abstract.
$isDefinitionAbstract ? $busId : null,
))
;
}

$middlewareReferences[] = new Reference($messengerMiddlewareId);
}

Expand Down
@@ -0,0 +1,69 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Middleware\Enhancers;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Stopwatch\Stopwatch;

/**
* Collects some data about a middleware.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class TraceableMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $inner;
private $stopwatch;
private $busName;
private $eventCategory;

public function __construct(MiddlewareInterface $inner, Stopwatch $stopwatch, string $busName = null, string $eventCategory = 'messenger.middleware')
{
$this->inner = $inner;
$this->stopwatch = $stopwatch;
$this->busName = $busName;
$this->eventCategory = $eventCategory;
}

/**
* @param Envelope $envelope
*/
public function handle($envelope, callable $next)
{
$class = \get_class($this->inner);
$eventName = 'c' === $class[0] && 0 === strpos($class, "class@anonymous\0") ? get_parent_class($class).'@anonymous' : $class;

if ($this->busName) {
$eventName .= " (bus: {$this->busName})";
}

$this->stopwatch->start($eventName, $this->eventCategory);

try {
$result = $this->inner->handle($envelope->getMessageFor($this->inner), function ($message) use ($next, $eventName) {
$this->stopwatch->stop($eventName);
$result = $next($message);
$this->stopwatch->start($eventName, $this->eventCategory);

return $result;
});
} finally {
if ($this->stopwatch->isStarted($eventName)) {
$this->stopwatch->stop($eventName);
}
}

return $result;
}
}
Expand Up @@ -41,6 +41,7 @@
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Stopwatch\Stopwatch;

class MessengerPassTest extends TestCase
{
Expand Down Expand Up @@ -561,6 +562,39 @@ public function testMiddlewareFactoryDefinitionMustBeAbstract()
(new MessengerPass())->process($container);
}

public function testDecoratesWithTraceableMiddlewareOnDebug()
{
$container = $this->getContainerBuilder();

$container->register($busId = 'message_bus', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus');
$container->register('abstract_middleware', UselessMiddleware::class)->setAbstract(true);
$container->register('concrete_middleware', UselessMiddleware::class);

$container->setParameter($middlewareParameter = $busId.'.middleware', array(
array('id' => 'abstract_middleware'),
array('id' => 'concrete_middleware'),
));

$container->setParameter('kernel.debug', true);
$container->register('debug.stopwatch', Stopwatch::class);

(new MessengerPass())->process($container);

$this->assertNotNull($concreteDef = $container->getDefinition('.messenger.debug.traced.concrete_middleware'));
$this->assertEquals(array(
new Reference('.messenger.debug.traced.concrete_middleware.inner'),
new Reference('debug.stopwatch'),
null,
), $concreteDef->getArguments());

$this->assertNotNull($abstractDef = $container->getDefinition(".messenger.debug.traced.$busId.middleware.abstract_middleware"));
$this->assertEquals(array(
new Reference(".messenger.debug.traced.$busId.middleware.abstract_middleware.inner"),
new Reference('debug.stopwatch'),
$busId,
), $abstractDef->getArguments());
}

public function testItRegistersTheDebugCommand()
{
$container = $this->getContainerBuilder($commandBusId = 'command_bus');
Expand Down
@@ -0,0 +1,104 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Middleware\Enhancers;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\Enhancers\TraceableMiddleware;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Stopwatch\Stopwatch;

/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class TraceableMiddlewareTest extends TestCase
{
public function testHandle()
{
$busId = 'command_bus';
$envelope = Envelope::wrap($message = new DummyMessage('Hello'));

$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$middleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function ($message, callable $next) {
return $next($message);
}))
;

$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next
->expects($this->once())
->method('__invoke')
->with($message)
->willReturn($expectedReturnedValue = 'Hello')
;

$stopwatch = $this->createMock(Stopwatch::class);
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
$stopwatch->expects($this->exactly(2))
->method('start')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
;
$stopwatch->expects($this->exactly(2))
->method('stop')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
;

$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);

$this->assertSame($expectedReturnedValue, $traced->handle($envelope, $next));
}

/**
* @expectedException \RuntimeException
* @expectedExceptionMessage Foo exception from next callable
*/
public function testHandleWithException()
{
$busId = 'command_bus';
$envelope = Envelope::wrap($message = new DummyMessage('Hello'));

$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$middleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function ($message, callable $next) {
return $next($message);
}))
;

$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next
->expects($this->once())
->method('__invoke')
->willThrowException(new \RuntimeException('Foo exception from next callable'))
;

$stopwatch = $this->createMock(Stopwatch::class);
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
// Start is only expected to be called once, as an exception is thrown by the next callable:
$stopwatch->expects($this->exactly(1))
->method('start')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
;
$stopwatch->expects($this->exactly(2))
->method('stop')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
;

$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);
$traced->handle($envelope, $next);
}
}
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/composer.json
Expand Up @@ -26,6 +26,7 @@
"symfony/process": "~3.4|~4.0",
"symfony/property-access": "~3.4|~4.0",
"symfony/serializer": "~3.4|~4.0",
"symfony/stopwatch": "~3.4|~4.0",
"symfony/validator": "~3.4|~4.0",
"symfony/var-dumper": "~3.4|~4.0"
},
Expand Down

0 comments on commit cd39b51

Please sign in to comment.