Skip to content

Commit

Permalink
Add some collector classes for objects that get registered in a Colle…
Browse files Browse the repository at this point in the history
…ctorSet (#19098)

* Add some collector classes for objects that get registered in a CollectorSet

* comment cleanup

* don't pass an inline-defined logger to collectorSet

* add a helper logger function so collector has access to logger at construction
  • Loading branch information
tsullivan committed May 18, 2018
1 parent fcec107 commit a7c3255
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import { identity, noop } from 'lodash';
import sinon from 'sinon';
import expect from 'expect.js';
import { Collector } from '../collector';
import { CollectorSet } from '../collector_set';

const DEBUG_LOG = [ 'debug', 'monitoring-ui', 'kibana-monitoring' ];
Expand All @@ -17,52 +18,74 @@ const CHECK_DELAY = 100; // can be lower than COLLECTOR_INTERVAL because the col

describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
let log;
let server;
let init;
let cleanup;
let fetch;
beforeEach(() => {
log = sinon.spy();
server = {
log: sinon.spy()
};
init = noop;
cleanup = noop;
fetch = noop;
});

it('should throw an error if non-Collector type of object is registered', () => {
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
combineTypes: identity,
onPayload: identity
});

const registerPojo = () => {
collectors.register({
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
};

expect(registerPojo).to.throwException(({ message }) => {
expect(message).to.be('CollectorSet can only have Collector instances registered');
});
});

it('should skip bulk upload if payload is empty', (done) => {
const collectors = new CollectorSet({
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes: identity,
onPayload: identity
});

collectors.register({
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
}));

collectors.start();

// allow interval to tick a few times
setTimeout(() => {
collectors.cleanup();

expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Skipping bulk uploading of an empty stats payload')).to.be(true); // proof of skip
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Skipping bulk uploading of an empty stats payload')).to.be(true); // proof of skip
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);

done(); // for async exit
}, CHECK_DELAY);
});

