Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dist/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ MIT licenses
The following components are provided under the MIT License. See project link for details.
The text of each license is also included at licenses/LICENSE-[project].txt.

on-finished 2.3.0: https://github.com/jshttp/on-finished, MIT
uuid 8.1.0: https://github.com/uuidjs/uuid, MIT
winston 3.2.1: https://github.com/winstonjs/winston, MIT

Expand Down
17 changes: 5 additions & 12 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
"@types/google-protobuf": "^3.7.2",
"@types/jest": "^26.0.15",
"@types/node": "^14.0.11",
"@types/on-finished": "^2.3.1",
"@types/semver": "^7.2.0",
"@types/uuid": "^8.0.0",
"axios": "^0.21.0",
Expand All @@ -61,7 +60,6 @@
"dependencies": {
"google-protobuf": "^3.14.0",
"grpc": "^1.10.1",
"on-finished": "^2.3.0",
"semver": "^7.3.2",
"tslib": "^2.0.3",
"uuid": "^8.1.0",
Expand Down
4 changes: 2 additions & 2 deletions src/core/PluginInstaller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ while (topModule.parent) {

export default class PluginInstaller {
private readonly pluginDir: string;
private readonly require: (name: string) => any = topModule.require.bind(topModule);
readonly require: (name: string) => any = topModule.require.bind(topModule);
private readonly resolve = (request: string) => (module.constructor as any)._resolveFilename(request, topModule);

constructor() {
Expand Down Expand Up @@ -90,7 +90,7 @@ export default class PluginInstaller {

logger.info(`Installing plugin ${plugin.module} ${plugin.versions}`);

plugin.install();
plugin.install(this);
} catch (e) {
if (plugin) {
logger.error(`Error installing plugin ${plugin.module} ${plugin.versions}`);
Expand Down
4 changes: 3 additions & 1 deletion src/core/SwPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
*
*/

import PluginInstaller from './PluginInstaller';

export default interface SwPlugin {
readonly module: string;
readonly versions: string;

install(): void;
install(installer: PluginInstaller): void;
}
23 changes: 12 additions & 11 deletions src/plugins/AxiosPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ import Span from '../trace/span/Span';
import Tag from '../Tag';
import { SpanLayer } from '../proto/language-agent/Tracing_pb';
import { createLogger } from '../logging';
import PluginInstaller from '../core/PluginInstaller';

const logger = createLogger(__filename);

class AxiosPlugin implements SwPlugin {
readonly module = 'axios';
readonly versions = '*';
axios = require('axios').default;

install(): void {
install(installer: PluginInstaller): void {
if (logger.isDebugEnabled()) {
logger.debug('installing axios plugin');
}
this.interceptClientRequest();
const axios = installer.require('axios').default;
this.interceptClientRequest(axios);
}

private interceptClientRequest() {
private interceptClientRequest(axios: any) {
const copyStatusAndStop = (span: Span, response: any) => {
if (response) {
if (response.status) {
Expand All @@ -54,9 +55,9 @@ class AxiosPlugin implements SwPlugin {
span.stop();
};

this.axios.interceptors.request.use(
axios.interceptors.request.use(
(config: any) => {
config.span.resync();
// config.span.resync(); // TODO: fix this https://github.com/apache/skywalking-nodejs/pull/20#issuecomment-753323425

(config.span as Span).inject().items.forEach((item) => {
config.headers.common[item.key] = item.value;
Expand All @@ -73,7 +74,7 @@ class AxiosPlugin implements SwPlugin {
},
);

this.axios.interceptors.response.use(
axios.interceptors.response.use(
(response: any) => {
copyStatusAndStop(response.config.span, response);

Expand All @@ -89,18 +90,18 @@ class AxiosPlugin implements SwPlugin {
},
);

const _request = this.axios.Axios.prototype.request;
const _request = axios.Axios.prototype.request;

this.axios.Axios.prototype.request = function(config: any) {
axios.Axios.prototype.request = function(config: any) {
const { host, pathname: operation } = new URL(config.url); // TODO: this may throw invalid URL
const span = ContextManager.current.newExitSpan(operation, host).start();

try {
span.component = Component.AXIOS; // TODO: add Component.AXIOS (to main Skywalking project)
span.component = Component.AXIOS;
span.layer = SpanLayer.HTTP;
span.peer = host;
span.tag(Tag.httpURL(host + operation));
span.async();
// span.async(); TODO: fix this https://github.com/apache/skywalking-nodejs/pull/20#issuecomment-753323425

return _request.call(this, { ...config, span });
} catch (e) {
Expand Down
12 changes: 7 additions & 5 deletions src/plugins/ExpressPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ import { Component } from '../trace/Component';
import Tag from '../Tag';
import { SpanLayer } from '../proto/language-agent/Tracing_pb';
import { ContextCarrier } from '../trace/context/ContextCarrier';
import onFinished from 'on-finished';
import PluginInstaller from '../core/PluginInstaller';

class ExpressPlugin implements SwPlugin {
readonly module = 'express';
readonly versions = '*';

install(): void {
this.interceptServerRequest();
install(installer: PluginInstaller): void {
this.interceptServerRequest(installer);
}

private interceptServerRequest() {
const router = require('express/lib/router');
private interceptServerRequest(installer: PluginInstaller) {
const router = installer.require('express/lib/router');
const onFinished = installer.require('on-finished');
const _handle = router.handle;

router.handle = function(req: IncomingMessage, res: ServerResponse, out: any) {
Expand Down Expand Up @@ -81,6 +82,7 @@ class ExpressPlugin implements SwPlugin {
}
out.call(this, err);
stopped -= 1; // skip first stop attempt, make sure stop executes once status code and message is set
onFinished(res, stopIfNotStopped); // this must run after any handlers deferred in 'out'
});
onFinished(res, stopIfNotStopped); // this must run after any handlers deferred in 'out'

Expand Down
82 changes: 34 additions & 48 deletions src/plugins/HttpPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import Tag from '../Tag';
import ExitSpan from '../trace/span/ExitSpan';
import { SpanLayer } from '../proto/language-agent/Tracing_pb';
import { ContextCarrier } from '../trace/context/ContextCarrier';
import onFinished from 'on-finished';

class HttpPlugin implements SwPlugin {
readonly module = 'http';
Expand All @@ -45,7 +44,7 @@ class HttpPlugin implements SwPlugin {
private interceptClientRequest(module: any) {
const _request = module.request;

module.request = function() {
module.request = function () {
const url: URL | string | RequestOptions = arguments[0];

const { host, pathname } =
Expand All @@ -59,21 +58,11 @@ class HttpPlugin implements SwPlugin {
};
const operation = pathname.replace(/\?.*$/g, '');

const span: ExitSpan = ContextManager.current.newExitSpan(operation, host).start() as ExitSpan;

let stopped = 0; // compensating if request aborted right after creation 'close' is not emitted
const stopIfNotStopped = (err?: Error | null) => {
if (stopped++) {
return;
}
span.stop();
if (err) {
span.error(err);
}
};
const stopIfNotStopped = () => !stopped++ ? span.stop() : null; // make sure we stop only once
const span: ExitSpan = ContextManager.current.newExitSpan(operation, host).start() as ExitSpan;

try {
// TODO: these should go into span class
if (span.depth === 1) { // only set HTTP if this span is not overridden by a higher level one
span.component = Component.HTTP;
span.layer = SpanLayer.HTTP;
Expand All @@ -86,13 +75,17 @@ class HttpPlugin implements SwPlugin {
span.tag(Tag.httpURL(httpURL));
}

const req: ClientRequest = _request.apply(this, arguments);
const request: ClientRequest = _request.apply(this, arguments);

span.inject().items.forEach((item) => {
req.setHeader(item.key, item.value);
request.setHeader(item.key, item.value);
});

req.prependListener('response', (res) => {
request.on('close', stopIfNotStopped);
request.on('abort', () => (span.errored = true, stopIfNotStopped()));
request.on('error', (err) => (span.error(err), stopIfNotStopped()));

request.prependListener('response', (res) => {
span.resync();
span.tag(Tag.httpStatusCode(res.statusCode));
if (res.statusCode && res.statusCode >= 400) {
Expand All @@ -102,14 +95,17 @@ class HttpPlugin implements SwPlugin {
span.tag(Tag.httpStatusMsg(res.statusMessage));
}
});
onFinished(req, stopIfNotStopped);

span.async();

return req;
return request;

} catch (e) {
stopIfNotStopped(e);
if (!stopped) { // don't want to set error if exception occurs after clean close
span.error(e);
stopIfNotStopped();
}

throw e;
}
};
Expand All @@ -118,7 +114,7 @@ class HttpPlugin implements SwPlugin {
private interceptServerRequest(module: any) {
const _emit = module.Server.prototype.emit;

module.Server.prototype.emit = function() {
module.Server.prototype.emit = function () {
if (arguments[0] !== 'request') {
return _emit.apply(this, arguments);
}
Expand All @@ -134,37 +130,27 @@ class HttpPlugin implements SwPlugin {

const carrier = ContextCarrier.from(headersMap);
const operation = (req.url || '/').replace(/\?.*/g, '');
const span = ContextManager.current.newEntrySpan(operation, carrier).start();
const span = ContextManager.current.newEntrySpan(operation, carrier);

span.component = Component.HTTP_SERVER;
span.layer = SpanLayer.HTTP;
span.peer = req.headers.host || '';
span.tag(Tag.httpURL(span.peer + req.url));
return ContextManager.withSpan(span, (self, args) => {
span.component = Component.HTTP_SERVER;
span.layer = SpanLayer.HTTP;
span.peer = req.headers.host || '';
span.tag(Tag.httpURL(span.peer + req.url));

let stopped = 0;
const stopIfNotStopped = (err: Error | null) => {
if (!stopped++) {
span.stop();
span.tag(Tag.httpStatusCode(res.statusCode));
if (res.statusCode && res.statusCode >= 400) {
span.errored = true;
}
if (err) {
span.error(err);
}
if (res.statusMessage) {
span.tag(Tag.httpStatusMsg(res.statusMessage));
}
const ret = _emit.apply(self, args);

span.tag(Tag.httpStatusCode(res.statusCode));
if (res.statusCode && res.statusCode >= 400) {
span.errored = true;
}
if (res.statusMessage) {
span.tag(Tag.httpStatusMsg(res.statusMessage));
}
};
onFinished(res, stopIfNotStopped);

try {
return _emit.apply(this, arguments);
} catch (e) {
stopIfNotStopped(e);
throw e;
}
return ret;

}, this, arguments);
};
}
}
Expand Down
Loading