Skip to content

Commit

Permalink
feature #36464 [RedisMessengerBridge] Add a delete_after_ack option (…
Browse files Browse the repository at this point in the history
…Seldaek)

This PR was merged into the 5.1-dev branch.

Discussion
----------

[RedisMessengerBridge] Add a delete_after_ack option

This allows Messenger to clean up processed messages from memory, avoiding a mem "leak" in redis

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | #33715
| License       | MIT
| Doc PR        | symfony/symfony-docs#... TODO - will pile it on to symfony/symfony-docs#11869 as it kinda binds together and a bigger refactor of the docs here is much needed to avoid all these gotchas

Right now by default a redis transport for messenger will leak memory as all messages stay in redis forever. You can configure `stream_max_entries` to automatically trim to a max of X entries, but that means if you have big peaks in messages you might start losing messages which have not been processed.

This PR provides an alternative to that, by deleting message as they are processed. This is ideal as it avoids having to find the right number for `stream_max_entries` (do you want to risk losing data or use more memory than needed on average?). The only catch is that if you have multiple groups consuming the same stream, the first one processing a message will delete it, so other groups will not see it. For that reason `setup()` attempts to detect this and fails hard if it is misconfigured to prevent data loss.

Commits
-------

7c416a7 [RedisMessengerBridge] Add a delete_after_ack option to automatically clean up processed messages from memory
  • Loading branch information
fabpot committed Apr 17, 2020
2 parents 67948a7 + 7c416a7 commit 2460ca5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md
Expand Up @@ -9,3 +9,5 @@ CHANGELOG
* Deprecated use of invalid options
* Added ability to receive of old pending messages with new `redeliver_timeout`
and `claim_interval` options.
* Added a `delete_after_ack` option to the DSN as an alternative to
`stream_max_entries` to avoid leaking memory.
Expand Up @@ -307,6 +307,21 @@ public function testMaxEntries()
$connection->add('1', []);
}

public function testDeleteAfterAck()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$redis->expects($this->exactly(1))->method('xack')
->with('queue', 'symfony', ['1'])
->willReturn(1);
$redis->expects($this->exactly(1))->method('xdel')
->with('queue', ['1'])
->willReturn(1);

$connection = Connection::fromDsn('redis://localhost/queue?delete_after_ack=true', [], $redis); // 1 = always
$connection->ack('1');
}

public function testLastErrorGetsCleared()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
Expand Down
Expand Up @@ -32,6 +32,7 @@ class Connection
'group' => 'symfony',
'consumer' => 'consumer',
'auto_setup' => true,
'delete_after_ack' => false,
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
'dbindex' => 0,
'tls' => false,
Expand All @@ -49,6 +50,7 @@ class Connection
private $redeliverTimeout;
private $nextClaim = 0;
private $claimInterval;
private $deleteAfterAck;
private $couldHavePendingMessages = true;

public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
Expand Down Expand Up @@ -81,6 +83,7 @@ public function __construct(array $configuration, array $connectionCredentials =
$this->queue = $this->stream.'__queue';
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
$this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack'];
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
}
Expand Down Expand Up @@ -114,6 +117,12 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
unset($redisOptions['stream_max_entries']);
}

$deleteAfterAck = null;
if (\array_key_exists('delete_after_ack', $redisOptions)) {
$deleteAfterAck = filter_var($redisOptions['delete_after_ack'], FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['delete_after_ack']);
}

$dbIndex = null;
if (\array_key_exists('dbindex', $redisOptions)) {
$dbIndex = filter_var($redisOptions['dbindex'], FILTER_VALIDATE_INT);
Expand Down Expand Up @@ -144,6 +153,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
'consumer' => $redisOptions['consumer'] ?? null,
'auto_setup' => $autoSetup,
'stream_max_entries' => $maxEntries,
'delete_after_ack' => $deleteAfterAck,
'dbindex' => $dbIndex,
'redeliver_timeout' => $redeliverTimeout,
'claim_interval' => $claimInterval,
Expand Down Expand Up @@ -314,6 +324,9 @@ public function ack(string $id): void
{
try {
$acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
if ($this->deleteAfterAck) {
$acknowledged = $this->connection->xdel($this->stream, [$id]);
}
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
Expand Down Expand Up @@ -408,6 +421,18 @@ public function setup(): void
$this->connection->clearLastError();
}

if ($this->deleteAfterAck) {
$groups = $this->connection->xinfo('GROUPS', $this->stream);
if (
// support for Redis extension version 5+
(\is_array($groups) && 1 < \count($groups))
// support for Redis extension version 4.x
|| (\is_string($groups) && substr_count($groups, '"name"'))
) {
throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
}
}

$this->autoSetup = false;
}

Expand Down

0 comments on commit 2460ca5

Please sign in to comment.