Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ config.collectorApi = process.env.TRACE_COLLECTOR_API_URL ||
'https://trace-collector-api.risingstack.com';
config.collectorApiSampleEndpoint = '/service/sample';
config.collectorApiServiceEndpoint = '/service';
config.collectorApiMetricsEndpoint = '/service/%s/metrics';
config.collectorApiApmMetricsEndpoint = '/service/%s/apm-metrics';
config.collectorApiRpmMetricsEndpoint = '/service/%s/rpm-metrics';

config.appName = process.env.TRACE_APP_NAME;
config.configPath = process.env.TRACE_CONFIG_PATH;
Expand Down
1 change: 1 addition & 0 deletions lib/events/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Events.prototype.USER_SENT_EVENT = 'user_sent_event';
Events.prototype.TRACE_SERVICE_KEY = 'trace_service_key';
Events.prototype.HTTP_TRANSACTION = 'http_transaction';
Events.prototype.HTTP_TRANSACTION_STACK_TRACE = 'http_transaction_stack_trace';
Events.prototype.RPM_METRICS = 'rpm_metrics';
Events.prototype.APM_METRICS = 'apm_metrics';

module.exports.create = create;
27 changes: 25 additions & 2 deletions lib/providers/httpTransaction/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ function HttpTransaction(eventBus, options) {
this.sampleSize = options.sampleSize;
this.partials = {};
this.traces = [];
this.rpmMetrics = {};

this.on(HttpTransaction.CLIENT_RECV, this.onClientReceive);
this.on(HttpTransaction.CLIENT_SEND, this.onClientSend);
Expand All @@ -42,14 +43,18 @@ util.inherits(HttpTransaction, events.EventEmitter);
HttpTransaction.prototype.startCollecting = function () {
debug('HttpTransaction started collecting');
var _this = this;
this.intervalId = setInterval(function () {
this.transactionIntervalId = setInterval(function () {
_this._send();
}, this.collectInterval);
this.rpmMetricsIntervalId = setInterval(function () {
_this._sendRpm();
}, 60 * 1000);
};

HttpTransaction.prototype.stopCollecting = function () {
debug('HttpTransaction stopped collecting');
clearInterval(this.intervalId);
clearInterval(this.transactionIntervalId);
clearInterval(this.rpmMetricsIntervalId);
};

HttpTransaction.prototype.getService = function () {
Expand Down Expand Up @@ -153,6 +158,12 @@ HttpTransaction.prototype.onServerSend = function (data) {
this.traces.push(trace);
}

if (!this.rpmMetrics[trace.statusCode]) {
this.rpmMetrics[trace.statusCode] = 1;
} else {
this.rpmMetrics[trace.statusCode]++;
}

delete this.partials[data.id];
};

Expand Down Expand Up @@ -191,6 +202,18 @@ HttpTransaction.prototype.report = function (data) {
this.partials[traceId].events.push(dataToSend);
};

HttpTransaction.prototype._sendRpm = function () {
debug('sending rpm metrics to the trace service');
if (Object.keys(this.rpmMetrics).length > 0) {
var dataBag = {};
dataBag.requests = cloneDeep(this.rpmMetrics);
dataBag.timestamp = new Date().toISOString();

this.eventBus.emit(this.eventBus.RPM_METRICS, dataBag);
this.rpmMetrics = {};
}
};

HttpTransaction.prototype._send = function (options) {
options = options || {};
debug('sending logs to the trace service');
Expand Down
58 changes: 50 additions & 8 deletions lib/providers/httpTransaction/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('The HttpTransaction module', function () {
});

it('can be instantiated', function () {
var httpTransaction = new HttpTransaction.create(eventBus, {});
var httpTransaction = HttpTransaction.create(eventBus, {});
expect(httpTransaction).to.be.ok;
});

Expand All @@ -34,7 +34,7 @@ describe('The HttpTransaction module', function () {
var createNamespace = require('continuation-local-storage').createNamespace;
var session = createNamespace('trace');

var httpTransaction = new HttpTransaction.create(eventBus, {
var httpTransaction = HttpTransaction.create(eventBus, {
service: 'aladdin'
});

Expand All @@ -51,7 +51,7 @@ describe('The HttpTransaction module', function () {
describe('events', function () {

it('stores the "ClientReceive" events w/ `x-span-id`', function () {
var httpTransaction = new HttpTransaction.create(eventBus, {});
var httpTransaction = HttpTransaction.create(eventBus, {});

httpTransaction.setService('aladdin');

Expand Down Expand Up @@ -92,7 +92,7 @@ describe('The HttpTransaction module', function () {
});

it('stores the "ClientSend" events w/ `x-span-id`', function () {
var httpTransaction = new HttpTransaction.create(eventBus, {});
var httpTransaction = HttpTransaction.create(eventBus, {});

httpTransaction.setService('aladdin');

Expand Down Expand Up @@ -133,7 +133,7 @@ describe('The HttpTransaction module', function () {
});

it('stores the "ServerRecieve" events with no parent and timing data', function () {
var httpTransaction = new HttpTransaction.create(eventBus, {});
var httpTransaction = HttpTransaction.create(eventBus, {});

httpTransaction.setService('aladdin');

Expand Down Expand Up @@ -176,7 +176,7 @@ describe('The HttpTransaction module', function () {
});

it('stores the "ServerRecieve" events with parent and timing data', function () {
var httpTransaction = new HttpTransaction.create(eventBus, {});
var httpTransaction = HttpTransaction.create(eventBus, {});

httpTransaction.setService('aladdin');

Expand Down Expand Up @@ -224,7 +224,7 @@ describe('The HttpTransaction module', function () {
});

it('stores the "ServerSend" events when it is not sampled', function () {
var httpTransaction = new HttpTransaction.create(eventBus, {});
var httpTransaction = HttpTransaction.create(eventBus, {});

httpTransaction.setService('aladdin');

Expand Down Expand Up @@ -257,7 +257,7 @@ describe('The HttpTransaction module', function () {
};
var host = 'localhost';

var httpTransaction = new HttpTransaction.create(eventBus, {});
var httpTransaction = HttpTransaction.create(eventBus, {});

httpTransaction.setService(service);

Expand Down Expand Up @@ -285,6 +285,48 @@ describe('The HttpTransaction module', function () {
}]
}]);

expect(httpTransaction.rpmMetrics).to.be.eql({
301: 1
});

done();
});

it('\'ServerSend\' adds to existing rpmMetrics', function (done) {

var time = 12345324953;
var id = '1235';
var service = 'aladdin';
var spanId = 'asdf';
var headers = {
'x-span-id': spanId
};
var host = 'localhost';

var metricsValue = 222;

var httpTransaction = HttpTransaction.create(eventBus, {});

httpTransaction.setService(service);

httpTransaction.sampleRate = 1;
httpTransaction.rpmMetrics = {
301: metricsValue
};

httpTransaction.onServerSend({
id: id,
statusCode: 301,
url: '/fruits/pear',
time: time,
headers: headers,
host: host
});

expect(httpTransaction.rpmMetrics).to.be.eql({
301: metricsValue + 1
});

done();
});
});
Expand Down
78 changes: 36 additions & 42 deletions lib/reporters/trace/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ var package = require('../../../package');

var config = require('../../config');

var COLLECTOR_API_SAMPLE = url.resolve(config.collectorApi, config.collectorApiSampleEndpoint);
var COLLECTOR_API_SERVICE = url.resolve(config.collectorApi, config.collectorApiServiceEndpoint);
var COLLECTOR_API_METRICS = url.resolve(config.collectorApi, config.collectorApiMetricsEndpoint);
var COLLECTOR_API_SAMPLE = url.resolve(config.collectorApi,
config.collectorApiSampleEndpoint);
var COLLECTOR_API_SERVICE = url.resolve(config.collectorApi,
config.collectorApiServiceEndpoint);
var COLLECTOR_API_METRICS = url.resolve(config.collectorApi,
config.collectorApiApmMetricsEndpoint);
var COLLECTOR_API_RPM_METRICS = url.resolve(config.collectorApi,
config.collectorApiRpmMetricsEndpoint);

function TraceReporter (options) {
this.apiKey = options.apiKey || process.env.TRACE_API_KEY;
Expand All @@ -29,45 +34,12 @@ function TraceReporter (options) {
TraceReporter.prototype.setEventBus = function (eventBus) {
this.eventBus = eventBus;
this.eventBus.on(this.eventBus.HTTP_TRANSACTION_STACK_TRACE, this.sendSync.bind(this));
this.eventBus.on(this.eventBus.HTTP_TRANSACTION, this.send.bind(this));
this.eventBus.on(this.eventBus.APM_METRICS, this.sendMetrics.bind(this));
this.eventBus.on(this.eventBus.HTTP_TRANSACTION, this.sendHttpTransactions.bind(this));
this.eventBus.on(this.eventBus.RPM_METRICS, this.sendRpmMetrics.bind(this));
this.eventBus.on(this.eventBus.APM_METRICS, this.sendApmMetrics.bind(this));
this.getService();
};

TraceReporter.prototype.sendMetrics = function (data) {
if (isNaN(this.serviceId)) {
return debug('Service id not present, cannot send metrics');
}

var payload = JSON.stringify(data);
var opts = url.parse(util.format(COLLECTOR_API_METRICS, this.serviceId));

var req = https.request({
hostname: opts.hostname,
port: opts.port,
path: opts.path,
method: 'POST',
headers: {
'Authorization': 'Bearer ' + this.apiKey,
'Content-Type': 'application/json',
'X-Reporter-Version': package.version,
'Content-Length': payload.length
}
}, function (res) {
res.setEncoding('utf8');
res.pipe(bl(function (err) {
if (err) {
return debug('There was an error when connecting to the Trace API', err);
}

debug('Metrics sent successfully');
}));
});

debug('sending metrics to trace servers: ', payload);
req.write(payload);
req.end();
};
// USE THIS WITH CAUTION, IT WILL BE BLOCKING
TraceReporter.prototype.sendSync = function (data) {
debug('sending data to trace servers sync: ', JSON.stringify(data));
Expand All @@ -82,9 +54,8 @@ TraceReporter.prototype.sendSync = function (data) {
});
};

TraceReporter.prototype.send = function (data) {

var opts = url.parse(COLLECTOR_API_SAMPLE);
TraceReporter.prototype._send = function (destinationUrl, data) {
var opts = url.parse(destinationUrl);
var payload = JSON.stringify(data);

var req = https.request({
Expand Down Expand Up @@ -114,6 +85,29 @@ TraceReporter.prototype.send = function (data) {
req.end();
};

TraceReporter.prototype.sendRpmMetrics = function (data) {
if (isNaN(this.serviceId)) {
return debug('Service id not present, cannot send rpm metrics');
}

var url = util.format(COLLECTOR_API_RPM_METRICS, this.serviceId);
this._send(url, data);
};

TraceReporter.prototype.sendApmMetrics = function (data) {
if (isNaN(this.serviceId)) {
return debug('Service id not present, cannot send metrics');
}

var url = util.format(COLLECTOR_API_METRICS, this.serviceId);
this._send(url, data);
};

TraceReporter.prototype.sendHttpTransactions = function (data) {
var url = COLLECTOR_API_SAMPLE;
this._send(url, data);
};

TraceReporter.prototype._getRetryInterval = function() {
var retryInterval = Math.pow(2, this.retryCount) * this.baseRetryInterval;
debug('retrying with %d ms', retryInterval);
Expand Down
32 changes: 31 additions & 1 deletion lib/reporters/trace/index.spec.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
var util = require('util');
var url = require('url');

var TraceReporter = require('./');

var expect = require('chai').expect;
Expand All @@ -11,6 +14,7 @@ var Events = require('../../events');
var collectorApi = config.collectorApi;
var collectorApiServiceEndpoint = config.collectorApiServiceEndpoint;
var collectorApiSampleEndpoint = config.collectorApiSampleEndpoint;
var collectorApiRpmMetricsEndpoint = config.collectorApiRpmMetricsEndpoint;

describe('The Trace reporter module', function () {

Expand Down Expand Up @@ -54,6 +58,32 @@ describe('The Trace reporter module', function () {
traceReporter.sendSync(data);
});

it('can send rpm to HttpTransaction server', function () {
var options = {
appName: 'testName',
apiKey: 'testApiKey'
};

var serviceId = 12;

var data = {
trace: 'very data'
};

var path = util.format(collectorApiRpmMetricsEndpoint, serviceId);
var sendUrl = url.resolve(config.collectorApi, path);

var traceReporter = TraceReporter.create(options);
traceReporter.serviceId = serviceId;

var sendStub = this.sandbox.stub(traceReporter, '_send');

traceReporter.sendRpmMetrics(data);

expect(sendStub).to.have.been.calledOnce;
expect(sendStub).to.have.been.calledWith(sendUrl, data);
});

it('can send data to HttpTransaction server', function () {
var options = {
appName: 'testName',
Expand All @@ -79,7 +109,7 @@ describe('The Trace reporter module', function () {

var traceReporter = TraceReporter.create(options);

traceReporter.send(data);
traceReporter.sendHttpTransactions(data);
apiMock.done();
});

Expand Down