diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 9528c30c8..b2da85c8c 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -24,6 +24,7 @@ import {StatusObject} from './call-stream'; import {Status} from './constants'; import {Deserialize, Serialize} from './make-client'; import {Metadata} from './metadata'; +import {StreamDecoder} from './stream-decoder'; function noop(): void {} @@ -107,6 +108,15 @@ export class ServerReadableStreamImpl extends private _deserialize: Deserialize) { super({objectMode: true}); this.cancelled = false; + this.call.setupReadable(this); + } + + _read(size: number) { + this.call.resume(); + } + + deserialize(input: Buffer): RequestType { + return this._deserialize(input); } getPeer(): string { @@ -468,6 +478,38 @@ export class Http2ServerCallStream extends this.sendMetadata(); return this.stream.write(chunk); } + + resume() { + this.stream.resume(); + } + + setupReadable(readable: ServerReadableStream| + ServerDuplexStream) { + const decoder = new StreamDecoder(); + + this.stream.on('data', async (data: Buffer) => { + const message = decoder.write(data); + + if (message === null) { + return; + } + + try { + const deserialized = await this.deserializeMessage(message); + + if (!readable.push(deserialized)) { + this.stream.pause(); + } + } catch (err) { + err.code = Status.INTERNAL; + readable.emit('error', err); + } + }); + + this.stream.once('end', () => { + readable.push(null); + }); + } } // tslint:disable:no-any diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 421c089d6..68f6e1498 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -314,7 +314,22 @@ function handleClientStreaming( call: Http2ServerCallStream, handler: ClientStreamingHandler, metadata: Metadata): void { - throw new Error('not implemented yet'); + const stream = new ServerReadableStreamImpl( + call, metadata, handler.deserialize); + + function respond( + err: ServiceError|null, value: ResponseType|null, trailer?: Metadata, + flags?: number) { + stream.destroy(); + call.sendUnaryMessage(err, value, trailer, flags); + } + + if (call.cancelled) { + return; + } + + stream.on('error', respond); + handler.func(stream, respond); } diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts index eabae3aae..6f352f736 100644 --- a/packages/grpc-js/test/test-server-errors.ts +++ b/packages/grpc-js/test/test-server-errors.ts @@ -126,6 +126,17 @@ describe('Client malformed response handling', () => { }); }); + it('should get an INTERNAL status with a client stream call', (done) => { + const call = client.clientStream((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + + call.write({}); + call.end(); + }); + it('should get an INTERNAL status with a server stream call', (done) => { const call = client.serverStream({}); @@ -229,6 +240,17 @@ describe('Server serialization failure handling', () => { }); }); + it('should get an INTERNAL status with a client stream call', (done) => { + const call = client.clientStream((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + + call.write({}); + call.end(); + }); + it('should get an INTERNAL status with a server stream call', (done) => { const call = client.serverStream({}); @@ -397,6 +419,18 @@ describe('Other conditions', () => { }); }); + it('should respond correctly to a client stream', (done) => { + const call = + misbehavingClient.clientStream((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + + call.write(badArg); + call.end(); + }); + it('should respond correctly to a server stream', (done) => { const call = misbehavingClient.serverStream(badArg); @@ -457,6 +491,56 @@ describe('Other conditions', () => { }); }); + it('should be present when a client stream call succeeds', (done) => { + let count = 0; + const call = client.clientStream((err: ServiceError, data: any) => { + assert.ifError(err); + + count++; + if (count === 2) { + done(); + } + }); + + call.write({error: false}); + call.write({error: false}); + call.end(); + + call.on('status', (status: grpc.StatusObject) => { + assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']); + + count++; + if (count === 2) { + done(); + } + }); + }); + + it('should be present when a client stream call fails', (done) => { + let count = 0; + const call = client.clientStream((err: ServiceError, data: any) => { + assert(err); + + count++; + if (count === 2) { + done(); + } + }); + + call.write({error: false}); + call.write({error: true}); + call.end(); + + call.on('status', (status: grpc.StatusObject) => { + assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']); + + count++; + if (count === 2) { + done(); + } + }); + }); + it('should be present when a server stream call succeeds', (done) => { const call = client.serverStream({error: false}); @@ -489,6 +573,19 @@ describe('Other conditions', () => { }); }); + it('for a client stream call', (done) => { + const call = client.clientStream((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNKNOWN); + assert.strictEqual(err.details, 'Requested error'); + done(); + }); + + call.write({error: false}); + call.write({error: true}); + call.end(); + }); + it('for a server stream call', (done) => { const call = client.serverStream({error: true}); diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index f44f5d5dd..d310ff072 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -263,6 +263,16 @@ describe('Server', () => { }); }); + it('should respond to a client stream with UNIMPLEMENTED', (done) => { + const call = client.sum((error: ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED); + done(); + }); + + call.end(); + }); + it('should respond to a server stream with UNIMPLEMENTED', (done) => { const call = client.fib({limit: 5});