Skip to content

Commit

Permalink
Improve timer cancellation in NativeDriver
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Oct 30, 2019
1 parent 9fa04aa commit 710f84c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 54 deletions.
96 changes: 64 additions & 32 deletions lib/Loop/Internal/TimerQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,76 +7,103 @@
/**
* Uses a binary tree stored in an array to implement a heap.
*/
class TimerQueue
final class TimerQueue
{
/** @var TimerQueueEntry[] */
/** @var object[] */
private $data = [];

/** @var int[] */
private $pointers;

/**
* Inserts the watcher into the queue. Time complexity: O(log(n)).
*
* @param Watcher $watcher
* @param int $expiration
* @param int $expiration
*
* @return void
*/
public function insert(Watcher $watcher, int $expiration)
{
$entry = new TimerQueueEntry;
$entry = new class {
public $watcher;
public $expiration;
};

$entry->watcher = $watcher;
$entry->expiration = $expiration;

$node = \count($this->data);
$this->data[$node] = $entry;
$this->pointers[$watcher->id] = $node;

while ($node !== 0 && $entry->expiration < $this->data[$parent = ($node - 1) >> 1]->expiration) {
$this->data[$node] = $this->data[$parent];
$temp = $this->data[$parent];
$this->data[$node] = $temp;
$this->pointers[$temp->watcher->id] = $node;

$this->data[$parent] = $entry;
$this->pointers[$watcher->id] = $parent;

$node = $parent;
}
}

/**
* Removes the given watcher from the queue. Time complexity: O(n).
* Removes the given watcher from the queue. Time complexity: O(1).
*
* @param Watcher $watcher
*
* @return void
*/
public function remove(Watcher $watcher)
{
foreach ($this->data as $node => $entry) {
if ($entry->watcher === $watcher) {
$this->removeAndRebuild($node);
return;
}
$id = $watcher->id;

if (!isset($this->pointers[$id])) {
return;
}

$this->removeAndRebuild($this->pointers[$id]);
}

/**
* Deletes and returns the Watcher on top of the heap. Time complexity: O(log(n)).
* Deletes and returns the Watcher on top of the heap if it has expired, otherwise null is returned.
* Time complexity: O(log(n)).
*
* @param int $now Current loop time.
*
* @return [Watcher, int] Tuple of the watcher and the expiration time.
* @return Watcher|null Expired watcher at the top of the heap or null if the watcher has not expired.
*/
public function extract(): array
public function extract(int $now)
{
if ($this->isEmpty()) {
if (empty($this->data)) {
throw new \Error('No data left in the heap.');
}

$data = $this->removeAndRebuild(0);
$data = $this->data[0];

return [$data->watcher, $data->expiration];
if ($data->expiration > $now) {
return null;
}

$this->removeAndRebuild(0);

return $data->watcher;
}

/**
* @param int $node Remove the given node and then rebuild the data array from that node downward.
*
* @return TimerQueueEntry Removed entry.
* @return void
*/
private function removeAndRebuild(int $node): TimerQueueEntry
private function removeAndRebuild(int $node)
{
$length = \count($this->data) - 1;
$data = $this->data[$node];
$this->data[$node] = $this->data[$length];
unset($this->data[$length]);
$id = $this->data[$node]->watcher->id;
$left = $this->data[$node] = $this->data[$length];
$this->pointers[$left->watcher->id] = $node;
unset($this->data[$length], $this->pointers[$id]);

while (($child = ($node << 1) + 1) < $length) {
if ($this->data[$child]->expiration < $this->data[$node]->expiration
Expand All @@ -91,31 +118,36 @@ private function removeAndRebuild(int $node): TimerQueueEntry
break;
}

$temp = $this->data[$node];
$this->data[$node] = $this->data[$swap];
$this->data[$swap] = $temp;
$left = $this->data[$node];
$right = $this->data[$swap];

$this->data[$node] = $right;
$this->pointers[$right->watcher->id] = $node;

$this->data[$swap] = $left;
$this->pointers[$left->watcher->id] = $swap;

$node = $swap;
}

return $data;
}

/**
* Returns the value at the top of the heap (without removing it). Time complexity: O(1).
* Returns the expiration time value at the top of the heap. Time complexity: O(1).
*
* @return [Watcher, int] Tuple of the watcher and the expiration time.
* @return int Expiration time of the watcher at the top of the heap.
*/
public function peek(): array
public function peek(): int
{
if ($this->isEmpty()) {
if (empty($this->data)) {
throw new \Error('No data in the heap.');
}

return [$this->data[0]->watcher, $this->data[0]->expiration];
return $this->data[0]->expiration;
}

/**
* Determines if the heap is empty.
*
* @return bool
*/
public function isEmpty(): bool
Expand Down
17 changes: 0 additions & 17 deletions lib/Loop/Internal/TimerQueueEntry.php

This file was deleted.

8 changes: 3 additions & 5 deletions lib/Loop/NativeDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,12 @@ protected function dispatch(bool $blocking)

try {
while (!$this->timerQueue->isEmpty()) {
list($watcher, $expiration) = $this->timerQueue->peek();
$watcher = $this->timerQueue->extract($this->now());

if ($expiration > $this->now()) { // Timer at top of queue has not expired.
if ($watcher === null) { // Timer at top of queue has not expired.
break;
}

$this->timerQueue->extract();

if ($watcher->type & Watcher::REPEAT) {
$expiration = $this->now() + $watcher->value;
$scheduleQueue[] = [$watcher, $expiration];
Expand Down Expand Up @@ -259,7 +257,7 @@ private function selectStreams(array $read, array $write, int $timeout)
private function getTimeout(): int
{
if (!$this->timerQueue->isEmpty()) {
list($watcher, $expiration) = $this->timerQueue->peek();
$expiration = $this->timerQueue->peek();

$expiration -= getCurrentTime() - $this->nowOffset;

Expand Down

0 comments on commit 710f84c

Please sign in to comment.