Skip to content

Commit

Permalink
feat: Implementing handler and caller timeouts which can override def…
Browse files Browse the repository at this point in the history
…ault server and client timeouts regardless of their default valu
  • Loading branch information
addievo committed Oct 26, 2023
1 parent 8434c70 commit 93d535a
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 31 deletions.
8 changes: 4 additions & 4 deletions src/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class RPCClient<M extends ClientManifest> {
this.onTimeoutCallback = callback;
}
// Method proxies
public readonly streamKeepAliveTimeoutTime: number;
public readonly timeout: number;
public readonly methodsProxy = new Proxy(
{},
{
Expand Down Expand Up @@ -106,7 +106,7 @@ class RPCClient<M extends ClientManifest> {
this.callerTypes = utils.getHandlerTypes(manifest);
this.streamFactory = streamFactory;
this.middlewareFactory = middlewareFactory;
this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime;
this.timeout = streamKeepAliveTimeoutTime;
this.logger = logger ?? new Logger(this.constructor.name);
this.toError = toError;
}
Expand Down Expand Up @@ -253,7 +253,7 @@ class RPCClient<M extends ClientManifest> {
let timer: Timer;
if (!(ctx.timer instanceof Timer)) {
timer = new Timer({
delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
delay: ctx.timer ?? this.timeout,
});
} else {
timer = ctx.timer;
Expand Down Expand Up @@ -402,7 +402,7 @@ class RPCClient<M extends ClientManifest> {
let timer: Timer;
if (!(ctx.timer instanceof Timer)) {
timer = new Timer({
delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
delay: ctx.timer ?? this.timeout,
});
} else {
timer = ctx.timer;
Expand Down
15 changes: 6 additions & 9 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class RPCServer {
protected logger: Logger;
protected handlerMap: Map<string, RawHandlerImplementation> = new Map();
protected defaultTimeoutMap: Map<string, number | undefined> = new Map();
protected handlerTimeoutTime: number;
protected timeout: number;
protected activeStreams: Set<PromiseCancellable<void>> = new Set();
protected fromError: FromError;
protected replacer?: (key: string, value: any) => any;
Expand All @@ -80,13 +80,10 @@ class RPCServer {
* The middlewareFactory needs to be a function that creates a pair of
* transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward
* path and `JSONRPCResponse` to `Uint8Array` on the reverse path.
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the
* @param obj.handlerTimeoutTime - Time before a connection is cleaned up due to no activity. This is the
* value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a
* signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000
* milliseconds.
* @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time.
* The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for
* the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds.
* @param obj.logger
*/
public constructor({
Expand All @@ -111,7 +108,7 @@ class RPCServer {
}) {
this.idGen = idGen;
this.middlewareFactory = middlewareFactory;
this.handlerTimeoutTime = handlerTimeoutTime;
this.timeout = handlerTimeoutTime;
this.fromError = fromError;
this.replacer = replacer;
this.logger = logger ?? new Logger(this.constructor.name);
Expand Down Expand Up @@ -449,7 +446,7 @@ class RPCServer {
const abortController = new AbortController();
// Setting up timeout timer logic
const timer = new Timer({
delay: this.handlerTimeoutTime,
delay: this.timeout,
handler: () => {
abortController.abort(new errors.ErrorRPCTimedOut());
if (this.onTimeoutCallback) {
Expand Down Expand Up @@ -570,8 +567,8 @@ class RPCServer {
}
// Setting up Timeout logic
const timeout = this.defaultTimeoutMap.get(method);
if (timeout != null && timeout < this.handlerTimeoutTime) {
// Reset timeout with new delay if it is less than the default
if (timeout != null) {
// Reset Handler.timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
// Otherwise refresh
Expand Down
72 changes: 72 additions & 0 deletions tests/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1179,5 +1179,77 @@ describe(`${RPCClient.name}`, () => {
},
{ numRuns: 1 },
);
testProp(
'caller overrides client timeout - lesser value',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.messagesToReadableStream(messages);
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: inputStream,
writable: outputStream,
};
const ctxProm = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
ctxProm.resolveP(ctx);
return streamPair;
},
logger,
idGen,
// Setting timeout here to 150ms
streamKeepAliveTimeoutTime: 150,
});

const callerInterface = await rpcClient.duplexStreamCaller<
JSONValue,
JSONValue
>(methodName, { timer: 100 });

const ctx = await ctxProm.p;
expect(ctx.timer.delay).toEqual(100);
},
{ numRuns: 5 },
);
testProp(
'caller overrides client timeout - greater value',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.messagesToReadableStream(messages);
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: inputStream,
writable: outputStream,
};
const ctxProm = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
ctxProm.resolveP(ctx);
return streamPair;
},
logger,
idGen,
// Setting timeout here to 150ms
streamKeepAliveTimeoutTime: 150,
});

const callerInterface = await rpcClient.duplexStreamCaller<
JSONValue,
JSONValue
>(methodName, { timer: 300 });

const ctx = await ctxProm.p;
expect(ctx.timer.delay).toEqual(300);
},
{ numRuns: 5 },
);
});
});
112 changes: 94 additions & 18 deletions tests/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ describe(`${RPCServer.name}`, () => {
data: rpcTestUtils.safeJsonValueArb,
}),
);

const timeoutArb = fc.oneof(
fc.constant(0),
fc.constant(-1),
fc.constant(1000),
fc.constant(-1000),
fc.integer(),
);
testProp(
'can stream data with raw duplex stream handler',
[specificMessageArb],
Expand Down Expand Up @@ -942,11 +948,12 @@ describe(`${RPCServer.name}`, () => {
}
await rpcServer.stop({ force: true });
});
test('handler overrides timeout', async () => {
test('handler overrides timeout - lesser value', async () => {
{
const waitProm = promise();
const ctxShortProm = promise<ContextTimed>();
class TestMethodShortTimeout extends UnaryHandler {
// This will override the default timeout from 50 to 25, thereby decreasing it.
timeout = 25;
public handle = async (
input: JSONValue,
Expand All @@ -959,20 +966,6 @@ describe(`${RPCServer.name}`, () => {
return input;
};
}
const ctxLongProm = promise<ContextTimed>();
class TestMethodLongTimeout extends UnaryHandler {
timeout = 100;
public handle = async (
input: JSONValue,
_cancel,
_meta,
ctx_,
): Promise<JSONValue> => {
ctxLongProm.resolveP(ctx_);
await waitProm.p;
return input;
};
}
const rpcServer = new RPCServer({
handlerTimeoutTime: 50,
logger,
Expand All @@ -981,7 +974,6 @@ describe(`${RPCServer.name}`, () => {
await rpcServer.start({
manifest: {
testShort: new TestMethodShortTimeout({}),
testLong: new TestMethodLongTimeout({}),
},
});
const streamShort = rpcTestUtils.messagesToReadableStream([
Expand All @@ -1000,6 +992,36 @@ describe(`${RPCServer.name}`, () => {
// Shorter timeout is updated
const ctxShort = await ctxShortProm.p;
expect(ctxShort.timer.delay).toEqual(25);
}
});
test('handler overrides timeout - greater value', async () => {
{
const waitProm = promise();
const ctxLongProm = promise<ContextTimed>();
class TestMethodLongTimeout extends UnaryHandler {
// This will override the default timeout from 50 to 250, thereby increasing it.
timeout = 250;
public handle = async (
input: JSONValue,
_cancel,
_meta,
ctx_,
): Promise<JSONValue> => {
ctxLongProm.resolveP(ctx_);
await waitProm.p;
return input;
};
}
const rpcServer = new RPCServer({
handlerTimeoutTime: 50,
logger,
idGen,
});
await rpcServer.start({
manifest: {
testLong: new TestMethodLongTimeout({}),
},
});
const streamLong = rpcTestUtils.messagesToReadableStream([
{
jsonrpc: '2.0',
Expand All @@ -1016,7 +1038,7 @@ describe(`${RPCServer.name}`, () => {

// Longer timeout is set to server's default
const ctxLong = await ctxLongProm.p;
expect(ctxLong.timer.delay).toEqual(50);
expect(ctxLong.timer.delay).toEqual(250);
waitProm.resolveP();
await rpcServer.stop({ force: true });
}
Expand Down Expand Up @@ -1147,6 +1169,60 @@ describe(`${RPCServer.name}`, () => {
await expect(ctx.timer).toReject();
await rpcServer.stop({ force: true });
});
testProp(
'handler overrides timeout - arbitrary value with edge cases',
[timeoutArb, timeoutArb],
async (serverTimeout, handlerTimeout) => {
const waitProm = promise();
const ctxLongProm = promise<ContextTimed>();

class TestMethodArbitraryTimeout extends UnaryHandler {
timeout = handlerTimeout;
public handle = async (
input: JSONValue,
_cancel,
_meta,
ctx_,
): Promise<JSONValue> => {
ctxLongProm.resolveP(ctx_);
await waitProm.p;
return input;
};
}

const rpcServer = new RPCServer({
handlerTimeoutTime: serverTimeout,
logger,
idGen,
});

await rpcServer.start({
manifest: {
testArbitrary: new TestMethodArbitraryTimeout({}),
},
});

const streamLong = rpcTestUtils.messagesToReadableStream([
{
jsonrpc: '2.0',
method: 'testArbitrary',
params: null,
},
]);

const readWriteStreamLong: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
readable: streamLong,
writable: new WritableStream(),
};

rpcServer.handleStream(readWriteStreamLong);
const ctxLong = await ctxLongProm.p;
expect(ctxLong.timer.delay).toEqual(handlerTimeout);
waitProm.resolveP();
await rpcServer.stop({ force: true });
},
);
testProp(
'middleware can update timeout timer',
[specificMessageArb],
Expand Down

0 comments on commit 93d535a

Please sign in to comment.