Skip to content

Commit

Permalink
bug #30858 [Messenger] Setup the doctrine transport when consuming (v…
Browse files Browse the repository at this point in the history
…incenttouzet)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Setup the doctrine transport when consuming

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        |

Currently there is an error when the table does not exists and we run the `messenger:consume` command.

This is because all queries made in the `get` method of the `Connection` are in a transaction. Therefore the table is not created.

To avoid this error I added a call to the `setup` method before starting the transaction.

I needed to add the SchemaSynchronizer as construct parameter for the tests

Commits
-------

b2f3b53 [Messenger] Setup the doctrine transport when consuming
  • Loading branch information
fabpot committed Apr 4, 2019
2 parents de7d7a9 + b2f3b53 commit 4dfb741
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 5 deletions.
Expand Up @@ -15,6 +15,7 @@
use Doctrine\DBAL\Driver\Statement;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
Expand All @@ -25,6 +26,7 @@ public function testGetAMessageWillChangeItsStatus()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
$stmt = $this->getStatementMock([
'id' => 1,
'body' => '{"message":"Hi"}',
Expand All @@ -44,7 +46,7 @@ public function testGetAMessageWillChangeItsStatus()
->method('prepare')
->willReturn($stmt);

$connection = new Connection([], $driverConnection);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
$doctrineEnvelope = $connection->get();
$this->assertEquals(1, $doctrineEnvelope['id']);
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
Expand All @@ -55,6 +57,7 @@ public function testGetWithNoPendingMessageWillReturnNull()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
$stmt = $this->getStatementMock(false);

$queryBuilder
Expand All @@ -68,7 +71,7 @@ public function testGetWithNoPendingMessageWillReturnNull()
$driverConnection->expects($this->never())
->method('update');

$connection = new Connection([], $driverConnection);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
$doctrineEnvelope = $connection->get();
$this->assertNull($doctrineEnvelope);
}
Expand Down Expand Up @@ -142,6 +145,12 @@ private function getStatementMock($expectedResult)
return $stmt;
}

private function getSchemaSynchronizerMock()
{
return $this->getMockBuilder(SchemaSynchronizer::class)
->getMock();
}

/**
* @dataProvider buildConfigurationProvider
*/
Expand Down
Expand Up @@ -164,4 +164,17 @@ public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
$this->connection->reject($next['id']);
}

public function testTheTransportIsSetupOnGet()
{
// If the table does not exist and we call the get (i.e run messenger:consume) the table must be setup
// so first delete the tables
$this->driverConnection->exec('DROP TABLE messenger_messages');

$this->assertNull($this->connection->get());

$this->connection->send('the body', ['my' => 'header']);
$envelope = $this->connection->get();
$this->assertEquals('the body', $envelope['body']);
}
}
Expand Up @@ -16,6 +16,7 @@
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
Expand Down Expand Up @@ -50,11 +51,13 @@ class Connection
*/
private $configuration = [];
private $driverConnection;
private $schemaSynchronizer;

public function __construct(array $configuration, DBALConnection $driverConnection)
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
{
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
$this->driverConnection = $driverConnection;
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
}

public function getConfiguration(): array
Expand Down Expand Up @@ -127,6 +130,9 @@ public function send(string $body, array $headers, int $delay = 0): void

public function get(): ?array
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
$this->driverConnection->beginTransaction();
try {
$query = $this->createAvailableMessagesQueryBuilder()
Expand Down Expand Up @@ -187,8 +193,7 @@ public function reject(string $id): bool

public function setup(): void
{
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
$synchronizer->updateSchema($this->getSchema(), true);
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);
}

public function getMessageCount(): int
Expand Down

0 comments on commit 4dfb741

Please sign in to comment.