Skip to content

Commit

Permalink
[8.12] Handle content stream errors in report pre-deletion (#173792) (#…
Browse files Browse the repository at this point in the history
…174139)

# Backport

This will backport the following commits from `main` to `8.12`:
- [Handle content stream errors in report pre-deletion
(#173792)](#173792)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Tim
Sullivan","email":"tsullivan@users.noreply.github.com"},"sourceCommit":{"committedDate":"2024-01-02T23:00:53Z","message":"Handle
content stream errors in report pre-deletion (#173792)\n\nRe-addresses
#171363 bug was still
evident, especially when using network throttling to\r\nadd slight lag
to the request turnaround times.\r\n\r\nThis PR adds more handling of
errors that could be thrown slightly prior\r\nto deleting the report
document, when we try to clear all chunks of the\r\nreport using the
content
stream.\r\n\r\n<details>\r\n<summary>Before</summary>\r\n\r\n\r\n\r\nhttps://github.com/elastic/kibana/assets/908371/c27fe314-0f93-42b4-8076-99a1e30b8d2f\r\n\r\n\r\n</details>\r\n\r\n<details>\r\n<summary>After</summary>\r\n\r\n\r\nhttps://github.com/elastic/kibana/assets/908371/4c1f5edd-73f1-4ca4-a40a-f900ca5f9c78\r\n\r\n\r\n</details>\r\n\r\n###
Checklist\r\n- [x] Unit
tests","sha":"dc813c351fe111c895e85a188372ad31625d8c8c","branchLabelMapping":{"^v8.13.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:fix","backport:prev-minor","backport:prev-MAJOR","v8.13.0"],"title":"Handle
content stream errors in report
pre-deletion","number":173792,"url":"#173792
content stream errors in report pre-deletion (#173792)\n\nRe-addresses
#171363 bug was still
evident, especially when using network throttling to\r\nadd slight lag
to the request turnaround times.\r\n\r\nThis PR adds more handling of
errors that could be thrown slightly prior\r\nto deleting the report
document, when we try to clear all chunks of the\r\nreport using the
content
stream.\r\n\r\n<details>\r\n<summary>Before</summary>\r\n\r\n\r\n\r\nhttps://github.com/elastic/kibana/assets/908371/c27fe314-0f93-42b4-8076-99a1e30b8d2f\r\n\r\n\r\n</details>\r\n\r\n<details>\r\n<summary>After</summary>\r\n\r\n\r\nhttps://github.com/elastic/kibana/assets/908371/4c1f5edd-73f1-4ca4-a40a-f900ca5f9c78\r\n\r\n\r\n</details>\r\n\r\n###
Checklist\r\n- [x] Unit
tests","sha":"dc813c351fe111c895e85a188372ad31625d8c8c"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v8.13.0","branchLabelMappingKey":"^v8.13.0$","isSourceBranch":true,"state":"MERGED","url":"#173792
content stream errors in report pre-deletion (#173792)\n\nRe-addresses
#171363 bug was still
evident, especially when using network throttling to\r\nadd slight lag
to the request turnaround times.\r\n\r\nThis PR adds more handling of
errors that could be thrown slightly prior\r\nto deleting the report
document, when we try to clear all chunks of the\r\nreport using the
content
stream.\r\n\r\n<details>\r\n<summary>Before</summary>\r\n\r\n\r\n\r\nhttps://github.com/elastic/kibana/assets/908371/c27fe314-0f93-42b4-8076-99a1e30b8d2f\r\n\r\n\r\n</details>\r\n\r\n<details>\r\n<summary>After</summary>\r\n\r\n\r\nhttps://github.com/elastic/kibana/assets/908371/4c1f5edd-73f1-4ca4-a40a-f900ca5f9c78\r\n\r\n\r\n</details>\r\n\r\n###
Checklist\r\n- [x] Unit
tests","sha":"dc813c351fe111c895e85a188372ad31625d8c8c"}}]}] BACKPORT-->

Co-authored-by: Tim Sullivan <tsullivan@users.noreply.github.com>
  • Loading branch information
kibanamachine and tsullivan committed Jan 3, 2024
1 parent d43970c commit 73951f9
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 221 deletions.
Expand Up @@ -81,17 +81,43 @@ export const commonJobsRouteHandlerFactory = (reporting: ReportingCore) => {
return jobManagementPreRouting(reporting, res, docId, user, counters, async (doc) => {
const docIndex = doc.index;
const stream = await getContentStream(reporting, { id: docId, index: docIndex });
/** @note Overwriting existing content with an empty buffer to remove all the chunks. */
await new Promise<void>((resolve) => {
stream.end('', 'utf8', () => {
resolve();
});
const reportingSetup = reporting.getPluginSetupDeps();
const logger = reportingSetup.logger.get('delete-report');

// An "error" event is emitted if an error is
// passed to the `stream.end` callback from
// the _final method of the ContentStream.
// This event must be handled.
stream.on('error', (err) => {
logger.error(err);
});
await jobsQuery.delete(docIndex, docId);

return res.ok({
body: { deleted: true },
});
try {
// Overwriting existing content with an
// empty buffer to remove all the chunks.
await new Promise<void>((resolve, reject) => {
stream.end('', 'utf8', (error?: Error) => {
if (error) {
// handle error that could be thrown
// from the _write method of the ContentStream
reject(error);
} else {
resolve();
}
});
});

await jobsQuery.delete(docIndex, docId);

return res.ok({
body: { deleted: true },
});
} catch (error) {
logger.error(error);
return res.customError({
statusCode: 500,
});
}
});
};

Expand Down
Expand Up @@ -36,7 +36,7 @@ import { registerJobInfoRoutesInternal as registerJobInfoRoutes } from '../jobs'

type SetupServerReturn = Awaited<ReturnType<typeof setupServer>>;

describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
describe(`Reporting Job Management Routes: Internal`, () => {
const reportingSymbol = Symbol('reporting');
let server: SetupServerReturn['server'];
let usageCounter: IUsageCounter;
Expand Down Expand Up @@ -144,148 +144,148 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
await server.stop();
});

it('fails on malformed download IDs', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);
describe('download report', () => {
it('fails on malformed download IDs', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
.expect(400)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
)
);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
.expect(400)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
)
);
});

it('fails on unauthenticated users', async () => {
mockStartDeps = await createMockPluginStart(
{
licensing: {
...licensingMock.createStart(),
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
it('fails on unauthenticated users', async () => {
mockStartDeps = await createMockPluginStart(
{
licensing: {
...licensingMock.createStart(),
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
},
security: { authc: { getCurrentUser: () => undefined } },
},
security: { authc: { getCurrentUser: () => undefined } },
},
mockConfigSchema
);
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
registerJobInfoRoutes(core);
mockConfigSchema
);
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
.expect(401)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
.expect(401)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
);
});

it('returns 404 if job not found', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);
it('returns 404 if job not found', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(404);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(404);
});

it('returns a 403 if not a valid job type', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'invalidJobType',
payload: { title: 'invalid!' },
})
);
registerJobInfoRoutes(core);
it('returns a 403 if not a valid job type', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'invalidJobType',
payload: { title: 'invalid!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(403);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(403);
});

it(`returns job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeBase64Encoded,
payload: {}, // payload is irrelevant
})
);
it(`returns job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeBase64Encoded,
payload: {}, // payload is irrelevant
})
);

registerJobInfoRoutes(core);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(200);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(200);
});

