Skip to content

Commit

Permalink
[Search Sessions] fix updating deleting sessions from non-default spa…
Browse files Browse the repository at this point in the history
…ce (#96123) (#96211)

* add spaces test

* fix updating and deleting sessions in non-default space

* revert back to batch update

* Add space tests

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Liza K <liza.katz@elastic.co>
# Conflicts:
#	x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts

Co-authored-by: Anton Dosov <anton.dosov@elastic.co>
  • Loading branch information
lizozom and Dosant committed Apr 5, 2021
1 parent de58272 commit d04dfad
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ import {
EQL_SEARCH_STRATEGY,
} from '../../../common';
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
import type { SavedObjectsClientContract } from 'kibana/server';
import { SearchSessionsConfig, SearchStatus } from './types';
import moment from 'moment';
import {
SavedObjectsBulkUpdateObject,
SavedObjectsDeleteOptions,
SavedObjectsClientContract,
} from '../../../../../../src/core/server';

describe('getSearchStatus', () => {
let mockClient: any;
Expand Down Expand Up @@ -263,6 +267,45 @@ describe('getSearchStatus', () => {
expect(savedObjectsClient.delete).not.toBeCalled();
});

test('deletes in space', async () => {
savedObjectsClient.find.mockResolvedValue({
saved_objects: [
{
id: '123',
namespaces: ['awesome'],
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(2, 'm')),
idMapping: {
'map-key': {
strategy: ENHANCED_ES_SEARCH_STRATEGY,
id: 'async-id',
},
},
},
},
],
total: 1,
} as any);

await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);

expect(savedObjectsClient.delete).toBeCalled();

const [, id, opts] = savedObjectsClient.delete.mock.calls[0];
expect(id).toBe('123');
expect((opts as SavedObjectsDeleteOptions).namespace).toBe('awesome');
});

test('deletes a non persisted, abandoned session', async () => {
savedObjectsClient.find.mockResolvedValue({
saved_objects: [
Expand Down Expand Up @@ -479,6 +522,50 @@ describe('getSearchStatus', () => {
expect(savedObjectsClient.delete).not.toBeCalled();
});

test('updates in space', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
namespaces: ['awesome'],
attributes: {
status: SearchSessionStatus.IN_PROGRESS,
touched: '123',
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
};
savedObjectsClient.find.mockResolvedValue({
saved_objects: [so],
total: 1,
} as any);

mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: false,
is_running: false,
completion_status: 200,
},
});

await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);

expect(mockClient.asyncSearch.status).toBeCalledWith({ id: 'search-id' });
const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0];
const updatedAttributes = updateInput[0] as SavedObjectsBulkUpdateObject;
expect(updatedAttributes.namespace).toBe('awesome');
});

