Skip to content

Commit

Permalink
Merge c3fe242 into 121ff41
Browse files Browse the repository at this point in the history
  • Loading branch information
fmasa committed Oct 22, 2019
2 parents 121ff41 + c3fe242 commit 4d47131
Show file tree
Hide file tree
Showing 14 changed files with 415 additions and 5 deletions.
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,10 @@
"Fmasa\\": "src/"
}
},
"autoload-dev": {
"psr-4": {
"Fixtures\\": "tests/fixtures"
}
},
"minimum-stability": "stable"
}
93 changes: 92 additions & 1 deletion src/Messenger/DI/MessengerExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Fmasa\Messenger\LazyHandlersLocator;
use Fmasa\Messenger\Tracy\LogToPanelMiddleware;
use Fmasa\Messenger\Tracy\MessengerPanel;
use Fmasa\Messenger\Transport\SendersLocator;
use Nette\DI\CompilerExtension;
use Nette\DI\Definitions\ServiceDefinition;
use Nette\DI\Definitions\Statement;
Expand All @@ -21,6 +22,12 @@
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory;
use Symfony\Component\Messenger\Transport\InMemoryTransportFactory;
use Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\TransportFactory;
use function array_filter;
use function array_keys;
use function array_map;
Expand All @@ -32,22 +39,40 @@

class MessengerExtension extends CompilerExtension
{
private const TAG_HANDLER = 'messenger.messageHandler';
private const TAG_HANDLER = 'messenger.messageHandler';
private const TAG_TRANSPORT_FACTORY = 'messenger.transportFactory';

private const HANDLERS_LOCATOR_SERVICE_NAME = '.handlersLocator';
private const PANEL_MIDDLEWARE_SERVICE_NAME = '.middleware.panel';
private const PANEL_SERVICE_NAME = 'panel';

private const DEFAULT_FACTORIES = [
'amqp' => AmqpTransportFactory::class,
'inMemory' => InMemoryTransportFactory::class,
'redis' => RedisTransportFactory::class,
];

public function getConfigSchema() : Schema
{
return Expect::structure([
'buses' => Expect::arrayOf(Expect::from(new BusConfig())),
'transports' => Expect::arrayOf(Expect::anyOf(
Expect::string(),
Expect::from(new TransportConfig())
)),
'routing' => Expect::arrayOf(
Expect::anyOf(Expect::string(), Expect::listOf(Expect::string()))
),
]);
}

public function loadConfiguration() : void
{
$builder = $this->getContainerBuilder();

$this->processTransports();
$this->processRouting();

foreach ($this->getConfig()->buses as $busName => $busConfig) {
assert($busConfig instanceof BusConfig);

Expand All @@ -66,6 +91,9 @@ public function loadConfiguration() : void
$handlersLocator = $builder->addDefinition($this->prefix($busName . self::HANDLERS_LOCATOR_SERVICE_NAME))
->setFactory(LazyHandlersLocator::class);

$middleware[] = $builder->addDefinition($this->prefix($busName . '.sendMiddleware'))
->setFactory(SendMessageMiddleware::class);

$middleware[] = $builder->addDefinition($this->prefix($busName . '.defaultMiddleware'))
->setFactory(HandleMessageMiddleware::class, [$handlersLocator, $busConfig->allowNoHandlers]);

Expand Down Expand Up @@ -126,6 +154,8 @@ public function beforeCompile() : void

$handlersLocator->setArguments([$handlers]);
}

$this->passRegisteredTransportFactoriesToMainFactory();
}

public function afterCompile(ClassType $class) : void
Expand All @@ -137,6 +167,55 @@ public function afterCompile(ClassType $class) : void
$this->enableTracyIntegration($class);
}

private function processTransports() : void
{
$builder = $this->getContainerBuilder();

$transportFactory = $builder->addDefinition($this->prefix('transportFactory'))
->setFactory(TransportFactory::class);

foreach (self::DEFAULT_FACTORIES as $name => $factoryClass) {
$builder->addDefinition($this->prefix('transportFactory.' . $name))
->setFactory($factoryClass)
->setTags([self::TAG_TRANSPORT_FACTORY => true]);
}

$defaultSerializer = $builder->addDefinition($this->prefix('defaultSerializer'))
->setFactory(PhpSerializer::class);

foreach ($this->getConfig()->transports as $transportName => $transportConfig) {
assert(is_string($transportConfig) || $transportConfig instanceof TransportConfig);

if (is_string($transportConfig)) {
$dsn = $transportConfig;
$options = [];
} else {
$dsn = $transportConfig->dsn;
$options = $transportConfig->options;
}

$builder->addDefinition($this->prefix('transport.' . $transportName))
->setFactory([$transportFactory, 'createTransport'], [$dsn, $options, $defaultSerializer])
->setTags([SendersLocator::TAG_SENDER_ALIAS => $transportName]);
}
}

private function processRouting() : void
{
$this->getContainerBuilder()->addDefinition($this->prefix('sendersLocator'))
->setFactory(
SendersLocator::class,
[
array_map(
static function ($oneOrManyTransports) : array {
return is_string($oneOrManyTransports) ? [$oneOrManyTransports] : $oneOrManyTransports;
},
$this->getConfig()->routing
),
]
);
}

/**
* @return string[] Service names
*/
Expand Down Expand Up @@ -217,4 +296,16 @@ private function isPanelEnabled() : bool
{
return $this->getContainerBuilder()->findByType(LogToPanelMiddleware::class) !== [];
}

