Skip to content

Commit

Permalink
Add rollup search capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
sulemanof committed Jan 18, 2019
1 parent a53d647 commit 194851a
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export default class DefaultSearchCapabilities {
constructor(req, batchRequestsSupport, capabilities) {
this.request = req;
this.batchRequestsSupport = batchRequestsSupport;
this.capabilities = capabilities;
}

getTimeZone() {
const { timezone } = this.request.payload.timerange;

return timezone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import AbstractSearchStrategy from './strategies/abstract_search_strategy';
import AbstractSearchRequest from './searh_requests/abstract_request';
import DefaultSearchStrategy from './strategies/default_search_strategy';
import DefaultSearchCapabilities from './default_search_capabilities';

const strategies = [];

Expand All @@ -38,19 +39,20 @@ export default class SearchStrategiesRegister {
}

static exposeServer(server, searchStrategiesRegister) {
server.expose('addSearchStrategy', (searchStrategy) => searchStrategiesRegister.add(searchStrategy));
server.expose('AbstractSearchStrategy', AbstractSearchStrategy);
server.expose('AbstractSearchRequest', AbstractSearchRequest);
server.expose('DefaultSearchCapabilities', DefaultSearchCapabilities);
server.expose('addSearchStrategy', (searchStrategy) => searchStrategiesRegister.add(searchStrategy));
}

static async getViableStrategy(req, indexPattern) {
for (const searchStrategy of strategies) {
const { isViable, restrictions } = await searchStrategy.checkForViability(req, indexPattern);
const { isViable, capabilities } = await searchStrategy.checkForViability(req, indexPattern);

if (isViable) {
return {
searchStrategy,
restrictions,
capabilities,
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,26 @@
*/
import AbstractSearchStrategy from './abstract_search_strategy';
import SearchRequest from '../searh_requests/search_request';
import DefaultSearchCapabilities from '../default_search_capabilities';

const callWithRequestFactory = (server, request) => {
const { callWithRequest } = request.server.plugins.elasticsearch.getCluster('data');

return callWithRequest;
};
const batchRequestsSupport = true;

export default class DefaultSearchStrategy extends AbstractSearchStrategy {
name = 'default';
batchRequestsSupport = true;

constructor(server) {
super(server, callWithRequestFactory, SearchRequest);
}

checkForViability() {
checkForViability(req) {
return {
isViable: true,
restrictions: {},
capabilities: new DefaultSearchCapabilities(req, batchRequestsSupport)
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function validAnnotation(annotation) {

export default async (req, panel) => {
const indexPattern = panel.index_pattern;
const { searchStrategy } = await SearchStrategiesRegister.getViableStrategy(req, indexPattern);
const { searchStrategy, capabilities } = await SearchStrategiesRegister.getViableStrategy(req, indexPattern);
const searchRequest = searchStrategy.getSearchRequest(req, indexPattern);
const bodies = panel.annotations
.filter(validAnnotation)
Expand All @@ -40,10 +40,12 @@ export default async (req, panel) => {
const indexPattern = annotation.index_pattern;
const bodies = [];

bodies.push({
index: indexPattern,
ignoreUnavailable: true,
});
if (capabilities.batchRequestsSupport) {
bodies.push({
index: indexPattern,
ignoreUnavailable: true,
});
}

const body = buildAnnotationRequest(req, panel, annotation);
body.timeout = '90s';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import SearchStrategiesRegister from '../search_strategies/search_strategies_reg

export async function getSeriesData(req, panel) {
const indexPattern = panel.index_pattern;
const { searchStrategy } = await SearchStrategiesRegister.getViableStrategy(req, indexPattern);
const { searchStrategy, capabilities } = await SearchStrategiesRegister.getViableStrategy(req, indexPattern);
const searchRequest = searchStrategy.getSearchRequest(req, indexPattern);

const body = panel.series
.map(series => getRequestParams(req, panel, series, searchStrategy.batchRequestsSupport))
.map(series => getRequestParams(req, panel, series, capabilities.batchRequestsSupport))
.reduce((acc, items) => acc.concat(items), []);

return searchRequest.search({ body })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
*/
import getRollupSearchStrategy from './rollup_search_strategy';
import getRollupSearchRequest from './rollup_search_request';
import getRollupSearchCapabilities from './rollup_search_capabilities';

export default (server) => {
const { addSearchStrategy, AbstractSearchRequest, AbstractSearchStrategy } = server.plugins.metrics;
const {
addSearchStrategy,
AbstractSearchRequest,
AbstractSearchStrategy,
DefaultSearchCapabilities,
} = server.plugins.metrics;

if (addSearchStrategy) {
const RollupSearchRequest = getRollupSearchRequest(AbstractSearchRequest);
const RollupSearchStrategy = getRollupSearchStrategy(AbstractSearchStrategy, RollupSearchRequest);
const RollupSearchCapabilities = getRollupSearchCapabilities(DefaultSearchCapabilities);
const RollupSearchStrategy = getRollupSearchStrategy(AbstractSearchStrategy, RollupSearchRequest, RollupSearchCapabilities);

addSearchStrategy(new RollupSearchStrategy(server));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export default (DefaultSearchCapabilities) =>
(class RollupSearchCapabilities extends DefaultSearchCapabilities {

getTimeZone() {
// todo: to be refactored
return 'UTC';
}
});
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,42 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { isEmpty } from 'lodash';
import { callWithRequestFactory } from '../call_with_request_factory';

const ROLLUP_INDEX_CAPABILITIES_METHOD = 'rollup.rollupIndexCapabilities';
const INDEX_PATTERN_SEPARATOR = ',';
const batchRequestsSupport = false;

export default (AbstractSearchStrategy, RollupSearchRequest) =>

export default (AbstractSearchStrategy, RollupSearchRequest, RollupSearchCapabilities) =>
(class RollupSearchStrategy extends AbstractSearchStrategy {
name = 'rollup';
batchRequestsSupport = false;

constructor(server) {
super(server, callWithRequestFactory, RollupSearchRequest);
}

async numberOfRollupJobs(req, indexPattern) {
getAllRollupaCapabilities(req, indexPattern) {
const callWithRequest = this.getCallWithRequestInstance(req);
const indices = (indexPattern || '').split(INDEX_PATTERN_SEPARATOR);

const requests = indices.map(index => callWithRequest(ROLLUP_INDEX_CAPABILITIES_METHOD, {
indexPattern: index,
}));

return Promise.all(requests)
.then((responses) => responses
.reduce((numberOfRollupJobs, response, index) => {
if (response[indices[index]]) {
numberOfRollupJobs += 1;
}

return numberOfRollupJobs;
}, 0))
.catch(() => Promise.resolve(0));
return Promise.all(requests);
}

async hasOneRollupJob(req, indexPattern) {
return await this.numberOfRollupJobs(req, indexPattern) === 1;
}

async checkForViability(req, indexPattern) {
const isViable = await this.hasOneRollupJob(req, indexPattern);
const rollupCapabilities = await this.getAllRollupaCapabilities(req, indexPattern)
.then((responses) => responses.filter(response => !isEmpty(response)));

const isViable = rollupCapabilities.length === 1;

return {
isViable,
restrictions: {},
capabilities: isViable ? new RollupSearchCapabilities(req, batchRequestsSupport, rollupCapabilities) : null
};
}
});

0 comments on commit 194851a

Please sign in to comment.