Skip to content

Commit

Permalink
Add support for multiple rollup searches. (#21755)
Browse files Browse the repository at this point in the history
  • Loading branch information
cjcenizal authored and jen-huang committed Aug 10, 2018
1 parent f82629a commit 094edbd
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,23 @@ function getAllFetchParams(searchRequests, Promise) {
}

async function serializeAllFetchParams(fetchParams, searchRequests, serializeFetchParams) {
const searcRequestsWithFetchParams = [];
const searchRequestsWithFetchParams = [];
const failedSearchRequests = [];

// Gather the fetch param responses from all the successful requests.
fetchParams.forEach((result, index) => {
if (result.resolved) {
searcRequestsWithFetchParams.push(result.resolved);
searchRequestsWithFetchParams.push(result.resolved);
} else {
const searchRequest = searchRequests[index];

// TODO: All strategies will need to implement this.
searchRequest.handleFailure(result.rejected);
failedSearchRequests.push(searchRequest);
}
});

return {
serializedFetchParams: await serializeFetchParams(searcRequestsWithFetchParams),
serializedFetchParams: await serializeFetchParams(searchRequestsWithFetchParams),
failedSearchRequests,
};
}
Expand Down
76 changes: 60 additions & 16 deletions x-pack/plugins/rollup/public/search/rollup_search_strategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,52 @@
import { kfetchAbortable } from 'ui/kfetch';
import { SearchError } from 'ui/courier';

export const rollupSearchStrategy = {
id: 'rollup',
function getAllFetchParams(searchRequests, Promise) {
return Promise.map(searchRequests, (searchRequest) => {
return Promise.try(searchRequest.getFetchParams, void 0, searchRequest)
.then((fetchParams) => {
return (searchRequest.fetchParams = fetchParams);
})
.then(value => ({ resolved: value }))
.catch(error => ({ rejected: error }));
});
}

search: async ({ searchRequests, Promise }) => {
// TODO: Batch together requests to hit a bulk rollup search endpoint.
const searchRequest = searchRequests[0];
const searchParams = await searchRequest.getFetchParams();
const indexPattern = searchParams.index.title || searchParams.index;
async function serializeAllFetchParams(fetchParams, searchRequests) {
const searchRequestsWithFetchParams = [];
const failedSearchRequests = [];

// Gather the fetch param responses from all the successful requests.
fetchParams.forEach((result, index) => {
if (result.resolved) {
searchRequestsWithFetchParams.push(result.resolved);
} else {
const searchRequest = searchRequests[index];

searchRequest.handleFailure(result.rejected);
failedSearchRequests.push(searchRequest);
}
});

const serializedFetchParams = serializeFetchParams(searchRequestsWithFetchParams);

return {
serializedFetchParams,
failedSearchRequests,
};
}

function serializeFetchParams(searchRequestsWithFetchParams) {
return JSON.stringify(searchRequestsWithFetchParams.map(searchRequestWithFetchParams => {
const indexPattern = searchRequestWithFetchParams.index.title || searchRequestWithFetchParams.index;
const {
body: {
size,
aggs,
query: _query,
},
} = searchParams;
index,
} = searchRequestWithFetchParams;

// TODO: Temporarily automatically assign same timezone and interval as what's defined by
// the rollup job. This should be done by the visualization itself.
Expand All @@ -30,7 +61,7 @@ export const rollupSearchStrategy = {

Object.keys(subAggs).forEach(subAggName => {
if (subAggName === 'date_histogram') {
const dateHistogramAgg = searchRequest.source.getField('index').typeMeta.aggs.date_histogram;
const dateHistogramAgg = index.typeMeta.aggs.date_histogram;
const subAgg = subAggs[subAggName];
const field = subAgg.field;
subAgg.time_zone = dateHistogramAgg[field].time_zone;
Expand All @@ -39,30 +70,43 @@ export const rollupSearchStrategy = {
});
});

const index = indexPattern;
const query = {
'size': size,
'aggregations': aggs,
'query': _query,
};

return { index: indexPattern, query };
}));
}

export const rollupSearchStrategy = {
id: 'rollup',

search: async ({ searchRequests, Promise }) => {
// Flatten the searchSource within each searchRequest to get the fetch params,
// e.g. body, filters, index pattern, query.
const allFetchParams = await getAllFetchParams(searchRequests, Promise);

// Serialize the fetch params into a format suitable for the body of an ES query.
const {
serializedFetchParams,
failedSearchRequests,
} = await serializeAllFetchParams(allFetchParams, searchRequests);

const {
fetching,
abort,
} = kfetchAbortable({
pathname: '../api/rollup/search',
method: 'POST',
body: JSON.stringify({ index, query }),
body: serializedFetchParams,
});

// TODO: Implement this. Search requests which can't be sent.
const failedSearchRequests = [];

return {
// Munge data into shape expected by consumer.
searching: new Promise((resolve, reject) => {
fetching.then(result => {
resolve([ result ]);
resolve(result);
}).catch(error => {
const {
body: { statusText, error: title, message },
Expand Down
12 changes: 7 additions & 5 deletions x-pack/plugins/rollup/server/routes/api/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ export function registerSearchRoute(server) {
path: '/api/rollup/search',
method: 'POST',
handler: async (request, reply) => {
const { index, query } = request.payload;
const callWithRequest = callWithRequestFactory(server, request);

try {
const results = await callWithRequest('rollup.search', {
index,
body: query,
});
const requests = request.payload.map(({ index, query }) => (
callWithRequest('rollup.search', {
index,
body: query,
})
));

const results = await Promise.all(requests);
reply(results);
} catch(err) {
if (isEsError(err)) {
Expand Down

0 comments on commit 094edbd

Please sign in to comment.