Skip to content

Commit

Permalink
src: refactor timers to remove TimerWrap
Browse files Browse the repository at this point in the history
Refactor Timers to behave more similarly to Immediates by having
a single uv_timer_t handle which is stored on the Environment.

No longer expose timers in a public binding and instead make
it part of the internalBinding.

PR-URL: nodejs#20894
Fixes: nodejs#10154
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
  • Loading branch information
apapirovski committed Jun 25, 2018
1 parent 6f63f8d commit 2930bd1
Show file tree
Hide file tree
Showing 36 changed files with 218 additions and 320 deletions.
2 changes: 1 addition & 1 deletion doc/api/async_hooks.md
Expand Up @@ -238,7 +238,7 @@ resource's constructor.
```text
FSEVENTWRAP, FSREQWRAP, GETADDRINFOREQWRAP, GETNAMEINFOREQWRAP, HTTPPARSER,
JSSTREAM, PIPECONNECTWRAP, PIPEWRAP, PROCESSWRAP, QUERYWRAP, SHUTDOWNWRAP,
SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVER, TCPWRAP, TIMERWRAP, TTYWRAP,
SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVER, TCPWRAP, TTYWRAP,
UDPSENDWRAP, UDPWRAP, WRITEWRAP, ZLIB, SSLCONNECTION, PBKDF2REQUEST,
RANDOMBYTESREQUEST, TLSWRAP, Timeout, Immediate, TickObject
```
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/timers.js
Expand Up @@ -6,6 +6,7 @@ const {
initHooksExist,
emitInit
} = require('internal/async_hooks');
const { internalBinding } = require('internal/bootstrap/loaders');
// Symbols for storing async id state.
const async_id_symbol = Symbol('asyncId');
const trigger_async_id_symbol = Symbol('triggerId');
Expand All @@ -30,7 +31,8 @@ module.exports = {
kRefed,
initAsyncResource,
setUnrefTimeout,
validateTimerDuration
validateTimerDuration,
getLibuvNow: internalBinding('timers').getLibuvNow,
};

var timers;
Expand Down
64 changes: 25 additions & 39 deletions lib/timers.js
Expand Up @@ -21,10 +21,15 @@

'use strict';

