Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MongoDB as EventStore #151

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@
<group>mongo</group>
</exclude>
</groups>

<php>
<const name="MONGODB_SERVER" value="mongodb://localhost:27017" />
<const name="MONGODB_DATABASE" value="broadway_tests" />
</php>
</phpunit>
371 changes: 371 additions & 0 deletions src/Broadway/EventStore/MongoDBEventStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,371 @@
<?php

/*
* This file is part of the broadway/broadway package.
*
* (c) Qandidate.com <opensource@qandidate.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Broadway\EventStore;

use Broadway\Domain\DateTime;
use Broadway\Domain\DomainEventStream;
use Broadway\Domain\DomainEventStreamInterface;
use Broadway\Domain\DomainMessage;
use Broadway\Domain\Metadata;
use Broadway\EventStore\Exception\InvalidIdentifierException;
use Broadway\Serializer\SerializerInterface;
use Broadway\UuidGenerator\UuidGeneratorInterface;
use Doctrine\MongoDB\Collection;
use Doctrine\MongoDB\Database;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any value in using the Doctrine classes instead of the raw driver? This class seems comparable to the session and cache adapters I worked on for Symfony and Zend, since it consists of just a few direct read and write ops.

As noted below, Doctrine doesn't yet support the MongoWriteBatch classes (for batch ops via the write commands in 2.6+), nor the createIndex() method (although the deprecated ensureIndex() method is an alias). Were you to make the switch, I'd suggest requiring at least >=1.5.0 of the mongo extension (although it wouldn't hurt to bump to 1.6.0 given that this is a new integration).

use Exception;
use MongoClient;
use MongoCollection;
use MongoException;

/**
* Event store using a relational database as storage.
*
* The implementation uses doctrine MongoDB for the communication with the
* underlying data store.
*/
class MongoDBEventStore implements EventStoreInterface
{
const FIELD_STREAM_ID = 'sid';
const FIELD_COMMIT_ID = 'cid';
const FIELD_PLAYHEAD = 'head';
const FIELD_PAYLOAD = 'data';
const FIELD_METADATA = 'meta';
const FIELD_TYPE = 'type';
const FIELD_RECORDED_ON = 'time';

/**
* @var Collection
*/
private $eventCollection;

/**
* @var Collection
*/
private $transactionCollection;

/**
* @var SerializerInterface
*/
private $payloadSerializer;

/**
* @var SerializerInterface
*/
private $metadataSerializer;

/**
* @var UuidGeneratorInterface
*/
private $uuidGenerator;

/**
* @param Database $database
* @param SerializerInterface $payloadSerializer
* @param SerializerInterface $metadataSerializer
* @param UuidGeneratorInterface $uuidGenerator
* @param string $eventCollectionName
* @param string $transactCollectionName
*/
public function __construct(
Database $database,
SerializerInterface $payloadSerializer,
SerializerInterface $metadataSerializer,
UuidGeneratorInterface $uuidGenerator,
$eventCollectionName,
$transactCollectionName
) {
$this->payloadSerializer = $payloadSerializer;
$this->metadataSerializer = $metadataSerializer;
$this->uuidGenerator = $uuidGenerator;

$this->eventCollection = $database->selectCollection($eventCollectionName);
$this->eventCollection->setReadPreference(MongoClient::RP_PRIMARY);

$this->transactionCollection = $database->selectCollection($transactCollectionName);
$this->transactionCollection->setReadPreference(MongoClient::RP_PRIMARY);
}

/**
* {@inheritDoc}
*/
public function load($id)
{
$cursor = $this->eventCollection
->find(array(
self::FIELD_STREAM_ID => (string) $id,
self::FIELD_COMMIT_ID => null,
), array(
'_id' => false,
))
->sort(array(self::FIELD_PLAYHEAD => MongoCollection::ASCENDING))
;

$events = array();

foreach ($cursor as $row) {
$events[] = $this->deserializeEvent($row);
}

$cursor = null;

if (!$events) {
throw new EventStreamNotFoundException(sprintf('EventStream not found for aggregate with id %s', $id));
}

return new DomainEventStream($events);
}

/**
* {@inheritdoc}
*/
public function append($id, DomainEventStreamInterface $eventStream)
{
$aggregateId = (string) $id;

$events = $this->extractEvents($eventStream, $aggregateId);

if (!$events) {
return;
}

try {
if (1 == count($events)) {
$this->appendEvent(reset($events));
} else {
$this->appendEvents($events);
}
} catch (MongoException $exception) {
throw MongoDBEventStoreException::create($exception);
} catch (Exception $exception) {
throw new MongoDBEventStoreException($exception->getMessage(), 0, $exception);
}
}

/**
* Configure collection indices. Should be used by migration or by bootstrapping tests.
*/
public function configureCollection()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it is only used in the test at the moment, do we really need this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's done in the same way as DBAL implementation, it requires indices to be set before processing.

{
$this->eventCollection->ensureIndex(array(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensureIndex is deprecated. use createIndex instead.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, doctrine mongodb uses still the ensureIndex method ;)

self::FIELD_STREAM_ID => MongoCollection::ASCENDING,
self::FIELD_PLAYHEAD => MongoCollection::ASCENDING,
), array(
'unique' => true,
'background' => true,
));
}

