Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bench/k6.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export function handleSummary(data) {
},
});
}
return handleBenchmarkSummary(data, { vus, time });
return handleBenchmarkSummary(data, { vus, duration });
}

let identifiersMap = {};
Expand Down
2 changes: 1 addition & 1 deletion bin/gateway/src/pipeline/coerce_variables_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use std::sync::Arc;
use axum::body::Body;
use http::Request;
use query_plan_executor::variables::collect_variables;
use query_plan_executor::ExecutionRequest;
use serde_json::Value;
use tracing::{trace, warn};

use crate::pipeline::error::{PipelineError, PipelineErrorVariant};
use crate::pipeline::gateway_layer::{
GatewayPipelineLayer, GatewayPipelineStepDecision, ProcessorLayer,
};
use crate::pipeline::graphql_request_params::ExecutionRequest;
use crate::pipeline::http_request_params::HttpRequestParams;
use crate::pipeline::normalize_service::GraphQLNormalizationPayload;
use crate::shared_state::GatewaySharedState;
Expand Down
17 changes: 15 additions & 2 deletions bin/gateway/src/pipeline/graphql_request_params.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashMap;

use axum::body::{to_bytes, Body};
use axum::extract::Query;
use http::{Method, Request};
use query_plan_executor::ExecutionRequest;
use serde::Deserialize;
use serde_json::Value;
use tracing::{trace, warn};

use crate::pipeline::error::{PipelineError, PipelineErrorVariant};
Expand All @@ -24,6 +27,17 @@ struct GETQueryParams {
pub extensions: Option<String>,
}

/// Incoming GraphQL request as received by the gateway pipeline.
///
/// Deserialized from the HTTP request (POST JSON body, or converted from
/// `GETQueryParams` via the `TryInto` impl below); field names are mapped
/// from camelCase JSON keys (`operationName`) by the serde attribute.
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ExecutionRequest {
// The GraphQL document to execute; the only required field.
pub query: String,
// Selects which operation to run when the document contains several.
pub operation_name: Option<String>,
// Variable values keyed by variable name, as arbitrary JSON values.
pub variables: Option<HashMap<String, Value>>,
// TODO: We don't use extensions yet, but we definitely will in the future.
#[allow(dead_code)]
pub extensions: Option<HashMap<String, Value>>,
}

impl TryInto<ExecutionRequest> for GETQueryParams {
type Error = PipelineErrorVariant;

Expand Down Expand Up @@ -58,7 +72,6 @@ impl TryInto<ExecutionRequest> for GETQueryParams {
operation_name: self.operation_name,
variables,
extensions,
representations: None,
};

Ok(execution_request)
Expand Down
2 changes: 1 addition & 1 deletion bin/gateway/src/pipeline/normalize_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::sync::Arc;
use axum::body::Body;
use http::Request;
use query_plan_executor::introspection::filter_introspection_fields_in_operation;
use query_plan_executor::ExecutionRequest;
use query_planner::ast::document::NormalizedDocument;
use query_planner::ast::normalization::normalize_operation;
use query_planner::ast::operation::OperationDefinition;
Expand All @@ -12,6 +11,7 @@ use crate::pipeline::error::{PipelineError, PipelineErrorVariant};
use crate::pipeline::gateway_layer::{
GatewayPipelineLayer, GatewayPipelineStepDecision, ProcessorLayer,
};
use crate::pipeline::graphql_request_params::ExecutionRequest;
use crate::pipeline::http_request_params::HttpRequestParams;
use crate::pipeline::parser_service::GraphQLParserPayload;
use crate::shared_state::GatewaySharedState;
Expand Down
2 changes: 1 addition & 1 deletion bin/gateway/src/pipeline/parser_service.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use axum::body::Body;
use graphql_parser::query::Document;
use http::Request;
use query_plan_executor::ExecutionRequest;
use query_planner::utils::parsing::safe_parse_operation;

use crate::pipeline::error::{PipelineError, PipelineErrorVariant};
use crate::pipeline::gateway_layer::{
GatewayPipelineLayer, GatewayPipelineStepDecision, ProcessorLayer,
};
use crate::pipeline::graphql_request_params::ExecutionRequest;
use crate::pipeline::http_request_params::HttpRequestParams;
use tracing::{error, trace};

Expand Down
13 changes: 8 additions & 5 deletions lib/query-plan-executor/src/executors/async_graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,26 @@ use async_trait::async_trait;
use serde_json::json;

use crate::{
executors::common::SubgraphExecutor, ExecutionRequest, ExecutionResult, GraphQLError,
GraphQLErrorLocation,
executors::common::SubgraphExecutor, ExecutionResult, GraphQLError, GraphQLErrorLocation,
SubgraphExecutionRequest,
};

// Blanket impl: anything implementing `async_graphql::Executor` (e.g. an
// in-process schema) can be used directly as a `SubgraphExecutor`, which
// lets tests and embedded subgraphs skip the HTTP transport.
#[async_trait]
impl<Executor> SubgraphExecutor for Executor
where
Executor: async_graphql::Executor,
{
async fn execute<'a>(
&self,
execution_request: SubgraphExecutionRequest<'a>,
) -> ExecutionResult {
// `.into()` converts via the `From<SubgraphExecutionRequest>` impl for
// `async_graphql::Request`; the inner `self.execute` here resolves to
// `async_graphql::Executor::execute` (argument type disambiguates it
// from the trait method being defined), and the response is converted
// back into our `ExecutionResult`.
let response: async_graphql::Response = self.execute(execution_request.into()).await;
response.into()
}
}

