Skip to content

Commit

Permalink
bug #35223 [HttpClient] Don't read from the network faster than the C…
Browse files Browse the repository at this point in the history
…PU can deal with (nicolas-grekas)

This PR was merged into the 4.3 branch.

Discussion
----------

[HttpClient] Don't read from the network faster than the CPU can deal with

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Tickets       | -
| License       | MIT
| Doc PR        | -

Something I spotted while working on #35115: both the curl and native clients don't play well with heavily compressed HTTP streams: they decompress faster than userland can process chunks.

The attached patch moves the decompression logic to the chunk generator. This means internally we only deal with raw compressed chunks, and they are decompressed only when passing the value to userland.

Commits
-------

ac3d77a [HttpClient] Don't read from the network faster than the CPU can deal with
  • Loading branch information
nicolas-grekas committed Jan 6, 2020
2 parents 6fb2d52 + ac3d77a commit f9b36c7
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 100 deletions.
6 changes: 3 additions & 3 deletions src/Symfony/Component/HttpClient/CurlHttpClient.php
Expand Up @@ -113,7 +113,7 @@ public function request(string $method, string $url, array $options = []): Respo
$url = implode('', $url);

if (!isset($options['normalized_headers']['user-agent'])) {
$options['normalized_headers']['user-agent'][] = $options['headers'][] = 'User-Agent: Symfony HttpClient/Curl';
$options['headers'][] = 'User-Agent: Symfony HttpClient/Curl';
}

$curlopts = [
Expand Down Expand Up @@ -194,8 +194,8 @@ public function request(string $method, string $url, array $options = []): Respo
$curlopts[CURLOPT_NOSIGNAL] = true;
}

if (!isset($options['normalized_headers']['accept-encoding']) && CURL_VERSION_LIBZ & self::$curlVersion['features']) {
$curlopts[CURLOPT_ENCODING] = 'gzip'; // Expose only one encoding, some servers mess up when more are provided
if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
$options['headers'][] = 'Accept-Encoding: gzip'; // Expose only one encoding, some servers mess up when more are provided
}

foreach ($options['headers'] as $header) {
Expand Down
4 changes: 2 additions & 2 deletions src/Symfony/Component/HttpClient/NativeHttpClient.php
Expand Up @@ -77,7 +77,7 @@ public function request(string $method, string $url, array $options = []): Respo
$options['headers'][] = 'Content-Type: application/x-www-form-urlencoded';
}

if ($gzipEnabled = \extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
if (\extension_loaded('zlib') && !isset($options['normalized_headers']['accept-encoding'])) {
// gzip is the most widely available algo, no need to deal with deflate
$options['headers'][] = 'Accept-Encoding: gzip';
}
Expand Down Expand Up @@ -210,7 +210,7 @@ public function request(string $method, string $url, array $options = []): Respo
$context = stream_context_create($context, ['notification' => $notification]);
self::configureHeadersAndProxy($context, $host, $options['headers'], $proxy, $noProxy);

return new NativeResponse($this->multi, $context, implode('', $url), $options, $gzipEnabled, $info, $resolveRedirect, $onProgress, $this->logger);
return new NativeResponse($this->multi, $context, implode('', $url), $options, $info, $resolveRedirect, $onProgress, $this->logger);
}

