Skip to content

Commit

Permalink
Fix queue resumption on socket closure
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Dec 15, 2021
1 parent bec75d7 commit 11e8ae6
Showing 1 changed file with 35 additions and 13 deletions.
48 changes: 35 additions & 13 deletions lib/Internal/Socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Dns\Internal;

use Amp\ByteStream\ClosedException;
use Amp\ByteStream\ReadableResourceStream;
use Amp\ByteStream\StreamException;
use Amp\ByteStream\WritableResourceStream;
Expand Down Expand Up @@ -31,13 +32,18 @@ abstract public static function connect(string $uri): self;

private ReadableResourceStream $input;
private WritableResourceStream $output;

/** @var array Contains already sent queries with no response yet. For UDP this is exactly zero or one item. */
private array $pending = [];

private MessageFactory $messageFactory;

/** @var float Used for determining whether the socket can be garbage collected, because it's inactive. */
private float $lastActivity;

private bool $receiving = false;
/** @var array Queued requests if the number of concurrent requests is too large. */

/** @var EventLoop\Suspension[] Queued requests if the number of concurrent requests is too large. */
private array $queue = [];

protected function __construct($socket)
Expand Down Expand Up @@ -91,7 +97,7 @@ private function handleResolution(?\Throwable $exception, ?Message $message = nu

abstract public function isAlive(): bool;

public function getLastActivity(): int
public function getLastActivity(): float
{
return $this->lastActivity;
}
Expand All @@ -101,15 +107,17 @@ public function getLastActivity(): int
* @param float $timeout
*
* @return Message
*
* @throws DnsException
*/
final public function ask(Question $question, float $timeout): Message
{
$this->lastActivity = now();

if (\count($this->pending) > self::MAX_CONCURRENT_REQUESTS) {
$deferred = new DeferredFuture;
$this->queue[] = $deferred;
$deferred->getFuture()->await();
$suspension = EventLoop::createSuspension();
$this->queue[] = $suspension;
$suspension->suspend();
}

do {
Expand Down Expand Up @@ -156,17 +164,23 @@ final public function ask(Question $question, float $timeout): Message
} finally {
if ($this->queue) {
$deferred = \array_shift($this->queue);
$deferred->resolve();
$deferred->resume();
}
}
}

final public function close(): void
{
$this->input->close();
$this->output->close();
$this->error(new ClosedException('Socket has been closed'));
}

/**
* @param Message $message
*
* @return void
*
* @throws StreamException
*/
abstract protected function send(Message $message): void;

/**
Expand All @@ -179,6 +193,9 @@ final protected function read(): ?string
return $this->input->read();
}

/**
* @throws ClosedException
*/
final protected function write(string $data): void
{
$this->output->write($data);
Expand All @@ -190,16 +207,14 @@ final protected function createMessage(Question $question, int $id): Message
$request->getQuestionRecords()->add($question);
$request->isRecursionDesired(true);
$request->setID($id);

return $request;
}

private function error(\Throwable $exception): void
{
$this->close();

if (empty($this->pending)) {
return;
}
$this->input->close();
$this->output->close();

if (!$exception instanceof DnsException) {
$message = "Unexpected error during resolution: " . $exception->getMessage();
Expand All @@ -214,6 +229,13 @@ private function error(\Throwable $exception): void
$deferred = $pendingQuestion->deferred;
$deferred->error($exception);
}

$queue = $this->queue;
$this->queue = [];

foreach ($queue as $suspension) {
$suspension->throw($exception);
}
}

private function matchesQuestion(Message $message, Question $question): bool
Expand Down

0 comments on commit 11e8ae6

Please sign in to comment.