Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/Amqp/src/AmqpBackedMessageChannelBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Ecotone\Amqp;

use Ecotone\Enqueue\EnqueueMessageChannelBuilder;
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
use Enqueue\AmqpExt\AmqpConnectionFactory;

/**
Expand Down
1 change: 1 addition & 0 deletions packages/Amqp/src/AmqpInboundChannelAdapterBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public function compile(MessagingContainerBuilder $builder): Definition
DefaultHeaderMapper::createWith($this->headerMapper, []),
EnqueueHeader::HEADER_ACKNOWLEDGE,
Reference::to(LoggingGateway::class),
$this->finalFailureStrategy,
]);

return new Definition(AmqpInboundChannelAdapter::class, [
Expand Down
91 changes: 91 additions & 0 deletions packages/Amqp/tests/FinalFailureStrategyTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Amqp;

use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;
use Ecotone\Lite\EcotoneLite;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Attribute\ServiceActivator;
use Ecotone\Messaging\Channel\PollableChannel\InMemory\InMemoryAcknowledgeStatus;
use Ecotone\Messaging\Channel\PollableChannel\InMemory\InMemoryQueueAcknowledgeInterceptor;
use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
use Ecotone\Messaging\Message;
use Enqueue\AmqpExt\AmqpConnectionFactory;
use PHPUnit\Framework\TestCase;

/**
* @internal
*/
/**
* licence Apache-2.0
*/
final class FinalFailureStrategyTest extends AmqpMessagingTestCase
{
public function test_reject_failure_strategy_rejects_message_on_exception()
{
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
[FailingService::class],
[new FailingService(), AmqpConnectionFactory::class => $this->getCachedConnectionFactory(),],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withExtensionObjects([
AmqpBackedMessageChannelBuilder::create(channelName: 'async')
->withFinalFailureStrategy(FinalFailureStrategy::IGNORE)
->withReceiveTimeout(100),
])
);

$ecotoneTestSupport->sendDirectToChannel('executionChannel', 'some');
$ecotoneTestSupport->run('async', ExecutionPollingMetadata::createWithTestingSetup(failAtError: false));

$messageChannel = $ecotoneTestSupport->getMessageChannel('async');
$this->assertNull($messageChannel->receive());
}

public function test_resend_failure_strategy_rejects_message_on_exception()
{
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
[FailingService::class],
[new FailingService(), AmqpConnectionFactory::class => $this->getCachedConnectionFactory(),],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withExtensionObjects([
AmqpBackedMessageChannelBuilder::create(channelName: 'async')
->withFinalFailureStrategy(FinalFailureStrategy::RESEND)
->withReceiveTimeout(100),
])
);

$ecotoneTestSupport->sendDirectToChannel('executionChannel', 'some');
$ecotoneTestSupport->run('async', ExecutionPollingMetadata::createWithTestingSetup(failAtError: false));

$messageChannel = $ecotoneTestSupport->getMessageChannel('async');
$this->assertNotNull($messageChannel->receive());
}
}


