Skip to content

Commit

Permalink
Update for Stream to Pipeline changes
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Aug 28, 2020
1 parent c092fdf commit dee4167
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 94 deletions.
26 changes: 5 additions & 21 deletions src/ConnectionListener.php
Expand Up @@ -2,12 +2,12 @@

namespace Amp\Postgres;

use Amp\Pipeline;
use Amp\Promise;
use Amp\Stream;

final class ConnectionListener implements Listener
{
/** @var Stream */
/** @var Pipeline */
private $stream;

/** @var string */
Expand All @@ -17,13 +17,13 @@ final class ConnectionListener implements Listener
private $unlisten;

/**
* @param Stream $stream Stream emitting notificatons on the channel.
* @param Pipeline $pipeline Pipeline emitting notificatons on the channel.
* @param string $channel Channel name.
* @param callable(string $channel): $unlisten Function invoked to unlisten from the channel.
*/
public function __construct(Stream $stream, string $channel, callable $unlisten)
public function __construct(Pipeline $pipeline, string $channel, callable $unlisten)
{
$this->stream = $stream;
$this->stream = $pipeline;
$this->channel = $channel;
$this->unlisten = $unlisten;
}
Expand Down Expand Up @@ -52,22 +52,6 @@ public function dispose()
$this->unlisten();
}

/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal): void
{
$this->stream->onDisposal($onDisposal);
}

/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion): void
{
$this->stream->onCompletion($onCompletion);
}

/**
* @return string Channel name.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/Listener.php
Expand Up @@ -2,10 +2,10 @@

namespace Amp\Postgres;

use Amp\Pipeline;
use Amp\Promise;
use Amp\Stream;

interface Listener extends Stream
interface Listener extends Pipeline
{
/**
* @return Promise<Notification|null>
Expand Down
8 changes: 4 additions & 4 deletions src/PgSqlHandle.php
Expand Up @@ -4,13 +4,13 @@

use Amp\Deferred;
use Amp\Loop;
use Amp\PipelineSource;
use Amp\Promise;
use Amp\Sql\Common\CommandResult;
use Amp\Sql\ConnectionException;
use Amp\Sql\FailureException;
use Amp\Sql\QueryError;
use Amp\Sql\Result;
use Amp\StreamSource;
use Amp\Struct;
use Amp\Success;
use function Amp\call;
Expand Down Expand Up @@ -44,7 +44,7 @@ final class PgSqlHandle implements Handle
/** @var string */
private $await;

/** @var StreamSource[] */
/** @var PipelineSource[] */
private $listeners = [];

/** @var object[] Anonymous class using Struct trait. */
Expand Down Expand Up @@ -463,7 +463,7 @@ public function listen(string $channel): Promise
throw new QueryError(\sprintf("Already listening on channel '%s'", $channel));
}

$this->listeners[$channel] = $source = new StreamSource;
$this->listeners[$channel] = $source = new PipelineSource;

try {
yield $this->query(\sprintf("LISTEN %s", $this->quoteName($channel)));
Expand All @@ -473,7 +473,7 @@ public function listen(string $channel): Promise
}

Loop::enable($this->poll);
return new ConnectionListener($source->stream(), $channel, \Closure::fromCallable([$this, 'unlisten']));
return new ConnectionListener($source->pipe(), $channel, \Closure::fromCallable([$this, 'unlisten']));
});
}

Expand Down
17 changes: 0 additions & 17 deletions src/PgSqlResultSet.php
Expand Up @@ -55,7 +55,6 @@ public function __construct($handle, Promise $nextResult)
\pg_free_result($handle);
}
});
$this->generator->getReturn(); // Force generator to start execution.
}

/**
Expand All @@ -74,22 +73,6 @@ public function dispose(): void
$this->generator->dispose();
}

/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal): void
{
$this->generator->onDisposal($onDisposal);
}

/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion): void
{
$this->generator->onCompletion($onCompletion);
}

/**
* @inheritDoc
*/
Expand Down
10 changes: 0 additions & 10 deletions src/PooledListener.php
Expand Up @@ -40,16 +40,6 @@ public function dispose(): void
$this->listener->dispose();
}

