Skip to content

Commit

Permalink
Use a custom priority queue for timers
Browse files Browse the repository at this point in the history
Fixes #220 by allowing immediate removal of the watcher from the queue. Insert and extract is O(log(n)), peeking is O(1), and removal is O(n).
  • Loading branch information
trowski committed Jan 24, 2019
1 parent b6fc1e1 commit c6f8425
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 59 deletions.
125 changes: 125 additions & 0 deletions lib/Loop/Internal/TimerQueue.php
@@ -0,0 +1,125 @@
<?php

namespace Amp\Loop\Internal;

use Amp\Loop\Watcher;

/**
* Uses a binary tree stored in an array to implement a heap.
*/
class TimerQueue
{
/** @var TimerQueueEntry[] */
private $data = [];

/**
* Inserts the watcher into the queue. Time complexity: O(log(n)).
*
* @param Watcher $watcher
* @param int $expiration
*/
public function insert(Watcher $watcher, int $expiration)
{
$entry = new TimerQueueEntry;
$entry->watcher = $watcher;
$entry->expiration = $expiration;

$node = \count($this->data);
$this->data[$node] = $entry;

while ($node !== 0 && $entry->expiration < $this->data[$parent = ($node - 1) >> 1]->expiration) {
$this->data[$node] = $this->data[$parent];
$this->data[$parent] = $entry;

$node = $parent;
}
}

/**
* Removes the given watcher from the queue. Time complexity: O(n).
*
* @param Watcher $watcher
*/
public function remove(Watcher $watcher)
{
foreach ($this->data as $node => $entry) {
if ($entry->watcher === $watcher) {
$this->removeAndRebuild($node);
return;
}
}
}

/**
* Deletes and returns the Watcher on top of the heap. Time complexity: O(log(n)).
*
* @return [Watcher, int] Tuple of the watcher and the expiration time.
*/
public function extract(): array
{
if ($this->isEmpty()) {
throw new \Error('No data left in the heap.');
}

$data = $this->removeAndRebuild(0);

return [$data->watcher, $data->expiration];
}

/**
* @param int $node Remove the given node and then rebuild the data array from that node downward.
*
* @return TimerQueueEntry Removed entry.
*/
private function removeAndRebuild(int $node): TimerQueueEntry
{
$length = \count($this->data) - 1;
$data = $this->data[$node];
$this->data[$node] = $this->data[$length];
unset($this->data[$length]);

while (($child = ($node << 1) + 1) < $length) {
if ($this->data[$child]->expiration < $this->data[$node]->expiration
&& ($child + 1 >= $length || $this->data[$child]->expiration < $this->data[$child + 1]->expiration)
) {
// Left child is less than parent and right child.
$swap = $child;
} elseif ($child + 1 < $length && $this->data[$child + 1]->expiration < $this->data[$node]->expiration) {
// Right child is less than parent and left child.
$swap = $child + 1;
} else { // Left and right child are greater than parent.
break;
}

$temp = $this->data[$node];
$this->data[$node] = $this->data[$swap];
$this->data[$swap] = $temp;
$node = $swap;
}

return $data;
}

/**
* Returns the value at the top of the heap (without removing it). Time complexity: O(1).
*
* @return [Watcher, int] Tuple of the watcher and the expiration time.
*/
public function peek(): array
{
if ($this->isEmpty()) {
throw new \Error('No data in the heap.');
}

return [$this->data[0]->watcher, $this->data[0]->expiration];
}

/**
* Determines if the heap is empty.
* @return bool
*/
public function isEmpty(): bool
{
return empty($this->data);
}
}
17 changes: 17 additions & 0 deletions lib/Loop/Internal/TimerQueueEntry.php
@@ -0,0 +1,17 @@
<?php

namespace Amp\Loop\Internal;

use Amp\Loop\Watcher;
use Amp\Struct;

class TimerQueueEntry
{
use Struct;

/** @var Watcher */
public $watcher;

/** @var int */
public $expiration;
}
97 changes: 38 additions & 59 deletions lib/Loop/NativeDriver.php
Expand Up @@ -22,10 +22,7 @@ class NativeDriver extends Driver
/** @var \Amp\Loop\Watcher[][] */
private $writeWatchers = [];

