Skip to content

Commit

Permalink
Merge pull request #60 from amphp/0.8.x-pending-gc
Browse files Browse the repository at this point in the history
Clear pending requests if there's no response after the timeout
  • Loading branch information
kelunik authored Jul 12, 2017
2 parents 35278b0 + a33ee4b commit 70441da
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions lib/DefaultResolver.php
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ private function doRecurse($name, array $types, $options) {
throw new ResolutionException("CNAME or DNAME chain too long (possible recursion?)");
}

private function doRequest($uri, $name, $type) {
private function doRequest($uri, $name, $type, $timeout) {
$server = $this->loadExistingServer($uri) ?: $this->loadNewServer($uri);

$useTCP = substr($uri, 0, 6) == "tcp://";
if ($useTCP && isset($server->connect)) {
return \Amp\pipe($server->connect, function() use ($uri, $name, $type) {
return $this->doRequest($uri, $name, $type);
return \Amp\pipe($server->connect, function() use ($uri, $name, $type, $timeout) {
return $this->doRequest($uri, $name, $type, $timeout);
});
}

Expand Down Expand Up @@ -204,7 +204,18 @@ private function doRequest($uri, $name, $type) {

$promisor = new Deferred;
$server->pendingRequests[$requestId] = true;
$this->pendingRequests[$requestId] = [$promisor, $name, $type, $uri];
$this->pendingRequests[$requestId] = [$promisor, $name, $type, $uri, $timeout];

$timeoutWatcher = \Amp\once(function () use ($server, $requestId, $name, $timeout) {
/** @var Deferred $deferred */
$deferred = $this->pendingRequests[$requestId][0];
unset($this->pendingRequests[$requestId], $server->pendingRequests[$requestId]);
$deferred->fail(new TimeoutException("No response from the server for {$name} within {$timeout} milliseconds"));
}, $timeout);

$promisor->promise()->when(function () use ($timeoutWatcher) {
\Amp\cancel($timeoutWatcher);
});

return $promisor->promise();
}
Expand Down Expand Up @@ -267,7 +278,7 @@ private function doResolve($name, array $types, $options) {

$promises = [];
foreach ($types as $type) {
$promises[] = $this->doRequest($uri, $name, $type);
$promises[] = $this->doRequest($uri, $name, $type, $timeout);
}

try {
Expand Down Expand Up @@ -618,13 +629,13 @@ private function decodeResponsePacket($serverId, $packet) {
}

private function processDecodedResponse($serverId, $requestId, $response) {
list($promisor, $name, $type, $uri) = $this->pendingRequests[$requestId];
list($promisor, $name, $type, $uri, $timeout) = $this->pendingRequests[$requestId];

// Retry via tcp if message has been truncated
if ($response->isTruncated()) {
if (\substr($uri, 0, 6) != "tcp://") {
$uri = \preg_replace("#[a-z.]+://#", "tcp://", $uri);
$promisor->succeed($this->doRequest($uri, $name, $type));
$promisor->succeed($this->doRequest($uri, $name, $type, $timeout));
} else {
$this->finalizeResult($serverId, $requestId, new ResolutionException(
"Server returned truncated response"
Expand Down

0 comments on commit 70441da

Please sign in to comment.