public function onDisposal(callable $onDisposal): void
{
$this->listener->onDisposal($onDisposal);
}

public function onCompletion(callable $onCompletion): void
{
$this->listener->onCompletion($onCompletion);
}

public function getChannel(): string
{
return $this->listener->getChannel();
Expand Down
16 changes: 0 additions & 16 deletions src/PqBufferedResultSet.php
Expand Up @@ -53,22 +53,6 @@ public function dispose(): void
$this->generator->dispose();
}

/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal): void
{
$this->generator->onDisposal($onDisposal);
}

/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion): void
{
$this->generator->onCompletion($onCompletion);
}

/**
* @inheritDoc
*/
Expand Down
8 changes: 4 additions & 4 deletions src/PqHandle.php
Expand Up @@ -5,13 +5,13 @@
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Loop;
use Amp\PipelineSource;
use Amp\Promise;
use Amp\Sql\Common\CommandResult;
use Amp\Sql\ConnectionException;
use Amp\Sql\FailureException;
use Amp\Sql\QueryError;
use Amp\Sql\Result;
use Amp\StreamSource;
use Amp\Struct;
use Amp\Success;
use pq;
Expand All @@ -34,7 +34,7 @@ final class PqHandle implements Handle
/** @var string */
private $await;

/** @var StreamSource[] */
/** @var PipelineSource[] */
private $listeners;

/** @var object[] Anonymous class using Struct trait. */
Expand Down Expand Up @@ -469,7 +469,7 @@ public function listen(string $channel): Promise
throw new QueryError(\sprintf("Already listening on channel '%s'", $channel));
}

$this->listeners[$channel] = $source = new StreamSource;
$this->listeners[$channel] = $source = new PipelineSource;

try {
yield from $this->send(
Expand All @@ -490,7 +490,7 @@ static function (string $channel, string $message, int $pid) use ($source) {
}

Loop::enable($this->poll);
return new ConnectionListener($source->stream(), $channel, \Closure::fromCallable([$this, 'unlisten']));
return new ConnectionListener($source->pipe(), $channel, \Closure::fromCallable([$this, 'unlisten']));
});
}

Expand Down
17 changes: 0 additions & 17 deletions src/PqUnbufferedResultSet.php
Expand Up @@ -39,7 +39,6 @@ public function __construct(callable $fetch, pq\Result $result, Promise $nextRes
}
}
});
$this->generator->getReturn(); // Force generator to start execution.
}

public function getNextResult(): Promise
Expand All @@ -63,22 +62,6 @@ public function dispose(): void
$this->generator->dispose();
}

/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal): void
{
$this->generator->onDisposal($onDisposal);
}

/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion): void
{
$this->generator->onCompletion($onCompletion);
}

/**
* @inheritDoc
*/
Expand Down
6 changes: 3 additions & 3 deletions test/AbstractLinkTest.php
Expand Up @@ -6,6 +6,7 @@
use Amp\Delayed;
use Amp\Loop;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Pipeline;
use Amp\Postgres\Link;
use Amp\Postgres\Listener;
use Amp\Postgres\QueryExecutionError;
Expand All @@ -16,7 +17,6 @@
use Amp\Sql\Statement;
use Amp\Sql\Transaction as SqlTransaction;
use Amp\Sql\TransactionError;
use Amp\Stream;

abstract class AbstractLinkTest extends AsyncTestCase
{
Expand Down Expand Up @@ -433,8 +433,8 @@ public function testPrepareSimilarQueryReturnsDifferentStatements(): \Generator

$results = [];

$results[] = yield Stream\toArray(yield $statement1->execute([$data[0]]));
$results[] = yield Stream\toArray(yield $statement2->execute(['domain' => $data[0]]));
$results[] = yield Pipeline\toArray(yield $statement1->execute([$data[0]]));
$results[] = yield Pipeline\toArray(yield $statement2->execute(['domain' => $data[0]]));

foreach ($results as $result) {
/** @var Result $result */
Expand Down

0 comments on commit dee4167

Please sign in to comment.