Skip to content

Commit

Permalink
Merge pull request #9 from Happyr/sfevent
Browse files Browse the repository at this point in the history
Make sure you can extend the consumer by using symfony event dispatcher
  • Loading branch information
Nyholm committed Mar 31, 2016
2 parents 4326eef + a568c47 commit 2d4eb52
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 22 deletions.
60 changes: 60 additions & 0 deletions Consumer/ExtendableEnvelopeConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

namespace Happyr\Mq2phpBundle\Consumer;

use Happyr\Mq2phpBundle\Event\PreHandleMessage;
use SimpleBus\Serialization\Envelope\Serializer\MessageInEnvelopSerializer;
use SimpleBus\Message\Bus\MessageBus;
use SimpleBus\Asynchronous\Consumer\SerializedEnvelopeConsumer as SimpleBusSerializedEnvelopeConsumer;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

/**
* Use this consumer to easily implement an asynchronous message consumer.
*/
class ExtendableEnvelopeConsumer implements SimpleBusSerializedEnvelopeConsumer
{
/**
* @var MessageInEnvelopSerializer
*/
private $messageInEnvelopeSerializer;

/**
* @var MessageBus
*/
private $messageBus;

/**
* @var EventDispatcherInterface
*/
private $dispathcer;

/**
* @param MessageInEnvelopSerializer $messageInEnvelopeSerializer
* @param MessageBus $messageBus
* @param EventDispatcherInterface $dispathcer
*/
public function __construct(
MessageInEnvelopSerializer $messageInEnvelopeSerializer,
MessageBus $messageBus,
EventDispatcherInterface $dispathcer
) {
$this->messageInEnvelopeSerializer = $messageInEnvelopeSerializer;
$this->messageBus = $messageBus;
$this->dispathcer = $dispathcer;
}

/**
* @param string $serializedEnvelope
*/
public function consume($serializedEnvelope)
{
// Unserialize
$envelope = $this->messageInEnvelopeSerializer->unwrapAndDeserialize($serializedEnvelope);

// Tell the world
$this->dispathcer->dispatch(PreHandleMessage::NAME, new PreHandleMessage($envelope));

// Handle the message
$this->messageBus->handle($envelope->message());
}
}
42 changes: 42 additions & 0 deletions DependencyInjection/Compiler/RegisterConsumers.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

namespace Happyr\Mq2phpBundle\DependencyInjection\Compiler;

use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;

/**
* Register the message bus argument to our consumer.
*
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class RegisterConsumers implements CompilerPassInterface
{
/**
* @param ContainerBuilder $container
*/
public function process(ContainerBuilder $container)
{
$this->replaceArgumentWithReference($container, 'happyr.mq2php.extendable_command_envelope_consumer', 'simple_bus.asynchronous.command_bus');
$this->replaceArgumentWithReference($container, 'happyr.mq2php.extendable_event_envelope_consumer', 'simple_bus.asynchronous.event_bus');
}

/**
* @param ContainerBuilder $container
* @param string $serviceId
* @param string $referenceId
*/
private function replaceArgumentWithReference(ContainerBuilder $container, $serviceId, $referenceId)
{
if (!$container->hasDefinition($serviceId)) {
return;
}

if (!$container->hasDefinition($referenceId)) {
$container->removeDefinition($serviceId);
}

$container->getDefinition($serviceId)->replaceArgument(1, new Reference($referenceId));
}
}
11 changes: 3 additions & 8 deletions DependencyInjection/HappyrMq2phpExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public function load(array $configs, ContainerBuilder $container)

// Add the command and event queue names to the consumer wrapper
$def = $container->getDefinition('happyr.mq2php.consumer_wrapper');
$def->replaceArgument(2, $config['command_queue'])
->replaceArgument(3, $config['event_queue']);
$def->replaceArgument(0, $config['command_queue'])
->replaceArgument(1, $config['event_queue']);

$serializerId = 'happyr.mq2php.message_serializer';
if (!$config['enabled']) {
Expand All @@ -54,12 +54,7 @@ private function requireBundle($bundleName, ContainerBuilder $container)
{
$enabledBundles = $container->getParameter('kernel.bundles');
if (!isset($enabledBundles[$bundleName])) {
throw new \LogicException(
sprintf(
'You need to enable "%s" as well',
$bundleName
)
);
throw new \LogicException(sprintf('You need to enable "%s" as well', $bundleName));
}
}
}
32 changes: 32 additions & 0 deletions Event/PreHandleMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Happyr\Mq2phpBundle\Event;

