From 4fd593f86945b02d71eaaaf91bd2588777628d16 Mon Sep 17 00:00:00 2001 From: Nyholm Date: Thu, 26 Sep 2019 11:41:29 +0200 Subject: [PATCH] [HttpClient] Async HTTPlug client --- composer.json | 1 + src/Symfony/Component/HttpClient/CHANGELOG.md | 2 +- .../Component/HttpClient/HttplugClient.php | 254 +++++++++++++++--- .../HttpClient/Response/HttplugPromise.php | 73 +++++ .../HttpClient/Tests/HttplugClientTest.php | 60 +++++ .../Component/HttpClient/composer.json | 1 + 6 files changed, 357 insertions(+), 34 deletions(-) create mode 100644 src/Symfony/Component/HttpClient/Response/HttplugPromise.php diff --git a/composer.json b/composer.json index 61b64b53d10d..ef3906cb328a 100644 --- a/composer.json +++ b/composer.json @@ -108,6 +108,7 @@ "doctrine/orm": "~2.4,>=2.4.5", "doctrine/reflection": "~1.0", "doctrine/doctrine-bundle": "~1.4", + "guzzlehttp/promises": "^1.3.1", "masterminds/html5": "^2.6", "monolog/monolog": "^1.25.1", "nyholm/psr7": "^1.0", diff --git a/src/Symfony/Component/HttpClient/CHANGELOG.md b/src/Symfony/Component/HttpClient/CHANGELOG.md index 973bb6107b96..c36aeb65ce6c 100644 --- a/src/Symfony/Component/HttpClient/CHANGELOG.md +++ b/src/Symfony/Component/HttpClient/CHANGELOG.md @@ -5,7 +5,7 @@ CHANGELOG ----- * added `StreamWrapper` - * added `HttplugClient` + * added `HttplugClient` with support for sync and async requests * added `max_duration` option * added support for NTLM authentication * added `$response->toStream()` to cast responses to regular PHP streams diff --git a/src/Symfony/Component/HttpClient/HttplugClient.php b/src/Symfony/Component/HttpClient/HttplugClient.php index 71eb5200ce3f..6ba3cca62099 100644 --- a/src/Symfony/Component/HttpClient/HttplugClient.php +++ b/src/Symfony/Component/HttpClient/HttplugClient.php @@ -11,31 +11,38 @@ namespace Symfony\Component\HttpClient; +use GuzzleHttp\Promise\Promise as GuzzlePromise; use Http\Client\Exception\NetworkException; use Http\Client\Exception\RequestException; -use Http\Client\HttpClient; +use Http\Client\HttpAsyncClient; +use Http\Client\HttpClient as HttplugInterface; use Http\Message\RequestFactory; use Http\Message\StreamFactory; use Http\Message\UriFactory; -use Psr\Http\Client\ClientInterface; -use Psr\Http\Client\NetworkExceptionInterface; -use Psr\Http\Client\RequestExceptionInterface; +use Http\Promise\Promise; +use Http\Promise\RejectedPromise; +use Nyholm\Psr7\Factory\Psr17Factory; +use Nyholm\Psr7\Request; +use Nyholm\Psr7\Uri; +use Psr\Http\Message\RequestFactoryInterface; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseFactoryInterface; -use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface; use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\StreamInterface; +use Psr\Http\Message\UriFactoryInterface; use Psr\Http\Message\UriInterface; +use Symfony\Component\HttpClient\Response\HttplugPromise; +use Symfony\Component\HttpClient\Response\ResponseTrait; +use Symfony\Component\HttpClient\Response\StreamWrapper; +use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; -if (!interface_exists(HttpClient::class)) { +if (!interface_exists(HttplugInterface::class)) { throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/httplug" package is not installed. Try running "composer require php-http/httplug".'); } -if (!interface_exists(ClientInterface::class)) { - throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "psr/http-client" package is not installed. Try running "composer require psr/http-client".'); -} - if (!interface_exists(RequestFactory::class)) { throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/message-factory" package is not installed. Try running "composer require nyholm/psr7".'); } @@ -43,42 +50,166 @@ /** * An adapter to turn a Symfony HttpClientInterface into an Httplug client. * - * Run "composer require psr/http-client" to install the base ClientInterface. Run - * "composer require nyholm/psr7" to install an efficient implementation of response + * Run "composer require nyholm/psr7" to install an efficient implementation of response * and stream factories with flex-provided autowiring aliases. * * @author Nicolas Grekas */ -final class HttplugClient implements HttpClient, RequestFactory, StreamFactory, UriFactory +final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestFactory, StreamFactory, UriFactory { private $client; + private $responseFactory; + private $streamFactory; + private $promisePool = []; + private $pendingResponse; public function __construct(HttpClientInterface $client = null, ResponseFactoryInterface $responseFactory = null, StreamFactoryInterface $streamFactory = null) { - $this->client = new Psr18Client($client, $responseFactory, $streamFactory); + $this->client = $client ?? HttpClient::create(); + $this->responseFactory = $responseFactory; + $this->streamFactory = $streamFactory ?? ($responseFactory instanceof StreamFactoryInterface ? $responseFactory : null); + $this->promisePool = new \SplObjectStorage(); + + if (null !== $this->responseFactory && null !== $this->streamFactory) { + return; + } + + if (!class_exists(Psr17Factory::class)) { + throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".'); + } + + $psr17Factory = new Psr17Factory(); + $this->responseFactory = $this->responseFactory ?? $psr17Factory; + $this->streamFactory = $this->streamFactory ?? $psr17Factory; } /** * {@inheritdoc} */ - public function sendRequest(RequestInterface $request): ResponseInterface + public function sendRequest(RequestInterface $request): Psr7ResponseInterface { try { - return $this->client->sendRequest($request); - } catch (RequestExceptionInterface $e) { - throw new RequestException($e->getMessage(), $request, $e); - } catch (NetworkExceptionInterface $e) { + return $this->createPsr7Response($this->sendPsr7Request($request)); + } catch (TransportExceptionInterface $e) { throw new NetworkException($e->getMessage(), $request, $e); } } + /** + * {@inheritdoc} + * + * @return HttplugPromise + */ + public function sendAsyncRequest(RequestInterface $request): Promise + { + if (!class_exists(GuzzlePromise::class)) { + throw new \LogicException(sprintf('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises".', __METHOD__)); + } + + try { + $response = $this->sendPsr7Request($request, true); + } catch (NetworkException $e) { + return new RejectedPromise($e); + } + + $cancel = function () use ($response) { + $response->cancel(); + unset($this->promisePool[$response]); + }; + + $promise = new GuzzlePromise(function () use ($response) { + $this->pendingResponse = $response; + $this->wait(); + }, $cancel); + + $this->promisePool[$response] = [$request, $promise]; + + return new HttplugPromise($promise, $cancel); + } + + /** + * Resolve pending promises that complete before the timeouts are reached. + * + * When $maxDuration is null and $idleTimeout is reached, promises are rejected. + * + * @return int The number of remaining pending promises + */ + public function wait(float $maxDuration = null, float $idleTimeout = null): int + { + $pendingResponse = $this->pendingResponse; + $this->pendingResponse = null; + + if (null !== $maxDuration) { + $startTime = microtime(true); + $idleTimeout = max(0.0, min($maxDuration / 5, $idleTimeout ?? $maxDuration)); + $remainingDuration = $maxDuration; + } + + do { + foreach ($this->client->stream($this->promisePool, $idleTimeout) as $response => $chunk) { + try { + if (null !== $maxDuration && $chunk->isTimeout()) { + goto check_duration; + } + + if ($chunk->isFirst()) { + // Deactivate throwing on 3/4/5xx + $response->getStatusCode(); + } + + if (!$chunk->isLast()) { + goto check_duration; + } + + if ([$request, $promise] = $this->promisePool[$response] ?? null) { + unset($this->promisePool[$response]); + $promise->resolve($this->createPsr7Response($response, true)); + } + } catch (\Exception $e) { + if ([$request, $promise] = $this->promisePool[$response] ?? null) { + unset($this->promisePool[$response]); + + if ($e instanceof TransportExceptionInterface) { + $e = new NetworkException($e->getMessage(), $request, $e); + } + + $promise->reject($e); + } + } + + if ($pendingResponse === $response) { + return \count($this->promisePool); + } + + check_duration: + if (null !== $maxDuration && $idleTimeout && $idleTimeout > $remainingDuration = max(0.0, $maxDuration - microtime(true) + $startTime)) { + $idleTimeout = $remainingDuration / 5; + break; + } + } + + if (!$count = \count($this->promisePool)) { + return 0; + } + } while (null !== $maxDuration && 0 < $remainingDuration); + + return $count; + } + /** * {@inheritdoc} */ public function createRequest($method, $uri, array $headers = [], $body = null, $protocolVersion = '1.1'): RequestInterface { - $request = $this->client - ->createRequest($method, $uri) + if ($this->responseFactory instanceof RequestFactoryInterface) { + $request = $this->responseFactory->createRequest($method, $uri); + } elseif (!class_exists(Request::class)) { + throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__)); + } else { + $request = new Request($method, $uri); + } + + $request = $request ->withProtocolVersion($protocolVersion) ->withBody($this->createStream($body)) ; @@ -100,27 +231,84 @@ public function createStream($body = null): StreamInterface } if (\is_string($body ?? '')) { - $body = $this->client->createStream($body ?? ''); - - if ($body->isSeekable()) { - $body->seek(0); - } - - return $body; + $stream = $this->streamFactory->createStream($body ?? ''); + } elseif (\is_resource($body)) { + $stream = $this->streamFactory->createStreamFromResource($body); + } else { + throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body))); } - if (\is_resource($body)) { - return $this->client->createStreamFromResource($body); + if ($stream->isSeekable()) { + $stream->seek(0); } - throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body))); + return $stream; } /** * {@inheritdoc} */ - public function createUri($uri = ''): UriInterface + public function createUri($uri): UriInterface { - return $uri instanceof UriInterface ? $uri : $this->client->createUri($uri); + if ($uri instanceof UriInterface) { + return $uri; + } + + if ($this->responseFactory instanceof UriFactoryInterface) { + return $this->responseFactory->createUri($uri); + } + + if (!class_exists(Uri::class)) { + throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__)); + } + + return new Uri($uri); + } + + private function sendPsr7Request(RequestInterface $request, bool $buffer = null): ResponseInterface + { + try { + $body = $request->getBody(); + + if ($body->isSeekable()) { + $body->seek(0); + } + + return $this->client->request($request->getMethod(), (string) $request->getUri(), [ + 'headers' => $request->getHeaders(), + 'body' => $body->getContents(), + 'http_version' => '1.0' === $request->getProtocolVersion() ? '1.0' : null, + 'buffer' => $buffer, + ]); + } catch (\InvalidArgumentException $e) { + throw new RequestException($e->getMessage(), $request, $e); + } catch (TransportExceptionInterface $e) { + throw new NetworkException($e->getMessage(), $request, $e); + } + } + + private function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface + { + $psrResponse = $this->responseFactory->createResponse($response->getStatusCode()); + + foreach ($response->getHeaders(false) as $name => $values) { + foreach ($values as $value) { + $psrResponse = $psrResponse->withAddedHeader($name, $value); + } + } + + if (isset(class_uses($response)[ResponseTrait::class])) { + $body = $this->streamFactory->createStreamFromResource($response->toStream(false)); + } elseif (!$buffer) { + $body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client)); + } else { + $body = $this->streamFactory->createStream($response->getContent(false)); + } + + if ($body->isSeekable()) { + $body->seek(0); + } + + return $psrResponse->withBody($body); } } diff --git a/src/Symfony/Component/HttpClient/Response/HttplugPromise.php b/src/Symfony/Component/HttpClient/Response/HttplugPromise.php new file mode 100644 index 000000000000..73637b38f729 --- /dev/null +++ b/src/Symfony/Component/HttpClient/Response/HttplugPromise.php @@ -0,0 +1,73 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Response; + +use GuzzleHttp\Promise\PromiseInterface as GuzzlePromiseInterface; +use Http\Promise\Promise as HttplugPromiseInterface; +use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface; + +/** + * @author Tobias Nyholm + * + * @internal + */ +final class HttplugPromise implements HttplugPromiseInterface +{ + private $promise; + private $cancel; + + public function __construct(GuzzlePromiseInterface $promise, callable $cancel = null) + { + $this->promise = $promise; + $this->cancel = $cancel; + } + + public function then(callable $onFulfilled = null, callable $onRejected = null): self + { + return new self($this->promise->then($onFulfilled, $onRejected)); + } + + public function cancel(): void + { + $this->promise->cancel(); + } + + /** + * {@inheritdoc} + */ + public function getState(): string + { + return $this->promise->getState(); + } + + /** + * {@inheritdoc} + * + * @return Psr7ResponseInterface|mixed + */ + public function wait($unwrap = true) + { + return $this->promise->wait($unwrap); + } + + public function __destruct() + { + if ($this->cancel) { + ($this->cancel)(); + } + } + + public function __wakeup() + { + throw new \BadMethodCallException('Cannot unserialize '.__CLASS__); + } +} diff --git a/src/Symfony/Component/HttpClient/Tests/HttplugClientTest.php b/src/Symfony/Component/HttpClient/Tests/HttplugClientTest.php index 05ff05073aed..8cd339f2f8f0 100644 --- a/src/Symfony/Component/HttpClient/Tests/HttplugClientTest.php +++ b/src/Symfony/Component/HttpClient/Tests/HttplugClientTest.php @@ -13,7 +13,9 @@ use Http\Client\Exception\NetworkException; use Http\Client\Exception\RequestException; +use Http\Promise\Promise; use PHPUnit\Framework\TestCase; +use Psr\Http\Message\ResponseInterface; use Symfony\Component\HttpClient\HttplugClient; use Symfony\Component\HttpClient\NativeHttpClient; use Symfony\Contracts\HttpClient\Test\TestHttpServer; @@ -41,6 +43,38 @@ public function testSendRequest() $this->assertSame('HTTP/1.1', $body['SERVER_PROTOCOL']); } + public function testSendAsyncRequest() + { + $client = new HttplugClient(new NativeHttpClient()); + + $promise = $client->sendAsyncRequest($client->createRequest('GET', 'http://localhost:8057')); + $successCallableCalled = false; + $failureCallableCalled = false; + $promise->then(function (ResponseInterface $response) use (&$successCallableCalled) { + $successCallableCalled = true; + + return $response; + }, function (\Exception $exception) use (&$failureCallableCalled) { + $failureCallableCalled = true; + + throw $exception; + }); + + $this->assertEquals(Promise::PENDING, $promise->getState()); + + $response = $promise->wait(true); + $this->assertTrue($successCallableCalled, '$promise->then() was never called.'); + $this->assertFalse($failureCallableCalled, 'Failure callable should not be called when request is successful.'); + $this->assertEquals(Promise::FULFILLED, $promise->getState()); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('application/json', $response->getHeaderLine('content-type')); + + $body = json_decode((string) $response->getBody(), true); + + $this->assertSame('HTTP/1.1', $body['SERVER_PROTOCOL']); + } + public function testPostRequest() { $client = new HttplugClient(new NativeHttpClient()); @@ -62,6 +96,32 @@ public function testNetworkException() $client->sendRequest($client->createRequest('GET', 'http://localhost:8058')); } + public function testAsyncNetworkException() + { + $client = new HttplugClient(new NativeHttpClient()); + + $promise = $client->sendAsyncRequest($client->createRequest('GET', 'http://localhost:8058')); + $successCallableCalled = false; + $failureCallableCalled = false; + $promise->then(function (ResponseInterface $response) use (&$successCallableCalled) { + $successCallableCalled = true; + + return $response; + }, function (\Exception $exception) use (&$failureCallableCalled) { + $failureCallableCalled = true; + + throw $exception; + }); + + $promise->wait(false); + $this->assertFalse($successCallableCalled, 'Success callable should not be called when request fails.'); + $this->assertTrue($failureCallableCalled, 'Failure callable was never called.'); + $this->assertEquals(Promise::REJECTED, $promise->getState()); + + $this->expectException(NetworkException::class); + $promise->wait(true); + } + public function testRequestException() { $client = new HttplugClient(new NativeHttpClient()); diff --git a/src/Symfony/Component/HttpClient/composer.json b/src/Symfony/Component/HttpClient/composer.json index cf45723b2dd1..5a5512a3e71e 100644 --- a/src/Symfony/Component/HttpClient/composer.json +++ b/src/Symfony/Component/HttpClient/composer.json @@ -26,6 +26,7 @@ "symfony/polyfill-php73": "^1.11" }, "require-dev": { + "guzzlehttp/promises": "^1.3.1", "nyholm/psr7": "^1.0", "php-http/httplug": "^1.0|^2.0", "psr/http-client": "^1.0",