Skip to content

Commit

Permalink
prooph#189 Use locking strategy when appending to event-streams
Browse files Browse the repository at this point in the history
  • Loading branch information
fritz-gerneth committed Feb 7, 2019
1 parent 9f6b1cb commit 92046a6
Show file tree
Hide file tree
Showing 14 changed files with 997 additions and 3 deletions.
5 changes: 5 additions & 0 deletions src/Exception/ConcurrencyExceptionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public static function fromStatementErrorInfo(array $errorInfo): ConcurrencyExce
)
);
}

public static function failedToAcquireLock(): ConcurrencyException
{
return new ConcurrencyException('Failed to acquire lock for writing to stream');
}
}
21 changes: 20 additions & 1 deletion src/MariaDbEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Prooph\EventStore\Pdo\Exception\ExtensionNotLoaded;
use Prooph\EventStore\Pdo\Exception\RuntimeException;
use Prooph\EventStore\Pdo\Util\Json;
use Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamIterator\EmptyStreamIterator;
use Prooph\EventStore\StreamName;
Expand Down Expand Up @@ -68,6 +69,11 @@ final class MariaDbEventStore implements PdoEventStore
*/
private $disableTransactionHandling;

/**
* @var WriteLockStrategy
*/
private $writeLockStrategy;

/**
* @throws ExtensionNotLoaded
*/
Expand All @@ -77,12 +83,17 @@ public function __construct(
PersistenceStrategy $persistenceStrategy,
int $loadBatchSize = 10000,
string $eventStreamsTable = 'event_streams',
bool $disableTransactionHandling = false
bool $disableTransactionHandling = false,
WriteLockStrategy $writeLockStrategy = null
) {
if (! \extension_loaded('pdo_mysql')) {
throw ExtensionNotLoaded::with('pdo_mysql');
}

if (null === $writeLockStrategy) {
$writeLockStrategy = new NoLockStrategy();
}

Assertion::min($loadBatchSize, 1);

$this->messageFactory = $messageFactory;
Expand All @@ -91,6 +102,7 @@ public function __construct(
$this->loadBatchSize = $loadBatchSize;
$this->eventStreamsTable = $eventStreamsTable;
$this->disableTransactionHandling = $disableTransactionHandling;
$this->writeLockStrategy = $writeLockStrategy;
}

public function fetchStreamMetadata(StreamName $streamName): array
Expand Down Expand Up @@ -222,6 +234,11 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void

$tableName = $this->persistenceStrategy->generateTableName($streamName);

$lockName = '_' . $tableName . '_write_lock';
if (!$this->writeLockStrategy->getLock($lockName)) {
throw ConcurrencyExceptionFactory::failedToAcquireLock();
}

$rowPlaces = '(' . \implode(', ', \array_fill(0, \count($columnNames), '?')) . ')';
$allPlaces = \implode(', ', \array_fill(0, $countEntries, $rowPlaces));

Expand Down Expand Up @@ -271,6 +288,8 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void
if (! $this->disableTransactionHandling && $this->connection->inTransaction() && ! $this->duringCreate) {
$this->connection->commit();
}

$this->writeLockStrategy->releaseLock($lockName);
}