impl From<ExecutionRequest> for async_graphql::Request {
fn from(exec_request: ExecutionRequest) -> Self {
impl<'a> From<SubgraphExecutionRequest<'a>> for async_graphql::Request {
fn from(exec_request: SubgraphExecutionRequest) -> Self {
let mut req = async_graphql::Request::new(exec_request.query);
if let Some(variables) = exec_request.variables {
req = req.variables(async_graphql::Variables::from_json(json!(variables)));
Expand Down
5 changes: 3 additions & 2 deletions lib/query-plan-executor/src/executors/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use std::sync::Arc;

use async_trait::async_trait;

use crate::{ExecutionRequest, ExecutionResult};
use crate::{ExecutionResult, SubgraphExecutionRequest};

#[async_trait]
pub trait SubgraphExecutor {
async fn execute(&self, execution_request: ExecutionRequest) -> ExecutionResult;
async fn execute<'a>(&self, execution_request: SubgraphExecutionRequest<'a>)
-> ExecutionResult;
fn to_boxed_arc<'a>(self) -> Arc<Box<dyn SubgraphExecutor + Send + Sync + 'a>>
where
Self: Sized + Send + Sync + 'a,
Expand Down
20 changes: 14 additions & 6 deletions lib/query-plan-executor/src/executors/http.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use async_trait::async_trait;
use tracing::{error, instrument, trace};

use crate::{executors::common::SubgraphExecutor, ExecutionRequest, ExecutionResult};
use crate::{
executors::common::SubgraphExecutor, json_writer::write_and_escape_string, ExecutionResult,
SubgraphExecutionRequest,
};

#[derive(Debug)]
pub struct HTTPSubgraphExecutor {
Expand All @@ -19,14 +22,16 @@ impl HTTPSubgraphExecutor {
}
}

async fn _execute(
async fn _execute<'a>(
&self,
execution_request: ExecutionRequest,
execution_request: SubgraphExecutionRequest<'a>,
) -> Result<ExecutionResult, String> {
trace!("Executing HTTP request to subgraph at {}", self.endpoint);

let mut body =
"{\"query\":".to_string() + &serde_json::to_string(&execution_request.query).unwrap();
// We may want to remove it, but let's see.
let mut body = String::with_capacity(4096);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4096 is generally enough? how did we end up with this value?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

guess

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will never be correct

body.push_str("{\"query\":");
write_and_escape_string(&mut body, execution_request.query);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

let mut first_variable = true;
if let Some(variables) = &execution_request.variables {
for (variable_name, variable_value) in variables {
Expand Down Expand Up @@ -87,7 +92,10 @@ impl HTTPSubgraphExecutor {
#[async_trait]
impl SubgraphExecutor for HTTPSubgraphExecutor {
#[instrument(level = "trace", skip(self), name = "http_subgraph_execute", fields(endpoint = %self.endpoint))]
async fn execute(&self, execution_request: ExecutionRequest) -> ExecutionResult {
async fn execute<'a>(
&self,
execution_request: SubgraphExecutionRequest<'a>,
) -> ExecutionResult {
self._execute(execution_request).await.unwrap_or_else(|e| {
error!(e);
ExecutionResult::from_error_message(e)
Expand Down
4 changes: 2 additions & 2 deletions lib/query-plan-executor/src/executors/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ impl SubgraphExecutorMap {
}

#[instrument(level = "trace", name = "subgraph_execute", skip_all, fields(subgraph_name = %subgraph_name, execution_request = ?execution_request))]
pub async fn execute(
pub async fn execute<'a>(
&self,
subgraph_name: &str,
execution_request: crate::ExecutionRequest,
execution_request: crate::SubgraphExecutionRequest<'a>,
) -> crate::ExecutionResult {
match self.inner.get(subgraph_name) {
Some(executor) => executor.execute(execution_request).await,
Expand Down
41 changes: 20 additions & 21 deletions lib/query-plan-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ trait ExecutableFetchNode {
indexes: BTreeSet<usize>,
) -> ExecuteForRepresentationsResult;
fn apply_output_rewrites(&self, possible_types: &PossibleTypes, data: &mut Value);
fn prepare_variables_for_fetch_node(
&self,
variable_values: &Option<HashMap<String, Value>>,
) -> Option<HashMap<String, Value>>;
fn prepare_variables_for_fetch_node<'a>(
&'a self,
variable_values: &'a Option<HashMap<String, Value>>,
) -> Option<HashMap<&'a str, &'a Value>>;
}

#[async_trait]
Expand Down Expand Up @@ -155,9 +155,9 @@ impl ExecutableFetchNode for FetchNode {
) -> ExecutionResult {
let variables = self.prepare_variables_for_fetch_node(execution_context.variable_values);

let execution_request = ExecutionRequest {
query: self.operation.document_str.clone(),
operation_name: self.operation_name.clone(),
let execution_request = SubgraphExecutionRequest {
query: &self.operation.document_str,
operation_name: self.operation_name.as_deref(),
variables,
extensions: None,
representations: None,
Expand Down Expand Up @@ -191,9 +191,9 @@ impl ExecutableFetchNode for FetchNode {
indexes: BTreeSet<usize>,
) -> ExecuteForRepresentationsResult {
// 2. Prepare variables for fetch
let execution_request = ExecutionRequest {
query: self.operation.document_str.clone(),
operation_name: self.operation_name.clone(),
let execution_request = SubgraphExecutionRequest {
query: &self.operation.document_str,
operation_name: self.operation_name.as_deref(),
variables: self.prepare_variables_for_fetch_node(execution_context.variable_values),
extensions: None,
representations: Some(filtered_representations),
Expand Down Expand Up @@ -242,10 +242,10 @@ impl ExecutableFetchNode for FetchNode {
skip(self, variable_values),
name = "prepare_variables_for_fetch_node"
)]
fn prepare_variables_for_fetch_node(
&self,
variable_values: &Option<HashMap<String, Value>>,
) -> Option<HashMap<String, Value>> {
fn prepare_variables_for_fetch_node<'a>(
&'a self,
variable_values: &'a Option<HashMap<String, Value>>,
) -> Option<HashMap<&'a str, &'a Value>> {
match (&self.variable_usages, variable_values) {
(Some(ref variable_usages), Some(variable_values)) => {
if variable_usages.is_empty() || variable_values.is_empty() {
Expand All @@ -257,7 +257,7 @@ impl ExecutableFetchNode for FetchNode {
.filter_map(|variable_name| {
variable_values
.get(variable_name)
.map(|v| (variable_name.to_string(), v.clone()))
.map(|v| (variable_name.as_str(), v))
})
.collect(),
)
Expand Down Expand Up @@ -764,12 +764,11 @@ pub struct GraphQLErrorLocation {
pub column: usize,
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ExecutionRequest {
pub query: String,
pub operation_name: Option<String>,
pub variables: Option<HashMap<String, Value>>,
/// Request handed to a subgraph executor for a single fetch.
///
/// Unlike the gateway's incoming request type, this borrows everything for
/// lifetime `'a`: the query text and operation name come straight from the
/// fetch node's planned operation, and `variables` holds references into the
/// execution context's variable map — avoiding per-fetch cloning of the
/// query string and variable values.
#[derive(Debug, Clone)]
pub struct SubgraphExecutionRequest<'a> {
// Planned operation document for this subgraph (borrowed, not cloned).
pub query: &'a str,
// Operation to run when the document has more than one.
pub operation_name: Option<&'a str>,
// Variable name -> value references filtered to this fetch's usages.
pub variables: Option<HashMap<&'a str, &'a Value>>,
// Not populated by the executor yet (always `None` at the call sites
// visible here) — reserved for forwarding request extensions.
pub extensions: Option<HashMap<String, Value>>,
// Pre-serialized JSON array of entity representations for `_entities`
// fetches; `None` for root fetches.
pub representations: Option<String>,
}
Expand Down
31 changes: 13 additions & 18 deletions lib/query-plan-executor/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub fn project_by_operation(
)
)]
fn project_selection_set(
data: &mut Value,
data: &Value,
errors: &mut Vec<GraphQLError>,
selection_set: &SelectionSet,
type_name: &str,
Expand All @@ -97,7 +97,6 @@ fn project_selection_set(
Value::String(value) => {
if let Some(enum_values) = schema_metadata.enum_values.get(type_name) {
if !enum_values.contains(value) {
*data = Value::Null;
errors.push(GraphQLError {
message: format!(
"Value is not a valid enum value for type '{}'",
Expand All @@ -116,7 +115,7 @@ fn project_selection_set(
Value::Array(arr) => {
buffer.push('[');
let mut first = true;
for item in arr.iter_mut() {
for item in arr.iter() {
if !first {
buffer.push(',');
}
Expand Down Expand Up @@ -171,7 +170,7 @@ fn project_selection_set(
// TODO: simplify args
#[allow(clippy::too_many_arguments)]
fn project_selection_set_with_map(
obj: &mut Map<String, Value>,
obj: &Map<String, Value>,
errors: &mut Vec<GraphQLError>,
selection_set: &SelectionSet,
type_name: &str,
Expand All @@ -183,9 +182,8 @@ fn project_selection_set_with_map(
let type_name = match obj.get(TYPENAME_FIELD) {
Some(Value::String(type_name)) => type_name,
_ => type_name,
}
.to_string();
let field_map = match schema_metadata.type_fields.get(&type_name) {
};
let field_map = match schema_metadata.type_fields.get(type_name) {
Some(field_map) => field_map,
None => {
// If the type is not found, we can't project anything
Expand Down Expand Up @@ -230,7 +228,7 @@ fn project_selection_set_with_map(
buffer.push('"');
buffer.push_str(response_key);
buffer.push_str("\":\"");
buffer.push_str(&type_name);
buffer.push_str(type_name);
buffer.push('"');
continue;
}
Expand All @@ -244,14 +242,11 @@ fn project_selection_set_with_map(
.map(|s| s.as_str())
.unwrap_or("Any");

if field.name == "__schema" && type_name == "Query" {
obj.insert(
response_key.to_string(),
schema_metadata.introspection_schema_root_json.clone(),
);
}

let field_val = obj.get_mut(response_key);
let field_val = if field.name == "__schema" && type_name == "Query" {
Some(&schema_metadata.introspection_schema_root_json)
} else {
obj.get(response_key)
};

if let Some(field_val) = field_val {
project_selection_set(
Expand All @@ -272,13 +267,13 @@ fn project_selection_set_with_map(
SelectionItem::InlineFragment(inline_fragment) => {
if schema_metadata
.possible_types
.entity_satisfies_type_condition(&type_name, &inline_fragment.type_condition)
.entity_satisfies_type_condition(type_name, &inline_fragment.type_condition)
{
project_selection_set_with_map(
obj,
errors,
&inline_fragment.selections,
&type_name,
type_name,
schema_metadata,
variable_values,
buffer,
Expand Down
Loading