Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop using aborted event for KibanaRequest.events.aborted$ #126184

Merged
merged 7 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions src/core/server/http/integration_tests/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,42 @@ describe('KibanaRequest', () => {
expect(nextSpy).toHaveBeenCalledTimes(1);
});

it('emits once and completes when request aborted after the payload has been consumed', async () => {
expect.assertions(1);
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');

const nextSpy = jest.fn();

const done = new Promise<void>((resolve) => {
router.post(
{ path: '/', validate: { body: schema.any() } },
async (context, request, res) => {
request.events.aborted$.subscribe({
next: nextSpy,
complete: resolve,
});

// prevents the server to respond
await delay(30000);
return res.ok({ body: 'ok' });
}
);
});

await server.start();

const incomingRequest = supertest(innerServer.listener)
.post('/')
.send({ hello: 'dolly' })
// end required to send request
.end();

setTimeout(() => incomingRequest.abort(), 50);
await done;
expect(nextSpy).toHaveBeenCalledTimes(1);
});

it('completes & does not emit when request handled', async () => {
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');
Expand Down Expand Up @@ -336,6 +372,41 @@ describe('KibanaRequest', () => {
await done;
expect(nextSpy).toHaveBeenCalledTimes(1);
});

it('emits once and completes when response is aborted after the payload has been consumed', async () => {
expect.assertions(2);
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');

const nextSpy = jest.fn();

const done = new Promise<void>((resolve) => {
router.post(
{ path: '/', validate: { body: schema.any() } },
async (context, req, res) => {
req.events.completed$.subscribe({
next: nextSpy,
complete: resolve,
});

expect(nextSpy).not.toHaveBeenCalled();
await delay(30000);
return res.ok({ body: 'ok' });
}
);
});

await server.start();

const incomingRequest = supertest(innerServer.listener)
.post('/')
.send({ foo: 'bar' })
// end required to send request
.end();
setTimeout(() => incomingRequest.abort(), 50);
await done;
expect(nextSpy).toHaveBeenCalledTimes(1);
});
});
});

Expand Down
28 changes: 23 additions & 5 deletions src/core/server/http/router/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { URL } from 'url';
import uuid from 'uuid';
import { Request, RouteOptionsApp, RequestApplicationState, RouteOptions } from '@hapi/hapi';
import { Observable, fromEvent, merge } from 'rxjs';
import { shareReplay, first, takeUntil } from 'rxjs/operators';
import { shareReplay, first, takeUntil, filter } from 'rxjs/operators';
import { RecursiveReadonly } from '@kbn/utility-types';
import { deepFreeze } from '@kbn/std';

Expand Down Expand Up @@ -127,6 +127,7 @@ export class KibanaRequest<
const body = routeValidator.getBody(req.payload, 'request body');
return { query, params, body };
}