class FailingService
{
private Message $message;

#[Asynchronous('async')]
#[ServiceActivator('executionChannel')]
public function handle(Message $message): void
{
$this->message = $message;

throw new \Exception('Service failed');
}

public function getMessage(): Message
{
return $this->message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Ecotone\Messaging\Channel\PollableChannel\InMemory;

use Ecotone\Messaging\Endpoint\AcknowledgementCallback;
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\PollableChannel;
use Ecotone\Messaging\Support\Assert;
Expand All @@ -16,47 +17,51 @@
final class InMemoryAcknowledgeCallback implements AcknowledgementCallback
{
public function __construct(
private PollableChannel $queueChannel,
private Message $message,
private bool $isAutoAck = true,
private bool $wasAcked = false
private PollableChannel $queueChannel,
private Message $message,
private FinalFailureStrategy $failureStrategy = FinalFailureStrategy::RESEND,
private bool $isAutoAcked = true,
private InMemoryAcknowledgeStatus $status = InMemoryAcknowledgeStatus::AWAITING
) {
}

/**
* @return bool
* @inheritDoc
*/
public function isAutoAck(): bool
public function getFailureStrategy(): FinalFailureStrategy
{
return $this->isAutoAck;
return $this->failureStrategy;
}

/**
* Disable auto acknowledgment
* @inheritDoc
*/
public function disableAutoAck(): void
public function isAutoAcked(): bool
{
$this->isAutoAck = false;
return $this->isAutoAcked;
}

public function getStatus(): InMemoryAcknowledgeStatus
{
return $this->status;
}

/**
* Mark the message as accepted
*/
public function accept(): void
{
Assert::isFalse($this->wasAcked, 'Trying to acknowledge message that was already acknowledged');

$this->wasAcked = true;
Assert::isTrue(in_array($this->status, [InMemoryAcknowledgeStatus::AWAITING, InMemoryAcknowledgeStatus::RESENT], true), "Message was already acknowledged.");
$this->status = InMemoryAcknowledgeStatus::ACKED;
}

/**
* Mark the message as rejected
*/
public function reject(): void
{
Assert::isFalse($this->wasAcked, 'Trying to acknowledge message that was already acknowledged');

$this->wasAcked = true;
Assert::isTrue(in_array($this->status, [InMemoryAcknowledgeStatus::AWAITING, InMemoryAcknowledgeStatus::RESENT], true), "Message was already acknowledged.");
$this->status = InMemoryAcknowledgeStatus::IGNORED;
}

private int $requeueCount = 0;
Expand All @@ -66,6 +71,9 @@ public function reject(): void
*/
public function requeue(): void
{
Assert::isTrue(in_array($this->status, [InMemoryAcknowledgeStatus::AWAITING, InMemoryAcknowledgeStatus::RESENT], true), "Message was already acknowledged.");

$this->status = InMemoryAcknowledgeStatus::RESENT;
$this->requeueCount++;

if ($this->requeueCount > 100) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Channel\PollableChannel\InMemory;

/**
* licence Apache-2.0
*/
enum InMemoryAcknowledgeStatus: int
{
case AWAITING = 0;
case ACKED = 1;
case RESENT = 2;
case IGNORED = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

use Ecotone\Messaging\Channel\AbstractChannelInterceptor;
use Ecotone\Messaging\Channel\ChannelInterceptor;
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageChannel;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Messaging\Support\MessageBuilder;
use Ecotone\Messaging\PollableChannel;

/**
* licence Apache-2.0
Expand All @@ -18,6 +21,11 @@ final class InMemoryQueueAcknowledgeInterceptor extends AbstractChannelIntercept
{
public const ECOTONE_IN_MEMORY_QUEUE_ACK = 'ecotone.in_memory_queue.ack';

public function __construct(private FinalFailureStrategy $finalFailureStrategy, private bool $isAutoAcked)
{

}

/**
* @inheritDoc
*/
Expand All @@ -27,9 +35,11 @@ public function preSend(Message $message, MessageChannel $messageChannel): ?Mess
return $message;
}

Assert::isTrue($messageChannel instanceof PollableChannel, "InMemoryQueueAcknowledgeInterceptor can be used only with PollableChannel");

return MessageBuilder::fromMessage($message)
->setHeader(MessageHeaders::CONSUMER_ACK_HEADER_LOCATION, self::ECOTONE_IN_MEMORY_QUEUE_ACK)
->setHeader(self::ECOTONE_IN_MEMORY_QUEUE_ACK, new InMemoryAcknowledgeCallback($messageChannel, $message))
->setHeader(self::ECOTONE_IN_MEMORY_QUEUE_ACK, new InMemoryAcknowledgeCallback(queueChannel: $messageChannel, message: $message, failureStrategy: $this->finalFailureStrategy, isAutoAcked: $this->isAutoAcked))
->build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
namespace Ecotone\Messaging\Channel\PollableChannel\InMemory;

use Ecotone\Messaging\Channel\ChannelInterceptorBuilder;
use Ecotone\Messaging\Config\Container\ChannelReference;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
use Ecotone\Messaging\PrecedenceChannelInterceptor;

/**
* licence Apache-2.0
*/
final class InMemoryQueueAcknowledgeInterceptorBuilder implements ChannelInterceptorBuilder
{
public function __construct(private string $relatedChannel)
public function __construct(private string $relatedChannel, private FinalFailureStrategy $finalFailureStrategy, private bool $isAutoAcked)
{
}

Expand All @@ -30,6 +33,9 @@ public function getPrecedence(): int

public function compile(MessagingContainerBuilder $builder): Definition
{
return new Definition(InMemoryQueueAcknowledgeInterceptor::class);
return new Definition(InMemoryQueueAcknowledgeInterceptor::class, [
$this->finalFailureStrategy,
$this->isAutoAcked,
]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
{
$pollableMessageChannels = ExtensionObjectResolver::resolve(MessageChannelBuilder::class, $extensionObjects);

/** @var SimpleMessageChannelBuilder $pollableMessageChannel */
foreach ($pollableMessageChannels as $pollableMessageChannel) {
$messagingConfiguration->registerChannelInterceptor(
new InMemoryQueueAcknowledgeInterceptorBuilder(
$pollableMessageChannel->getMessageChannelName()
$pollableMessageChannel->getMessageChannelName(),
$pollableMessageChannel->getFinalFailureStrategy(),
$pollableMessageChannel->isAutoAcked()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\DefinedObjectWrapper;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
use Ecotone\Messaging\MessageChannel;
use Ecotone\Messaging\MessageConverter\DefaultHeaderMapper;
use Ecotone\Messaging\MessageConverter\HeaderMapper;
Expand All @@ -25,22 +26,26 @@
class SimpleMessageChannelBuilder implements MessageChannelWithSerializationBuilder
{
private function __construct(
private string $messageChannelName,
private MessageChannel $messageChannel,
private bool $isPollable,
private ?MediaType $conversionMediaType,
private HeaderMapper $headerMapper,
private string $messageChannelName,
private MessageChannel $messageChannel,
private bool $isPollable,
private ?MediaType $conversionMediaType,
private HeaderMapper $headerMapper,
private FinalFailureStrategy $finalFailureStrategy,
private bool $isAutoAcked,
) {
}

public static function create(string $messageChannelName, MessageChannel $messageChannel, string|MediaType|null $conversionMediaType = null): self
public static function create(string $messageChannelName, MessageChannel $messageChannel, string|MediaType|null $conversionMediaType = null, FinalFailureStrategy $finalFailureStrategy = FinalFailureStrategy::RESEND, bool $isAutoAcked = true): self
{
return new self(
$messageChannelName,
$messageChannel,
$messageChannel instanceof PollableChannel,
$conversionMediaType ? (is_string($conversionMediaType) ? MediaType::parseMediaType($conversionMediaType) : $conversionMediaType) : null,
DefaultHeaderMapper::createAllHeadersMapping(),
$finalFailureStrategy,
$isAutoAcked,
);
}

Expand All @@ -54,11 +59,11 @@ public static function createPublishSubscribeChannel(string $messageChannelName)
return self::create($messageChannelName, PublishSubscribeChannel::create($messageChannelName), null);
}

public static function createQueueChannel(string $messageChannelName, bool $delayable = false, string|MediaType|null $conversionMediaType = null): self
public static function createQueueChannel(string $messageChannelName, bool $delayable = false, string|MediaType|null $conversionMediaType = null, FinalFailureStrategy $finalFailureStrategy = FinalFailureStrategy::RESEND, bool $isAutoAcked = true): self
{
$messageChannel = $delayable ? DelayableQueueChannel::create($messageChannelName) : QueueChannel::create($messageChannelName);

return self::create($messageChannelName, $messageChannel, $conversionMediaType);
return self::create($messageChannelName, $messageChannel, $conversionMediaType, $finalFailureStrategy, $isAutoAcked);
}

public static function createNullableChannel(string $messageChannelName): self
Expand All @@ -79,6 +84,16 @@ public function isPollable(): bool
return $this->isPollable;
}

public function getFinalFailureStrategy(): FinalFailureStrategy
{
return $this->finalFailureStrategy;
}

public function isAutoAcked(): bool
{
return $this->isAutoAcked;
}

public function withDefaultConversionMediaType(string $mediaType): self
{
$this->conversionMediaType = MediaType::parseMediaType($mediaType);
Expand Down
Loading
Loading