Skip to content

Commit

Permalink
chore(channel): improve performance
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Nov 29, 2021
1 parent 97e8c0f commit f71b926
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 39 deletions.
37 changes: 15 additions & 22 deletions src/Psl/Channel/Internal/ChannelState.php
Expand Up @@ -6,8 +6,9 @@

use Psl\Channel\ChannelInterface;
use Psl\Channel\Exception;
use Psl\DataStructure\Queue;
use Psl\DataStructure\QueueInterface;

use function array_shift;
use function count;

/**
* @template T
Expand All @@ -19,16 +20,15 @@
final class ChannelState implements ChannelInterface
{
/**
* @var QueueInterface<T>
* @var array<array-key, T>
*/
private QueueInterface $messages;
private array $messages = [];

private bool $closed = false;

public function __construct(
private ?int $capacity = null,
) {
$this->messages = new Queue();
}

/**
Expand Down Expand Up @@ -60,27 +60,23 @@ public function isClosed(): bool
*/
public function count(): int
{
return $this->messages->count();
return count($this->messages);
}

/**
* {@inheritDoc}
*/
public function isFull(): bool
{
if (null === $this->capacity) {
return false;
}

return $this->capacity === $this->count();
return $this->capacity && $this->capacity === $this->count();
}

/**
* {@inheritDoc}
*/
public function isEmpty(): bool
{
return 0 === $this->messages->count();
return 0 === count($this->messages);
}

