Skip to content

Commit

Permalink
Add support for direct publisher (#4)
Browse files Browse the repository at this point in the history
* Add support for direct publisher

* Updated reade

* Applied changes from StyleCI
  • Loading branch information
Nyholm committed Jun 21, 2017
1 parent 9c0fcf2 commit cbcb648
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 3 deletions.
25 changes: 24 additions & 1 deletion DependencyInjection/CompilerPass/CompilerPasses.php
Expand Up @@ -2,6 +2,7 @@

namespace Happyr\SimpleBusBundle\DependencyInjection\CompilerPass;

use Happyr\SimpleBusBundle\Message\Publisher\DirectPublisher;
use Happyr\SimpleBusBundle\Message\Publisher\RabbitMQPublisher;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand Down Expand Up @@ -147,13 +148,35 @@ private function processMessageHandlers(ContainerBuilder $container)
*/
private function replaceSimpleBusPublisher(ContainerBuilder $container)
{
if ($container->has('simple_bus.rabbit_mq_bundle_bridge.event_publisher')) {
if (!$container->has('simple_bus.rabbit_mq_bundle_bridge.event_publisher')) {
return;
}

if (!$container->getParameter('happyr.simplebus.direct_publisher')) {
$container->getDefinition('simple_bus.rabbit_mq_bundle_bridge.event_publisher')
->setClass(RabbitMQPublisher::class)
->setLazy(true);
$container->getDefinition('simple_bus.rabbit_mq_bundle_bridge.command_publisher')
->setClass(RabbitMQPublisher::class)
->setLazy(true);
} else {
$container->getDefinition('simple_bus.rabbit_mq_bundle_bridge.event_publisher')
->setClass(DirectPublisher::class)
->setLazy(true)
->setArguments([
new Reference('happyr.mq2php.consumer_wrapper'),
new Reference('happyr.mq2php.message_serializer'),
$container->getParameter('happyr.mq2php.event_queue_name'),
]);

$container->getDefinition('simple_bus.rabbit_mq_bundle_bridge.command_publisher')
->setClass(DirectPublisher::class)
->setLazy(true)
->setArguments([
new Reference('happyr.mq2php.consumer_wrapper'),
new Reference('happyr.mq2php.message_serializer'),
$container->getParameter('happyr.mq2php.command_queue_name'),
]);
}
}

Expand Down
2 changes: 1 addition & 1 deletion DependencyInjection/Configuration.php
Expand Up @@ -30,7 +30,7 @@ public function getConfigTreeBuilder()
->scalarNode('event_subscriber_namespace')->defaultNull()->end()
->scalarNode('event_subscriber_path')->defaultNull()->end()
->end()->end()

->booleanNode('use_direct_publisher')->defaultFalse()->end()
->end();

return $treeBuilder;
Expand Down
1 change: 1 addition & 0 deletions DependencyInjection/HappyrSimpleBusExtension.php
Expand Up @@ -37,6 +37,7 @@ public function load(array $configs, ContainerBuilder $container)
$this->requireBundle('OldSoundRabbitMqBundle', $container);
$this->requireBundle('JMSSerializerBundle', $container);

$container->setParameter('happyr.simplebus.direct_publisher', $config['use_direct_publisher']);
if ($config['auto_register_handlers']['enabled']) {
$handlerPath = $config['auto_register_handlers']['command_handler_path'];
if (empty($handlerPath)) {
Expand Down
79 changes: 79 additions & 0 deletions Message/Publisher/DirectPublisher.php
@@ -0,0 +1,79 @@
<?php

namespace Happyr\SimpleBusBundle\Message\Publisher;

use Happyr\Mq2phpBundle\Service\ConsumerWrapper;
use Happyr\SimpleBusBundle\Message\DelayedMessage;
use SimpleBus\Asynchronous\Publisher\Publisher;
use SimpleBus\Serialization\Envelope\Serializer\MessageInEnvelopSerializer;

class DirectPublisher implements Publisher
{
/**
* @var ConsumerWrapper
*/
private $consumer;

/**
* @var MessageInEnvelopSerializer
*/
private $serializer;

/**
* @var string
*/
private $queueName;

private $messages = [];

private $delayedMessages = [];

/**
* @param ConsumerWrapper $consumer
*/
public function __construct(ConsumerWrapper $consumer, MessageInEnvelopSerializer $serializer, string $queueName)
{
$this->consumer = $consumer;
$this->serializer = $serializer;
$this->queueName = $queueName;
}

public function publish($message)
{
if ($message instanceof DelayedMessage) {
$this->delayedMessages[$message->getDelayedTime()][] = $this->serializer->wrapAndSerialize($message);
} else {
$this->messages[] = $this->serializer->wrapAndSerialize($message);
}
}

public function consume()
{
foreach ($this->messages as $message) {
$this->doConsume($message);
}

ksort($this->delayedMessages);
foreach ($this->delayedMessages as $messages) {
foreach ($messages as $message) {
$this->doConsume($message);
}
}
}

public function __destruct()
{
$this->consume();
}

/**
* @param string $data
*/
private function doConsume(string $data)
{
$message = json_decode($data, true);
$body = $message['body'];

$this->consumer->consume($this->queueName, $body);
}
}
13 changes: 12 additions & 1 deletion Readme.md
Expand Up @@ -25,7 +25,6 @@ class AppKernel extends Kernel
{
$bundles = [
// ...
new Happyr\SimpleBusBundle\HappyrSimpleBusBundle(),
new Happyr\Mq2phpBundle\HappyrMq2phpBundle(),
new SimpleBus\SymfonyBridge\SimpleBusCommandBusBundle(),
new SimpleBus\SymfonyBridge\SimpleBusEventBusBundle(),
Expand All @@ -34,6 +33,7 @@ class AppKernel extends Kernel
new SimpleBus\JMSSerializerBundleBridge\SimpleBusJMSSerializerBundleBridgeBundle(),
new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
new JMS\SerializerBundle\JMSSerializerBundle(),
new Happyr\SimpleBusBundle\HappyrSimpleBusBundle(),
];
// ...
}
Expand Down Expand Up @@ -132,3 +132,14 @@ Be aware of the following base classes.
* `BaseEventSubscriber` implements `AutoRegisteredEventSubscriber`
* `HandlesMessagesAsync` (For async handlers/subscribers)
* `DelayedMessage` (For async messages with a delay)

### Direct publisher

If do not want to use a queue you may use the direct publisher.

```php
happyr_simplebus:
use_direct_publisher: true

```

0 comments on commit cbcb648

Please sign in to comment.