From a6bfa5931ee9acc2e16646056e236aa3c262a0cf Mon Sep 17 00:00:00 2001 From: Joe Bennett Date: Thu, 22 Aug 2019 14:21:57 +1000 Subject: [PATCH] [Lock] add mongodb store --- src/Symfony/Component/Lock/CHANGELOG.md | 3 +- .../Component/Lock/Store/MongoDbStore.php | 386 ++++++++++++++++++ .../Component/Lock/Store/StoreFactory.php | 8 +- .../Lock/Tests/Store/MongoDbStoreTest.php | 131 ++++++ .../Lock/Tests/Store/StoreFactoryTest.php | 5 + 5 files changed, 531 insertions(+), 2 deletions(-) create mode 100644 src/Symfony/Component/Lock/Store/MongoDbStore.php create mode 100644 src/Symfony/Component/Lock/Tests/Store/MongoDbStoreTest.php diff --git a/src/Symfony/Component/Lock/CHANGELOG.md b/src/Symfony/Component/Lock/CHANGELOG.md index 32b40c6df2b3..c38b783fb9cb 100644 --- a/src/Symfony/Component/Lock/CHANGELOG.md +++ b/src/Symfony/Component/Lock/CHANGELOG.md @@ -11,7 +11,8 @@ CHANGELOG 4.4.0 ----- - * added InvalidTtlException + * added InvalidTtlException + * added the MongoDbStore supporting MongoDB servers >=2.2 * deprecated `StoreInterface` in favor of `BlockingStoreInterface` and `PersistingStoreInterface` * `Factory` is deprecated, use `LockFactory` instead * `StoreFactory::createStore` allows PDO and Zookeeper DSN. diff --git a/src/Symfony/Component/Lock/Store/MongoDbStore.php b/src/Symfony/Component/Lock/Store/MongoDbStore.php new file mode 100644 index 000000000000..73158935c92b --- /dev/null +++ b/src/Symfony/Component/Lock/Store/MongoDbStore.php @@ -0,0 +1,386 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Lock\Store; + +use MongoDB\BSON\UTCDateTime; +use MongoDB\Client; +use MongoDB\Collection; +use MongoDB\Driver\Command; +use MongoDB\Driver\Exception\WriteException; +use MongoDB\Exception\DriverRuntimeException; +use MongoDB\Exception\InvalidArgumentException as MongoInvalidArgumentException; +use MongoDB\Exception\UnsupportedException; +use Symfony\Component\Lock\Exception\InvalidArgumentException; +use Symfony\Component\Lock\Exception\InvalidTtlException; +use Symfony\Component\Lock\Exception\LockAcquiringException; +use Symfony\Component\Lock\Exception\LockConflictedException; +use Symfony\Component\Lock\Exception\LockExpiredException; +use Symfony\Component\Lock\Exception\LockStorageException; +use Symfony\Component\Lock\Exception\NotSupportedException; +use Symfony\Component\Lock\Key; +use Symfony\Component\Lock\StoreInterface; + +/** + * MongoDbStore is a StoreInterface implementation using MongoDB as a storage + * engine. Support for MongoDB server >=2.2 due to use of TTL indexes. + * + * CAUTION: TTL Indexes are used so this store relies on all client and server + * nodes to have synchronized clocks for lock expiry to occur at the correct + * time. To ensure locks don't expire prematurely; the TTLs should be set with + * enough extra time to account for any clock drift between nodes. + * + * CAUTION: The locked resource name is indexed in the _id field of the lock + * collection. An indexed field's value in MongoDB can be a maximum of 1024 + * bytes in length inclusive of structural overhead. + * + * @see https://docs.mongodb.com/manual/reference/limits/#Index-Key-Limit + * + * @requires extension mongodb + * + * @author Joe Bennett + */ +class MongoDbStore implements StoreInterface +{ + private $collection; + private $client; + private $uri; + private $options; + private $initialTtl; + + private $databaseVersion; + + use ExpiringStoreTrait; + + /** + * @param Collection|Client|string $mongo An instance of a Collection or Client or URI @see https://docs.mongodb.com/manual/reference/connection-string/ + * @param array $options See below + * @param float $initialTtl The expiration delay of locks in seconds + * + * @throws InvalidArgumentException If required options are not provided + * @throws InvalidTtlException When the initial ttl is not valid + * + * Options: + * gcProbablity: Should a TTL Index be created expressed as a probability from 0.0 to 1.0 [default: 0.001] + * database: The name of the database [required when $mongo is a Client] + * collection: The name of the collection [required when $mongo is a Client] + * uriOptions: Array of uri options. [used when $mongo is a URI] + * driverOptions: Array of driver options. [used when $mongo is a URI] + * + * When using a URI string: + * the database is determined from the "database" option, otherwise the uri's path is used. + * the collection is determined from the "collection" option, otherwise the uri's "collection" querystring parameter is used. + * + * For example: mongodb://myuser:mypass@myhost/mydatabase?collection=mycollection + * + * @see https://docs.mongodb.com/php-library/current/reference/method/MongoDBClient__construct/ + * + * If gcProbablity is set to a value greater than 0.0 there is a chance + * this store will attempt to create a TTL index on self::save(). + * If you prefer to create your TTL Index manually you can set gcProbablity + * to 0.0 and optionally leverage + * self::createTtlIndex(int $expireAfterSeconds = 0). + * + * writeConcern, readConcern and readPreference are not specified by + * MongoDbStore meaning the collection's settings will take effect. + * @see https://docs.mongodb.com/manual/applications/replication/ + */ + public function __construct($mongo, array $options = [], float $initialTtl = 300.0) + { + $this->options = array_merge([ + 'gcProbablity' => 0.001, + 'database' => null, + 'collection' => null, + 'uriOptions' => [], + 'driverOptions' => [], + ], $options); + + $this->initialTtl = $initialTtl; + + if ($mongo instanceof Collection) { + $this->collection = $mongo; + } elseif ($mongo instanceof Client) { + if (null === $this->options['database']) { + throw new InvalidArgumentException(sprintf('%s() requires the "database" option when constructing with a %s', __METHOD__, Client::class)); + } + if (null === $this->options['collection']) { + throw new InvalidArgumentException(sprintf('%s() requires the "collection" option when constructing with a %s', __METHOD__, Client::class)); + } + + $this->client = $mongo; + } elseif (\is_string($mongo)) { + if (false === $parsedUrl = parse_url($mongo)) { + throw new InvalidArgumentException(sprintf('The given MongoDB Connection URI "%s" is invalid.', $mongo)); + } + $query = []; + if (isset($parsedUrl['query'])) { + parse_str($parsedUrl['query'], $query); + } + $this->options['collection'] = $this->options['collection'] ?? $query['collection'] ?? null; + $this->options['database'] = $this->options['database'] ?? ltrim($parsedUrl['path'] ?? '', '/') ?: null; + if (null === $this->options['database']) { + throw new InvalidArgumentException(sprintf('%s() requires the "database" in the uri path or option when constructing with a uri', __METHOD__)); + } + if (null === $this->options['collection']) { + throw new InvalidArgumentException(sprintf('%s() requires the "collection" in the uri querystring or option when constructing with a uri', __METHOD__)); + } + + $this->uri = $mongo; + } else { + throw new InvalidArgumentException(sprintf('%s() requires %s or %s or URI as first argument, %s given.', __METHOD__, Collection::class, Client::class, \is_object($mongo) ? \get_class($mongo) : \gettype($mongo))); + } + + if ($this->options['gcProbablity'] < 0.0 || $this->options['gcProbablity'] > 1.0) { + throw new InvalidArgumentException(sprintf('%s() gcProbablity must be a float from 0.0 to 1.0, %f given.', __METHOD__, $this->options['gcProbablity'])); + } + + if ($this->initialTtl <= 0) { + throw new InvalidTtlException(sprintf('%s() expects a strictly positive TTL. Got %d.', __METHOD__, $this->initialTtl)); + } + } + + /** + * Create a TTL index to automatically remove expired locks. + * + * If the gcProbablity option is set higher than 0.0 (defaults to 0.001); + * there is a chance this will be called on self::save(). + * + * Otherwise; this should be called once manually during database setup. + * + * Alternatively the TTL index can be created manually on the database: + * + * db.lock.ensureIndex( + * { "expires_at": 1 }, + * { "expireAfterSeconds": 0 } + * ) + * + * Please note, expires_at is based on the application server. If the + * database time differs; a lock could be cleaned up before it has expired. + * To ensure locks don't expire prematurely; the lock TTL should be set + * with enough extra time to account for any clock drift between nodes. + * + * A TTL index MUST BE used to automatically clean up expired locks. + * + * @see http://docs.mongodb.org/manual/tutorial/expire-data/ + * + * @throws UnsupportedException if options are not supported by the selected server + * @throws MongoInvalidArgumentException for parameter/option parsing errors + * @throws DriverRuntimeException for other driver errors (e.g. connection errors) + */ + public function createTtlIndex(int $expireAfterSeconds = 0) + { + $this->getCollection()->createIndex( + [ // key + 'expires_at' => 1, + ], + [ // options + 'expireAfterSeconds' => $expireAfterSeconds, + ] + ); + } + + /** + * {@inheritdoc} + * + * @throws LockExpiredException when save is called on an expired lock + */ + public function save(Key $key) + { + $key->reduceLifetime($this->initialTtl); + + try { + $this->upsert($key, $this->initialTtl); + } catch (WriteException $e) { + if ($this->isDuplicateKeyException($e)) { + throw new LockConflictedException('Lock was acquired by someone else', 0, $e); + } + throw new LockAcquiringException('Failed to acquire lock', 0, $e); + } + + if ($this->options['gcProbablity'] > 0.0 + && ( + 1.0 === $this->options['gcProbablity'] + || (random_int(0, PHP_INT_MAX) / PHP_INT_MAX) <= $this->options['gcProbablity'] + ) + ) { + $this->createTtlIndex(); + } + + $this->checkNotExpired($key); + } + + /** + * {@inheritdoc} + */ + public function waitAndSave(Key $key) + { + throw new NotSupportedException(sprintf('The store "%s" does not support blocking locks.', __CLASS__)); + } + + /** + * {@inheritdoc} + * + * @throws LockStorageException + * @throws LockExpiredException + */ + public function putOffExpiration(Key $key, $ttl) + { + $key->reduceLifetime($ttl); + + try { + $this->upsert($key, $ttl); + } catch (WriteException $e) { + if ($this->isDuplicateKeyException($e)) { + throw new LockConflictedException('Failed to put off the expiration of the lock', 0, $e); + } + throw new LockStorageException($e->getMessage(), 0, $e); + } + + $this->checkNotExpired($key); + } + + /** + * {@inheritdoc} + */ + public function delete(Key $key) + { + $this->getCollection()->deleteOne([ // filter + '_id' => (string) $key, + 'token' => $this->getUniqueToken($key), + ]); + } + + /** + * {@inheritdoc} + */ + public function exists(Key $key): bool + { + return null !== $this->getCollection()->findOne([ // filter + '_id' => (string) $key, + 'token' => $this->getUniqueToken($key), + 'expires_at' => [ + '$gt' => $this->createMongoDateTime(microtime(true)), + ], + ]); + } + + /** + * Update or Insert a Key. + * + * @param float $ttl Expiry in seconds from now + */ + private function upsert(Key $key, float $ttl) + { + $now = microtime(true); + $token = $this->getUniqueToken($key); + + $this->getCollection()->updateOne( + [ // filter + '_id' => (string) $key, + '$or' => [ + [ + 'token' => $token, + ], + [ + 'expires_at' => [ + '$lte' => $this->createMongoDateTime($now), + ], + ], + ], + ], + [ // update + '$set' => [ + '_id' => (string) $key, + 'token' => $token, + 'expires_at' => $this->createMongoDateTime($now + $ttl), + ], + ], + [ // options + 'upsert' => true, + ] + ); + } + + private function isDuplicateKeyException(WriteException $e): bool + { + $code = $e->getCode(); + + $writeErrors = $e->getWriteResult()->getWriteErrors(); + if (1 === \count($writeErrors)) { + $code = $writeErrors[0]->getCode(); + } + + // Mongo error E11000 - DuplicateKey + return 11000 === $code; + } + + private function getDatabaseVersion(): string + { + if (null !== $this->databaseVersion) { + return $this->databaseVersion; + } + + $command = new Command([ + 'buildinfo' => 1, + ]); + $cursor = $this->getCollection()->getManager()->executeReadCommand( + $this->getCollection()->getDatabaseName(), + $command + ); + $buildInfo = $cursor->toArray()[0]; + $this->databaseVersion = $buildInfo->version; + + return $this->databaseVersion; + } + + private function getCollection(): Collection + { + if (null !== $this->collection) { + return $this->collection; + } + + if (null === $this->client) { + $this->client = new Client($this->uri, $this->options['uriOptions'], $this->options['driverOptions']); + } + + $this->collection = $this->client->selectCollection( + $this->options['database'], + $this->options['collection'] + ); + + return $this->collection; + } + + /** + * @param float $seconds Seconds since 1970-01-01T00:00:00.000Z supporting millisecond precision. Defaults to now. + */ + private function createMongoDateTime(float $seconds): UTCDateTime + { + return new UTCDateTime($seconds * 1000); + } + + /** + * Retrieves an unique token for the given key namespaced to this store. + * + * @param Key lock state container + * + * @return string token + */ + private function getUniqueToken(Key $key): string + { + if (!$key->hasState(__CLASS__)) { + $token = base64_encode(random_bytes(32)); + $key->setState(__CLASS__, $token); + } + + return $key->getState(__CLASS__); + } +} diff --git a/src/Symfony/Component/Lock/Store/StoreFactory.php b/src/Symfony/Component/Lock/Store/StoreFactory.php index b8c90e170d2d..c7bd5725b8b2 100644 --- a/src/Symfony/Component/Lock/Store/StoreFactory.php +++ b/src/Symfony/Component/Lock/Store/StoreFactory.php @@ -26,7 +26,7 @@ class StoreFactory { /** - * @param \Redis|\RedisArray|\RedisCluster|\Predis\ClientInterface|RedisProxy|RedisClusterProxy|\Memcached|\PDO|Connection|\Zookeeper|string $connection Connection or DSN or Store short name + * @param \Redis|\RedisArray|\RedisCluster|\Predis\ClientInterface|RedisProxy|RedisClusterProxy|\Memcached|\MongoDB\Collection|\PDO|Connection|\Zookeeper|string $connection Connection or DSN or Store short name * * @return PersistingStoreInterface */ @@ -48,6 +48,9 @@ public static function createStore($connection) case $connection instanceof \Memcached: return new MemcachedStore($connection); + case $connection instanceof \MongoDB\Collection: + return new MongoDbStore($connection); + case $connection instanceof \PDO: case $connection instanceof Connection: return new PdoStore($connection); @@ -77,6 +80,9 @@ public static function createStore($connection) return new $storeClass($connection); + case 0 === strpos($connection, 'mongodb'): + return new MongoDbStore($connection); + case 0 === strpos($connection, 'mssql://'): case 0 === strpos($connection, 'mysql:'): case 0 === strpos($connection, 'mysql2://'): diff --git a/src/Symfony/Component/Lock/Tests/Store/MongoDbStoreTest.php b/src/Symfony/Component/Lock/Tests/Store/MongoDbStoreTest.php new file mode 100644 index 000000000000..144c58737b9d --- /dev/null +++ b/src/Symfony/Component/Lock/Tests/Store/MongoDbStoreTest.php @@ -0,0 +1,131 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Lock\Tests\Store; + +use MongoDB\Client; +use Symfony\Component\Lock\Key; +use Symfony\Component\Lock\PersistingStoreInterface; +use Symfony\Component\Lock\Store\MongoDbStore; + +/** + * @author Joe Bennett + * @requires extension mongodb + */ +class MongoDbStoreTest extends AbstractStoreTest +{ + use ExpiringStoreTestTrait; + + public static function setupBeforeClass(): void + { + $client = self::getMongoClient(); + $client->listDatabases(); + } + + private static function getMongoClient(): Client + { + return new Client('mongodb://'.getenv('MONGODB_HOST')); + } + + protected function getClockDelay(): int + { + return 250000; + } + + /** + * {@inheritdoc} + */ + public function getStore(): PersistingStoreInterface + { + return new MongoDbStore(self::getMongoClient(), [ + 'database' => 'test', + 'collection' => 'lock', + ]); + } + + public function testCreateIndex() + { + $store = $this->getStore(); + $store->createTtlIndex(); + + $client = self::getMongoClient(); + $collection = $client->selectCollection( + 'test', + 'lock' + ); + $indexes = []; + foreach ($collection->listIndexes() as $index) { + $indexes[] = $index->getName(); + } + $this->assertContains('expires_at_1', $indexes); + } + + public function testNonBlocking() + { + $this->expectException(\Symfony\Component\Lock\Exception\NotSupportedException::class); + + $store = $this->getStore(); + + $key = new Key(uniqid(__METHOD__, true)); + + $store->waitAndSave($key); + } + + /** + * @dataProvider provideConstructorArgs + */ + public function testConstructionMethods($mongo, array $options) + { + $key = new Key(uniqid(__METHOD__, true)); + + $store = new MongoDbStore($mongo, $options); + + $store->save($key); + $this->assertTrue($store->exists($key)); + + $store->delete($key); + $this->assertFalse($store->exists($key)); + } + + public function provideConstructorArgs() + { + $client = self::getMongoClient(); + yield [$client, ['database' => 'test', 'collection' => 'lock']]; + + $collection = $client->selectCollection('test', 'lock'); + yield [$collection, []]; + + yield ['mongodb://localhost/test?collection=lock', []]; + yield ['mongodb://localhost/test', ['collection' => 'lock']]; + yield ['mongodb://localhost/', ['database' => 'test', 'collection' => 'lock']]; + } + + /** + * @dataProvider provideInvalidConstructorArgs + */ + public function testInvalidConstructionMethods($mongo, array $options) + { + $this->expectException('Symfony\Component\Lock\Exception\InvalidArgumentException'); + + new MongoDbStore($mongo, $options); + } + + public function provideInvalidConstructorArgs() + { + $client = self::getMongoClient(); + yield [$client, ['collection' => 'lock']]; + yield [$client, ['database' => 'test']]; + + yield ['mongodb://localhost/?collection=lock', []]; + yield ['mongodb://localhost/test', []]; + yield ['mongodb://localhost/', []]; + } +} diff --git a/src/Symfony/Component/Lock/Tests/Store/StoreFactoryTest.php b/src/Symfony/Component/Lock/Tests/Store/StoreFactoryTest.php index 4e5f38798811..7ef02cb4a209 100644 --- a/src/Symfony/Component/Lock/Tests/Store/StoreFactoryTest.php +++ b/src/Symfony/Component/Lock/Tests/Store/StoreFactoryTest.php @@ -16,6 +16,7 @@ use Symfony\Component\Cache\Traits\RedisProxy; use Symfony\Component\Lock\Store\FlockStore; use Symfony\Component\Lock\Store\MemcachedStore; +use Symfony\Component\Lock\Store\MongoDbStore; use Symfony\Component\Lock\Store\PdoStore; use Symfony\Component\Lock\Store\RedisStore; use Symfony\Component\Lock\Store\SemaphoreStore; @@ -49,6 +50,10 @@ public function validConnections() if (class_exists(\Memcached::class)) { yield [new \Memcached(), MemcachedStore::class]; } + if (class_exists(\MongoDB\Collection::class)) { + yield [$this->createMock(\MongoDB\Collection::class), MongoDbStore::class]; + yield ['mongodb://localhost/test?collection=lock', MongoDbStore::class]; + } if (class_exists(\Zookeeper::class)) { yield [$this->createMock(\Zookeeper::class), ZookeeperStore::class]; yield ['zookeeper://localhost:2181', ZookeeperStore::class];