Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple rollup searches. #21755

Merged
merged 2 commits into from
Aug 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,23 @@ function getAllFetchParams(searchRequests, Promise) {
}

async function serializeAllFetchParams(fetchParams, searchRequests, serializeFetchParams) {
const searcRequestsWithFetchParams = [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what reviewer let this typo pass?? 😇

const searchRequestsWithFetchParams = [];
const failedSearchRequests = [];

// Gather the fetch param responses from all the successful requests.
fetchParams.forEach((result, index) => {
if (result.resolved) {
searcRequestsWithFetchParams.push(result.resolved);
searchRequestsWithFetchParams.push(result.resolved);
} else {
const searchRequest = searchRequests[index];

// TODO: All strategies will need to implement this.
searchRequest.handleFailure(result.rejected);
failedSearchRequests.push(searchRequest);
}
});

return {
serializedFetchParams: await serializeFetchParams(searcRequestsWithFetchParams),
serializedFetchParams: await serializeFetchParams(searchRequestsWithFetchParams),
failedSearchRequests,
};
}
Expand Down
76 changes: 60 additions & 16 deletions x-pack/plugins/rollup/public/search/rollup_search_strategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,52 @@
import { kfetchAbortable } from 'ui/kfetch';
import { SearchError } from 'ui/courier';

export const rollupSearchStrategy = {
id: 'rollup',
function getAllFetchParams(searchRequests, Promise) {
return Promise.map(searchRequests, (searchRequest) => {
return Promise.try(searchRequest.getFetchParams, void 0, searchRequest)
.then((fetchParams) => {
return (searchRequest.fetchParams = fetchParams);
})
.then(value => ({ resolved: value }))
.catch(error => ({ rejected: error }));
});
}

search: async ({ searchRequests, Promise }) => {
// TODO: Batch together requests to hit a bulk rollup search endpoint.
const searchRequest = searchRequests[0];
const searchParams = await searchRequest.getFetchParams();
const indexPattern = searchParams.index.title || searchParams.index;
async function serializeAllFetchParams(fetchParams, searchRequests) {
const searchRequestsWithFetchParams = [];
const failedSearchRequests = [];

// Gather the fetch param responses from all the successful requests.
fetchParams.forEach((result, index) => {
if (result.resolved) {
searchRequestsWithFetchParams.push(result.resolved);
} else {
const searchRequest = searchRequests[index];

searchRequest.handleFailure(result.rejected);
failedSearchRequests.push(searchRequest);
}
});

const serializedFetchParams = serializeFetchParams(searchRequestsWithFetchParams);

return {
serializedFetchParams,
failedSearchRequests,
};
}

function serializeFetchParams(searchRequestsWithFetchParams) {
return JSON.stringify(searchRequestsWithFetchParams.map(searchRequestWithFetchParams => {
const indexPattern = searchRequestWithFetchParams.index.title || searchRequestWithFetchParams.index;
const {
body: {
size,
aggs,
query: _query,
},
} = searchParams;
index,
} = searchRequestWithFetchParams;

// TODO: Temporarily automatically assign same timezone and interval as what's defined by
// the rollup job. This should be done by the visualization itself.
Expand All @@ -30,7 +61,7 @@ export const rollupSearchStrategy = {

Object.keys(subAggs).forEach(subAggName => {
if (subAggName === 'date_histogram') {
const dateHistogramAgg = searchRequest.source.getField('index').typeMeta.aggs.date_histogram;
const dateHistogramAgg = index.typeMeta.aggs.date_histogram;
const subAgg = subAggs[subAggName];
const field = subAgg.field;
subAgg.time_zone = dateHistogramAgg[field].time_zone;
Expand All @@ -39,30 +70,43 @@ export const rollupSearchStrategy = {
});
});

const index = indexPattern;
const query = {
'size': size,
'aggregations': aggs,
'query': _query,
};

return { index: indexPattern, query };
}));
}

export const rollupSearchStrategy = {
id: 'rollup',

search: async ({ searchRequests, Promise }) => {
// Flatten the searchSource within each searchRequest to get the fetch params,
// e.g. body, filters, index pattern, query.
const allFetchParams = await getAllFetchParams(searchRequests, Promise);

// Serialize the fetch params into a format suitable for the body of an ES query.
const {
serializedFetchParams,
failedSearchRequests,
} = await serializeAllFetchParams(allFetchParams, searchRequests);

const {
fetching,
abort,
} = kfetchAbortable({
pathname: '../api/rollup/search',
method: 'POST',
body: JSON.stringify({ index, query }),
body: serializedFetchParams,
});

// TODO: Implement this. Search requests which can't be sent.
const failedSearchRequests = [];

return {
// Munge data into shape expected by consumer.
searching: new Promise((resolve, reject) => {
fetching.then(result => {
resolve([ result ]);
resolve(result);
}).catch(error => {
const {
body: { statusText, error: title, message },
Expand Down
12 changes: 7 additions & 5 deletions x-pack/plugins/rollup/server/routes/api/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ export function registerSearchRoute(server) {
path: '/api/rollup/search',
method: 'POST',
handler: async (request, reply) => {
const { index, query } = request.payload;
const callWithRequest = callWithRequestFactory(server, request);

try {
const results = await callWithRequest('rollup.search', {
index,
body: query,
});
const requests = request.payload.map(({ index, query }) => (
callWithRequest('rollup.search', {
index,
body: query,
})
));

const results = await Promise.all(requests);
reply(results);
} catch(err) {
if (isEsError(err)) {
Expand Down