test('updates to complete if the search is done', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
Expand Down Expand Up @@ -562,7 +649,6 @@ describe('getSearchStatus', () => {
config
);
const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0];

const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes;
expect(updatedAttributes.status).toBe(SearchSessionStatus.ERROR);
expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
ElasticsearchClient,
SavedObjectsFindResult,
SavedObjectsClientContract,
SavedObjectsUpdateResponse,
} from 'kibana/server';
import moment from 'moment';
import { EMPTY, from } from 'rxjs';
Expand Down Expand Up @@ -163,12 +164,20 @@ export async function checkRunningSessions(

if (!session.attributes.persisted) {
if (isSessionStale(session, config, logger)) {
deleted = true;
// delete saved object to free up memory
// TODO: there's a potential rare edge case of deleting an object and then receiving a new trackId for that same session!
// Maybe we want to change state to deleted and cleanup later?
logger.debug(`Deleting stale session | ${session.id}`);
await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id);
try {
await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id, {
namespace: session.namespaces?.[0],
});
deleted = true;
} catch (e) {
logger.error(
`Error while deleting stale search session ${session.id}: ${e.message}`
);
}

// Send a delete request for each async search to ES
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
Expand All @@ -177,8 +186,8 @@ export async function checkRunningSessions(
try {
await client.asyncSearch.delete({ id: searchInfo.id });
} catch (e) {
logger.debug(
`Error ignored while deleting async_search ${searchInfo.id}: ${e.message}`
logger.error(
`Error while deleting async_search ${searchInfo.id}: ${e.message}`
);
}
}
Expand All @@ -196,9 +205,31 @@ export async function checkRunningSessions(
if (updatedSessions.length) {
// If there's an error, we'll try again in the next iteration, so there's no need to check the output.
const updatedResponse = await savedObjectsClient.bulkUpdate<SearchSessionSavedObjectAttributes>(
updatedSessions
updatedSessions.map((session) => ({
...session,
namespace: session.namespaces?.[0],
}))
);

const success: Array<
SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>
> = [];
const fail: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];

updatedResponse.saved_objects.forEach((savedObjectResponse) => {
if ('error' in savedObjectResponse) {
fail.push(savedObjectResponse);
logger.error(
`Error while updating search session ${savedObjectResponse?.id}: ${savedObjectResponse.error?.message}`
);
} else {
success.push(savedObjectResponse);
}
});

logger.debug(
`Updating search sessions: success: ${success.length}, fail: ${fail.length}`
);
logger.debug(`Updated ${updatedResponse.saved_objects.length} search sessions`);
}
})
)
Expand Down
153 changes: 153 additions & 0 deletions x-pack/test/api_integration/apis/search/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export default function ({ getService }: FtrProviderContext) {
const supertestWithoutAuth = getService('supertestWithoutAuth');
const security = getService('security');
const retry = getService('retry');
const spacesService = getService('spaces');

describe('search session', () => {
describe('session management', () => {
Expand Down Expand Up @@ -490,5 +491,157 @@ export default function ({ getService }: FtrProviderContext) {
.expect(403);
});
});

describe('in non-default space', () => {
const spaceId = 'foo-space';
before(async () => {
try {
await spacesService.create({
id: spaceId,
name: 'Foo Space',
});
} catch {
// might already be created
}
});

after(async () => {
await spacesService.delete(spaceId);
});

it('should complete and delete non-persistent sessions', async () => {
const sessionId = `my-session-${Math.random()}`;

// run search
const searchRes = await supertest
.post(`/s/${spaceId}/internal/search/ese`)
.set('kbn-xsrf', 'foo')
.send({
sessionId,
params: {
body: {
query: {
term: {
agent: '1',
},
},
},
wait_for_completion_timeout: '1ms',
},
})
.expect(200);

const { id } = searchRes.body;

await retry.waitForWithTimeout('searches persisted into session', 5000, async () => {
const resp = await supertest
.get(`/s/${spaceId}/internal/session/${sessionId}`)
.set('kbn-xsrf', 'foo')
.expect(200);

const { touched, created, persisted, idMapping } = resp.body.attributes;
expect(persisted).to.be(false);
expect(touched).not.to.be(undefined);
expect(created).not.to.be(undefined);

const idMappings = Object.values(idMapping).map((value: any) => value.id);
expect(idMappings).to.contain(id);
return true;
});

// not touched timeout in tests is 15s, wait to give a chance for status to update
await new Promise((resolve) =>
setTimeout(() => {
resolve(void 0);
}, 15_000)
);

await retry.waitForWithTimeout(
'searches eventually complete and session gets into the complete state',
30_000,
async () => {
await supertest
.get(`/s/${spaceId}/internal/session/${sessionId}`)
.set('kbn-xsrf', 'foo')
.expect(404);

return true;
}
);
});

it('should complete persisten session', async () => {
const sessionId = `my-session-${Math.random()}`;

// run search
const searchRes = await supertest
.post(`/s/${spaceId}/internal/search/ese`)
.set('kbn-xsrf', 'foo')
.send({
sessionId,
params: {
body: {
query: {
term: {
agent: '1',
},
},
},
wait_for_completion_timeout: '1ms',
},
})
.expect(200);

const { id } = searchRes.body;

// persist session
await supertest
.post(`/s/${spaceId}/internal/session`)
.set('kbn-xsrf', 'foo')
.send({
sessionId,
name: 'My Session',
appId: 'discover',
expires: '123',
urlGeneratorId: 'discover',
})
.expect(200);

await retry.waitForWithTimeout('searches persisted into session', 5000, async () => {
const resp = await supertest
.get(`/s/${spaceId}/internal/session/${sessionId}`)
.set('kbn-xsrf', 'foo')
.expect(200);

const { touched, created, persisted, idMapping } = resp.body.attributes;
expect(persisted).to.be(true);
expect(touched).not.to.be(undefined);
expect(created).not.to.be(undefined);

const idMappings = Object.values(idMapping).map((value: any) => value.id);
expect(idMappings).to.contain(id);
return true;
});

// session refresh interval is 5 seconds, wait to give a chance for status to update
await new Promise((resolve) => setTimeout(resolve, 5000));

await retry.waitForWithTimeout(
'searches eventually complete and session gets into the complete state',
5000,
async () => {
const resp = await supertest
.get(`/s/${spaceId}/internal/session/${sessionId}`)
.set('kbn-xsrf', 'foo')
.expect(200);

const { status } = resp.body.attributes;

expect(status).to.be(SearchSessionStatus.COMPLETE);
return true;
}
);
});
});
});
}

0 comments on commit d04dfad

Please sign in to comment.