/**
* @param DomainEventStreamInterface|DomainMessage[] $eventStream
* @param string $aggregateId
*
* @return array
*/
private function extractEvents(DomainEventStreamInterface $eventStream, $aggregateId)
{
$events = array();

foreach ($eventStream as $message) {
$this->assertIdentifier($aggregateId, $message->getId());

$events[] = $this->serializeEvent($message);
}

return $events;
}

/**
* @param string $expected
* @param string $actual
*/
private function assertIdentifier($expected, $actual)
{
if (0 != strcmp($expected, $actual)) {
throw new InvalidIdentifierException(sprintf('Expecting %s, got %s identifier.', $expected, $actual));
}
}

/**
* @param DomainMessage $message
*
* @return array
*/
private function serializeEvent(DomainMessage $message)
{
$data = array(
self::FIELD_STREAM_ID => (string) $message->getId(),
self::FIELD_PLAYHEAD => $message->getPlayhead(),
self::FIELD_METADATA => $this->metadataSerializer->serialize($message->getMetadata()),
self::FIELD_PAYLOAD => $this->payloadSerializer->serialize($message->getPayload()),
self::FIELD_RECORDED_ON => $message->getRecordedOn()->toString(),
self::FIELD_TYPE => $message->getType(),
);

// save space
if (empty($data[self::FIELD_METADATA])) {
unset($data[self::FIELD_METADATA]);
}

return $data;
}

/**
* @param array $row
*
* @return DomainMessage
*/
private function deserializeEvent(array $row)
{
if (empty($row[self::FIELD_METADATA])) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can see, it's not possible to save empty (or no) metadata, so just always deserialize it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on serialization. Eg, save only payload, as class name is known and never to be changed. Or why save empty payload of metadata, imagine overhead of 100M docs.

$metadata = new Metadata();
} else {
$metadata = $this->metadataSerializer->deserialize($row[self::FIELD_METADATA]);
}

return new DomainMessage(
$row[self::FIELD_STREAM_ID],
$row[self::FIELD_PLAYHEAD],
$metadata,
$this->payloadSerializer->deserialize($row[self::FIELD_PAYLOAD]),
DateTime::fromString($row[self::FIELD_RECORDED_ON])
);
}

/**
* @param array $event
*/
private function appendEvent(array $event)
{
$this->eventCollection->insert($event, array(
'safe' => true,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged writes have been the default since driver version 1.3, so this shouldn't be necessary. If you did want to specify the option, ['w' => 1] would be the equivalent option to use (safe is deprecated).

I'm not sure how important the data being logged by this service is, but you may want ['w' => 'majority'] when working with a replica set (in that case, it'd probably be best to make the write concern a configurable option).

));
}

/**
* @param array $events
*
* @throws Exception
*/
private function appendEvents(array $events)
{
$commitId = $this->getNewId();

try {
$this->beginTransaction($commitId);

$this->commitEvents($commitId, $events);

$this->commitTransaction($commitId);
} catch (Exception $exception) {
$this->rollbackTransaction($commitId);

throw $exception;
}
}

/**
* @param string $commitId
* @param array $events
*/
private function commitEvents($commitId, array $events)
{
foreach ($events as &$event) {
$event[self::FIELD_COMMIT_ID] = $commitId;
}

unset($event);

$this->eventCollection->batchInsert($events, array(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batchInsert is also deprecated in favor of the MongoWriteBatch classes, which were added in driver version 1.5 and supported by MongoDB 2.6+. Depending on whether those version requirements are acceptable, I'd suggest swapping this over.

'safe' => true,
));
}

/**
* @return string
*/
private function getNewId()
{
return $this->uuidGenerator->generate();
}

/**
* @param string $commitId
*/
private function beginTransaction($commitId)
{
$data = array(
'_id' => (string) $commitId,
);

$this->transactionCollection->insert($data, array(
'safe' => true,
));
}

/**
* @param string $commitId
*/
private function commitTransaction($commitId)
{
$commitId = (string) $commitId;

$this->eventCollection->update(array(
self::FIELD_COMMIT_ID => $commitId,
), array(
'$set' => array(self::FIELD_COMMIT_ID => null),
), array(
'multiple' => true,
'safe' => true,
));

$this->transactionCollection->remove(array(
'_id' => $commitId,
), array(
'safe' => true,
));
}

/**
* @param string $commitId
*
* @throws MongoException
*/
private function rollbackTransaction($commitId)
{
$commitId = (string) $commitId;
$exception = null;

try {
$this->eventCollection->remove(array(
self::FIELD_COMMIT_ID => $commitId,
), array(
'safe' => true,
));
} catch (MongoException $e) {
$exception = $e;
}

try {
$this->transactionCollection->remove(array(
'_id' => $commitId,
), array(
'safe' => true,
));
} catch (MongoException $e) {
$exception = $exception ?: $e;
}

if (null !== $exception) {
throw $exception;
}
}
}