Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Websocket Cancellation Handling #917

Merged
merged 3 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions go/grpcweb/websocket_wrapper.go
Expand Up @@ -176,6 +176,16 @@ func (w *webSocketWrappedReader) Read(p []byte) (int, error) {

// If the frame consists of only a single byte of value 1 then this indicates the client has finished sending
if len(framePayload) == 1 && framePayload[0] == 1 {
go func() {
for {
messageType, _, err := w.wsConn.Read(w.context)
if err == io.EOF || messageType == 0 {
// The client has closed the connection. Indicate to the response writer that it should close
w.cancel()
return
}
}
}()
return 0, io.EOF
}

Expand Down
161 changes: 82 additions & 79 deletions integration_test/ts/src/cancellation.spec.ts
Expand Up @@ -14,7 +14,7 @@ import {
} from "../_proto/improbable/grpcweb/test/test_pb";
import {TestService, TestUtilService} from "../_proto/improbable/grpcweb/test/test_pb_service";
import {DEBUG, continueStream} from "./util";
import { runWithHttp1AndHttp2 } from "./testRpcCombinations";
import { runWithHttp1AndHttp2, runWithSupportedTransports } from "./testRpcCombinations";

describe("Cancellation", () => {
runWithHttp1AndHttp2(({testHostUrl}) => {
Expand Down Expand Up @@ -52,91 +52,94 @@ describe("Cancellation", () => {
assert.equal(transportCancelFuncInvoked, true, "transport's cancel func must be invoked");
});

it("should handle aborting a streaming response mid-stream with propagation of the disconnection to the server", (done) => {
let onMessageId = 0;
runWithSupportedTransports((transport) => {
it("should handle aborting a streaming response mid-stream with propagation of the disconnection to the server", (done) => {
let onMessageId = 0;

const streamIdentifier = `rpc-${Math.random()}`;
const streamIdentifier = `rpc-${Math.random()}`;

const ping = new PingRequest();
ping.setValue("hello world");
ping.setResponseCount(100); // Request more messages than the client will accept before cancelling
ping.setStreamIdentifier(streamIdentifier);

let reqObj: grpc.Request;

// Checks are performed every 1s = 15s total wait
const maxAbortChecks = 15;

const numMessagesBeforeAbort = 5;

const doAbort = () => {
DEBUG && debug("doAbort");
reqObj.close();

// To ensure that the transport is successfully closing the connection, poll the server every 1s until
// it confirms the connection was closed. Connection closure is immediate in some browser/transport combinations,
// but can take several seconds in others.
function checkAbort(attempt: number) {
DEBUG && debug("checkAbort", attempt);
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("checkAbort.continueStream.status", status);

const checkStreamClosedRequest = new CheckStreamClosedRequest();
checkStreamClosedRequest.setStreamIdentifier(streamIdentifier);
grpc.unary(TestUtilService.CheckStreamClosed, {
debug: DEBUG,
request: checkStreamClosedRequest,
host: testHostUrl,
onEnd: ({message}) => {
const closed = ( message as CheckStreamClosedResponse ).getClosed();
DEBUG && debug("closed", closed);
if (closed) {
done();
} else {
if (attempt >= maxAbortChecks) {
assert.ok(closed, `server did not observe connection closure within ${maxAbortChecks} seconds`);
const ping = new PingRequest();
ping.setValue("hello world");
ping.setResponseCount(100); // Request more messages than the client will accept before cancelling
ping.setStreamIdentifier(streamIdentifier);

let reqObj: grpc.Request;

// Checks are performed every 1s = 15s total wait
const maxAbortChecks = 15;

const numMessagesBeforeAbort = 5;

const doAbort = () => {
DEBUG && debug("doAbort");
reqObj.close();

// To ensure that the transport is successfully closing the connection, poll the server every 1s until
// it confirms the connection was closed. Connection closure is immediate in some browser/transport combinations,
// but can take several seconds in others.
function checkAbort(attempt: number) {
DEBUG && debug("checkAbort", attempt);
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("checkAbort.continueStream.status", status);

const checkStreamClosedRequest = new CheckStreamClosedRequest();
checkStreamClosedRequest.setStreamIdentifier(streamIdentifier);
grpc.unary(TestUtilService.CheckStreamClosed, {
debug: DEBUG,
request: checkStreamClosedRequest,
host: testHostUrl,
onEnd: ({message}) => {
const closed = ( message as CheckStreamClosedResponse ).getClosed();
DEBUG && debug("closed", closed);
if (closed) {
done();
} else {
setTimeout(() => {
checkAbort(attempt + 1);
}, 1000);
if (attempt >= maxAbortChecks) {
assert.ok(closed, `server did not observe connection closure within ${maxAbortChecks} seconds`);
done();
} else {
setTimeout(() => {
checkAbort(attempt + 1);
}, 1000);
}
}
}
},
})
});
}
},
})
});
}

checkAbort(0);
};
checkAbort(0);
};

reqObj = grpc.invoke(TestService.PingList, {
debug: DEBUG,
request: ping,
host: testHostUrl,
onHeaders: (headers: grpc.Metadata) => {
DEBUG && debug("headers", headers);
},
onMessage: (message: PingResponse) => {
assert.ok(message instanceof PingResponse);
DEBUG && debug("onMessage.message.getCounter()", message.getCounter());
assert.strictEqual(message.getCounter(), onMessageId++);
if (message.getCounter() === numMessagesBeforeAbort) {
// Abort after receiving numMessagesBeforeAbort messages
doAbort();
} else if (message.getCounter() < numMessagesBeforeAbort) {
// Only request the next message if not yet aborted
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("onMessage.continueStream.status", status);
});
reqObj = grpc.invoke(TestService.PingList, {
debug: DEBUG,
request: ping,
host: testHostUrl,
transport: transport,
onHeaders: (headers: grpc.Metadata) => {
DEBUG && debug("headers", headers);
},
onMessage: (message: PingResponse) => {
assert.ok(message instanceof PingResponse);
DEBUG && debug("onMessage.message.getCounter()", message.getCounter());
assert.strictEqual(message.getCounter(), onMessageId++);
if (message.getCounter() === numMessagesBeforeAbort) {
// Abort after receiving numMessagesBeforeAbort messages
doAbort();
} else if (message.getCounter() < numMessagesBeforeAbort) {
// Only request the next message if not yet aborted
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("onMessage.continueStream.status", status);
});
}
},
onEnd: (status: grpc.Code, statusMessage: string, trailers: grpc.Metadata) => {
DEBUG && debug("status", status, "statusMessage", statusMessage, "trailers", trailers);
// onEnd shouldn't be called if abort is called prior to the response ending
assert.fail();
}
},
onEnd: (status: grpc.Code, statusMessage: string, trailers: grpc.Metadata) => {
DEBUG && debug("status", status, "statusMessage", statusMessage, "trailers", trailers);
// onEnd shouldn't be called if abort is called prior to the response ending
assert.fail();
}
});
}, 20000);
});
}, 20000);
})
});
});
109 changes: 105 additions & 4 deletions integration_test/ts/src/client.websocket.spec.ts
Expand Up @@ -4,9 +4,14 @@ import { grpc } from "@improbable-eng/grpc-web";
import { debug } from "../../../client/grpc-web/src/debug";
import { assert } from "chai";
// Generated Test Classes
import { PingRequest, PingResponse } from "../_proto/improbable/grpcweb/test/test_pb";
import { TestService } from "../_proto/improbable/grpcweb/test/test_pb_service";
import { DEBUG, DISABLE_WEBSOCKET_TESTS } from "./util";
import {
CheckStreamClosedRequest,
CheckStreamClosedResponse,
PingRequest,
PingResponse
} from "../_proto/improbable/grpcweb/test/test_pb";
import { TestService, TestUtilService } from "../_proto/improbable/grpcweb/test/test_pb_service";
import { continueStream, DEBUG, DISABLE_WEBSOCKET_TESTS } from "./util";
import { headerTrailerCombos, runWithHttp1AndHttp2 } from "./testRpcCombinations";

