Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Telemetry] Move Monitoring collection strategy to a collector #82638

Merged
merged 5 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/plugins/telemetry_collection_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,9 @@ export class TelemetryCollectionManagerPlugin
return stats.map((stat) => {
const license = licenses[stat.cluster_uuid];
return {
collectionSource: collection.title,
...(license ? { license } : {}),
...stat,
collectionSource: collection.title,
};
});
}
Expand Down
1 change: 0 additions & 1 deletion x-pack/plugins/monitoring/kibana.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
],
"optionalPlugins": [
"infra",
"telemetryCollectionManager",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The monitoring plugin no longer depends on the telemetryCollectionManager 🎉

"usageCollection",
"home",
"cloud",
Expand Down
35 changes: 7 additions & 28 deletions x-pack/plugins/monitoring/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import {
CoreStart,
CustomHttpResponseOptions,
ResponseError,
IClusterClient,
SavedObjectsServiceStart,
} from 'kibana/server';
import { DEFAULT_APP_CATEGORIES } from '../../../../src/core/server';
import {
Expand All @@ -41,7 +39,7 @@ import { initInfraSource } from './lib/logs/init_infra_source';
import { mbSafeQuery } from './lib/mb_safe_query';
import { instantiateClient } from './es_client/instantiate_client';
import { registerCollectors } from './kibana_monitoring/collectors';
import { registerMonitoringCollection } from './telemetry_collection';
import { registerMonitoringTelemetryCollection } from './telemetry_collection';
import { LicenseService } from './license_service';
import { AlertsFactory } from './alerts';
import {
Expand Down Expand Up @@ -76,8 +74,6 @@ export class Plugin {
private monitoringCore = {} as MonitoringCore;
private legacyShimDependencies = {} as LegacyShimDependencies;
private bulkUploader: IBulkUploader = {} as IBulkUploader;
private telemetryElasticsearchClient: IClusterClient | undefined;
private telemetrySavedObjectsService: SavedObjectsServiceStart | undefined;

constructor(initializerContext: PluginInitializerContext) {
this.initializerContext = initializerContext;
Expand Down Expand Up @@ -145,19 +141,6 @@ export class Plugin {
plugins.alerts?.registerType(alert.getAlertType());
}

// Initialize telemetry
if (plugins.telemetryCollectionManager) {
registerMonitoringCollection({
telemetryCollectionManager: plugins.telemetryCollectionManager,
esCluster: this.cluster,
esClientGetter: () => this.telemetryElasticsearchClient,
soServiceGetter: () => this.telemetrySavedObjectsService,
customContext: {
maxBucketSize: config.ui.max_bucket_size,
},
});
}

// Register collector objects for stats to show up in the APIs
if (plugins.usageCollection) {
core.savedObjects.registerType({
Expand All @@ -174,6 +157,11 @@ export class Plugin {
});

registerCollectors(plugins.usageCollection, config, cluster);
registerMonitoringTelemetryCollection(
plugins.usageCollection,
cluster,
config.ui.max_bucket_size
);
}

// Always create the bulk uploader
Expand Down Expand Up @@ -253,16 +241,7 @@ export class Plugin {
};
}

start({ elasticsearch, savedObjects }: CoreStart) {
// TODO: For the telemetry plugin to work, we need to provide the new ES client.
// The new client should be initialized with a similar config to `this.cluster` but, since we're not using
// the new client in Monitoring Telemetry collection yet, setting the local client allows progress for now.
// The usage collector `fetch` method has been refactored to accept a `collectorFetchContext` object,
// exposing both es clients and the saved objects client.
// We will update the client in a follow up PR.
this.telemetryElasticsearchClient = elasticsearch.client;
this.telemetrySavedObjectsService = savedObjects;
Comment on lines -257 to -264
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These clients can be removed now that the monitoring plugin does not need to catch up with the changes in the telemetryCollectionManager 🎉

}
start() {}

stop() {
if (this.cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@ import { getStackStats, getAllStats, handleAllStats } from './get_all_stats';
import { ESClusterStats } from './get_es_stats';
import { KibanaStats } from './get_kibana_stats';
import { ClustersHighLevelStats } from './get_high_level_stats';
import { coreMock } from 'src/core/server/mocks';

describe('get_all_stats', () => {
const timestamp = Date.now();
const callCluster = sinon.stub();
const esClient = sinon.stub();
const soClient = sinon.stub();

const esClusters = [
{ cluster_uuid: 'a' },
Expand Down Expand Up @@ -172,24 +169,7 @@ describe('get_all_stats', () => {
.onCall(4)
.returns(Promise.resolve({})); // Beats state

expect(
await getAllStats(
[{ clusterUuid: 'a' }],
{
callCluster: callCluster as any,
esClient: esClient as any,
soClient: soClient as any,
usageCollection: {} as any,
kibanaRequest: undefined,
timestamp,
},
{
logger: coreMock.createPluginInitializerContext().logger.get('test'),
version: 'version',
maxBucketSize: 1,
}
)
).toStrictEqual(allClusters);
expect(await getAllStats(['a'], callCluster, timestamp, 1)).toStrictEqual(allClusters);
});

it('returns empty clusters', async () => {
Expand All @@ -199,24 +179,7 @@ describe('get_all_stats', () => {

callCluster.withArgs('search').returns(Promise.resolve(clusterUuidsResponse));

expect(
await getAllStats(
[],
{
callCluster: callCluster as any,
esClient: esClient as any,
soClient: soClient as any,
usageCollection: {} as any,
kibanaRequest: undefined,
timestamp,
},
{
logger: coreMock.createPluginInitializerContext().logger.get('test'),
version: 'version',
maxBucketSize: 1,
}
)
).toStrictEqual([]);
expect(await getAllStats([], callCluster, timestamp, 1)).toStrictEqual([]);
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import { set } from '@elastic/safer-lodash-set';
import { get, merge } from 'lodash';

import { StatsGetter } from 'src/plugins/telemetry_collection_manager/server';
import moment from 'moment';
import { LegacyAPICaller } from 'kibana/server';
import {
LOGSTASH_SYSTEM_ID,
KIBANA_SYSTEM_ID,
Expand All @@ -20,24 +20,20 @@ import { getKibanaStats, KibanaStats } from './get_kibana_stats';
import { getBeatsStats, BeatsStatsByClusterUuid } from './get_beats_stats';
import { getHighLevelStats, ClustersHighLevelStats } from './get_high_level_stats';

export interface CustomContext {
maxBucketSize: number;
}
/**
* Get statistics for all products joined by Elasticsearch cluster.
* Returns the array of clusters joined with the Kibana and Logstash instances.
*
*/
export const getAllStats: StatsGetter<CustomContext> = async (
clustersDetails,
{ callCluster, timestamp },
{ maxBucketSize }
) => {
export async function getAllStats(
clusterUuids: string[],
callCluster: LegacyAPICaller, // TODO: To be changed to the new ES client when the plugin migrates
timestamp: number,
maxBucketSize: number
) {
const start = moment(timestamp).subtract(USAGE_FETCH_INTERVAL, 'ms').toISOString();
const end = moment(timestamp).toISOString();

const clusterUuids = clustersDetails.map((clusterDetails) => clusterDetails.clusterUuid);

const [esClusters, kibana, logstash, beats] = await Promise.all([
getElasticsearchStats(callCluster, clusterUuids, maxBucketSize), // cluster_stats, stack_stats.xpack, cluster_name/uuid, license, version
getKibanaStats(callCluster, clusterUuids, start, end, maxBucketSize), // stack_stats.kibana
Expand All @@ -46,7 +42,7 @@ export const getAllStats: StatsGetter<CustomContext> = async (
]);

return handleAllStats(esClusters, { kibana, logstash, beats });
};
}

/**
* Combine the statistics from the stack to create "cluster" stats that associate all products together based on the cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,35 @@
*/

import sinon from 'sinon';
import { elasticsearchServiceMock, savedObjectsRepositoryMock } from 'src/core/server/mocks';
import {
getClusterUuids,
fetchClusterUuids,
handleClusterUuidsResponse,
} from './get_cluster_uuids';

describe('get_cluster_uuids', () => {
const kibanaRequest = undefined;
const callCluster = sinon.stub();
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const soClient = savedObjectsRepositoryMock.create();
const response = {
aggregations: {
cluster_uuids: {
buckets: [{ key: 'abc' }, { key: 'xyz' }, { key: '123' }],
},
},
};
const expectedUuids = response.aggregations.cluster_uuids.buckets
.map((bucket) => bucket.key)
.map((expectedUuid) => ({ clusterUuid: expectedUuid }));
const expectedUuids = response.aggregations.cluster_uuids.buckets.map((bucket) => bucket.key);
const timestamp = Date.now();

describe('getClusterUuids', () => {
it('returns cluster UUIDs', async () => {
callCluster.withArgs('search').returns(Promise.resolve(response));
expect(
await getClusterUuids(
{ callCluster, esClient, soClient, timestamp, kibanaRequest, usageCollection: {} as any },
{
maxBucketSize: 1,
} as any
)
).toStrictEqual(expectedUuids);
expect(await getClusterUuids(callCluster, timestamp, 1)).toStrictEqual(expectedUuids);
});
});

describe('fetchClusterUuids', () => {
it('searches for clusters', async () => {
callCluster.returns(Promise.resolve(response));
expect(
await fetchClusterUuids(
{ callCluster, esClient, soClient, timestamp, kibanaRequest, usageCollection: {} as any },
{
maxBucketSize: 1,
} as any
)
).toStrictEqual(response);
expect(await fetchClusterUuids(callCluster, timestamp, 1)).toStrictEqual(response);
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,31 @@

import { get } from 'lodash';
import moment from 'moment';
import {
ClusterDetailsGetter,
StatsCollectionConfig,
ClusterDetails,
} from 'src/plugins/telemetry_collection_manager/server';
import { LegacyAPICaller } from 'kibana/server';
import { createQuery } from './create_query';
import {
INDEX_PATTERN_ELASTICSEARCH,
CLUSTER_DETAILS_FETCH_INTERVAL,
} from '../../common/constants';
import { CustomContext } from './get_all_stats';

/**
* Get a list of Cluster UUIDs that exist within the specified timespan.
*/
export const getClusterUuids: ClusterDetailsGetter<CustomContext> = async (
config,
{ maxBucketSize }
) => {
const response = await fetchClusterUuids(config, maxBucketSize);
export async function getClusterUuids(
callCluster: LegacyAPICaller, // TODO: To be changed to the new ES client when the plugin migrates
timestamp: number,
maxBucketSize: number
) {
const response = await fetchClusterUuids(callCluster, timestamp, maxBucketSize);
return handleClusterUuidsResponse(response);
};
}

/**
* Fetch the aggregated Cluster UUIDs from the monitoring cluster.
*/
export async function fetchClusterUuids(
{ callCluster, timestamp }: StatsCollectionConfig,
callCluster: LegacyAPICaller,
timestamp: number,
maxBucketSize: number
) {
const start = moment(timestamp).subtract(CLUSTER_DETAILS_FETCH_INTERVAL, 'ms').toISOString();
Expand Down Expand Up @@ -66,10 +64,7 @@ export async function fetchClusterUuids(
* @param {Object} response The aggregation response
* @return {Array} Strings; each representing a Cluster's UUID.
*/
export function handleClusterUuidsResponse(response: any): ClusterDetails[] {
export function handleClusterUuidsResponse(response: any): string[] {
const uuidBuckets: any[] = get(response, 'aggregations.cluster_uuids.buckets', []);

return uuidBuckets.map((uuidBucket) => ({
clusterUuid: uuidBucket.key as string,
}));
return uuidBuckets.map((uuidBucket) => uuidBucket.key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe('get_licenses', () => {
},
};
const expectedClusters = response.hits.hits.map((hit) => hit._source);
const clusterUuids = expectedClusters.map((cluster) => ({ clusterUuid: cluster.cluster_uuid }));
const clusterUuids = expectedClusters.map((cluster) => cluster.cluster_uuid);
const expectedLicenses = {
abc: { type: 'basic' },
xyz: { type: 'basic' },
Expand All @@ -30,27 +30,15 @@ describe('get_licenses', () => {
it('returns clusters', async () => {
callWith.withArgs('search').returns(Promise.resolve(response));

expect(
await getLicenses(
clusterUuids,
{ callCluster: callWith } as any,
{ maxBucketSize: 1 } as any
)
).toStrictEqual(expectedLicenses);
expect(await getLicenses(clusterUuids, callWith, 1)).toStrictEqual(expectedLicenses);
});
});

describe('fetchLicenses', () => {
it('searches for clusters', async () => {
callWith.returns(response);

expect(
await fetchLicenses(
callWith,
clusterUuids.map(({ clusterUuid }) => clusterUuid),
{ maxBucketSize: 1 } as any
)
).toStrictEqual(response);
expect(await fetchLicenses(callWith, clusterUuids, 1)).toStrictEqual(response);
});
});

Expand Down