use SimpleBus\Serialization\Envelope\Envelope;
use Symfony\Component\EventDispatcher\Event;

class PreHandleMessage extends Event
{
const NAME = 'happyr.mq2php.pre_handle_message';

/**
* @var Envelope
*/
private $envelope;

/**
* @param Envelope $envelope
*/
public function __construct(Envelope $envelope)
{
$this->envelope = $envelope;
}

/**
* @return Envelope
*/
public function getEnvelope()
{
return $this->envelope;
}
}
7 changes: 7 additions & 0 deletions HappyrMq2phpBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@

namespace Happyr\Mq2phpBundle;

use Happyr\Mq2phpBundle\DependencyInjection\Compiler\RegisterConsumers;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Bundle\Bundle;

class HappyrMq2phpBundle extends Bundle
{
public function build(ContainerBuilder $container)
{
$container->addCompilerPass(new RegisterConsumers());
}

}
22 changes: 19 additions & 3 deletions Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,23 @@ services:
calls:
- [setLogger, ["@?logger"]]
arguments:
- @simple_bus.asynchronous.standard_serialized_command_envelope_consumer
- @simple_bus.asynchronous.standard_serialized_event_envelope_consumer
- ~
- ~
- ~
- "@?happyr.mq2php.extendable_command_envelope_consumer"
- "@?happyr.mq2php.extendable_event_envelope_consumer"

happyr.mq2php.extendable_command_envelope_consumer:
class: Happyr\Mq2phpBundle\Consumer\ExtendableEnvelopeConsumer
public: false
arguments:
- '@simple_bus.asynchronous.message_serializer'
- ~
- '@event_dispatcher'

happyr.mq2php.extendable_event_envelope_consumer:
class: Happyr\Mq2phpBundle\Consumer\ExtendableEnvelopeConsumer
public: false
arguments:
- '@simple_bus.asynchronous.message_serializer'
- ~
- '@event_dispatcher'
33 changes: 23 additions & 10 deletions Service/ConsumerWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ class ConsumerWrapper implements LoggerAwareInterface
private $logger;

/**
* @param SerializedEnvelopeConsumer $commandConsumer
* @param SerializedEnvelopeConsumer $eventConsumer
* @param string $commandQueueName
* @param string $eventQueueName
* @param SerializedEnvelopeConsumer $commandConsumer
* @param SerializedEnvelopeConsumer $eventConsumer
*/
public function __construct(
SerializedEnvelopeConsumer $commandConsumer,
SerializedEnvelopeConsumer $eventConsumer,
$commandQueueName,
$eventQueueName
$eventQueueName,
SerializedEnvelopeConsumer $commandConsumer = null,
SerializedEnvelopeConsumer $eventConsumer = null
) {
$this->commandConsumer = $commandConsumer;
$this->eventConsumer = $eventConsumer;
Expand Down Expand Up @@ -84,7 +84,7 @@ public function setLogger(LoggerInterface $logger)
/**
* @param string $level
* @param string $message
* @param array $context
* @param array $context
*/
private function log($level, $message, array $context = [])
{
Expand All @@ -94,15 +94,28 @@ private function log($level, $message, array $context = [])
}

/**
* Consume a message and make sure we log errors
* Consume a message and make sure we log errors.
*
* @param string $queueName
* @param mixed $message
* @param string $queueName
* @param mixed $message
* @param SerializedEnvelopeConsumer $consumer
*
* @throws \Exception
*/
private function doConsume($queueName, $message, SerializedEnvelopeConsumer $consumer)
private function doConsume($queueName, $message, SerializedEnvelopeConsumer $consumer = null)
{
if ($consumer === null) {
$this->log(
'error',
sprintf('No consumer was found for queue named "%s"', $queueName),
[
'message' => $message,
]
);

return;
}

try {
$consumer->consume($message);
} catch (\Exception $e) {
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"require": {
"php": ">=5.4",
"psr/log": "~1.0",
"simple-bus/asynchronous-bundle":"^2.0"
"simple-bus/asynchronous-bundle":"^2.0",
"symfony/event-dispatcher": "^2.3|^3.0"
},
"autoload": {
"psr-4": { "Happyr\\Mq2phpBundle\\": "" }
Expand Down

0 comments on commit 2d4eb52

Please sign in to comment.