Skip to content

Commit

Permalink
Add a RabbitMq producer and consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
sroze committed Nov 16, 2016
1 parent 6992861 commit c07b574
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 5 deletions.
55 changes: 55 additions & 0 deletions spec/Tolerance/Bridge/RabbitMqBundle/Tracer/TracedConsumerSpec.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

namespace spec\Tolerance\Bridge\RabbitMqBundle\Tracer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
use Tolerance\Tracer\Span\Identifier;
use Tolerance\Tracer\Span\Span;
use Tolerance\Tracer\SpanFactory\Amqp\AmqpSpanFactory;
use Tolerance\Tracer\SpanStack\SpanStack;
use Tolerance\Tracer\Tracer;

class TracedConsumerSpec extends ObjectBehavior
{
function let(ConsumerInterface $decoratedConsumer, Tracer $tracer, SpanStack $spanStack, AmqpSpanFactory $amqpSpanFactory)
{
$amqpSpanFactory->fromConsumedMessage(Argument::type(AMQPMessage::class))->willReturn(new Span(
Identifier::fromString('1234'),
'name',
Identifier::fromString('1234')
));

$this->beConstructedWith($decoratedConsumer, $tracer, $spanStack, $amqpSpanFactory);
}

function it_is_a_consumer()
{
$this->shouldImplement(ConsumerInterface::class);
}

function it_returns_the_decorated_consumer_result(ConsumerInterface $decoratedConsumer)
{
$message = new AMQPMessage('');

$decoratedConsumer->execute($message)->shouldBeCalled()->willReturn(4);

$this->execute($message)->shouldReturn(4);
}

function it_traces_the_span(ConsumerInterface $decoratedConsumer, Tracer $tracer)
{
$tracer->trace(Argument::containing(Argument::type(Span::class)))->shouldBeCalled();

$this->execute(new AMQPMessage(''));
}

function it_adds_the_span_in_the_stack(ConsumerInterface $decoratedConsumer, SpanStack $spanStack)
{
$spanStack->push(Argument::type(Span::class))->shouldBeCalled();

$this->execute(new AMQPMessage(''));
}
}
49 changes: 49 additions & 0 deletions spec/Tolerance/Bridge/RabbitMqBundle/Tracer/TracedProducerSpec.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace spec\Tolerance\Bridge\RabbitMqBundle\Tracer;

use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
use Tolerance\Tracer\Span\Identifier;
use Tolerance\Tracer\Span\Span;
use Tolerance\Tracer\SpanFactory\Amqp\AmqpSpanFactory;
use Tolerance\Tracer\Tracer;

class TracedProducerSpec extends ObjectBehavior
{
function let(ProducerInterface $decoratedProducer, AmqpSpanFactory $amqpSpanFactory, Tracer $tracer)
{
$this->beConstructedWith($decoratedProducer, $amqpSpanFactory, $tracer);
}

function it_is_a_producer()
{
$this->shouldImplement(ProducerInterface::class);
}

function it_should_add_the_trace_headers(ProducerInterface $decoratedProducer, AmqpSpanFactory $amqpSpanFactory, Tracer $tracer)
{
$span = new Span(
Identifier::fromString('1234'),
'name',
Identifier::fromString('trace')
);

$amqpSpanFactory->fromProducedMessage(Argument::type(AMQPMessage::class))->willReturn($span);

$decoratedProducer->publish('', '', Argument::that(function(array $properties) {
if (!array_key_exists('application_headers', $properties)) {
return false;
}

return isset($properties['application_headers']['X-B3-SpanId']) &&
isset($properties['application_headers']['X-B3-TraceId']);
}))->shouldBeCalled();

$tracer->trace([$span])->shouldBeCalled();

$this->publish('', '');
}
}
51 changes: 51 additions & 0 deletions spec/Tolerance/Tracer/SpanFactory/Amqp/AmqpSpanFactorySpec.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

namespace spec\Tolerance\Tracer\SpanFactory\Amqp;

use PhpAmqpLib\Message\AMQPMessage;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
use Tolerance\Tracer\Clock\Clock;
use Tolerance\Tracer\EndpointResolver\EndpointResolver;
use Tolerance\Tracer\IdentifierGenerator\IdentifierGenerator;
use Tolerance\Tracer\Span\Identifier;
use Tolerance\Tracer\Span\Span;
use Tolerance\Tracer\SpanStack\SpanStack;

