-
-
Notifications
You must be signed in to change notification settings - Fork 932
Open
Description
To continue:
<?php
namespace App\Listeners;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Foundation\Application;
use Illuminate\Support\ItemNotFoundException;
use SplObjectStorage;
use Symfony\Component\Mercure\HubRegistry;
use Symfony\Component\Mercure\Update;
use Symfony\Component\Serializer\SerializerInterface;
// Assuming you have these contracts for your application
use App\Contracts\Mercure\IriConverterInterface;
/**
 * Publishes resource updates to the Mercure hub.
 *
 * This is a Laravel port of ApiPlatform's PublishMercureUpdatesListener.
 *
 * Models opt in by exposing a getMercureOptions() method that returns either a
 * boolean or an array of options (see ALLOWED_KEYS). Changes are collected by
 * handleSaved() / handleDeleted() and published in one batch by postFlush().
 */
final class PublishMercureUpdatesListener
{
    /** Created models awaiting publication, each mapped to its Mercure options. */
    private SplObjectStorage $createdObjects;

    /** Updated models awaiting publication, each mapped to its Mercure options. */
    private SplObjectStorage $updatedObjects;

    /** stdClass proxies (id, iri, type) of deleted models, each mapped to its already-evaluated options. */
    private SplObjectStorage $deletedObjects;

    /** Option names accepted from getMercureOptions(); anything else is rejected. */
    private const ALLOWED_KEYS = [
        'topics' => true,
        'data' => true,
        'private' => true,
        'id' => true,
        'type' => true,
        'retry' => true,
        'normalization_context' => true,
        'hub' => true,
        'enable_async_update' => true,
    ];

    /**
     * @param array<string, string> $formats Serialization formats; the first key is the format
     *                                       passed to the serializer when no "data" option is given
     *
     * @throws \InvalidArgumentException When neither a bus nor a hub registry is provided
     */
    public function __construct(
        private readonly IriConverterInterface $iriConverter,
        private readonly SerializerInterface $serializer,
        private readonly array $formats,
        private readonly Application $app,
        private readonly ?HubRegistry $hubRegistry = null,
        private readonly ?Dispatcher $bus = null
    ) {
        if (null === $this->bus && null === $this->hubRegistry) {
            throw new \InvalidArgumentException('A message bus (Dispatcher) or a HubRegistry must be provided.');
        }

        $this->reset();
    }

    /**
     * Handle "saved" events.
     *
     * The model is queued as a creation when its wasRecentlyCreated flag is set,
     * otherwise as an update.
     */
    public function handleSaved(Model $model): void
    {
        $property = $model->wasRecentlyCreated ? 'createdObjects' : 'updatedObjects';
        $this->storeObjectToPublish($model, $property);
    }

    /**
     * Handle "deleted" events.
     */
    public function handleDeleted(Model $model): void
    {
        $this->storeObjectToPublish($model, 'deletedObjects');
    }

    /**
     * Publishes updates for changes collected, then resets.
     * This method should be called by a service provider in the app's 'terminating' event.
     *
     * The queues are always cleared, even when publishing throws, so a failed
     * batch is not replayed by a later call.
     */
    public function postFlush(): void
    {
        try {
            // Iterate over clones so the storages being read from are not the ones being traversed.
            foreach (clone $this->createdObjects as $object) {
                $this->publishUpdate($object, $this->createdObjects[$object], 'create');
            }

            foreach (clone $this->updatedObjects as $object) {
                $this->publishUpdate($object, $this->updatedObjects[$object], 'update');
            }

            foreach (clone $this->deletedObjects as $object) {
                $this->publishUpdate($object, $this->deletedObjects[$object], 'delete');
            }
        } finally {
            $this->reset();
        }
    }

    /**
     * Validates the model's Mercure options and queues it in the given storage.
     *
     * Models without a getMercureOptions() method, or whose method returns false,
     * are ignored. Returning true is treated as "publish with default options".
     *
     * @param string $property Name of the target storage property
     *                         ("createdObjects", "updatedObjects" or "deletedObjects")
     *
     * @throws \InvalidArgumentException When the options are neither a boolean nor an array,
     *                                   or contain a key that is not in ALLOWED_KEYS
     */
    private function storeObjectToPublish(Model $model, string $property): void
    {
        if (!method_exists($model, 'getMercureOptions')) {
            return;
        }

        $options = $model->getMercureOptions();

        if ($options === false) {
            return;
        }

        if ($options === true) {
            $options = [];
        }

        if (!is_array($options)) {
            throw new \InvalidArgumentException(sprintf('The value of getMercureOptions() for model "%s" must be a boolean or an array.', get_class($model)));
        }

        foreach (array_keys($options) as $key) {
            if (!isset(self::ALLOWED_KEYS[$key])) {
                throw new \InvalidArgumentException(sprintf('The option "%s" from getMercureOptions() on model "%s" is not supported.', $key, get_class($model)));
            }
        }

        $options['enable_async_update'] ??= true;

        if ($property === 'deletedObjects') {
            // For deleted objects, we must evaluate topics and store IRIs now,
            // as the object will no longer exist when publishing.
            $this->evaluateOptions($options, $model);

            // Store a proxy object with the necessary information.
            $proxy = (object) [
                'id' => $this->iriConverter->getIriFromResource($model),
                'iri' => $this->iriConverter->getIriFromResource($model, true), // absolute URL
                'type' => class_basename($model), // A sensible default for type
            ];

            $this->deletedObjects[$proxy] = $options;

            return;
        }

        $this->{$property}[$model] = $options;
    }

