Skip to content

Commit

Permalink
bug #31621 [Messenger] Fix missing auto_setup for RedisTransport (cha…
Browse files Browse the repository at this point in the history
…lasr)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] Fix missing auto_setup for RedisTransport

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | n/a
| License       | MIT
| Doc PR        | n/a

Should be my last PR for messenger 4.3's Redis transport :)
Not having it makes it inconsistent with other transports and is especially annoying in tests.

Commits
-------

d27bc2a [Messenger] Fix missing auto_setup for RedisTransport
  • Loading branch information
fabpot committed May 27, 2019
2 parents 75c1d5c + d27bc2a commit 4c1df8a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
Expand Up @@ -42,13 +42,13 @@ public function testFromDsn()
public function testFromDsnWithOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false], [
'host' => 'localhost',
'port' => 6379,
], [
'serializer' => 2,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2])
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false])
);
}

Expand Down Expand Up @@ -117,10 +117,6 @@ public function testGetAfterReject()
{
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
try {
$connection->setup();
} catch (TransportException $e) {
}

$connection->add('1', []);
$connection->add('2', []);
Expand All @@ -139,10 +135,6 @@ public function testGetNonBlocking()
$redis = new \Redis();

$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
try {
$connection->setup();
} catch (TransportException $e) {
}

$this->assertNull($connection->get()); // no message, should return null immediately
$connection->add('1', []);
Expand Down
39 changes: 32 additions & 7 deletions src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Expand Up @@ -27,20 +27,29 @@
*/
class Connection
{
private const DEFAULT_OPTIONS = [
'stream' => 'messages',
'group' => 'symfony',
'consumer' => 'consumer',
'auto_setup' => true,
];

private $connection;
private $stream;
private $group;
private $consumer;
private $autoSetup;
private $couldHavePendingMessages = true;

public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
{
$this->connection = $redis ?: new \Redis();
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
$this->stream = $configuration['stream'] ?? '' ?: 'messages';
$this->group = $configuration['group'] ?? '' ?: 'symfony';
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
}

public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
Expand All @@ -51,9 +60,9 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re

$pathParts = explode('/', $parsedUrl['path'] ?? '');

$stream = $pathParts[1] ?? '';
$group = $pathParts[2] ?? '';
$consumer = $pathParts[3] ?? '';
$stream = $pathParts[1] ?? null;
$group = $pathParts[2] ?? null;
$consumer = $pathParts[3] ?? null;

$connectionCredentials = [
'host' => $parsedUrl['host'] ?? '127.0.0.1',
Expand All @@ -64,11 +73,21 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
parse_str($parsedUrl['query'], $redisOptions);
}

return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions, $redis);
$autoSetup = null;
if (\array_key_exists('auto_setup', $redisOptions)) {
$autoSetup = filter_var($redisOptions['auto_setup'], FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['auto_setup']);
}

return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer, 'auto_setup' => $autoSetup], $connectionCredentials, $redisOptions, $redis);
}

public function get(): ?array
{
if ($this->autoSetup) {
$this->setup();
}

$messageId = '>'; // will receive new messages

if ($this->couldHavePendingMessages) {
Expand Down Expand Up @@ -141,6 +160,10 @@ public function reject(string $id): void

public function add(string $body, array $headers): void
{
if ($this->autoSetup) {
$this->setup();
}

$e = null;
try {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
Expand All @@ -161,5 +184,7 @@ public function setup(): void
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

$this->autoSetup = false;
}
}

0 comments on commit 4c1df8a

Please sign in to comment.