Skip to content

Commit

Permalink
Merge pull request #1785 from murgatroid99/grpc-js_service_config_tim…
Browse files Browse the repository at this point in the history
…eout

grpc-js: Apply timeouts from service configs
  • Loading branch information
murgatroid99 committed May 14, 2021
2 parents 1e9bf30 + e3106b9 commit 663fe77
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 16 deletions.
30 changes: 23 additions & 7 deletions packages/grpc-js/src/call-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ function getSystemErrorName(errno: number): string {

export type Deadline = Date | number;

function getMinDeadline(deadlineList: Deadline[]): Deadline {
let minValue: number = Infinity;
for (const deadline of deadlineList) {
const deadlineMsecs = deadline instanceof Date ? deadline.getTime() : deadline;
if (deadlineMsecs < minValue) {
minValue = deadlineMsecs;
}
}
return minValue;
}

export interface CallStreamOptions {
deadline: Deadline;
flags: number;
Expand Down Expand Up @@ -235,6 +246,8 @@ export class Http2CallStream implements Call {

private internalError: SystemError | null = null;

private configDeadline: Deadline = Infinity;

constructor(
private readonly methodName: string,
private readonly channel: ChannelImplementation,
Expand Down Expand Up @@ -675,15 +688,14 @@ export class Http2CallStream implements Call {
}

getDeadline(): Deadline {
const deadlineList = [this.options.deadline];
if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) {
const parentDeadline = this.options.parentCall.getDeadline();
const selfDeadline = this.options.deadline;
const parentDeadlineMsecs = parentDeadline instanceof Date ? parentDeadline.getTime() : parentDeadline;
const selfDeadlineMsecs = selfDeadline instanceof Date ? selfDeadline.getTime() : selfDeadline;
return Math.min(parentDeadlineMsecs, selfDeadlineMsecs);
} else {
return this.options.deadline;
deadlineList.push(this.options.parentCall.getDeadline());
}
if (this.configDeadline) {
deadlineList.push(this.configDeadline);
}
return getMinDeadline(deadlineList);
}

getCredentials(): CallCredentials {
Expand All @@ -710,6 +722,10 @@ export class Http2CallStream implements Call {
return this.options.host;
}

setConfigDeadline(configDeadline: Deadline) {
this.configDeadline = configDeadline;
}

startRead() {
/* If the stream has ended with an error, we should not emit any more
* messages and we should communicate that the stream has ended */
Expand Down
13 changes: 13 additions & 0 deletions packages/grpc-js/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ export class ChannelImplementation implements Channel {
}

private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
if (stream.getStatus() !== null) {
/* If the stream has a status, it has already finished and we don't need
* to take any more actions on it. */
return;
}
if (this.configSelector === null) {
/* This branch will only be taken at the beginning of the channel's life,
* before the resolver ever returns a result. So, the
Expand All @@ -523,6 +528,14 @@ export class ChannelImplementation implements Channel {
} else {
const callConfig = this.configSelector(stream.getMethod(), metadata);
if (callConfig.status === Status.OK) {
if (callConfig.methodConfig.timeout) {
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + callConfig.methodConfig.timeout.seconds);
deadline.setMilliseconds(deadline.getMilliseconds() + callConfig.methodConfig.timeout.nanos / 1_000_000);
stream.setConfigDeadline(deadline);
// Refreshing the filters makes the deadline filter pick up the new deadline
stream.filterStack.refresh();
}
this.tryPick(stream, metadata, callConfig);
} else {
stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod());
Expand Down
24 changes: 20 additions & 4 deletions packages/grpc-js/src/deadline-filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,41 @@ function getDeadline(deadline: number) {

export class DeadlineFilter extends BaseFilter implements Filter {
private timer: NodeJS.Timer | null = null;
private deadline: number;
private deadline: number = Infinity;
constructor(
private readonly channel: Channel,
private readonly callStream: Call
) {
super();
const callDeadline = callStream.getDeadline();
this.retreiveDeadline();
this.runTimer();
}

private retreiveDeadline() {
const callDeadline = this.callStream.getDeadline();
if (callDeadline instanceof Date) {
this.deadline = callDeadline.getTime();
} else {
this.deadline = callDeadline;
}
}

private runTimer() {
if (this.timer) {
clearTimeout(this.timer);
}
const now: number = new Date().getTime();
let timeout = this.deadline - now;
if (timeout <= 0) {
process.nextTick(() => {
callStream.cancelWithStatus(
this.callStream.cancelWithStatus(
Status.DEADLINE_EXCEEDED,
'Deadline exceeded'
);
});
} else if (this.deadline !== Infinity) {
this.timer = setTimeout(() => {
callStream.cancelWithStatus(
this.callStream.cancelWithStatus(
Status.DEADLINE_EXCEEDED,
'Deadline exceeded'
);
Expand All @@ -74,6 +85,11 @@ export class DeadlineFilter extends BaseFilter implements Filter {
}
}

refresh() {
this.retreiveDeadline();
this.runTimer();
}

async sendMetadata(metadata: Promise<Metadata>) {
if (this.deadline === Infinity) {
return metadata;
Expand Down
6 changes: 6 additions & 0 deletions packages/grpc-js/src/filter-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ export class FilterStack implements Filter {

return result;
}

refresh(): void {
for (const filter of this.filters) {
filter.refresh();
}
}
}

export class FilterStackFactory implements FilterFactory<FilterStack> {
Expand Down
5 changes: 5 additions & 0 deletions packages/grpc-js/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export interface Filter {
receiveMessage(message: Promise<Buffer>): Promise<Buffer>;

receiveTrailers(status: StatusObject): StatusObject;

refresh(): void;
}

export abstract class BaseFilter implements Filter {
Expand All @@ -54,6 +56,9 @@ export abstract class BaseFilter implements Filter {
receiveTrailers(status: StatusObject): StatusObject {
return status;
}

refresh(): void {
}
}

export interface FilterFactory<T extends Filter> {
Expand Down
27 changes: 22 additions & 5 deletions packages/grpc-js/src/service-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ export interface MethodConfigName {
method?: string;
}

export interface Duration {
seconds: number;
nanos: number;
}

export interface MethodConfig {
name: MethodConfigName[];
waitForReady?: boolean;
timeout?: string;
timeout?: Duration;
maxRequestBytes?: number;
maxResponseBytes?: number;
}
Expand Down Expand Up @@ -101,13 +106,25 @@ function validateMethodConfig(obj: any): MethodConfig {
result.waitForReady = obj.waitForReady;
}
if ('timeout' in obj) {
if (
!(typeof obj.timeout === 'string') ||
!TIMEOUT_REGEX.test(obj.timeout)
if (typeof obj.timeout === 'object') {
if (!('seconds' in obj.timeout) || !(typeof obj.timeout.seconds === 'number')) {
throw new Error('Invalid method config: invalid timeout.seconds');
}
if (!('nanos' in obj.timeout) || !(typeof obj.timeout.nanos === 'number')) {
throw new Error('Invalid method config: invalid timeout.nanos');
}
result.timeout = obj.timeout;
} else if (
(typeof obj.timeout === 'string') && TIMEOUT_REGEX.test(obj.timeout)
) {
const timeoutParts = obj.timeout.substring(0, obj.timeout.length - 1).split('.');
result.timeout = {
seconds: timeoutParts[0] | 0,
nanos: (timeoutParts[1] ?? 0) | 0
}
} else {
throw new Error('Invalid method config: invalid timeout');
}
result.timeout = obj.timeout;
}
if ('maxRequestBytes' in obj) {
if (typeof obj.maxRequestBytes !== 'number') {
Expand Down

0 comments on commit 663fe77

Please sign in to comment.