Skip to content

Commit

Permalink
feat(async): fix #740, use async hooks to wrap nodejs async api
Browse files Browse the repository at this point in the history
  • Loading branch information
JiaLiPassion committed May 28, 2017
1 parent b92b6e3 commit 8d64c10
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 56 deletions.
24 changes: 24 additions & 0 deletions lib/node/node_async_check.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

Zone.__load_patch('node_async_check', (global: any, Zone: ZoneType, api: _ZonePrivate) => {
try {
require('async_hooks');
(process as any)._rawDebug('load async_hooks');
// nodejs 8.x with async_hooks support.
// disable original Zone patch
global['__Zone_disable_ZoneAwarePromise'] = true;
global['__Zone_disable_node_timers'] = true;
global['__Zone_disable_nextTick'] = true;
global['__Zone_disable_handleUnhandledPromiseRejection'] = true;
global['__Zone_disable_crypto'] = true;
global['__Zone_disable_fs'] = true;
} catch (err) {
global['__Zone_disable_node_async_hooks'] = true;
}
});
91 changes: 91 additions & 0 deletions lib/node/node_asynchooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

/**
* patch nodejs async operations (timer, promise, net...) with
* nodejs async_hooks
*/
Zone.__load_patch('node_async_hooks', (global: any, Zone: ZoneType, api: _ZonePrivate) => {
let async_hooks;
const BEFORE_RUN_TASK_STATUS = 'BEFORE_RUN_TASK_STATUS';

async_hooks = require('async_hooks');

const idTaskMap: {[key: number]: Task} = (Zone as any)[Zone.__symbol__('nodeTasks')] = {};

const noop = function() {};

function init(id: number, provider: string, parentId: number, parentHandle: any) {
// @JiaLiPassion, check which tasks are microTask or macroTask
//(process as any)._rawDebug('init hook', id , provider);
if (provider === 'TIMERWRAP') {
return;
}
// create microTask if 'PROMISE'
if (provider === 'PROMISE') {
const task = idTaskMap[id] = Zone.current.scheduleMicroTask(provider, noop, null, noop);
//(process as any)._rawDebug('after init', id, 'status', task.state);
return;
}
// create macroTask in other cases
if (provider === 'Timeout' || provider === 'Immediate' || provider === 'FSREQWRAP') {
idTaskMap[id] = Zone.current.scheduleMacroTask(provider, noop, null, noop, noop);
}
}

function before(id: number) {
//(process as any)._rawDebug('before hook', id);
// call Zone.beforeRunTask
const task: Task = idTaskMap[id];
if (!task) {
return;
}
(task as any)[Zone.__symbol__(BEFORE_RUN_TASK_STATUS)] = api.beforeRunTask(task.zone, task);
}

function after(id: number) {
//(process as any)._rawDebug('after hook', id);
const task: Task = idTaskMap[id];
if (!task) {
return;
}
const beforeRunTask: BeforeRunTaskStatus = (task as any)[Zone.__symbol__(BEFORE_RUN_TASK_STATUS)];
if (beforeRunTask) {
return;
}
(task as any)[Zone.__symbol__(BEFORE_RUN_TASK_STATUS)] = null;
api.afterRunTask(task.zone, beforeRunTask, task);
}

function destroy(id: number) {
// try to cancel the task if is not canceled
const task: Task = idTaskMap[id];
if (task && task.state === 'scheduled') {
task.zone.cancelTask(task);
}
idTaskMap[id] = null;
}

process.on('uncaughtException', (err: any) => {
const task = Zone.currentTask;
if (task) {
const beforeRunTask: BeforeRunTaskStatus = (task as any)[Zone.__symbol__(BEFORE_RUN_TASK_STATUS)];
if (beforeRunTask) {
if ((task.zone as any)._zoneDelegate.handleError(Zone.current, err)) {
throw err;
}
}
}
});

global[Zone.__symbol__('setTimeout')] = global.setTimeout;
global[Zone.__symbol__('setInterval')] = global.setInterval;
global[Zone.__symbol__('setImmediate')] = global.setImmediate;

async_hooks.createHook({ init, before, after, destroy }).enable();
});
2 changes: 2 additions & 0 deletions lib/node/rollup-main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/

