Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: incorporate timeoutMiddleware to allow for server to utilise client's caller timeout #42

Merged
merged 3 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,45 @@ main();

```
![img.png](images/unaryTest.png)

## Specifications

### Throwing Timeouts

By default, a timeout will not cause an RPC call to automatically throw, this must be manually done by the handler when it receives the abort signal from `ctx.signal`. An example of this is like so:

```ts
class TestMethod extends UnaryHandler {
public handle = async (
input: JSONValue,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<JSONValue> => {
const abortProm = utils.promise<never>();
ctx.signal.addEventListener('abort', () => {
resolveCtxP(ctx);
abortProm.resolveP(ctx.signal.reason);
});
throw await abortProm.p;
};
}
```

### Timeout Middleware

The `timeoutMiddleware` sets an RPCServer's timeout based on the lowest timeout between the Client and the Server. This is so that handlers can eagerly time out and stop processing as soon as it is known that the client has timed out.

This case can be seen in the first diagram, where the server is able to stop the processing of the handler, and close the associated stream of the RPC call based on the shorter timeout sent by the client:

![RPCServer sets timeout based on RPCClient](images/timeoutMiddlewareClientTimeout.svg)

Where the `RPCClient` sends a timeout that is longer than that set on the `RPCServer`, it will be rejected. This is as the timeout of the client should never be expected to exceed that of the server, so that the server's timeout is an absolute limit.

![RPCServer rejects longer timeout sent by RPCClient](images/timeoutMiddlewareServerTimeout.svg)

The `timeoutMiddleware` is enabled by default, and uses the `.metadata.timeout` property on a JSON-RPC request object for the client to send it's timeout.

## Development

Run `nix-shell`, and once you're inside, you can use:
Expand Down
17 changes: 17 additions & 0 deletions images/timeoutMiddlewareClientTimeout.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
17 changes: 17 additions & 0 deletions images/timeoutMiddlewareServerTimeout.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 15 additions & 8 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ class RPCServer {
// Input generator derived from the forward stream
const inputGen = async function* (): AsyncIterable<I> {
for await (const data of forwardStream) {
ctx.timer.refresh();
if (ctx.timer.status !== 'settled') {
ctx.timer.refresh();
}
yield data.params as I;
}
};
Expand All @@ -296,7 +298,9 @@ class RPCServer {
timer: ctx.timer,
});
for await (const response of handlerG) {
ctx.timer.refresh();
if (ctx.timer.status !== 'settled') {
ctx.timer.refresh();
}
const responseMessage: JSONRPCResponseResult = {
jsonrpc: '2.0',
result: response,
Expand Down Expand Up @@ -570,13 +574,16 @@ class RPCServer {
}
// Setting up Timeout logic
const timeout = this.defaultTimeoutMap.get(method);
if (timeout != null && timeout < this.handlerTimeoutTime) {
// Reset timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
// Otherwise refresh
timer.refresh();
if (timer.status !== 'settled') {
if (timeout != null && timeout < this.handlerTimeoutTime) {
// Reset timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
// Otherwise refresh
timer.refresh();
}
}

this.logger.info(`Handling stream with method (${method})`);
let handlerResult: [JSONValue | undefined, ReadableStream<Uint8Array>];
const headerWriter = rpcStream.writable.getWriter();
Expand Down
7 changes: 7 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ class ErrorRPCStreamEnded<T> extends ErrorRPCProtocol<T> {
class ErrorRPCTimedOut<T> extends ErrorRPCProtocol<T> {
static description = 'RPC handler has timed out';
code = JSONRPCErrorCode.RPCTimedOut;
public toJSON(): JSONRPCError {
const json = super.toJSON();
if (typeof json === 'object' && !Array.isArray(json)) {
(json as POJO).type = this.constructor.name;
}
return json;
}
}

class ErrorUtilsUndefinedBehaviour<T> extends ErrorRPCProtocol<T> {
Expand Down
96 changes: 90 additions & 6 deletions src/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import type {
JSONRPCResponse,
JSONRPCResponseResult,
MiddlewareFactory,
JSONValue,
JSONRPCRequestMetadata,
JSONRPCResponseMetadata,
} from './types';
import type { ContextTimed } from '@matrixai/contexts';
import { TransformStream } from 'stream/web';
import { JSONParser } from '@streamparser/json';
import * as utils from './utils';
Expand Down Expand Up @@ -75,6 +79,80 @@ function jsonMessageToBinaryStream(): TransformStream<
});
}

function timeoutMiddlewareServer(
ctx: ContextTimed,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue> | undefined,
) {
const currentTimeout = ctx.timer.delay;
// Flags for tracking if the first message has been processed
let forwardFirst = true;
return {
forward: new TransformStream<
JSONRPCRequest<JSONRPCRequestMetadata>,
JSONRPCRequest<JSONRPCRequestMetadata>
>({
transform: (chunk, controller) => {
controller.enqueue(chunk);
if (forwardFirst) {
forwardFirst = false;
let clientTimeout = chunk.metadata?.timeout;
if (clientTimeout === undefined) return;
if (clientTimeout === null) clientTimeout = Infinity;
if (clientTimeout < currentTimeout) ctx.timer.reset(clientTimeout);
}
},
}),
reverse: new TransformStream<
JSONRPCResponse<JSONRPCResponseMetadata>,
JSONRPCResponse<JSONRPCResponseMetadata>
>({
transform: (chunk, controller) => {
// Passthrough chunk, no need for server to send ctx.timeout
controller.enqueue(chunk);
},
}),
};
}

/**
* This adds its own timeout to the forward metadata and updates it's timeout
* based on the reverse metadata.
* @param ctx
* @param _cancel
* @param _meta
*/
function timeoutMiddlewareClient(
ctx: ContextTimed,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue> | undefined,
) {
const currentTimeout = ctx.timer.delay;
// Flags for tracking if the first message has been processed
let forwardFirst = true;
return {
forward: new TransformStream<JSONRPCRequest, JSONRPCRequest>({
transform: (chunk, controller) => {
if (forwardFirst) {
forwardFirst = false;
if (chunk == null) chunk = { jsonrpc: '2.0', method: '' };
if (chunk.metadata == null) chunk.metadata = {};
(chunk.metadata as any).timeout = currentTimeout;
}
controller.enqueue(chunk);
},
}),
reverse: new TransformStream<
JSONRPCResponse<JSONRPCResponseMetadata>,
JSONRPCResponse<JSONRPCResponseMetadata>
>({
transform: (chunk, controller) => {
controller.enqueue(chunk); // Passthrough chunk, no need for client to set ctx.timeout
},
}),
};
}

/**
* This function is a factory for creating a pass-through streamPair. It is used
* as the default middleware for the middleware wrappers.
Expand Down Expand Up @@ -116,12 +194,14 @@ function defaultServerMiddlewareWrapper(
>();

const middleMiddleware = middlewareFactory(ctx, cancel, meta);
const timeoutMiddleware = timeoutMiddlewareServer(ctx, cancel, meta);

const forwardReadable = inputTransformStream.readable.pipeThrough(
middleMiddleware.forward,
); // Usual middleware here
const forwardReadable = inputTransformStream.readable
.pipeThrough(timeoutMiddleware.forward) // Timeout middleware here
.pipeThrough(middleMiddleware.forward); // Usual middleware here
const reverseReadable = outputTransformStream.readable
.pipeThrough(middleMiddleware.reverse) // Usual middleware here
.pipeThrough(timeoutMiddleware.reverse) // Timeout middleware here
.pipeThrough(jsonMessageToBinaryStream());

return {
Expand Down Expand Up @@ -172,13 +252,15 @@ const defaultClientMiddlewareWrapper = (
JSONRPCRequest
>();

const timeoutMiddleware = timeoutMiddlewareClient(ctx, cancel, meta);
const middleMiddleware = middleware(ctx, cancel, meta);
const forwardReadable = inputTransformStream.readable
.pipeThrough(timeoutMiddleware.forward)
.pipeThrough(middleMiddleware.forward) // Usual middleware here
.pipeThrough(jsonMessageToBinaryStream());
const reverseReadable = outputTransformStream.readable.pipeThrough(
middleMiddleware.reverse,
); // Usual middleware here
const reverseReadable = outputTransformStream.readable
.pipeThrough(middleMiddleware.reverse)
.pipeThrough(timeoutMiddleware.reverse); // Usual middleware here

return {
forward: {
Expand All @@ -196,6 +278,8 @@ const defaultClientMiddlewareWrapper = (
export {
binaryToJsonMessageStream,
jsonMessageToBinaryStream,
timeoutMiddlewareClient,
timeoutMiddlewareServer,
defaultMiddleware,
defaultServerMiddlewareWrapper,
defaultClientMiddlewareWrapper,
Expand Down
36 changes: 33 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type JSONRPCRequestMessage<T extends JSONValue = JSONValue> = {
* SHOULD NOT contain fractional parts [2]
*/
id: string | number | null;
};
} & JSONRPCRequestMetadata;

/**
* This is the JSON RPC notification object. this is used for a request that
Expand All @@ -60,7 +60,7 @@ type JSONRPCRequestNotification<T extends JSONValue = JSONValue> = {
* This member MAY be omitted.
*/
params?: T;
};
} & JSONRPCRequestMetadata;

/**
* This is the JSON RPC response result object. It contains the response data for a
Expand All @@ -84,7 +84,7 @@ type JSONRPCResponseResult<T extends JSONValue = JSONValue> = {
* it MUST be Null.
*/
id: string | number | null;
};
} & JSONRPCResponseMetadata;

/**
* This is the JSON RPC response Error object. It contains any errors that have
Expand All @@ -110,6 +110,34 @@ type JSONRPCResponseError = {
id: string | number | null;
};

/**
* Used when an empty object is needed.
* Defined here with a linter override to avoid a false positive.
*/
// eslint-disable-next-line
type ObjectEmpty = {};

// Prevent overwriting the metadata type with `Omit<>`
type JSONRPCRequestMetadata<T extends Record<string, JSONValue> = ObjectEmpty> =
{
metadata?: {
[Key: string]: JSONValue;
} & Partial<{
timeout: number | null;
}>;
} & Omit<T, 'metadata'>;

// Prevent overwriting the metadata type with `Omit<>`
type JSONRPCResponseMetadata<
T extends Record<string, JSONValue> = ObjectEmpty,
> = {
metadata?: {
[Key: string]: JSONValue;
} & Partial<{
timeout: number | null;
}>;
} & Omit<T, 'metadata'>;

/**
* This is a JSON RPC error object, it encodes the error data for the JSONRPCResponseError object.
*/
Expand Down Expand Up @@ -357,6 +385,8 @@ export type {
JSONRPCRequestNotification,
JSONRPCResponseResult,
JSONRPCResponseError,
JSONRPCRequestMetadata,
JSONRPCResponseMetadata,
JSONRPCError,
JSONRPCRequest,
JSONRPCResponse,
Expand Down
2 changes: 2 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ const standardErrors: {
URIError,
AggregateError,
AbstractError,
ErrorRPCTimedOut: errors.ErrorRPCTimedOut,
amydevs marked this conversation as resolved.
Show resolved Hide resolved
};

/**
Expand Down Expand Up @@ -342,6 +343,7 @@ function toError(
let e: Error;
switch (eClass) {
case AbstractError:
case errors.ErrorRPCTimedOut:
e = eClass.fromJSON(errorData);
break;
case AggregateError:
Expand Down
Loading