Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions packages/grpc-js/src/server-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {StatusObject} from './call-stream';
import {Status} from './constants';
import {Deserialize, Serialize} from './make-client';
import {Metadata} from './metadata';
import {StreamDecoder} from './stream-decoder';

function noop(): void {}

Expand Down Expand Up @@ -107,6 +108,15 @@ export class ServerReadableStreamImpl<RequestType, ResponseType> extends
private _deserialize: Deserialize<RequestType>) {
super({objectMode: true});
this.cancelled = false;
this.call.setupReadable(this);
}

_read(size: number) {
this.call.resume();
}

deserialize(input: Buffer): RequestType {
return this._deserialize(input);
}

getPeer(): string {
Expand Down Expand Up @@ -468,6 +478,38 @@ export class Http2ServerCallStream<RequestType, ResponseType> extends
this.sendMetadata();
return this.stream.write(chunk);
}

resume() {
this.stream.resume();
}

setupReadable(readable: ServerReadableStream<RequestType, ResponseType>|
ServerDuplexStream<RequestType, ResponseType>) {
const decoder = new StreamDecoder();

this.stream.on('data', async (data: Buffer) => {
const message = decoder.write(data);

if (message === null) {
return;
}

try {
const deserialized = await this.deserializeMessage(message);

if (!readable.push(deserialized)) {
this.stream.pause();
}
} catch (err) {
err.code = Status.INTERNAL;
readable.emit('error', err);
}
});

this.stream.once('end', () => {
readable.push(null);
});
}
}

// tslint:disable:no-any
Expand Down
17 changes: 16 additions & 1 deletion packages/grpc-js/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,22 @@ function handleClientStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: ClientStreamingHandler<RequestType, ResponseType>,
metadata: Metadata): void {
throw new Error('not implemented yet');
const stream = new ServerReadableStreamImpl<RequestType, ResponseType>(
call, metadata, handler.deserialize);

function respond(
err: ServiceError|null, value: ResponseType|null, trailer?: Metadata,
flags?: number) {
stream.destroy();
call.sendUnaryMessage(err, value, trailer, flags);
}

if (call.cancelled) {
return;
}

stream.on('error', respond);
handler.func(stream, respond);
}


Expand Down
97 changes: 97 additions & 0 deletions packages/grpc-js/test/test-server-errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ describe('Client malformed response handling', () => {
});
});

it('should get an INTERNAL status with a client stream call', (done) => {
const call = client.clientStream((err: ServiceError, data: any) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});

call.write({});
call.end();
});

it('should get an INTERNAL status with a server stream call', (done) => {
const call = client.serverStream({});

Expand Down Expand Up @@ -229,6 +240,17 @@ describe('Server serialization failure handling', () => {
});
});

it('should get an INTERNAL status with a client stream call', (done) => {
const call = client.clientStream((err: ServiceError, data: any) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});

call.write({});
call.end();
});

it('should get an INTERNAL status with a server stream call', (done) => {
const call = client.serverStream({});

Expand Down Expand Up @@ -397,6 +419,18 @@ describe('Other conditions', () => {
});
});

it('should respond correctly to a client stream', (done) => {
const call =
misbehavingClient.clientStream((err: ServiceError, data: any) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});

call.write(badArg);
call.end();
});

it('should respond correctly to a server stream', (done) => {
const call = misbehavingClient.serverStream(badArg);

Expand Down Expand Up @@ -457,6 +491,56 @@ describe('Other conditions', () => {
});
});

it('should be present when a client stream call succeeds', (done) => {
let count = 0;
const call = client.clientStream((err: ServiceError, data: any) => {
assert.ifError(err);

count++;
if (count === 2) {
done();
}
});

call.write({error: false});
call.write({error: false});
call.end();

call.on('status', (status: grpc.StatusObject) => {
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);

count++;
if (count === 2) {
done();
}
});
});

it('should be present when a client stream call fails', (done) => {
let count = 0;
const call = client.clientStream((err: ServiceError, data: any) => {
assert(err);

count++;
if (count === 2) {
done();
}
});

call.write({error: false});
call.write({error: true});
call.end();

call.on('status', (status: grpc.StatusObject) => {
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);

count++;
if (count === 2) {
done();
}
});
});

it('should be present when a server stream call succeeds', (done) => {
const call = client.serverStream({error: false});

Expand Down Expand Up @@ -489,6 +573,19 @@ describe('Other conditions', () => {
});
});

it('for a client stream call', (done) => {
const call = client.clientStream((err: ServiceError, data: any) => {
assert(err);
assert.strictEqual(err.code, grpc.status.UNKNOWN);
assert.strictEqual(err.details, 'Requested error');
done();
});

call.write({error: false});
call.write({error: true});
call.end();
});

it('for a server stream call', (done) => {
const call = client.serverStream({error: true});

Expand Down
10 changes: 10 additions & 0 deletions packages/grpc-js/test/test-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,16 @@ describe('Server', () => {
});
});

it('should respond to a client stream with UNIMPLEMENTED', (done) => {
const call = client.sum((error: ServiceError, response: any) => {
assert(error);
assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED);
done();
});

call.end();
});

it('should respond to a server stream with UNIMPLEMENTED', (done) => {
const call = client.fib({limit: 5});

Expand Down