Skip to content
Permalink
Browse files

Make timers act like normal ops

This is in preperation for core integration.
  • Loading branch information...
ry committed Mar 10, 2019
1 parent 9691d7b commit 58cc69f672f91841984fc4e1e9bcfb1a75362677
Showing with 155 additions and 105 deletions.
  1. +12 −23 js/dispatch.ts
  2. +32 −24 js/timers.ts
  3. +49 −0 src/global_timer.rs
  4. +6 −40 src/isolate.rs
  5. +1 −0 src/main.rs
  6. +8 −2 src/msg.fbs
  7. +39 −16 src/ops.rs
  8. +8 −0 src/tokio_util.rs
@@ -8,31 +8,20 @@ import * as util from "./util";
let nextCmdId = 0;
const promiseTable = new Map<number, util.Resolvable<msg.Base>>();

let fireTimers: () => void;

export function setFireTimersCallback(fn: () => void): void {
fireTimers = fn;
}

export function handleAsyncMsgFromRust(ui8: Uint8Array): void {
// If a the buffer is empty, recv() on the native side timed out and we
// did not receive a message.
if (ui8 && ui8.length) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
util.assert(ui8 != null && ui8.length > 0);
const bb = new flatbuffers.ByteBuffer(ui8);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
// Fire timers that have become runnable.
fireTimers();
}

