Skip to content
This repository has been archived by the owner on Jun 23, 2021. It is now read-only.

Commit

Permalink
Merge pull request #12 from cultuurnet/feature/III-1804-saga-fix
Browse files Browse the repository at this point in the history
III-1804 saga fix
  • Loading branch information
cyberwolf committed Jan 23, 2017
2 parents 9a3f5c5 + 7a79ad3 commit d9e2b28
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 43 deletions.
11 changes: 6 additions & 5 deletions bootstrap.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
use Broadway\EventDispatcher\EventDispatcher;
use Broadway\EventHandling\EventBusInterface;
use Broadway\EventStore\DBALEventStore;
use Broadway\Saga\MultipleSagaManager;
use Broadway\Saga\State\MongoDBRepository;
use Broadway\Serializer\SimpleInterfaceSerializer;
use CommerceGuys\Intl\Currency\CurrencyRepository;
use CommerceGuys\Intl\NumberFormat\NumberFormatRepository;
Expand All @@ -19,7 +17,10 @@
use CultuurNet\UDB3\EventSourcing\ExecutionContextMetadataEnricher;
use CultuurNet\UDB3\LabelCollection;
use CultuurNet\UDB3\SimpleEventBus;
use CultuurNet\UDB3\UiTPASService\Broadway\Saga\StaticallyConfiguredSagaNamespacedEventsMetadataFactory;
use CultuurNet\UDB3\UiTPASService\Broadway\Saga\Metadata\StaticallyConfiguredSagaMetadataFactory;
use CultuurNet\UDB3\UiTPASService\Broadway\Saga\MultipleSagaManager;
use CultuurNet\UDB3\UiTPASService\Broadway\Saga\State\MongoDBRepository;
use CultuurNet\UDB3\UiTPASService\Broadway\Saga\State\StateManager;
use CultuurNet\UDB3\UiTPASService\OrganizerLabelReadRepository\JSONLDOrganizerLabelReadRepository;
use CultuurNet\UDB3\UiTPASService\Permissions\DefaultEventPermission;
use CultuurNet\UDB3\UiTPASService\Permissions\UDB3EventPermission;
Expand Down Expand Up @@ -469,11 +470,11 @@ function (Application $app) {
[
'uitpas_sync' => $app['uitpas_event_saga'],
],
new \Broadway\Saga\State\StateManager(
new StateManager(
$app['saga_repository'],
new Broadway\UuidGenerator\Rfc4122\Version4Generator()
),
new StaticallyConfiguredSagaNamespacedEventsMetadataFactory(),
new StaticallyConfiguredSagaMetadataFactory(),
new EventDispatcher()
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
<?php

namespace CultuurNet\UDB3\UiTPASService\Broadway\Saga;
namespace CultuurNet\UDB3\UiTPASService\Broadway\Saga\Metadata;

use Broadway\Saga\MetadataInterface;

/**
* Copied from Broadway\Saga\Metadata\Metadata and modified to work with
* namespaced event names.
*/
class NamespacedEventsMetadata implements MetadataInterface
class Metadata implements MetadataInterface
{
private $criteria;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
<?php

namespace CultuurNet\UDB3\UiTPASService\Broadway\Saga;
namespace CultuurNet\UDB3\UiTPASService\Broadway\Saga\Metadata;

use Broadway\Saga\Metadata\MetadataFactoryInterface;
use Broadway\Saga\Metadata\StaticallyConfiguredSagaInterface;

/**
* Copied from Broadway\Saga\Metadata\StaticallyConfiguredSagaMetadataFactory
* and modified to return an instance of NamespacedEventsMetadata.
* and modified to return an instance of the forked Metadata class.
*/
class StaticallyConfiguredSagaNamespacedEventsMetadataFactory implements MetadataFactoryInterface
class StaticallyConfiguredSagaMetadataFactory implements MetadataFactoryInterface
{
/**
* {@inheritDoc}
Expand All @@ -24,6 +24,6 @@ public function create($saga)

$criteria = $saga::configuration();

return new NamespacedEventsMetadata($criteria);
return new Metadata($criteria);
}
}
101 changes: 101 additions & 0 deletions src/Broadway/Saga/MultipleSagaManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

namespace CultuurNet\UDB3\UiTPASService\Broadway\Saga;

use Broadway\Domain\DomainMessage;
use Broadway\EventDispatcher\EventDispatcherInterface;
use Broadway\Saga\Metadata\MetadataFactoryInterface;
use Broadway\Saga\SagaInterface;
use Broadway\Saga\SagaManagerInterface;
use Broadway\Saga\State;
use CultuurNet\UDB3\UiTPASService\Broadway\Saga\State\RepositoryInterface;
use CultuurNet\UDB3\UiTPASService\Broadway\Saga\State\StateManagerInterface;

/**
* SagaManager that manages multiple sagas with multiple states.
* Copied and adjusted from \Broadway\Saga\MultipleSagaManager.
*/
class MultipleSagaManager implements SagaManagerInterface
{
private $repository;
private $sagas;
private $stateManager;
private $metadataFactory;
private $eventDispatcher;

/**
* @param RepositoryInterface $repository
* @param array $sagas
* @param StateManagerInterface $stateManager
* @param MetadataFactoryInterface $metadataFactory
* @param EventDispatcherInterface $eventDispatcher
*/
public function __construct(
RepositoryInterface $repository,
array $sagas,
StateManagerInterface $stateManager,
MetadataFactoryInterface $metadataFactory,
EventDispatcherInterface $eventDispatcher
) {
$this->repository = $repository;
$this->sagas = $sagas;
$this->stateManager = $stateManager;
$this->metadataFactory = $metadataFactory;
$this->eventDispatcher = $eventDispatcher;
}

/**
* Handles the event by delegating it to Saga('s) related to the event.
* @param DomainMessage $domainMessage
*/
public function handle(DomainMessage $domainMessage)
{
$event = $domainMessage->getPayload();

foreach ($this->sagas as $sagaType => $saga) {
$metadata = $this->metadataFactory->create($saga);

if (!$metadata->handles($event)) {
continue;
}

$criteria = $metadata->criteria($event);

if (is_null($criteria)) {
// If the saga returns null as criteria, it wants to handle the
// event with a new state.
$state = $this->stateManager->generateNewState();
$this->handleEventBySagaWithState($sagaType, $saga, $event, $state);
} else {
// If actual criteria are given, fetch all matching states and
// update them one by one.
foreach ($this->stateManager->findBy($criteria, $sagaType) as $state) {
$this->handleEventBySagaWithState($sagaType, $saga, $event, $state);
}
}
}
}

/**
* @param string $sagaType
* @param SagaInterface $saga
* @param mixed $event
* @param State $state
*/
private function handleEventBySagaWithState($sagaType, SagaInterface $saga, $event, State $state)
{
$this->eventDispatcher->dispatch(
SagaManagerInterface::EVENT_PRE_HANDLE,
[$sagaType, $state->getId()]
);

$newState = $saga->handle($event, $state);

$this->eventDispatcher->dispatch(
SagaManagerInterface::EVENT_POST_HANDLE,
[$sagaType, $state->getId()]
);

$this->repository->save($newState, $sagaType);
}
}
53 changes: 53 additions & 0 deletions src/Broadway/Saga/State/InMemoryRepository.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

namespace CultuurNet\UDB3\UiTPASService\Broadway\Saga\State;

use Broadway\Saga\State;
use Broadway\Saga\State\Criteria;

/**
* Copied from Broadway\Saga\State\InMemoryRepository and modified
* to use findBy() instead of findOneBy().
*/
class InMemoryRepository implements RepositoryInterface
{
private $states = [];

/**
* {@inheritDoc}
*/
public function findBy(Criteria $criteria, $sagaId)
{
if (!isset($this->states[$sagaId])) {
$states = [];
} else {
$states = $this->states[$sagaId];
}

foreach ($criteria->getComparisons() as $key => $value) {
$states = array_filter(
$states,
function ($elem) use ($key, $value) {
$stateValue = $elem->get($key);
return is_array($stateValue) ? in_array($value, $stateValue) : $value === $stateValue;
}
);
}

foreach ($states as $state) {
yield $state;
}
}

/**
* {@inheritDoc}
*/
public function save(State $state, $sagaId)
{
if ($state->isDone()) {
unset($this->states[$sagaId][$state->getId()]);
} else {
$this->states[$sagaId][$state->getId()] = $state;
}
}
}
72 changes: 72 additions & 0 deletions src/Broadway/Saga/State/MongoDBRepository.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

namespace CultuurNet\UDB3\UiTPASService\Broadway\Saga\State;

use Broadway\Saga\State;
use Broadway\Saga\State\Criteria;
use Doctrine\MongoDB\Collection;
use Doctrine\MongoDB\Query\Query;

/**
* Copied from Broadway\Saga\State\MongoDBRepository and updated
* to implement findBy() instead of findOneBy().
*/
class MongoDBRepository implements RepositoryInterface
{
private $collection;

/**
* @param Collection $collection
*/
public function __construct(Collection $collection)
{
$this->collection = $collection;
}

/**
* {@inheritDoc}
*/
public function findBy(Criteria $criteria, $sagaId)
{
$query = $this->createQuery($criteria, $sagaId);
$results = $query->execute();

foreach ($results as $result) {
yield State::deserialize($result);
}
}

/**
* {@inheritDoc}
*/
public function save(State $state, $sagaId)
{
$serializedState = $state->serialize();
$serializedState['_id'] = $serializedState['id'];
$serializedState['sagaId'] = $sagaId;
$serializedState['removed'] = $state->isDone();

$this->collection->save($serializedState);
}

/**
* @param Criteria $criteria
* @param string $sagaId
* @return Query
*/
private function createQuery(Criteria $criteria, $sagaId)
{
$comparisons = $criteria->getComparisons();
$wheres = [];

foreach ($comparisons as $key => $value) {
$wheres['values.' . $key] = $value;
}

$queryBuilder = $this->collection->createQueryBuilder()
->addAnd($wheres)
->addAnd(['removed' => false, 'sagaId' => $sagaId]);

return $queryBuilder->getQuery();
}
}
26 changes: 26 additions & 0 deletions src/Broadway/Saga/State/RepositoryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

namespace CultuurNet\UDB3\UiTPASService\Broadway\Saga\State;

use Broadway\Saga\State;
use Broadway\Saga\State\Criteria;

/**
* Copied from Broadway\Saga\State\RepositoryInterface and updated to use
* findBy() instead of findOneBy().
*/
interface RepositoryInterface
{
/**
* @param Criteria $criteria
* @param string $sagaId
* @return \Generator|State[]
*/
public function findBy(Criteria $criteria, $sagaId);

/**
* @param State $state
* @param string $sagaId
*/
public function save(State $state, $sagaId);
}
41 changes: 41 additions & 0 deletions src/Broadway/Saga/State/StateManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace CultuurNet\UDB3\UiTPASService\Broadway\Saga\State;

use Broadway\Saga\State;
use Broadway\UuidGenerator\UuidGeneratorInterface;

/**
* Copied from Broadway\Saga\State\StateManager to use findBy() instead of
* findOneBy(), and added generateNewState().
*/
class StateManager implements StateManagerInterface
{
private $repository;
private $generator;

public function __construct(RepositoryInterface $repository, UuidGeneratorInterface $generator)
{
$this->repository = $repository;
$this->generator = $generator;
}

/**
* {@inheritDoc}
*/
public function findBy($criteria, $sagaId)
{
// @todo Use "yield from" when minimum requirement is PHP7.
foreach ($this->repository->findBy($criteria, $sagaId) as $state) {
yield $state;
}
}

/**
* @return State
*/
public function generateNewState()
{
return new State($this->generator->generate());
}
}
Loading

0 comments on commit d9e2b28

Please sign in to comment.