Skip to content

Commit

Permalink
Merge pull request #148 from duyler/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
milinsky committed May 19, 2024
2 parents 256d72b + e58c96d commit f3d0970
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 304 deletions.
302 changes: 1 addition & 301 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,306 +4,6 @@
[![codecov](https://codecov.io/gh/duyler/action-bus/graph/badge.svg?token=Z60T9EMXD6)](https://codecov.io/gh/duyler/action-bus)
# Action Bus

The action bus implements cooperative multitasking between the actions performed within it. Each action is performed within a separate thread (Fiber) in an isolated DI container. You can control execution using state handlers, triggers, and subscriptions to events generated by actions.
The action bus implements cooperative multitasking between actions performed inside it. Each action is performed inside a separate thread (Fiber) in an isolated DI container. Execution can be controlled using state handlers, triggers, and subscriptions to events generated by actions.

Full documentation see [duyler.com/en/docs/action-bus](https://duyler.com/en/docs/event-bus)

## Example amqp worker

### Create state handler for connect to amqp queue

```php

<?php

declare(strict_types=1);

use AMQPChannel;
use AMQPConnection;
use AMQPQueue;
use Duyler\ActionBus\Contract\State\MainBeginStateHandlerInterface;
use Duyler\ActionBus\State\Service\StateMainBeginService;
use Duyler\ActionBus\State\StateContext;
use AccountEventQueueConfig;
use Override;

class ConnectToQueueStateHandler implements MainBeginStateHandlerInterface
{
public function __construct(
private AccountEventQueueConfig $queueConfig,
) {}

#[Override]
public function handle(StateMainBeginService $stateService, StateContext $context): void
{
$connection = new AMQPConnection();
$connection->setHost($this->queueConfig->host);
$connection->setPort($this->queueConfig->port);
$connection->setLogin($this->queueConfig->login);
$connection->setPassword($this->queueConfig->password);
$connection->connect();

$channel = new AMQPChannel($connection);

$queue = new AMQPQueue($channel);
$queue->setName($this->queueConfig->queueName);
$queue->declareQueue();

$context->write('queue', $queue);
}
}

```

### Create state handler for listening queue

```php

<?php

declare(strict_types=1);

use AMQPEnvelope;
use AMQPQueue;
use Duyler\ActionBus\Contract\State\MainCyclicStateHandlerInterface;
use Duyler\ActionBus\Dto\Action;
use Duyler\ActionBus\Dto\Trigger;
use Duyler\ActionBus\Enum\ResultStatus;
use Duyler\ActionBus\State\Service\StateMainCyclicService;
use Duyler\ActionBus\State\StateContext;
use Override;

class ListeningQueueStateHandler implements MainCyclicStateHandlerInterface
{
#[Override]
public function handle(StateMainCyclicService $stateService, StateContext $context): void
{
/** @var AMQPQueue $queue */
$queue = $context->read('queue');

$message = $queue->get();

if ($message === null) {
return;
}

$content = json_decode($message->getBody(), true);

$actionId = 'account_id_' . $content['account_id'];

if ($stateService->actionIsExists($actionId) === false) {
$stateService->addAction(
new Action(
id: $actionId,
handler: HandleAccountEventAction::class,
triggeredOn: $actionId,
argument: AMQPEnvelope::class,
contract: AMQPEnvelope::class,
repeatable: true,
)
);
}

$stateService->doTrigger(
new Trigger(
id: $actionId,
data: $message,
contract: AMQPEnvelope::class,
)
);
}
}

```

### Create state handler for send ack into queue

```php

<?php

declare(strict_types=1);

use AMQPEnvelope;
use AMQPQueue;
use Duyler\ActionBus\Contract\State\MainAfterStateHandlerInterface;
use Duyler\ActionBus\Enum\ResultStatus;
use Duyler\ActionBus\State\Service\StateMainAfterService;
use Duyler\ActionBus\State\StateContext;
use Override;

class AckMessageStateHandler implements MainAfterStateHandlerInterface
{
#[Override]
public function handle(StateMainAfterService $stateService, StateContext $context): void
{
if ($stateService->getStatus() === ResultStatus::Success) {
/** @var AMQPEnvelope $message */
$message = $stateService->getResultData();
/** @var AMQPQueue $queue */
$queue = $context->read('queue');
$queue->ack($message->getDeliveryTag());
}
}

#[Override]
public function observed(StateContext $context): array
{
return [];
}
}

```

### Create action handler

```php

<?php

declare(strict_types=1);

use AMQPEnvelope;
use Duyler\ActionBus\Dto\Result;
use Duyler\ActionBus\Enum\ResultStatus;
use Fiber;

class HandleAccountEventAction
{
public function __invoke(AMQPEnvelope $message): Result
{
$content = json_decode($message->getBody(), true);

echo Fiber::suspend(
fn() => 'Account id: ' . $content['account_id'] . '. Event id: ' . $content['event_id'] . PHP_EOL
);

return new Result(
status: ResultStatus::Success,
data: $message,
);
}
}

```

### Build and run

```php

// run.php

<?php

declare(strict_types=1);

use Duyler\ActionBus\BusBuilder;
use Duyler\ActionBus\BusConfig;
use Duyler\ActionBus\Enum\Mode;
use AccountEventQueueConfig;

$busBuilder = new BusBuilder(
new BusConfig(
mode: Mode::Loop,
)
);

$config = new AccountEventQueueConfig(
host: 'localhost',
port: 5672,
logi: 'user',
password: 'password',
queueName: 'account_events_queue',
);

$busBuilder->addStateHandler(
new ConnectToQueueStateHandler($config),
);

$busBuilder->addStateHandler(
new ListeningQueueStateHandler(),
);

$busBuilder->addStateHandler(
new AckMessageStateHandler(),
);

$bus = $busBuilder
->build()
->run();

```

```shell
$ php run.php
```

## Example content receive

```php

<?php

use Duyler\ActionBus\BusBuilder;
use Duyler\ActionBus\BusConfig;
use Duyler\ActionBus\Dto\Action;
use Duyler\ActionBus\Dto\Subscription;
use Duyler\ActionBus\Enum\ResultStatus;
use Psr\Http\Message\ServerRequestInterface;

$requestAction = new Action(
id: 'Request.GetRequest',
handler: GetRequestAction::class,
contract: ServerRequestInterface::class,
);

$blogAction = new Action(
id: 'Blog.GetPostById',
handler: GetPostByIdAction::class,
required: [
'Request.GetRequest',
],
argument: PostId::class,
argumentFactory: fn(ServerRequestInterface $request): PostId => new PostId($request->getAttribute('id')),
externalAccess: true,
contract: Post::class,
);

$blogCommentListAction = new Action(
id: 'Blog.GetPostComments',
handler: GetCommentsByPostAction::class,
required: [
'Blog.GetPostById',
],
argument: Post,
externalAccess: true,
contract: CommentList::class,
);

$blogActionSubscription = new Subscription(
subject: 'Request.GetRequest',
actionId: 'Blog.GetPostById',
status: ResultStatus::Success,
);

$blogCommentListActionSubscription = new Subscription(
subject: 'Blog.GetPostById',
actionId: 'Blog.GetPostComments',
status: ResultStatus::Success,
);

$busBuilder = new BusBuilder(new BusConfig());

$bus = $busBuilder
->addAction($blogAction)
->addAction($blogCommentListAction)
->addSubscription($blogActionSubscription)
->addSubscription($blogCommentListActionSubscription)
->doAction($requestAction)
->build()
->run();

$blogPost = $bus->getResult('Blog.GetPostById');
$blogPostComments = $bus->getResult('Blog.GetPostComments');

```
4 changes: 1 addition & 3 deletions src/Service/ActionService.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ public function collect(array $actions): void
$this->throwActionNotDefined($subject);
}

$requiredAction = $actions[$subject];

$this->checkRequiredAction($action->id, $requiredAction);
$this->checkRequiredAction($action->id, $actions[$subject]);
}

foreach ($action->alternates as $actionId) {
Expand Down

0 comments on commit f3d0970

Please sign in to comment.