Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make timers act like normal ops #1915

Merged
merged 2 commits into from Mar 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 12 additions & 23 deletions js/dispatch.ts
Expand Up @@ -8,31 +8,20 @@ import * as util from "./util";
let nextCmdId = 0;
const promiseTable = new Map<number, util.Resolvable<msg.Base>>();

let fireTimers: () => void;

export function setFireTimersCallback(fn: () => void): void {
fireTimers = fn;
}

export function handleAsyncMsgFromRust(ui8: Uint8Array): void {
// If a the buffer is empty, recv() on the native side timed out and we
// did not receive a message.
if (ui8 && ui8.length) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
util.assert(ui8 != null && ui8.length > 0);
const bb = new flatbuffers.ByteBuffer(ui8);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
// Fire timers that have become runnable.
fireTimers();
}

function sendInternal(
Expand Down
56 changes: 32 additions & 24 deletions js/timers.ts
Expand Up @@ -2,7 +2,7 @@
import { assert } from "./util";
import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers";
import { sendSync, setFireTimersCallback } from "./dispatch";
import { sendAsync, sendSync } from "./dispatch";

interface Timer {
id: number;
Expand Down Expand Up @@ -37,28 +37,39 @@ function getTime(): number {
return now;
}

function setGlobalTimeout(due: number | null, now: number): void {
function clearGlobalTimeout(): void {
const builder = flatbuffers.createBuilder();
msg.GlobalTimerStop.startGlobalTimerStop(builder);
const inner = msg.GlobalTimerStop.endGlobalTimerStop(builder);
globalTimeoutDue = null;
let res = sendSync(builder, msg.Any.GlobalTimerStop, inner);
assert(res == null);
}

async function setGlobalTimeout(due: number, now: number): Promise<void> {
// Since JS and Rust don't use the same clock, pass the time to rust as a
// relative time value. On the Rust side we'll turn that into an absolute
// value again.
// Note that a negative time-out value stops the global timer.
let timeout;
if (due === null) {
timeout = -1;
} else {
timeout = due - now;
assert(timeout >= 0);
}
let timeout = due - now;
assert(timeout >= 0);

// Send message to the backend.
const builder = flatbuffers.createBuilder();
msg.SetTimeout.startSetTimeout(builder);
msg.SetTimeout.addTimeout(builder, timeout);
const inner = msg.SetTimeout.endSetTimeout(builder);
const res = sendSync(builder, msg.Any.SetTimeout, inner);
assert(res == null);
// Remember when when the global timer will fire.
msg.GlobalTimer.startGlobalTimer(builder);
msg.GlobalTimer.addTimeout(builder, timeout);
const inner = msg.GlobalTimer.endGlobalTimer(builder);
globalTimeoutDue = due;
await sendAsync(builder, msg.Any.GlobalTimer, inner);
// eslint-disable-next-line @typescript-eslint/no-use-before-define
fireTimers();
}

function setOrClearGlobalTimeout(due: number | null, now: number): void {
if (due == null) {
clearGlobalTimeout();
} else {
setGlobalTimeout(due, now);
}
}

function schedule(timer: Timer, now: number): void {
Expand All @@ -75,7 +86,7 @@ function schedule(timer: Timer, now: number): void {
// If the new timer is scheduled to fire before any timer that existed before,
// update the global timeout to reflect this.
if (globalTimeoutDue === null || globalTimeoutDue > timer.due) {
setGlobalTimeout(timer.due, now);
setOrClearGlobalTimeout(timer.due, now);
}
}

Expand All @@ -97,7 +108,7 @@ function unschedule(timer: Timer): void {
nextTimerDue = Number(key);
break;
}
setGlobalTimeout(nextTimerDue, getTime());
setOrClearGlobalTimeout(nextTimerDue, getTime());
}
} else {
// Multiple timers that are due at the same point in time.
Expand Down Expand Up @@ -162,9 +173,10 @@ function fireTimers(): void {
Promise.resolve(timer).then(fire);
}
}

// Update the global alarm to go off when the first-up timer that hasn't fired
// yet is due.
setGlobalTimeout(nextTimerDue, now);
setOrClearGlobalTimeout(nextTimerDue, now);
}

export type Args = unknown[];
Expand Down Expand Up @@ -226,7 +238,7 @@ export function setInterval(
return setTimer(cb, delay, args, true);
}

/** Clears a previously set timer by id. */
/** Clears a previously set timer by id. AKA clearTimeout and clearInterval. */
export function clearTimer(id: number): void {
const timer = idMap.get(id);
if (timer === undefined) {
Expand All @@ -237,7 +249,3 @@ export function clearTimer(id: number): void {
unschedule(timer);
idMap.delete(timer.id);
}

// Tell the dispatcher which function it should call to fire timers that are
// due. This is done using a callback because circular imports are disallowed.
setFireTimersCallback(fireTimers);
49 changes: 49 additions & 0 deletions src/global_timer.rs
@@ -0,0 +1,49 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.

//! This module helps deno implement timers.
//!
//! As an optimization, we want to avoid an expensive calls into rust for every
//! setTimeout in JavaScript. Thus in //js/timers.ts a data structure is
//! implemented that calls into Rust for only the smallest timeout. Thus we
//! only need to be able to start and cancel a single timer (or Delay, as Tokio
//! calls it) for an entire Isolate. This is what is implemented here.

use crate::tokio_util::panic_on_error;
use futures::Future;
use std::time::Instant;
use tokio::sync::oneshot;
use tokio::timer::Delay;

pub struct GlobalTimer {
tx: Option<oneshot::Sender<()>>,
}

impl GlobalTimer {
pub fn new() -> Self {
Self { tx: None }
}

pub fn cancel(&mut self) {
if let Some(tx) = self.tx.take() {
tx.send(()).ok();
}
}

pub fn new_timeout(
&mut self,
deadline: Instant,
) -> impl Future<Item = (), Error = ()> {
if self.tx.is_some() {
self.cancel();
}
assert!(self.tx.is_none());

let (tx, rx) = oneshot::channel();
self.tx = Some(tx);

let delay = panic_on_error(Delay::new(deadline));
let rx = panic_on_error(rx);

delay.select(rx).then(|_| Ok(()))
}
}
46 changes: 6 additions & 40 deletions src/isolate.rs
Expand Up @@ -12,6 +12,7 @@ use crate::errors::DenoError;
use crate::errors::DenoResult;
use crate::errors::RustOrJsError;
use crate::flags;
use crate::global_timer::GlobalTimer;
use crate::isolate_init::IsolateInit;
use crate::js_errors::apply_source_map;
use crate::libdeno;
Expand All @@ -35,8 +36,6 @@ use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::{Once, ONCE_INIT};
use std::time::Duration;
use std::time::Instant;
use tokio;

// Buf represents a byte array returned from a "Op".
Expand All @@ -62,7 +61,6 @@ pub struct Isolate {
rx: mpsc::Receiver<(usize, Buf)>,
tx: mpsc::Sender<(usize, Buf)>,
ntasks: Cell<i32>,
timeout_due: Cell<Option<Instant>>,
pub modules: RefCell<Modules>,
pub state: Arc<IsolateState>,
pub permissions: Arc<DenoPermissions>,
Expand All @@ -83,6 +81,7 @@ pub struct IsolateState {
pub flags: flags::DenoFlags,
pub metrics: Metrics,
pub worker_channels: Option<Mutex<WorkerChannels>>,
pub global_timer: Mutex<GlobalTimer>,
}

impl IsolateState {
Expand All @@ -100,6 +99,7 @@ impl IsolateState {
flags,
metrics: Metrics::default(),
worker_channels: worker_channels.map(Mutex::new),
global_timer: Mutex::new(GlobalTimer::new()),
}
}

Expand Down Expand Up @@ -194,7 +194,6 @@ impl Isolate {
rx,
tx,
ntasks: Cell::new(0),
timeout_due: Cell::new(None),
modules: RefCell::new(Modules::new()),
state,
permissions: Arc::new(permissions),
Expand Down Expand Up @@ -222,16 +221,6 @@ impl Isolate {
&*ptr
}

#[inline]
pub fn get_timeout_due(&self) -> Option<Instant> {
self.timeout_due.clone().into_inner()
}

#[inline]
pub fn set_timeout_due(&self, inst: Option<Instant>) {
self.timeout_due.set(inst);
}

#[inline]
pub fn check_read(&self, filename: &str) -> DenoResult<()> {
self.permissions.check_read(filename)
Expand Down Expand Up @@ -463,10 +452,9 @@ impl Isolate {
pub fn event_loop(&self) -> Result<(), JSError> {
// Main thread event loop.
while !self.is_idle() {
match recv_deadline(&self.rx, self.get_timeout_due()) {
match self.rx.recv() {
Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf),
Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
Err(e) => panic!("recv_deadline() failed: {:?}", e),
Err(e) => panic!("Isolate.rx.recv() failed: {:?}", e),
}
self.check_promise_errors();
if let Some(err) = self.last_exception() {
Expand Down Expand Up @@ -495,7 +483,7 @@ impl Isolate {

#[inline]
fn is_idle(&self) -> bool {
self.ntasks.get() == 0 && self.get_timeout_due().is_none()
self.ntasks.get() == 0
}
}

Expand Down Expand Up @@ -596,28 +584,6 @@ extern "C" fn pre_dispatch(
}
}

fn recv_deadline<T>(
rx: &mpsc::Receiver<T>,
maybe_due: Option<Instant>,
) -> Result<T, mpsc::RecvTimeoutError> {
match maybe_due {
None => rx.recv().map_err(|e| e.into()),
Some(due) => {
// Subtracting two Instants causes a panic if the resulting duration
// would become negative. Avoid this.
let now = Instant::now();
let timeout = if due > now {
due - now
} else {
Duration::new(0, 0)
};
// TODO: use recv_deadline() instead of recv_timeout() when this
// feature becomes stable/available.
rx.recv_timeout(timeout)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Expand Up @@ -14,6 +14,7 @@ pub mod deno_dir;
pub mod errors;
pub mod flags;
mod fs;
mod global_timer;
mod http_body;
mod http_util;
pub mod isolate;
Expand Down