Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Reporting] Add max concurrent shards setting to schema #170344

Merged
merged 19 commits into from Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions packages/kbn-generate-csv/src/generate_csv.test.ts
Expand Up @@ -118,6 +118,7 @@ describe('CsvGenerator', () => {
useByteOrderMarkEncoding: false,
scroll: { size: 500, duration: '30s' },
enablePanelActionDownload: true,
maxConcurrentShardRequests: 5,
};

searchSourceMock.getField = jest.fn((key: string) => {
Expand Down Expand Up @@ -245,6 +246,7 @@ describe('CsvGenerator', () => {
useByteOrderMarkEncoding: false,
scroll: { size: 500, duration: '30s' },
enablePanelActionDownload: true,
maxConcurrentShardRequests: 5,
};

mockDataClient.search = jest.fn().mockImplementation(() =>
Expand Down Expand Up @@ -345,7 +347,7 @@ describe('CsvGenerator', () => {

expect(mockDataClient.search).toHaveBeenCalledTimes(10);
expect(mockDataClient.search).toBeCalledWith(
{ params: { body: {}, ignore_throttled: undefined } },
{ params: { body: {}, ignore_throttled: undefined, max_concurrent_shard_requests: 5 } },
{ strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } }
);

Expand All @@ -356,7 +358,7 @@ describe('CsvGenerator', () => {
index: 'logstash-*',
keep_alive: '30s',
},
{ maxRetries: 0, requestTimeout: '30s' }
{ maxConcurrentShardRequests: 5, maxRetries: 0, requestTimeout: '30s' }
);

expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -763,6 +765,7 @@ describe('CsvGenerator', () => {
useByteOrderMarkEncoding: false,
scroll: { size: 500, duration: '30s' },
enablePanelActionDownload: true,
maxConcurrentShardRequests: 5,
};
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
Expand Down Expand Up @@ -833,7 +836,7 @@ describe('CsvGenerator', () => {
index: 'logstash-*',
keep_alive: '30s',
},
{ maxRetries: 0, requestTimeout: '30s' }
{ maxConcurrentShardRequests: 5, maxRetries: 0, requestTimeout: '30s' }
);

expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith(
Expand All @@ -843,13 +846,14 @@ describe('CsvGenerator', () => {
index: 'logstash-*',
keep_alive: '30s',
},
{ maxRetries: 0, requestTimeout: '30s' }
{ maxConcurrentShardRequests: 5, maxRetries: 0, requestTimeout: '30s' }
);

expect(mockDataClient.search).toBeCalledWith(
{
params: {
body: {},
max_concurrent_shard_requests: 5,
},
},
{ strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } }
Expand Down
22 changes: 17 additions & 5 deletions packages/kbn-generate-csv/src/generate_csv.ts
Expand Up @@ -12,6 +12,7 @@ import type { Writable } from 'stream';
import { errors as esErrors, estypes } from '@elastic/elasticsearch';
import type { IScopedClusterClient, IUiSettingsClient, Logger } from '@kbn/core/server';
import type {
IEsSearchRequest,
IKibanaSearchResponse,
ISearchSource,
ISearchStartSearchSource,
Expand Down Expand Up @@ -66,7 +67,11 @@ export class CsvGenerator {
) {}

private async openPointInTime(indexPatternTitle: string, settings: CsvExportSettings) {
const { duration } = settings.scroll;
const {
includeFrozen,
maxConcurrentShardRequests,
scroll: { duration },
} = settings;
let pitId: string | undefined;
this.logger.debug(`Requesting PIT for: [${indexPatternTitle}]...`);
try {
Expand All @@ -77,11 +82,12 @@ export class CsvGenerator {
keep_alive: duration,
ignore_unavailable: true,
// @ts-expect-error ignore_throttled is not in the type definition, but it is accepted by es
ignore_throttled: settings.includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES
ignore_throttled: includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES
},
{
requestTimeout: duration,
maxRetries: 0,
maxConcurrentShardRequests,
Copy link
Member

@tsullivan tsullivan Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also needs to be added to the transport field of the data.search() call in the doSearch() method. Doing that will throw a type compilation error, since the field isn't known in the data service. You'll have to reach out to them and make sure the field is recognized and correctly sent in the requests

I think the recommended way to go in doSearch would be to add the maxConcurrentShardRequests onto the searchSource object with:

private async doSearch(searchSource, settings, ...) {
  searchSource.setField('maxConcurrentShardRequests', settings.maxConcurrentShardRequests);
  ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Omg thank you good catch

Copy link
Member

@tsullivan tsullivan Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some searching around to find how this is handled in other parts of the code, and I think max_concurrent_shard_requests must be in the body, not the options.

      const response = await this.clients.es.asCurrentUser.openPointInTime(
        {
          // Note, this throws a TypeScript error, same as "ignore_throttled"
          // Check with the ES clients team to make sure this will work
          max_concurrent_shard_requests: maxConcurrentShardRequests,
          index: indexPatternTitle,
          keep_alive: duration,
          ignore_unavailable: true,
          // @ts-expect-error ignore_throttled is not in the type definition, but it is accepted by es
          ignore_throttled: includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES
        },
        {
          requestTimeout: duration,
          maxRetries: 0,
        }
      );

}
);
pitId = response.id;
Expand Down Expand Up @@ -135,7 +141,7 @@ export class CsvGenerator {
settings: CsvExportSettings,
searchAfter?: estypes.SortResults
) {
const { scroll: scrollSettings } = settings;
const { scroll: scrollSettings, maxConcurrentShardRequests } = settings;
searchSource.setField('size', scrollSettings.size);

if (searchAfter) {
Expand All @@ -153,8 +159,14 @@ export class CsvGenerator {
throw new Error('Could not retrieve the search body!');
}

const searchParams = { params: { body: searchBody } };
let results: estypes.SearchResponse<unknown>;
const searchParams: IEsSearchRequest = {
params: {
body: searchBody,
max_concurrent_shard_requests: maxConcurrentShardRequests,
},
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on other code I could find and the way the types are structured, it looks like max_concurrent_shard_requests should be deep one level further:

    const searchParams: IEsSearchRequest = {
      params: {
        body: searchBody,
        max_concurrent_shard_requests: maxConcurrentShardRequests,
      },
    };

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome! changes in bd31abe


let results: estypes.SearchResponse<unknown> | undefined;
try {
const { rawResponse, ...rawDetails } = await lastValueFrom(
this.clients.data.search(searchParams, {
Expand Down
2 changes: 2 additions & 0 deletions packages/kbn-generate-csv/src/get_export_settings.test.ts
Expand Up @@ -30,6 +30,7 @@ describe('getExportSettings', () => {
maxSizeBytes: 180000,
scroll: { size: 500, duration: '30s' },
useByteOrderMarkEncoding: false,
maxConcurrentShardRequests: 5,
enablePanelActionDownload: true,
};
const logger = loggingSystemMock.createLogger();
Expand Down Expand Up @@ -62,6 +63,7 @@ describe('getExportSettings', () => {
"escapeFormulaValues": false,
"escapeValue": [Function],
"includeFrozen": false,
"maxConcurrentShardRequests": 5,
"maxSizeBytes": 180000,
"scroll": Object {
"duration": "30s",
Expand Down
2 changes: 2 additions & 0 deletions packages/kbn-generate-csv/src/get_export_settings.ts
Expand Up @@ -31,6 +31,7 @@ export interface CsvExportSettings {
escapeFormulaValues: boolean;
escapeValue: (value: string) => string;
includeFrozen: boolean;
maxConcurrentShardRequests: number;
}

export const getExportSettings = async (
Expand Down Expand Up @@ -82,5 +83,6 @@ export const getExportSettings = async (
checkForFormulas: config.checkForFormulas,
escapeFormulaValues,
escapeValue,
maxConcurrentShardRequests: config.maxConcurrentShardRequests,
};
};

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/kbn-reporting/server/config_schema.ts
Expand Up @@ -75,6 +75,7 @@ const CsvSchema = schema.object({
}),
size: schema.number({ defaultValue: 500 }),
}),
maxConcurrentShardRequests: schema.number({ defaultValue: 5 }),
});

const EncryptionKeySchema = schema.conditional(
Expand Down