Skip to content

Commit

Permalink
Merge pull request #37 from amphp/fix-immediate
Browse files Browse the repository at this point in the history
Defer immediate reads
  • Loading branch information
trowski committed Mar 9, 2018
2 parents a4739c8 + 062f16d commit ca9128a
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion lib/ResourceInputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ final class ResourceInputStream implements InputStream {
/** @var bool */
private $useSingleRead;

/** @var callable */
private $immediateCallable;

/** @var string */
private $immediateWatcher;

/**
* @param resource $stream Stream resource.
* @param int $chunkSize Chunk size per read operation.
Expand Down Expand Up @@ -88,6 +94,12 @@ public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE)
}
});

$this->immediateCallable = static function ($watcherId, $data) use (&$deferred) {
$temp = $deferred;
$deferred = null;
$temp->resolve($data);
};

Loop::disable($this->watcher);
}

Expand Down Expand Up @@ -126,7 +138,12 @@ public function read(): Promise {
}
}

return new Success($data);
// Prevent an immediate read → write loop from blocking everything
// See e.g. examples/benchmark-throughput.php
$this->deferred = new Deferred;
$this->immediateWatcher = Loop::defer($this->immediateCallable, $data);

return $this->deferred->promise();
}

/**
Expand Down Expand Up @@ -163,6 +180,10 @@ private function free() {
}

Loop::cancel($this->watcher);

if ($this->immediateWatcher !== null) {
Loop::cancel($this->immediateWatcher);
}
}

/**
Expand Down

0 comments on commit ca9128a

Please sign in to comment.