/**
Expand Down
46 changes: 13 additions & 33 deletions src/Symfony/Component/HttpClient/Response/CurlResponse.php
Expand Up @@ -52,6 +52,7 @@ public function __construct(CurlClientState $multi, $ch, array $options = null,

$this->id = $id = (int) $ch;
$this->logger = $logger;
$this->shouldBuffer = $options['buffer'] ?? true;
$this->timeout = $options['timeout'] ?? null;
$this->info['http_method'] = $method;
$this->info['user_data'] = $options['user_data'] ?? null;
Expand All @@ -65,30 +66,25 @@ public function __construct(CurlClientState $multi, $ch, array $options = null,
curl_setopt($ch, CURLOPT_PRIVATE, \in_array($method, ['GET', 'HEAD', 'OPTIONS', 'TRACE'], true) && 1.0 < (float) ($options['http_version'] ?? 1.1) ? 'H2' : 'H0'); // H = headers + retry counter
}

if (null === $content = &$this->content) {
$content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null;
} else {
// Move the pushed response to the activity list
if (ftell($content)) {
rewind($content);
$multi->handlesActivity[$id][] = stream_get_contents($content);
}
$content = ($options['buffer'] ?? true) ? $content : null;
}

curl_setopt($ch, CURLOPT_HEADERFUNCTION, static function ($ch, string $data) use (&$info, &$headers, $options, $multi, $id, &$location, $resolveRedirect, $logger): int {
return self::parseHeaderLine($ch, $data, $info, $headers, $options, $multi, $id, $location, $resolveRedirect, $logger);
});

if (null === $options) {
// Pushed response: buffer until requested
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content): int {
return fwrite($content, $data);
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int {
$multi->handlesActivity[$id][] = $data;
curl_pause($ch, CURLPAUSE_RECV);

return \strlen($data);
});

return;
}

$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
curl_pause($ch, CURLPAUSE_CONT);

if ($onProgress = $options['on_progress']) {
$url = isset($info['url']) ? ['url' => $info['url']] : [];
curl_setopt($ch, CURLOPT_NOPROGRESS, false);
Expand All @@ -108,33 +104,16 @@ public function __construct(CurlClientState $multi, $ch, array $options = null,
});
}

curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use (&$content, $multi, $id): int {
curl_setopt($ch, CURLOPT_WRITEFUNCTION, static function ($ch, string $data) use ($multi, $id): int {
$multi->handlesActivity[$id][] = $data;

return null !== $content ? fwrite($content, $data) : \strlen($data);
return \strlen($data);
});

$this->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}

$waitFor = curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE);

if ('H' === $waitFor[0] || 'D' === $waitFor[0]) {
try {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
}
}
return 'H' === $waitFor[0] || 'D' === $waitFor[0];
};

// Schedule the request in a non-blocking way
Expand Down Expand Up @@ -221,6 +200,7 @@ public function __destruct()
*/
private function close(): void
{
$this->inflate = null;
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
curl_multi_remove_handle($this->multi->handle, $this->handle);
curl_setopt_array($this->handle, [
Expand Down
17 changes: 3 additions & 14 deletions src/Symfony/Component/HttpClient/Response/MockResponse.php
Expand Up @@ -93,6 +93,7 @@ public function cancel(): void
*/
protected function close(): void
{
$this->inflate = null;
$this->body = [];
}

Expand All @@ -104,16 +105,9 @@ public static function fromRequest(string $method, string $url, array $options,
$response = new self([]);
$response->requestOptions = $options;
$response->id = ++self::$idSequence;
$response->content = ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null;
$response->shouldBuffer = $options['buffer'] ?? true;
$response->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}

if (\is_array($response->body[0] ?? null)) {
// Consume the first chunk if it's not yielded yet
self::stream([$response])->current();
}
return \is_array($response->body[0] ?? null);
};

$response->info['redirect_count'] = 0;
Expand Down Expand Up @@ -186,11 +180,6 @@ protected static function perform(ClientState $multi, array &$responses): void
} else {
// Data or timeout chunk
$multi->handlesActivity[$id][] = $chunk;

if (\is_string($chunk) && null !== $response->content) {
// Buffer response body
fwrite($response->content, $chunk);
}
}
}
}
Expand Down
47 changes: 12 additions & 35 deletions src/Symfony/Component/HttpClient/Response/NativeResponse.php
Expand Up @@ -32,14 +32,13 @@ final class NativeResponse implements ResponseInterface
private $onProgress;
private $remaining;
private $buffer;
private $inflate;
private $multi;
private $debugBuffer;

