diff --git a/src/streamingCalls/streamDescriptor.ts b/src/streamingCalls/streamDescriptor.ts index c3047909b..d23f80219 100644 --- a/src/streamingCalls/streamDescriptor.ts +++ b/src/streamingCalls/streamDescriptor.ts @@ -27,10 +27,12 @@ import {StreamingApiCaller} from './streamingApiCaller'; export class StreamDescriptor implements Descriptor { type: StreamType; streaming: boolean; // needed for browser support + rest?: boolean; - constructor(streamType: StreamType) { + constructor(streamType: StreamType, rest?: boolean) { this.type = streamType; this.streaming = true; + this.rest = rest; } getApiCaller(settings: CallSettings): APICaller { diff --git a/src/streamingCalls/streaming.ts b/src/streamingCalls/streaming.ts index 71deedf82..8c3d975c6 100644 --- a/src/streamingCalls/streaming.ts +++ b/src/streamingCalls/streaming.ts @@ -26,7 +26,6 @@ import { SimpleCallbackFunction, } from '../apitypes'; import {RetryRequestOptions} from '../gax'; -import {StreamArrayParser} from '../streamArrayParser'; // eslint-disable-next-line @typescript-eslint/no-var-requires const duplexify: DuplexifyConstructor = require('duplexify'); @@ -91,6 +90,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { private _isCancelCalled: boolean; stream?: CancellableStream; private _responseHasSent: boolean; + rest?: boolean; /** * StreamProxy is a proxy to gRPC-streaming method. * @@ -99,7 +99,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { * @param {StreamType} type - the type of gRPC stream. * @param {ApiCallback} callback - the callback for further API call. */ - constructor(type: StreamType, callback: APICallback) { + constructor(type: StreamType, callback: APICallback, rest?: boolean) { super(undefined, undefined, { objectMode: true, readable: type !== StreamType.CLIENT_STREAMING, @@ -109,6 +109,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { this._callback = callback; this._isCancelCalled = false; this._responseHasSent = false; + this.rest = rest; } cancel() { @@ -125,9 +126,6 @@ export class StreamProxy extends duplexify implements GRPCCallResult { */ forwardEvents(stream: Stream) { const eventsToForward = ['metadata', 'response', 'status']; - if (stream instanceof StreamArrayParser) { - eventsToForward.push('data', 'end', 'error'); - } eventsToForward.forEach(event => { stream.on(event, this.emit.bind(this, event)); }); @@ -175,26 +173,30 @@ export class StreamProxy extends duplexify implements GRPCCallResult { retryRequestOptions: RetryRequestOptions = {} ) { if (this.type === StreamType.SERVER_STREAMING) { - const retryStream = retryRequest(null, { - objectMode: true, - request: () => { - if (this._isCancelCalled) { - if (this.stream) { - this.stream.cancel(); + const stream = apiCall(argument, this._callback) as CancellableStream; + this.stream = stream; + if (this.rest) { + this.setReadable(stream); + } else { + const retryStream = retryRequest(null, { + objectMode: true, + request: () => { + if (this._isCancelCalled) { + if (this.stream) { + this.stream.cancel(); + } + return; } - return; - } - const stream = apiCall(argument, this._callback) as CancellableStream; - this.stream = stream; - this.forwardEvents(stream); - return stream; - }, - retries: retryRequestOptions!.retries, - currentRetryAttempt: retryRequestOptions!.currentRetryAttempt, - noResponseRetries: retryRequestOptions!.noResponseRetries, - shouldRetryFn: retryRequestOptions!.shouldRetryFn, - }); - this.setReadable(retryStream); + this.forwardEvents(stream); + return stream; + }, + retries: retryRequestOptions!.retries, + currentRetryAttempt: retryRequestOptions!.currentRetryAttempt, + noResponseRetries: retryRequestOptions!.noResponseRetries, + shouldRetryFn: retryRequestOptions!.shouldRetryFn, + }); + this.setReadable(retryStream); + } return; } diff --git a/src/streamingCalls/streamingApiCaller.ts b/src/streamingCalls/streamingApiCaller.ts index 101591bce..6f8dc3634 100644 --- a/src/streamingCalls/streamingApiCaller.ts +++ b/src/streamingCalls/streamingApiCaller.ts @@ -44,7 +44,11 @@ export class StreamingApiCaller implements APICaller { } init(callback: APICallback): StreamProxy { - return new StreamProxy(this.descriptor.type, callback); + return new StreamProxy( + this.descriptor.type, + callback, + this.descriptor.rest + ); } wrap(func: GRPCCall): GRPCCall { diff --git a/test/unit/streaming.ts b/test/unit/streaming.ts index 2515887bb..ea957ab37 100644 --- a/test/unit/streaming.ts +++ b/test/unit/streaming.ts @@ -36,14 +36,15 @@ function createApiCallStreaming( func: | Promise | sinon.SinonSpy, internal.Transform | StreamArrayParser>, - type: streaming.StreamType + type: streaming.StreamType, + rest?: boolean ) { const settings = new gax.CallSettings(); return createApiCall( //@ts-ignore Promise.resolve(func), settings, - new StreamDescriptor(type) + new StreamDescriptor(type, rest) ) as GaxCallStream; } @@ -459,7 +460,7 @@ describe('streaming', () => { }); }); -describe('apiCall return StreamArrayParser', () => { +describe('REST streaming apiCall return StreamArrayParser', () => { const protos_path = path.resolve(__dirname, '..', 'fixtures', 'user.proto'); const root = protobuf.loadSync(protos_path); const UserService = root.lookupService('UserService'); @@ -476,7 +477,8 @@ describe('apiCall return StreamArrayParser', () => { }); const apiCall = createApiCallStreaming( spy, - streaming.StreamType.SERVER_STREAMING + streaming.StreamType.SERVER_STREAMING, + true ); const s = apiCall({}, undefined); assert.strictEqual(s.readable, true); @@ -505,7 +507,8 @@ describe('apiCall return StreamArrayParser', () => { }); const apiCall = createApiCallStreaming( spy, - streaming.StreamType.SERVER_STREAMING + streaming.StreamType.SERVER_STREAMING, + true ); const s = apiCall({}, undefined); assert.strictEqual(s.readable, true); @@ -536,7 +539,8 @@ describe('apiCall return StreamArrayParser', () => { const apiCall = createApiCallStreaming( //@ts-ignore spy, - streaming.StreamType.SERVER_STREAMING + streaming.StreamType.SERVER_STREAMING, + true ); const s = apiCall({}, undefined); let counter = 0;