Skip to content

Commit

Permalink
Merge pull request #47 from MatrixAI/feature-handler-timeout
Browse files Browse the repository at this point in the history
Update handler and caller timeouts to be able to overwrite server and client default timeouts regardless of value.
  • Loading branch information
tegefaulkes committed Oct 31, 2023
2 parents a154f27 + 9acf805 commit 928bbbc
Show file tree
Hide file tree
Showing 8 changed files with 452 additions and 167 deletions.
37 changes: 33 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class Sum extends ClientHandler<ContainerType, number, number> {
async function startServer() {
const rpcServer = new RPCServer({
logger: new Logger('rpc-server'),
handlerTimeoutTime: 60000,
timeoutTime: 60000,
idGen,
});

Expand Down Expand Up @@ -421,7 +421,7 @@ function factorialOf(n: number): number {

async function startServer() {
const rpcServer = new RPCServer({
handlerTimeoutTime: 200,
timeoutTime: 200,
logger,
idGen,
});
Expand Down Expand Up @@ -671,7 +671,7 @@ async function startServer() {
const wss = new WebSocket.Server({ port: 8080 });
const rpcServer = new RPCServer({
logger: new Logger('rpc-server'),
handlerTimeoutTime: 1000,
timeoutTime: 1000,
idGen,
});
rpcServer.start({
Expand Down Expand Up @@ -835,7 +835,7 @@ function createSyntheticStreams() {
async function startServer() {
const rpcServer = new RPCServer({
logger: new Logger('rpc-server'),
handlerTimeoutTime: 1000,
timeoutTime: 1000,
idGen,
});

Expand Down Expand Up @@ -924,6 +924,35 @@ class TestMethod extends UnaryHandler {
}
```

### Timeout Priority

A `timeoutTime` can be passed both to the constructors of `RPCServer` and `RPCClient`. This is the default `timeoutTime` for all callers/handlers.

In the case of `RPCServer`, a `timeout` can be specified when extending any `Handler` class. This will override the default `timeoutTime` set on `RPCServer` for that handler only.

```ts
class TestMethodArbitraryTimeout extends UnaryHandler {
public timeout = 100;
public handle = async (
input: JSONValue,
_cancel,
_meta,
ctx_,
): Promise<JSONValue> => {
return input;
};
}
```

In the case of `RPCClient`, a `ctx` with the property `timer` can be supplied with a `Timer` instance or `number` when making making an RPC call. This will override the default `timeoutTime` set on `RPCClient` for that call only.

```ts
await rpcClient.methods.testMethod({}, { timer: 100 });
await rpcClient.methods.testMethod({}, { timer: new Timer(undefined, 100) });
```

It's important to note that any of these timeouts will ultimately be overridden by the shortest timeout of the server and client combined using the timeout middleware below.

### Timeout Middleware

The `timeoutMiddleware` sets an RPCServer's timeout based on the lowest timeout between the Client and the Server. This is so that handlers can eagerly time out and stop processing as soon as it is known that the client has timed out.
Expand Down
17 changes: 10 additions & 7 deletions src/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class RPCClient<M extends ClientManifest> {
this.onTimeoutCallback = callback;
}
// Method proxies
public readonly streamKeepAliveTimeoutTime: number;
public readonly timeoutTime: number;
public readonly methodsProxy = new Proxy(
{},
{
Expand Down Expand Up @@ -76,7 +76,7 @@ class RPCClient<M extends ClientManifest> {
* The middlewareFactory needs to be a function that creates a pair of
* transform streams that convert `JSONRPCRequest` to `Uint8Array` on the forward
* path and `Uint8Array` to `JSONRPCResponse` on the reverse path.
* @param obj.streamKeepAliveTimeoutTime - Timeout time used if no timeout timer was provided when making a call.
* @param obj.timeoutTime - Timeout time used if no timeout timer was provided when making a call.
* Defaults to 60,000 milliseconds.
* for a client call.
* @param obj.logger
Expand All @@ -85,7 +85,7 @@ class RPCClient<M extends ClientManifest> {
manifest,
streamFactory,
middlewareFactory = middleware.defaultClientMiddlewareWrapper(),
streamKeepAliveTimeoutTime = Infinity,
timeoutTime = Infinity,
logger,
toError = utils.toError,
idGen = () => null,
Expand All @@ -98,16 +98,19 @@ class RPCClient<M extends ClientManifest> {
JSONRPCResponse,
Uint8Array
>;
streamKeepAliveTimeoutTime?: number;
timeoutTime?: number;
logger?: Logger;
idGen?: IdGen;
toError?: ToError;
}) {
if (timeoutTime < 0) {
throw new errors.ErrorRPCInvalidTimeout();
}
this.idGen = idGen;
this.callerTypes = utils.getHandlerTypes(manifest);
this.streamFactory = streamFactory;
this.middlewareFactory = middlewareFactory;
this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime;
this.timeoutTime = timeoutTime;
this.logger = logger ?? new Logger(this.constructor.name);
this.toError = toError;
}
Expand Down Expand Up @@ -254,7 +257,7 @@ class RPCClient<M extends ClientManifest> {
let timer: Timer;
if (!(ctx.timer instanceof Timer)) {
timer = new Timer({
delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
delay: ctx.timer ?? this.timeoutTime,
});
} else {
timer = ctx.timer;
Expand Down Expand Up @@ -403,7 +406,7 @@ class RPCClient<M extends ClientManifest> {
let timer: Timer;
if (!(ctx.timer instanceof Timer)) {
timer = new Timer({
delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
delay: ctx.timer ?? this.timeoutTime,
});
} else {
timer = ctx.timer;
Expand Down
132 changes: 71 additions & 61 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class RPCServer {
protected logger: Logger;
protected handlerMap: Map<string, RawHandlerImplementation> = new Map();
protected defaultTimeoutMap: Map<string, number | undefined> = new Map();
protected handlerTimeoutTime: number;
protected timeoutTime: number;
protected activeStreams: Set<PromiseCancellable<void>> = new Set();
protected fromError: FromError;
protected replacer?: (key: string, value: any) => any;
Expand All @@ -80,18 +80,15 @@ class RPCServer {
* The middlewareFactory needs to be a function that creates a pair of
* transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward
* path and `JSONRPCResponse` to `Uint8Array` on the reverse path.
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the
* @param obj.timeoutTime - Time before a stream is cleaned up due to no activity. This is the
* value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a
* signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000
* milliseconds.
* @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time.
* The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for
* the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds.
* @param obj.logger
*/
public constructor({
middlewareFactory = middleware.defaultServerMiddlewareWrapper(),
handlerTimeoutTime = Infinity,
timeoutTime = Infinity,
logger,
idGen = () => null,
fromError = utils.fromError,
Expand All @@ -103,15 +100,18 @@ class RPCServer {
Uint8Array,
JSONRPCResponseResult
>;
handlerTimeoutTime?: number;
timeoutTime?: number;
logger?: Logger;
idGen?: IdGen;
fromError?: FromError;
replacer?: (key: string, value: any) => any;
}) {
if (timeoutTime < 0) {
throw new errors.ErrorRPCInvalidTimeout();
}
this.idGen = idGen;
this.middlewareFactory = middlewareFactory;
this.handlerTimeoutTime = handlerTimeoutTime;
this.timeoutTime = timeoutTime;
this.fromError = fromError;
this.replacer = replacer;
this.logger = logger ?? new Logger(this.constructor.name);
Expand All @@ -129,58 +129,68 @@ class RPCServer {
manifest: ServerManifest;
}): Promise<void> {
this.logger.info(`Start ${this.constructor.name}`);
for (const [key, manifestItem] of Object.entries(manifest)) {
if (manifestItem instanceof RawHandler) {
this.registerRawStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof DuplexHandler) {
this.registerDuplexStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ServerHandler) {
this.registerServerStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof UnaryHandler) {
this.registerUnaryHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
try {
for (const [key, manifestItem] of Object.entries(manifest)) {
if (manifestItem.timeout != null && manifestItem.timeout < 0) {
throw new errors.ErrorRPCInvalidHandlerTimeout();
}
if (manifestItem instanceof RawHandler) {
this.registerRawStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof DuplexHandler) {
this.registerDuplexStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ServerHandler) {
this.registerServerStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof UnaryHandler) {
this.registerUnaryHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
utils.never();
}
utils.never();
} catch (e) {
// No need to clean up streams, as streams can only be handled after RPCServer has been started.
this.handlerMap.clear();
this.defaultTimeoutMap.clear();
throw e;
}
this.logger.info(`Started ${this.constructor.name}`);
}
Expand Down Expand Up @@ -453,7 +463,7 @@ class RPCServer {
const abortController = new AbortController();
// Setting up timeout timer logic
const timer = new Timer({
delay: this.handlerTimeoutTime,
delay: this.timeoutTime,
handler: () => {
abortController.abort(new errors.ErrorRPCTimedOut());
if (this.onTimeoutCallback) {
Expand Down Expand Up @@ -575,7 +585,7 @@ class RPCServer {
// Setting up Timeout logic
const timeout = this.defaultTimeoutMap.get(method);
if (timer.status !== 'settled') {
if (timeout != null && timeout < this.handlerTimeoutTime) {
if (timeout != null) {
// Reset timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
Expand Down
10 changes: 10 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ class ErrorRPCCallerFailed<T> extends ErrorRPC<T> {
static description = 'Failed to call stream';
}

class ErrorRPCInvalidTimeout<T> extends ErrorRPC<T> {
static description = 'Invalid timeout provided';
}

class ErrorRPCInvalidHandlerTimeout<T> extends ErrorRPC<T> {
static description = 'Invalid handler timeout provided';
}

abstract class ErrorRPCProtocol<T> extends ErrorRPC<T> {
static error = 'RPC Protocol Error';
code: number;
Expand Down Expand Up @@ -257,6 +265,8 @@ export {
ErrorRPCConnectionLocal,
ErrorRPCConnectionPeer,
ErrorRPCConnectionKeepAliveTimeOut,
ErrorRPCInvalidTimeout,
ErrorRPCInvalidHandlerTimeout,
ErrorRPCConnectionInternal,
ErrorMissingHeader,
ErrorHandlerAborted,
Expand Down
Loading

0 comments on commit 928bbbc

Please sign in to comment.