Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Implement deadline and cancellation propagation #1616

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 13 additions & 3 deletions packages/grpc-js/src/call-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import * as http2 from 'http2';

import { CallCredentials } from './call-credentials';
import { Status } from './constants';
import { Propagate, Status } from './constants';
import { Filter, FilterFactory } from './filter';
import { FilterStackFactory, FilterStack } from './filter-stack';
import { Metadata } from './metadata';
Expand All @@ -27,6 +27,7 @@ import { ChannelImplementation } from './channel';
import { Subchannel } from './subchannel';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { ServerSurfaceCall } from './server-call';

const TRACER_NAME = 'call_stream';

Expand All @@ -42,7 +43,7 @@ export interface CallStreamOptions {
deadline: Deadline;
flags: number;
host: string;
parentCall: Call | null;
parentCall: ServerSurfaceCall | null;
}

export type PartialCallStreamOptions = Partial<CallStreamOptions>;
Expand Down Expand Up @@ -218,6 +219,11 @@ export class Http2CallStream implements Call {
metadata: new Metadata(),
});
};
if (this.options.parentCall && this.options.flags & Propagate.CANCELLATION) {
this.options.parentCall.on('cancelled', () => {
this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
});
}
}

private outputStatus() {
Expand Down Expand Up @@ -623,7 +629,11 @@ export class Http2CallStream implements Call {
}

getDeadline(): Deadline {
return this.options.deadline;
if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) {
return this.options.parentCall.getDeadline();
} else {
return this.options.deadline;
}
}

getCredentials(): CallCredentials {
Expand Down
14 changes: 8 additions & 6 deletions packages/grpc-js/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, PickResultType } from './picker';
import { Metadata } from './metadata';
import { Status, LogVerbosity } from './constants';
import { Status, LogVerbosity, Propagate } from './constants';
import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
Expand All @@ -39,6 +39,8 @@ import { SubchannelAddress } from './subchannel';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
import { mapProxyName } from './http_proxy';
import { GrpcUri, parseUri, uriToString } from './uri-parser';
import { ServerSurfaceCall } from './server-call';
import { SurfaceCall } from './call';

export enum ConnectivityState {
CONNECTING,
Expand Down Expand Up @@ -118,7 +120,7 @@ export interface Channel {
method: string,
deadline: Deadline,
host: string | null | undefined,
parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any
parentCall: ServerSurfaceCall | null,
propagateFlags: number | null | undefined
): Call;
}
Expand Down Expand Up @@ -509,7 +511,7 @@ export class ChannelImplementation implements Channel {
method: string,
deadline: Deadline,
host: string | null | undefined,
parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any
parentCall: ServerSurfaceCall | null,
propagateFlags: number | null | undefined
): Call {
if (typeof method !== 'string') {
Expand Down Expand Up @@ -537,9 +539,9 @@ export class ChannelImplementation implements Channel {
);
const finalOptions: CallStreamOptions = {
deadline: deadline,
flags: propagateFlags || 0,
host: host || this.defaultAuthority,
parentCall: parentCall || null,
flags: propagateFlags ?? Propagate.DEFAULTS,
host: host ?? this.defaultAuthority,
parentCall: parentCall,
};
const stream: Http2CallStream = new Http2CallStream(
method,
Expand Down
20 changes: 5 additions & 15 deletions packages/grpc-js/src/client-interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,21 +311,11 @@ export class InterceptingCall implements InterceptingCallInterface {
}

function getCall(channel: Channel, path: string, options: CallOptions): Call {
let deadline;
let host;
const parent = null;
let propagateFlags;
let credentials;
if (options) {
deadline = options.deadline;
host = options.host;

propagateFlags = options.propagate_flags;
credentials = options.credentials;
}
if (deadline === undefined) {
deadline = Infinity;
}
const deadline = options.deadline ?? Infinity;
const host = options.host;
const parent = options.parent ?? null;
const propagateFlags = options.propagate_flags;
const credentials = options.credentials;
const call = channel.createCall(path, deadline, host, parent, propagateFlags);
if (credentials) {
call.setCredentials(credentials);
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ export enum Propagate {
CENSUS_STATS_CONTEXT = 2,
CENSUS_TRACING_CONTEXT = 4,
CANCELLATION = 8,
DEFAULTS = 65536,
// https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/propagation_bits.h#L43
DEFAULTS = 0xffff | Propagate.DEADLINE | Propagate.CENSUS_STATS_CONTEXT | Propagate.CENSUS_TRACING_CONTEXT | Propagate.CANCELLATION,
}

// -1 means unlimited
Expand Down
34 changes: 29 additions & 5 deletions packages/grpc-js/src/server-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { EventEmitter } from 'events';
import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream';

import { StatusObject } from './call-stream';
import { Deadline, StatusObject } from './call-stream';
import {
Status,
DEFAULT_MAX_SEND_MESSAGE_LENGTH,
Expand Down Expand Up @@ -78,6 +78,7 @@ export type ServerSurfaceCall = {
readonly metadata: Metadata;
getPeer(): string;
sendMetadata(responseMetadata: Metadata): void;
getDeadline(): Deadline;
} & EventEmitter;

export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
Expand Down Expand Up @@ -120,6 +121,10 @@ export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata);
}

getDeadline(): Deadline {
return this.call.getDeadline();
}
}

export class ServerReadableStreamImpl<RequestType, ResponseType>
Expand Down Expand Up @@ -153,6 +158,10 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata);
}

getDeadline(): Deadline {
return this.call.getDeadline();
}
}

export class ServerWritableStreamImpl<RequestType, ResponseType>
Expand Down Expand Up @@ -186,6 +195,10 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
this.call.sendMetadata(responseMetadata);
}

getDeadline(): Deadline {
return this.call.getDeadline();
}

_write(
chunk: ResponseType,
encoding: string,
Expand Down Expand Up @@ -257,6 +270,10 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
this.call.sendMetadata(responseMetadata);
}

getDeadline(): Deadline {
return this.call.getDeadline();
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
end(metadata?: any) {
if (metadata) {
Expand Down Expand Up @@ -357,7 +374,8 @@ export class Http2ServerCallStream<
ResponseType
> extends EventEmitter {
cancelled = false;
deadline: NodeJS.Timer = setTimeout(() => {}, 0);
deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
private deadline: Deadline = Infinity;
private wantTrailers = false;
private metadataSent = false;
private canPush = false;
Expand Down Expand Up @@ -405,7 +423,7 @@ export class Http2ServerCallStream<
}

// Clear noop timer
clearTimeout(this.deadline);
clearTimeout(this.deadlineTimer);
}

private checkCancelled(): boolean {
Expand Down Expand Up @@ -452,7 +470,9 @@ export class Http2ServerCallStream<

const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;

this.deadline = setTimeout(handleExpiredDeadline, timeout, this);
const now = new Date();
this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout);
this.deadlineTimer = setTimeout(handleExpiredDeadline, timeout, this);
metadata.remove(GRPC_TIMEOUT_HEADER);
}

Expand Down Expand Up @@ -566,7 +586,7 @@ export class Http2ServerCallStream<
statusObj.details
);

clearTimeout(this.deadline);
clearTimeout(this.deadlineTimer);

if (!this.wantTrailers) {
this.wantTrailers = true;
Expand Down Expand Up @@ -779,6 +799,10 @@ export class Http2ServerCallStream<
return 'unknown';
}
}

getDeadline(): Deadline {
return this.deadline;
}
}

/* eslint-disable @typescript-eslint/no-explicit-any */
Expand Down