/**
Expand All @@ -95,8 +91,8 @@ public function send(mixed $message): void
throw Exception\ClosedChannelException::forSending();
}

if (null === $this->capacity || $this->capacity > $this->count()) {
$this->messages->enqueue($message);
if (null === $this->capacity || $this->capacity > count($this->messages)) {
$this->messages[] = $message;

return;
}
Expand All @@ -112,17 +108,14 @@ public function send(mixed $message): void
*/
public function receive(): mixed
{
$empty = 0 === $this->count();
$closed = $this->closed;
if ($closed && $empty) {
throw Exception\ClosedChannelException::forReceiving();
}
if ([] === $this->messages) {
if ($this->closed) {
throw Exception\ClosedChannelException::forReceiving();
}

if ($empty) {
throw Exception\EmptyChannelException::create();
}

/** @psalm-suppress MissingThrowsDocblock */
return $this->messages->dequeue();
return array_shift($this->messages);
}
}
28 changes: 14 additions & 14 deletions src/Psl/Channel/Internal/Receiver.php
Expand Up @@ -34,29 +34,27 @@ public function __construct(
public function receive(): mixed
{
// there's a pending operation? wait for it.
$this->deferred?->getAwaitable()->then(static fn() => null, static fn() => null)->ignore()->await();
$this->deferred?->getAwaitable()->then(static fn() => null, static fn() => null)->await();

if ($this->state->isEmpty()) {
$this->deferred = new Async\Deferred();

$identifier = Async\Scheduler::repeat(0.000000001, function (): void {
if ($this->state->isClosed()) {
/**
* Channel has been closed from the receiving side.
*
* @psalm-suppress PossiblyNullReference
*/
if (!$this->deferred->isComplete()) {
/** @psalm-suppress PossiblyNullReference */
$this->deferred->error(Exception\ClosedChannelException::forReceiving());
}
if (!$this->state->isEmpty()) {
/** @psalm-suppress PossiblyNullReference */
$this->deferred->complete(null);

return;
}

if (!$this->state->isEmpty()) {
/**
* Channel has been closed from the sender side.
*
* @psalm-suppress PossiblyNullReference
*/
if ($this->state->isClosed() && !$this->deferred->isComplete()) {
/** @psalm-suppress PossiblyNullReference */
$this->deferred->complete(null);
$this->deferred->error(Exception\ClosedChannelException::forReceiving());
}
});

Expand Down Expand Up @@ -94,7 +92,9 @@ public function getCapacity(): ?int
*/
public function close(): void
{
$this->deferred?->error(Exception\ClosedChannelException::forReceiving());
if ($this->state->isEmpty()) {
$this->deferred?->error(Exception\ClosedChannelException::forReceiving());
}

$this->state->close();
}
Expand Down
4 changes: 2 additions & 2 deletions src/Psl/Channel/Internal/Sender.php
Expand Up @@ -34,15 +34,15 @@ public function __construct(
public function send(mixed $message): void
{
// there's a pending operation? wait for it.
$this->deferred?->getAwaitable()->then(static fn() => null, static fn() => null)->ignore()->await();
$this->deferred?->getAwaitable()->then(static fn() => null, static fn() => null)->await();

if ($this->state->isFull()) {
$this->deferred = new Async\Deferred();

$identifier = Async\Scheduler::repeat(0.000000001, function (): void {
if ($this->state->isClosed()) {
/**
* Channel has been closed from the receiving side.
* Channel has been closed from the receiver side.
*
* @psalm-suppress PossiblyNullReference
*/
Expand Down
45 changes: 45 additions & 0 deletions tests/benchmark/Channel/CommunicationBench.php
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Psl\Tests\Benchmark\Channel;

use PhpBench\Attributes\Groups;
use Psl\Async;
use Psl\Channel;
use Psl\File;
use Psl\IO;

#[Groups(['channel'])]
final class CommunicationBench
{
public function benchCommunication(): void
{
[$receiver, $sender] = Channel\bounded(10);

Async\Scheduler::defer(static function () use ($receiver) {
try {
while (true) {
$receiver->receive();
}
} catch (Channel\Exception\ClosedChannelException) {
return;
}
});

/** @psalm-suppress MissingThrowsDocblock */
$file = File\open_read_only(__FILE__);
$reader = new IO\Reader($file);
/** @psalm-suppress MissingThrowsDocblock */
while (!$reader->isEndOfFile()) {
$byte = $reader->readByte();

/** @psalm-suppress InvalidArgument */
$sender->send($byte);
}

$sender->close();

Async\Scheduler::run();
}
}
2 changes: 2 additions & 0 deletions tests/benchmark/Type/ArrayKeyTypeBench.php
Expand Up @@ -4,13 +4,15 @@

namespace Psl\Tests\Benchmark\Type;

use PhpBench\Attributes\Groups;
use Psl\Tests\Benchmark\Type\Asset\ExplicitStringableObject;
use Psl\Tests\Benchmark\Type\Asset\ImplicitStringableObject;
use Psl\Type;

/**
* @extends GenericTypeBench<Type\TypeInterface<array-key>>
*/
#[Groups(['type'])]
final class ArrayKeyTypeBench extends GenericTypeBench
{
/**
Expand Down
2 changes: 2 additions & 0 deletions tests/benchmark/Type/DictTypeBench.php
Expand Up @@ -5,13 +5,15 @@
namespace Psl\Tests\Benchmark\Type;

use ArrayIterator;
use PhpBench\Attributes\Groups;
use Psl\Dict;
use Psl\Type;
use Psl\Vec;

/**
* @extends GenericTypeBench<Type\TypeInterface<array>>
*/
#[Groups(['type'])]
final class DictTypeBench extends GenericTypeBench
{
/**
Expand Down
2 changes: 2 additions & 0 deletions tests/benchmark/Type/IntTypeBench.php
Expand Up @@ -4,13 +4,15 @@

namespace Psl\Tests\Benchmark\Type;

use PhpBench\Attributes\Groups;
use Psl\Tests\Benchmark\Type\Asset\ExplicitStringableObject;
use Psl\Tests\Benchmark\Type\Asset\ImplicitStringableObject;
use Psl\Type;

/**
* @extends GenericTypeBench<Type\TypeInterface<int>>
*/
#[Groups(['type'])]
final class IntTypeBench extends GenericTypeBench
{
/**
Expand Down
2 changes: 2 additions & 0 deletions tests/benchmark/Type/NonEmptyStringTypeBench.php
Expand Up @@ -4,13 +4,15 @@

namespace Psl\Tests\Benchmark\Type;

use PhpBench\Attributes\Groups;
use Psl\Tests\Benchmark\Type\Asset\ExplicitStringableObject;
use Psl\Tests\Benchmark\Type\Asset\ImplicitStringableObject;
use Psl\Type;

/**
* @extends GenericTypeBench<Type\TypeInterface<non-empty-string>>
*/
#[Groups(['type'])]
final class NonEmptyStringTypeBench extends GenericTypeBench
{
/**
Expand Down
2 changes: 2 additions & 0 deletions tests/benchmark/Type/ShapeTypeBench.php
Expand Up @@ -5,11 +5,13 @@
namespace Psl\Tests\Benchmark\Type;

use ArrayIterator;
use PhpBench\Attributes\Groups;
use Psl\Type;

/**
* @extends GenericTypeBench<Type\TypeInterface<array>>
*/
#[Groups(['type'])]
final class ShapeTypeBench extends GenericTypeBench
{
/**
Expand Down
2 changes: 2 additions & 0 deletions tests/benchmark/Type/StringTypeBench.php
Expand Up @@ -4,13 +4,15 @@

namespace Psl\Tests\Benchmark\Type;

use PhpBench\Attributes\Groups;
use Psl\Tests\Benchmark\Type\Asset\ExplicitStringableObject;
use Psl\Tests\Benchmark\Type\Asset\ImplicitStringableObject;
use Psl\Type;

/**
* @extends GenericTypeBench<Type\TypeInterface<string>>
*/
#[Groups(['type'])]
final class StringTypeBench extends GenericTypeBench
{
/**
Expand Down
4 changes: 3 additions & 1 deletion tests/benchmark/Type/VecTypeBench.php
Expand Up @@ -5,12 +5,14 @@
namespace Psl\Tests\Benchmark\Type;

use ArrayIterator;
use PhpBench\Attributes\Groups;
use Psl\Type;
use Psl\Vec;

/**
* @extends GenericTypeBench<Type\TypeInterface<list<mixed>>>
*/
#[Groups(['type'])]
final class VecTypeBench extends GenericTypeBench
{
/**
Expand Down Expand Up @@ -54,7 +56,7 @@ public function provideHappyPathMatches(): array
}

/**
* @return array<non-empty-string, array{type: \Psl\Type\TypeInterface<list<mixed>>, value: array}>
* @return array<non-empty-string, array{type: Type\TypeInterface<list<mixed>>, value: array}>
*
* @psalm-suppress MissingThrowsDocblock this block should never throw
*/
Expand Down

0 comments on commit f71b926

Please sign in to comment.