class AmqpSpanFactorySpec extends ObjectBehavior
{
function let(IdentifierGenerator $identifierGenerator, Clock $clock, SpanStack $spanStack, EndpointResolver $endpointResolver)
{
$identifierGenerator->generate()->willReturn(Identifier::fromString('1234'));

$this->beConstructedWith($identifierGenerator, $clock, $spanStack, $endpointResolver);
}

function it_generates_a_message_identifier_from_incoming_message_without_headers()
{
$span = $this->fromConsumedMessage(new AMQPMessage(''));
$span->shouldHaveAnIdentifier();
}

function it_uses_the_information_from_headers_if_found()
{
$span = $this->fromConsumedMessage(new AMQPMessage('', [
'application_headers' => [
'X-B3-TraceId' => '1234',
'X-B3-SpanId' => '9876',
]
]));

$span->getIdentifier()->shouldBeLike(Identifier::fromString('9876'));
$span->getTraceIdentifier()->shouldBeLike(Identifier::fromString('1234'));
}

public function getMatchers()
{
return [
'haveAnIdentifier' => function(Span $span) {
return null !== $span->getIdentifier();
},
];
}
}
68 changes: 68 additions & 0 deletions src/Tolerance/Bridge/RabbitMqBundle/Tracer/TracedConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

/*
* This file is part of the Tolerance package.
*
* (c) Samuel ROZE <samuel.roze@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Tolerance\Bridge\RabbitMqBundle\Tracer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Tolerance\Tracer\SpanFactory\Amqp\AmqpSpanFactory;
use Tolerance\Tracer\SpanStack\SpanStack;
use Tolerance\Tracer\Tracer;

final class TracedConsumer implements ConsumerInterface
{
/**
* @var ConsumerInterface
*/
private $decoratedConsumer;

/**
* @var Tracer
*/
private $tracer;

/**
* @var SpanStack
*/
private $spanStack;

/**
* @var AmqpSpanFactory
*/
private $amqpSpanFactory;

/**
* @param ConsumerInterface $decoratedConsumer
* @param Tracer $tracer
* @param SpanStack $spanStack
* @param AmqpSpanFactory $amqpSpanFactory
*/
public function __construct(ConsumerInterface $decoratedConsumer, Tracer $tracer, SpanStack $spanStack, AmqpSpanFactory $amqpSpanFactory)
{
$this->decoratedConsumer = $decoratedConsumer;
$this->tracer = $tracer;
$this->spanStack = $spanStack;
$this->amqpSpanFactory = $amqpSpanFactory;
}

/**
* {@inheritdoc}
*/
public function execute(AMQPMessage $msg)
{
$span = $this->amqpSpanFactory->fromConsumedMessage($msg);

$this->tracer->trace([$span]);
$this->spanStack->push($span);

return $this->decoratedConsumer->execute($msg);
}
}
71 changes: 71 additions & 0 deletions src/Tolerance/Bridge/RabbitMqBundle/Tracer/TracedProducer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

/*
* This file is part of the Tolerance package.
*
* (c) Samuel ROZE <samuel.roze@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Tolerance\Bridge\RabbitMqBundle\Tracer;

use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Tolerance\Tracer\SpanFactory\Amqp\AmqpSpanFactory;
use Tolerance\Tracer\Tracer;

final class TracedProducer implements ProducerInterface
{
/**
* @var ProducerInterface
*/
private $decoratedProducer;

/**
* @var AmqpSpanFactory
*/
private $amqpSpanFactory;

/**
* @var Tracer
*/
private $tracer;

/**
* @param ProducerInterface $decoratedProducer
* @param AmqpSpanFactory $amqpSpanFactory
* @param Tracer $tracer
*/
public function __construct(ProducerInterface $decoratedProducer, AmqpSpanFactory $amqpSpanFactory, Tracer $tracer)
{
$this->decoratedProducer = $decoratedProducer;
$this->amqpSpanFactory = $amqpSpanFactory;
$this->tracer = $tracer;
}

/**
* {@inheritdoc}
*/
public function publish($msgBody, $routingKey = '', $additionalProperties = array())
{
$message = new AMQPMessage((string) $msgBody, array_merge($additionalProperties, ['routing_key' => $routingKey]));
$span = $this->amqpSpanFactory->fromProducedMessage($message);

$additionalProperties = array_merge([
'application_headers' => [
'X-B3-SpanId' => (string) $span->getIdentifier(),
'X-B3-TraceId' => (string) $span->getTraceIdentifier(),
'X-B3-ParentSpanId' => (string) $span->getParentIdentifier(),
'X-B3-Flags' => $span->getDebug() ? '1' : '0',
],
], $additionalProperties);

$result = $this->decoratedProducer->publish($msgBody, $routingKey, $additionalProperties);

$this->tracer->trace([$span]);

return $result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public function __construct(ClientInterface $httpClient, $hostname, $port, $user
/**
* @param string $vhost
* @param string $name
* @param int $interval
* @param int $interval
*
* @return array
*/
Expand Down
2 changes: 1 addition & 1 deletion src/Tolerance/Operation/Runner/RetryOperationRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function run(Operation $operation)

/**
* @param Operation $operation
*
*
* @return mixed
*/
private function runOperation(Operation $operation)
Expand Down
Loading

0 comments on commit c07b574

Please sign in to comment.