From ac3d77a76acbc67da67b67a6f37f39807238d4f0 Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Sun, 5 Jan 2020 14:56:11 +0100 Subject: [PATCH] [HttpClient] Don't read from the network faster than the CPU can deal with --- .../Component/HttpClient/CurlHttpClient.php | 6 +- .../Component/HttpClient/NativeHttpClient.php | 4 +- .../HttpClient/Response/CurlResponse.php | 46 +++++---------- .../HttpClient/Response/MockResponse.php | 17 +----- .../HttpClient/Response/NativeResponse.php | 47 ++++----------- .../HttpClient/Response/ResponseTrait.php | 57 ++++++++++++++----- 6 files changed, 77 insertions(+), 100 deletions(-) diff --git a/src/Symfony/Component/HttpClient/CurlHttpClient.php b/src/Symfony/Component/HttpClient/CurlHttpClient.php index 494bf76a1043..727f878ccc0d 100644 --- a/src/Symfony/Component/HttpClient/CurlHttpClient.php +++ b/src/Symfony/Component/HttpClient/CurlHttpClient.php @@ -113,7 +113,7 @@ public function request(string $method, string $url, array $options = []): Respo $url = implode('', $url); if (!isset($options['normalized_headers']['user-agent'])) { - $options['normalized_headers']['user-agent'][] = $options['headers'][] = 'User-Agent: Symfony HttpClient/Curl'; + $options['headers'][] = 'User-Agent: Symfony HttpClient/Curl'; } $curlopts = [ @@ -194,8 +194,8 @@ public function request(string $method, string $url, array $options = []): Respo $curlopts[CURLOPT_NOSIGNAL] = true; } - if (!isset($options['normalized_headers']['accept-encoding']) && CURL_VERSION_LIBZ & self::$curlVersion['features']) { - $curlopts[CURLOPT_ENCODING] = 'gzip'; // Expose only one encoding, some servers mess up when more are provided + if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) { + $options['headers'][] = 'Accept-Encoding: gzip'; // Expose only one encoding, some servers mess up when more are provided } foreach ($options['headers'] as $header) { diff --git a/src/Symfony/Component/HttpClient/NativeHttpClient.php b/src/Symfony/Component/HttpClient/NativeHttpClient.php index 7081842ddf4e..06b1f3ad7121 100644 --- a/src/Symfony/Component/HttpClient/NativeHttpClient.php +++ b/src/Symfony/Component/HttpClient/NativeHttpClient.php @@ -77,7 +77,7 @@ public function request(string $method, string $url, array $options = []): Respo $options['headers'][] = 'Content-Type: application/x-www-form-urlencoded'; } - if ($gzipEnabled = \extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) { + if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) { // gzip is the most widely available algo, no need to deal with deflate $options['headers'][] = 'Accept-Encoding: gzip'; } @@ -210,7 +210,7 @@ public function request(string $method, string $url, array $options = []): Respo $context = stream_context_create($context, ['notification' => $notification]); self::configureHeadersAndProxy($context, $host, $options['headers'], $proxy, $noProxy); - return new NativeResponse($this->multi, $context, implode('', $url), $options, $gzipEnabled, $info, $resolveRedirect, $onProgress, $this->logger); + return new NativeResponse($this->multi, $context, implode('', $url), $options, $info, $resolveRedirect, $onProgress, $this->logger); } /** diff --git a/src/Symfony/Component/HttpClient/Response/CurlResponse.php b/src/Symfony/Component/HttpClient/Response/CurlResponse.php index 13320acfbba9..51b815b47a1c 100644 --- a/src/Symfony/Component/HttpClient/Response/CurlResponse.php +++ b/src/Symfony/Component/HttpClient/Response/CurlResponse.php @@ -52,6 +52,7 @@ public function __construct(CurlClientState $multi, $ch, array $options = null, $this->id = $id = (int) $ch; $this->logger = $logger; + $this->shouldBuffer = $options['buffer'] ?? true; $this->timeout = $options['timeout'] ?? null; $this->info['http_method'] = $method; $this->info['user_data'] = $options['user_data'] ?? null; @@ -65,30 +66,25 @@ public function __construct(CurlClientState $multi, $ch, array $options = null, curl_setopt($ch, CURLOPT_PRIVATE, \in_array($method, ['GET', 'HEAD', 'OPTIONS', 'TRACE'], true) && 1.0 < (float) ($options['http_version'] ?? 1.1) ? 'H2' : 'H0'); // H = headers + retry counter } - if (null === $content = &$this->content) { - $content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null; - } else { - // Move the pushed response to the activity list - if (ftell($content)) { - rewind($content); - $multi->handlesActivity[$id][] = stream_get_contents($content); - } - $content = ($options['buffer'] ?? true) ? $content : null; - } - curl_setopt($ch, CURLOPT_HEADERFUNCTION, static function ($ch, string $data) use (&$info, &$headers, $options, $multi, $id, &$location, $resolveRedirect, $logger): int { return self::parseHeaderLine($ch, $data, $info, $headers, $options, $multi, $id, $location, $resolveRedirect, $logger); }); if (null === $options) { // Pushed response: buffer until requested - curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content): int { - return fwrite($content, $data); + curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int { + $multi->handlesActivity[$id][] = $data; + curl_pause($ch, CURLPAUSE_RECV); + + return \strlen($data); }); return; } + $this->inflate = !isset($options['normalized_headers']['accept-encoding']); + curl_pause($ch, CURLPAUSE_CONT); + if ($onProgress = $options['on_progress']) { $url = isset($info['url']) ? ['url' => $info['url']] : []; curl_setopt($ch, CURLOPT_NOPROGRESS, false); @@ -108,33 +104,16 @@ public function __construct(CurlClientState $multi, $ch, array $options = null, }); } - curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content, $multi, $id): int { + curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int { $multi->handlesActivity[$id][] = $data; - return null !== $content ? fwrite($content, $data) : \strlen($data); + return \strlen($data); }); $this->initializer = static function (self $response) { - if (null !== $response->info['error']) { - throw new TransportException($response->info['error']); - } - $waitFor = curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE); - if ('H' === $waitFor[0] || 'D' === $waitFor[0]) { - try { - foreach (self::stream([$response]) as $chunk) { - if ($chunk->isFirst()) { - break; - } - } - } catch (\Throwable $e) { - // Persist timeouts thrown during initialization - $response->info['error'] = $e->getMessage(); - $response->close(); - throw $e; - } - } + return 'H' === $waitFor[0] || 'D' === $waitFor[0]; }; // Schedule the request in a non-blocking way @@ -221,6 +200,7 @@ public function __destruct() */ private function close(): void { + $this->inflate = null; unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]); curl_multi_remove_handle($this->multi->handle, $this->handle); curl_setopt_array($this->handle, [ diff --git a/src/Symfony/Component/HttpClient/Response/MockResponse.php b/src/Symfony/Component/HttpClient/Response/MockResponse.php index 4cb8be180b48..01c3f605a7bb 100644 --- a/src/Symfony/Component/HttpClient/Response/MockResponse.php +++ b/src/Symfony/Component/HttpClient/Response/MockResponse.php @@ -93,6 +93,7 @@ public function cancel(): void */ protected function close(): void { + $this->inflate = null; $this->body = []; } @@ -104,16 +105,9 @@ public static function fromRequest(string $method, string $url, array $options, $response = new self([]); $response->requestOptions = $options; $response->id = ++self::$idSequence; - $response->content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null; + $response->shouldBuffer = $options['buffer'] ?? true; $response->initializer = static function (self $response) { - if (null !== $response->info['error']) { - throw new TransportException($response->info['error']); - } - - if (\is_array($response->body[0] ?? null)) { - // Consume the first chunk if it's not yielded yet - self::stream([$response])->current(); - } + return \is_array($response->body[0] ?? null); }; $response->info['redirect_count'] = 0; @@ -186,11 +180,6 @@ protected static function perform(ClientState $multi, array &$responses): void } else { // Data or timeout chunk $multi->handlesActivity[$id][] = $chunk; - - if (\is_string($chunk) && null !== $response->content) { - // Buffer response body - fwrite($response->content, $chunk); - } } } } diff --git a/src/Symfony/Component/HttpClient/Response/NativeResponse.php b/src/Symfony/Component/HttpClient/Response/NativeResponse.php index 383603dd2883..f8c19fbf8c7d 100644 --- a/src/Symfony/Component/HttpClient/Response/NativeResponse.php +++ b/src/Symfony/Component/HttpClient/Response/NativeResponse.php @@ -32,14 +32,13 @@ final class NativeResponse implements ResponseInterface private $onProgress; private $remaining; private $buffer; - private $inflate; private $multi; private $debugBuffer; /** * @internal */ - public function __construct(NativeClientState $multi, $context, string $url, $options, bool $gzipEnabled, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger) + public function __construct(NativeClientState $multi, $context, string $url, array $options, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger) { $this->multi = $multi; $this->id = (int) $context; @@ -50,27 +49,17 @@ public function __construct(NativeClientState $multi, $context, string $url, $op $this->info = &$info; $this->resolveRedirect = $resolveRedirect; $this->onProgress = $onProgress; - $this->content = $options['buffer'] ? fopen('php://temp', 'w+') : null; + $this->inflate = !isset($options['normalized_headers']['accept-encoding']); + $this->shouldBuffer = $options['buffer'] ?? true; - // Temporary resources to dechunk/inflate the response stream + // Temporary resource to dechunk the response stream $this->buffer = fopen('php://temp', 'w+'); - $this->inflate = $gzipEnabled ? inflate_init(ZLIB_ENCODING_GZIP) : null; $info['user_data'] = $options['user_data']; ++$multi->responseCount; $this->initializer = static function (self $response) { - if (null !== $response->info['error']) { - throw new TransportException($response->info['error']); - } - - if (null === $response->remaining) { - foreach (self::stream([$response]) as $chunk) { - if ($chunk->isFirst()) { - break; - } - } - } + return null === $response->remaining; }; } @@ -165,7 +154,7 @@ private function open(): void stream_set_blocking($h, false); $this->context = $this->resolveRedirect = null; - // Create dechunk and inflate buffers + // Create dechunk buffers if (isset($this->headers['content-length'])) { $this->remaining = (int) $this->headers['content-length'][0]; } elseif ('chunked' === ($this->headers['transfer-encoding'][0] ?? null)) { @@ -175,10 +164,6 @@ private function open(): void $this->remaining = -2; } - if ($this->inflate && 'gzip' !== ($this->headers['content-encoding'][0] ?? null)) { - $this->inflate = null; - } - $this->multi->handlesActivity[$this->id] = [new FirstChunk()]; if ('HEAD' === $context['http']['method'] || \in_array($this->info['http_code'], [204, 304], true)) { @@ -188,7 +173,7 @@ private function open(): void return; } - $this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->inflate, $this->content, $this->onProgress, &$this->remaining, &$this->info]; + $this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info]; } /** @@ -228,15 +213,15 @@ private static function perform(NativeClientState $multi, array &$responses = nu $multi->handles = []; } - foreach ($multi->openHandles as $i => [$h, $buffer, $inflate, $content, $onProgress]) { + foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) { $hasActivity = false; - $remaining = &$multi->openHandles[$i][5]; - $info = &$multi->openHandles[$i][6]; + $remaining = &$multi->openHandles[$i][3]; + $info = &$multi->openHandles[$i][4]; $e = null; // Read incoming buffer and write it to the dechunk one try { - while ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) { + if ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) { fwrite($buffer, $data); $hasActivity = true; $multi->sleep = false; @@ -264,16 +249,8 @@ private static function perform(NativeClientState $multi, array &$responses = nu rewind($buffer); ftruncate($buffer, 0); - if (null !== $inflate && false === $data = @inflate_add($inflate, $data)) { - $e = new TransportException('Error while processing content unencoding.'); - } - - if ('' !== $data && null === $e) { + if (null === $e) { $multi->handlesActivity[$i][] = $data; - - if (null !== $content && \strlen($data) !== fwrite($content, $data)) { - $e = new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($data))); - } } } diff --git a/src/Symfony/Component/HttpClient/Response/ResponseTrait.php b/src/Symfony/Component/HttpClient/Response/ResponseTrait.php index 52e413a07ad3..d249d6304878 100644 --- a/src/Symfony/Component/HttpClient/Response/ResponseTrait.php +++ b/src/Symfony/Component/HttpClient/Response/ResponseTrait.php @@ -39,11 +39,6 @@ trait ResponseTrait */ private $initializer; - /** - * @var resource A php://temp stream typically - */ - private $content; - private $info = [ 'response_headers' => [], 'http_code' => 0, @@ -54,6 +49,9 @@ trait ResponseTrait private $handle; private $id; private $timeout; + private $inflate; + private $shouldBuffer; + private $content; private $finalInfo; private $offset = 0; private $jsonData; @@ -64,8 +62,7 @@ trait ResponseTrait public function getStatusCode(): int { if ($this->initializer) { - ($this->initializer)($this); - $this->initializer = null; + self::initialize($this); } return $this->info['http_code']; @@ -77,8 +74,7 @@ public function getStatusCode(): int public function getHeaders(bool $throw = true): array { if ($this->initializer) { - ($this->initializer)($this); - $this->initializer = null; + self::initialize($this); } if ($throw) { @@ -94,8 +90,7 @@ public function getHeaders(bool $throw = true): array public function getContent(bool $throw = true): string { if ($this->initializer) { - ($this->initializer)($this); - $this->initializer = null; + self::initialize($this); } if ($throw) { @@ -201,6 +196,30 @@ abstract protected static function perform(ClientState $multi, array &$responses */ abstract protected static function select(ClientState $multi, float $timeout): int; + private static function initialize(self $response): void + { + if (null !== $response->info['error']) { + throw new TransportException($response->info['error']); + } + + try { + if (($response->initializer)($response)) { + foreach (self::stream([$response]) as $chunk) { + if ($chunk->isFirst()) { + break; + } + } + } + } catch (\Throwable $e) { + // Persist timeouts thrown during initialization + $response->info['error'] = $e->getMessage(); + $response->close(); + throw $e; + } + + $response->initializer = null; + } + private static function addResponseHeaders(array $responseHeaders, array &$info, array &$headers, string &$debug = ''): void { foreach ($responseHeaders as $h) { @@ -246,8 +265,7 @@ private function checkStatusCode() private function doDestruct() { if ($this->initializer && null === $this->info['error']) { - ($this->initializer)($this); - $this->initializer = null; + self::initialize($this); $this->checkStatusCode(); } } @@ -299,6 +317,16 @@ public static function stream(iterable $responses, float $timeout = null): \Gene $isTimeout = false; if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) { + if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) { + $multi->handlesActivity[$j] = [null, new TransportException('Error while processing content unencoding.')]; + continue; + } + + if ('' !== $chunk && null !== $response->content && \strlen($chunk) !== fwrite($response->content, $chunk)) { + $multi->handlesActivity[$j] = [null, new TransportException('Failed writing %d bytes to the response buffer.', \strlen($chunk))]; + continue; + } + $response->offset += \strlen($chunk); $chunk = new DataChunk($response->offset, $chunk); } elseif (null === $chunk) { @@ -326,6 +354,9 @@ public static function stream(iterable $responses, float $timeout = null): \Gene $response->logger->info(sprintf('Response: "%s %s"', $info['http_code'], $info['url'])); } + $response->inflate = \extension_loaded('zlib') && $response->inflate && 'gzip' === ($response->headers['content-encoding'][0] ?? null) ? inflate_init(ZLIB_ENCODING_GZIP) : null; + $response->content = $response->shouldBuffer ? fopen('php://temp', 'w+') : null; + yield $response => $chunk; if ($response->initializer && null === $response->info['error']) {