diff --git a/src/Symfony/Bridge/Doctrine/SchemaListener/MessengerTransportDoctrineSchemaSubscriber.php b/src/Symfony/Bridge/Doctrine/SchemaListener/MessengerTransportDoctrineSchemaSubscriber.php new file mode 100644 index 000000000000..96699cb13098 --- /dev/null +++ b/src/Symfony/Bridge/Doctrine/SchemaListener/MessengerTransportDoctrineSchemaSubscriber.php @@ -0,0 +1,96 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bridge\Doctrine\SchemaListener; + +use Doctrine\Common\EventSubscriber; +use Doctrine\DBAL\Event\SchemaCreateTableEventArgs; +use Doctrine\DBAL\Events; +use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs; +use Doctrine\ORM\Tools\ToolEvents; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * Automatically adds any required database tables to the Doctrine Schema. + * + * @author Ryan Weaver + */ +final class MessengerTransportDoctrineSchemaSubscriber implements EventSubscriber +{ + private const PROCESSING_TABLE_FLAG = self::class.':processing'; + + private $transports; + + /** + * @param iterable|TransportInterface[] $transports + */ + public function __construct(iterable $transports) + { + $this->transports = $transports; + } + + public function postGenerateSchema(GenerateSchemaEventArgs $event): void + { + $dbalConnection = $event->getEntityManager()->getConnection(); + foreach ($this->transports as $transport) { + if (!$transport instanceof DoctrineTransport) { + continue; + } + + $transport->configureSchema($event->getSchema(), $dbalConnection); + } + } + + public function onSchemaCreateTable(SchemaCreateTableEventArgs $event): void + { + $table = $event->getTable(); + + // if this method triggers a nested create table below, allow Doctrine to work like normal + if ($table->hasOption(self::PROCESSING_TABLE_FLAG)) { + return; + } + + foreach ($this->transports as $transport) { + if (!$transport instanceof DoctrineTransport) { + continue; + } + + $extraSql = $transport->getExtraSetupSqlForTable($table); + if (null === $extraSql) { + continue; + } + + // avoid this same listener from creating a loop on this table + $table->addOption(self::PROCESSING_TABLE_FLAG, true); + $createTableSql = $event->getPlatform()->getCreateTableSQL($table); + + /* + * Add all the SQL needed to create the table and tell Doctrine + * to "preventDefault" so that only our SQL is used. This is + * the only way to inject some extra SQL. + */ + $event->addSql($createTableSql); + $event->addSql($extraSql); + $event->preventDefault(); + + return; + } + } + + public function getSubscribedEvents(): array + { + return [ + ToolEvents::postGenerateSchema, + Events::onSchemaCreateTable, + ]; + } +} diff --git a/src/Symfony/Bridge/Doctrine/SchemaListener/PdoCacheAdapterDoctrineSchemaSubscriber.php b/src/Symfony/Bridge/Doctrine/SchemaListener/PdoCacheAdapterDoctrineSchemaSubscriber.php new file mode 100644 index 000000000000..41330e7971b5 --- /dev/null +++ b/src/Symfony/Bridge/Doctrine/SchemaListener/PdoCacheAdapterDoctrineSchemaSubscriber.php @@ -0,0 +1,50 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bridge\Doctrine\SchemaListener; + +use Doctrine\Common\EventSubscriber; +use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs; +use Doctrine\ORM\Tools\ToolEvents; +use Symfony\Component\Cache\Adapter\PdoAdapter; + +/** + * Automatically adds the cache table needed for the PdoAdapter. + * + * @author Ryan Weaver + */ +final class PdoCacheAdapterDoctrineSchemaSubscriber implements EventSubscriber +{ + private $pdoAdapters; + + /** + * @param iterable|PdoAdapter[] $pdoAdapters + */ + public function __construct(iterable $pdoAdapters) + { + $this->pdoAdapters = $pdoAdapters; + } + + public function postGenerateSchema(GenerateSchemaEventArgs $event): void + { + $dbalConnection = $event->getEntityManager()->getConnection(); + foreach ($this->pdoAdapters as $pdoAdapter) { + $pdoAdapter->configureSchema($event->getSchema(), $dbalConnection); + } + } + + public function getSubscribedEvents(): array + { + return [ + ToolEvents::postGenerateSchema, + ]; + } +} diff --git a/src/Symfony/Bridge/Doctrine/Tests/SchemaListener/MessengerTransportDoctrineSchemaSubscriberTest.php b/src/Symfony/Bridge/Doctrine/Tests/SchemaListener/MessengerTransportDoctrineSchemaSubscriberTest.php new file mode 100644 index 000000000000..6bff7c0d395d --- /dev/null +++ b/src/Symfony/Bridge/Doctrine/Tests/SchemaListener/MessengerTransportDoctrineSchemaSubscriberTest.php @@ -0,0 +1,99 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bridge\Doctrine\Tests\SchemaListener; + +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Event\SchemaCreateTableEventArgs; +use Doctrine\DBAL\Platforms\AbstractPlatform; +use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Schema\Table; +use Doctrine\ORM\EntityManagerInterface; +use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs; +use PHPUnit\Framework\TestCase; +use Symfony\Bridge\Doctrine\SchemaListener\MessengerTransportDoctrineSchemaSubscriber; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport; +use Symfony\Component\Messenger\Transport\TransportInterface; + +class MessengerTransportDoctrineSchemaSubscriberTest extends TestCase +{ + public function testPostGenerateSchema() + { + $schema = new Schema(); + $dbalConnection = $this->createMock(Connection::class); + $entityManager = $this->createMock(EntityManagerInterface::class); + $entityManager->expects($this->once()) + ->method('getConnection') + ->willReturn($dbalConnection); + $event = new GenerateSchemaEventArgs($entityManager, $schema); + + $doctrineTransport = $this->createMock(DoctrineTransport::class); + $doctrineTransport->expects($this->once()) + ->method('configureSchema') + ->with($schema, $dbalConnection); + $otherTransport = $this->createMock(TransportInterface::class); + $otherTransport->expects($this->never()) + ->method($this->anything()); + + $subscriber = new MessengerTransportDoctrineSchemaSubscriber([$doctrineTransport, $otherTransport]); + $subscriber->postGenerateSchema($event); + } + + public function testOnSchemaCreateTable() + { + $platform = $this->createMock(AbstractPlatform::class); + $table = new Table('queue_table'); + $event = new SchemaCreateTableEventArgs($table, [], [], $platform); + + $otherTransport = $this->createMock(TransportInterface::class); + $otherTransport->expects($this->never()) + ->method($this->anything()); + + $doctrineTransport = $this->createMock(DoctrineTransport::class); + $doctrineTransport->expects($this->once()) + ->method('getExtraSetupSqlForTable') + ->with($table) + ->willReturn('ALTER TABLE pizza ADD COLUMN extra_cheese boolean'); + + // we use the platform to generate the full create table sql + $platform->expects($this->once()) + ->method('getCreateTableSQL') + ->with($table) + ->willReturn('CREATE TABLE pizza (id integer NOT NULL)'); + + $subscriber = new MessengerTransportDoctrineSchemaSubscriber([$otherTransport, $doctrineTransport]); + $subscriber->onSchemaCreateTable($event); + $this->assertTrue($event->isDefaultPrevented()); + $this->assertSame([ + 'CREATE TABLE pizza (id integer NOT NULL)', + 'ALTER TABLE pizza ADD COLUMN extra_cheese boolean', + ], $event->getSql()); + } + + public function testOnSchemaCreateTableNoExtraSql() + { + $platform = $this->createMock(AbstractPlatform::class); + $table = new Table('queue_table'); + $event = new SchemaCreateTableEventArgs($table, [], [], $platform); + + $doctrineTransport = $this->createMock(DoctrineTransport::class); + $doctrineTransport->expects($this->once()) + ->method('getExtraSetupSqlForTable') + ->willReturn(null); + + $platform->expects($this->never()) + ->method('getCreateTableSQL'); + + $subscriber = new MessengerTransportDoctrineSchemaSubscriber([$doctrineTransport]); + $subscriber->onSchemaCreateTable($event); + $this->assertFalse($event->isDefaultPrevented()); + } +} diff --git a/src/Symfony/Bridge/Doctrine/Tests/SchemaListener/PdoCacheAdapterDoctrineSchemaSubscriberTest.php b/src/Symfony/Bridge/Doctrine/Tests/SchemaListener/PdoCacheAdapterDoctrineSchemaSubscriberTest.php new file mode 100644 index 000000000000..9cf70e943ed2 --- /dev/null +++ b/src/Symfony/Bridge/Doctrine/Tests/SchemaListener/PdoCacheAdapterDoctrineSchemaSubscriberTest.php @@ -0,0 +1,42 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bridge\Doctrine\Tests\SchemaListener; + +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Schema\Schema; +use Doctrine\ORM\EntityManagerInterface; +use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs; +use PHPUnit\Framework\TestCase; +use Symfony\Bridge\Doctrine\SchemaListener\PdoCacheAdapterDoctrineSchemaSubscriber; +use Symfony\Component\Cache\Adapter\PdoAdapter; + +class PdoCacheAdapterDoctrineSchemaSubscriberTest extends TestCase +{ + public function testPostGenerateSchema() + { + $schema = new Schema(); + $dbalConnection = $this->createMock(Connection::class); + $entityManager = $this->createMock(EntityManagerInterface::class); + $entityManager->expects($this->once()) + ->method('getConnection') + ->willReturn($dbalConnection); + $event = new GenerateSchemaEventArgs($entityManager, $schema); + + $pdoAdapter = $this->createMock(PdoAdapter::class); + $pdoAdapter->expects($this->once()) + ->method('configureSchema') + ->with($schema, $dbalConnection); + + $subscriber = new PdoCacheAdapterDoctrineSchemaSubscriber([$pdoAdapter]); + $subscriber->postGenerateSchema($event); + } +} diff --git a/src/Symfony/Bridge/Doctrine/composer.json b/src/Symfony/Bridge/Doctrine/composer.json index bb588a08b240..52c6fdf68a28 100644 --- a/src/Symfony/Bridge/Doctrine/composer.json +++ b/src/Symfony/Bridge/Doctrine/composer.json @@ -26,11 +26,13 @@ }, "require-dev": { "symfony/stopwatch": "^4.4|^5.0", + "symfony/cache": "^5.1", "symfony/config": "^4.4|^5.0", "symfony/dependency-injection": "^4.4|^5.0", "symfony/form": "^5.1", "symfony/http-kernel": "^5.0", "symfony/messenger": "^4.4|^5.0", + "symfony/doctrine-messenger": "^5.1", "symfony/property-access": "^4.4|^5.0", "symfony/property-info": "^5.0", "symfony/proxy-manager-bridge": "^4.4|^5.0", diff --git a/src/Symfony/Component/Cache/Adapter/PdoAdapter.php b/src/Symfony/Component/Cache/Adapter/PdoAdapter.php index 1bc2f1515be2..f9cee34931a8 100644 --- a/src/Symfony/Component/Cache/Adapter/PdoAdapter.php +++ b/src/Symfony/Component/Cache/Adapter/PdoAdapter.php @@ -115,24 +115,8 @@ public function createTable() $conn = $this->getConnection(); if ($conn instanceof Connection) { - $types = [ - 'mysql' => 'binary', - 'sqlite' => 'text', - 'pgsql' => 'string', - 'oci' => 'string', - 'sqlsrv' => 'string', - ]; - if (!isset($types[$this->driver])) { - throw new \DomainException(sprintf('Creating the cache table is currently not implemented for PDO driver "%s".', $this->driver)); - } - $schema = new Schema(); - $table = $schema->createTable($this->table); - $table->addColumn($this->idCol, $types[$this->driver], ['length' => 255]); - $table->addColumn($this->dataCol, 'blob', ['length' => 16777215]); - $table->addColumn($this->lifetimeCol, 'integer', ['unsigned' => true, 'notnull' => false]); - $table->addColumn($this->timeCol, 'integer', ['unsigned' => true]); - $table->setPrimaryKey([$this->idCol]); + $this->addTableToSchema($schema); foreach ($schema->toSql($conn->getDatabasePlatform()) as $sql) { $conn->exec($sql); @@ -169,6 +153,23 @@ public function createTable() $conn->exec($sql); } + /** + * Adds the Table to the Schema if the adapter uses this Connection. + */ + public function configureSchema(Schema $schema, Connection $forConnection): void + { + // only update the schema for this connection + if ($forConnection !== $this->getConnection()) { + return; + } + + if ($schema->hasTable($this->table)) { + return; + } + + $this->addTableToSchema($schema); + } + /** * {@inheritdoc} */ @@ -467,4 +468,25 @@ private function getServerVersion(): string return $this->serverVersion; } + + private function addTableToSchema(Schema $schema): void + { + $types = [ + 'mysql' => 'binary', + 'sqlite' => 'text', + 'pgsql' => 'string', + 'oci' => 'string', + 'sqlsrv' => 'string', + ]; + if (!isset($types[$this->driver])) { + throw new \DomainException(sprintf('Creating the cache table is currently not implemented for PDO driver "%s".', $this->driver)); + } + + $table = $schema->createTable($this->table); + $table->addColumn($this->idCol, $types[$this->driver], ['length' => 255]); + $table->addColumn($this->dataCol, 'blob', ['length' => 16777215]); + $table->addColumn($this->lifetimeCol, 'integer', ['unsigned' => true, 'notnull' => false]); + $table->addColumn($this->timeCol, 'integer', ['unsigned' => true]); + $table->setPrimaryKey([$this->idCol]); + } } diff --git a/src/Symfony/Component/Cache/Tests/Adapter/PdoDbalAdapterTest.php b/src/Symfony/Component/Cache/Tests/Adapter/PdoDbalAdapterTest.php index 0e45324c0c12..1efc204d0ff4 100644 --- a/src/Symfony/Component/Cache/Tests/Adapter/PdoDbalAdapterTest.php +++ b/src/Symfony/Component/Cache/Tests/Adapter/PdoDbalAdapterTest.php @@ -11,7 +11,10 @@ namespace Symfony\Component\Cache\Tests\Adapter; +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Driver; use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Schema\Schema; use Psr\Cache\CacheItemPoolInterface; use Symfony\Component\Cache\Adapter\PdoAdapter; use Symfony\Component\Cache\Tests\Traits\PdoPruneableTrait; @@ -43,4 +46,50 @@ public function createCachePool(int $defaultLifetime = 0): CacheItemPoolInterfac { return new PdoAdapter(DriverManager::getConnection(['driver' => 'pdo_sqlite', 'path' => self::$dbFile]), '', $defaultLifetime); } + + public function testConfigureSchema() + { + $connection = DriverManager::getConnection(['driver' => 'pdo_sqlite', 'path' => self::$dbFile]); + $schema = new Schema(); + + $adapter = new PdoAdapter($connection); + $adapter->configureSchema($schema, $connection); + $this->assertTrue($schema->hasTable('cache_items')); + } + + public function testConfigureSchemaDifferentDbalConnection() + { + $otherConnection = $this->createConnectionMock(); + $schema = new Schema(); + + $adapter = $this->createCachePool(); + $adapter->configureSchema($schema, $otherConnection); + $this->assertFalse($schema->hasTable('cache_items')); + } + + public function testConfigureSchemaTableExists() + { + $connection = DriverManager::getConnection(['driver' => 'pdo_sqlite', 'path' => self::$dbFile]); + $schema = new Schema(); + $schema->createTable('cache_items'); + + $adapter = new PdoAdapter($connection); + $adapter->configureSchema($schema, $connection); + $table = $schema->getTable('cache_items'); + $this->assertEmpty($table->getColumns(), 'The table was not overwritten'); + } + + private function createConnectionMock() + { + $connection = $this->createMock(Connection::class); + $driver = $this->createMock(Driver::class); + $driver->expects($this->any()) + ->method('getName') + ->willReturn('pdo_mysql'); + $connection->expects($this->any()) + ->method('getDriver') + ->willReturn($driver); + + return $connection; + } } diff --git a/src/Symfony/Component/Lock/Store/PdoStore.php b/src/Symfony/Component/Lock/Store/PdoStore.php index c6a7dd841924..a58d1b285dfd 100644 --- a/src/Symfony/Component/Lock/Store/PdoStore.php +++ b/src/Symfony/Component/Lock/Store/PdoStore.php @@ -146,7 +146,7 @@ public function save(Key $key) public function putOffExpiration(Key $key, float $ttl) { if ($ttl < 1) { - throw new InvalidTtlException(sprintf('"%s()" expects a TTL greater or equals to 1 second. Got %s.', __METHOD__, $ttl)); + throw new InvalidTtlException(sprintf('"%s()" expects a TTL greater or equals to 1 second. Got "%s".', __METHOD__, $ttl)); } $key->reduceLifetime($ttl); @@ -249,11 +249,7 @@ public function createTable(): void if ($conn instanceof Connection) { $schema = new Schema(); - $table = $schema->createTable($this->table); - $table->addColumn($this->idCol, 'string', ['length' => 64]); - $table->addColumn($this->tokenCol, 'string', ['length' => 44]); - $table->addColumn($this->expirationCol, 'integer', ['unsigned' => true]); - $table->setPrimaryKey([$this->idCol]); + $this->addTableToSchema($schema); foreach ($schema->toSql($conn->getDatabasePlatform()) as $sql) { $conn->exec($sql); @@ -285,6 +281,22 @@ public function createTable(): void $conn->exec($sql); } + /** + * Adds the Table to the Schema if it doesn't exist. + */ + public function configureSchema(Schema $schema): void + { + if (!$this->getConnection() instanceof Connection) { + throw new \BadMethodCallException(sprintf('"%s::%s()" is only supported when using a doctrine/dbal Connection.', __CLASS__, __METHOD__)); + } + + if ($schema->hasTable($this->table)) { + return; + } + + $this->addTableToSchema($schema); + } + /** * Cleans up the table by removing all expired locks. */ @@ -351,4 +363,13 @@ private function getCurrentTimestampStatement(): string return time(); } } + + private function addTableToSchema(Schema $schema): void + { + $table = $schema->createTable($this->table); + $table->addColumn($this->idCol, 'string', ['length' => 64]); + $table->addColumn($this->tokenCol, 'string', ['length' => 44]); + $table->addColumn($this->expirationCol, 'integer', ['unsigned' => true]); + $table->setPrimaryKey([$this->idCol]); + } } diff --git a/src/Symfony/Component/Lock/Tests/Store/PdoDbalStoreTest.php b/src/Symfony/Component/Lock/Tests/Store/PdoDbalStoreTest.php index 264c99829c98..44adca1ca068 100644 --- a/src/Symfony/Component/Lock/Tests/Store/PdoDbalStoreTest.php +++ b/src/Symfony/Component/Lock/Tests/Store/PdoDbalStoreTest.php @@ -11,7 +11,9 @@ namespace Symfony\Component\Lock\Tests\Store; +use Doctrine\DBAL\Connection; use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Schema\Schema; use Symfony\Component\Lock\PersistingStoreInterface; use Symfony\Component\Lock\Store\PdoStore; @@ -59,4 +61,12 @@ public function testAbortAfterExpiration() { $this->markTestSkipped('Pdo expects a TTL greater than 1 sec. Simulating a slow network is too hard'); } + + public function testConfigureSchema() + { + $store = new PdoStore($this->createMock(Connection::class), ['db_table' => 'lock_table']); + $schema = new Schema(); + $store->configureSchema($schema); + $this->assertTrue($schema->hasTable('lock_table')); + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php index dca9440e189c..27a7e6b9e0de 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php @@ -16,6 +16,7 @@ use Doctrine\DBAL\Platforms\AbstractPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\AbstractSchemaManager; +use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\SchemaConfig; use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer; use PHPUnit\Framework\TestCase; @@ -343,4 +344,37 @@ public function testFindAll() $this->assertEquals('{"message":"Hi again"}', $doctrineEnvelopes[1]['body']); $this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelopes[1]['headers']); } + + public function testConfigureSchema() + { + $driverConnection = $this->getDBALConnectionMock(); + $schema = new Schema(); + + $connection = new Connection(['table_name' => 'queue_table'], $driverConnection); + $connection->configureSchema($schema, $driverConnection); + $this->assertTrue($schema->hasTable('queue_table')); + } + + public function testConfigureSchemaDifferentDbalConnection() + { + $driverConnection = $this->getDBALConnectionMock(); + $driverConnection2 = $this->getDBALConnectionMock(); + $schema = new Schema(); + + $connection = new Connection([], $driverConnection); + $connection->configureSchema($schema, $driverConnection2); + $this->assertFalse($schema->hasTable('messenger_messages')); + } + + public function testConfigureSchemaTableExists() + { + $driverConnection = $this->getDBALConnectionMock(); + $schema = new Schema(); + $schema->createTable('messenger_messages'); + + $connection = new Connection([], $driverConnection); + $connection->configureSchema($schema, $driverConnection); + $table = $schema->getTable('messenger_messages'); + $this->assertEmpty($table->getColumns(), 'The table was not overwritten'); + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php index b1f89fc03d0c..cbe3e49d46d4 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php @@ -11,6 +11,8 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; +use Doctrine\DBAL\Connection as DbalConnection; +use Doctrine\DBAL\Schema\Schema; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection; @@ -50,6 +52,23 @@ public function testReceivesMessages() $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); } + public function testConfigureSchema() + { + $transport = $this->getTransport( + null, + $connection = $this->createMock(Connection::class) + ); + + $schema = new Schema(); + $dbalConnection = $this->createMock(DbalConnection::class); + + $connection->expects($this->once()) + ->method('configureSchema') + ->with($schema, $dbalConnection); + + $transport->configureSchema($schema, $dbalConnection); + } + private function getTransport(SerializerInterface $serializer = null, Connection $connection = null): DoctrineTransport { $serializer = $serializer ?: $this->createMock(SerializerInterface::class); diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php index 501fd785b2f9..3a3d780aef36 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer; +use Doctrine\DBAL\Schema\Table; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection; @@ -43,4 +44,24 @@ public function testUnserialize() $connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer); $connection->__wakeup(); } + + public function testGetExtraSetupSql() + { + $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); + $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection); + + $table = new Table('queue_table'); + $table->addOption('_symfony_messenger_table_name', 'queue_table'); + $this->assertStringContainsString('CREATE TRIGGER', $connection->getExtraSetupSqlForTable($table)); + } + + public function testGetExtraSetupSqlWrongTable() + { + $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); + $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection); + + $table = new Table('queue_table'); + // don't set the _symfony_messenger_table_name option + $this->assertNull($connection->getExtraSetupSqlForTable($table)); + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php index 50218e2bb74a..3b61bdcbf0bd 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php @@ -19,6 +19,7 @@ use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer; use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer; +use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; use Symfony\Component\Messenger\Exception\InvalidArgumentException; @@ -33,6 +34,8 @@ */ class Connection implements ResetInterface { + protected const TABLE_OPTION_NAME = '_symfony_messenger_table_name'; + protected const DEFAULT_OPTIONS = [ 'table_name' => 'messenger_messages', 'queue_name' => 'default', @@ -290,6 +293,31 @@ public function find($id): ?array return false === $data ? null : $this->decodeEnvelopeHeaders($data); } + /** + * @internal + */ + public function configureSchema(Schema $schema, DBALConnection $forConnection): void + { + // only update the schema for this connection + if ($forConnection !== $this->driverConnection) { + return; + } + + if ($schema->hasTable($this->configuration['table_name'])) { + return; + } + + $this->addTableToSchema($schema); + } + + /** + * @internal + */ + public function getExtraSetupSqlForTable(Table $createdTable): ?string + { + return null; + } + private function createAvailableMessagesQueryBuilder(): QueryBuilder { $now = new \DateTime(); @@ -341,7 +369,16 @@ private function executeQuery(string $sql, array $parameters = [], array $types private function getSchema(): Schema { $schema = new Schema([], [], $this->driverConnection->getSchemaManager()->createSchemaConfig()); + $this->addTableToSchema($schema); + + return $schema; + } + + private function addTableToSchema(Schema $schema): void + { $table = $schema->createTable($this->configuration['table_name']); + // add an internal option to mark that we created this & the non-namespaced table name + $table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']); $table->addColumn('id', self::$useDeprecatedConstants ? Type::BIGINT : Types::BIGINT) ->setAutoincrement(true) ->setNotnull(true); @@ -361,8 +398,6 @@ private function getSchema(): Schema $table->addIndex(['queue_name']); $table->addIndex(['available_at']); $table->addIndex(['delivered_at']); - - return $schema; } private function decodeEnvelopeHeaders(array $doctrineEnvelope): array diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php index e9695e03a197..1974aa178bac 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php @@ -11,6 +11,9 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport; +use Doctrine\DBAL\Connection as DbalConnection; +use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Schema\Table; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; @@ -98,6 +101,22 @@ public function setup(): void $this->connection->setup(); } + /** + * Adds the Table to the Schema if this transport uses this connection. + */ + public function configureSchema(Schema $schema, DbalConnection $forConnection): void + { + $this->connection->configureSchema($schema, $forConnection); + } + + /** + * Adds extra SQL if the given table was created by the Connection. + */ + public function getExtraSetupSqlForTable(Table $createdTable): ?string + { + return $this->connection->getExtraSetupSqlForTable($createdTable); + } + private function getReceiver(): DoctrineReceiver { return $this->receiver = new DoctrineReceiver($this->connection, $this->serializer); diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php index ae15a40f4cd8..25dedcf4c847 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php @@ -11,6 +11,8 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport; +use Doctrine\DBAL\Schema\Table; + /** * Uses PostgreSQL LISTEN/NOTIFY to push messages to workers. * @@ -83,7 +85,25 @@ public function setup(): void { parent::setup(); - $sql = sprintf(<<<'SQL' + $this->driverConnection->exec($this->getTriggerSql()); + } + + public function getExtraSetupSqlForTable(Table $createdTable): ?string + { + if (!$createdTable->hasOption(self::TABLE_OPTION_NAME)) { + return null; + } + + if ($createdTable->getOption(self::TABLE_OPTION_NAME) !== $this->configuration['table_name']) { + return null; + } + + return $this->getTriggerSql(); + } + + private function getTriggerSql(): string + { + return sprintf(<<<'SQL' LOCK TABLE %1$s; -- create trigger function CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$ @@ -102,7 +122,6 @@ public function setup(): void FOR EACH ROW EXECUTE PROCEDURE notify_%1$s(); SQL , $this->configuration['table_name']); - $this->driverConnection->exec($sql); } private function unlisten() diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json b/src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json index 41652c8aae03..b2c7b983798c 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json @@ -23,6 +23,7 @@ "symfony/service-contracts": "^1.1|^2" }, "require-dev": { + "doctrine/orm": "^2.6.3", "symfony/serializer": "^4.4|^5.0", "symfony/property-access": "^4.4|^5.0" },