[Reporting] Add max concurrent shards setting to schema #170344

Merged
merged 19 commits into from Dec 5, 2023
Changes from 7 commits
1 change: 1 addition & 0 deletions packages/kbn-generate-csv-types/index.ts
@@ -24,4 +24,5 @@ export interface CsvConfig {
duration: string;
size: number;
};
maxConcurrentShardRequests: number;
}
42 changes: 39 additions & 3 deletions packages/kbn-generate-csv/src/generate_csv.test.ts
@@ -111,6 +111,7 @@ describe('CsvGenerator', () => {
maxSizeBytes: 180000,
useByteOrderMarkEncoding: false,
scroll: { size: 500, duration: '30s' },
maxConcurrentShardRequests: 5,
};

searchSourceMock.getField = jest.fn((key: string) => {
@@ -237,6 +238,7 @@ describe('CsvGenerator', () => {
maxSizeBytes: TEST_MAX_SIZE,
useByteOrderMarkEncoding: false,
scroll: { size: 500, duration: '30s' },
maxConcurrentShardRequests: 5,
};

mockDataClient.search = jest.fn().mockImplementation(() =>
@@ -348,7 +350,7 @@ describe('CsvGenerator', () => {
index: 'logstash-*',
keep_alive: '30s',
},
- { maxRetries: 0, requestTimeout: '30s' }
+ { maxConcurrentShardRequests: 5, maxRetries: 0, requestTimeout: '30s' }
);

expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledTimes(1);
@@ -750,6 +752,7 @@ describe('CsvGenerator', () => {
maxSizeBytes: 180000,
useByteOrderMarkEncoding: false,
scroll: { size: 500, duration: '30s' },
maxConcurrentShardRequests: 5,
};
mockDataClient.search = jest.fn().mockImplementation(() =>
Rx.of({
@@ -820,7 +823,7 @@ describe('CsvGenerator', () => {
index: 'logstash-*',
keep_alive: '30s',
},
- { maxRetries: 0, requestTimeout: '30s' }
+ { maxConcurrentShardRequests: 5, maxRetries: 0, requestTimeout: '30s' }
);

expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith(
@@ -830,7 +833,7 @@ describe('CsvGenerator', () => {
index: 'logstash-*',
keep_alive: '30s',
},
- { maxRetries: 0, requestTimeout: '30s' }
+ { maxConcurrentShardRequests: 5, maxRetries: 0, requestTimeout: '30s' }
);

expect(mockDataClient.search).toBeCalledWith(
@@ -1042,4 +1045,37 @@ describe('CsvGenerator', () => {
`);
});
});

it('handles max concurrent shards settings set to a number', async () => {
const mockShardsConfig = {
...mockConfig,
maxConcurrentShardRequests: 6,
};
const generateCsv = new CsvGenerator(
createMockJob({ columns: ['date', 'ip', 'message'] }),
mockShardsConfig,
{
es: mockEsClient,
data: mockDataClient,
uiSettings: uiSettingsClient,
},
{
searchSourceStart: mockSearchSourceService,
fieldFormatsRegistry: mockFieldFormatsRegistry,
},
new CancellationToken(),
mockLogger,
stream
);
await generateCsv.generateData();
await expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith(
{
ignore_unavailable: true,
ignore_throttled: undefined,
index: 'logstash-*',
keep_alive: '30s',
},
{ maxConcurrentShardRequests: 6, maxRetries: 0, requestTimeout: '30s' }
);
});
});
13 changes: 11 additions & 2 deletions packages/kbn-generate-csv/src/generate_csv.ts
@@ -58,7 +58,11 @@ export class CsvGenerator {
private stream: Writable
) {}

- private async openPointInTime(indexPatternTitle: string, settings: CsvExportSettings) {
+ private async openPointInTime(
+ indexPatternTitle: string,
+ settings: CsvExportSettings,
+ maxConcurrentShardRequests: number
+ ) {
const { duration } = settings.scroll;
let pitId: string | undefined;
this.logger.debug(`Requesting PIT for: [${indexPatternTitle}]...`);
@@ -75,6 +79,7 @@ export class CsvGenerator {
{
requestTimeout: duration,
maxRetries: 0,
maxConcurrentShardRequests,
@tsullivan (Member) commented on Nov 14, 2023:

This also needs to be added to the transport field of the data.search() call in the doSearch() method. Doing that will throw a type compilation error, since the field isn't known in the data service. You'll have to reach out to the team that owns the data service and make sure the field is recognized and correctly sent in the requests.

I think the recommended way to go in doSearch would be to add the maxConcurrentShardRequests onto the searchSource object with:

private async doSearch(searchSource, settings, ...) {
  searchSource.setField('maxConcurrentShardRequests', settings.maxConcurrentShardRequests);
  ...
}
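
The suggestion above can be tried in isolation with a minimal sketch. `FakeSearchSource`, `ExportSettingsSketch`, and `applyShardLimit` are hypothetical names introduced here for illustration; this is not Kibana's real `SearchSource`, and whether the real data service recognizes a `maxConcurrentShardRequests` field is exactly the open question in this thread.

```typescript
// Hypothetical stand-in for a search source: it only stores named fields.
// This is NOT Kibana's SearchSource; it illustrates the setField pattern only.
class FakeSearchSource {
  private readonly fields = new Map<string, unknown>();

  setField(name: string, value: unknown): this {
    this.fields.set(name, value);
    return this;
  }

  getField(name: string): unknown {
    return this.fields.get(name);
  }
}

// Assumed shape: just the one setting discussed in this thread.
interface ExportSettingsSketch {
  maxConcurrentShardRequests: number;
}

// Copy the setting onto the search source before a search would be issued.
function applyShardLimit(
  searchSource: FakeSearchSource,
  settings: ExportSettingsSketch
): FakeSearchSource {
  return searchSource.setField(
    'maxConcurrentShardRequests',
    settings.maxConcurrentShardRequests
  );
}

const source = applyShardLimit(new FakeSearchSource(), { maxConcurrentShardRequests: 5 });
console.log(source.getField('maxConcurrentShardRequests'));
```

The point of the pattern is that the value travels with the search source, so the method that runs the search does not need an extra parameter.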

PR author (Contributor) replied:

Omg thank you good catch

@tsullivan (Member) commented on Nov 16, 2023:

I did some searching around to find how this is handled in other parts of the code, and I think max_concurrent_shard_requests must be in the body, not the options.

      const response = await this.clients.es.asCurrentUser.openPointInTime(
        {
          // Note, this throws a TypeScript error, same as "ignore_throttled"
          // Check with the ES clients team to make sure this will work
          max_concurrent_shard_requests: maxConcurrentShardRequests,
          index: indexPatternTitle,
          keep_alive: duration,
          ignore_unavailable: true,
          // @ts-expect-error ignore_throttled is not in the type definition, but it is accepted by es
          ignore_throttled: includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES
        },
        {
          requestTimeout: duration,
          maxRetries: 0,
        }
      );
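
The body-versus-options distinction in the snippet above can be sketched without the Elasticsearch client. The types and the `buildOpenPitCall` helper below are hypothetical and exist only for illustration; whether the open point-in-time API accepts `max_concurrent_shard_requests` is an assumption that, as the comment notes, still needs to be confirmed with the ES clients team.

```typescript
// Hypothetical types for illustration; the real Elasticsearch client types differ.
interface OpenPitParamsSketch {
  index: string;
  keep_alive: string;
  ignore_unavailable: boolean;
  // Assumption: sent to Elasticsearch as a snake_case request parameter.
  max_concurrent_shard_requests?: number;
}

// Transport options only tune how the client sends the request.
interface TransportOptionsSketch {
  requestTimeout: string;
  maxRetries: number;
}

// Build the two arguments separately so the shard limit ends up in the
// request parameters rather than in the transport options.
function buildOpenPitCall(
  index: string,
  duration: string,
  maxConcurrentShardRequests: number
): [OpenPitParamsSketch, TransportOptionsSketch] {
  const params: OpenPitParamsSketch = {
    index,
    keep_alive: duration,
    ignore_unavailable: true,
    max_concurrent_shard_requests: maxConcurrentShardRequests,
  };
  const options: TransportOptionsSketch = {
    requestTimeout: duration,
    maxRetries: 0,
  };
  return [params, options];
}

const [pitParams, pitOptions] = buildOpenPitCall('logstash-*', '30s', 5);
console.log(pitParams, pitOptions);
```

Keeping the two objects apart makes the difference visible in a test: an assertion on the second argument would no longer expect `maxConcurrentShardRequests` there.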

}
);
pitId = response.id;
@@ -327,7 +332,11 @@ export class CsvGenerator {
let totalRelation = 'eq';
let searchAfter: estypes.SortResults | undefined;

- let pitId = await this.openPointInTime(indexPatternTitle, settings);
+ let pitId = await this.openPointInTime(
+ indexPatternTitle,
+ settings,
+ this.config.maxConcurrentShardRequests
+ );

// apply timezone from the job to all date field formatters
try {
2 changes: 2 additions & 0 deletions packages/kbn-generate-csv/src/get_export_settings.test.ts
@@ -28,6 +28,7 @@ describe('getExportSettings', () => {
maxSizeBytes: 180000,
scroll: { size: 500, duration: '30s' },
useByteOrderMarkEncoding: false,
maxConcurrentShardRequests: 5,
};
const logger = loggingSystemMock.createLogger();

@@ -59,6 +60,7 @@ describe('getExportSettings', () => {
"escapeFormulaValues": false,
"escapeValue": [Function],
"includeFrozen": false,
"maxConcurrentShardSize": 5,
"maxSizeBytes": 180000,
"scroll": Object {
"duration": "30s",
2 changes: 2 additions & 0 deletions packages/kbn-generate-csv/src/get_export_settings.ts
@@ -33,6 +33,7 @@ export interface CsvExportSettings {
escapeFormulaValues: boolean;
escapeValue: (value: string) => string;
includeFrozen: boolean;
maxConcurrentShardSize: number;
}

export const getExportSettings = async (
@@ -84,5 +85,6 @@ export const getExportSettings = async (
checkForFormulas: config.checkForFormulas,
escapeFormulaValues,
escapeValue,
maxConcurrentShardSize: config.maxConcurrentShardRequests,
};
};

1 change: 1 addition & 0 deletions x-pack/plugins/reporting/server/config/schema.ts
@@ -74,6 +74,7 @@ const CsvSchema = schema.object({
}),
size: schema.number({ defaultValue: 500 }),
}),
maxConcurrentShardRequests: schema.number({ defaultValue: 5 }),
});

const EncryptionKeySchema = schema.conditional(
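
The effect of `schema.number({ defaultValue: 5 })` in the hunk above can be approximated with a small stand-in. The function below is a hypothetical sketch, not `@kbn/config-schema`, and it assumes only two behaviors: an omitted value resolves to the default, and a non-number is rejected.

```typescript
// Hypothetical stand-in for a schema number with a default value.
// This is NOT @kbn/config-schema; it only mirrors the assumed behavior.
const DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS = 5; // default taken from the diff

function resolveMaxConcurrentShardRequests(raw: unknown): number {
  // An omitted setting falls back to the default.
  if (raw === undefined) {
    return DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS;
  }
  // Anything that is not a real number is rejected.
  if (typeof raw !== 'number' || Number.isNaN(raw)) {
    throw new Error(
      `expected a number for maxConcurrentShardRequests, got: ${String(raw)}`
    );
  }
  return raw;
}

console.log(resolveMaxConcurrentShardRequests(undefined)); // 5
console.log(resolveMaxConcurrentShardRequests(6)); // 6
```

With a default in place, existing configurations that do not mention the setting keep working, which is why the test fixtures in this PR only had to add the new field to their mock config objects.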