if (DISABLE_WEBSOCKET_TESTS) {
Expand Down Expand Up @@ -73,7 +78,7 @@ if (DISABLE_WEBSOCKET_TESTS) {

describe("bidirectional (websockets)", () => {
headerTrailerCombos((withHeaders, withTrailers) => {
it("should make a bidirectional request that is terminated by the client", (done) => {
it("should make a bidirectional request that is ended by the client finishing sending", (done) => {
let didGetOnHeaders = false;
let counter = 1;
let lastMessage = `helloworld:${counter}`;
Expand Down Expand Up @@ -129,6 +134,102 @@ if (DISABLE_WEBSOCKET_TESTS) {
});
});

headerTrailerCombos((withHeaders, withTrailers) => {
it("should make a bidirectional request that is aborted by the client with propagation of the disconnection to the server", (done) => {
let didGetOnHeaders = false;
let counter = 1;
const streamIdentifier = `rpc-${Math.random()}`;
let lastMessage = `helloworld:${counter}`;
const ping = new PingRequest();
ping.setStreamIdentifier(streamIdentifier);
ping.setSendHeaders(withHeaders);
ping.setSendTrailers(withTrailers);
ping.setValue(lastMessage);

const client = grpc.client(TestService.PingPongBidi, {
debug: DEBUG,
host: testHostUrl,
transport: grpc.WebsocketTransport(),
});

// Checks are performed every 1s = 15s total wait
const maxAbortChecks = 15;

const doAbort = () => {
DEBUG && debug("doAbort");
client.close();

// To ensure that the transport is successfully closing the connection, poll the server every 1s until
// it confirms the connection was closed. Connection closure is immediate in some browser/transport combinations,
// but can take several seconds in others.
function checkAbort(attempt: number) {
DEBUG && debug("checkAbort", attempt);
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("checkAbort.continueStream.status", status);

const checkStreamClosedRequest = new CheckStreamClosedRequest();
checkStreamClosedRequest.setStreamIdentifier(streamIdentifier);
grpc.unary(TestUtilService.CheckStreamClosed, {
debug: DEBUG,
request: checkStreamClosedRequest,
host: testHostUrl,
onEnd: ({message}) => {
const closed = ( message as CheckStreamClosedResponse ).getClosed();
DEBUG && debug("closed", closed);
if (closed) {
done();
} else {
if (attempt >= maxAbortChecks) {
assert.ok(closed, `server did not observe connection closure within ${maxAbortChecks} seconds`);
done();
} else {
setTimeout(() => {
checkAbort(attempt + 1);
}, 1000);
}
}
},
})
});
}

checkAbort(0);
};

client.onHeaders((headers: grpc.Metadata) => {
DEBUG && debug("headers", headers);
didGetOnHeaders = true;
if (withHeaders) {
assert.deepEqual(headers.get("HeaderTestKey1"), ["ServerValue1"]);
assert.deepEqual(headers.get("HeaderTestKey2"), ["ServerValue2"]);
}
});
client.onMessage((message: PingResponse) => {
assert.ok(message instanceof PingResponse);
assert.deepEqual(message.getValue(), lastMessage);

if (counter === 10) {
doAbort();
} else {
counter++;
lastMessage = `helloworld:${counter}`;
const ping = new PingRequest();
ping.setValue(lastMessage);
client.send(ping);
}
});
client.onEnd((status: grpc.Code, statusMessage: string, trailers: grpc.Metadata) => {
DEBUG && debug("status", status, "statusMessage", statusMessage);
// onEnd shouldn't be called if abort is called prior to the response ending
assert.fail();
});
client.start();

// send initial message
client.send(ping);
});
});

headerTrailerCombos((withHeaders, withTrailers) => {
it("should make a bidirectional request that is terminated by the server", (done) => {
let didGetOnHeaders = false;
Expand Down