Skip to content

Commit

Permalink
feat(channel): introduce new channel component (#283)
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Nov 23, 2021
1 parent 8595a53 commit 23a5a61
Show file tree
Hide file tree
Showing 24 changed files with 1,136 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
* introduced a new `Psl\Network` component.
* introduced a new `Psl\TCP` component.
* introduced a new `Psl\Unix` component.
* introduced a new `Psl\Channel` component.
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

- [Psl](./component/psl.md)
- [Psl\Async](./component/async.md)
- [Psl\Channel](./component/channel.md)
- [Psl\Class](./component/class.md)
- [Psl\Collection](./component/collection.md)
- [Psl\DataStructure](./component/data-structure.md)
Expand Down
23 changes: 23 additions & 0 deletions docs/component/channel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<!--
This markdown file was generated using `docs/documenter.php`.
Any edits to it will likely be lost.
-->

[*index](./../README.md)

---

### `Psl\Channel` Component

#### `Functions`

- [bounded](./../../src/Psl/Channel/bounded.php#L18)
- [unbounded](./../../src/Psl/Channel/unbounded.php#L16)

#### `Interfaces`

- [ReceiverInterface](./../../src/Psl/Channel/ReceiverInterface.php#L12)
- [SenderInterface](./../../src/Psl/Channel/SenderInterface.php#L12)


4 changes: 2 additions & 2 deletions docs/component/data-structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

#### `Classes`

- [PriorityQueue](./../../src/Psl/DataStructure/PriorityQueue.php#L18)
- [Queue](./../../src/Psl/DataStructure/Queue.php#L19)
- [PriorityQueue](./../../src/Psl/DataStructure/PriorityQueue.php#L20)
- [Queue](./../../src/Psl/DataStructure/Queue.php#L21)
- [Stack](./../../src/Psl/DataStructure/Stack.php#L19)


1 change: 1 addition & 0 deletions docs/documenter.php
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ function get_all_components(): array
$components = [
'Psl',
'Psl\\Async',
'Psl\\Channel',
'Psl\\Class',
'Psl\\Collection',
'Psl\\DataStructure',
Expand Down
25 changes: 25 additions & 0 deletions examples/channel/main.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Psl\Example\IO;

use Psl\Async;
use Psl\Channel;
use Psl\IO;

require __DIR__ . '/../../vendor/autoload.php';

/**
* @var Channel\ReceiverInterface<string> $receiver
* @var Channel\SenderInterface<string> $sender
*/
[$receiver, $sender] = Channel\unbounded();

Async\Scheduler::delay(1, static function () use ($sender) {
$sender->send('Hello, World!');
});

$message = $receiver->receive();

IO\output_handle()->writeAll($message);
59 changes: 59 additions & 0 deletions src/Psl/Channel/ChannelInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

namespace Psl\Channel;

use Countable;

/**
* @template T
*/
interface ChannelInterface extends Countable
{
/**
* Returns the channel capacity if it’s bounded.
*
* @mutation-free
*/
public function getCapacity(): ?int;

/**
* Closes the channel.
*
* The remaining messages can still be received.
*/
public function close(): void;

/**
* Returns true if the channel is closed.
*
* @mutation-free
*/
public function isClosed(): bool;

/**
* Returns the number of messages in the channel.
*
* @return positive-int|0
*
* @mutation-free
*/
public function count(): int;

/**
* Returns true if the channel is full.
*
* Unbounded channels are never full.
*
* @mutation-free
*/
public function isFull(): bool;

/**
* Returns true if the channel is empty.
*
* @mutation-free
*/
public function isEmpty(): bool;
}
34 changes: 34 additions & 0 deletions src/Psl/Channel/Exception/ClosedChannelException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Exception;

use Psl\Channel;
use Psl\Exception\RuntimeException;

/**
* This exception is thrown when attempting to send or receive a message on a closed channel.
*
* @see Channel\SenderInterface::send()
* @see Channel\SenderInterface::trySend()
* @see Channel\ReceiverInterface::receive()
* @see Channel\ReceiverInterface::tryReceive()
*/
final class ClosedChannelException extends RuntimeException implements ExceptionInterface
{
public function __construct(string $message = 'Channel has been closed')
{
parent::__construct($message);
}

public static function forSending(): ClosedChannelException
{
return new self('Attempted to send a message to a closed channel.');
}

public static function forReceiving(): ClosedChannelException
{
return new self('Attempted to receive a message from a closed empty channel.');
}
}
19 changes: 19 additions & 0 deletions src/Psl/Channel/Exception/EmptyChannelException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Exception;

use OutOfBoundsException;
use Psl\Channel;

/**
* This exception is throw when calling {@see Channel\ReceiverInterface::tryReceive()} on an empty channel.
*/
final class EmptyChannelException extends OutOfBoundsException implements ExceptionInterface
{
public static function create(): EmptyChannelException
{
return new self('Attempted to receiver from an empty channel.');
}
}
11 changes: 11 additions & 0 deletions src/Psl/Channel/Exception/ExceptionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Exception;

use Psl\Exception;

interface ExceptionInterface extends Exception\ExceptionInterface
{
}
20 changes: 20 additions & 0 deletions src/Psl/Channel/Exception/FullChannelException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Exception;

use OutOfBoundsException;
use Psl\Channel;
use Psl\Str;

/**
* This exception is throw when calling {@see Channel\SenderInterface::trySend()} on a full channel.
*/
final class FullChannelException extends OutOfBoundsException implements ExceptionInterface
{
public static function ofCapacity(int $capacity): FullChannelException
{
return new self(Str\format('Channel has reached its full capacity of %d.', $capacity));
}
}
128 changes: 128 additions & 0 deletions src/Psl/Channel/Internal/ChannelState.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Internal;

use Psl\Channel\ChannelInterface;
use Psl\Channel\Exception;
use Psl\DataStructure\Queue;
use Psl\DataStructure\QueueInterface;

/**
* @template T
*
* @implements ChannelInterface<T>
*
* @internal
*/
final class ChannelState implements ChannelInterface
{
/**
* @var QueueInterface<T>
*/
private QueueInterface $messages;

private bool $closed = false;

public function __construct(
private ?int $capacity = null,
) {
$this->messages = new Queue();
}

/**
* {@inheritDoc}
*/
public function getCapacity(): ?int
{
return $this->capacity;
}

/**
* {@inheritDoc}
*/
public function close(): void
{
$this->closed = true;
}

/**
* {@inheritDoc}
*/
public function isClosed(): bool
{
return $this->closed;
}

/**
* {@inheritDoc}
*/
public function count(): int
{
return $this->messages->count();
}

/**
* {@inheritDoc}
*/
public function isFull(): bool
{
if (null === $this->capacity) {
return false;
}

return $this->capacity === $this->count();
}

/**
* {@inheritDoc}
*/
public function isEmpty(): bool
{
return 0 === $this->messages->count();
}

/**
* @param T $message
*
* @throws Exception\ClosedChannelException If the channel is closed.
* @throws Exception\FullChannelException If the channel is full.
*/
public function send(mixed $message): void
{
if ($this->closed) {
throw Exception\ClosedChannelException::forSending();
}

if (null === $this->capacity || $this->capacity > $this->count()) {
$this->messages->enqueue($message);

return;
}

throw Exception\FullChannelException::ofCapacity($this->capacity);
}

/**
* @throws Exception\ClosedChannelException If the channel is closed, and there's no more messages to receive.
* @throws Exception\EmptyChannelException If the channel is empty.
*
* @return T
*/
public function receive(): mixed
{
$empty = 0 === $this->count();
$closed = $this->closed;
if ($closed && $empty) {
throw Exception\ClosedChannelException::forReceiving();
}

if ($empty) {
throw Exception\EmptyChannelException::create();
}

/** @psalm-suppress MissingThrowsDocblock */
return $this->messages->dequeue();
}
}
Loading

0 comments on commit 23a5a61

Please sign in to comment.