Skip to content

Commit

Permalink
add ndc_explain to select node (#285)
Browse files Browse the repository at this point in the history
Co-authored-by: Anon Ray <ecthiender@users.noreply.github.com>
V3_GIT_ORIGIN_REV_ID: e45510454fe299fb41690a8fb56bcdce7628cd67
  • Loading branch information
2 people authored and hasura-bot committed Jan 29, 2024
1 parent 25472da commit a0d30fe
Show file tree
Hide file tree
Showing 28 changed files with 2,245 additions and 2,893 deletions.
2 changes: 1 addition & 1 deletion v3/ci.docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ services:
timeout: 10s
retries: 20
postgres_connector:
image: ghcr.io/hasura/ndc-postgres:dev-main-5aec135c1
image: ghcr.io/hasura/ndc-postgres:dev-main-dd4172889
volumes:
- ./engine/tests/pg_ndc_config.json:/config.json
depends_on:
Expand Down
2 changes: 1 addition & 1 deletion v3/custom-connector/src/bin/agent/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async fn get_capabilities() -> Json<models::CapabilitiesResponse> {
Json(models::CapabilitiesResponse {
versions: "^0.1.0".into(),
capabilities: models::Capabilities {
explain: Some(models::LeafCapability {}),
explain: None,
query: models::QueryCapabilities {
aggregates: Some(models::LeafCapability {}),
variables: Some(models::LeafCapability {}),
Expand Down
2 changes: 1 addition & 1 deletion v3/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ services:
COLLECTOR_OTLP_ENABLED: 'true'
COLLECTOR_ZIPKIN_HOST_PORT: '9411'
postgres_connector:
image: ghcr.io/hasura/ndc-postgres:dev-main-5aec135c1
image: ghcr.io/hasura/ndc-postgres:dev-main-dd4172889
ports:
- 8100:8100
volumes:
Expand Down
33 changes: 21 additions & 12 deletions v3/engine/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,18 +238,27 @@ pub async fn execute_request_internal(
.await
}
RequestMode::Explain => {
tracer.in_span("explain", SpanVisibility::Internal, || {
// convert the query plan to explain step
let explain_response =
match crate::execute::explain::explain_query_plan(query_plan) {
Ok(step) => step.to_explain_response(),
Err(e) => explain::types::ExplainResponse::error(
e.to_graphql_error(None),
),
};

ExecuteOrExplainResponse::Explain(explain_response)
})
tracer
.in_span_async("explain", SpanVisibility::Internal, || {
Box::pin(async {
// convert the query plan to explain step
let explain_response =
match crate::execute::explain::explain_query_plan(
http_client,
query_plan,
)
.await
{
Ok(step) => step.make_explain_response(),
Err(e) => explain::types::ExplainResponse::error(
e.to_graphql_error(None),
),
};

ExecuteOrExplainResponse::Explain(explain_response)
})
})
.await
}
};
Ok(response)
Expand Down
121 changes: 100 additions & 21 deletions v3/engine/src/execute/explain.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use super::remote_joins::types::RemoteJoinType;
use super::ExecuteOrExplainResponse;
use crate::execute::query_plan::{NodeQueryPlan, ProcessResponseAs};
use crate::execute::remote_joins::types::{JoinId, JoinLocations, RemoteJoin};
use crate::execute::{error, query_plan};
use crate::metadata::resolved;
use crate::schema::GDS;
use async_recursion::async_recursion;
use hasura_authn_core::Session;
use lang_graphql as gql;
use lang_graphql::{http::RawRequest, schema::Schema};
use nonempty::NonEmpty;

use crate::execute::{error, query_plan};

use super::remote_joins::types::RemoteJoinType;
use super::ExecuteOrExplainResponse;
use tracing_util::SpanVisibility;
pub mod types;
use lang_graphql::ast::common as ast;
use ndc_client as ndc;

pub async fn execute_explain(
http_client: &reqwest::Client,
Expand Down Expand Up @@ -49,7 +52,8 @@ pub async fn execute_explain_internal(
}
}

pub fn explain_query_plan(
pub(crate) async fn explain_query_plan(
http_client: &reqwest::Client,
query_plan: query_plan::QueryPlan<'_, '_, '_>,
) -> Result<types::Step, error::Error> {
let mut parallel_root_steps = vec![];
Expand All @@ -58,20 +62,26 @@ pub fn explain_query_plan(
match node {
NodeQueryPlan::NDCQueryExecution(ndc_query_execution) => {
let sequence_steps = get_execution_steps(
http_client,
alias,
&ndc_query_execution.process_response_as,
ndc_query_execution.execution_tree.remote_executions,
ndc_query_execution.execution_tree.root_node.query,
);
ndc_query_execution.execution_tree.root_node.data_connector,
)
.await;
parallel_root_steps.push(Box::new(types::Step::Sequence(sequence_steps)));
}
NodeQueryPlan::RelayNodeSelect(Some(ndc_query_execution)) => {
let sequence_steps = get_execution_steps(
http_client,
alias,
&ndc_query_execution.process_response_as,
ndc_query_execution.execution_tree.remote_executions,
ndc_query_execution.execution_tree.root_node.query,
);
ndc_query_execution.execution_tree.root_node.data_connector,
)
.await;
parallel_root_steps.push(Box::new(types::Step::Sequence(sequence_steps)));
}
NodeQueryPlan::TypeName { .. } => {
Expand Down Expand Up @@ -114,65 +124,92 @@ pub fn explain_query_plan(
}
}

fn get_execution_steps<'s>(
async fn get_execution_steps<'s>(
http_client: &reqwest::Client,
alias: gql::ast::common::Alias,
process_response_as: &ProcessResponseAs<'s>,
join_locations: JoinLocations<(RemoteJoin<'s, '_>, JoinId)>,
ndc_query_request: ndc_client::models::QueryRequest,
data_connector: &resolved::data_connector::DataConnector,
) -> NonEmpty<Box<types::Step>> {
let mut sequence_steps = match process_response_as {
ProcessResponseAs::CommandResponse { .. } => {
// A command execution node
let data_connector_explain = fetch_explain_from_data_connector(
http_client,
ndc_query_request.clone(),
data_connector,
)
.await;
NonEmpty::new(Box::new(types::Step::CommandSelect(
types::CommandSelectIR {
command_name: alias.to_string(),
query_request: ndc_query_request,
ndc_explain: data_connector_explain,
},
)))
}
ProcessResponseAs::Array { .. } | ProcessResponseAs::Object { .. } => {
// A model execution node
let data_connector_explain = fetch_explain_from_data_connector(
http_client,
ndc_query_request.clone(),
data_connector,
)
.await;
NonEmpty::new(Box::new(types::Step::ModelSelect(types::ModelSelectIR {
model_name: alias.to_string(),
query_request: ndc_query_request,
ndc_explain: data_connector_explain,
})))
}
};
if let Some(join_steps) = get_join_steps(alias.to_string(), join_locations) {
if let Some(join_steps) = get_join_steps(alias.to_string(), join_locations, http_client).await {
sequence_steps.push(Box::new(types::Step::Parallel(join_steps)));
sequence_steps.push(Box::new(types::Step::HashJoin));
};
sequence_steps
}

fn get_join_steps(
#[async_recursion]
async fn get_join_steps(
_root_field_name: String,
join_locations: JoinLocations<(RemoteJoin<'_, '_>, JoinId)>,
join_locations: JoinLocations<(RemoteJoin<'async_recursion, 'async_recursion>, JoinId)>,
http_client: &reqwest::Client,
) -> Option<NonEmpty<Box<types::Step>>> {
let mut parallel_join_steps = vec![];
for (alias, location) in join_locations.locations {
let mut sequence_steps = vec![];
if let Some((remote_join, _join_id)) = location.join_node {
let mut query_request = remote_join.target_ndc_ir;
query_request.variables = Some(vec![]);
let data_connector_explain = fetch_explain_from_data_connector(
http_client,
query_request.clone(),
remote_join.target_data_connector,
)
.await;
sequence_steps.push(Box::new(types::Step::ForEach(
// We don't support ndc_explain for for-each steps yet
match remote_join.remote_join_type {
RemoteJoinType::ToModel => {
types::ForEachStep::ModelSelect(types::ModelSelectIR {
model_name: alias.clone(),
query_request,
ndc_explain: data_connector_explain,
})
}
RemoteJoinType::ToCommand => {
types::ForEachStep::CommandSelect(types::CommandSelectIR {
command_name: alias.clone(),
query_request,
ndc_explain: data_connector_explain,
})
}
},
)))
};
if let Some(rest_join_steps) = get_join_steps(alias, location.rest) {
if let Some(rest_join_steps) = get_join_steps(alias, location.rest, http_client).await {
sequence_steps.push(Box::new(types::Step::Parallel(rest_join_steps)));
sequence_steps.push(Box::new(types::Step::HashJoin));
};
Expand Down Expand Up @@ -212,32 +249,74 @@ fn simplify_step(step: Box<types::Step>) -> Box<types::Step> {
}
}

async fn fetch_explain_from_data_connector(
http_client: &reqwest::Client,
query_request: ndc_client::models::QueryRequest,
data_connector: &resolved::data_connector::DataConnector,
) -> types::NDCExplainResponse {
let tracer = tracing_util::global_tracer();
let response = tracer
.in_span_async(
"fetch_explain_from_data_connector",
SpanVisibility::Internal,
|| {
Box::pin(async {
let ndc_config = ndc::apis::configuration::Configuration {
base_path: data_connector.url.get_url(ast::OperationType::Query),
user_agent: None,
// This is isn't expensive, reqwest::Client is behind an Arc
client: http_client.clone(),
headers: data_connector.headers.0.clone(),
};
{
// TODO: use capabilities from the data connector context
let capabilities =
ndc::apis::default_api::capabilities_get(&ndc_config).await?;
match capabilities.capabilities.explain {
None => Ok(None),
Some(_) => {
ndc::apis::default_api::explain_post(&ndc_config, query_request)
.await
.map(Some)
.map_err(error::Error::from) // ndc_client::apis::Error -> InternalError -> Error
}
}
}
})
},
)
.await;
match response {
Ok(Some(response)) => types::NDCExplainResponse::success(response),
Ok(None) => types::NDCExplainResponse::not_supported(),
Err(e) => types::NDCExplainResponse::error(e),
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_simplify_steps() {
let step = types::Step::HashJoin;

let simplified_steps = simplify_step(Box::new(types::Step::Parallel(nonempty::nonempty![
Box::new(step.clone())
Box::new(types::Step::HashJoin)
])));
assert_eq!(*simplified_steps, step.clone());
assert_eq!(*simplified_steps, types::Step::HashJoin);

let simplified_steps = simplify_step(Box::new(types::Step::Sequence(nonempty::nonempty![
Box::new(step.clone())
Box::new(types::Step::HashJoin)
])));
assert_eq!(*simplified_steps, step.clone());
assert_eq!(*simplified_steps, types::Step::HashJoin);

let nested_step = types::Step::Parallel(nonempty::nonempty![Box::new(
types::Step::Sequence(nonempty::nonempty![Box::new(types::Step::Parallel(
nonempty::nonempty![Box::new(types::Step::Sequence(nonempty::nonempty![
Box::new(step.clone())
Box::new(types::Step::HashJoin)
]))]
))])
)]);
let simplified_steps = simplify_step(Box::new(nested_step));
assert_eq!(*simplified_steps, step.clone());
assert_eq!(*simplified_steps, types::Step::HashJoin);
}
}
Loading

0 comments on commit a0d30fe

Please sign in to comment.