Skip to content

Commit

Permalink
Merge branch '2.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-gribanov committed Aug 17, 2017
2 parents 74f5504 + fcba77e commit 2d36886
Show file tree
Hide file tree
Showing 7 changed files with 466 additions and 173 deletions.
2 changes: 1 addition & 1 deletion src/DependencyInjection/GpsLabDomainEventExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function load(array $configs, ContainerBuilder $container)
$container->setAlias('domain_event.queue', $this->queueRealName($config['queue']));
$container->setAlias('domain_event.locator', $this->locatorRealName($config['locator']));

$container->getDefinition('domain_event.publisher')->replaceArgument(1, $config['publish_on_flush']);
$container->getDefinition('domain_event.publisher')->replaceArgument(2, $config['publish_on_flush']);
}

/**
Expand Down
70 changes: 38 additions & 32 deletions src/Event/Listener/DomainEventPublisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
namespace GpsLab\Bundle\DomainEvent\Event\Listener;

use Doctrine\Common\EventSubscriber;
use Doctrine\Common\Persistence\Proxy;
use Doctrine\ORM\Event\OnFlushEventArgs;
use Doctrine\ORM\Event\PostFlushEventArgs;
use Doctrine\ORM\Events;
use GpsLab\Domain\Event\Aggregator\AggregateEvents;
use GpsLab\Bundle\DomainEvent\Service\EventPuller;
use GpsLab\Domain\Event\Bus\EventBus;
use GpsLab\Domain\Event\Event;

class DomainEventPublisher implements EventSubscriber
{
/**
* @var EventPuller
*/
private $puller;

/**
* @var EventBus
*/
Expand All @@ -30,12 +36,19 @@ class DomainEventPublisher implements EventSubscriber
private $enable;

/**
* @param EventBus $bus
* @param bool $enable
* @var Event[]
*/
private $events = [];

/**
* @param EventPuller $puller
* @param EventBus $bus
* @param bool $enable
*/
public function __construct(EventBus $bus, $enable)
public function __construct(EventPuller $puller, EventBus $bus, $enable)
{
$this->bus = $bus;
$this->puller = $puller;
$this->enable = $enable;
}

Expand All @@ -49,47 +62,40 @@ public function getSubscribedEvents()
}

return [
Events::onFlush,
Events::postFlush,
];
}

/**
* @param PostFlushEventArgs $args
* @param OnFlushEventArgs $args
*/
public function postFlush(PostFlushEventArgs $args)
public function onFlush(OnFlushEventArgs $args)
{
$map = $args->getEntityManager()->getUnitOfWork()->getIdentityMap();

// flush only if has domain events
// it necessary for fix recursive handle flush
if ($this->publish($map)) {
$args->getEntityManager()->flush();
}
// aggregate events from deleted entities
$this->events = $this->puller->pull($args->getEntityManager()->getUnitOfWork());
}

/**
* @param array $map
*
* @return bool
* @param PostFlushEventArgs $args
*/
private function publish(array $map)
public function postFlush(PostFlushEventArgs $args)
{
$has_events = false;
foreach ($map as $entities) {
foreach ($entities as $entity) {
// ignore Doctrine proxy classes
// proxy class can't have a domain events
if ($entity instanceof Proxy || !($entity instanceof AggregateEvents)) {
break;
}
// aggregate PreRemove/PostRemove events
$events = array_merge($this->events, $this->puller->pull($args->getEntityManager()->getUnitOfWork()));

// clear aggregate events before publish it
// it necessary for fix recursive publish of events
$this->events = [];

foreach ($entity->pullEvents() as $event) {
$this->bus->publish($event);
$has_events = true;
}
// flush only if has domain events
// it necessary for fix recursive handle flush
if (!empty($events)) {
foreach ($events as $event) {
$this->bus->publish($event);
}
}

return $has_events;
$args->getEntityManager()->flush();
}
}
}
8 changes: 6 additions & 2 deletions src/Resources/config/publisher.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
services:
domain_event.publisher:
class: GpsLab\Bundle\DomainEvent\Event\Listener\DomainEventPublisher
arguments: [ '@domain_event.bus', ~ ]
arguments: [ '@domain_event.puller', '@domain_event.bus', ~ ]
tags:
- { name: doctrine.event_subscriber, connection: default }
- { name: doctrine.event_subscriber }

domain_event.puller:
class: GpsLab\Bundle\DomainEvent\Service\EventPuller
public: false
59 changes: 59 additions & 0 deletions src/Service/EventPuller.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

/**
* GpsLab component.
*
* @author Peter Gribanov <info@peter-gribanov.ru>
* @copyright Copyright (c) 2011, Peter Gribanov
* @license http://opensource.org/licenses/MIT
*/

namespace GpsLab\Bundle\DomainEvent\Service;

use Doctrine\Common\Persistence\Proxy;
use Doctrine\ORM\UnitOfWork;
use GpsLab\Domain\Event\Aggregator\AggregateEvents;
use GpsLab\Domain\Event\Event;

class EventPuller
{
/**
* @param UnitOfWork $uow
*
* @return Event[]
*/
public function pull(UnitOfWork $uow)
{
$events = [];

$events = array_merge($events, $this->pullFromEntities($uow->getScheduledEntityDeletions()));
$events = array_merge($events, $this->pullFromEntities($uow->getScheduledEntityInsertions()));
$events = array_merge($events, $this->pullFromEntities($uow->getScheduledEntityUpdates()));

// other entities
foreach ($uow->getIdentityMap() as $entities) {
$events = array_merge($events, $this->pullFromEntities($entities));
}

return $events;
}

/**
* @param array $entities
*
* @return Event[]
*/
private function pullFromEntities(array $entities)
{
$events = [];
foreach ($entities as $entity) {
// ignore Doctrine proxy classes
// proxy class can't have a domain events
if (!($entity instanceof Proxy) && $entity instanceof AggregateEvents) {
$events = array_merge($events, $entity->pullEvents());
}
}

return $events;
}
}
4 changes: 2 additions & 2 deletions tests/DependencyInjection/GpsLabDomainEventExtensionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class GpsLabDomainEventExtensionTest extends \PHPUnit_Framework_TestCase
*/
private $extension;

const CONTAINER_OFFSET = 12;
const CONTAINER_OFFSET = 13;

protected function setUp()
{
Expand Down Expand Up @@ -111,7 +111,7 @@ public function testLoad(array $config, $bus, $queue, $locator, $publish_on_flus
$publisher
->expects($this->once())
->method('replaceArgument')
->with(1, $publish_on_flush)
->with(2, $publish_on_flush)
;

$this->container
Expand Down
Loading

0 comments on commit 2d36886

Please sign in to comment.