diff --git a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc index 78ae748fbc58..980a614a5efb 100644 --- a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc +++ b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc @@ -519,10 +519,13 @@ Http::FilterDataStatus JsonTranscoderFilter::encodeData(Buffer::Instance& data, has_body_ = true; if (method_->response_type_is_http_body_) { - buildResponseFromHttpBodyOutput(*response_headers_, data); + bool frame_processed = buildResponseFromHttpBodyOutput(*response_headers_, data); if (!method_->descriptor_->server_streaming()) { return Http::FilterDataStatus::StopIterationAndBuffer; } + if (!http_body_response_headers_set_ && !frame_processed) { + return Http::FilterDataStatus::StopIterationAndBuffer; + } return Http::FilterDataStatus::Continue; } @@ -667,12 +670,12 @@ void JsonTranscoderFilter::maybeSendHttpBodyRequestMessage() { first_request_sent_ = true; } -void JsonTranscoderFilter::buildResponseFromHttpBodyOutput( +bool JsonTranscoderFilter::buildResponseFromHttpBodyOutput( Http::ResponseHeaderMap& response_headers, Buffer::Instance& data) { std::vector frames; decoder_.decode(data, frames); if (frames.empty()) { - return; + return false; } google::api::HttpBody http_body; @@ -688,7 +691,7 @@ void JsonTranscoderFilter::buildResponseFromHttpBodyOutput( // Non streaming case: single message with content type / length response_headers.setContentType(http_body.content_type()); response_headers.setContentLength(body.size()); - return; + return true; } else if (!http_body_response_headers_set_) { // Streaming case: set content type only once from first HttpBody message response_headers.setContentType(http_body.content_type()); @@ -696,6 +699,8 @@ void JsonTranscoderFilter::buildResponseFromHttpBodyOutput( } } } + + return true; } bool JsonTranscoderFilter::maybeConvertGrpcStatus(Grpc::Status::GrpcStatus grpc_status, diff --git a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.h b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.h index 5c271dafde24..31180556ac54 100644 --- a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.h +++ b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.h @@ -162,7 +162,11 @@ class JsonTranscoderFilter : public Http::StreamFilter, public Logger::Loggable< bool checkIfTranscoderFailed(const std::string& details); bool readToBuffer(Protobuf::io::ZeroCopyInputStream& stream, Buffer::Instance& data); void maybeSendHttpBodyRequestMessage(); - void buildResponseFromHttpBodyOutput(Http::ResponseHeaderMap& response_headers, + /** + * Builds response from HttpBody protobuf. + * Returns true if at least one gRPC frame has processed. + */ + bool buildResponseFromHttpBodyOutput(Http::ResponseHeaderMap& response_headers, Buffer::Instance& data); bool maybeConvertGrpcStatus(Grpc::Status::GrpcStatus grpc_status, Http::ResponseHeaderOrTrailerMap& trailers); diff --git a/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc b/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc index 27bf41e4a661..982a3dffa707 100644 --- a/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc +++ b/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc @@ -387,6 +387,53 @@ TEST_P(GrpcJsonTranscoderIntegrationTest, StreamGetHttpBodyMultipleFramesInData) EXPECT_EQ(response->body(), "HelloHelloHello"); } +TEST_P(GrpcJsonTranscoderIntegrationTest, StreamGetHttpBodyFragmented) { + HttpIntegrationTest::initialize(); + + // Make request to gRPC upstream + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = codec_client_->makeHeaderOnlyRequest(Http::TestRequestHeaderMapImpl{ + {":method", "GET"}, + {":path", "/indexStream"}, + {":authority", "host"}, + }); + waitForNextUpstreamRequest(); + + // Send fragmented gRPC response + // Headers + Http::TestResponseHeaderMapImpl response_headers; + response_headers.setStatus(200); + response_headers.setContentType("application/grpc"); + upstream_request_->encodeHeaders(response_headers, false); + // Fragmented payload + google::api::HttpBody http_body; + http_body.set_content_type("text/plain"); + http_body.set_data(std::string(1024, 'a')); + // Fragment gRPC frame into 2 buffers equally divided + Buffer::OwnedImpl fragment1; + auto fragment2 = Grpc::Common::serializeToGrpcFrame(http_body); + fragment1.move(*fragment2, fragment2->length() / 2); + upstream_request_->encodeData(fragment1, false); + upstream_request_->encodeData(*fragment2, false); + // Trailers + Http::TestResponseTrailerMapImpl response_trailers; + auto grpc_status = Status(); + response_trailers.setGrpcStatus(static_cast(grpc_status.error_code())); + response_trailers.setGrpcMessage( + absl::string_view(grpc_status.error_message().data(), grpc_status.error_message().size())); + upstream_request_->encodeTrailers(response_trailers); + EXPECT_TRUE(upstream_request_->complete()); + + // Wait for complete + response->waitForEndStream(); + EXPECT_TRUE(response->complete()); + // Ensure that body was actually replaced + EXPECT_EQ(response->body(), http_body.data()); + // As well as content-type header + auto content_type = response->headers().get(Http::LowerCaseString("content-type")); + EXPECT_EQ("text/plain", content_type->value().getStringView()); +} + TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryEchoHttpBody) { HttpIntegrationTest::initialize(); testTranscoding( diff --git a/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc b/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc index 05977528ad16..20acafd6f472 100644 --- a/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc +++ b/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc @@ -899,6 +899,41 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingStreamWithHttpBodyAsOutput) { EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_.decodeTrailers(request_trailers)); } +TEST_F(GrpcJsonTranscoderFilterTest, TranscodingStreamWithFragmentedHttpBody) { + Http::TestRequestHeaderMapImpl request_headers{{":method", "GET"}, {":path", "/indexStream"}}; + + EXPECT_CALL(decoder_callbacks_, clearRouteCache()); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_.decodeHeaders(request_headers, false)); + EXPECT_EQ("application/grpc", request_headers.get_("content-type")); + EXPECT_EQ("/indexStream", request_headers.get_("x-envoy-original-path")); + EXPECT_EQ("/bookstore.Bookstore/GetIndexStream", request_headers.get_(":path")); + EXPECT_EQ("trailers", request_headers.get_("te")); + + Http::TestResponseHeaderMapImpl response_headers{{"content-type", "application/grpc"}, + {":status", "200"}}; + EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + filter_.encodeHeaders(response_headers, false)); + + // "Send" one fragmented gRPC frame + google::api::HttpBody http_body; + http_body.set_content_type("text/html"); + http_body.set_data("

Fragmented Message!

"); + auto fragment2 = Grpc::Common::serializeToGrpcFrame(http_body); + Buffer::OwnedImpl fragment1; + fragment1.move(*fragment2, fragment2->length() / 2); + EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, filter_.encodeData(fragment1, false)); + EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.encodeData(*fragment2, false)); + + // Ensure that content-type is correct (taken from httpBody) + EXPECT_EQ("text/html", response_headers.get_("content-type")); + + // Fragment1 is buffered by transcoder + EXPECT_EQ(0, fragment1.length()); + // Second fragment contains entire body + EXPECT_EQ(http_body.data(), fragment2->toString()); +} + class GrpcJsonTranscoderFilterGrpcStatusTest : public GrpcJsonTranscoderFilterTest { public: GrpcJsonTranscoderFilterGrpcStatusTest(