13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

65 changes: 60 additions & 5 deletions docs/README.md
@@ -11,7 +11,7 @@
|[**log**](#log)|`object`|The router logger configuration.<br/>Default: `{"filter":null,"format":"json","level":"info"}`<br/>||
|[**query\_planner**](#query_planner)|`object`|Query planning configuration.<br/>Default: `{"allow_expose":false,"timeout":"10s"}`<br/>||
|[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).<br/>Default: `{"path":"supergraph.graphql","source":"file"}`<br/>||
|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these options to control how requests to subgraphs are executed.<br/>Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`<br/>||
|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these options to control how requests to subgraphs are executed.<br/>Default: `{"all":{"dedupe_enabled":true,"pool_idle_timeout_seconds":50},"max_connections_per_host":100}`<br/>||

**Additional Properties:** not allowed
**Example**
@@ -64,9 +64,10 @@ supergraph:
  path: supergraph.graphql
  source: file
traffic_shaping:
  dedupe_enabled: true
  all:
> **Contributor:** Maybe it should be outside `all`, next to it?

> **Contributor:** To be more specific, I meant that `dedupe_enabled` could live in `all` and also per subgraph (in the future), but `max_connections_per_host` should not be in `all`.

> **Contributor:** The rule of thumb here would be that all options under `all` should also be configurable per subgraph.

> **Contributor:** `dedupe_enabled` makes sense to have in `all` and per-subgraph, but `max_connections_per_host` does not, as it's a global setting.

    dedupe_enabled: true
    pool_idle_timeout_seconds: 50
  max_connections_per_host: 100
  pool_idle_timeout_seconds: 50

```

@@ -1366,15 +1367,69 @@ Configuration for the traffic-shaper executor. Use these configurations to contr

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly match on the hashing inputs (e.g., subgraph name, URL, headers, query, variables) and are executed at the same time, they will<br/>be deduplicated by sharing the response of another in-flight request.<br/>Default: `true`<br/>||
|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.<br/>Default: `{"dedupe_enabled":true,"pool_idle_timeout_seconds":50}`<br/>||
|**max\_connections\_per\_host**|`integer`|Limits the number of concurrent requests/connections per host/subgraph.<br/>Default: `100`<br/>Format: `"uint"`<br/>Minimum: `0`<br/>||
|[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.<br/>||

**Example**

```yaml
all:
  dedupe_enabled: true
  pool_idle_timeout_seconds: 50
max_connections_per_host: 100

```
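The `dedupe_enabled` behavior described in the table above can be illustrated with a rough sketch. This is not the router's implementation (per the imports in the `http.rs` diff below, that code uses `DashMap`, `OnceCell`, and a request fingerprint in `executors/dedupe.rs`), but it shows the idea: concurrent callers that compute the same fingerprint await a single in-flight fetch instead of issuing duplicate requests.

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, OnceCell};

type Fingerprint = u64;
type Shared = Arc<OnceCell<String>>;

#[derive(Default)]
struct Dedupe {
    // Fingerprint -> shared slot for the in-flight response.
    in_flight: Mutex<HashMap<Fingerprint, Shared>>,
}

impl Dedupe {
    // All concurrent callers with the same fingerprint share one fetch.
    async fn get_or_fetch<F, Fut>(&self, fp: Fingerprint, fetch: F) -> String
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = String>,
    {
        let cell = {
            let mut map = self.in_flight.lock().await;
            map.entry(fp).or_default().clone()
        };
        // Only the first caller runs `fetch`; the rest await its result.
        let out = cell.get_or_init(fetch).await.clone();
        self.in_flight.lock().await.remove(&fp);
        out
    }
}

#[tokio::main]
async fn main() {
    let d = Dedupe::default();
    let (a, b) = tokio::join!(
        d.get_or_fetch(42, || async { "response".to_string() }),
        d.get_or_fetch(42, || async { "response".to_string() }),
    );
    assert_eq!(a, b);
}
```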

<a name="traffic_shapingall"></a>
### traffic\_shaping\.all: object

The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.


**Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly match on the hashing inputs (e.g., subgraph name, URL, headers, query, variables) and are executed at the same time, they will<br/>be deduplicated by sharing the response of another in-flight request.<br/>Default: `true`<br/>||
|**pool\_idle\_timeout\_seconds**|`integer`|How long idle sockets are kept alive before being closed.<br/>Default: `50`<br/>Format: `"uint64"`<br/>Minimum: `0`<br/>||
|**timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> 10000<br/> } else {<br/> 5000<br/> }<br/>```<br/>||

**Example**

```yaml
dedupe_enabled: true
pool_idle_timeout_seconds: 50

```
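For readability, here are the `timeout` examples embedded in the table cell above, reproduced as standalone YAML. With a fixed duration:

```yaml
timeout:
  duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:

```yaml
timeout:
  expression: |
    if (.request.operation.type == "mutation") {
      10000
    } else {
      5000
    }
```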

<a name="traffic_shapingsubgraphs"></a>
### traffic\_shaping\.subgraphs: object

Optional per-subgraph configurations that will override the default configuration for specific subgraphs.


**Additional Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|[**Additional Properties**](#traffic_shapingsubgraphsadditionalproperties)|`object`|||

<a name="traffic_shapingsubgraphsadditionalproperties"></a>
#### traffic\_shaping\.subgraphs\.additionalProperties: object

**Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.<br/><br/>When requests exactly match on the hashing inputs (e.g., subgraph name, URL, headers, query, variables) and are executed at the same time, they will<br/>be deduplicated by sharing the response of another in-flight request.<br/>Default: `true`<br/>||
|**pool\_idle\_timeout\_seconds**|`integer`|How long idle sockets are kept alive before being closed.<br/>Default: `50`<br/>Format: `"uint64"`<br/>Minimum: `0`<br/>||
|**timeout**||Optional timeout configuration for requests to subgraphs.<br/><br/>Example with a fixed duration:<br/>```yaml<br/> timeout:<br/> duration: 5s<br/>```<br/><br/>Or with a VRL expression that can return a duration based on the operation kind:<br/>```yaml<br/> timeout:<br/> expression: \|<br/> if (.request.operation.type == "mutation") {<br/> 10000<br/> } else {<br/> 5000<br/> }<br/>```<br/>||

**Example**

```yaml
dedupe_enabled: true
pool_idle_timeout_seconds: 50

```
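Putting the pieces together, here is a sketch of a full `traffic_shaping` block. The subgraph name `products` is a hypothetical placeholder; per-subgraph entries accept only the properties listed above, while `max_connections_per_host` stays global, matching the rule of thumb from the review thread:

```yaml
traffic_shaping:
  # Defaults applied to every subgraph, unless overridden below:
  all:
    dedupe_enabled: true
    pool_idle_timeout_seconds: 50
  # Global connection limit; not configurable per subgraph:
  max_connections_per_host: 100
  # Per-subgraph overrides ("products" is a hypothetical subgraph name):
  subgraphs:
    products:
      dedupe_enabled: false
      timeout:
        duration: 5s
```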
1 change: 1 addition & 0 deletions lib/executor/Cargo.toml
@@ -53,6 +53,7 @@ subgraphs = { path = "../../bench/subgraphs" }
criterion = { workspace = true }
tokio = { workspace = true }
insta = { workspace = true }
axum = { version = "0.8.6", features = ["macros", "tokio", "json"]}

[[bench]]
name = "executor_benches"
1 change: 1 addition & 0 deletions lib/executor/src/execution/plan.rs
@@ -726,6 +726,7 @@ impl<'exec> Executor<'exec> {
variables: variable_refs,
representations,
headers: headers_map,
client_request: self.client_request,
},
)
.await
3 changes: 3 additions & 0 deletions lib/executor/src/executors/common.rs
@@ -4,6 +4,8 @@ use async_trait::async_trait;
use bytes::Bytes;
use http::HeaderMap;

use crate::execution::plan::ClientRequestDetails;

#[async_trait]
pub trait SubgraphExecutor {
async fn execute<'a>(
@@ -30,6 +32,7 @@ pub struct HttpExecutionRequest<'a> {
pub variables: Option<HashMap<&'a str, &'a sonic_rs::Value>>,
pub headers: HeaderMap,
pub representations: Option<Vec<u8>>,
pub client_request: &'a ClientRequestDetails<'a>,
}

pub struct HttpExecutionResponse {
22 changes: 22 additions & 0 deletions lib/executor/src/executors/error.rs
@@ -1,3 +1,9 @@
use std::time::Duration;

use bytes::{BufMut, Bytes, BytesMut};

use crate::response::graphql_error::GraphQLError;

#[derive(thiserror::Error, Debug, Clone)]
pub enum SubgraphExecutorError {
#[error("Failed to parse endpoint \"{0}\" as URI: {1}")]
@@ -8,4 +8,20 @@ pub enum SubgraphExecutorError {
RequestFailure(String, String),
#[error("Failed to serialize variable \"{0}\": {1}")]
VariablesSerializationFailure(String, String),
#[error("Failed to parse timeout duration from expression: {0}")]
TimeoutExpressionParseFailure(String),
#[error("Request timed out after {0:?}")]
RequestTimeout(Duration),
}
pub fn error_to_graphql_bytes(endpoint: &http::Uri, e: SubgraphExecutorError) -> Bytes {
let graphql_error: GraphQLError =
format!("Failed to execute request to subgraph {}: {}", endpoint, e).into();
let errors = vec![graphql_error];
// This unwrap is safe as GraphQLError serialization shouldn't fail.
let errors_bytes = sonic_rs::to_vec(&errors).unwrap();
let mut buffer = BytesMut::new();
buffer.put_slice(b"{\"errors\":");
buffer.put_slice(&errors_bytes);
buffer.put_slice(b"}");
buffer.freeze()
}
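The byte-level wrapping that `error_to_graphql_bytes` performs can be reproduced in isolation. Here is a minimal stand-alone sketch using only the `bytes` crate, with a stand-in error message rather than the real `SubgraphExecutorError` display output:

```rust
use bytes::{BufMut, Bytes, BytesMut};

// Wrap a pre-serialized JSON array of GraphQL errors in {"errors": ...},
// mirroring the buffer construction in error_to_graphql_bytes.
fn wrap_errors(errors_json: &[u8]) -> Bytes {
    let mut buffer = BytesMut::new();
    buffer.put_slice(b"{\"errors\":");
    buffer.put_slice(errors_json);
    buffer.put_slice(b"}");
    buffer.freeze()
}

fn main() {
    let errors = br#"[{"message":"Request timed out after 5s"}]"#;
    assert_eq!(
        &wrap_errors(errors)[..],
        br#"{"errors":[{"message":"Request timed out after 5s"}]}"#
    );
}
```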
41 changes: 14 additions & 27 deletions lib/executor/src/executors/http.rs
@@ -1,14 +1,14 @@
use std::sync::Arc;

use crate::executors::common::HttpExecutionResponse;
use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse};
use dashmap::DashMap;
use futures::TryFutureExt;
use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig;
use tokio::sync::OnceCell;

use async_trait::async_trait;

use bytes::{BufMut, Bytes, BytesMut};
use bytes::{BufMut, Bytes};
use http::HeaderMap;
use http::HeaderValue;
use http_body_util::BodyExt;
@@ -18,9 +18,8 @@ use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::{connect::HttpConnector, Client};
use tokio::sync::Semaphore;

use crate::executors::common::HttpExecutionRequest;
use crate::executors::error::SubgraphExecutorError;
use crate::response::graphql_error::GraphQLError;
use crate::executors::common::{HttpExecutionRequest, HttpExecutionResponse};
use crate::executors::error::{error_to_graphql_bytes, SubgraphExecutorError};
use crate::utils::consts::CLOSE_BRACE;
use crate::utils::consts::COLON;
use crate::utils::consts::COMMA;
@@ -132,9 +131,13 @@ impl HTTPSubgraphExecutor {

*req.headers_mut() = headers;

let res = self.http_client.request(req).await.map_err(|e| {
SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string())
})?;
let res = self
.http_client
.request(req)
.map_err(|e| {
SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string())
})
.await?;

let (parts, body) = res.into_parts();

@@ -150,22 +153,6 @@
headers: parts.headers,
})
}

fn error_to_graphql_bytes(&self, e: SubgraphExecutorError) -> Bytes {
let graphql_error: GraphQLError = format!(
"Failed to execute request to subgraph {}: {}",
self.endpoint, e
)
.into();
let errors = vec![graphql_error];
// This unwrap is safe as GraphQLError serialization shouldn't fail.
let errors_bytes = sonic_rs::to_vec(&errors).unwrap();
let mut buffer = BytesMut::new();
buffer.put_slice(b"{\"errors\":");
buffer.put_slice(&errors_bytes);
buffer.put_slice(b"}");
buffer.freeze()
}
}

#[async_trait]
@@ -178,7 +165,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
Ok(body) => body,
Err(e) => {
return HttpExecutionResponse {
body: self.error_to_graphql_bytes(e),
body: error_to_graphql_bytes(&self.endpoint, e),
headers: Default::default(),
}
}
@@ -199,7 +186,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
headers: shared_response.headers,
},
Err(e) => HttpExecutionResponse {
body: self.error_to_graphql_bytes(e),
body: error_to_graphql_bytes(&self.endpoint, e),
headers: Default::default(),
},
};
@@ -238,7 +225,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
headers: shared_response.headers.clone(),
},
Err(e) => HttpExecutionResponse {
body: self.error_to_graphql_bytes(e.clone()),
body: error_to_graphql_bytes(&self.endpoint, e.clone()),
headers: Default::default(),
},
}
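One small change in this file worth calling out: the request call now maps the transport error on the future itself via `futures::TryFutureExt::map_err` (imported at the top of this file) and only then awaits. A minimal sketch of the same pattern, assuming just the `futures` crate:

```rust
use futures::{executor::block_on, TryFutureExt};

// A stand-in for http_client.request(req): an async call that can fail.
async fn fetch() -> Result<u32, &'static str> {
    Err("connect failed")
}

fn main() {
    // Map the error type on the future, then drive it to completion,
    // the same shape as http_client.request(req).map_err(...).await in the diff.
    let res = block_on(fetch().map_err(|e| format!("request failure: {e}")));
    assert_eq!(res, Err("request failure: connect failed".to_string()));
}
```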