import '../zone';
import '../node_async_check';
import '../node_asynchooks';
import '../common/promise';
import '../common/to-string';
import './node';
69 changes: 49 additions & 20 deletions lib/zone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ interface ZoneType {
/** @internal */
type _PatchFn = (global: Window, Zone: ZoneType, api: _ZonePrivate) => void;

/** @internal */
interface BeforeRunTaskStatus {
reEntryGuard: boolean;
previousTask: Task;
}

/** @internal */
interface _ZonePrivate {
currentZoneFrame: () => _ZoneFrame;
Expand All @@ -323,6 +329,8 @@ interface _ZonePrivate {
patchEventTargetMethods:
(obj: any, addFnName?: string, removeFnName?: string, metaCreator?: any) => boolean;
patchOnProperties: (obj: any, properties: string[]) => void;
beforeRunTask: (zone: Zone, task: Task) => BeforeRunTaskStatus;
afterRunTask: (zone: Zone, beforeRunTaskStatus: BeforeRunTaskStatus, task: Task) => void;
}

/** @internal */
Expand Down Expand Up @@ -757,8 +765,7 @@ const Zone: ZoneType = (function(global: any) {
}
}


runTask(task: Task, applyThis?: any, applyArgs?: any): any {
beforeRunTask(task: Task) {
if (task.zone != this) {
throw new Error(
'A task can only be run in the zone of creation! (Creation: ' +
Expand All @@ -772,7 +779,7 @@ const Zone: ZoneType = (function(global: any) {
// typescript compiler will complain below
const isNotScheduled = task.state === notScheduled;
if (isNotScheduled && task.type === eventTask) {
return;
return null;
}

const reEntryGuard = task.state != running;
Expand All @@ -781,6 +788,33 @@ const Zone: ZoneType = (function(global: any) {
const previousTask = _currentTask;
_currentTask = task;
_currentZoneFrame = {parent: _currentZoneFrame, zone: this};
//(process as any)._rawDebug('currentFrame increase ', _currentZoneFrame && _currentZoneFrame.zone.name, task.source);
return {
reEntryGuard: reEntryGuard,
previousTask: previousTask
}
}

afterRunTask(beforeRunTaskStatus: BeforeRunTaskStatus, task: Task) {
// if the task's state is notScheduled or unknown, then it has already been cancelled
// we should not reset the state to scheduled
if (task.state !== notScheduled && task.state !== unknown) {
if (task.type == eventTask || (task.data && task.data.isPeriodic)) {
beforeRunTaskStatus.reEntryGuard && (task as ZoneTask<any>)._transitionTo(scheduled, running);
} else {
task.runCount = 0;
this._updateTaskCount(task as ZoneTask<any>, -1);
beforeRunTaskStatus.reEntryGuard &&
(task as ZoneTask<any>)._transitionTo(notScheduled, running, notScheduled);
}
}
_currentZoneFrame = _currentZoneFrame.parent;
//(process as any)._rawDebug('currentFrame decrease ', _currentZoneFrame && _currentZoneFrame.zone.name, task.source);
_currentTask = beforeRunTaskStatus.previousTask;
}

runTask(task: Task, applyThis?: any, applyArgs?: any): any {
const beforeRunTaskStatus = this.beforeRunTask(task);
try {
if (task.type == macroTask && task.data && !task.data.isPeriodic) {
task.cancelFn = null;
Expand All @@ -793,20 +827,7 @@ const Zone: ZoneType = (function(global: any) {
}
}
} finally {
// if the task's state is notScheduled or unknown, then it has already been cancelled
// we should not reset the state to scheduled
if (task.state !== notScheduled && task.state !== unknown) {
if (task.type == eventTask || (task.data && task.data.isPeriodic)) {
reEntryGuard && (task as ZoneTask<any>)._transitionTo(scheduled, running);
} else {
task.runCount = 0;
this._updateTaskCount(task as ZoneTask<any>, -1);
reEntryGuard &&
(task as ZoneTask<any>)._transitionTo(notScheduled, running, notScheduled);
}
}
_currentZoneFrame = _currentZoneFrame.parent;
_currentTask = previousTask;
this.afterRunTask(beforeRunTaskStatus, task);
}
}

Expand Down Expand Up @@ -1241,10 +1262,13 @@ const Zone: ZoneType = (function(global: any) {
// we must bootstrap the initial task creation by manually scheduling the drain
if (_numberOfNestedTaskFrames === 0 && _microTaskQueue.length === 0) {
// We are not running in Task, so we need to kickstart the microtask queue.
// @JiaLiPassion, use native promise if async_hooks is available
if (global[symbolPromise]) {
global[symbolPromise].resolve(0)[symbolThen](drainMicroTaskQueue);
} else {
} else if (global[symbolSetTimeout]) {
global[symbolSetTimeout](drainMicroTaskQueue, 0);
} else {
Promise.resolve(0).then(drainMicroTaskQueue);
}
}
task && _microTaskQueue.push(task);
Expand Down Expand Up @@ -1294,7 +1318,13 @@ const Zone: ZoneType = (function(global: any) {
scheduleMicroTask: scheduleMicroTask,
showUncaughtError: () => !(Zone as any)[__symbol__('ignoreConsoleErrorUncaughtError')],
patchEventTargetMethods: () => false,
patchOnProperties: noop
patchOnProperties: noop,
beforeRunTask: (zone: Zone, task: Task) => {
return zone.beforeRunTask(task);
},
afterRunTask: (zone: Zone, beforeRunTaskStatus: BeforeRunTaskStatus, task: Task) => {
return zone.afterRunTask(beforeRunTaskStatus, task);
}
};
let _currentZoneFrame: _ZoneFrame = {parent: null, zone: new Zone(null, null)};
let _currentTask: Task = null;
Expand All @@ -1306,7 +1336,6 @@ const Zone: ZoneType = (function(global: any) {
return '__zone_symbol__' + name;
}


performanceMeasure('Zone', 'Zone');
return global['Zone'] = Zone;
})(typeof window !== 'undefined' && window || typeof self !== 'undefined' && self || global);
Loading

0 comments on commit 8d64c10

Please sign in to comment.