Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc/transcoding: add support for fragmented httpBody #11060

Merged
merged 2 commits into from May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -519,10 +519,13 @@ Http::FilterDataStatus JsonTranscoderFilter::encodeData(Buffer::Instance& data,
has_body_ = true;

if (method_->response_type_is_http_body_) {
buildResponseFromHttpBodyOutput(*response_headers_, data);
bool frame_processed = buildResponseFromHttpBodyOutput(*response_headers_, data);
if (!method_->descriptor_->server_streaming()) {
return Http::FilterDataStatus::StopIterationAndBuffer;
}
if (!http_body_response_headers_set_ && !frame_processed) {
return Http::FilterDataStatus::StopIterationAndBuffer;
}
return Http::FilterDataStatus::Continue;
}

Expand Down Expand Up @@ -667,12 +670,12 @@ void JsonTranscoderFilter::maybeSendHttpBodyRequestMessage() {
first_request_sent_ = true;
}

void JsonTranscoderFilter::buildResponseFromHttpBodyOutput(
bool JsonTranscoderFilter::buildResponseFromHttpBodyOutput(
Http::ResponseHeaderMap& response_headers, Buffer::Instance& data) {
std::vector<Grpc::Frame> frames;
decoder_.decode(data, frames);
if (frames.empty()) {
return;
return false;
}

google::api::HttpBody http_body;
Expand All @@ -688,14 +691,16 @@ void JsonTranscoderFilter::buildResponseFromHttpBodyOutput(
// Non streaming case: single message with content type / length
response_headers.setContentType(http_body.content_type());
response_headers.setContentLength(body.size());
return;
return true;
} else if (!http_body_response_headers_set_) {
// Streaming case: set content type only once from first HttpBody message
response_headers.setContentType(http_body.content_type());
http_body_response_headers_set_ = true;
}
}
}

return true;
}

bool JsonTranscoderFilter::maybeConvertGrpcStatus(Grpc::Status::GrpcStatus grpc_status,
Expand Down
Expand Up @@ -162,7 +162,11 @@ class JsonTranscoderFilter : public Http::StreamFilter, public Logger::Loggable<
bool checkIfTranscoderFailed(const std::string& details);
bool readToBuffer(Protobuf::io::ZeroCopyInputStream& stream, Buffer::Instance& data);
void maybeSendHttpBodyRequestMessage();
void buildResponseFromHttpBodyOutput(Http::ResponseHeaderMap& response_headers,
/**
* Builds response from HttpBody protobuf.
* Returns true if at least one gRPC frame has processed.
*/
bool buildResponseFromHttpBodyOutput(Http::ResponseHeaderMap& response_headers,
Buffer::Instance& data);
bool maybeConvertGrpcStatus(Grpc::Status::GrpcStatus grpc_status,
Http::ResponseHeaderOrTrailerMap& trailers);
Expand Down
Expand Up @@ -387,6 +387,53 @@ TEST_P(GrpcJsonTranscoderIntegrationTest, StreamGetHttpBodyMultipleFramesInData)
EXPECT_EQ(response->body(), "HelloHelloHello");
}

TEST_P(GrpcJsonTranscoderIntegrationTest, StreamGetHttpBodyFragmented) {
HttpIntegrationTest::initialize();

// Make request to gRPC upstream
codec_client_ = makeHttpConnection(lookupPort("http"));
auto response = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{
{":method", "GET"},
{":path", "/indexStream"},
{":authority", "host"},
});
waitForNextUpstreamRequest();

// Send fragmented gRPC response
// Headers
Http::TestResponseHeaderMapImpl response_headers;
response_headers.setStatus(200);
response_headers.setContentType("application/grpc");
upstream_request_->encodeHeaders(response_headers, false);
// Fragmented payload
google::api::HttpBody http_body;
http_body.set_content_type("text/plain");
http_body.set_data(std::string(1024, 'a'));
// Fragment gRPC frame into 2 buffers equally divided
Buffer::OwnedImpl fragment1;
auto fragment2 = Grpc::Common::serializeToGrpcFrame(http_body);
fragment1.move(*fragment2, fragment2->length() / 2);
upstream_request_->encodeData(fragment1, false);
upstream_request_->encodeData(*fragment2, false);
// Trailers
Http::TestResponseTrailerMapImpl response_trailers;
auto grpc_status = Status();
response_trailers.setGrpcStatus(static_cast<uint64_t>(grpc_status.error_code()));
response_trailers.setGrpcMessage(
absl::string_view(grpc_status.error_message().data(), grpc_status.error_message().size()));
upstream_request_->encodeTrailers(response_trailers);
EXPECT_TRUE(upstream_request_->complete());

// Wait for complete
response->waitForEndStream();
EXPECT_TRUE(response->complete());
// Ensure that body was actually replaced
EXPECT_EQ(response->body(), http_body.data());
// As well as content-type header
auto content_type = response->headers().get(Http::LowerCaseString("content-type"));
EXPECT_EQ("text/plain", content_type->value().getStringView());
}

TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryEchoHttpBody) {
HttpIntegrationTest::initialize();
testTranscoding<bookstore::EchoBodyRequest, google::api::HttpBody>(
Expand Down
Expand Up @@ -899,6 +899,41 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingStreamWithHttpBodyAsOutput) {
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.decodeTrailers(request_trailers));
}

TEST_F(GrpcJsonTranscoderFilterTest, TranscodingStreamWithFragmentedHttpBody) {
Http::TestRequestHeaderMapImpl request_headers{{":method", "GET"}, {":path", "/indexStream"}};

EXPECT_CALL(decoder_callbacks_, clearRouteCache());

EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.decodeHeaders(request_headers, false));
EXPECT_EQ("application/grpc", request_headers.get_("content-type"));
EXPECT_EQ("/indexStream", request_headers.get_("x-envoy-original-path"));
EXPECT_EQ("/bookstore.Bookstore/GetIndexStream", request_headers.get_(":path"));
EXPECT_EQ("trailers", request_headers.get_("te"));

Http::TestResponseHeaderMapImpl response_headers{{"content-type", "application/grpc"},
{":status", "200"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
filter_.encodeHeaders(response_headers, false));

// "Send" one fragmented gRPC frame
google::api::HttpBody http_body;
http_body.set_content_type("text/html");
http_body.set_data("<h1>Fragmented Message!</h1>");
auto fragment2 = Grpc::Common::serializeToGrpcFrame(http_body);
Buffer::OwnedImpl fragment1;
fragment1.move(*fragment2, fragment2->length() / 2);
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, filter_.encodeData(fragment1, false));
EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.encodeData(*fragment2, false));

// Ensure that content-type is correct (taken from httpBody)
EXPECT_EQ("text/html", response_headers.get_("content-type"));

// Fragment1 is buffered by transcoder
EXPECT_EQ(0, fragment1.length());
// Second fragment contains entire body
EXPECT_EQ(http_body.data(), fragment2->toString());
}

class GrpcJsonTranscoderFilterGrpcStatusTest : public GrpcJsonTranscoderFilterTest {
public:
GrpcJsonTranscoderFilterGrpcStatusTest(
Expand Down