Skip to content

Commit

Permalink
Add psalm annotations and fix doc types
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Apr 4, 2020
1 parent 3b439dc commit b5b5fa7
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -19,13 +19,15 @@ env:
- AMP_DEBUG=true

install:
- if [[ ${TRAVIS_PHP_VERSION:0:3} == "7.0" ]]; then composer remove --dev vimeo/psalm; fi
- composer update -n --prefer-dist
- wget https://github.com/php-coveralls/php-coveralls/releases/download/v1.0.2/coveralls.phar
- chmod +x coveralls.phar

script:
- vendor/bin/phpunit --coverage-text --coverage-clover build/logs/clover.xml
- PHP_CS_FIXER_IGNORE_ENV=1 php vendor/bin/php-cs-fixer --diff --dry-run -v fix
- if [[ ${TRAVIS_PHP_VERSION:0:3} == "7.0" ]]; then echo "Skipped psalm static analysis"; else vendor/bin/psalm; fi

after_script:
- ./coveralls.phar -v
Expand Down
11 changes: 6 additions & 5 deletions composer.json
Expand Up @@ -30,10 +30,11 @@
},
"require-dev": {
"amphp/phpunit-util": "^1",
"phpunit/phpunit": "^6",
"phpunit/phpunit": "^6 || ^7 || ^8",
"friendsofphp/php-cs-fixer": "^2.3",
"amphp/php-cs-fixer-config": "dev-master",
"infection/infection": "^0.9.3"
"vimeo/psalm": "^3.9@dev",
"jetbrains/phpstorm-stubs": "^2019.3"
},
"autoload": {
"psr-4": {
Expand All @@ -48,9 +49,9 @@
"Amp\\ByteStream\\Test\\": "test"
}
},
"config": {
"platform": {
"php": "7.0.0"
"extra": {
"branch-alias": {
"dev-master": "1.x-dev"
}
}
}
13 changes: 9 additions & 4 deletions examples/benchmark-throughput.php
Expand Up @@ -12,9 +12,11 @@
Loop::set(new Loop\NativeDriver());

$args = \getopt('i:o:t:');
$if = isset($args['i']) ? $args['i'] : '/dev/zero';
$of = isset($args['o']) ? $args['o'] : '/dev/null';
$t = isset($args['t']) ? $args['t'] : 30;
$if = $args['i'] ?? '/dev/zero';
$of = $args['o'] ?? '/dev/null';
$t = (int) ($args['t'] ?? 30);

\assert(\is_string($if) && \is_string($of));

// passing file descriptors requires mapping paths (https://bugs.php.net/bug.php?id=53465)
$if = \preg_replace('(^/dev/fd/)', 'php://fd/', $if);
Expand Down Expand Up @@ -49,7 +51,10 @@

$t = \microtime(true) - $start;

$bytes = \ftell($out->getResource());
$resource = $out->getResource();
\assert($resource !== null);

$bytes = \ftell($resource);

$stderr->write('read ' . $bytes . ' byte(s) in ' . \round($t, 3) . ' second(s) => ' . \round($bytes / 1024 / 1024 / $t, 1) . ' MiB/s' . PHP_EOL);
$stderr->write('peak memory usage of ' . \round(\memory_get_peak_usage(true) / 1024 / 1024, 1) . ' MiB' . PHP_EOL);
Expand Down
2 changes: 1 addition & 1 deletion lib/InMemoryStream.php
Expand Up @@ -23,7 +23,7 @@ public function __construct(string $contents = null)
/**
* Reads data from the stream.
*
* @return Promise Resolves with the full contents or `null` if the stream has closed / already been consumed.
* @return Promise<string|null> Resolves with the full contents or `null` if the stream has closed / already been consumed.
*/
public function read(): Promise
{
Expand Down
2 changes: 2 additions & 0 deletions lib/InputStream.php
Expand Up @@ -30,6 +30,8 @@ interface InputStream
*
* @return Promise Resolves with a string when new data is available or `null` if the stream has closed.
*
* @psalm-return Promise<string|null>
*
* @throws PendingReadError Thrown if another read operation is still pending.
*/
public function read(): Promise;
Expand Down
2 changes: 2 additions & 0 deletions lib/InputStreamChain.php
Expand Up @@ -8,7 +8,9 @@

final class InputStreamChain implements InputStream
{
/** @var InputStream[] */
private $streams;
/** @var bool */
private $reading = false;

public function __construct(InputStream ...$streams)
Expand Down
7 changes: 7 additions & 0 deletions lib/IteratorStream.php
Expand Up @@ -9,10 +9,16 @@

final class IteratorStream implements InputStream
{
/** @var Iterator<string> */
private $iterator;
/** @var \Throwable|null */
private $exception;
/** @var bool */
private $pending = false;

/**
* @psam-param Iterator<string> $iterator
*/
public function __construct(Iterator $iterator)
{
$this->iterator = $iterator;
Expand All @@ -30,6 +36,7 @@ public function read(): Promise
}

$this->pending = true;
/** @var Deferred<string|null> $deferred */
$deferred = new Deferred;

$this->iterator->advance()->onResolve(function ($error, $hasNextElement) use ($deferred) {
Expand Down
3 changes: 3 additions & 0 deletions lib/LineReader.php
Expand Up @@ -55,6 +55,9 @@ public function getBuffer(): string
return $this->buffer;
}

/**
* @return void
*/
public function clearBuffer()
{
$this->buffer = "";
Expand Down
8 changes: 4 additions & 4 deletions lib/Message.php
Expand Up @@ -38,22 +38,22 @@ class Message implements InputStream, Promise
/** @var string */
private $buffer = "";

/** @var \Amp\Deferred|null */
/** @var Deferred|null */
private $pendingRead;

/** @var \Amp\Coroutine */
/** @var Coroutine|null */
private $coroutine;

/** @var bool True if onResolve() has been called. */
private $buffering = false;

/** @var \Amp\Deferred|null */
/** @var Deferred|null */
private $backpressure;

/** @var bool True if the iterator has completed. */
private $complete = false;

/** @var \Throwable Used to fail future reads on failure. */
/** @var \Throwable|null Used to fail future reads on failure. */
private $error;

/**
Expand Down
5 changes: 3 additions & 2 deletions lib/OutputBuffer.php
Expand Up @@ -8,12 +8,13 @@

class OutputBuffer implements OutputStream, Promise
{
/** @var \Amp\Deferred|null */
/** @var Deferred */
private $deferred;

/** @var string */
private $contents;
private $contents = '';

/** @var bool */
private $closed = false;

public function __construct()
Expand Down
18 changes: 15 additions & 3 deletions lib/ResourceInputStream.php
Expand Up @@ -14,13 +14,13 @@ final class ResourceInputStream implements InputStream
{
const DEFAULT_CHUNK_SIZE = 8192;

/** @var resource */
/** @var resource|null */
private $resource;

/** @var string */
private $watcher;

/** @var \Amp\Deferred|null */
/** @var Deferred|null */
private $deferred;

/** @var bool */
Expand All @@ -35,7 +35,7 @@ final class ResourceInputStream implements InputStream
/** @var callable */
private $immediateCallable;

/** @var string */
/** @var string|null */
private $immediateWatcher;

/**
Expand Down Expand Up @@ -123,6 +123,8 @@ public function read(): Promise
return new Success; // Resolve with null on closed stream.
}

\assert($this->resource !== null);

// Attempt a direct read, because Windows suffers from slow I/O on STDIN otherwise.
if ($this->useSingleRead) {
$data = @\fread($this->resource, $this->chunkSize);
Expand Down Expand Up @@ -172,6 +174,7 @@ public function close()
if ($meta && \strpos($meta["mode"], "+") !== false) {
@\stream_socket_shutdown($this->resource, \STREAM_SHUT_RD);
} else {
/** @psalm-suppress InvalidPropertyAssignmentValue */
@\fclose($this->resource);
}
}
Expand All @@ -181,6 +184,8 @@ public function close()

/**
* Nulls reference to resource, marks stream unreadable, and succeeds any pending read with null.
*
* @return void
*/
private function free()
{
Expand Down Expand Up @@ -208,6 +213,9 @@ public function getResource()
return $this->resource;
}

/**
* @return void
*/
public function setChunkSize(int $chunkSize)
{
$this->chunkSize = $chunkSize;
Expand All @@ -216,6 +224,8 @@ public function setChunkSize(int $chunkSize)
/**
* References the read watcher, so the loop keeps running in case there's an active read.
*
* @return void
*
* @see Loop::reference()
*/
public function reference()
Expand All @@ -230,6 +240,8 @@ public function reference()
/**
* Unreferences the read watcher, so the loop doesn't keep running even if there are active reads.
*
* @return void
*
* @see Loop::unreference()
*/
public function unreference()
Expand Down
21 changes: 14 additions & 7 deletions lib/ResourceOutputStream.php
Expand Up @@ -16,13 +16,13 @@ final class ResourceOutputStream implements OutputStream
const MAX_CONSECUTIVE_EMPTY_WRITES = 3;
const LARGE_CHUNK_SIZE = 128 * 1024;

/** @var resource */
/** @var resource|null */
private $resource;

/** @var string */
private $watcher;

/** @var \SplQueue */
/** @var \SplQueue<array> */
private $writes;

/** @var bool */
Expand Down Expand Up @@ -62,8 +62,8 @@ public function __construct($stream, int $chunkSize = null)

try {
while (!$writes->isEmpty()) {
/** @var \Amp\Deferred $deferred */
list($data, $previous, $deferred) = $writes->shift();
/** @var Deferred $deferred */
[$data, $previous, $deferred] = $writes->shift();
$length = \strlen($data);

if ($length === 0) {
Expand Down Expand Up @@ -125,9 +125,10 @@ public function __construct($stream, int $chunkSize = null)
$resource = null;
$writable = false;

/** @psalm-suppress PossiblyUndefinedVariable */
$deferred->fail($exception);
while (!$writes->isEmpty()) {
list(, , $deferred) = $writes->shift();
[, , $deferred] = $writes->shift();
$deferred->fail($exception);
}

Expand Down Expand Up @@ -265,6 +266,7 @@ public function close()
if ($meta && \strpos($meta["mode"], "+") !== false) {
@\stream_socket_shutdown($this->resource, \STREAM_SHUT_WR);
} else {
/** @psalm-suppress InvalidPropertyAssignmentValue psalm reports this as closed-resource */
@\fclose($this->resource);
}
}
Expand All @@ -274,6 +276,8 @@ public function close()

/**
* Nulls reference to resource, marks stream unwritable, and fails any pending write.
*
* @return void
*/
private function free()
{
Expand All @@ -283,8 +287,8 @@ private function free()
if (!$this->writes->isEmpty()) {
$exception = new ClosedException("The socket was closed before writing completed");
do {
/** @var \Amp\Deferred $deferred */
list(, , $deferred) = $this->writes->shift();
/** @var Deferred $deferred */
[, , $deferred] = $this->writes->shift();
$deferred->fail($exception);
} while (!$this->writes->isEmpty());
}
Expand All @@ -300,6 +304,9 @@ public function getResource()
return $this->resource;
}

/**
* @return void
*/
public function setChunkSize(int $chunkSize)
{
$this->chunkSize = $chunkSize;
Expand Down
12 changes: 11 additions & 1 deletion lib/ZlibInputStream.php
Expand Up @@ -10,9 +10,13 @@
*/
final class ZlibInputStream implements InputStream
{
/** @var InputStream|null */
private $source;
/** @var int */
private $encoding;
/** @var array */
private $options;
/** @var resource|null */
private $resource;

/**
Expand Down Expand Up @@ -45,9 +49,12 @@ public function read(): Promise
return null;
}

\assert($this->source !== null);

$data = yield $this->source->read();

// Needs a double guard, as stream might have been closed while reading
/** @psalm-suppress ParadoxicalCondition */
if ($this->resource === null) {
return null;
}
Expand All @@ -74,7 +81,10 @@ public function read(): Promise
});
}

/** @internal */
/**
* @internal
* @return void
*/
private function close()
{
$this->resource = null;
Expand Down

0 comments on commit b5b5fa7

Please sign in to comment.