it(`returns 403 if a user cannot view a job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'customForbiddenJobType',
payload: {}, // payload is irrelevant
})
);
it(`returns 403 if a user cannot view a job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'customForbiddenJobType',
payload: {}, // payload is irrelevant
})
);

registerJobInfoRoutes(core);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(403);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(403);
});

it('when a job is incomplete', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'pending',
payload: { title: 'incomplete!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(503)
.expect('Content-Type', 'text/plain; charset=utf-8')
.expect('Retry-After', '30')
.then(({ text }) => expect(text).toEqual('pending'));
});
it('when a job is incomplete', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'pending',
payload: { title: 'incomplete!' },
})
);
registerJobInfoRoutes(core);

it('when a job fails', async () => {
mockEsClient.search.mockResponse(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'failed',
output: { content: 'job failure message' },
payload: { title: 'failing job!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8')
.then(({ body }) =>
expect(body.message).toEqual('Reporting generation failed: job failure message')
await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(503)
.expect('Content-Type', 'text/plain; charset=utf-8')
.expect('Retry-After', '30')
.then(({ text }) => expect(text).toEqual('pending'));
});

it('when a job fails', async () => {
mockEsClient.search.mockResponse(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'failed',
output: { content: 'job failure message' },
payload: { title: 'failing job!' },
})
);
});
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8')
.then(({ body }) =>
expect(body.message).toEqual('Reporting generation failed: job failure message')
);
});

describe('successful downloads', () => {
it('when a known job-type is complete', async () => {
mockEsClient.search.mockResponseOnce(getCompleteHits());
registerJobInfoRoutes(core);
Expand Down Expand Up @@ -483,4 +483,28 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
});
});
});

describe('delete report', () => {
it('handles content stream errors', async () => {
stream = new Readable({
read() {
this.push('test');
this.push(null);
},
}) as typeof stream;
stream.end = jest.fn().mockImplementation((_name, _encoding, callback) => {
callback(new Error('An error occurred in ending the content stream'));
});

(getContentStream as jest.MockedFunction<typeof getContentStream>).mockResolvedValue(stream);
mockEsClient.search.mockResponseOnce(getCompleteHits());
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.delete(`${INTERNAL_ROUTES.JOBS.DELETE_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8');
});
});
});

0 comments on commit 73951f9

Please sign in to comment.