Skip to content

Commit

Permalink
[data.search] Fix last search call to retrieve results (#182205)
Browse files Browse the repository at this point in the history
## Summary

Resolves #182204.
Resolves #181482.
Resolves #181493.

Flaky test runner:
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/5847

When a search request hits the timeout, we make one more call to the
search request for the final results before cancelling the search
request. Prior to this PR, there was a bug that made it so we actually
made the call to delete the request before making that final call, and
we weren't sending the ID in the request so it actually created an
entirely new request.

This PR moves the code that makes that last call to the call to
`pollSearch` and ensures we don't call `cancel` until it has returned.

### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
  • Loading branch information
lukasolson committed May 14, 2024
1 parent 1c44bc2 commit 71bd961
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,27 +362,12 @@ describe('SearchInterceptor', () => {
});

test('should DELETE a running async search on async timeout after first response', async () => {
const responses = [
{
time: 10,
value: {
isPartial: true,
isRunning: true,
rawResponse: {},
id: 1,
},
},
{
time: 2000,
value: {
isPartial: false,
isRunning: false,
rawResponse: {},
id: 1,
},
},
];
mockFetchImplementation(responses);
fetchMock.mockResolvedValue({
isPartial: true,
isRunning: true,
rawResponse: {},
id: 1,
});

const response = searchInterceptor.search({}, { pollInterval: 0 });
response.subscribe({ next, error });
Expand All @@ -394,36 +379,21 @@ describe('SearchInterceptor', () => {
expect(fetchMock).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).not.toHaveBeenCalled();

// Long enough to reach the timeout but not long enough to reach the next response
// Long enough to reach the timeout
await timeTravel(1000);

// Expect 3 calls to fetch - the two polls and a final request for the results before deleting
expect(fetchMock).toHaveBeenCalledTimes(3);
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});

test('should return the last response on async timeout', async () => {
const responses = [
{
time: 10,
value: {
isPartial: true,
isRunning: true,
rawResponse: {},
id: 1,
},
fetchMock.mockResolvedValue({
isPartial: true,
isRunning: true,
rawResponse: {
foo: 'bar',
},
{
time: 2000,
value: {
isPartial: false,
isRunning: false,
rawResponse: {},
id: 1,
},
},
];
mockFetchImplementation(responses);
id: 1,
});

const response = searchInterceptor.search({}, { pollInterval: 0 });
response.subscribe({ next, error });
Expand All @@ -438,18 +408,15 @@ describe('SearchInterceptor', () => {
// Long enough to reach the timeout but not long enough to reach the next response
await timeTravel(1000);

expect(next).toHaveBeenCalledTimes(2);
expect(next).toHaveBeenCalledTimes(3);
expect(next.mock.calls[1]).toMatchInlineSnapshot(`
Array [
Object {
"id": 1,
"isPartial": true,
"isRunning": false,
"meta": Object {
"size": 10,
},
"isRunning": true,
"rawResponse": Object {
"timed_out": true,
"foo": "bar",
},
},
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ export class SearchInterceptor {
);

const cancel = async () => {
if (!id || isSavedToBackground) return;
// If the request times out, we handle cancellation after we make the last call to retrieve the results
if (!id || isSavedToBackground || searchAbortController.isTimeout()) return;
try {
await sendCancelRequest();
} catch (e) {
Expand Down Expand Up @@ -400,9 +401,22 @@ export class SearchInterceptor {
: response;
}),
catchError((e: Error) => {
searchTracker?.error();
cancel();
return throwError(e);
// If we aborted (search:timeout advanced setting) and there was a partial response, return it instead of just erroring out
if (searchAbortController.isTimeout()) {
return from(
this.runSearch({ id, ...request }, { ...options, retrieveResults: true })
).pipe(
map(toPartialResponseAfterTimeout),
tap(async () => {
await sendCancelRequest();
this.handleSearchError(e, request?.params?.body ?? {}, options, true);
})
);
} else {
searchTracker?.error();
cancel();
return throwError(e);
}
}),
finalize(() => {
searchAbortController.cleanup();
Expand Down Expand Up @@ -534,17 +548,6 @@ export class SearchInterceptor {
return response$.pipe(
takeUntil(aborted$),
catchError((e) => {
// If we aborted (search:timeout advanced setting) and there was a partial response, return it instead of just erroring out
if (searchAbortController.isTimeout()) {
return from(
this.runSearch(request, { ...searchOptions, retrieveResults: true })
).pipe(
tap(() =>
this.handleSearchError(e, request?.params?.body ?? {}, searchOptions, true)
),
map(toPartialResponseAfterTimeout)
);
}
return throwError(
this.handleSearchError(
e,
Expand Down
3 changes: 3 additions & 0 deletions src/plugins/data/server/search/routes/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export function registerSearchRoute(router: DataPluginRouter): void {
sessionId: schema.maybe(schema.string()),
isStored: schema.maybe(schema.boolean()),
isRestore: schema.maybe(schema.boolean()),
retrieveResults: schema.maybe(schema.boolean()),
},
{ unknowns: 'allow' }
),
Expand All @@ -48,6 +49,7 @@ export function registerSearchRoute(router: DataPluginRouter): void {
sessionId,
isStored,
isRestore,
retrieveResults,
...searchRequest
} = request.body;
const { strategy, id } = request.params;
Expand All @@ -65,6 +67,7 @@ export function registerSearchRoute(router: DataPluginRouter): void {
sessionId,
isStored,
isRestore,
retrieveResults,
}
)
.pipe(first())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) {
const esArchiver = getService('esArchiver');
const remoteEsArchiver = getService('remoteEsArchiver' as 'esArchiver');

// Failing: See https://github.com/elastic/kibana/issues/181493
describe.skip('discover search CCS timeout', () => {
describe('discover search CCS timeout', () => {
before(async () => {
await esArchiver.loadIfNeeded('test/functional/fixtures/es_archiver/logstash_functional');
await remoteEsArchiver.loadIfNeeded(
Expand Down

0 comments on commit 71bd961

Please sign in to comment.