public function load(
Expand Down
21 changes: 20 additions & 1 deletion src/MySqlEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Prooph\EventStore\Pdo\Exception\ExtensionNotLoaded;
use Prooph\EventStore\Pdo\Exception\RuntimeException;
use Prooph\EventStore\Pdo\Util\Json;
use Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamIterator\EmptyStreamIterator;
use Prooph\EventStore\StreamName;
Expand Down Expand Up @@ -68,6 +69,11 @@ final class MySqlEventStore implements PdoEventStore
*/
private $disableTransactionHandling;

/**
* @var WriteLockStrategy
*/
private $writeLockStrategy;

/**
* @throws ExtensionNotLoaded
*/
Expand All @@ -77,12 +83,17 @@ public function __construct(
PersistenceStrategy $persistenceStrategy,
int $loadBatchSize = 10000,
string $eventStreamsTable = 'event_streams',
bool $disableTransactionHandling = false
bool $disableTransactionHandling = false,
WriteLockStrategy $writeLockStrategy = null
) {
if (! \extension_loaded('pdo_mysql')) {
throw ExtensionNotLoaded::with('pdo_mysql');
}

if (null === $writeLockStrategy) {
$writeLockStrategy = new NoLockStrategy();
}

Assertion::min($loadBatchSize, 1);

$this->messageFactory = $messageFactory;
Expand All @@ -91,6 +102,7 @@ public function __construct(
$this->loadBatchSize = $loadBatchSize;
$this->eventStreamsTable = $eventStreamsTable;
$this->disableTransactionHandling = $disableTransactionHandling;
$this->writeLockStrategy = $writeLockStrategy;
}

public function fetchStreamMetadata(StreamName $streamName): array
Expand Down Expand Up @@ -222,6 +234,11 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void

$tableName = $this->persistenceStrategy->generateTableName($streamName);

$lockName = '_' . $tableName . '_write_lock';
if (!$this->writeLockStrategy->getLock($lockName)) {
throw ConcurrencyExceptionFactory::failedToAcquireLock();
}

$rowPlaces = '(' . \implode(', ', \array_fill(0, \count($columnNames), '?')) . ')';
$allPlaces = \implode(', ', \array_fill(0, $countEntries, $rowPlaces));

Expand Down Expand Up @@ -271,6 +288,8 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void
if (! $this->disableTransactionHandling && $this->connection->inTransaction() && ! $this->duringCreate) {
$this->connection->commit();
}

$this->writeLockStrategy->releaseLock($lockName);
}

public function load(
Expand Down
21 changes: 20 additions & 1 deletion src/PostgresEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use Prooph\EventStore\Pdo\Exception\RuntimeException;
use Prooph\EventStore\Pdo\Util\Json;
use Prooph\EventStore\Pdo\Util\PostgresHelper;
use Prooph\EventStore\Pdo\WriteLockStrategy\NoLockStrategy;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamIterator\EmptyStreamIterator;
use Prooph\EventStore\StreamName;
Expand Down Expand Up @@ -69,6 +70,11 @@ final class PostgresEventStore implements PdoEventStore, TransactionalEventStore
*/
private $disableTransactionHandling;

/**
* @var WriteLockStrategy
*/
private $writeLockStrategy;

/**
* @throws ExtensionNotLoaded
*/
Expand All @@ -78,12 +84,17 @@ public function __construct(
PersistenceStrategy $persistenceStrategy,
int $loadBatchSize = 10000,
string $eventStreamsTable = 'event_streams',
bool $disableTransactionHandling = false
bool $disableTransactionHandling = false,
WriteLockStrategy $writeLockStrategy = null
) {
if (! \extension_loaded('pdo_pgsql')) {
throw ExtensionNotLoaded::with('pdo_pgsql');
}

if (null === $writeLockStrategy) {
$writeLockStrategy = new NoLockStrategy();
}

Assertion::min($loadBatchSize, 1);

$this->messageFactory = $messageFactory;
Expand All @@ -92,6 +103,7 @@ public function __construct(
$this->loadBatchSize = $loadBatchSize;
$this->eventStreamsTable = $eventStreamsTable;
$this->disableTransactionHandling = $disableTransactionHandling;
$this->writeLockStrategy = $writeLockStrategy;
}

public function fetchStreamMetadata(StreamName $streamName): array
Expand Down Expand Up @@ -202,6 +214,11 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void

$tableName = $this->persistenceStrategy->generateTableName($streamName);

$lockName = '_' . $tableName . '_write_lock';
if (!$this->writeLockStrategy->getLock($lockName)) {
throw ConcurrencyExceptionFactory::failedToAcquireLock();
}

$rowPlaces = '(' . \implode(', ', \array_fill(0, \count($columnNames), '?')) . ')';
$allPlaces = \implode(', ', \array_fill(0, $countEntries, $rowPlaces));

Expand All @@ -226,6 +243,8 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void
if ($statement->errorCode() !== '00000') {
throw RuntimeException::fromStatementErrorInfo($statement->errorInfo());
}

$this->writeLockStrategy->releaseLock($lockName);
}

public function load(
Expand Down
21 changes: 21 additions & 0 deletions src/WriteLockStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/**
* This file is part of prooph/pdo-event-store.
* (c) 2016-2019 prooph software GmbH <contact@prooph.de>
* (c) 2016-2019 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\Pdo;

interface WriteLockStrategy
{
public function getLock(string $name): bool;

public function releaseLock(string $name): bool;
}
71 changes: 71 additions & 0 deletions src/WriteLockStrategy/MariaDbMetadataLockStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

/**
* This file is part of prooph/pdo-event-store.
* (c) 2016-2019 prooph software GmbH <contact@prooph.de>
* (c) 2016-2019 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\Pdo\WriteLockStrategy;

use Prooph\EventStore\Pdo\WriteLockStrategy;

final class MariaDbMetadataLockStrategy implements WriteLockStrategy
{
/**
* @var \PDO
*/
private $connection;

/**
* @var int
*/
private $timeout;

public function __construct(\PDO $connection, int $timeout = 0xffffff)
{
if ($timeout < 0) {
throw new \InvalidArgumentException('$timeout must be larger 0.');
}

$this->connection = $connection;
$this->timeout = $timeout;
}

public function getLock(string $name): bool
{
try {
$res = $this->connection->query('SELECT GET_LOCK("' . $name . '", ' . $this->timeout . ') as `get_lock`');
} catch (\PDOException $e) {
// ER_USER_LOCK_DEADLOCK: we only care for deadlock errors and fail locking
if ('3058' === $this->connection->errorCode()) {
return false;
}

throw $e;
}

if (!$res) {
return false;
}

$lockStatus = $res->fetchAll();
if ('1' === $lockStatus[0]['get_lock']) {
return true;
}

return false;
}

public function releaseLock(string $name): bool
{
$this->connection->exec('DO RELEASE_LOCK("' . $name . '") as `release_lock`');

return true;
}
}
67 changes: 67 additions & 0 deletions src/WriteLockStrategy/MysqlMetadataLockStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

/**
* This file is part of prooph/pdo-event-store.
* (c) 2016-2019 prooph software GmbH <contact@prooph.de>
* (c) 2016-2019 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\Pdo\WriteLockStrategy;

use Prooph\EventStore\Pdo\WriteLockStrategy;

final class MysqlMetadataLockStrategy implements WriteLockStrategy
{
/**
* @var \PDO
*/
private $connection;

/**
* @var int
*/
private $timeout;

public function __construct(\PDO $connection, int $timeout = -1)
{
$this->connection = $connection;
$this->timeout = $timeout;
}

public function getLock(string $name): bool
{
try {
$res = $this->connection->query('SELECT GET_LOCK("' . $name . '", ' . $this->timeout . ') as `get_lock`');
} catch (\PDOException $e) {
// ER_USER_LOCK_DEADLOCK: we only care for deadlock errors and fail locking
if ('3058' === $this->connection->errorCode()) {
return false;
}

throw $e;
}

if (!$res) {
return false;
}

$lockStatus = $res->fetchAll();
if ('1' === $lockStatus[0]['get_lock']) {
return true;
}

return false;
}

public function releaseLock(string $name): bool
{
$this->connection->exec('DO RELEASE_LOCK("' . $name . '") as `release_lock`');

return true;
}
}
Loading

0 comments on commit 92046a6

Please sign in to comment.