Skip to content

Commit

Permalink
[MongoDB] Mercure support (#3290)
Browse files Browse the repository at this point in the history
* Mercure for MongoDB

* No abstract class

* Add deprecated alias
  • Loading branch information
alanpoulain committed Dec 20, 2019
1 parent 9a6c846 commit e773104
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -3,6 +3,7 @@
## 2.6.x-dev

* MongoDB: Possibility to add execute options (aggregate command fields) for a resource, like `allowDiskUse` (#3144)
* MongoDB: Mercure support (#3290)
* GraphQL: Allow to format GraphQL errors based on exceptions (#3063)
* GraphQL: Add page-based pagination (#3175)
* OpenAPI: Add PHP default values to the documentation (#2386)
Expand Down
94 changes: 52 additions & 42 deletions src/Bridge/Doctrine/EventListener/PublishMercureUpdatesListener.php
Expand Up @@ -21,7 +21,9 @@
use ApiPlatform\Core\Exception\RuntimeException;
use ApiPlatform\Core\Metadata\Resource\Factory\ResourceMetadataFactoryInterface;
use ApiPlatform\Core\Util\ResourceClassInfoTrait;
use Doctrine\ORM\Event\OnFlushEventArgs;
use Doctrine\Common\EventArgs;
use Doctrine\ODM\MongoDB\Event\OnFlushEventArgs as MongoDbOdmOnFlushEventArgs;
use Doctrine\ORM\Event\OnFlushEventArgs as OrmOnFlushEventArgs;
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
use Symfony\Component\Mercure\Update;
use Symfony\Component\Messenger\MessageBusInterface;
Expand All @@ -40,13 +42,12 @@ final class PublishMercureUpdatesListener
use ResourceClassInfoTrait;

private $iriConverter;
private $resourceMetadataFactory;
private $serializer;
private $publisher;
private $expressionLanguage;
private $createdEntities;
private $updatedEntities;
private $deletedEntities;
private $createdObjects;
private $updatedObjects;
private $deletedObjects;
private $formats;

/**
Expand All @@ -70,22 +71,31 @@ public function __construct(ResourceClassResolverInterface $resourceClassResolve
}

/**
* Collects created, updated and deleted entities.
* Collects created, updated and deleted objects.
*/
public function onFlush(OnFlushEventArgs $eventArgs): void
public function onFlush(EventArgs $eventArgs): void
{
$uow = $eventArgs->getEntityManager()->getUnitOfWork();
if ($eventArgs instanceof OrmOnFlushEventArgs) {
$uow = $eventArgs->getEntityManager()->getUnitOfWork();
} elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
$uow = $eventArgs->getDocumentManager()->getUnitOfWork();
} else {
return;
}

foreach ($uow->getScheduledEntityInsertions() as $entity) {
$this->storeEntityToPublish($entity, 'createdEntities');
$methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions';
foreach ($uow->{$methodName}() as $object) {
$this->storeObjectToPublish($object, 'createdObjects');
}

foreach ($uow->getScheduledEntityUpdates() as $entity) {
$this->storeEntityToPublish($entity, 'updatedEntities');
$methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates';
foreach ($uow->{$methodName}() as $object) {
$this->storeObjectToPublish($object, 'updatedObjects');
}

foreach ($uow->getScheduledEntityDeletions() as $entity) {
$this->storeEntityToPublish($entity, 'deletedEntities');
$methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions';
foreach ($uow->{$methodName}() as $object) {
$this->storeObjectToPublish($object, 'deletedObjects');
}
}

Expand All @@ -95,16 +105,16 @@ public function onFlush(OnFlushEventArgs $eventArgs): void
public function postFlush(): void
{
try {
foreach ($this->createdEntities as $entity) {
$this->publishUpdate($entity, $this->createdEntities[$entity]);
foreach ($this->createdObjects as $object) {
$this->publishUpdate($object, $this->createdObjects[$object]);
}

foreach ($this->updatedEntities as $entity) {
$this->publishUpdate($entity, $this->updatedEntities[$entity]);
foreach ($this->updatedObjects as $object) {
$this->publishUpdate($object, $this->updatedObjects[$object]);
}

foreach ($this->deletedEntities as $entity) {
$this->publishUpdate($entity, $this->deletedEntities[$entity]);
foreach ($this->deletedObjects as $object) {
$this->publishUpdate($object, $this->deletedObjects[$object]);
}
} finally {
$this->reset();
Expand All @@ -113,17 +123,17 @@ public function postFlush(): void

private function reset(): void
{
$this->createdEntities = new \SplObjectStorage();
$this->updatedEntities = new \SplObjectStorage();
$this->deletedEntities = new \SplObjectStorage();
$this->createdObjects = new \SplObjectStorage();
$this->updatedObjects = new \SplObjectStorage();
$this->deletedObjects = new \SplObjectStorage();
}

/**
* @param object $entity
* @param object $object
*/
private function storeEntityToPublish($entity, string $property): void
private function storeObjectToPublish($object, string $property): void
{
if (null === $resourceClass = $this->getResourceClass($entity)) {
if (null === $resourceClass = $this->getResourceClass($object)) {
return;
}

Expand All @@ -137,7 +147,7 @@ private function storeEntityToPublish($entity, string $property): void
throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
}

$value = $this->expressionLanguage->evaluate($value, ['object' => $entity]);
$value = $this->expressionLanguage->evaluate($value, ['object' => $object]);
}

if (true === $value) {
Expand All @@ -148,36 +158,36 @@ private function storeEntityToPublish($entity, string $property): void
throw new InvalidArgumentException(sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of targets or a valid expression, "%s" given.', $resourceClass, \gettype($value)));
}

if ('deletedEntities' === $property) {
$this->deletedEntities[(object) [
'id' => $this->iriConverter->getIriFromItem($entity),
'iri' => $this->iriConverter->getIriFromItem($entity, UrlGeneratorInterface::ABS_URL),
if ('deletedObjects' === $property) {
$this->deletedObjects[(object) [
'id' => $this->iriConverter->getIriFromItem($object),
'iri' => $this->iriConverter->getIriFromItem($object, UrlGeneratorInterface::ABS_URL),
]] = $value;

return;
}

$this->{$property}[$entity] = $value;
$this->{$property}[$object] = $value;
}

/**
* @param object $entity
* @param object $object
*/
private function publishUpdate($entity, array $targets): void
private function publishUpdate($object, array $targets): void
{
if ($entity instanceof \stdClass) {
// By convention, if the entity has been deleted, we send only its IRI
if ($object instanceof \stdClass) {
// By convention, if the object has been deleted, we send only its IRI.
// This may change in the feature, because it's not JSON Merge Patch compliant,
// and I'm not a fond of this approach
$iri = $entity->iri;
// and I'm not a fond of this approach.
$iri = $object->iri;
/** @var string $data */
$data = json_encode(['@id' => $entity->id]);
$data = json_encode(['@id' => $object->id]);
} else {
$resourceClass = $this->getObjectClass($entity);
$resourceClass = $this->getObjectClass($object);
$context = $this->resourceMetadataFactory->create($resourceClass)->getAttribute('normalization_context', []);

$iri = $this->iriConverter->getIriFromItem($entity, UrlGeneratorInterface::ABS_URL);
$data = $this->serializer->serialize($entity, key($this->formats), $context);
$iri = $this->iriConverter->getIriFromItem($object, UrlGeneratorInterface::ABS_URL);
$data = $this->serializer->serialize($object, key($this->formats), $context);
}

$update = new Update($iri, $data, $targets);
Expand Down
Expand Up @@ -587,6 +587,9 @@ private function registerMercureConfiguration(ContainerBuilder $container, array
if ($this->isConfigEnabled($container, $config['doctrine'])) {
$loader->load('doctrine_orm_mercure_publisher.xml');
}
if ($this->isConfigEnabled($container, $config['doctrine_mongodb_odm'])) {
$loader->load('doctrine_mongodb_odm_mercure_publisher.xml');
}
}

private function registerMessengerConfiguration(ContainerBuilder $container, array $config, XmlFileLoader $loader): void
Expand Down
@@ -0,0 +1,26 @@
<?xml version="1.0" ?>

<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">

<services>

<!-- Event listener -->

<service id="api_platform.doctrine_mongodb.odm.listener.mercure.publish" class="ApiPlatform\Core\Bridge\Doctrine\EventListener\PublishMercureUpdatesListener">
<argument type="service" id="api_platform.resource_class_resolver" />
<argument type="service" id="api_platform.iri_converter" />
<argument type="service" id="api_platform.metadata.resource.metadata_factory" />
<argument type="service" id="api_platform.serializer" />
<argument>%api_platform.formats%</argument>
<argument type="service" id="messenger.default_bus" on-invalid="ignore" />
<argument type="service" id="mercure.hub.default.publisher" />

<tag name="doctrine_mongodb.odm.event_listener" event="onFlush" />
<tag name="doctrine_mongodb.odm.event_listener" event="postFlush" />
</service>

</services>

</container>
Expand Up @@ -8,7 +8,7 @@

<!-- Event listener -->

<service id="api_platform.doctrine.listener.mercure.publish" class="ApiPlatform\Core\Bridge\Doctrine\EventListener\PublishMercureUpdatesListener">
<service id="api_platform.doctrine.orm.listener.mercure.publish" class="ApiPlatform\Core\Bridge\Doctrine\EventListener\PublishMercureUpdatesListener">
<argument type="service" id="api_platform.resource_class_resolver" />
<argument type="service" id="api_platform.iri_converter" />
<argument type="service" id="api_platform.metadata.resource.metadata_factory" />
Expand All @@ -21,6 +21,10 @@
<tag name="doctrine.event_listener" event="postFlush" />
</service>

<service id="api_platform.doctrine.listener.mercure.publish" alias="api_platform.doctrine.orm.listener.mercure.publish">
<deprecated>Using "%alias_id%" service is deprecated since API Platform 2.6. Use "api_platform.doctrine.orm.listener.mercure.publish" instead.</deprecated>
</service>

</services>

</container>
Expand Up @@ -37,7 +37,7 @@
*/
class PublishMercureUpdatesListenerTest extends TestCase
{
public function testPublishUpdate()
public function testPublishUpdate(): void
{
$toInsert = new Dummy();
$toInsert->setId(1);
Expand Down Expand Up @@ -115,7 +115,7 @@ public function testPublishUpdate()
$this->assertSame([[], [], [], ['foo', 'bar']], $targets);
}

public function testNoPublisher()
public function testNoPublisher(): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('A message bus or a publisher must be provided.');
Expand All @@ -131,7 +131,7 @@ public function testNoPublisher()
);
}

public function testInvalidMercureAttribute()
public function testInvalidMercureAttribute(): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('The value of the "mercure" attribute of the "ApiPlatform\Core\Tests\Fixtures\TestBundle\Entity\Dummy" resource class must be a boolean, an array of targets or a valid expression, "integer" given.');
Expand Down
Expand Up @@ -596,7 +596,7 @@ private function runDisableDoctrineTests()
$containerBuilderProphecy->setDefinition('api_platform.doctrine.orm.range_filter', Argument::type(Definition::class))->shouldNotBeCalled();
$containerBuilderProphecy->setDefinition('api_platform.doctrine.orm.search_filter', Argument::type(Definition::class))->shouldNotBeCalled();
$containerBuilderProphecy->setDefinition('api_platform.doctrine.orm.subresource_data_provider', Argument::type(Definition::class))->shouldNotBeCalled();
$containerBuilderProphecy->setDefinition('api_platform.doctrine.listener.mercure.publish', Argument::type(Definition::class))->shouldNotBeCalled();
$containerBuilderProphecy->setDefinition('api_platform.doctrine.orm.listener.mercure.publish', Argument::type(Definition::class))->shouldNotBeCalled();
$containerBuilderProphecy->setAlias(EagerLoadingExtension::class, 'api_platform.doctrine.orm.query_extension.eager_loading')->shouldNotBeCalled();
$containerBuilderProphecy->setAlias(FilterExtension::class, 'api_platform.doctrine.orm.query_extension.filter')->shouldNotBeCalled();
$containerBuilderProphecy->setAlias(FilterEagerLoadingExtension::class, 'api_platform.doctrine.orm.query_extension.filter_eager_loading')->shouldNotBeCalled();
Expand All @@ -609,6 +609,7 @@ private function runDisableDoctrineTests()
$containerBuilderProphecy->setAlias(BooleanFilter::class, 'api_platform.doctrine.orm.boolean_filter')->shouldNotBeCalled();
$containerBuilderProphecy->setAlias(NumericFilter::class, 'api_platform.doctrine.orm.numeric_filter')->shouldNotBeCalled();
$containerBuilderProphecy->setAlias(ExistsFilter::class, 'api_platform.doctrine.orm.exists_filter')->shouldNotBeCalled();
$containerBuilderProphecy->setAlias('api_platform.doctrine.listener.mercure.publish', 'api_platform.doctrine.orm.listener.mercure.publish')->shouldNotBeCalled();
$containerBuilder = $containerBuilderProphecy->reveal();

$config = self::DEFAULT_CONFIG;
Expand Down Expand Up @@ -648,6 +649,7 @@ public function testDisableDoctrineMongoDbOdm()
$containerBuilderProphecy->setDefinition('api_platform.doctrine_mongodb.odm.range_filter', Argument::type(Definition::class))->shouldNotBeCalled();
$containerBuilderProphecy->setDefinition('api_platform.doctrine_mongodb.odm.search_filter', Argument::type(Definition::class))->shouldNotBeCalled();
$containerBuilderProphecy->setDefinition('api_platform.doctrine_mongodb.odm.subresource_data_provider', Argument::type(Definition::class))->shouldNotBeCalled();
$containerBuilderProphecy->setDefinition('api_platform.doctrine_mongodb.odm.listener.mercure.publish', Argument::type(Definition::class))->shouldNotBeCalled();
$containerBuilderProphecy->setAlias(MongoDbOdmFilterExtension::class, 'api_platform.doctrine_mongodb.odm.aggregation_extension.filter')->shouldNotBeCalled();
$containerBuilderProphecy->setAlias(MongoDbOdmOrderExtension::class, 'api_platform.doctrine_mongodb.odm.aggregation_extension.order')->shouldNotBeCalled();
$containerBuilderProphecy->setAlias(MongoDbOdmPaginationExtension::class, 'api_platform.doctrine_mongodb.odm.aggregation_extension.pagination')->shouldNotBeCalled();
Expand Down Expand Up @@ -1146,7 +1148,7 @@ private function getBaseContainerBuilderProphecy(array $doctrineIntegrationsToLo
$definitions = [
'api_platform.data_collector.request',
'api_platform.doctrine.listener.http_cache.purge',
'api_platform.doctrine.listener.mercure.publish',
'api_platform.doctrine.orm.listener.mercure.publish',
'api_platform.doctrine.orm.boolean_filter',
'api_platform.doctrine.orm.collection_data_provider',
'api_platform.doctrine.orm.data_persister',
Expand Down Expand Up @@ -1242,6 +1244,7 @@ private function getBaseContainerBuilderProphecy(array $doctrineIntegrationsToLo

if (\in_array('odm', $doctrineIntegrationsToLoad, true)) {
$definitions = array_merge($definitions, [
'api_platform.doctrine_mongodb.odm.listener.mercure.publish',
'api_platform.doctrine_mongodb.odm.aggregation_extension.filter',
'api_platform.doctrine_mongodb.odm.aggregation_extension.order',
'api_platform.doctrine_mongodb.odm.aggregation_extension.pagination',
Expand Down Expand Up @@ -1323,6 +1326,7 @@ private function getBaseContainerBuilderProphecy(array $doctrineIntegrationsToLo
BooleanFilter::class => 'api_platform.doctrine.orm.boolean_filter',
NumericFilter::class => 'api_platform.doctrine.orm.numeric_filter',
ExistsFilter::class => 'api_platform.doctrine.orm.exists_filter',
'api_platform.doctrine.listener.mercure.publish' => 'api_platform.doctrine.orm.listener.mercure.publish',
GraphQlSerializerContextBuilderInterface::class => 'api_platform.graphql.serializer.context_builder',
];

Expand Down Expand Up @@ -1359,7 +1363,8 @@ private function getBaseContainerBuilderProphecy(array $doctrineIntegrationsToLo
$definitionDummy = $this->prophesize(Definition::class);
$containerBuilderProphecy->removeDefinition('api_platform.cache_warmer.cache_pool_clearer')->will(function () {});
$containerBuilderProphecy->getDefinition('api_platform.mercure.listener.response.add_link_header')->willReturn($definitionDummy);
$containerBuilderProphecy->getDefinition('api_platform.doctrine.listener.mercure.publish')->willReturn($definitionDummy);
$containerBuilderProphecy->getDefinition('api_platform.doctrine.orm.listener.mercure.publish')->willReturn($definitionDummy);
$containerBuilderProphecy->getDefinition('api_platform.doctrine_mongodb.odm.listener.mercure.publish')->willReturn($definitionDummy);

return $containerBuilderProphecy;
}
Expand Down

0 comments on commit e773104

Please sign in to comment.