Skip to content

Commit

Permalink
feature #30757 [Messenger] Adding MessageCountAwareInterface to get t…
Browse files Browse the repository at this point in the history
…ransport message count (weaverryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30757).

Discussion
----------

[Messenger] Adding MessageCountAwareInterface to get transport message count

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | none
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

This adds a new optional interface that receivers should implement to give an approximate number of the messages "waiting" to be handled. Why? Because, with this, you could design a system that dynamically adds/removes worker processes if a specific transport is getting slammed and needs help. Creating that system could be something we discuss for core later, but this at least makes it possible - and means it could be implemented by the user or in a bundle... which I might do if we don't get it in core ;).

Commits
-------

fc5b0cf [Messenger] Adding MessageCountAwareInterface to get transport message count
  • Loading branch information
fabpot committed Apr 3, 2019
2 parents 9ed2f2b + fc5b0cf commit 574097f
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 16 deletions.
2 changes: 2 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -4,6 +4,8 @@ CHANGELOG
4.3.0
-----

* Added optional `MessageCountAwareInterface` that receivers can implement
to give information about how many messages are waiting to be processed.
* [BC BREAK] The `Envelope::__construct()` signature changed:
you can no longer pass an unlimited number of stamps as the second,
third, fourth, arguments etc: stamps are now an array passed to the
Expand Down
Expand Up @@ -167,6 +167,24 @@ public function testItReceivesSignals()
, $process->getOutput());
}

public function testItCountsMessagesInQueue()
{
$serializer = $this->createSerializer();

$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();

$sender = new AmqpSender($connection, $serializer);

$sender->send($first = new Envelope(new DummyMessage('First')));
$sender->send($second = new Envelope(new DummyMessage('Second')));
$sender->send($second = new Envelope(new DummyMessage('Third')));

sleep(1); // give amqp a moment to have the messages ready
$this->assertSame(3, $connection->countMessagesInQueue());
}

private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;
Expand Down
Expand Up @@ -37,6 +37,9 @@ public function testGetAMessageWillChangeItsStatus()
$queryBuilder
->method('getSQL')
->willReturn('');
$queryBuilder
->method('getParameters')
->willReturn([]);
$driverConnection
->method('prepare')
->willReturn($stmt);
Expand All @@ -54,6 +57,9 @@ public function testGetWithNoPendingMessageWillReturnNull()
$driverConnection = $this->getDBALConnectionMock();
$stmt = $this->getStatementMock(false);

$queryBuilder
->method('getParameters')
->willReturn([]);
$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);
Expand Down Expand Up @@ -119,6 +125,7 @@ private function getQueryBuilderMock()
$queryBuilder->method('orderBy')->willReturn($queryBuilder);
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
$queryBuilder->method('setParameters')->willReturn($queryBuilder);

return $queryBuilder;
}
Expand Down
Expand Up @@ -100,6 +100,46 @@ public function testItRetrieveTheFirstAvailableMessage()
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
}

public function testItCountMessages()
{
// insert messages
// one currently handled
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi handled"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))),
]);
// one available later
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi delayed"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime((new \DateTime())->modify('+1 minute')),
]);
// one available
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);
// another available
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);

$this->assertSame(2, $this->connection->getMessageCount());
}

public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
{
$twoHoursAgo = new \DateTime('now');
Expand Down
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -26,7 +27,7 @@
*
* @experimental in 4.2
*/
class AmqpReceiver implements ReceiverInterface
class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
Expand Down Expand Up @@ -87,6 +88,14 @@ public function reject(Envelope $envelope): void
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
}

/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
return $this->connection->countMessagesInQueue();
}

private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
{
try {
Expand Down
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
Expand All @@ -22,7 +23,7 @@
*
* @experimental in 4.2
*/
class AmqpTransport implements TransportInterface, SetupableTransportInterface
class AmqpTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
Expand Down Expand Up @@ -75,6 +76,14 @@ public function setup(): void
$this->connection->setup();
}

/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
}

private function getReceiver()
{
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
Expand Down
Expand Up @@ -184,6 +184,14 @@ public function publish(string $body, array $headers = [], int $delay = 0): void
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
}

/**
* Returns an approximate count of the messages in a queue.
*/
public function countMessagesInQueue(): int
{
return $this->queue()->declareQueue();
}

/**
* @throws \AMQPException
*/
Expand Down
44 changes: 31 additions & 13 deletions src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Expand Up @@ -14,6 +14,7 @@
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;
Expand Down Expand Up @@ -128,25 +129,14 @@ public function get(): ?array
{
$this->driverConnection->beginTransaction();
try {
$query = $this->driverConnection->createQueryBuilder()
->select('m.*')
->from($this->configuration['table_name'], 'm')
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
->andWhere('m.available_at <= :now')
->andWhere('m.queue_name = :queue_name')
$query = $this->createAvailableMessagesQueryBuilder()
->orderBy('available_at', 'ASC')
->setMaxResults(1);

$now = \DateTime::createFromFormat('U.u', microtime(true));
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
// use SELECT ... FOR UPDATE to lock table
$doctrineEnvelope = $this->executeQuery(
$query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
[
':now' => self::formatDateTime($now),
':queue_name' => $this->configuration['queue_name'],
':redeliver_limit' => self::formatDateTime($redeliverLimit),
]
$query->getParameters()
)->fetch();

if (false === $doctrineEnvelope) {
Expand All @@ -161,6 +151,7 @@ public function get(): ?array
->update($this->configuration['table_name'])
->set('delivered_at', ':delivered_at')
->where('id = :id');
$now = \DateTime::createFromFormat('U.u', microtime(true));
$this->executeQuery($queryBuilder->getSQL(), [
':id' => $doctrineEnvelope['id'],
':delivered_at' => self::formatDateTime($now),
Expand Down Expand Up @@ -200,6 +191,33 @@ public function setup(): void
$synchronizer->updateSchema($this->getSchema(), true);
}

public function getMessageCount(): int
{
$queryBuilder = $this->createAvailableMessagesQueryBuilder()
->select('COUNT(m.id) as message_count')
->setMaxResults(1);

return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn();
}

private function createAvailableMessagesQueryBuilder(): QueryBuilder
{
$now = \DateTime::createFromFormat('U.u', microtime(true));
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));

return $this->driverConnection->createQueryBuilder()
->select('m.*')
->from($this->configuration['table_name'], 'm')
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
->andWhere('m.available_at <= :now')
->andWhere('m.queue_name = :queue_name')
->setParameters([
':now' => self::formatDateTime($now),
':queue_name' => $this->configuration['queue_name'],
':redeliver_limit' => self::formatDateTime($redeliverLimit),
]);
}

private function executeQuery(string $sql, array $parameters = [])
{
$stmt = null;
Expand Down
Expand Up @@ -16,6 +16,7 @@
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -25,7 +26,7 @@
*
* @experimental in 4.3
*/
class DoctrineReceiver implements ReceiverInterface
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
{
private $connection;
private $serializer;
Expand Down Expand Up @@ -81,6 +82,14 @@ public function reject(Envelope $envelope): void
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
}

/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
return $this->connection->getMessageCount();
}

private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
{
/** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */
Expand Down
@@ -0,0 +1,28 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Receiver;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
interface MessageCountAwareInterface
{
/**
* Returns the number of messages waiting to be handled.
*
* In some systems, this may be an approximate number.
*/
public function getMessageCount(): int;
}

0 comments on commit 574097f

Please sign in to comment.