/** @var int[] */
private $timerExpires = [];

/** @var \SplPriorityQueue */
/** @var Internal\TimerQueue */
private $timerQueue;

/** @var \Amp\Loop\Watcher[][] */
Expand All @@ -45,7 +42,7 @@ class NativeDriver extends Driver

public function __construct()
{
$this->timerQueue = new \SplPriorityQueue();
$this->timerQueue = new Internal\TimerQueue;
$this->signalHandling = \extension_loaded("pcntl");
$this->nowOffset = getCurrentTime();
$this->now = \random_int(0, $this->nowOffset);
Expand Down Expand Up @@ -97,56 +94,48 @@ protected function dispatch(bool $blocking)
$blocking ? $this->getTimeout() : 0
);

if (!empty($this->timerExpires)) {
$scheduleQueue = [];
$scheduleQueue = [];

try {
while (!$this->timerQueue->isEmpty()) {
list($watcher, $expiration) = $this->timerQueue->top();
try {
while (!$this->timerQueue->isEmpty()) {
list($watcher, $expiration) = $this->timerQueue->peek();

$id = $watcher->id;
if ($expiration > $this->now()) { // Timer at top of queue has not expired.
break;
}

if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
$this->timerQueue->extract(); // Timer was removed from queue.
continue;
}
$this->timerQueue->extract();

if ($this->timerExpires[$id] > $this->now()) { // Timer at top of queue has not expired.
break;
}
if ($watcher->type & Watcher::REPEAT) {
$expiration = $this->now() + $watcher->value;
$scheduleQueue[] = [$watcher, $expiration];
} else {
$this->cancel($watcher->id);
}

$this->timerQueue->extract();
try {
// Execute the timer.
$result = ($watcher->callback)($watcher->id, $watcher->data);

if ($watcher->type & Watcher::REPEAT) {
$expiration = $this->now() + $watcher->value;
$this->timerExpires[$watcher->id] = $expiration;
$scheduleQueue[] = [$watcher, $expiration];
} else {
$this->cancel($id);
if ($result === null) {
continue;
}

try {
// Execute the timer.
$result = ($watcher->callback)($id, $watcher->data);

if ($result === null) {
continue;
}

if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}

if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
} finally {
foreach ($scheduleQueue as $item) {
$this->timerQueue->insert($item, -$item[1]);
}
} finally {
foreach ($scheduleQueue as list($watcher, $expiration)) {
if ($watcher->enabled) {
$this->timerQueue->insert($watcher, $expiration);
}
}
}
Expand Down Expand Up @@ -266,24 +255,15 @@ private function selectStreams(array $read, array $write, int $timeout)
*/
private function getTimeout(): int
{
while (!$this->timerQueue->isEmpty()) {
list($watcher, $expiration) = $this->timerQueue->top();
if (!$this->timerQueue->isEmpty()) {
list($watcher, $expiration) = $this->timerQueue->peek();

$id = $watcher->id;

if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
$this->timerQueue->extract(); // Timer was removed from queue.
continue;
}

$expiration -= $this->now();
$expiration -= getCurrentTime() - $this->nowOffset;

if ($expiration < 0) {
return 0;
}

$this->nowUpdateNeeded = true; // Loop will block, so trigger now update after blocking.

return $expiration;
}

Expand Down Expand Up @@ -312,8 +292,7 @@ protected function activate(array $watchers)
case Watcher::DELAY:
case Watcher::REPEAT:
$expiration = $this->now() + $watcher->value;
$this->timerExpires[$watcher->id] = $expiration;
$this->timerQueue->insert([$watcher, $expiration], -$expiration);
$this->timerQueue->insert($watcher, $expiration);
break;

case Watcher::SIGNAL:
Expand Down Expand Up @@ -362,7 +341,7 @@ protected function deactivate(Watcher $watcher)

case Watcher::DELAY:
case Watcher::REPEAT:
unset($this->timerExpires[$watcher->id]);
$this->timerQueue->remove($watcher);
break;

case Watcher::SIGNAL:
Expand Down

0 comments on commit c6f8425

Please sign in to comment.