/**
* A identifier to identify this request.
*
Expand Down Expand Up @@ -215,11 +216,24 @@ export class KibanaRequest<
}

private getEvents(request: Request): KibanaRequestEvents {
// the response is completed, or its underlying connection was terminated prematurely
const finish$ = fromEvent(request.raw.res, 'close').pipe(shareReplay(1), first());
// the response is completed
const finished$ = fromEvent<void>(request.raw.res, 'close').pipe(
filter(() => {
return isCompleted(request);
}),
first()
);

// the response's underlying connection was terminated prematurely
const aborted$ = fromEvent<void>(request.raw.res, 'close').pipe(
filter(() => {
return !isCompleted(request);
}),
first(),
takeUntil(finished$)
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially tried to use only one source fromEvent observable, two destination subjects, and manually emitting to these depending on isCompleted, but the implementation was, in the end, more complex, and more vulnerable to edge cases such as errors. Given we were already using fromEvent twice before these changes, I think it's acceptable anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RxJS noob question here: do we still get any benefit from having one fromEvent observable?

Suggested change
const finished$ = fromEvent<void>(request.raw.res, 'close').pipe(
filter(() => {
return isCompleted(request);
}),
first()
);
// the response's underlying connection was terminated prematurely
const aborted$ = fromEvent<void>(request.raw.res, 'close').pipe(
filter(() => {
return !isCompleted(request);
}),
first(),
takeUntil(finished$)
);
const closed$ = fromEvent<void>(request.raw.res, 'close');
const finished$ = closed$.pipe(
filter(() => {
return isCompleted(request);
}),
first()
);
// the response's underlying connection was terminated prematurely
const aborted$ = closed$.pipe(
filter(() => {
return !isCompleted(request);
}),
first(),
takeUntil(finished$)
);

I'm not sure about the internal implementation of RxJS, but it looks like it'd create only one listener to the close event? Question is... would it work as expected?

Copy link
Contributor

@gsoldevila gsoldevila Feb 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the fromEvent documentation:

Every time resulting Observable is subscribed, event handler function will be registered to event target on given event type. When that event fires, value passed as a first argument to registered function will be emitted by output Observable. When Observable is unsubscribed, function will be unregistered from event target.

To answer @afharo 's question, I created a small StackBlitz to confirm that 2 listeners are created / attached anyway with the proposed approach.

If you really want to have a single listener to the original event, there is no need to create Subjects and emit to them. I believe you can simply use the share operator:

const closed$ = fromEvent<void>(request.raw.res, 'close').pipe(share());

Copy link
Contributor

@gsoldevila gsoldevila Feb 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether the finished$ and aborted$ Observables should also have the replay behavior.

Your proposed finished$ does not have it anymore (as opposed to finish$), which means if somebody subscribes to the event after it has happened they will just miss it and might get stuck.

IMHO there's no harm in replaying the last event (if it exists) for this type of network related events (we do have a first(), which ensures we'll only be getting one).

Copy link
Contributor Author

@pgayvallet pgayvallet Feb 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your proposed finished$ does not have it anymore (as opposed to finish$),

That's true, but finished$ is only internal and the public completed$ observable subscribes to it 'instantly' (no return to the ev loop, so no risk for the event to fire in the middle), and has a replay effect, so I think it was unnecessary to have it in two places.

Regarding aborted$, there was no replay effect before the PR, and adding one did break tests, so I felt it was safer to KISS and preserve the original behavior as much as possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, perhaps we can reorder the operators and use first() first, this way there is no need to use takeUntil().

Suggested change
const finished$ = fromEvent<void>(request.raw.res, 'close').pipe(
filter(() => {
return isCompleted(request);
}),
first()
);
// the response's underlying connection was terminated prematurely
const aborted$ = fromEvent<void>(request.raw.res, 'close').pipe(
filter(() => {
return !isCompleted(request);
}),
first(),
takeUntil(finished$)
);
const closed$ = fromEvent<void>(request.raw.res, 'close').pipe(
shareReplay(1),
first(),
);
const finished$ = $closed.pipe(
filter(() => {
return isCompleted(request);
})
);
// the response's underlying connection was terminated prematurely
const aborted$ = $closed.pipe(
filter(() => {
return !isCompleted(request);
})
);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nevermind regarding adding replay to aborted$, it was probably very late yesterday. changes in 19b120b and df3173b

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, perhaps we can reorder the operators

Definitely better, yea. PR updated.


const aborted$ = fromEvent<void>(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$));
const completed$ = merge<void, void>(finish$, aborted$).pipe(shareReplay(1), first());
const completed$ = merge<void, void>(finished$, aborted$).pipe(shareReplay(1), first());

return {
aborted$,
Expand Down Expand Up @@ -331,3 +345,7 @@ function isRequest(request: any): request is Request {
export function isRealRequest(request: unknown): request is KibanaRequest | Request {
return isKibanaRequest(request) || isRequest(request);
}

function isCompleted(request: Request) {
return request.raw.res.writableFinished;
}
Comment on lines +334 to +336
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the workaround recommended way to distinguish if a response was properly completed or aborted when a close event is fired, given nodejs/node#40775 (comment)