From 92046a69b6c9e89d2da7b3b856880a36ef190368 Mon Sep 17 00:00:00 2001 From: Fritz Gerneth Date: Thu, 7 Feb 2019 14:54:56 +0100 Subject: [PATCH] #189 Use locking strategy when appending to event-streams --- src/Exception/ConcurrencyExceptionFactory.php | 5 + src/MariaDbEventStore.php | 21 +- src/MySqlEventStore.php | 21 +- src/PostgresEventStore.php | 21 +- src/WriteLockStrategy.php | 21 ++ .../MariaDbMetadataLockStrategy.php | 71 ++++++ .../MysqlMetadataLockStrategy.php | 67 ++++++ src/WriteLockStrategy/NoLockStrategy.php | 29 +++ tests/MariaDbEventStoreTest.php | 110 +++++++++ tests/MySqlEventStoreTest.php | 110 +++++++++ tests/PostgresEventStoreTest.php | 73 ++++++ .../MariaDbMetadataLockStrategyTest.php | 210 ++++++++++++++++++ .../MysqlMetadataLockStrategyTest.php | 198 +++++++++++++++++ .../WriteLockStrategy/NoLockStrategyTest.php | 43 ++++ 14 files changed, 997 insertions(+), 3 deletions(-) create mode 100644 src/WriteLockStrategy.php create mode 100644 src/WriteLockStrategy/MariaDbMetadataLockStrategy.php create mode 100644 src/WriteLockStrategy/MysqlMetadataLockStrategy.php create mode 100644 src/WriteLockStrategy/NoLockStrategy.php create mode 100644 tests/WriteLockStrategy/MariaDbMetadataLockStrategyTest.php create mode 100644 tests/WriteLockStrategy/MysqlMetadataLockStrategyTest.php create mode 100644 tests/WriteLockStrategy/NoLockStrategyTest.php diff --git a/src/Exception/ConcurrencyExceptionFactory.php b/src/Exception/ConcurrencyExceptionFactory.php index 5a53862f..a8083cb6 100644 --- a/src/Exception/ConcurrencyExceptionFactory.php +++ b/src/Exception/ConcurrencyExceptionFactory.php @@ -27,4 +27,9 @@ public static function fromStatementErrorInfo(array $errorInfo): ConcurrencyExce ) ); } + + public static function failedToAcquireLock(): ConcurrencyException + { + return new ConcurrencyException('Failed to acquire lock for writing to stream'); + } } diff --git a/src/MariaDbEventStore.php b/src/MariaDbEventStore.php index 582e8e39..c3b688e9 100644 --- a/src/MariaDbEventStore.php +++ b/src/MariaDbEventStore.php @@ -26,6 +26,7 @@ use Prooph\EventStore\Pdo\Exception\ExtensionNotLoaded; use Prooph\EventStore\Pdo\Exception\RuntimeException; use Prooph\EventStore\Pdo\Util\Json; +use Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy; use Prooph\EventStore\Stream; use Prooph\EventStore\StreamIterator\EmptyStreamIterator; use Prooph\EventStore\StreamName; @@ -68,6 +69,11 @@ final class MariaDbEventStore implements PdoEventStore */ private $disableTransactionHandling; + /** + * @var WriteLockStrategy + */ + private $writeLockStrategy; + /** * @throws ExtensionNotLoaded */ @@ -77,12 +83,17 @@ public function __construct( PersistenceStrategy $persistenceStrategy, int $loadBatchSize = 10000, string $eventStreamsTable = 'event_streams', - bool $disableTransactionHandling = false + bool $disableTransactionHandling = false, + WriteLockStrategy $writeLockStrategy = null ) { if (! \extension_loaded('pdo_mysql')) { throw ExtensionNotLoaded::with('pdo_mysql'); } + if (null === $writeLockStrategy) { + $writeLockStrategy = new NoLockStrategy(); + } + Assertion::min($loadBatchSize, 1); $this->messageFactory = $messageFactory; @@ -91,6 +102,7 @@ public function __construct( $this->loadBatchSize = $loadBatchSize; $this->eventStreamsTable = $eventStreamsTable; $this->disableTransactionHandling = $disableTransactionHandling; + $this->writeLockStrategy = $writeLockStrategy; } public function fetchStreamMetadata(StreamName $streamName): array @@ -222,6 +234,11 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void $tableName = $this->persistenceStrategy->generateTableName($streamName); + $lockName = '_' . $tableName . '_write_lock'; + if (!$this->writeLockStrategy->getLock($lockName)) { + throw ConcurrencyExceptionFactory::failedToAcquireLock(); + } + $rowPlaces = '(' . \implode(', ', \array_fill(0, \count($columnNames), '?')) . ')'; $allPlaces = \implode(', ', \array_fill(0, $countEntries, $rowPlaces)); @@ -271,6 +288,8 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void if (! $this->disableTransactionHandling && $this->connection->inTransaction() && ! $this->duringCreate) { $this->connection->commit(); } + + $this->writeLockStrategy->releaseLock($lockName); } public function load( diff --git a/src/MySqlEventStore.php b/src/MySqlEventStore.php index d9a53748..8dbaa449 100644 --- a/src/MySqlEventStore.php +++ b/src/MySqlEventStore.php @@ -26,6 +26,7 @@ use Prooph\EventStore\Pdo\Exception\ExtensionNotLoaded; use Prooph\EventStore\Pdo\Exception\RuntimeException; use Prooph\EventStore\Pdo\Util\Json; +use Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy; use Prooph\EventStore\Stream; use Prooph\EventStore\StreamIterator\EmptyStreamIterator; use Prooph\EventStore\StreamName; @@ -68,6 +69,11 @@ final class MySqlEventStore implements PdoEventStore */ private $disableTransactionHandling; + /** + * @var WriteLockStrategy + */ + private $writeLockStrategy; + /** * @throws ExtensionNotLoaded */ @@ -77,12 +83,17 @@ public function __construct( PersistenceStrategy $persistenceStrategy, int $loadBatchSize = 10000, string $eventStreamsTable = 'event_streams', - bool $disableTransactionHandling = false + bool $disableTransactionHandling = false, + WriteLockStrategy $writeLockStrategy = null ) { if (! \extension_loaded('pdo_mysql')) { throw ExtensionNotLoaded::with('pdo_mysql'); } + if (null === $writeLockStrategy) { + $writeLockStrategy = new NoLockStrategy(); + } + Assertion::min($loadBatchSize, 1); $this->messageFactory = $messageFactory; @@ -91,6 +102,7 @@ public function __construct( $this->loadBatchSize = $loadBatchSize; $this->eventStreamsTable = $eventStreamsTable; $this->disableTransactionHandling = $disableTransactionHandling; + $this->writeLockStrategy = $writeLockStrategy; } public function fetchStreamMetadata(StreamName $streamName): array @@ -222,6 +234,11 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void $tableName = $this->persistenceStrategy->generateTableName($streamName); + $lockName = '_' . $tableName . '_write_lock'; + if (!$this->writeLockStrategy->getLock($lockName)) { + throw ConcurrencyExceptionFactory::failedToAcquireLock(); + } + $rowPlaces = '(' . \implode(', ', \array_fill(0, \count($columnNames), '?')) . ')'; $allPlaces = \implode(', ', \array_fill(0, $countEntries, $rowPlaces)); @@ -271,6 +288,8 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void if (! $this->disableTransactionHandling && $this->connection->inTransaction() && ! $this->duringCreate) { $this->connection->commit(); } + + $this->writeLockStrategy->releaseLock($lockName); } public function load( diff --git a/src/PostgresEventStore.php b/src/PostgresEventStore.php index c8cb8b23..5d145931 100644 --- a/src/PostgresEventStore.php +++ b/src/PostgresEventStore.php @@ -29,6 +29,7 @@ use Prooph\EventStore\Pdo\Exception\RuntimeException; use Prooph\EventStore\Pdo\Util\Json; use Prooph\EventStore\Pdo\Util\PostgresHelper; +use Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy; use Prooph\EventStore\Stream; use Prooph\EventStore\StreamIterator\EmptyStreamIterator; use Prooph\EventStore\StreamName; @@ -69,6 +70,11 @@ final class PostgresEventStore implements PdoEventStore, TransactionalEventStore */ private $disableTransactionHandling; + /** + * @var WriteLockStrategy + */ + private $writeLockStrategy; + /** * @throws ExtensionNotLoaded */ @@ -78,12 +84,17 @@ public function __construct( PersistenceStrategy $persistenceStrategy, int $loadBatchSize = 10000, string $eventStreamsTable = 'event_streams', - bool $disableTransactionHandling = false + bool $disableTransactionHandling = false, + WriteLockStrategy $writeLockStrategy = null ) { if (! \extension_loaded('pdo_pgsql')) { throw ExtensionNotLoaded::with('pdo_pgsql'); } + if (null === $writeLockStrategy) { + $writeLockStrategy = new NoLockStrategy(); + } + Assertion::min($loadBatchSize, 1); $this->messageFactory = $messageFactory; @@ -92,6 +103,7 @@ public function __construct( $this->loadBatchSize = $loadBatchSize; $this->eventStreamsTable = $eventStreamsTable; $this->disableTransactionHandling = $disableTransactionHandling; + $this->writeLockStrategy = $writeLockStrategy; } public function fetchStreamMetadata(StreamName $streamName): array @@ -202,6 +214,11 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void $tableName = $this->persistenceStrategy->generateTableName($streamName); + $lockName = '_' . $tableName . '_write_lock'; + if (!$this->writeLockStrategy->getLock($lockName)) { + throw ConcurrencyExceptionFactory::failedToAcquireLock(); + } + $rowPlaces = '(' . \implode(', ', \array_fill(0, \count($columnNames), '?')) . ')'; $allPlaces = \implode(', ', \array_fill(0, $countEntries, $rowPlaces)); @@ -226,6 +243,8 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void if ($statement->errorCode() !== '00000') { throw RuntimeException::fromStatementErrorInfo($statement->errorInfo()); } + + $this->writeLockStrategy->releaseLock($lockName); } public function load( diff --git a/src/WriteLockStrategy.php b/src/WriteLockStrategy.php new file mode 100644 index 00000000..7b033fb7 --- /dev/null +++ b/src/WriteLockStrategy.php @@ -0,0 +1,21 @@ + + * (c) 2016-2019 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\EventStore\Pdo; + +interface WriteLockStrategy +{ + public function getLock(string $name): bool; + + public function releaseLock(string $name): bool; +} diff --git a/src/WriteLockStrategy/MariaDbMetadataLockStrategy.php b/src/WriteLockStrategy/MariaDbMetadataLockStrategy.php new file mode 100644 index 00000000..4e8bfd12 --- /dev/null +++ b/src/WriteLockStrategy/MariaDbMetadataLockStrategy.php @@ -0,0 +1,71 @@ + + * (c) 2016-2019 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\EventStore\Pdo\WriteLockStrategy; + +use Prooph\EventStore\Pdo\WriteLockStrategy; + +final class MariaDbMetadataLockStrategy implements WriteLockStrategy +{ + /** + * @var \PDO + */ + private $connection; + + /** + * @var int + */ + private $timeout; + + public function __construct(\PDO $connection, int $timeout = 0xffffff) + { + if ($timeout < 0) { + throw new \InvalidArgumentException('$timeout must be larger 0.'); + } + + $this->connection = $connection; + $this->timeout = $timeout; + } + + public function getLock(string $name): bool + { + try { + $res = $this->connection->query('SELECT GET_LOCK("' . $name . '", ' . $this->timeout . ') as `get_lock`'); + } catch (\PDOException $e) { + // ER_USER_LOCK_DEADLOCK: we only care for deadlock errors and fail locking + if ('3058' === $this->connection->errorCode()) { + return false; + } + + throw $e; + } + + if (!$res) { + return false; + } + + $lockStatus = $res->fetchAll(); + if ('1' === $lockStatus[0]['get_lock']) { + return true; + } + + return false; + } + + public function releaseLock(string $name): bool + { + $this->connection->exec('DO RELEASE_LOCK("' . $name . '") as `release_lock`'); + + return true; + } +} diff --git a/src/WriteLockStrategy/MysqlMetadataLockStrategy.php b/src/WriteLockStrategy/MysqlMetadataLockStrategy.php new file mode 100644 index 00000000..f56ea193 --- /dev/null +++ b/src/WriteLockStrategy/MysqlMetadataLockStrategy.php @@ -0,0 +1,67 @@ + + * (c) 2016-2019 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\EventStore\Pdo\WriteLockStrategy; + +use Prooph\EventStore\Pdo\WriteLockStrategy; + +final class MysqlMetadataLockStrategy implements WriteLockStrategy +{ + /** + * @var \PDO + */ + private $connection; + + /** + * @var int + */ + private $timeout; + + public function __construct(\PDO $connection, int $timeout = -1) + { + $this->connection = $connection; + $this->timeout = $timeout; + } + + public function getLock(string $name): bool + { + try { + $res = $this->connection->query('SELECT GET_LOCK("' . $name . '", ' . $this->timeout . ') as `get_lock`'); + } catch (\PDOException $e) { + // ER_USER_LOCK_DEADLOCK: we only care for deadlock errors and fail locking + if ('3058' === $this->connection->errorCode()) { + return false; + } + + throw $e; + } + + if (!$res) { + return false; + } + + $lockStatus = $res->fetchAll(); + if ('1' === $lockStatus[0]['get_lock']) { + return true; + } + + return false; + } + + public function releaseLock(string $name): bool + { + $this->connection->exec('DO RELEASE_LOCK("' . $name . '") as `release_lock`'); + + return true; + } +} diff --git a/src/WriteLockStrategy/NoLockStrategy.php b/src/WriteLockStrategy/NoLockStrategy.php new file mode 100644 index 00000000..5e191c48 --- /dev/null +++ b/src/WriteLockStrategy/NoLockStrategy.php @@ -0,0 +1,29 @@ + + * (c) 2016-2019 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\EventStore\Pdo\WriteLockStrategy; + +use Prooph\EventStore\Pdo\WriteLockStrategy; + +final class NoLockStrategy implements WriteLockStrategy +{ + public function getLock(string $name): bool + { + return true; + } + + public function releaseLock(string $name): bool + { + return true; + } +} diff --git a/tests/MariaDbEventStoreTest.php b/tests/MariaDbEventStoreTest.php index 19be9d82..184c18de 100644 --- a/tests/MariaDbEventStoreTest.php +++ b/tests/MariaDbEventStoreTest.php @@ -25,10 +25,13 @@ use Prooph\EventStore\Pdo\PersistenceStrategy; use Prooph\EventStore\Pdo\PersistenceStrategy\MariaDbAggregateStreamStrategy; use Prooph\EventStore\Pdo\PersistenceStrategy\MariaDbSingleStreamStrategy; +use Prooph\EventStore\Pdo\WriteLockStrategy; +use Prooph\EventStore\Pdo\WriteLockStrategy\MariaDbMetadataLockStrategy; use Prooph\EventStore\Stream; use Prooph\EventStore\StreamName; use ProophTest\EventStore\Mock\UserCreated; use ProophTest\EventStore\Mock\UsernameChanged; +use Prophecy\Argument; use Ramsey\Uuid\Uuid; /** @@ -163,6 +166,113 @@ public function it_ignores_transaction_handling_if_flag_is_enabled(): void $eventStore->appendTo(new StreamName('Prooph\Model\User'), new ArrayIterator([$streamEvent])); } + /** + * @test + */ + public function it_requests_and_releases_locks_when_appending_streams(): void + { + $writeLockName = '__878c0b7e51ecaab95c511fc816ad2a70c9418208_write_lock'; + + $lockStrategy = $this->prophesize(WriteLockStrategy::class); + $lockStrategy->getLock(Argument::exact($writeLockName))->shouldBeCalled()->willReturn(true); + $lockStrategy->releaseLock(Argument::exact($writeLockName))->shouldBeCalled()->willReturn(true); + + $connection = $this->prophesize(\PDO::class); + + $appendStatement = $this->prophesize(\PDOStatement::class); + $appendStatement->execute(Argument::any())->willReturn(true); + $appendStatement->errorInfo()->willReturn([0 => '00000']); + $appendStatement->errorCode()->willReturn('00000'); + + $connection->inTransaction()->willReturn(false); + $connection->beginTransaction()->willReturn(true); + $connection->prepare(Argument::any())->willReturn($appendStatement); + + $eventStore = new MariaDbEventStore( + new FQCNMessageFactory(), + $connection->reveal(), + new MariaDbSingleStreamStrategy(new NoOpMessageConverter()), + 10000, + 'event_streams', + false, + $lockStrategy->reveal() + ); + + $streamEvent = UsernameChanged::with( + ['name' => 'John Doe'], + 1 + ); + + $eventStore->appendTo(new StreamName('Prooph\Model\User'), new ArrayIterator([$streamEvent])); + } + + /** + * @test + */ + public function it_throws_exception_when_lock_fails(): void + { + $this->expectException(ConcurrencyException::class); + + $lockStrategy = $this->prophesize(WriteLockStrategy::class); + $lockStrategy->getLock(Argument::any())->shouldBeCalled()->willReturn(false); + + $connection = $this->prophesize(\PDO::class); + + $eventStore = new MariaDbEventStore( + new FQCNMessageFactory(), + $connection->reveal(), + new MariaDbSingleStreamStrategy(new NoOpMessageConverter()), + 10000, + 'event_streams', + false, + $lockStrategy->reveal() + ); + + $streamEvent = UsernameChanged::with( + ['name' => 'John Doe'], + 1 + ); + + $eventStore->appendTo(new StreamName('Prooph\Model\User'), new ArrayIterator([$streamEvent])); + } + + /** + * @test + */ + public function it_can_write_to_db_with_locks_enabled(): void + { + $eventStore = new MariaDbEventStore( + new FQCNMessageFactory(), + $this->connection, + new MariaDbSingleStreamStrategy(new NoOpMessageConverter()), + 10000, + 'event_streams', + false, + new MariaDbMetadataLockStrategy($this->connection) + ); + + $streamName = new StreamName('Prooph\Model\User'); + $stream = new Stream($streamName, new ArrayIterator([])); + + $eventStore->create($stream); + + $streamEvent = UsernameChanged::with( + ['name' => 'John Doe'], + 1 + ); + + $streamEvent = $streamEvent->withAddedMetadata('tag', 'person'); + $streamEvent = $streamEvent->withAddedMetadata('_aggregate_id', Uuid::uuid4()->toString()); + $streamEvent = $streamEvent->withAddedMetadata('_aggregate_type', 'user'); + + $eventStore->appendTo($streamName, new ArrayIterator([$streamEvent])); + + $metadataMatcher = new MetadataMatcher(); + $iterator = $eventStore->load($streamName, 1, 5, $metadataMatcher); + + $this->assertCount(1, $iterator); + } + /** * @test */ diff --git a/tests/MySqlEventStoreTest.php b/tests/MySqlEventStoreTest.php index 2d71ac6e..58ca5ac5 100644 --- a/tests/MySqlEventStoreTest.php +++ b/tests/MySqlEventStoreTest.php @@ -25,10 +25,13 @@ use Prooph\EventStore\Pdo\PersistenceStrategy; use Prooph\EventStore\Pdo\PersistenceStrategy\MySqlAggregateStreamStrategy; use Prooph\EventStore\Pdo\PersistenceStrategy\MySqlSingleStreamStrategy; +use Prooph\EventStore\Pdo\WriteLockStrategy; +use Prooph\EventStore\Pdo\WriteLockStrategy\MysqlMetadataLockStrategy; use Prooph\EventStore\Stream; use Prooph\EventStore\StreamName; use ProophTest\EventStore\Mock\UserCreated; use ProophTest\EventStore\Mock\UsernameChanged; +use Prophecy\Argument; use Ramsey\Uuid\Uuid; /** @@ -183,6 +186,113 @@ public function it_ignores_transaction_handling_if_flag_is_enabled(): void $eventStore->appendTo(new StreamName('Prooph\Model\User'), new ArrayIterator([$streamEvent])); } + /** + * @test + */ + public function it_requests_and_releases_locks_when_appending_streams(): void + { + $writeLockName = '__878c0b7e51ecaab95c511fc816ad2a70c9418208_write_lock'; + + $lockStrategy = $this->prophesize(WriteLockStrategy::class); + $lockStrategy->getLock(Argument::exact($writeLockName))->shouldBeCalled()->willReturn(true); + $lockStrategy->releaseLock(Argument::exact($writeLockName))->shouldBeCalled()->willReturn(true); + + $connection = $this->prophesize(\PDO::class); + + $appendStatement = $this->prophesize(\PDOStatement::class); + $appendStatement->execute(Argument::any())->willReturn(true); + $appendStatement->errorInfo()->willReturn([0 => '00000']); + $appendStatement->errorCode()->willReturn('00000'); + + $connection->inTransaction()->willReturn(false); + $connection->beginTransaction()->willReturn(true); + $connection->prepare(Argument::any())->willReturn($appendStatement); + + $eventStore = new MySqlEventStore( + new FQCNMessageFactory(), + $connection->reveal(), + new MySqlAggregateStreamStrategy(new NoOpMessageConverter()), + 10000, + 'event_streams', + false, + $lockStrategy->reveal() + ); + + $streamEvent = UsernameChanged::with( + ['name' => 'John Doe'], + 1 + ); + + $eventStore->appendTo(new StreamName('Prooph\Model\User'), new ArrayIterator([$streamEvent])); + } + + /** + * @test + */ + public function it_throws_exception_when_lock_fails(): void + { + $this->expectException(ConcurrencyException::class); + + $lockStrategy = $this->prophesize(WriteLockStrategy::class); + $lockStrategy->getLock(Argument::any())->shouldBeCalled()->willReturn(false); + + $connection = $this->prophesize(\PDO::class); + + $eventStore = new MySqlEventStore( + new FQCNMessageFactory(), + $connection->reveal(), + new MySqlAggregateStreamStrategy(new NoOpMessageConverter()), + 10000, + 'event_streams', + false, + $lockStrategy->reveal() + ); + + $streamEvent = UsernameChanged::with( + ['name' => 'John Doe'], + 1 + ); + + $eventStore->appendTo(new StreamName('Prooph\Model\User'), new ArrayIterator([$streamEvent])); + } + + /** + * @test + */ + public function it_can_write_to_db_with_locks_enabled(): void + { + $eventStore = new MySqlEventStore( + new FQCNMessageFactory(), + $this->connection, + new MySqlSingleStreamStrategy(new NoOpMessageConverter()), + 10000, + 'event_streams', + false, + new MysqlMetadataLockStrategy($this->connection) + ); + + $streamName = new StreamName('Prooph\Model\User'); + $stream = new Stream($streamName, new ArrayIterator([])); + + $eventStore->create($stream); + + $streamEvent = UsernameChanged::with( + ['name' => 'John Doe'], + 1 + ); + + $streamEvent = $streamEvent->withAddedMetadata('tag', 'person'); + $streamEvent = $streamEvent->withAddedMetadata('_aggregate_id', Uuid::uuid4()->toString()); + $streamEvent = $streamEvent->withAddedMetadata('_aggregate_type', 'user'); + + $eventStore->appendTo($streamName, new ArrayIterator([$streamEvent])); + + $metadataMatcher = new MetadataMatcher(); + $iterator = $eventStore->load($streamName, 1, 5, $metadataMatcher); + + $this->assertCount(1, $iterator); + } + /** * @test */ diff --git a/tests/PostgresEventStoreTest.php b/tests/PostgresEventStoreTest.php index c89e5bfb..3e535549 100644 --- a/tests/PostgresEventStoreTest.php +++ b/tests/PostgresEventStoreTest.php @@ -13,6 +13,7 @@ namespace ProophTest\EventStore\Pdo; +use ArrayIterator; use PDO; use Prooph\Common\Messaging\FQCNMessageFactory; use Prooph\Common\Messaging\NoOpMessageConverter; @@ -24,11 +25,13 @@ use Prooph\EventStore\Pdo\PersistenceStrategy\PostgresAggregateStreamStrategy; use Prooph\EventStore\Pdo\PersistenceStrategy\PostgresSingleStreamStrategy; use Prooph\EventStore\Pdo\PostgresEventStore; +use Prooph\EventStore\Pdo\WriteLockStrategy; use Prooph\EventStore\Stream; use Prooph\EventStore\StreamName; use ProophTest\EventStore\Mock\UserCreated; use ProophTest\EventStore\Mock\UsernameChanged; use ProophTest\EventStore\TransactionalEventStoreTestTrait; +use Prophecy\Argument; use Ramsey\Uuid\Uuid; /** @@ -155,6 +158,76 @@ public function it_ignores_transaction_handling_if_flag_is_enabled(): void $eventStore->rollback(); } + /** + * @test + */ + public function it_requests_and_releases_locks_when_appending_streams(): void + { + $writeLockName = '__878c0b7e51ecaab95c511fc816ad2a70c9418208_write_lock'; + + $lockStrategy = $this->prophesize(WriteLockStrategy::class); + $lockStrategy->getLock(Argument::exact($writeLockName))->shouldBeCalled()->willReturn(true); + $lockStrategy->releaseLock(Argument::exact($writeLockName))->shouldBeCalled()->willReturn(true); + + $connection = $this->prophesize(\PDO::class); + + $appendStatement = $this->prophesize(\PDOStatement::class); + $appendStatement->execute(Argument::any())->willReturn(true); + $appendStatement->errorInfo()->willReturn([0 => '00000']); + $appendStatement->errorCode()->willReturn('00000'); + + $connection->inTransaction()->willReturn(false); + $connection->beginTransaction()->willReturn(true); + $connection->prepare(Argument::any())->willReturn($appendStatement); + + $eventStore = new PostgresEventStore( + new FQCNMessageFactory(), + $connection->reveal(), + new PostgresAggregateStreamStrategy(new NoOpMessageConverter()), + 10000, + 'event_streams', + false, + $lockStrategy->reveal() + ); + + $streamEvent = UsernameChanged::with( + ['name' => 'John Doe'], + 1 + ); + + $eventStore->appendTo(new StreamName('Prooph\Model\User'), new ArrayIterator([$streamEvent])); + } + + /** + * @test + */ + public function it_throws_exception_when_lock_fails(): void + { + $this->expectException(ConcurrencyException::class); + + $lockStrategy = $this->prophesize(WriteLockStrategy::class); + $lockStrategy->getLock(Argument::any())->shouldBeCalled()->willReturn(false); + + $connection = $this->prophesize(\PDO::class); + + $eventStore = new PostgresEventStore( + new FQCNMessageFactory(), + $connection->reveal(), + new PostgresAggregateStreamStrategy(new NoOpMessageConverter()), + 10000, + 'event_streams', + false, + $lockStrategy->reveal() + ); + + $streamEvent = UsernameChanged::with( + ['name' => 'John Doe'], + 1 + ); + + $eventStore->appendTo(new StreamName('Prooph\Model\User'), new ArrayIterator([$streamEvent])); + } + /** * @test */ diff --git a/tests/WriteLockStrategy/MariaDbMetadataLockStrategyTest.php b/tests/WriteLockStrategy/MariaDbMetadataLockStrategyTest.php new file mode 100644 index 00000000..a7c6aa64 --- /dev/null +++ b/tests/WriteLockStrategy/MariaDbMetadataLockStrategyTest.php @@ -0,0 +1,210 @@ + + * (c) 2016-2019 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStore\Pdo\WriteLockStrategy; + +use PHPUnit\Framework\TestCase; +use Prooph\EventStore\Pdo\WriteLockStrategy\MariaDbMetadataLockStrategy; +use Prophecy\Argument; + +/** + * @group mariadb + */ +class MariaDbMetadataLockStrategyTest extends TestCase +{ + /** + * @test + */ + public function throws_exception_when_passing_negative_timeout() + { + $this->expectException(\InvalidArgumentException::class); + + $connection = $this->prophesize(\PDO::class); + + new MariaDbMetadataLockStrategy($connection->reveal(), -5); + } + + /** + * @test + */ + public function it_returns_true_when_lock_successful() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '1'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willReturn($statement->reveal()); + + $strategy = new MariaDbMetadataLockStrategy($connection->reveal()); + + $this->assertTrue($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_requests_lock_with_given_name() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '1'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::containingString('GET_LOCK("lock"')) + ->willReturn($statement->reveal()) + ->shouldBeCalled(); + + $strategy = new MariaDbMetadataLockStrategy($connection->reveal()); + + $strategy->getLock('lock'); + } + + /** + * @test + */ + public function it_requests_lock_without_timeout() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '1'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::containingString("16777215")) + ->willReturn($statement->reveal()) + ->shouldBeCalled(); + + $strategy = new MariaDbMetadataLockStrategy($connection->reveal()); + + $strategy->getLock('lock'); + } + + /** + * @test + */ + public function it_requests_lock_with_configured_timeout() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '1'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::containingString('100')) + ->willReturn($statement->reveal()) + ->shouldBeCalled(); + + $strategy = new MariaDbMetadataLockStrategy($connection->reveal(), 100); + + $strategy->getLock('lock'); + } + + /** + * @test + */ + public function it_returns_false_on_statement_error() + { + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willReturn(false); + + $strategy = new MariaDbMetadataLockStrategy($connection->reveal()); + + $this->assertFalse($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_returns_false_on_lock_failure() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '0'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willReturn($statement->reveal()); + + $strategy = new MariaDbMetadataLockStrategy($connection->reveal()); + + $this->assertFalse($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_returns_false_on_lock_killed() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => null], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willReturn($statement->reveal()); + + $strategy = new MariaDbMetadataLockStrategy($connection->reveal()); + + $this->assertFalse($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_returns_false_on_deadlock_exception() + { + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willThrow($this->prophesize(\PDOException::class)->reveal()); + $connection->errorCode()->willReturn('3058'); + + $strategy = new MariaDbMetadataLockStrategy($connection->reveal()); + + $this->assertFalse($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_releases_lock() + { + $connection = $this->prophesize(\PDO::class); + + $connection->exec(Argument::containingString('RELEASE_LOCK("lock"'))->shouldBeCalled(); + + $strategy = new MariaDbMetadataLockStrategy($connection->reveal()); + + $strategy->releaseLock('lock'); + } + + /** + * @test + */ + public function it_release_returns_true() + { + $connection = $this->prophesize(\PDO::class); + $strategy = new MariaDbMetadataLockStrategy($connection->reveal()); + + $this->assertTrue($strategy->releaseLock('lock')); + } +} diff --git a/tests/WriteLockStrategy/MysqlMetadataLockStrategyTest.php b/tests/WriteLockStrategy/MysqlMetadataLockStrategyTest.php new file mode 100644 index 00000000..e5082a19 --- /dev/null +++ b/tests/WriteLockStrategy/MysqlMetadataLockStrategyTest.php @@ -0,0 +1,198 @@ + + * (c) 2016-2019 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStore\Pdo\WriteLockStrategy; + +use PHPUnit\Framework\TestCase; +use Prooph\EventStore\Pdo\WriteLockStrategy\MysqlMetadataLockStrategy; +use Prophecy\Argument; + +/** + * @group mysql + */ +class MysqlMetadataLockStrategyTest extends TestCase +{ + /** + * @test + */ + public function it_returns_true_when_lock_successful() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '1'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willReturn($statement->reveal()); + + $strategy = new MysqlMetadataLockStrategy($connection->reveal()); + + $this->assertTrue($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_requests_lock_with_given_name() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '1'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::containingString('GET_LOCK("lock"')) + ->willReturn($statement->reveal()) + ->shouldBeCalled(); + + $strategy = new MysqlMetadataLockStrategy($connection->reveal()); + + $strategy->getLock('lock'); + } + + /** + * @test + */ + public function it_requests_lock_without_timeout() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '1'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::containingString('-1')) + ->willReturn($statement->reveal()) + ->shouldBeCalled(); + + $strategy = new MysqlMetadataLockStrategy($connection->reveal()); + + $strategy->getLock('lock'); + } + + /** + * @test + */ + public function it_requests_lock_with_configured_timeout() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '1'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::containingString('100')) + ->willReturn($statement->reveal()) + ->shouldBeCalled(); + + $strategy = new MysqlMetadataLockStrategy($connection->reveal(), 100); + + $strategy->getLock('lock'); + } + + /** + * @test + */ + public function it_returns_false_on_statement_error() + { + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willReturn(false); + + $strategy = new MysqlMetadataLockStrategy($connection->reveal()); + + $this->assertFalse($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_returns_false_on_lock_failure() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => '0'], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willReturn($statement->reveal()); + + $strategy = new MysqlMetadataLockStrategy($connection->reveal()); + + $this->assertFalse($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_returns_false_on_lock_killed() + { + $statement = $this->prophesize(\PDOStatement::class); + $statement->fetchAll()->willReturn([ + 0 => ['get_lock' => null], + ]); + + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willReturn($statement->reveal()); + + $strategy = new MysqlMetadataLockStrategy($connection->reveal()); + + $this->assertFalse($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_returns_false_on_deadlock_exception() + { + $connection = $this->prophesize(\PDO::class); + + $connection->query(Argument::any())->willThrow($this->prophesize(\PDOException::class)->reveal()); + $connection->errorCode()->willReturn('3058'); + + $strategy = new MysqlMetadataLockStrategy($connection->reveal()); + + $this->assertFalse($strategy->getLock('lock')); + } + + /** + * @test + */ + public function it_releases_lock() + { + $connection = $this->prophesize(\PDO::class); + + $connection->exec(Argument::containingString('RELEASE_LOCK("lock"'))->shouldBeCalled(); + + $strategy = new MysqlMetadataLockStrategy($connection->reveal()); + + $strategy->releaseLock('lock'); + } + + /** + * @test + */ + public function it_release_returns_true() + { + $connection = $this->prophesize(\PDO::class); + $strategy = new MysqlMetadataLockStrategy($connection->reveal()); + + $this->assertTrue($strategy->releaseLock('lock')); + } +} diff --git a/tests/WriteLockStrategy/NoLockStrategyTest.php b/tests/WriteLockStrategy/NoLockStrategyTest.php new file mode 100644 index 00000000..d8a00216 --- /dev/null +++ b/tests/WriteLockStrategy/NoLockStrategyTest.php @@ -0,0 +1,43 @@ + + * (c) 2016-2019 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStore\Pdo\WriteLockStrategy; + +use PHPUnit\Framework\TestCase; +use Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy; + +/** + * @group mysql + */ +class NoLockStrategyTest extends TestCase +{ + /** + * @test + */ + public function it_always_succeeds_locking() + { + $strategy = new NoLockStrategy(); + + $this->assertTrue($strategy->getLock('write_lock')); + } + + /** + * @test + */ + public function in_always_succeeds_releasing() + { + $strategy = new NoLockStrategy(); + + $this->assertTrue($strategy->releaseLock('write_lock')); + } +}