From 7d58d87a3c6666b7b688f7168f3d962579dd6061 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 1 Jul 2025 17:34:13 +0300 Subject: [PATCH] Avoid extra allocation/clone in SubgraphExecutionRequest --- bench/k6.js | 2 +- .../src/pipeline/coerce_variables_service.rs | 2 +- .../src/pipeline/graphql_request_params.rs | 17 +++++++- bin/gateway/src/pipeline/normalize_service.rs | 2 +- bin/gateway/src/pipeline/parser_service.rs | 2 +- .../src/executors/async_graphql.rs | 13 +++--- .../src/executors/common.rs | 5 ++- lib/query-plan-executor/src/executors/http.rs | 20 ++++++--- lib/query-plan-executor/src/executors/map.rs | 4 +- lib/query-plan-executor/src/lib.rs | 41 +++++++++---------- lib/query-plan-executor/src/projection.rs | 31 ++++++-------- 11 files changed, 79 insertions(+), 60 deletions(-) diff --git a/bench/k6.js b/bench/k6.js index 7a7feabff..70a1ad5ca 100644 --- a/bench/k6.js +++ b/bench/k6.js @@ -57,7 +57,7 @@ export function handleSummary(data) { }, }); } - return handleBenchmarkSummary(data, { vus, time }); + return handleBenchmarkSummary(data, { vus, duration }); } let identifiersMap = {}; diff --git a/bin/gateway/src/pipeline/coerce_variables_service.rs b/bin/gateway/src/pipeline/coerce_variables_service.rs index 68c1ba3d0..08eb9cab7 100644 --- a/bin/gateway/src/pipeline/coerce_variables_service.rs +++ b/bin/gateway/src/pipeline/coerce_variables_service.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use axum::body::Body; use http::Request; use query_plan_executor::variables::collect_variables; -use query_plan_executor::ExecutionRequest; use serde_json::Value; use tracing::{trace, warn}; @@ -12,6 +11,7 @@ use crate::pipeline::error::{PipelineError, PipelineErrorVariant}; use crate::pipeline::gateway_layer::{ GatewayPipelineLayer, GatewayPipelineStepDecision, ProcessorLayer, }; +use crate::pipeline::graphql_request_params::ExecutionRequest; use crate::pipeline::http_request_params::HttpRequestParams; use crate::pipeline::normalize_service::GraphQLNormalizationPayload; use crate::shared_state::GatewaySharedState; diff --git a/bin/gateway/src/pipeline/graphql_request_params.rs b/bin/gateway/src/pipeline/graphql_request_params.rs index 46b9ed78e..83337acb0 100644 --- a/bin/gateway/src/pipeline/graphql_request_params.rs +++ b/bin/gateway/src/pipeline/graphql_request_params.rs @@ -1,7 +1,10 @@ +use std::collections::HashMap; + use axum::body::{to_bytes, Body}; use axum::extract::Query; use http::{Method, Request}; -use query_plan_executor::ExecutionRequest; +use serde::Deserialize; +use serde_json::Value; use tracing::{trace, warn}; use crate::pipeline::error::{PipelineError, PipelineErrorVariant}; @@ -24,6 +27,17 @@ struct GETQueryParams { pub extensions: Option, } +#[derive(Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ExecutionRequest { + pub query: String, + pub operation_name: Option, + pub variables: Option>, + // TODO: We don't use extensions yet, but we definitely will in the future. + #[allow(dead_code)] + pub extensions: Option>, +} + impl TryInto for GETQueryParams { type Error = PipelineErrorVariant; @@ -58,7 +72,6 @@ impl TryInto for GETQueryParams { operation_name: self.operation_name, variables, extensions, - representations: None, }; Ok(execution_request) diff --git a/bin/gateway/src/pipeline/normalize_service.rs b/bin/gateway/src/pipeline/normalize_service.rs index 0eb5c198e..e8cc2b8e6 100644 --- a/bin/gateway/src/pipeline/normalize_service.rs +++ b/bin/gateway/src/pipeline/normalize_service.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use axum::body::Body; use http::Request; use query_plan_executor::introspection::filter_introspection_fields_in_operation; -use query_plan_executor::ExecutionRequest; use query_planner::ast::document::NormalizedDocument; use query_planner::ast::normalization::normalize_operation; use query_planner::ast::operation::OperationDefinition; @@ -12,6 +11,7 @@ use crate::pipeline::error::{PipelineError, PipelineErrorVariant}; use crate::pipeline::gateway_layer::{ GatewayPipelineLayer, GatewayPipelineStepDecision, ProcessorLayer, }; +use crate::pipeline::graphql_request_params::ExecutionRequest; use crate::pipeline::http_request_params::HttpRequestParams; use crate::pipeline::parser_service::GraphQLParserPayload; use crate::shared_state::GatewaySharedState; diff --git a/bin/gateway/src/pipeline/parser_service.rs b/bin/gateway/src/pipeline/parser_service.rs index 3fefa00bd..3c84f4610 100644 --- a/bin/gateway/src/pipeline/parser_service.rs +++ b/bin/gateway/src/pipeline/parser_service.rs @@ -1,13 +1,13 @@ use axum::body::Body; use graphql_parser::query::Document; use http::Request; -use query_plan_executor::ExecutionRequest; use query_planner::utils::parsing::safe_parse_operation; use crate::pipeline::error::{PipelineError, PipelineErrorVariant}; use crate::pipeline::gateway_layer::{ GatewayPipelineLayer, GatewayPipelineStepDecision, ProcessorLayer, }; +use crate::pipeline::graphql_request_params::ExecutionRequest; use crate::pipeline::http_request_params::HttpRequestParams; use tracing::{error, trace}; diff --git a/lib/query-plan-executor/src/executors/async_graphql.rs b/lib/query-plan-executor/src/executors/async_graphql.rs index c66ce4bbf..5043150a0 100644 --- a/lib/query-plan-executor/src/executors/async_graphql.rs +++ b/lib/query-plan-executor/src/executors/async_graphql.rs @@ -4,8 +4,8 @@ use async_trait::async_trait; use serde_json::json; use crate::{ - executors::common::SubgraphExecutor, ExecutionRequest, ExecutionResult, GraphQLError, - GraphQLErrorLocation, + executors::common::SubgraphExecutor, ExecutionResult, GraphQLError, GraphQLErrorLocation, + SubgraphExecutionRequest, }; #[async_trait] @@ -13,14 +13,17 @@ impl SubgraphExecutor for Executor where Executor: async_graphql::Executor, { - async fn execute(&self, execution_request: ExecutionRequest) -> ExecutionResult { + async fn execute<'a>( + &self, + execution_request: SubgraphExecutionRequest<'a>, + ) -> ExecutionResult { let response: async_graphql::Response = self.execute(execution_request.into()).await; response.into() } } -impl From for async_graphql::Request { - fn from(exec_request: ExecutionRequest) -> Self { +impl<'a> From> for async_graphql::Request { + fn from(exec_request: SubgraphExecutionRequest) -> Self { let mut req = async_graphql::Request::new(exec_request.query); if let Some(variables) = exec_request.variables { req = req.variables(async_graphql::Variables::from_json(json!(variables))); diff --git a/lib/query-plan-executor/src/executors/common.rs b/lib/query-plan-executor/src/executors/common.rs index dbb10e0b7..b6d8a2f55 100644 --- a/lib/query-plan-executor/src/executors/common.rs +++ b/lib/query-plan-executor/src/executors/common.rs @@ -2,11 +2,12 @@ use std::sync::Arc; use async_trait::async_trait; -use crate::{ExecutionRequest, ExecutionResult}; +use crate::{ExecutionResult, SubgraphExecutionRequest}; #[async_trait] pub trait SubgraphExecutor { - async fn execute(&self, execution_request: ExecutionRequest) -> ExecutionResult; + async fn execute<'a>(&self, execution_request: SubgraphExecutionRequest<'a>) + -> ExecutionResult; fn to_boxed_arc<'a>(self) -> Arc> where Self: Sized + Send + Sync + 'a, diff --git a/lib/query-plan-executor/src/executors/http.rs b/lib/query-plan-executor/src/executors/http.rs index 89a9f0507..269f99211 100644 --- a/lib/query-plan-executor/src/executors/http.rs +++ b/lib/query-plan-executor/src/executors/http.rs @@ -1,7 +1,10 @@ use async_trait::async_trait; use tracing::{error, instrument, trace}; -use crate::{executors::common::SubgraphExecutor, ExecutionRequest, ExecutionResult}; +use crate::{ + executors::common::SubgraphExecutor, json_writer::write_and_escape_string, ExecutionResult, + SubgraphExecutionRequest, +}; #[derive(Debug)] pub struct HTTPSubgraphExecutor { @@ -19,14 +22,16 @@ impl HTTPSubgraphExecutor { } } - async fn _execute( + async fn _execute<'a>( &self, - execution_request: ExecutionRequest, + execution_request: SubgraphExecutionRequest<'a>, ) -> Result { trace!("Executing HTTP request to subgraph at {}", self.endpoint); - let mut body = - "{\"query\":".to_string() + &serde_json::to_string(&execution_request.query).unwrap(); + // We may want to remove it, but let's see. + let mut body = String::with_capacity(4096); + body.push_str("{\"query\":"); + write_and_escape_string(&mut body, execution_request.query); let mut first_variable = true; if let Some(variables) = &execution_request.variables { for (variable_name, variable_value) in variables { @@ -87,7 +92,10 @@ impl HTTPSubgraphExecutor { #[async_trait] impl SubgraphExecutor for HTTPSubgraphExecutor { #[instrument(level = "trace", skip(self), name = "http_subgraph_execute", fields(endpoint = %self.endpoint))] - async fn execute(&self, execution_request: ExecutionRequest) -> ExecutionResult { + async fn execute<'a>( + &self, + execution_request: SubgraphExecutionRequest<'a>, + ) -> ExecutionResult { self._execute(execution_request).await.unwrap_or_else(|e| { error!(e); ExecutionResult::from_error_message(e) diff --git a/lib/query-plan-executor/src/executors/map.rs b/lib/query-plan-executor/src/executors/map.rs index a27c8b14a..d80bcb132 100644 --- a/lib/query-plan-executor/src/executors/map.rs +++ b/lib/query-plan-executor/src/executors/map.rs @@ -22,10 +22,10 @@ impl SubgraphExecutorMap { } #[instrument(level = "trace", name = "subgraph_execute", skip_all, fields(subgraph_name = %subgraph_name, execution_request = ?execution_request))] - pub async fn execute( + pub async fn execute<'a>( &self, subgraph_name: &str, - execution_request: crate::ExecutionRequest, + execution_request: crate::SubgraphExecutionRequest<'a>, ) -> crate::ExecutionResult { match self.inner.get(subgraph_name) { Some(executor) => executor.execute(execution_request).await, diff --git a/lib/query-plan-executor/src/lib.rs b/lib/query-plan-executor/src/lib.rs index 9b965a4fb..10611237a 100644 --- a/lib/query-plan-executor/src/lib.rs +++ b/lib/query-plan-executor/src/lib.rs @@ -117,10 +117,10 @@ trait ExecutableFetchNode { indexes: BTreeSet, ) -> ExecuteForRepresentationsResult; fn apply_output_rewrites(&self, possible_types: &PossibleTypes, data: &mut Value); - fn prepare_variables_for_fetch_node( - &self, - variable_values: &Option>, - ) -> Option>; + fn prepare_variables_for_fetch_node<'a>( + &'a self, + variable_values: &'a Option>, + ) -> Option>; } #[async_trait] @@ -155,9 +155,9 @@ impl ExecutableFetchNode for FetchNode { ) -> ExecutionResult { let variables = self.prepare_variables_for_fetch_node(execution_context.variable_values); - let execution_request = ExecutionRequest { - query: self.operation.document_str.clone(), - operation_name: self.operation_name.clone(), + let execution_request = SubgraphExecutionRequest { + query: &self.operation.document_str, + operation_name: self.operation_name.as_deref(), variables, extensions: None, representations: None, @@ -191,9 +191,9 @@ impl ExecutableFetchNode for FetchNode { indexes: BTreeSet, ) -> ExecuteForRepresentationsResult { // 2. Prepare variables for fetch - let execution_request = ExecutionRequest { - query: self.operation.document_str.clone(), - operation_name: self.operation_name.clone(), + let execution_request = SubgraphExecutionRequest { + query: &self.operation.document_str, + operation_name: self.operation_name.as_deref(), variables: self.prepare_variables_for_fetch_node(execution_context.variable_values), extensions: None, representations: Some(filtered_representations), @@ -242,10 +242,10 @@ impl ExecutableFetchNode for FetchNode { skip(self, variable_values), name = "prepare_variables_for_fetch_node" )] - fn prepare_variables_for_fetch_node( - &self, - variable_values: &Option>, - ) -> Option> { + fn prepare_variables_for_fetch_node<'a>( + &'a self, + variable_values: &'a Option>, + ) -> Option> { match (&self.variable_usages, variable_values) { (Some(ref variable_usages), Some(variable_values)) => { if variable_usages.is_empty() || variable_values.is_empty() { @@ -257,7 +257,7 @@ impl ExecutableFetchNode for FetchNode { .filter_map(|variable_name| { variable_values .get(variable_name) - .map(|v| (variable_name.to_string(), v.clone())) + .map(|v| (variable_name.as_str(), v)) }) .collect(), ) @@ -764,12 +764,11 @@ pub struct GraphQLErrorLocation { pub column: usize, } -#[derive(Deserialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub struct ExecutionRequest { - pub query: String, - pub operation_name: Option, - pub variables: Option>, +#[derive(Debug, Clone)] +pub struct SubgraphExecutionRequest<'a> { + pub query: &'a str, + pub operation_name: Option<&'a str>, + pub variables: Option>, pub extensions: Option>, pub representations: Option, } diff --git a/lib/query-plan-executor/src/projection.rs b/lib/query-plan-executor/src/projection.rs index 60e0c6fe1..fe9b7a92b 100644 --- a/lib/query-plan-executor/src/projection.rs +++ b/lib/query-plan-executor/src/projection.rs @@ -81,7 +81,7 @@ pub fn project_by_operation( ) )] fn project_selection_set( - data: &mut Value, + data: &Value, errors: &mut Vec, selection_set: &SelectionSet, type_name: &str, @@ -97,7 +97,6 @@ fn project_selection_set( Value::String(value) => { if let Some(enum_values) = schema_metadata.enum_values.get(type_name) { if !enum_values.contains(value) { - *data = Value::Null; errors.push(GraphQLError { message: format!( "Value is not a valid enum value for type '{}'", @@ -116,7 +115,7 @@ fn project_selection_set( Value::Array(arr) => { buffer.push('['); let mut first = true; - for item in arr.iter_mut() { + for item in arr.iter() { if !first { buffer.push(','); } @@ -171,7 +170,7 @@ fn project_selection_set( // TODO: simplfy args #[allow(clippy::too_many_arguments)] fn project_selection_set_with_map( - obj: &mut Map, + obj: &Map, errors: &mut Vec, selection_set: &SelectionSet, type_name: &str, @@ -183,9 +182,8 @@ fn project_selection_set_with_map( let type_name = match obj.get(TYPENAME_FIELD) { Some(Value::String(type_name)) => type_name, _ => type_name, - } - .to_string(); - let field_map = match schema_metadata.type_fields.get(&type_name) { + }; + let field_map = match schema_metadata.type_fields.get(type_name) { Some(field_map) => field_map, None => { // If the type is not found, we can't project anything @@ -230,7 +228,7 @@ fn project_selection_set_with_map( buffer.push('"'); buffer.push_str(response_key); buffer.push_str("\":\""); - buffer.push_str(&type_name); + buffer.push_str(type_name); buffer.push('"'); continue; } @@ -244,14 +242,11 @@ fn project_selection_set_with_map( .map(|s| s.as_str()) .unwrap_or("Any"); - if field.name == "__schema" && type_name == "Query" { - obj.insert( - response_key.to_string(), - schema_metadata.introspection_schema_root_json.clone(), - ); - } - - let field_val = obj.get_mut(response_key); + let field_val = if field.name == "__schema" && type_name == "Query" { + Some(&schema_metadata.introspection_schema_root_json) + } else { + obj.get(response_key) + }; if let Some(field_val) = field_val { project_selection_set( @@ -272,13 +267,13 @@ fn project_selection_set_with_map( SelectionItem::InlineFragment(inline_fragment) => { if schema_metadata .possible_types - .entity_satisfies_type_condition(&type_name, &inline_fragment.type_condition) + .entity_satisfies_type_condition(type_name, &inline_fragment.type_condition) { project_selection_set_with_map( obj, errors, &inline_fragment.selections, - &type_name, + type_name, schema_metadata, variable_values, buffer,