Skip to content

Commit

Permalink
Add SearchStrategyRegistry with a default strategy to support existin…
Browse files Browse the repository at this point in the history
…g search behavior, and integrate it with CallClient.
  • Loading branch information
cjcenizal committed Jul 5, 2018
1 parent 3bae675 commit 2ca6559
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/ui/public/courier/courier.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ uiModules.get('kibana/courier').service('courier', ($rootScope, Private) => {
/**
* Fetch the pending requests.
*/
fetch = () => {
fetch() {
fetchSoon.fetchQueued().then(() => {
// Reset the timer using the time that we get this response as the starting point.
searchPoll.resetTimer();
});
};
}
}

return new Courier();
Expand Down
53 changes: 38 additions & 15 deletions src/ui/public/courier/fetch/call_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import { IsRequestProvider } from './is_request';
import { MergeDuplicatesRequestProvider } from './merge_duplicate_requests';
import { RequestStatus } from './req_status';
import { SerializeFetchParamsProvider } from './request/serialize_fetch_params';
import {
groupRequestsBySearchStrategyId,
getRequestsOrderedByStrategy,
getSearchStrategyForId,
} from './search_strategy_registry';

export function CallClientProvider(Private, Promise, es) {
const errorAllowExplicitIndex = Private(ErrorAllowExplicitIndexProvider);
Expand All @@ -39,11 +44,13 @@ export function CallClientProvider(Private, Promise, es) {
const statuses = mergeDuplicateRequests(requests);

// get the actual list of requests that we will be fetching
let requestsToFetch = statuses.filter(isRequest);
const requestsToFetch = statuses.filter(isRequest);
let execCount = requestsToFetch.length;

if (!execCount) return Promise.resolve([]);

const searchStrategyIdToRequestsMap = groupRequestsBySearchStrategyId(requestsToFetch);

// resolved by respond()
let esPromise = undefined;
let isRequestAborted = false;
Expand All @@ -52,7 +59,15 @@ export function CallClientProvider(Private, Promise, es) {
// for each respond with either the response or ABORTED
const respond = function (responses) {
responses = responses || [];
return Promise.map(requests, function (request, i) {
const requestsOrderedByStrategy = getRequestsOrderedByStrategy(searchStrategyIdToRequestsMap)
.filter(request => {
// We'll use the index of the request to map it to its response. If a request has already
// failed then it won't generate a response. In this case we need to remove the request
// to maintain parity between the list of requests and the list of correspoding responses.
return request !== undefined;
});

return Promise.map(requestsOrderedByStrategy, function (request, i) {
switch (statuses[i]) {
case ABORTED:
return ABORTED;
Expand Down Expand Up @@ -106,28 +121,36 @@ export function CallClientProvider(Private, Promise, es) {
// We're going to create a new async context here, so that the logic within it can execute
// asynchronously after we've returned a reference to defer.promise.
Promise.resolve().then(async () => {
// Flatten the searchSource within each searchRequest to get the fetch params,
// e.g. body, filters, index pattern, query.
const allFetchParams = await getAllFetchParams(requestsToFetch);
// Execute each request using its search strategy.
const esPromises = Object.keys(searchStrategyIdToRequestsMap).map(async searchStrategyId => {
const searchStrategy = getSearchStrategyForId(searchStrategyId);
const searchStrategyRequests = searchStrategyIdToRequestsMap[searchStrategyId];

// Serialize the fetch params into a format suitable for the body of an ES query.
const serializedFetchParams = await serializeAllFetchParams(allFetchParams, requestsToFetch);
// Flatten the searchSource within each searchRequest to get the fetch params,
// e.g. body, filters, index pattern, query.
const allFetchParams = await getAllFetchParams(searchStrategyRequests);

// The index of the request inside requestsToFetch determines which response is mapped to it.
// If a request won't generate a response, since it already failed, we need to remove the
// request from the requestsToFetch array so the indexes will continue to match up to the
// responses correctly.
requestsToFetch = requestsToFetch.filter(request => request !== undefined);
// Serialize the fetch params into a format suitable for the body of an ES query.
const serializedFetchParams = await serializeAllFetchParams(allFetchParams, searchStrategyRequests);

return searchStrategy.search({ body: serializedFetchParams, es });
});

try {
// The request was aborted while we were doing the above logic.
if (isRequestAborted) {
throw ABORTED;
}

esPromise = es.msearch({ body: serializedFetchParams });
const clientResponse = await esPromise;
await respond(clientResponse.responses);
esPromise = Promise.all(esPromises);
const segregatedResponses = await esPromise;

// Aggregate the responses returned by all of the search strategies.
const aggregatedResponses = segregatedResponses.reduce((aggregation, responses) => {
return aggregation.concat(responses.responses);
}, []);

await respond(aggregatedResponses);
} catch(error) {
if (error === ABORTED) {
return await respond();
Expand Down
68 changes: 68 additions & 0 deletions src/ui/public/courier/fetch/search_strategy_registry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const defaultSearchStrategy = {
id: 'default',

search: ({ body, es }) => {
return es.msearch({ body });
},

isValidForRequest: request => {
// TODO: Discern between rollup index patterns and regular index patterns.
const indexPattern = request.source.getField('index');
return indexPattern;
},
};

const searchStrategies = [];
searchStrategies.push(defaultSearchStrategy);

const groupRequestsBySearchStrategyId = requests => {
const searchStrategyIdToRequestsMap = {};

requests.forEach(request => {
const searchStrategy = searchStrategies.find(searchStrategy => searchStrategy.isValidForRequest(request));
const { id } = searchStrategy;
if (!searchStrategyIdToRequestsMap[id]) {
searchStrategyIdToRequestsMap[id] = [];
}
searchStrategyIdToRequestsMap[id].push(request);
});

return searchStrategyIdToRequestsMap;
};

const getRequestsOrderedByStrategy = searchStrategyIdToRequestsMap => {
const ids = Object.keys(searchStrategyIdToRequestsMap);
return ids.reduce((allRequests, searchStrategyId) => {
const requestsForSearchStrategy = searchStrategyIdToRequestsMap[searchStrategyId];
return allRequests.concat(requestsForSearchStrategy);
}, []);
};

const getSearchStrategyForId = id => {
return searchStrategies.find(searchStrategy => searchStrategy.id === id);
};

export {
groupRequestsBySearchStrategyId,
getRequestsOrderedByStrategy,
getSearchStrategyForId,
};

0 comments on commit 2ca6559

Please sign in to comment.