    /**
     * Builds the Mercure update for one queued object and sends it.
     *
     * Nothing is sent when either the topic(s) or the payload end up empty.
     *
     * @param Model|\stdClass $object  A live model (create/update) or the proxy stored for a deletion
     * @param array           $options Options stored for the object; closures are evaluated here
     *                                 for live models and were already evaluated for proxies
     * @param string          $type    Kind of change ("create", "update" or "delete");
     *                                 currently not used when building the update
     *
     * @throws \JsonException  When the deletion payload cannot be encoded
     * @throws \LogicException When serialization is needed but no format is configured
     */
    private function publishUpdate(object $object, array $options, string $type): void
    {
        if ($object instanceof \stdClass) { // Proxy object for a deleted entity
            $iri = $options['topics'] ?? $object->iri;
            // Throw on encoding failure rather than letting `false` be mistaken for "no data".
            $data = json_encode(['@id' => $object->id, '@type' => $object->type], \JSON_THROW_ON_ERROR);
        } else {
            // For create/update, evaluate options just-in-time
            $this->evaluateOptions($options, $object);
            $iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, true);
            $context = $options['normalization_context'] ?? [];
            $data = $options['data'] ?? $this->serializer->serialize($object, $this->defaultFormat(), $context);
        }

        if (empty($iri) || empty($data)) {
            return;
        }

        $update = $this->buildUpdate($iri, $data, $options);

        // Use the bus when async delivery is requested, or when it is the only
        // transport available; otherwise the update would be dropped silently.
        $useBus = null !== $this->bus
            && ($options['enable_async_update'] || null === $this->hubRegistry);

        if ($useBus) {
            // In Laravel, Mercure updates are often handled by a queued job.
            // You would need to create a job that takes an Update object and publishes it.
            // For this example, we'll dispatch the Update object directly if a bus is present.
            // A real implementation might be: $this->bus->dispatch(new PublishMercureUpdateJob($update, $options['hub'] ?? null));
            // NOTE(review): the "hub" option is not forwarded on this path.
            $this->bus->dispatch($update);

            return;
        }

        // The constructor guarantees a hub registry exists whenever the bus is not used.
        $this->hubRegistry?->getHub($options['hub'] ?? null)->publish($update);
    }

    /**
     * Replaces callable options with their evaluated values.
     *
     * Only \Closure instances are invoked; they are called through the container
     * with the object available as the "object" argument.
     */
    private function evaluateOptions(array &$options, object $object): void
    {
        // Assign by key instead of iterating by reference, so no dangling reference is left behind.
        foreach ($options as $name => $value) {
            if ($value instanceof \Closure) {
                $options[$name] = $this->app->call($value, ['object' => $object]);
            }
        }
    }

    /**
     * Creates the Update value object from the topic(s), payload and options.
     *
     * @param string|string[] $iri
     */
    private function buildUpdate(string|array $iri, string $data, array $options): Update
    {
        return new Update(
            $iri,
            $data,
            $options['private'] ?? false,
            $options['id'] ?? null,
            $options['type'] ?? null,
            $options['retry'] ?? null
        );
    }

    /**
     * Returns the first configured format name, used as the serialization format.
     *
     * @throws \LogicException When no format has been configured
     */
    private function defaultFormat(): string
    {
        $format = array_key_first($this->formats);

        if (null === $format) {
            throw new \LogicException('At least one format must be configured to serialize Mercure updates.');
        }

        return (string) $format;
    }

    /**
     * Empties the three queues by replacing them with fresh storages.
     */
    private function reset(): void
    {
        $this->createdObjects = new SplObjectStorage();
        $this->updatedObjects = new SplObjectStorage();
        $this->deletedObjects = new SplObjectStorage();
    }
}
Metadata
Metadata
Assignees
Labels
No labels