private function passRegisteredTransportFactoriesToMainFactory() : void
{
$builder = $this->getContainerBuilder();

$transportFactory = $builder->getDefinition($this->prefix('transportFactory'));
assert($transportFactory instanceof ServiceDefinition);

$transportFactory->setArguments([
array_map([$builder, 'getDefinition'], array_keys($builder->findByTag(self::TAG_TRANSPORT_FACTORY))),
]);
}
}
17 changes: 17 additions & 0 deletions src/Messenger/DI/TransportConfig.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Fmasa\Messenger\DI;

/**
* @internal
*/
final class TransportConfig
{
/** @var string */
public $dsn;

/** @var mixed[] */
public $options = [];
}
29 changes: 29 additions & 0 deletions src/Messenger/MessageTypeResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Fmasa\Messenger;

use Symfony\Component\Messenger\Envelope;
use function class_implements;
use function class_parents;
use function get_class;

/**
* @internal This class is not part of public API and can be changed between versions
*/
final class MessageTypeResolver
{
/**
* @return string[]
*/
public static function listTypes(Envelope $envelope) : array
{
$class = get_class($envelope->getMessage());

return [$class => $class]
+ class_parents($class)
+ class_implements($class)
+ ['*' => '*'];
}
}
61 changes: 61 additions & 0 deletions src/Messenger/Transport/SendersLocator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

declare(strict_types=1);

namespace Fmasa\Messenger\Transport;

use Fmasa\Messenger\MessageTypeResolver;
use Nette\DI\Container;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnknownSenderException;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use function sprintf;

final class SendersLocator implements SendersLocatorInterface
{
public const TAG_SENDER_ALIAS = 'messenger.sender.alias';

/** @var array<string, string[]> */
private $messageTypeToSender;

/** @var Container */
private $container;

/**
* @param array<string, string[]> $messageTypeToSenders
*/
public function __construct(array $messageTypeToSenders, Container $container)
{
$this->messageTypeToSender = $messageTypeToSenders;
$this->container = $container;
}

/**
* @inheritDoc
*/
public function getSenders(Envelope $envelope) : iterable
{
foreach (MessageTypeResolver::listTypes($envelope) as $type) {
foreach ($this->messageTypeToSender[$type] ?? [] as $alias) {
yield $alias => $this->getSenderByAlias($alias);
}
}
}

/**
* @inheritDoc
*/
public function getSenderByAlias(string $alias) : SenderInterface
{
foreach ($this->container->findByTag(self::TAG_SENDER_ALIAS) as $serviceName => $serviceAlias) {
if ($serviceAlias !== $alias) {
continue;
}

return $this->container->getService($serviceName);
}

throw new UnknownSenderException(sprintf('Unknown sender alias "%s".', $alias));
}
}
66 changes: 62 additions & 4 deletions tests/DI/MessengerExtensionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

namespace Fmasa\Messenger\DI;

use Fixtures\CustomTransport;
use Fixtures\Message;
use Fixtures\Message2;
use Fixtures\Message3;
use Fixtures\MessageImplementingInterface;
use Fixtures\Stamp;
use Fmasa\Messenger\Exceptions\InvalidHandlerService;
use Fmasa\Messenger\Exceptions\MultipleHandlersFound;
Expand All @@ -16,6 +20,7 @@
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use function array_map;
use function assert;

Expand Down Expand Up @@ -136,6 +141,63 @@ public function testLogToPanelMiddlewareIsNotRegisteredIfPanelIsDisabled() : voi
$this->assertNull($container->getByType(LogToPanelMiddleware::class, false));
}

/**
* @param mixed $message
* @param string[] $transports
*
* @dataProvider dataMessagedRoutedToMemoryTransport
*/
public function testMessageIsPassedToTransport($message, array $transports) : void
{
$container = $this->getContainer(__DIR__ . '/transports.neon');

$bus = $container->getService('messenger.default.bus');
assert($bus instanceof MessageBusInterface);

$this->assertSame(
$transports,
array_map(
static function (SentStamp $stamp) : string {
return $stamp->getSenderAlias();
},
$bus->dispatch($message)->all(SentStamp::class)
)
);
}

/**
* @return mixed[]
*/
public static function dataMessagedRoutedToMemoryTransport() : array
{
return [
'message routed to one transport' => [new Message(), ['memory1']],
'message routed to one transport (set as array)' => [new Message2(), ['memory1']],
'message routed to two transports' => [new Message3(), ['memory1', 'memory2']],
'message routed to two transports (to first via interface, to second via class name)'=> [
new MessageImplementingInterface(),
['memory2', 'memory1'],
],
];
}

public function testRegisterCustomTransport() : void
{
$container = $this->getContainer(__DIR__ . '/customTransport.neon');

$bus = $container->getService('messenger.default.bus');
assert($bus instanceof MessageBusInterface);

$message = new Message();

$result = $bus->dispatch($message);
$stamp = $result->last(SentStamp::class);
assert($stamp instanceof SentStamp);

$this->assertSame('test', $stamp->getSenderAlias());
$this->assertSame([$message], $container->getByType(CustomTransport::class)->getSentMessages());
}

/**
* @param string[] $expectedResults
*/
Expand All @@ -160,10 +222,6 @@ private function getContainer(string $configFile) : Container
$configurator->setTempDirectory(__DIR__ . '/../temp');
$configurator->setDebugMode(true);

$robotLoader = $configurator->createRobotLoader();
$robotLoader->addDirectory(__DIR__ . '/../fixtures');
$robotLoader->register();

$configurator->addConfig(__DIR__ . '/base.neon');
$configurator->addConfig($configFile);

Expand Down
13 changes: 13 additions & 0 deletions tests/DI/customTransport.neon
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
messenger:
buses:
default:
singleHandlerPerMessage: true
transports:
test: custom://foo
routing:
Fixtures\Message: test

services:
- Fixtures\CustomTransport
- class: Fixtures\CustomTransportFactory
tags: [messenger.transportFactory]

0 comments on commit 4d47131

Please sign in to comment.