Skip to content

Commit

Permalink
Switch handlers to use asynchronous iterables (#430)
Browse files Browse the repository at this point in the history
  • Loading branch information
timostamm committed Jan 23, 2023
1 parent 4b4eb22 commit fbb089e
Show file tree
Hide file tree
Showing 32 changed files with 1,606 additions and 2,804 deletions.
233 changes: 122 additions & 111 deletions packages/connect-node-test/src/badweather/broken-input.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,130 +20,141 @@ import {
errorFromJson,
} from "@bufbuild/connect-core/protocol-connect";
import { http2Request } from "../helpers/http2-request.js";
import type { MethodInfo } from "@bufbuild/protobuf";
import type * as http2 from "http2";

describe("broken input", () => {
const servers = createTestServers();
beforeAll(async () => await servers.start());

// TODO(TCN-785) add @bufbuild/connect-node
// TODO(TCN-785) cover gRPC and gRPC-web
// TODO(TCN-785) cover invalid protobuf binary input
servers.describeServers(["connect-go (h2)"], (server, serverName) => {
async function req(method: MethodInfo, type: string, body: Uint8Array) {
const url = createMethodUrl(server.getUrl(), TestService, method);
const opt: http2.SecureClientSessionOptions = {};
if (serverName == "connect-go (h2)") {
opt.rejectUnauthorized = false; // TODO set up cert for go server correctly
}
return await http2Request(
"POST",
url,
{ "content-type": type },
body,
opt
);
}
// TODO(TCN-785) add @bufbuild/connect-node, cover gRPC and gRPC-web, cover invalid protobuf binary input
servers.describeServers(
["connect-go (h2)", "@bufbuild/connect-node (h2c)"],
(server, serverName) => {
const rejectUnauthorized = serverName !== "connect-go (h2)"; // TODO set up cert for go server correctly

describe("Connect unary", function () {
const reqUnary = async (type: string, body: Uint8Array) =>
req(TestService.methods.unaryCall, type, body).then((res) => ({
status: res.status,
error: errorFromJson(
JSON.parse(new TextDecoder().decode(res.body)) // eslint-disable-line @typescript-eslint/no-unsafe-argument
),
}));
it("should raise HTTP 400 for invalid JSON", async () => {
const body = new TextEncoder().encode("this is not json");
const { status, error } = await reqUnary("application/json", body);
expect(status).toBe(400);
expect(error.code).toBe(Code.InvalidArgument);
if (serverName == "@bufbuild/connect-node (h2c)") {
expect(error.rawMessage).toBe(
"json format: Unexpected token h in JSON at position 1"
describe("Connect unary", function () {
it("should raise HTTP 400 for invalid JSON", async () => {
const unaryRequest = async (body: Uint8Array) =>
http2Request({
body,
rejectUnauthorized,
url: createMethodUrl(
server.getUrl(),
TestService,
TestService.methods.unaryCall
),
method: "POST",
ctype: "application/json",
}).then((res) => {
return {
status: res.status,
error: errorFromJson(
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
JSON.parse(new TextDecoder().decode(res.body))
),
};
});
const { status, error } = await unaryRequest(
new TextEncoder().encode("this is not json")
);
}
});
});

for (const method of [
TestService.methods.streamingInputCall,
TestService.methods.streamingOutputCall,
TestService.methods.fullDuplexCall,
]) {
describe(`Connect streaming ${method.name}`, function () {
const reqStreaming = async (type: string, body: Uint8Array) =>
req(method, type, body).then((res) => ({
status: res.status,
endStream: endStreamFromJson(res.body.subarray(5)),
}));
it("should raise HTTP 400 for for invalid JSON", async () => {
const json = new TextEncoder().encode("this is not json");
const body = new Uint8Array(json.byteLength + 5);
body.set(json, 5);
const v = new DataView(body.buffer, body.byteOffset, body.byteLength);
v.setUint8(0, 0b00000000); // first byte is flags
v.setUint32(1, json.byteLength); // 4 bytes message length
const { status, endStream } = await reqStreaming(
"application/connect+json",
body
);
expect(status).toBe(200);
expect(endStream.error?.code).toBe(Code.InvalidArgument);
expect(status).toBe(400);
expect(error.code).toBe(Code.InvalidArgument);
if (serverName == "@bufbuild/connect-node (h2c)") {
expect(endStream.error?.rawMessage).toBe(
"json format: Unexpected token h in JSON at position 1"
expect(error.rawMessage).toMatch(
/^cannot decode grpc.testing.SimpleRequest from JSON: Unexpected token h in JSON/
);
}
});
it("should raise HTTP 400 for 0 message length", async () => {
const body = new Uint8Array(5);
const v = new DataView(body.buffer, body.byteOffset, body.byteLength);
v.setUint8(0, 0b00000000); // first byte is flags
v.setUint32(1, 1024); // 4 bytes message length
const { status, endStream } = await reqStreaming(
"application/connect+proto",
body
);
expect(status).toBe(200);
expect(endStream.error?.code).toBe(Code.InvalidArgument);
expect(endStream.error?.rawMessage).toBe(
"protocol error: promised 1024 bytes in enveloped message, got 0 bytes"
);
});
it("should raise HTTP 400 for short message", async () => {
const body = new Uint8Array(6);
const v = new DataView(body.buffer, body.byteOffset, body.byteLength);
v.setUint8(0, 0b00000000); // first byte is flags
v.setUint32(1, 1024); // 4 bytes message length
const { status, endStream } = await reqStreaming(
"application/connect+proto",
body
);
expect(status).toBe(200);
expect(endStream.error?.code).toBe(Code.InvalidArgument);
expect(endStream.error?.rawMessage).toMatch(
"^protocol error: promised 1024 bytes in enveloped message, got (1|less) bytes"
);
});
it("should raise HTTP 400 for short envelope", async () => {
const body = new Uint8Array(1);
const v = new DataView(body.buffer, body.byteOffset, body.byteLength);
v.setUint8(0, 0b00000000); // first byte is flags
const { status, endStream } = await reqStreaming(
"application/connect+proto",
body
);
expect(status).toBe(200);
expect(endStream.error?.code).toBe(Code.InvalidArgument);
expect(endStream.error?.rawMessage).toMatch(
"^protocol error: incomplete envelope"
);
});
});

for (const method of [
TestService.methods.streamingInputCall,
TestService.methods.streamingOutputCall,
TestService.methods.fullDuplexCall,
]) {
describe(`Connect streaming ${method.name}`, function () {
const streamingRequest = async (body: Uint8Array) =>
http2Request({
body,
rejectUnauthorized,
url: createMethodUrl(server.getUrl(), TestService, method),
method: "POST",
ctype: "application/connect+json",
}).then((res) => ({
status: res.status,
endStream: endStreamFromJson(res.body.subarray(5)),
}));

it("should raise HTTP 400 for for invalid JSON", async () => {
const json = new TextEncoder().encode("this is not json");
const body = new Uint8Array(json.byteLength + 5);
body.set(json, 5);
const v = new DataView(
body.buffer,
body.byteOffset,
body.byteLength
);
v.setUint8(0, 0b00000000); // first byte is flags
v.setUint32(1, json.byteLength); // 4 bytes message length
const { status, endStream } = await streamingRequest(body);
expect(status).toBe(200);
expect(endStream.error?.code).toBe(Code.InvalidArgument);
if (serverName == "@bufbuild/connect-node (h2c)") {
expect(endStream.error?.rawMessage).toMatch(
/^cannot decode grpc.testing.Streaming(Input|Output)CallRequest from JSON: Unexpected token h in JSON/
);
}
});
it("should raise HTTP 400 for 0 message length", async () => {
const body = new Uint8Array(5);
const v = new DataView(
body.buffer,
body.byteOffset,
body.byteLength
);
v.setUint8(0, 0b00000000); // first byte is flags
v.setUint32(1, 1024); // 4 bytes message length
const { status, endStream } = await streamingRequest(body);
expect(status).toBe(200);
expect(endStream.error?.code).toBe(Code.InvalidArgument);
expect(endStream.error?.rawMessage).toBe(
"protocol error: promised 1024 bytes in enveloped message, got 0 bytes"
);
});
it("should raise HTTP 400 for short message", async () => {
const body = new Uint8Array(6);
const v = new DataView(
body.buffer,
body.byteOffset,
body.byteLength
);
v.setUint8(0, 0b00000000); // first byte is flags
v.setUint32(1, 1024); // 4 bytes message length
const { status, endStream } = await streamingRequest(body);
expect(status).toBe(200);
expect(endStream.error?.code).toBe(Code.InvalidArgument);
expect(endStream.error?.rawMessage).toMatch(
"^protocol error: promised 1024 bytes in enveloped message, got (1|less) bytes"
);
});
it("should raise HTTP 400 for short envelope", async () => {
const body = new Uint8Array(1);
const v = new DataView(
body.buffer,
body.byteOffset,
body.byteLength
);
v.setUint8(0, 0b00000000); // first byte is flags
const { status, endStream } = await streamingRequest(body);
expect(status).toBe(200);
expect(endStream.error?.code).toBe(Code.InvalidArgument);
expect(endStream.error?.rawMessage).toMatch(
"^protocol error: incomplete envelope"
);
});
});
}
}
});
);

afterAll(async () => await servers.stop());
});
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import {
errorFromJson,
} from "@bufbuild/connect-core/protocol-connect";
import { http2Request } from "../helpers/http2-request.js";
import type { MethodInfo } from "@bufbuild/protobuf";
import type * as http from "http";
import type { JsonValue } from "@bufbuild/protobuf";

describe("unsupported content encoding", () => {
Expand All @@ -31,46 +29,52 @@ describe("unsupported content encoding", () => {
servers.describeServers(
["@bufbuild/connect-node (h2c)", "connect-go (h2)"],
(server, serverName) => {
function req(method: MethodInfo, headers: http.OutgoingHttpHeaders) {
const url = createMethodUrl(server.getUrl(), TestService, method);
if (serverName == "connect-go (h2)") {
return http2Request("POST", url, headers, undefined, {
rejectUnauthorized: false, // TODO set up cert for go server correctly
});
}
return http2Request("POST", url, headers);
}
const rejectUnauthorized = serverName !== "connect-go (h2)"; // TODO set up cert for go server correctly

describe("Connect unary method", function () {
const reqUnary = req.bind(null, TestService.methods.unaryCall);
it("should raise code unimplemented for unsupported content-encoding", async () => {
const res = await reqUnary({
"content-type": "application/json",
"content-encoding": "banana",
const res = await http2Request({
url: createMethodUrl(
server.getUrl(),
TestService,
TestService.methods.unaryCall
),
method: "POST",
headers: {
"content-type": "application/json",
"content-encoding": "banana",
},
rejectUnauthorized,
});
expect(res.status).toBe(404);
const err = errorFromJson(
JSON.parse(new TextDecoder().decode(res.body)) as JsonValue
);
expect(err.code).toBe(Code.Unimplemented);
expect(err.rawMessage).toMatch(
/^unknown compression "banana": supported encodings are gzip(, [a-z]+)*$/
/^unknown compression "banana": supported encodings are gzip(,[a-z]+)*$/
);
});
});
describe("Connect streaming method", function () {
const reqStreaming = req.bind(
null,
TestService.methods.streamingInputCall
);
it("should raise code unimplemented for unsupported connect-content-encoding", async () => {
const res = await reqStreaming({
"content-type": "application/connect+json",
"connect-content-encoding": "banana",
const res = await http2Request({
url: createMethodUrl(
server.getUrl(),
TestService,
TestService.methods.streamingInputCall
),
method: "POST",
headers: {
"content-type": "application/connect+json",
"connect-content-encoding": "banana",
},
rejectUnauthorized,
});
const endStream = endStreamFromJson(res.body.subarray(5));
expect(endStream.error?.code).toBe(Code.Unimplemented);
expect(endStream.error?.rawMessage).toMatch(
/^unknown compression "banana": supported encodings are gzip(, [a-z]+)*$/
/^unknown compression "banana": supported encodings are gzip(,[a-z]+)*$/
);
});
});
Expand Down
Loading

0 comments on commit fbb089e

Please sign in to comment.