Skip to content

Commit

Permalink
Fix and add test for co-processors handling of streaming responses.
Browse files Browse the repository at this point in the history
Fixes #4013
  • Loading branch information
bryn committed Oct 11, 2023
1 parent cf4a79a commit bfd17ef
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .changesets/fix_bryn_fix_coprocessor_stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Fix panic when streaming responses to co-processor ([Issue #4013](https://github.com/apollographql/router/issues/4013))

Streamed responses will no longer cause a panic in the co-processor plugin. This affected defer and stream queries.

By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/4014
93 changes: 88 additions & 5 deletions apollo-router/src/plugins/coprocessor/supergraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ where
// Note: We deliberately DO NOT send headers or status_code even if the user has
// requested them. That's because they are meaningless on a deferred response and
// providing them will be a source of confusion.
let payload = Externalizable::router_builder()
let payload = Externalizable::supergraph_builder()
.stage(PipelineStep::SupergraphResponse)
.and_id(TraceId::maybe_new().map(|id| id.to_string()))
.and_body(body_to_send)
Expand Down Expand Up @@ -753,13 +753,13 @@ mod tests {

let value = response.headers().get("aheader").unwrap();

assert_eq!("a value", value);
assert_eq!(value, "a value");

assert_eq!(
"my error message",
response.body_mut().next().await.unwrap().errors[0]
.message
.as_str()
.as_str(),
"my error message"
);
}

Expand Down Expand Up @@ -852,7 +852,7 @@ mod tests {
"this-is-a-test-context": 42
}
},
"sdl": "the sdl shouldnt change"
"sdl": "the sdl shouldn't change"
});
Ok(hyper::Response::builder()
.body(Body::from(serde_json::to_string(&input).unwrap()))
Expand Down Expand Up @@ -889,8 +889,91 @@ mod tests {
let body = res.response.body_mut().next().await.unwrap();
// the body should have changed:
assert_eq!(
serde_json::to_value(&body).unwrap(),
json!({ "data": { "test": 42_u32 } }),
);
}

#[tokio::test]
async fn defer() {
let supergraph_stage = SupergraphStage {
response: SupergraphResponseConf {
headers: true,
context: true,
body: true,
sdl: true,
status_code: false,
},
request: Default::default(),
};

let mut mock_supergraph_service = MockSupergraphService::new();

mock_supergraph_service
.expect_call()
.returning(|req: supergraph::Request| {
Ok(supergraph::Response::fake_stream_builder()
.response(
graphql::Response::builder()
.data(json!({ "test": 1 }))
.has_next(true)
.build(),
)
.response(
graphql::Response::builder()
.data(json!({ "test": 2 }))
.has_next(false)
.build(),
)
.context(req.context)
.build()
.unwrap())
});

let mock_http_client = mock_with_deferred_callback(move |res: hyper::Request<Body>| {
Box::pin(async {
let deserialized_response: Externalizable<serde_json::Value> =
serde_json::from_slice(&hyper::body::to_bytes(res.into_body()).await.unwrap())
.unwrap();
assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version);
assert_eq!(
PipelineStep::SupergraphResponse.to_string(),
deserialized_response.stage
);

Ok(hyper::Response::builder()
.body(Body::from(
serde_json::to_string(&deserialized_response).unwrap(),
))
.unwrap())
})
});

let service = supergraph_stage.as_service(
mock_http_client,
mock_supergraph_service.boxed(),
"http://test".to_string(),
Arc::new("".to_string()),
);

let request = supergraph::Request::canned_builder()
.query("foo")
.build()
.unwrap();

let mut res = service.oneshot(request).await.unwrap();

let body = res.response.body_mut().next().await.unwrap();
// the body should have changed:
assert_eq!(
serde_json::to_value(&body).unwrap(),
json!({ "data": { "test": 1 }, "hasNext": true }),
);
let body = res.response.body_mut().next().await.unwrap();
// the body should have changed:
assert_eq!(
serde_json::to_value(&body).unwrap(),
json!({ "data": { "test": 2 }, "hasNext": false }),
);
}
}
11 changes: 11 additions & 0 deletions apollo-router/src/services/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,17 @@ mod test {
.build();
}

#[test]
#[should_panic]
fn it_will_not_build_router_externalizable_incorrectl_supergraph() {
Externalizable::<String>::router_builder()
.stage(PipelineStep::SupergraphRequest)
.build();
Externalizable::<String>::router_builder()
.stage(PipelineStep::SupergraphResponse)
.build();
}

#[test]
fn it_will_build_subgraph_externalizable_correctly() {
Externalizable::<String>::subgraph_builder()
Expand Down
28 changes: 28 additions & 0 deletions apollo-router/src/services/supergraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,34 @@ impl Response {
)
}

/// This is the constructor (or builder) to use when constructing a "fake" Response stream.
///
/// This does not enforce the provision of the data that is required for a fully functional
/// Response. It's usually enough for testing, when a fully constructed Response is
/// difficult to construct and not required for the purposes of the test.
///
/// In addition, fake responses are expected to be valid, and will panic if given invalid values.
#[builder(visibility = "pub")]
fn fake_stream_new(
responses: Vec<graphql::Response>,
status_code: Option<StatusCode>,
headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue>,
context: Context,
) -> Result<Self, BoxError> {
let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
for (key, values) in headers {
let header_name: HeaderName = key.try_into()?;
for value in values {
let header_value: HeaderValue = value.try_into()?;
builder = builder.header(header_name.clone(), header_value);
}
}

let stream = futures::stream::iter(responses);
let response = builder.body(stream.boxed())?;
Ok(Self { response, context })
}

/// This is the constructor (or builder) to use when constructing a Response that represents a global error.
/// It has no path and no response data.
/// This is useful for things such as authentication errors.
Expand Down

0 comments on commit bfd17ef

Please sign in to comment.