Skip to content

Commit

Permalink
[TransactionalMessenger] Added possibility to specify more than one c…
Browse files Browse the repository at this point in the history
…ommit types for message
  • Loading branch information
fractalzombie committed Aug 18, 2022
1 parent 4a01552 commit 8e4aef5
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 7 deletions.
8 changes: 6 additions & 2 deletions Attribute/Transactional.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@

use FRZB\Component\TransactionalMessenger\Enum\CommitType;

#[\Attribute(\Attribute::TARGET_CLASS)]
#[\Attribute(\Attribute::TARGET_CLASS | \Attribute::IS_REPEATABLE)]
final class Transactional
{
public const LISTENER_PRIORITY = -2048;

/** @var array<CommitType> */
public readonly array $commitTypes;

public function __construct(
public readonly CommitType $commitType = CommitType::OnTerminate,
CommitType ...$commitTypes,
) {
$this->commitTypes = $commitTypes ?: [CommitType::OnTerminate];
}
}
28 changes: 28 additions & 0 deletions Helper/TransactionHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);


namespace FRZB\Component\TransactionalMessenger\Helper;

use Fp\Collections\ArrayList;
use FRZB\Component\TransactionalMessenger\Enum\CommitType;
use FRZB\Component\TransactionalMessenger\ValueObject\PendingEnvelope;
use JetBrains\PhpStorm\Immutable;

/** @internal */
#[Immutable]
final class TransactionHelper
{
private function __construct()
{
}

public static function isDispatchAllowed(PendingEnvelope $envelope, CommitType ...$commitTypes): bool
{
return ArrayList::collect($envelope->getAttribute()->commitTypes)
->filter(static fn (CommitType $ct) => \in_array($ct, $commitTypes, true))
->isNonEmpty()
;
}
}
9 changes: 5 additions & 4 deletions MessageBus/TransactionalMessageBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use FRZB\Component\TransactionalMessenger\Event\DispatchSucceedEvent;
use FRZB\Component\TransactionalMessenger\Helper\AttributeHelper;
use FRZB\Component\TransactionalMessenger\Helper\EnvelopeHelper;
use FRZB\Component\TransactionalMessenger\Helper\TransactionHelper;
use FRZB\Component\TransactionalMessenger\Storage\Storage as StorageImpl;
use FRZB\Component\TransactionalMessenger\Storage\StorageInterface as Storage;
use FRZB\Component\TransactionalMessenger\ValueObject\FailedEnvelope;
Expand Down Expand Up @@ -84,16 +85,16 @@ public function rollback(\Throwable $exception): void

private function dispatchPendingEnvelopes(CommitType ...$commitTypes): void
{
$notExecutedEnvelopes = new StorageImpl();
$notAllowedForDispatchEnvelopes = new StorageImpl();

while ($pendingEnvelope = $this->pendingStorage->next()) {
\in_array($pendingEnvelope->getAttribute()->commitType, $commitTypes)
TransactionHelper::isDispatchAllowed($pendingEnvelope, ...$commitTypes)
? $this->dispatchEnvelope($pendingEnvelope->envelope)
: $notExecutedEnvelopes->prepend($pendingEnvelope)
: $notAllowedForDispatchEnvelopes->prepend($pendingEnvelope)
;
}

$this->pendingStorage->prepend(...$notExecutedEnvelopes->list());
$this->pendingStorage->prepend(...$notAllowedForDispatchEnvelopes->list());
}

private function dispatchSucceedEnvelopes(): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public function testCommitMethod(
;

$envelope = $this->messageBus->dispatch($message);
$this->messageBus->commit(AttributeHelper::getAttribute($message, Transactional::class)->commitType);
$this->messageBus->commit(...AttributeHelper::getAttribute($message, Transactional::class)->commitTypes);

self::assertSame(spl_object_hash($message), spl_object_hash($envelope->getMessage()));
self::assertSame($pendingCount, $this->pendingStorage->count());
Expand Down

0 comments on commit 8e4aef5

Please sign in to comment.