it('should run the bulk upload handler', (done) => {
const log = sinon.spy();
const combineTypes = sinon.spy(data => {
return [
data[0][0],
Expand All @@ -71,34 +94,33 @@ describe('CollectorSet', () => {
});
const onPayload = sinon.spy();

const collectors = new CollectorSet({
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes,
onPayload
});

fetch = () => ({ testFetch: true });
collectors.register({
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
}));

collectors.start();

// allow interval to tick a few times
setTimeout(() => {
collectors.cleanup();

expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Uploading bulk stats payload to the local cluster')).to.be(true);
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Uploading bulk stats payload to the local cluster')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);

// un-flattened
expect(combineTypes.getCall(0).args[0]).to.eql(
Expand All @@ -115,29 +137,28 @@ describe('CollectorSet', () => {
});

it('should log the info-level status of stopping and restarting', (done) => {
const collectors = new CollectorSet({
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes: identity,
onPayload: identity
});

collectors.register({
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
}));

collectors.start();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);

collectors.cleanup();
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);

collectors.start();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);

// exit
collectors.cleanup();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { getCollectorLogger } from '../lib';

export class Collector {
/*
* @param {Object} server - server object
* @param {String} properties.type - property name as the key for the data
* @param {Function} properties.init (optional) - initialization function
* @param {Function} properties.fetch - function to query data
* @param {Function} properties.cleanup (optional) - cleanup function
* @param {Boolean} properties.fetchAfterInit (optional) - if collector should fetch immediately after init
*/
constructor(server, { type, init, fetch, cleanup, fetchAfterInit }) {
this.type = type;
this.init = init;
this.fetch = fetch;
this.cleanup = cleanup;
this.fetchAfterInit = fetchAfterInit;

this.log = getCollectorLogger(server);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
*/

import { flatten, isEmpty } from 'lodash';
import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG } from '../../../common/constants';
import Promise from 'bluebird';

const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
import { getCollectorLogger } from '../lib';
import { Collector } from './collector';
import { UsageCollector } from './usage_collector';

/*
* A collector object has types registered into it with the register(type)
Expand All @@ -18,47 +18,41 @@ const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
export class CollectorSet {

/*
* @param options.interval {Number} in milliseconds
* @param options.logger {Function}
* @param options.combineTypes {Function}
* @param options.onPayload {Function}
* @param {Object} server - server object
* @param {Number} options.interval - in milliseconds
* @param {Function} options.combineTypes
* @param {Function} options.onPayload
*/
constructor({ interval, logger, combineTypes, onPayload }) {
constructor(server, { interval, combineTypes, onPayload }) {
this._collectors = [];
this._timer = null;

if (typeof interval !== 'number') {
throw new Error('interval number of milliseconds is required');
}
if (typeof logger !== 'function') {
throw new Error('Logger function is required');
}
if (typeof combineTypes !== 'function') {
throw new Error('combineTypes function is required');
}
if (typeof onPayload !== 'function') {
throw new Error('onPayload function is required');
}

this._log = {
debug: message => logger(['debug', ...LOGGING_TAGS], message),
info: message => logger(['info', ...LOGGING_TAGS], message),
warn: message => logger(['warning', ...LOGGING_TAGS], message)
};
this._log = getCollectorLogger(server);

this._interval = interval;
this._combineTypes = combineTypes;
this._onPayload = onPayload;
}

/*
* @param {String} type.type
* @param {Function} type.init (optional)
* @param {Function} type.fetch
* @param {Function} type.cleanup (optional)
* @param collector {Collector} collector object
*/
register(type) {
this._collectors.push(type);
register(collector) {
// check instanceof
if (!(collector instanceof Collector)) {
throw new Error('CollectorSet can only have Collector instances registered');
}
this._collectors.push(collector);
}

/*
Expand All @@ -75,10 +69,7 @@ export class CollectorSet {
collector.init();
}

if (collector.setLogger) {
this._log.debug(`Setting logger for ${collector.type} collector`);
collector.setLogger(this._log);
}
this._log.debug(`Setting logger for ${collector.type} collector`);

if (collector.fetchAfterInit) {
initialCollectors.push(collector);
Expand Down Expand Up @@ -139,6 +130,19 @@ export class CollectorSet {
});
}

async bulkFetchUsage() {
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
const bulk = await this._bulkFetch(usageCollectors);

// summarize each type of stat
return bulk.reduce((accumulatedStats, currentStat) => {
return {
...accumulatedStats,
[currentStat.type]: currentStat.result,
};
}, {});
}

cleanup() {
this._log.info(`Stopping all stats collectors`);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Collector } from './collector';

export class UsageCollector extends Collector {
constructor(server, properties) {
super(server, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ describe('getKibanaUsageCollector', () => {
getCluster: sinon.stub()
}
},
config: () => ({ get: sinon.stub() })
config: () => ({ get: sinon.stub() }),
log: sinon.stub(),
};
serverStub.plugins.elasticsearch.getCluster.withArgs('admin').returns(clusterStub);
callClusterStub = callClusterFactory(serverStub).getCallClusterInternal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { get, snakeCase } from 'lodash';
import { KIBANA_USAGE_TYPE } from '../../../common/constants';
import { UsageCollector } from '../classes/usage_collector';

const TYPES = [
'dashboard',
Expand All @@ -20,7 +21,7 @@ const TYPES = [
* Fetches saved object client counts by querying the saved object index
*/
export function getKibanaUsageCollector(server, callCluster) {
return {
return new UsageCollector(server, {
type: KIBANA_USAGE_TYPE,
async fetch() {
const index = server.config().get('kibana.index');
Expand Down Expand Up @@ -52,15 +53,13 @@ export function getKibanaUsageCollector(server, callCluster) {

return {
index,

// combine the bucketCounts and 0s for types that don't have documents
...TYPES.reduce((acc, type) => ({
...TYPES.reduce((acc, type) => ({ // combine the bucketCounts and 0s for types that don't have documents
...acc,
[snakeCase(type)]: {
total: bucketCounts[type] || 0
}
}), {})
};
}
};
});
}
Loading

0 comments on commit a7c3255

Please sign in to comment.