[8.14] [Reporting/CSV] Resolve max_concurrent_shard_requests issue (#182536) (#182742)

# Backport

This will backport the following commits from `main` to `8.14`:
- [[Reporting/CSV] Resolve max_concurrent_shard_requests issue
(#182536)](#182536)

<!--- Backport version: 8.9.8 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Tim
Sullivan","email":"tsullivan@users.noreply.github.com"},"sourceCommit":{"committedDate":"2024-05-03T14:30:45Z","message":"[Reporting/CSV]
Resolve max_concurrent_shard_requests issue (#182536)\n\n##
Summary\r\n\r\nThere has been a consistent failure in a Discover-related
test set in\r\nthe kibana-ES-serverless verification job, meaning that
ES-Kibana\r\ncompatibility has drifted.\r\n\r\nError details:\r\n```\r\n
+ \"Encountered an unknown error: status_exception\r\n + \tRoot
causes:\r\n + \t\tstatus_exception: Parameter validation failed for
[/_search]: The http parameter [max_concurrent_shard_requests] (with
value [5]) is not permitted when running in serverless mode\"\r\n +
\"Encountered an error with the number of CSV rows generated fromthe
search: expected rows were indeterminable, received 0.\"\r\n at
Context.<anonymous> (reporting.ts:182:33)\r\n at
processTicksAndRejections (node:internal/process/task_queues:95:5)\r\n
at Object.apply (wrap_function.js:73:16)\r\n```\r\n\r\nThis tracked back
to a feature added for reporting awhile back, which\r\ncreated a config
schema field for the `max_concurrent_shard_requests`\r\nparameter in the
search
queries:\r\nhttps://github.com//pull/170344/files#diff-7bceb37eef3761e1161cf04f41668dd9195bfac9fea36e734a230b5ed878a974\r\n\r\nMost
of the changes in this PR are in test code. I created \"Test\"
which\r\nextend protected methods in the original classes. This was done
to\r\nremove the `@ts-expect-errors` lines of
code.","sha":"f51c5c92bcc13686119ddf215eaa083ffac8e517","branchLabelMapping":{"^v8.15.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["bug","Feature:Reporting","release_note:skip","backport:skip","Team:SharedUX","v8.15.0"],"number":182536,"url":"#182536
Resolve max_concurrent_shard_requests issue (#182536)\n\n##
Summary\r\n\r\nThere has been a consistent failure in a Discover-related
test set in\r\nthe kibana-ES-serverless verification job, meaning that
ES-Kibana\r\ncompatibility has drifted.\r\n\r\nError details:\r\n```\r\n
+ \"Encountered an unknown error: status_exception\r\n + \tRoot
causes:\r\n + \t\tstatus_exception: Parameter validation failed for
[/_search]: The http parameter [max_concurrent_shard_requests] (with
value [5]) is not permitted when running in serverless mode\"\r\n +
\"Encountered an error with the number of CSV rows generated fromthe
search: expected rows were indeterminable, received 0.\"\r\n at
Context.<anonymous> (reporting.ts:182:33)\r\n at
processTicksAndRejections (node:internal/process/task_queues:95:5)\r\n
at Object.apply (wrap_function.js:73:16)\r\n```\r\n\r\nThis tracked back
to a feature added for reporting awhile back, which\r\ncreated a config
schema field for the `max_concurrent_shard_requests`\r\nparameter in the
search
queries:\r\nhttps://github.com//pull/170344/files#diff-7bceb37eef3761e1161cf04f41668dd9195bfac9fea36e734a230b5ed878a974\r\n\r\nMost
of the changes in this PR are in test code. I created \"Test\"
which\r\nextend protected methods in the original classes. This was done
to\r\nremove the `@ts-expect-errors` lines of
code.","sha":"f51c5c92bcc13686119ddf215eaa083ffac8e517"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v8.15.0","labelRegex":"^v8.15.0$","isSourceBranch":true,"state":"MERGED","url":"#182536
Resolve max_concurrent_shard_requests issue (#182536)\n\n##
Summary\r\n\r\nThere has been a consistent failure in a Discover-related
test set in\r\nthe kibana-ES-serverless verification job, meaning that
ES-Kibana\r\ncompatibility has drifted.\r\n\r\nError details:\r\n```\r\n
+ \"Encountered an unknown error: status_exception\r\n + \tRoot
causes:\r\n + \t\tstatus_exception: Parameter validation failed for
[/_search]: The http parameter [max_concurrent_shard_requests] (with
value [5]) is not permitted when running in serverless mode\"\r\n +
\"Encountered an error with the number of CSV rows generated fromthe
search: expected rows were indeterminable, received 0.\"\r\n at
Context.<anonymous> (reporting.ts:182:33)\r\n at
processTicksAndRejections (node:internal/process/task_queues:95:5)\r\n
at Object.apply (wrap_function.js:73:16)\r\n```\r\n\r\nThis tracked back
to a feature added for reporting awhile back, which\r\ncreated a config
schema field for the `max_concurrent_shard_requests`\r\nparameter in the
search
queries:\r\nhttps://github.com//pull/170344/files#diff-7bceb37eef3761e1161cf04f41668dd9195bfac9fea36e734a230b5ed878a974\r\n\r\nMost
of the changes in this PR are in test code. I created \"Test\"
which\r\nextend protected methods in the original classes. This was done
to\r\nremove the `@ts-expect-errors` lines of
code.","sha":"f51c5c92bcc13686119ddf215eaa083ffac8e517"}}]}] BACKPORT-->
tsullivan committed May 7, 2024
1 parent ed86cd0 commit 03da0c9
Showing 5 changed files with 260 additions and 73 deletions.
1 change: 1 addition & 0 deletions config/serverless.yml
@@ -145,6 +145,7 @@ xpack.screenshotting.enabled: false
xpack.reporting.queue.pollInterval: 3m
xpack.reporting.roles.enabled: false
xpack.reporting.statefulSettings.enabled: false
xpack.reporting.csv.maxConcurrentShardRequests: 0

# Disabled Observability plugins
xpack.ux.enabled: false
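The `0` configured above is a sentinel meaning "do not send the parameter at all". A minimal sketch of the rule applied in `search_cursor_pit.ts` later in this commit (the function name here is illustrative):

```ts
// A configured value of 0 means "omit max_concurrent_shard_requests from the
// search request", which serverless Elasticsearch requires; any positive value
// is passed through unchanged.
function effectiveMaxConcurrentShardRequests(configured: number): number | undefined {
  return configured > 0 ? configured : undefined;
}
```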
175 changes: 133 additions & 42 deletions packages/kbn-generate-csv/src/lib/search_cursor_pit.test.ts
@@ -6,22 +6,53 @@
* Side Public License, v 1.
*/

import * as Rx from 'rxjs';

import type { estypes } from '@elastic/elasticsearch';
import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import type { ISearchClient } from '@kbn/data-plugin/common';
import { createSearchSourceMock } from '@kbn/data-plugin/common/search/search_source/mocks';
import { createSearchRequestHandlerContext } from '@kbn/data-plugin/server/search/mocks';
import type { SearchCursor, SearchCursorSettings } from './search_cursor';
import type { SearchCursorSettings } from './search_cursor';
import { SearchCursorPit } from './search_cursor_pit';

class TestSearchCursorPit extends SearchCursorPit {
constructor(...args: ConstructorParameters<typeof SearchCursorPit>) {
super(...args);
}

public getCursorId() {
return this.cursorId;
}

public openPointInTime() {
return super.openPointInTime();
}

public searchWithPit(...args: Parameters<SearchCursorPit['searchWithPit']>) {
return super.searchWithPit(...args);
}

public setSearchAfter(...args: Parameters<SearchCursorPit['setSearchAfter']>) {
return super.setSearchAfter(...args);
}

public getSearchAfter() {
return super.getSearchAfter();
}
}

describe('CSV Export Search Cursor', () => {
let settings: SearchCursorSettings;
let es: IScopedClusterClient;
let data: ISearchClient;
let logger: Logger;
let cursor: SearchCursor;
let cursor: TestSearchCursorPit;

let openPointInTimeSpy: jest.SpyInstance<Promise<estypes.OpenPointInTimeResponse>>;

beforeEach(async () => {
beforeEach(() => {
settings = {
scroll: {
duration: jest.fn(() => '10m'),
@@ -34,56 +65,116 @@ describe('CSV Export Search Cursor', () => {

es = elasticsearchServiceMock.createScopedClusterClient();
data = createSearchRequestHandlerContext();
jest.spyOn(es.asCurrentUser, 'openPointInTime').mockResolvedValue({ id: 'somewhat-pit-id' });

openPointInTimeSpy = jest
.spyOn(es.asCurrentUser, 'openPointInTime')
.mockResolvedValue({ id: 'somewhat-pit-id' });

logger = loggingSystemMock.createLogger();
});

cursor = new SearchCursorPit(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);
describe('with default settings', () => {
beforeEach(async () => {
cursor = new TestSearchCursorPit(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);

const openPointInTimeSpy = jest
// @ts-expect-error create spy on private method
.spyOn(cursor, 'openPointInTime');
await cursor.initialize();

await cursor.initialize();
expect(openPointInTimeSpy).toBeCalledTimes(1);
});

expect(openPointInTimeSpy).toBeCalledTimes(1);
});
it('supports pit and max_concurrent_shard_requests', async () => {
const dataSearchSpy = jest
.spyOn(data, 'search')
.mockReturnValue(Rx.of({ rawResponse: { hits: { hits: [] } } }));

it('supports point-in-time', async () => {
const searchWithPitSpy = jest
// @ts-expect-error create spy on private method
.spyOn(cursor, 'searchWithPit')
// @ts-expect-error mock resolved value for spy on private method
.mockResolvedValueOnce({ rawResponse: { hits: [] } });
const searchSource = createSearchSourceMock();
await cursor.getPage(searchSource);

const searchSource = createSearchSourceMock();
await cursor.getPage(searchSource);
expect(searchWithPitSpy).toBeCalledTimes(1);
});
expect(dataSearchSpy).toBeCalledTimes(1);
expect(dataSearchSpy).toBeCalledWith(
{
params: {
body: expect.objectContaining({ pit: { id: 'somewhat-pit-id', keep_alive: '10m' } }),
max_concurrent_shard_requests: 5,
},
},
expect.objectContaining({
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: '10m' },
})
);
});

it('can update internal cursor ID', () => {
cursor.updateIdFromResults({ pit_id: 'very-typical-pit-id', hits: { hits: [] } });
expect(cursor.getCursorId()).toBe('very-typical-pit-id');
});

it('can update internal cursor ID', () => {
cursor.updateIdFromResults({ pit_id: 'very-typical-pit-id', hits: { hits: [] } });
// @ts-expect-error private field
expect(cursor.cursorId).toBe('very-typical-pit-id');
it('manages search_after', () => {
cursor.setSearchAfter([
{
_index: 'test-index',
_id: 'test-doc-id',
sort: ['Wed Jan 17 15:35:47 MST 2024', 42],
},
]);

expect(cursor.getSearchAfter()).toEqual(['Wed Jan 17 15:35:47 MST 2024', 42]);
});
});

it('manages search_after', () => {
// @ts-expect-error access private method
cursor.setSearchAfter([
{
_index: 'test-index',
_id: 'test-doc-id',
sort: ['Wed Jan 17 15:35:47 MST 2024', 42],
},
]);
describe('with max_concurrent_shard_requests=0', () => {
beforeEach(async () => {
settings.maxConcurrentShardRequests = 0;

cursor = new TestSearchCursorPit(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);

await cursor.initialize();

expect(openPointInTimeSpy).toBeCalledTimes(1);
});

it('suppresses max_concurrent_shard_requests from search body', async () => {
const dataSearchSpy = jest
.spyOn(data, 'search')
.mockReturnValue(Rx.of({ rawResponse: { hits: { hits: [] } } }));

const searchSource = createSearchSourceMock();
await cursor.getPage(searchSource);

// @ts-expect-error access private method
expect(cursor.getSearchAfter()).toEqual(['Wed Jan 17 15:35:47 MST 2024', 42]);
expect(dataSearchSpy).toBeCalledTimes(1);
expect(dataSearchSpy).toBeCalledWith(
{
params: {
body: {
fields: [],
pit: { id: 'somewhat-pit-id', keep_alive: '10m' },
query: { bool: { filter: [], must: [], must_not: [], should: [] } },
runtime_mappings: {},
script_fields: {},
stored_fields: ['*'],
},
max_concurrent_shard_requests: undefined,
},
},
{
abortSignal: expect.any(AbortSignal),
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: '10m' },
}
);
});
});
});
14 changes: 9 additions & 5 deletions packages/kbn-generate-csv/src/lib/search_cursor_pit.ts
@@ -37,7 +37,7 @@ export class SearchCursorPit extends SearchCursor {
this.cursorId = await this.openPointInTime();
}

private async openPointInTime() {
protected async openPointInTime() {
const { includeFrozen, maxConcurrentShardRequests, scroll, taskInstanceFields } = this.settings;

let pitId: string | undefined;
@@ -74,13 +74,17 @@
return pitId;
}

private async searchWithPit(searchBody: SearchRequest) {
protected async searchWithPit(searchBody: SearchRequest) {
const { maxConcurrentShardRequests, scroll, taskInstanceFields } = this.settings;

// maxConcurrentShardRequests=0 is not supported
const effectiveMaxConcurrentShardRequests =
maxConcurrentShardRequests > 0 ? maxConcurrentShardRequests : undefined;

const searchParamsPit = {
params: {
body: searchBody,
max_concurrent_shard_requests: maxConcurrentShardRequests,
max_concurrent_shard_requests: effectiveMaxConcurrentShardRequests,
},
};

@@ -146,14 +150,14 @@
this.setSearchAfter(hits); // for pit only
}

private getSearchAfter() {
protected getSearchAfter() {
return this.searchAfter;
}

/**
* For managing the search_after parameter, needed for paging using point-in-time
*/
private setSearchAfter(hits: Array<estypes.SearchHit<unknown>>) {
protected setSearchAfter(hits: Array<estypes.SearchHit<unknown>>) {
// Update last sort results for next query. PIT is used, so the sort results
// automatically include _shard_doc as a tiebreaker
this.searchAfter = hits[hits.length - 1]?.sort as estypes.SortResults | undefined;
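For context on the paging scheme these protected methods support, here is a minimal, self-contained sketch of point-in-time plus `search_after` paging written against the raw Elasticsearch JS client (the production code goes through Kibana's data-plugin search strategy instead; the node URL, index name, sort field, and page size below are placeholders):

```ts
import { Client } from '@elastic/elasticsearch';
import type { estypes } from '@elastic/elasticsearch';

const client = new Client({ node: 'http://localhost:9200' }); // placeholder node

async function exportAllDocs(index: string): Promise<void> {
  // Open a point-in-time so every page reads a consistent snapshot of the index.
  const { id: pitId } = await client.openPointInTime({ index, keep_alive: '1m' });
  let searchAfter: estypes.SortResults | undefined;

  try {
    while (true) {
      const response = await client.search({
        pit: { id: pitId, keep_alive: '1m' },
        sort: ['@timestamp'], // placeholder sort field
        search_after: searchAfter,
        size: 500,
        // max_concurrent_shard_requests is intentionally not sent, mirroring
        // the serverless-safe behavior this PR introduces for a value of 0.
      });

      const hits = response.hits.hits;
      if (hits.length === 0) break;

      // The last hit's sort values become the cursor for the next page.
      searchAfter = hits[hits.length - 1].sort;
      // ...stream the hits to CSV here...
    }
  } finally {
    await client.closePointInTime({ id: pitId });
  }
}
```

Because the search runs against a PIT, Elasticsearch adds `_shard_doc` as an implicit tiebreaker to the sort, which is why the last hit's `sort` values are enough to resume from on the next page.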