function sendInternal(
@@ -2,7 +2,7 @@
import { assert } from "./util";
import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers";
import { sendSync, setFireTimersCallback } from "./dispatch";
import { sendAsync, sendSync } from "./dispatch";

interface Timer {
id: number;
@@ -37,28 +37,39 @@ function getTime(): number {
return now;
}

function setGlobalTimeout(due: number | null, now: number): void {
function clearGlobalTimeout(): void {
const builder = flatbuffers.createBuilder();
msg.GlobalTimerStop.startGlobalTimerStop(builder);
const inner = msg.GlobalTimerStop.endGlobalTimerStop(builder);
globalTimeoutDue = null;
let res = sendSync(builder, msg.Any.GlobalTimerStop, inner);
assert(res == null);
}

async function setGlobalTimeout(due: number, now: number): Promise<void> {
// Since JS and Rust don't use the same clock, pass the time to rust as a
// relative time value. On the Rust side we'll turn that into an absolute
// value again.
// Note that a negative time-out value stops the global timer.
let timeout;
if (due === null) {
timeout = -1;
} else {
timeout = due - now;
assert(timeout >= 0);
}
let timeout = due - now;
assert(timeout >= 0);

// Send message to the backend.
const builder = flatbuffers.createBuilder();
msg.SetTimeout.startSetTimeout(builder);
msg.SetTimeout.addTimeout(builder, timeout);
const inner = msg.SetTimeout.endSetTimeout(builder);
const res = sendSync(builder, msg.Any.SetTimeout, inner);
assert(res == null);
// Remember when when the global timer will fire.
msg.GlobalTimer.startGlobalTimer(builder);
msg.GlobalTimer.addTimeout(builder, timeout);
const inner = msg.GlobalTimer.endGlobalTimer(builder);
globalTimeoutDue = due;
await sendAsync(builder, msg.Any.GlobalTimer, inner);
// eslint-disable-next-line @typescript-eslint/no-use-before-define
fireTimers();
}

function setOrClearGlobalTimeout(due: number | null, now: number): void {
if (due == null) {
clearGlobalTimeout();
} else {
setGlobalTimeout(due, now);
}
}

function schedule(timer: Timer, now: number): void {
@@ -75,7 +86,7 @@ function schedule(timer: Timer, now: number): void {
// If the new timer is scheduled to fire before any timer that existed before,
// update the global timeout to reflect this.
if (globalTimeoutDue === null || globalTimeoutDue > timer.due) {
setGlobalTimeout(timer.due, now);
setOrClearGlobalTimeout(timer.due, now);
}
}

@@ -97,7 +108,7 @@ function unschedule(timer: Timer): void {
nextTimerDue = Number(key);
break;
}
setGlobalTimeout(nextTimerDue, getTime());
setOrClearGlobalTimeout(nextTimerDue, getTime());
}
} else {
// Multiple timers that are due at the same point in time.
@@ -162,9 +173,10 @@ function fireTimers(): void {
Promise.resolve(timer).then(fire);
}
}

// Update the global alarm to go off when the first-up timer that hasn't fired
// yet is due.
setGlobalTimeout(nextTimerDue, now);
setOrClearGlobalTimeout(nextTimerDue, now);
}

export type Args = unknown[];
@@ -226,7 +238,7 @@ export function setInterval(
return setTimer(cb, delay, args, true);
}

/** Clears a previously set timer by id. */
/** Clears a previously set timer by id. AKA clearTimeout and clearInterval. */
export function clearTimer(id: number): void {
const timer = idMap.get(id);
if (timer === undefined) {
@@ -237,7 +249,3 @@ export function clearTimer(id: number): void {
unschedule(timer);
idMap.delete(timer.id);
}

// Tell the dispatcher which function it should call to fire timers that are
// due. This is done using a callback because circular imports are disallowed.
setFireTimersCallback(fireTimers);
@@ -0,0 +1,49 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.

//! This module helps deno implement timers.
//!
//! As an optimization, we want to avoid an expensive calls into rust for every
//! setTimeout in JavaScript. Thus in //js/timers.ts a data structure is
//! implemented that calls into Rust for only the smallest timeout. Thus we
//! only need to be able to start and cancel a single timer (or Delay, as Tokio
//! calls it) for an entire Isolate. This is what is implemented here.

use crate::tokio_util::panic_on_error;
use futures::Future;
use std::time::Instant;
use tokio::sync::oneshot;
use tokio::timer::Delay;

pub struct GlobalTimer {
tx: Option<oneshot::Sender<()>>,
}

impl GlobalTimer {
pub fn new() -> Self {
Self { tx: None }
}

pub fn cancel(&mut self) {
if let Some(tx) = self.tx.take() {
tx.send(()).ok();
}
}

pub fn new_timeout(
&mut self,
deadline: Instant,
) -> impl Future<Item = (), Error = ()> {
if self.tx.is_some() {
self.cancel();
}
assert!(self.tx.is_none());

let (tx, rx) = oneshot::channel();
self.tx = Some(tx);

let delay = panic_on_error(Delay::new(deadline));
let rx = panic_on_error(rx);

delay.select(rx).then(|_| Ok(()))
}
}
@@ -12,6 +12,7 @@ use crate::errors::DenoError;
use crate::errors::DenoResult;
use crate::errors::RustOrJsError;
use crate::flags;
use crate::global_timer::GlobalTimer;
use crate::isolate_init::IsolateInit;
use crate::js_errors::apply_source_map;
use crate::libdeno;
@@ -35,8 +36,6 @@ use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::{Once, ONCE_INIT};
use std::time::Duration;
use std::time::Instant;
use tokio;

// Buf represents a byte array returned from a "Op".
@@ -62,7 +61,6 @@ pub struct Isolate {
rx: mpsc::Receiver<(usize, Buf)>,
tx: mpsc::Sender<(usize, Buf)>,
ntasks: Cell<i32>,
timeout_due: Cell<Option<Instant>>,
pub modules: RefCell<Modules>,
pub state: Arc<IsolateState>,
pub permissions: Arc<DenoPermissions>,
@@ -83,6 +81,7 @@ pub struct IsolateState {
pub flags: flags::DenoFlags,
pub metrics: Metrics,
pub worker_channels: Option<Mutex<WorkerChannels>>,
pub global_timer: Mutex<GlobalTimer>,
}

impl IsolateState {
@@ -100,6 +99,7 @@ impl IsolateState {
flags,
metrics: Metrics::default(),
worker_channels: worker_channels.map(Mutex::new),
global_timer: Mutex::new(GlobalTimer::new()),
}
}

@@ -194,7 +194,6 @@ impl Isolate {
rx,
tx,
ntasks: Cell::new(0),
timeout_due: Cell::new(None),
modules: RefCell::new(Modules::new()),
state,
permissions: Arc::new(permissions),
@@ -222,16 +221,6 @@ impl Isolate {
&*ptr
}

#[inline]
pub fn get_timeout_due(&self) -> Option<Instant> {
self.timeout_due.clone().into_inner()
}

#[inline]
pub fn set_timeout_due(&self, inst: Option<Instant>) {
self.timeout_due.set(inst);
}

#[inline]
pub fn check_read(&self, filename: &str) -> DenoResult<()> {
self.permissions.check_read(filename)
@@ -463,10 +452,9 @@ impl Isolate {
pub fn event_loop(&self) -> Result<(), JSError> {
// Main thread event loop.
while !self.is_idle() {
match recv_deadline(&self.rx, self.get_timeout_due()) {
match self.rx.recv() {
Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf),
Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
Err(e) => panic!("recv_deadline() failed: {:?}", e),
Err(e) => panic!("Isolate.rx.recv() failed: {:?}", e),
}
self.check_promise_errors();
if let Some(err) = self.last_exception() {
@@ -495,7 +483,7 @@ impl Isolate {

#[inline]
fn is_idle(&self) -> bool {
self.ntasks.get() == 0 && self.get_timeout_due().is_none()
self.ntasks.get() == 0
}
}

@@ -596,28 +584,6 @@ extern "C" fn pre_dispatch(
}
}

fn recv_deadline<T>(
rx: &mpsc::Receiver<T>,
maybe_due: Option<Instant>,
) -> Result<T, mpsc::RecvTimeoutError> {
match maybe_due {
None => rx.recv().map_err(|e| e.into()),
Some(due) => {
// Subtracting two Instants causes a panic if the resulting duration
// would become negative. Avoid this.
let now = Instant::now();
let timeout = if due > now {
due - now
} else {
Duration::new(0, 0)
};
// TODO: use recv_deadline() instead of recv_timeout() when this
// feature becomes stable/available.
rx.recv_timeout(timeout)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
@@ -14,6 +14,7 @@ pub mod deno_dir;
pub mod errors;
pub mod flags;
mod fs;
mod global_timer;
mod http_body;
mod http_util;
pub mod isolate;
@@ -16,6 +16,9 @@ union Any {
FetchRes,
FormatError,
FormatErrorRes,
GlobalTimer,
GlobalTimerRes,
GlobalTimerStop,
IsTTY,
IsTTYRes,
Listen,
@@ -55,7 +58,6 @@ union Any {
RunStatusRes,
Seek,
SetEnv,
SetTimeout,
Shutdown,
Start,
StartRes,
@@ -210,10 +212,14 @@ table Chdir {
directory: string;
}

table SetTimeout {
table GlobalTimer {
timeout: int;
}

table GlobalTimerRes { }

table GlobalTimerStop { }

table Exit {
code: int;
}
Oops, something went wrong.

0 comments on commit 58cc69f

Please sign in to comment.
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.