Skip to content

Commit

Permalink
fixed receiver (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
luca-rath authored and alexander-schranz committed Nov 29, 2018
1 parent 47f5081 commit 5f47f9e
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
6 changes: 3 additions & 3 deletions Message/DomainEventMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function __construct(string $name, string $type, string $id, array $paylo
$this->type = $type;
$this->id = $id;
$this->payload = $payload;
$this->created = new \DateTimeImmutable();
$this->created = (new \DateTimeImmutable())->format('c');
}

public function getName(): string
Expand All @@ -60,7 +60,7 @@ public function getPayload(): array
return $this->payload;
}

public function getCreated(): \DateTimeImmutable
public function getCreated(): string
{
return $this->created;
}
Expand All @@ -72,7 +72,7 @@ public function toArray(): array
'type' => $this->type,
'id' => $this->id,
'payload' => $this->payload,
'created' => $this->created->format('c'),
'created' => $this->created,
];
}
}
15 changes: 11 additions & 4 deletions Transport/RedisStreamReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace HandcraftedInTheAlps\Bundle\RedisTransportBundle\Transport;

use Redis;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

Expand Down Expand Up @@ -57,9 +57,16 @@ public function __construct(Redis $redis, string $stream, string $group = null,
public function receive(callable $handler): void
{
foreach ($this->read() as $key => $message) {
// TODO receive message
$content = (array) json_decode($message['content']);

$this->ack($key, $message);
$content = [
'body' => $content['body'],
'headers' => (array) $content['headers'],
];

$handler($this->serializer->decode($content));

$this->ack($key);
}
}

Expand Down Expand Up @@ -112,7 +119,7 @@ private function read()
}
}

private function ack(string $key, array $message)
private function ack(string $key)
{
if ($this->group) {
$this->redis->xAck($this->stream, $this->group, [$key]);
Expand Down
2 changes: 1 addition & 1 deletion Transport/RedisStreamSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);

$this->redis->xAdd($this->stream, '*', $encodedMessage);
$this->redis->xAdd($this->stream, '*', ['content' => json_encode($encodedMessage)]);

return $envelope;
}
Expand Down

0 comments on commit 5f47f9e

Please sign in to comment.