/**
* @internal
*/
public function __construct(NativeClientState $multi, $context, string $url, $options, bool $gzipEnabled, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger)
public function __construct(NativeClientState $multi, $context, string $url, array $options, array &$info, callable $resolveRedirect, ?callable $onProgress, ?LoggerInterface $logger)
{
$this->multi = $multi;
$this->id = (int) $context;
Expand All @@ -50,27 +49,17 @@ public function __construct(NativeClientState $multi, $context, string $url, $op
$this->info = &$info;
$this->resolveRedirect = $resolveRedirect;
$this->onProgress = $onProgress;
$this->content = $options['buffer'] ? fopen('php://temp', 'w+') : null;
$this->inflate = !isset($options['normalized_headers']['accept-encoding']);
$this->shouldBuffer = $options['buffer'] ?? true;

// Temporary resources to dechunk/inflate the response stream
// Temporary resource to dechunk the response stream
$this->buffer = fopen('php://temp', 'w+');
$this->inflate = $gzipEnabled ? inflate_init(ZLIB_ENCODING_GZIP) : null;

$info['user_data'] = $options['user_data'];
++$multi->responseCount;

$this->initializer = static function (self $response) {
if (null !== $response->info['error']) {
throw new TransportException($response->info['error']);
}

if (null === $response->remaining) {
foreach (self::stream([$response]) as $chunk) {
if ($chunk->isFirst()) {
break;
}
}
}
return null === $response->remaining;
};
}

Expand Down Expand Up @@ -165,7 +154,7 @@ private function open(): void
stream_set_blocking($h, false);
$this->context = $this->resolveRedirect = null;

// Create dechunk and inflate buffers
// Create dechunk buffers
if (isset($this->headers['content-length'])) {
$this->remaining = (int) $this->headers['content-length'][0];
} elseif ('chunked' === ($this->headers['transfer-encoding'][0] ?? null)) {
Expand All @@ -175,10 +164,6 @@ private function open(): void
$this->remaining = -2;
}

if ($this->inflate && 'gzip' !== ($this->headers['content-encoding'][0] ?? null)) {
$this->inflate = null;
}

$this->multi->handlesActivity[$this->id] = [new FirstChunk()];

if ('HEAD' === $context['http']['method'] || \in_array($this->info['http_code'], [204, 304], true)) {
Expand All @@ -188,7 +173,7 @@ private function open(): void
return;
}

$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->inflate, $this->content, $this->onProgress, &$this->remaining, &$this->info];
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info];
}

/**
Expand Down Expand Up @@ -228,15 +213,15 @@ private static function perform(NativeClientState $multi, array &$responses = nu
$multi->handles = [];
}

foreach ($multi->openHandles as $i => [$h, $buffer, $inflate, $content, $onProgress]) {
foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
$hasActivity = false;
$remaining = &$multi->openHandles[$i][5];
$info = &$multi->openHandles[$i][6];
$remaining = &$multi->openHandles[$i][3];
$info = &$multi->openHandles[$i][4];
$e = null;

// Read incoming buffer and write it to the dechunk one
try {
while ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) {
if ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) {
fwrite($buffer, $data);
$hasActivity = true;
$multi->sleep = false;
Expand Down Expand Up @@ -264,16 +249,8 @@ private static function perform(NativeClientState $multi, array &$responses = nu
rewind($buffer);
ftruncate($buffer, 0);

if (null !== $inflate && false === $data = @inflate_add($inflate, $data)) {
$e = new TransportException('Error while processing content unencoding.');
}

if ('' !== $data && null === $e) {
if (null === $e) {
$multi->handlesActivity[$i][] = $data;

if (null !== $content && \strlen($data) !== fwrite($content, $data)) {
$e = new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($data)));
}
}
}

Expand Down

0 comments on commit f9b36c7

Please sign in to comment.