/
MessageBus.php
85 lines (75 loc) · 3.12 KB
/
MessageBus.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
<?php declare(strict_types=1);
/**
* This file is part of the daikon-cqrs/message-bus project.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Daikon\MessageBus;
use Daikon\MessageBus\Channel\ChannelInterface;
use Daikon\MessageBus\Channel\ChannelMap;
use Daikon\MessageBus\Exception\ChannelUnknown;
use Daikon\MessageBus\Exception\EnvelopeNotAcceptable;
use Daikon\Metadata\MetadataInterface;
use Daikon\Metadata\Metadata;
use Daikon\Metadata\MetadataEnricherInterface;
use Daikon\Metadata\MetadataEnricherList;
/**
 * Message bus that routes messages and envelopes to channels registered under a channel key.
 *
 * Outgoing messages are wrapped in an envelope (after the configured metadata
 * enrichers have been applied) and handed to the channel named by the caller.
 * Incoming envelopes are routed to the channel named in their own metadata
 * under ChannelInterface::METADATA_KEY.
 */
final class MessageBus implements MessageBusInterface
{
    /** Channels known to this bus, looked up by channel key. */
    private ChannelMap $channelMap;

    /** Enrichers applied, in list order, to the metadata of every published message. */
    private MetadataEnricherList $metadataEnrichers;

    /**
     * Name of the class whose static wrap() method builds envelopes.
     * NOTE(review): presumably an EnvelopeInterface implementation; this is not validated here.
     */
    private string $envelopeType;

    /**
     * @param ChannelMap $channelMap Channels that can be published to / received on.
     * @param MetadataEnricherList|null $metadataEnrichers Enrichers to apply on publish; an empty list when null.
     * @param string|null $envelopeType Envelope class name; Envelope::class when null.
     */
    public function __construct(
        ChannelMap $channelMap,
        ?MetadataEnricherList $metadataEnrichers = null,
        ?string $envelopeType = null
    ) {
        $this->channelMap = $channelMap;
        $this->metadataEnrichers = $metadataEnrichers ?? new MetadataEnricherList();
        $this->envelopeType = $envelopeType ?? Envelope::class;
    }

    /**
     * Wraps the message in an envelope and publishes it on the given channel.
     *
     * @param MessageInterface $message Message to publish.
     * @param string $channelKey Key of the target channel.
     * @param MetadataInterface|null $metadata Initial metadata; empty metadata is used when null.
     *
     * @throws ChannelUnknown When no channel is registered under $channelKey.
     */
    public function publish(MessageInterface $message, string $channelKey, ?MetadataInterface $metadata = null): void
    {
        // Resolve the channel first so an unknown key fails before any enricher runs.
        $channel = $this->getChannel($channelKey);

        $enrichedMetadata = $this->enrichMetadata($metadata ?? Metadata::makeEmpty());

        /** @var EnvelopeInterface $envelope */
        $envelope = $this->envelopeType::wrap($message, $enrichedMetadata);

        $channel->publish($envelope, $this);
    }

    /**
     * Routes an envelope to the channel named in its metadata.
     *
     * @param EnvelopeInterface $envelope Envelope to deliver.
     *
     * @throws EnvelopeNotAcceptable When the envelope metadata carries no channel key.
     * @throws ChannelUnknown When the channel key in the metadata is not registered.
     */
    public function receive(EnvelopeInterface $envelope): void
    {
        $this->verify($envelope);

        $channelKey = (string)$envelope->getMetadata()->get(ChannelInterface::METADATA_KEY);

        $this->getChannel($channelKey)->receive($envelope);
    }

    /**
     * Looks up a registered channel by key.
     *
     * @throws ChannelUnknown When no channel is registered under $channelKey.
     */
    private function getChannel(string $channelKey): ChannelInterface
    {
        if (!$this->channelMap->has($channelKey)) {
            throw new ChannelUnknown("Channel '$channelKey' has not been registered on message bus.");
        }

        /** @var ChannelInterface $channel */
        $channel = $this->channelMap->get($channelKey);

        return $channel;
    }

    /**
     * Passes the metadata through every configured enricher, feeding each
     * enricher the result of the previous one, and returns the final result.
     */
    private function enrichMetadata(MetadataInterface $metadata): MetadataInterface
    {
        return array_reduce(
            $this->metadataEnrichers->unwrap(),
            static fn (MetadataInterface $carry, MetadataEnricherInterface $enricher): MetadataInterface =>
                $enricher->enrich($carry),
            $metadata
        );
    }

    /**
     * Ensures the envelope metadata contains the channel key needed for routing.
     *
     * @throws EnvelopeNotAcceptable With code CHANNEL_KEY_MISSING when the key is absent.
     */
    private function verify(EnvelopeInterface $envelope): void
    {
        if ($envelope->getMetadata()->has(ChannelInterface::METADATA_KEY)) {
            return;
        }

        throw new EnvelopeNotAcceptable(
            sprintf(
                "Channel key '%s' missing in metadata of Envelope '%s' received on message bus.",
                ChannelInterface::METADATA_KEY,
                $envelope->getUuid()->toString()
            ),
            EnvelopeNotAcceptable::CHANNEL_KEY_MISSING
        );
    }
}