Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Monitoring] Fetch shard data more efficiently #54028

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('getPaginatedNodes', () => {
},
},
};
const shardStats = {
const nodesShardCount = {
nodes: {
1: {
shardCount: 10,
Expand All @@ -78,7 +78,7 @@ describe('getPaginatedNodes', () => {
pagination,
sort,
queryText,
{ clusterStats, shardStats }
{ clusterStats, nodesShardCount }
);
expect(nodes).toEqual({
pageOfNodes: [
Expand All @@ -98,7 +98,7 @@ describe('getPaginatedNodes', () => {
pagination,
{ ...sort, field: 'foo', direction: 'desc' },
queryText,
{ clusterStats, shardStats }
{ clusterStats, nodesShardCount }
);
expect(nodes).toEqual({
pageOfNodes: [
Expand All @@ -118,7 +118,7 @@ describe('getPaginatedNodes', () => {
pagination,
sort,
'tw',
{ clusterStats, shardStats }
{ clusterStats, nodesShardCount }
);
expect(nodes).toEqual({
pageOfNodes: [{ name: 'two', uuid: 2, isOnline: false, shardCount: 5, foo: 12 }],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { LISTING_METRICS_NAMES, LISTING_METRICS_PATHS } from './nodes_listing_me
* @param {Object} shardStats: per-node information about shards
* @return {Array} node info combined with metrics for each node from handle_response
*/
export async function getNodes(req, esIndexPattern, pageOfNodes, clusterStats, shardStats) {
export async function getNodes(req, esIndexPattern, pageOfNodes, clusterStats, nodesShardCount) {
checkParam(esIndexPattern, 'esIndexPattern in getNodes');

const start = moment.utc(req.payload.timeRange.min).valueOf();
Expand Down Expand Up @@ -104,5 +104,9 @@ export async function getNodes(req, esIndexPattern, pageOfNodes, clusterStats, s
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const response = await callWithRequest(req, 'search', params);

return handleResponse(response, clusterStats, shardStats, pageOfNodes, { min, max, bucketSize });
return handleResponse(response, clusterStats, nodesShardCount, pageOfNodes, {
min,
max,
bucketSize,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export async function getPaginatedNodes(
pagination,
sort,
queryText,
{ clusterStats, shardStats }
{ clusterStats, nodesShardCount }
) {
const config = req.server.config();
const size = config.get('xpack.monitoring.max_bucket_size');
Expand All @@ -45,7 +45,7 @@ export async function getPaginatedNodes(
const clusterState = get(clusterStats, 'cluster_state', { nodes: {} });
for (const node of nodes) {
node.isOnline = !isUndefined(get(clusterState, ['nodes', node.uuid]));
node.shardCount = get(shardStats, `nodes[${node.uuid}].shardCount`, 0);
node.shardCount = get(nodesShardCount, `nodes[${node.uuid}].shardCount`, 0);
}

// `metricSet` defines a list of metrics that are sortable in the UI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@ import { uncovertMetricNames } from '../../convert_metric_names';
* Process the response from the get_nodes query
* @param {Object} response: response data from get_nodes
* @param {Object} clusterStats: cluster stats from cluster state document
* @param {Object} shardStats: per-node information about shards
* @param {Object} nodesShardCount: per-node information about shards
* @param {Object} timeOptions: min, max, and bucketSize needed for date histogram creation
* @return {Array} node info combined with metrics for each node
*/
export function handleResponse(response, clusterStats, shardStats, pageOfNodes, timeOptions = {}) {
export function handleResponse(
response,
clusterStats,
nodesShardCount,
pageOfNodes,
timeOptions = {}
) {
if (!get(response, 'hits.hits')) {
return [];
}

const nodeHits = get(response, 'hits.hits', []);
const nodesInfo = mapNodesInfo(nodeHits, clusterStats, shardStats);
const nodesInfo = mapNodesInfo(nodeHits, clusterStats, nodesShardCount);

/*
* Every node bucket is an object with a field for nodeId and fields for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import { calculateNodeType, getNodeTypeClassLabel } from '../';
/**
* @param {Array} nodeHits: info about each node from the hits in the get_nodes query
* @param {Object} clusterStats: cluster stats from cluster state document
* @param {Object} shardStats: per-node information about shards
* @param {Object} nodesShardCount: per-node information about shards
* @return {Object} summarized info about each node keyed by nodeId
*/
export function mapNodesInfo(nodeHits, clusterStats, shardStats) {
export function mapNodesInfo(nodeHits, clusterStats, nodesShardCount) {
const clusterState = get(clusterStats, 'cluster_state', { nodes: {} });

return nodeHits.reduce((prev, node) => {
Expand All @@ -35,7 +35,7 @@ export function mapNodesInfo(nodeHits, clusterStats, shardStats) {
isOnline,
nodeTypeLabel: nodeTypeLabel,
nodeTypeClass: nodeTypeClass,
shardCount: get(shardStats, `nodes[${sourceNode.uuid}].shardCount`, 0),
shardCount: get(nodesShardCount, `nodes[${sourceNode.uuid}].shardCount`, 0),
},
};
}, {});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { get } from 'lodash';
import { checkParam } from '../../error_missing_required';
import { createQuery } from '../../create_query';
import { ElasticsearchMetric } from '../../metrics';
import { calculateIndicesTotals } from './calculate_shard_stat_indices_totals';

async function getUnassignedShardData(req, esIndexPattern, cluster) {
const config = req.server.config();
const maxBucketSize = config.get('xpack.monitoring.max_bucket_size');
const metric = ElasticsearchMetric.getMetricFields();

const params = {
index: esIndexPattern,
size: 0,
ignoreUnavailable: true,
body: {
sort: { timestamp: { order: 'desc' } },
query: createQuery({
type: 'shards',
clusterUuid: cluster.cluster_uuid,
metric,
filters: [{ term: { state_uuid: get(cluster, 'cluster_state.state_uuid') } }],
}),
aggs: {
indices: {
terms: {
field: 'shard.index',
size: maxBucketSize,
},
aggs: {
state: {
filter: {
terms: {
'shard.state': ['UNASSIGNED', 'INITIALIZING'],
},
},
aggs: {
primary: {
terms: {
field: 'shard.primary',
size: 2,
},
},
},
},
},
},
},
},
};

const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
return await callWithRequest(req, 'search', params);
}

export async function getIndicesUnassignedShardStats(req, esIndexPattern, cluster) {
checkParam(esIndexPattern, 'esIndexPattern in elasticsearch/getShardStats');

const response = await getUnassignedShardData(req, esIndexPattern, cluster);
const indices = get(response, 'aggregations.indices.buckets', []).reduce((accum, bucket) => {
const index = bucket.key;
const states = get(bucket, 'state.primary.buckets', []);
const unassignedReplica = states
.filter(state => state.key_as_string === 'false')
.reduce((total, state) => total + state.doc_count, 0);
const unassignedPrimary = states
.filter(state => state.key_as_string === 'true')
.reduce((total, state) => total + state.doc_count, 0);

let status = 'green';
if (unassignedReplica > 0) {
status = 'yellow';
}
if (unassignedPrimary > 0) {
status = 'red';
}

accum[index] = {
unassigned: { primary: unassignedPrimary, replica: unassignedReplica },
status,
};
return accum;
}, {});

const indicesTotals = calculateIndicesTotals(indices);
return { indices, indicesTotals };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { getIndicesUnassignedShardStats } from './get_indices_unassigned_shard_stats';

describe('getIndicesUnassignedShardStats', () => {
it('should return the unassigned shard stats for indices', async () => {
const indices = {
12345: { status: 'red', unassigned: { primary: 1, replica: 0 } },
6789: { status: 'yellow', unassigned: { primary: 0, replica: 1 } },
absdf82: { status: 'green', unassigned: { primary: 0, replica: 0 } },
};

const req = {
server: {
config: () => ({
get: () => {},
}),
plugins: {
elasticsearch: {
getCluster: () => ({
callWithRequest: () => ({
aggregations: {
indices: {
buckets: Object.keys(indices).map(id => ({
key: id,
state: {
primary: {
buckets:
indices[id].unassigned.primary || indices[id].unassigned.replica
? [
{
key_as_string: indices[id].unassigned.primary
? 'true'
: 'false',
doc_count: 1,
},
]
: [],
},
},
})),
},
},
}),
}),
},
},
},
};
const esIndexPattern = '*';
const cluster = {};
const stats = await getIndicesUnassignedShardStats(req, esIndexPattern, cluster);
expect(stats.indices).toEqual(indices);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to also test status here? Since, looks like you already have the right replica/primary counts to test for all three colors 💚 💛 ❤️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { get } from 'lodash';
import { checkParam } from '../../error_missing_required';
import { createQuery } from '../../create_query';
import { ElasticsearchMetric } from '../../metrics';

async function getShardCountPerNode(req, esIndexPattern, cluster) {
const config = req.server.config();
const maxBucketSize = config.get('xpack.monitoring.max_bucket_size');
const metric = ElasticsearchMetric.getMetricFields();

const params = {
index: esIndexPattern,
size: 0,
ignoreUnavailable: true,
body: {
sort: { timestamp: { order: 'desc' } },
query: createQuery({
type: 'shards',
clusterUuid: cluster.cluster_uuid,
metric,
filters: [{ term: { state_uuid: get(cluster, 'cluster_state.state_uuid') } }],
}),
aggs: {
nodes: {
terms: {
field: 'shard.node',
size: maxBucketSize,
},
},
},
},
};

const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
return await callWithRequest(req, 'search', params);
}

export async function getNodesShardCount(req, esIndexPattern, cluster) {
checkParam(esIndexPattern, 'esIndexPattern in elasticsearch/getShardStats');

const response = await getShardCountPerNode(req, esIndexPattern, cluster);
const nodes = get(response, 'aggregations.nodes.buckets', []).reduce((accum, bucket) => {
accum[bucket.key] = { shardCount: bucket.doc_count };
return accum;
}, {});
return { nodes };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { getNodesShardCount } from './get_nodes_shard_count';

describe('getNodeShardCount', () => {
it('should return the shard count per node', async () => {
const nodes = {
12345: { shardCount: 10 },
6789: { shardCount: 1 },
absdf82: { shardCount: 20 },
};

const req = {
server: {
config: () => ({
get: () => {},
}),
plugins: {
elasticsearch: {
getCluster: () => ({
callWithRequest: () => ({
aggregations: {
nodes: {
buckets: Object.keys(nodes).map(id => ({
key: id,
doc_count: nodes[id].shardCount,
})),
},
},
}),
}),
},
},
},
};
const esIndexPattern = '*';
const cluster = {};
const counts = await getNodesShardCount(req, esIndexPattern, cluster);
expect(counts.nodes).toEqual(nodes);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* @param {Object} config - Kibana config service
* @param {Boolean} includeNodes - whether to add the aggs for node shards
*/
export function getShardAggs(config, includeNodes) {
export function getShardAggs(config, includeNodes, includeIndices) {
const maxBucketSize = config.get('xpack.monitoring.max_bucket_size');
const aggSize = 10;
const indicesAgg = {
Expand Down Expand Up @@ -40,7 +40,7 @@ export function getShardAggs(config, includeNodes) {
};

return {
...{ indices: indicesAgg },
...{ indices: includeIndices ? indicesAgg : undefined },
...{ nodes: includeNodes ? nodesAgg : undefined },
};
}
Loading