Skip to content

Commit

Permalink
Add PSR-7 request adapter (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
remorhaz committed May 11, 2020
1 parent 5df8c18 commit b2a39ca
Show file tree
Hide file tree
Showing 8 changed files with 910 additions and 0 deletions.
41 changes: 41 additions & 0 deletions src/Body/PsrStreamBody.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Amp\Http\Client\Body;

use Amp\ByteStream\InputStream;
use Amp\Http\Client\Internal\PsrInputStream;
use Amp\Http\Client\RequestBody;
use Amp\Promise;
use Amp\Success;
use Psr\Http\Message\StreamInterface;

final class PsrStreamBody implements RequestBody
{

/**
* @var StreamInterface
*/
private $stream;

public function __construct(StreamInterface $stream)
{
$this->stream = $stream;
}

public function getBodyLength(): Promise
{
return new Success($this->stream->getSize() ?? -1);
}

public function getHeaders(): Promise
{
return new Success([]);
}

public function createBodyStream(): InputStream
{
return new PsrInputStream($this->stream);
}
}
63 changes: 63 additions & 0 deletions src/Internal/PsrInputStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

declare(strict_types=1);

namespace Amp\Http\Client\Internal;

use Amp\ByteStream\InputStream;
use Amp\Promise;
use Amp\Success;
use Psr\Http\Message\StreamInterface;

/**
* @internal
*/
final class PsrInputStream implements InputStream
{

private const DEFAULT_CHUNK_SIZE = 8192;

/**
* @var StreamInterface
*/
private $stream;

/**
* @var int
*/
private $chunkSize;

/**
* @var bool
*/
private $tryRewind = true;

public function __construct(StreamInterface $stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE)
{
if ($chunkSize < 1) {
throw new \Error("Invalid chunk size: {$chunkSize}");
}
$this->stream = $stream;
$this->chunkSize = $chunkSize;
}

public function read(): Promise
{
if (!$this->stream->isReadable()) {
return new Success();
}
if ($this->tryRewind) {
$this->tryRewind = false;
if ($this->stream->isSeekable()) {
$this->stream->rewind();
}
}
if ($this->stream->eof()) {
return new Success();
}

$data = $this->stream->read($this->chunkSize);

return new Success($data);
}
}
193 changes: 193 additions & 0 deletions src/Internal/PsrRequestStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
<?php

declare(strict_types=1);

namespace Amp\Http\Client\Internal;

use Amp\ByteStream\InputStream;
use Psr\Http\Message\StreamInterface;

use function Amp\Promise\timeout;
use function Amp\Promise\wait;

/**
* @internal
*/
final class PsrRequestStream implements StreamInterface
{

private const DEFAULT_TIMEOUT = 5000;

/**
* @var InputStream
*/
private $stream;

/**
* @var int
*/
private $timeout;

/**
* @var string
*/
private $buffer = '';

/**
* @var bool
*/
private $isEof = false;

public function __construct(InputStream $stream, int $timeout = self::DEFAULT_TIMEOUT)
{
$this->stream = $stream;
$this->timeout = $timeout;
}

public function __toString()
{
try {
return $this->getContents();
} catch (\Throwable $e) {
return '';
}
}

public function close(): void
{
unset($this->stream);
}

public function eof(): bool
{
return $this->isEof;
}

public function tell()
{
throw new \RuntimeException("Source stream is not seekable");
}

/**
* @return null
*/
public function getSize()
{
return null;
}

/**
* @return bool
*/
public function isSeekable()
{
return false;
}

/**
* @param int $offset
* @param int $whence
* @return void
*/
public function seek($offset, $whence = SEEK_SET)
{
throw new \RuntimeException("Source stream is not seekable");
}

/**
* @return void
*/
public function rewind()
{
throw new \RuntimeException("Source stream is not seekable");
}

/**
* @return bool
*/
public function isWritable()
{
return false;
}

public function write($string)
{
throw new \RuntimeException("Source stream is not writable");
}

/**
* @param string|null $key
* @return array|null
*/
public function getMetadata($key = null)
{
return isset($key) ? null : [];
}

/**
* @return null
*/
public function detach()
{
unset($this->stream);

return null;
}

/**
* @return bool
*/
public function isReadable()
{
return isset($this->stream);
}

/**
* @param int $length
* @return string
*/
public function read($length)
{
while (!$this->isEof && \strlen($this->buffer) < $length) {
$this->buffer .= $this->readFromStream();
}

$data = \substr($this->buffer, 0, $length);
$this->buffer = \substr($this->buffer, \strlen($data));

return $data;
}

private function readFromStream(): string
{
$data = wait(timeout($this->getOpenStream()->read(), $this->timeout));
if (!isset($data)) {
$this->isEof = true;

return '';
}
if (\is_string($data)) {
return $data;
}

throw new \RuntimeException("Invalid data received from stream");
}

private function getOpenStream(): InputStream
{
if (isset($this->stream)) {
return $this->stream;
}

throw new \RuntimeException("Stream is closed");
}

public function getContents()
{
while (!$this->isEof) {
$this->buffer .= $this->readFromStream();
}

return $this->buffer;
}
}
56 changes: 56 additions & 0 deletions src/PsrAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);

namespace Amp\Http\Client;

use Amp\Http\Client\Body\PsrStreamBody;
use Amp\Http\Client\Internal\PsrRequestStream;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\RequestInterface;

final class PsrAdapter
{

public function fromPsrRequest(RequestInterface $source): Request
{
$target = new Request($source->getUri(), $source->getMethod());
$target->setHeaders($source->getHeaders());
$target->setProtocolVersions([$source->getProtocolVersion()]);
$target->setBody(new PsrStreamBody($source->getBody()));

return $target;
}

public function toPsrRequest(
RequestFactoryInterface $requestFactory,
Request $source,
?string $protocolVersion = null
): RequestInterface {
$target = $requestFactory
->createRequest($source->getMethod(), $source->getUri())
->withBody(new PsrRequestStream($source->getBody()->createBodyStream()));
foreach ($source->getHeaders() as $headerName => $headerValues) {
$target = $target->withHeader($headerName, $headerValues);
}
$protocolVersions = $source->getProtocolVersions();
if (isset($protocolVersion)) {
if (!\in_array($protocolVersion, $protocolVersions)) {
throw new \RuntimeException(
"Source request doesn't support provided HTTP protocol version: {$protocolVersion}"
);
}

return $target->withProtocolVersion($protocolVersion);
}
if (\count($protocolVersions) == 1) {
return $target->withProtocolVersion($protocolVersions[0]);
}

if (!\in_array($target->getProtocolVersion(), $protocolVersions)) {
throw new \RuntimeException("Can't choose HTTP protocol version automatically");
}

return $target;
}
}

0 comments on commit b2a39ca

Please sign in to comment.