const { internalBinding } = require('internal/bootstrap/loaders');
const {
Timer: TimerWrap,
getLibuvNow,
setupTimers,
} = process.binding('timer_wrap');
scheduleTimer,
toggleTimerRef,
immediateInfo,
toggleImmediateRef
} = internalBinding('timers');
const L = require('internal/linkedlist');
const PriorityQueue = require('internal/priority_queue');
const {
Expand Down Expand Up @@ -53,8 +58,9 @@ const kCount = 0;
const kRefCount = 1;
const kHasOutstanding = 2;

const [immediateInfo, toggleImmediateRef] =
setupTimers(processImmediate, processTimers);
// Call into C++ to assign callbacks that are responsible for processing
// Immediates and TimerLists.
setupTimers(processImmediate, processTimers);

// HOW and WHY the timers implementation works the way it does.
//
Expand Down Expand Up @@ -156,47 +162,38 @@ function setPosition(node, pos) {
node.priorityQueuePosition = pos;
}

let handle = null;
let nextExpiry = Infinity;

let timerListId = Number.MIN_SAFE_INTEGER;
let refCount = 0;

function incRefCount() {
if (refCount++ === 0)
handle.ref();
toggleTimerRef(true);
}

function decRefCount() {
if (--refCount === 0)
handle.unref();
}

function createHandle(refed) {
debug('initial run, creating TimerWrap handle');
handle = new TimerWrap();
if (!refed)
handle.unref();
toggleTimerRef(false);
}

// Schedule or re-schedule a timer.
// The item must have been enroll()'d first.
const active = exports.active = function(item) {
insert(item, true, TimerWrap.now());
insert(item, true, getLibuvNow());
};

// Internal APIs that need timeouts should use `_unrefActive()` instead of
// `active()` so that they do not unnecessarily keep the process open.
exports._unrefActive = function(item) {
insert(item, false, TimerWrap.now());
insert(item, false, getLibuvNow());
};


// The underlying logic for scheduling or re-scheduling a timer.
//
// Appends a timer onto the end of an existing timers list, or creates a new
// TimerWrap backed list if one does not already exist for the specified timeout
// duration.
// list if one does not already exist for the specified timeout duration.
function insert(item, refed, start) {
const msecs = item._idleTimeout;
if (msecs < 0 || msecs === undefined)
Expand All @@ -213,9 +210,7 @@ function insert(item, refed, start) {
queue.insert(list);

if (nextExpiry > expiry) {
if (handle === null)
createHandle(refed);
handle.start(msecs);
scheduleTimer(msecs);
nextExpiry = expiry;
}
}
Expand Down Expand Up @@ -252,32 +247,23 @@ function processTimers(now) {

let list, ran;
while (list = queue.peek()) {
if (list.expiry > now)
break;
if (list.expiry > now) {
nextExpiry = list.expiry;
return refCount > 0 ? nextExpiry : -nextExpiry;
}
if (ran)
runNextTicks();
else
ran = true;
listOnTimeout(list, now);
ran = true;
}

if (refCount > 0)
handle.ref();
else
handle.unref();

if (list !== undefined) {
nextExpiry = list.expiry;
handle.start(Math.max(nextExpiry - TimerWrap.now(), 1));
}

return true;
return 0;
}

function listOnTimeout(list, now) {
const msecs = list.msecs;

debug('timeout callback %d', msecs);
debug('now: %d', now);

var diff, timer;
while (timer = L.peek(list)) {
Expand Down Expand Up @@ -336,7 +322,7 @@ function listOnTimeout(list, now) {
// 4.7) what is in this smaller function.
function tryOnTimeout(timer, start) {
if (start === undefined && timer._repeat)
start = TimerWrap.now();
start = getLibuvNow();
try {
ontimeout(timer);
} finally {
Expand Down Expand Up @@ -474,7 +460,7 @@ function ontimeout(timer) {
}

function rearm(timer, start) {
// // Do not re-arm unenroll'd or closed timers.
// Do not re-arm unenroll'd or closed timers.
if (timer._idleTimeout === -1)
return;

Expand Down
2 changes: 1 addition & 1 deletion node.gyp
Expand Up @@ -371,7 +371,7 @@
'src/stream_pipe.cc',
'src/stream_wrap.cc',
'src/tcp_wrap.cc',
'src/timer_wrap.cc',
'src/timers.cc',
'src/tracing/agent.cc',
'src/tracing/node_trace_buffer.cc',
'src/tracing/node_trace_writer.cc',
Expand Down
1 change: 0 additions & 1 deletion src/async_wrap.h
Expand Up @@ -63,7 +63,6 @@ namespace node {
V(TCPCONNECTWRAP) \
V(TCPSERVERWRAP) \
V(TCPWRAP) \
V(TIMERWRAP) \
V(TTYWRAP) \
V(UDPSENDWRAP) \
V(UDPWRAP) \
Expand Down
8 changes: 8 additions & 0 deletions src/env-inl.h
Expand Up @@ -334,6 +334,14 @@ inline tracing::Agent* Environment::tracing_agent() const {
return tracing_agent_;
}

inline Environment* Environment::from_timer_handle(uv_timer_t* handle) {
return ContainerOf(&Environment::timer_handle_, handle);
}

inline uv_timer_t* Environment::timer_handle() {
return &timer_handle_;
}

inline Environment* Environment::from_immediate_check_handle(
uv_check_t* handle) {
return ContainerOf(&Environment::immediate_check_handle_, handle);
Expand Down
81 changes: 81 additions & 0 deletions src/env.cc
Expand Up @@ -13,6 +13,7 @@
namespace node {

using v8::Context;
using v8::Function;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Integer;
Expand All @@ -25,6 +26,7 @@ using v8::StackFrame;
using v8::StackTrace;
using v8::String;
using v8::Symbol;
using v8::TryCatch;
using v8::Value;
using worker::Worker;

Expand Down Expand Up @@ -173,6 +175,9 @@ void Environment::Start(int argc,
HandleScope handle_scope(isolate());
Context::Scope context_scope(context());

CHECK_EQ(0, uv_timer_init(event_loop(), timer_handle()));
uv_unref(reinterpret_cast<uv_handle_t*>(timer_handle()));

uv_check_init(event_loop(), immediate_check_handle());
uv_unref(reinterpret_cast<uv_handle_t*>(immediate_check_handle()));

Expand Down Expand Up @@ -227,6 +232,10 @@ void Environment::RegisterHandleCleanups() {
env->CloseHandle(handle, [](uv_handle_t* handle) {});
};

RegisterHandleCleanup(
reinterpret_cast<uv_handle_t*>(timer_handle()),
close_and_finish,
nullptr);
RegisterHandleCleanup(
reinterpret_cast<uv_handle_t*>(immediate_check_handle()),
close_and_finish,
Expand Down Expand Up @@ -470,6 +479,78 @@ void Environment::RunAndClearNativeImmediates() {
}


void Environment::ScheduleTimer(int64_t duration_ms) {
uv_timer_start(timer_handle(), RunTimers, duration_ms, 0);
}

void Environment::ToggleTimerRef(bool ref) {
if (ref) {
uv_ref(reinterpret_cast<uv_handle_t*>(timer_handle()));
} else {
uv_unref(reinterpret_cast<uv_handle_t*>(timer_handle()));
}
}

void Environment::RunTimers(uv_timer_t* handle) {
Environment* env = Environment::from_timer_handle(handle);

if (!env->can_call_into_js())
return;

HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

Local<Object> process = env->process_object();
InternalCallbackScope scope(env, process, {0, 0});

Local<Function> cb = env->timers_callback_function();
MaybeLocal<Value> ret;
Local<Value> arg = env->GetNow();
// This code will loop until all currently due timers will process. It is
// impossible for us to end up in an infinite loop due to how the JS-side
// is structured.
do {
TryCatch try_catch(env->isolate());
try_catch.SetVerbose(true);
ret = cb->Call(env->context(), process, 1, &arg);
} while (ret.IsEmpty() && env->can_call_into_js());

// NOTE(apapirovski): If it ever becomes possibble that `call_into_js` above
// is reset back to `true` after being previously set to `false` then this
// code becomes invalid and needs to be rewritten. Otherwise catastrophic
// timers corruption will occurr and all timers behaviour will become
// entirely unpredictable.
if (ret.IsEmpty())
return;

// To allow for less JS-C++ boundary crossing, the value returned from JS
// serves a few purposes:
// 1. If it's 0, no more timers exist and the handle should be unrefed
// 2. If it's > 0, the value represents the next timer's expiry and there
// is at least one timer remaining that is refed.
// 3. If it's < 0, the absolute value represents the next timer's expiry
// and there are no timers that are refed.
int64_t expiry_ms =
ret.ToLocalChecked()->IntegerValue(env->context()).FromJust();

uv_handle_t* h = reinterpret_cast<uv_handle_t*>(handle);

if (expiry_ms != 0) {
int64_t duration_ms =
llabs(expiry_ms) - (uv_now(env->event_loop()) - env->timer_base());

env->ScheduleTimer(duration_ms > 0 ? duration_ms : 1);

if (expiry_ms > 0)
uv_ref(h);
else
uv_unref(h);
} else {
uv_unref(h);
}
}


void Environment::CheckImmediate(uv_check_t* handle) {
Environment* env = Environment::from_immediate_check_handle(handle);

Expand Down
8 changes: 8 additions & 0 deletions src/env.h
Expand Up @@ -628,6 +628,9 @@ class Environment {
inline uv_loop_t* event_loop() const;
inline uint32_t watched_providers() const;

static inline Environment* from_timer_handle(uv_timer_t* handle);
inline uv_timer_t* timer_handle();

static inline Environment* from_immediate_check_handle(uv_check_t* handle);
inline uv_check_t* immediate_check_handle();
inline uv_idle_t* immediate_idle_handle();
Expand Down Expand Up @@ -840,6 +843,8 @@ class Environment {
static inline Environment* ForAsyncHooks(AsyncHooks* hooks);

v8::Local<v8::Value> GetNow();
void ScheduleTimer(int64_t duration);
void ToggleTimerRef(bool ref);

inline void AddCleanupHook(void (*fn)(void*), void* arg);
inline void RemoveCleanupHook(void (*fn)(void*), void* arg);
Expand All @@ -857,6 +862,7 @@ class Environment {
v8::Isolate* const isolate_;
IsolateData* const isolate_data_;
tracing::Agent* const tracing_agent_;
uv_timer_t timer_handle_;
uv_check_t immediate_check_handle_;
uv_idle_t immediate_idle_handle_;
uv_prepare_t idle_prepare_handle_;
Expand Down Expand Up @@ -919,6 +925,8 @@ class Environment {

worker::Worker* worker_context_ = nullptr;

static void RunTimers(uv_timer_t* handle);

struct ExitCallback {
void (*cb_)(void* arg);
void* arg_;
Expand Down
2 changes: 1 addition & 1 deletion src/node_internals.h
Expand Up @@ -129,7 +129,7 @@ struct sockaddr;
V(string_decoder) \
V(symbols) \
V(tcp_wrap) \
V(timer_wrap) \
V(timers) \
V(trace_events) \
V(tty_wrap) \
V(types) \
Expand Down

0 comments on commit 2930bd1

Please sign in to comment.