diff --git a/bootstrap.php b/bootstrap.php index 04433db..75cf750 100644 --- a/bootstrap.php +++ b/bootstrap.php @@ -4,8 +4,6 @@ use Broadway\EventDispatcher\EventDispatcher; use Broadway\EventHandling\EventBusInterface; use Broadway\EventStore\DBALEventStore; -use Broadway\Saga\MultipleSagaManager; -use Broadway\Saga\State\MongoDBRepository; use Broadway\Serializer\SimpleInterfaceSerializer; use CommerceGuys\Intl\Currency\CurrencyRepository; use CommerceGuys\Intl\NumberFormat\NumberFormatRepository; @@ -19,7 +17,10 @@ use CultuurNet\UDB3\EventSourcing\ExecutionContextMetadataEnricher; use CultuurNet\UDB3\LabelCollection; use CultuurNet\UDB3\SimpleEventBus; -use CultuurNet\UDB3\UiTPASService\Broadway\Saga\StaticallyConfiguredSagaNamespacedEventsMetadataFactory; +use CultuurNet\UDB3\UiTPASService\Broadway\Saga\Metadata\StaticallyConfiguredSagaMetadataFactory; +use CultuurNet\UDB3\UiTPASService\Broadway\Saga\MultipleSagaManager; +use CultuurNet\UDB3\UiTPASService\Broadway\Saga\State\MongoDBRepository; +use CultuurNet\UDB3\UiTPASService\Broadway\Saga\State\StateManager; use CultuurNet\UDB3\UiTPASService\OrganizerLabelReadRepository\JSONLDOrganizerLabelReadRepository; use CultuurNet\UDB3\UiTPASService\Permissions\DefaultEventPermission; use CultuurNet\UDB3\UiTPASService\Permissions\UDB3EventPermission; @@ -469,11 +470,11 @@ function (Application $app) { [ 'uitpas_sync' => $app['uitpas_event_saga'], ], - new \Broadway\Saga\State\StateManager( + new StateManager( $app['saga_repository'], new Broadway\UuidGenerator\Rfc4122\Version4Generator() ), - new StaticallyConfiguredSagaNamespacedEventsMetadataFactory(), + new StaticallyConfiguredSagaMetadataFactory(), new EventDispatcher() ); } diff --git a/src/Broadway/Saga/NamespacedEventsMetadata.php b/src/Broadway/Saga/Metadata/Metadata.php similarity index 87% rename from src/Broadway/Saga/NamespacedEventsMetadata.php rename to src/Broadway/Saga/Metadata/Metadata.php index a39d6ba..06de91b 100644 --- a/src/Broadway/Saga/NamespacedEventsMetadata.php +++ b/src/Broadway/Saga/Metadata/Metadata.php @@ -1,6 +1,6 @@ repository = $repository; + $this->sagas = $sagas; + $this->stateManager = $stateManager; + $this->metadataFactory = $metadataFactory; + $this->eventDispatcher = $eventDispatcher; + } + + /** + * Handles the event by delegating it to Saga('s) related to the event. + * @param DomainMessage $domainMessage + */ + public function handle(DomainMessage $domainMessage) + { + $event = $domainMessage->getPayload(); + + foreach ($this->sagas as $sagaType => $saga) { + $metadata = $this->metadataFactory->create($saga); + + if (!$metadata->handles($event)) { + continue; + } + + $criteria = $metadata->criteria($event); + + if (is_null($criteria)) { + // If the saga returns null as criteria, it wants to handle the + // event with a new state. + $state = $this->stateManager->generateNewState(); + $this->handleEventBySagaWithState($sagaType, $saga, $event, $state); + } else { + // If actual criteria are given, fetch all matching states and + // update them one by one. + foreach ($this->stateManager->findBy($criteria, $sagaType) as $state) { + $this->handleEventBySagaWithState($sagaType, $saga, $event, $state); + } + } + } + } + + /** + * @param string $sagaType + * @param SagaInterface $saga + * @param mixed $event + * @param State $state + */ + private function handleEventBySagaWithState($sagaType, SagaInterface $saga, $event, State $state) + { + $this->eventDispatcher->dispatch( + SagaManagerInterface::EVENT_PRE_HANDLE, + [$sagaType, $state->getId()] + ); + + $newState = $saga->handle($event, $state); + + $this->eventDispatcher->dispatch( + SagaManagerInterface::EVENT_POST_HANDLE, + [$sagaType, $state->getId()] + ); + + $this->repository->save($newState, $sagaType); + } +} diff --git a/src/Broadway/Saga/State/InMemoryRepository.php b/src/Broadway/Saga/State/InMemoryRepository.php new file mode 100644 index 0000000..ced7eb1 --- /dev/null +++ b/src/Broadway/Saga/State/InMemoryRepository.php @@ -0,0 +1,47 @@ +states[$sagaId])) { + $states = []; + } else { + $states = $this->states[$sagaId]; + } + + foreach ($criteria->getComparisons() as $key => $value) { + $states = array_filter($states, function ($elem) use ($key, $value) { + $stateValue = $elem->get($key); + + return is_array($stateValue) ? in_array($value, $stateValue) : $value === $stateValue; + }); + } + + foreach ($states as $state) { + yield $state; + } + } + + /** + * {@inheritDoc} + */ + public function save(State $state, $sagaId) + { + if ($state->isDone()) { + unset($this->states[$sagaId][$state->getId()]); + } else { + $this->states[$sagaId][$state->getId()] = $state; + } + } +} diff --git a/src/Broadway/Saga/State/MongoDBRepository.php b/src/Broadway/Saga/State/MongoDBRepository.php new file mode 100644 index 0000000..fbea050 --- /dev/null +++ b/src/Broadway/Saga/State/MongoDBRepository.php @@ -0,0 +1,72 @@ +collection = $collection; + } + + /** + * {@inheritDoc} + */ + public function findBy(Criteria $criteria, $sagaId) + { + $query = $this->createQuery($criteria, $sagaId); + $results = $query->execute(); + + foreach ($results as $result) { + yield State::deserialize($result); + } + } + + /** + * {@inheritDoc} + */ + public function save(State $state, $sagaId) + { + $serializedState = $state->serialize(); + $serializedState['_id'] = $serializedState['id']; + $serializedState['sagaId'] = $sagaId; + $serializedState['removed'] = $state->isDone(); + + $this->collection->save($serializedState); + } + + /** + * @param Criteria $criteria + * @param string $sagaId + * @return Query + */ + private function createQuery(Criteria $criteria, $sagaId) + { + $comparisons = $criteria->getComparisons(); + $wheres = []; + + foreach ($comparisons as $key => $value) { + $wheres['values.' . $key] = $value; + } + + $queryBuilder = $this->collection->createQueryBuilder() + ->addAnd($wheres) + ->addAnd(['removed' => false, 'sagaId' => $sagaId]); + + return $queryBuilder->getQuery(); + } +} diff --git a/src/Broadway/Saga/State/RepositoryInterface.php b/src/Broadway/Saga/State/RepositoryInterface.php new file mode 100644 index 0000000..8e0e7b5 --- /dev/null +++ b/src/Broadway/Saga/State/RepositoryInterface.php @@ -0,0 +1,26 @@ +repository = $repository; + $this->generator = $generator; + } + + /** + * {@inheritDoc} + */ + public function findBy($criteria, $sagaId) + { + // @todo Use "yield from" when minimum requirement is PHP7. + foreach ($this->repository->findBy($criteria, $sagaId) as $state) { + yield $state; + } + } + + /** + * @return State + */ + public function generateNewState() + { + return new State($this->generator->generate()); + } +} diff --git a/src/Broadway/Saga/State/StateManagerInterface.php b/src/Broadway/Saga/State/StateManagerInterface.php new file mode 100644 index 0000000..f567511 --- /dev/null +++ b/src/Broadway/Saga/State/StateManagerInterface.php @@ -0,0 +1,26 @@ +testCase = $testCase; + $this->sagaManager = $sagaManager; + $this->traceableCommandBus = $traceableCommandBus; + $this->playhead = -1; + } + + /** + * @param array $events + * + * @return Scenario + */ + public function given(array $events = []) + { + foreach ($events as $given) { + $this->sagaManager->handle($this->createDomainMessageForEvent($given)); + } + + return $this; + } + + /** + * @param mixed $event + * + * @return Scenario + */ + public function when($event) + { + $this->traceableCommandBus->record(); + + $this->sagaManager->handle($this->createDomainMessageForEvent($event)); + + return $this; + } + + /** + * @param array $commands + * + * @return Scenario + */ + public function then(array $commands) + { + $this->testCase->assertEquals($commands, $this->traceableCommandBus->getRecordedCommands()); + + return $this; + } + + private function createDomainMessageForEvent($event) + { + $this->playhead++; + + return DomainMessage::recordNow(1, $this->playhead, new Metadata([]), $event); + } +} diff --git a/tests/UiTPASEventSagaTest.php b/tests/UiTPASEventSagaTest.php index 36da39c..6efcb3f 100644 --- a/tests/UiTPASEventSagaTest.php +++ b/tests/UiTPASEventSagaTest.php @@ -5,10 +5,6 @@ use Broadway\CommandHandling\CommandBusInterface; use Broadway\CommandHandling\Testing\TraceableCommandBus; use Broadway\EventDispatcher\EventDispatcher; -use Broadway\Saga\MultipleSagaManager; -use Broadway\Saga\State\InMemoryRepository; -use Broadway\Saga\State\StateManager; -use Broadway\Saga\Testing\Scenario; use Broadway\UuidGenerator\Rfc4122\Version4Generator; use CommerceGuys\Intl\Currency\CurrencyRepository; use CommerceGuys\Intl\NumberFormat\NumberFormatRepository; @@ -36,7 +32,11 @@ use CultuurNet\UDB3\PriceInfo\PriceInfo; use CultuurNet\UDB3\PriceInfo\Tariff; use CultuurNet\UDB3\Title; -use CultuurNet\UDB3\UiTPASService\Broadway\Saga\StaticallyConfiguredSagaNamespacedEventsMetadataFactory; +use CultuurNet\UDB3\UiTPASService\Broadway\Saga\Metadata\StaticallyConfiguredSagaMetadataFactory; +use CultuurNet\UDB3\UiTPASService\Broadway\Saga\MultipleSagaManager; +use CultuurNet\UDB3\UiTPASService\Broadway\Saga\State\InMemoryRepository; +use CultuurNet\UDB3\UiTPASService\Broadway\Saga\State\StateManager; +use CultuurNet\UDB3\UiTPASService\Broadway\Saga\Testing\Scenario; use CultuurNet\UDB3\UiTPASService\OrganizerLabelReadRepository\OrganizerLabelReadRepositoryInterface; use CultuurNet\UDB3\UiTPASService\Sync\Command\RegisterUiTPASEvent; use CultuurNet\UDB3\UiTPASService\Sync\Command\UpdateUiTPASEvent; @@ -212,7 +212,7 @@ protected function createScenario() $sagaStateRepository, [$saga], new StateManager($sagaStateRepository, new Version4Generator()), - new StaticallyConfiguredSagaNamespacedEventsMetadataFactory(), + new StaticallyConfiguredSagaMetadataFactory(), new EventDispatcher() ); return new Scenario($this, $sagaManager, $traceableCommandBus);