diff --git a/src/explain.rs b/src/explain.rs
index dc215ed..38db147 100644
--- a/src/explain.rs
+++ b/src/explain.rs
@@ -44,6 +44,9 @@ pub fn format_distributed_tasks(
     Ok(result)
 }
 
+/// Builds a single RecordBatch with two columns: the first holds the type of plan and
+/// the second the formatted logical plan, physical plan, distributed plan,
+/// and distributed tasks.
 pub fn build_explain_batch(
     logical_plan: &LogicalPlan,
     physical_plan: &Arc<dyn ExecutionPlan>,
diff --git a/src/planning.rs b/src/planning.rs
index 461435e..e4474a7 100644
--- a/src/planning.rs
+++ b/src/planning.rs
@@ -45,7 +45,7 @@ use crate::{
     },
 };
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct DDStage {
     /// our stage id
    pub stage_id: u64,
@@ -221,6 +221,7 @@ pub async fn logical_planning(sql: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
 }
 
-pub async fn execution_planning(
+/// Divide the physical plan into chunks (tasks) that we can distribute to workers.
+pub async fn distributed_physical_planning(
     physical_plan: Arc<dyn ExecutionPlan>,
     batch_size: usize,
     partitions_per_worker: Option<usize>,
@@ -438,7 +439,7 @@ pub fn add_distributed_analyze(
 /// final stage only as that's all we care about from the call site
 pub async fn distribute_stages(
     query_id: &str,
-    stages: Vec<DDStage>,
+    stages: &[DDStage],
     worker_addrs: Vec<String>,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<(Addrs, Vec<DDTask>)> {
diff --git a/src/proxy_service.rs b/src/proxy_service.rs
index 12ac59d..308a732 100644
--- a/src/proxy_service.rs
+++ b/src/proxy_service.rs
@@ -177,12 +177,17 @@ impl FlightSqlHandler for DDProxyHandler {
         query: arrow_flight::sql::CommandStatementQuery,
         _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
-        let query_plan = self
+        let mut query_plan = self
             .planner
             .prepare(&query.query)
             .await
             .map_err(|e| Status::internal(format!("Could not prepare query {e:?}")))?;
 
+        self.planner
+            .distribute_plan(&mut query_plan)
+            .await
+            .map_err(|e| Status::internal(format!("Could not distribute plan {e:?}")))?;
+
         self.create_flight_info_response(query_plan)
             .map(Response::new)
             .context("Could not create flight info response")
@@ -200,12 +205,17 @@ impl FlightSqlHandler for DDProxyHandler {
             None => return Err(Status::invalid_argument("Missing Substrait plan")),
         };
 
-        let query_plan = self
+        let mut query_plan = self
             .planner
             .prepare_substrait(plan)
             .await
             .map_err(|e| Status::internal(format!("Could not prepare query {e:?}")))?;
 
+        self.planner
+            .distribute_plan(&mut query_plan)
+            .await
+            .map_err(|e| Status::internal(format!("Could not distribute plan {e:?}")))?;
+
         self.create_flight_info_response(query_plan)
             .map(Response::new)
             .context("Could not create flight info response")
diff --git a/src/query_planner.rs b/src/query_planner.rs
index 46af7ca..2357ae8 100644
--- a/src/query_planner.rs
+++ b/src/query_planner.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use anyhow::{anyhow, Context as AnyhowContext};
+use anyhow::anyhow;
 use arrow::{compute::concat_batches, datatypes::SchemaRef};
 use datafusion::{
     logical_expr::LogicalPlan,
@@ -17,7 +17,8 @@ use crate::{
     customizer::Customizer,
     explain::build_explain_batch,
     planning::{
-        distribute_stages, execution_planning, get_ctx, logical_planning, physical_planning,
+        distribute_stages, distributed_physical_planning, get_ctx, logical_planning,
+        physical_planning, DDStage,
     },
     record_batch_exec::RecordBatchExec,
     result::Result,
@@ -36,6 +37,7 @@ pub struct QueryPlan {
     pub physical_plan: Arc<dyn ExecutionPlan>,
     pub distributed_plan: Arc<dyn ExecutionPlan>,
     pub distributed_tasks: Vec<DDTask>,
+    pub distributed_stages: Vec<DDStage>,
 }
 
 impl std::fmt::Debug for QueryPlan {
@@ -75,17 +77,19 @@ impl QueryPlanner {
             customizer
                 .clone()
                 .map(|c| c as Arc<dyn PhysicalExtensionCodec>)
-                .or(Some(Arc::new(DefaultPhysicalExtensionCodec {})))
-                .unwrap(),
+                .unwrap_or(Arc::new(DefaultPhysicalExtensionCodec {})),
         ));
 
         Self { customizer, codec }
     }
 
-    /// Common planning steps shared by both query and its EXPLAIN
+    /// Prepare a Distributed DataFusion plan from a SQL query.
     ///
-    /// Prepare a query by parsing the SQL, planning it, and distributing the
-    /// physical plan into stages that can be executed by workers.
+    /// This function parses the SQL, produces a logical plan, then derives the
+    /// physical plan and its distributed counterpart.
+    /// The resulting `QueryPlan` includes the logical plan, physical plan,
+    /// distributed plan, and distributed stages, but it does not yet contain
+    /// worker addresses or tasks; they are filled in later by `distribute_plan()`.
     pub async fn prepare(&self, sql: &str) -> Result<QueryPlan> {
         let mut ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
         if let Some(customizer) = &self.customizer {
@@ -105,8 +109,23 @@ impl QueryPlanner {
         }
     }
 
+    /// Prepare a Distributed DataFusion plan from a Substrait plan.
+    ///
+    /// 1. Convert the incoming Substrait plan into a `LogicalPlan` with DataFusion's
+    ///    default Substrait consumer.
+    /// 2. Derive the corresponding physical plan and distributed variant.
+    ///
+    /// The resulting `QueryPlan` contains the logical plan, physical plan,
+    /// distributed plan, and distributed stages, but it does not yet contain
+    /// worker addresses or tasks; they are filled in later by `distribute_plan()`.
     pub async fn prepare_substrait(&self, substrait_plan: Plan) -> Result<QueryPlan> {
-        let ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
+        let mut ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
+        if let Some(customizer) = &self.customizer {
+            customizer
+                .customize(&mut ctx)
+                .await
+                .map_err(|e| anyhow!("Customization failed: {e:#?}"))?;
+        }
 
         let logical_plan = from_substrait_plan(&ctx.state(), &substrait_plan).await?;
 
@@ -118,16 +137,33 @@ impl QueryPlanner {
         }
     }
 
+    /// Prepare a `QueryPlan` for a regular SELECT query.
     async fn prepare_query(
         &self,
         logical_plan: LogicalPlan,
         ctx: SessionContext,
     ) -> Result<QueryPlan> {
+        // construct the initial physical plan from the logical plan
         let physical_plan = physical_planning(&logical_plan, &ctx).await?;
 
-        self.send_it(logical_plan, physical_plan, ctx).await
+        // construct the distributed physical plan and stages
+        let (distributed_plan, distributed_stages) =
+            distributed_physical_planning(physical_plan.clone(), 8192, Some(2)).await?;
+
+        // build the initial plan, without the worker addresses and DDTasks
+        return self
+            .build_initial_plan(
+                distributed_plan,
+                distributed_stages,
+                ctx,
+                logical_plan,
+                physical_plan,
+            )
+            .await;
     }
 
+    /// Prepare a `QueryPlan` for statements that should run locally on the proxy
+    /// node (e.g. `DESCRIBE TABLE`).
     async fn prepare_local(
         &self,
         logical_plan: LogicalPlan,
@@ -151,9 +187,23 @@ impl QueryPlanner {
         let combined_batch = concat_batches(&batches[0].schema(), &batches)?;
         let physical_plan = Arc::new(RecordBatchExec::new(combined_batch));
-        self.send_it(logical_plan, physical_plan, ctx).await
+        // construct the distributed physical plan and stages
+        let (distributed_plan, distributed_stages) =
+            distributed_physical_planning(physical_plan.clone(), 8192, Some(2)).await?;
+
+        // build the initial plan, without the worker addresses and DDTasks
+        return self
+            .build_initial_plan(
+                distributed_plan,
+                distributed_stages,
+                ctx,
+                logical_plan,
+                physical_plan,
+            )
+            .await;
     }
 
+    /// Prepare a `QueryPlan` for an EXPLAIN statement.
     async fn prepare_explain(
         &self,
         explain_plan: LogicalPlan,
         ctx: SessionContext,
@@ -166,7 +216,8 @@ impl QueryPlanner {
 
         let logical_plan = child_plan[0];
 
-        let query_plan = self.prepare_query(logical_plan.clone(), ctx).await?;
+        // construct the initial distributed physical plan, without the worker addresses and DDTasks
+        let mut query_plan = self.prepare_query(logical_plan.clone(), ctx).await?;
 
         let batch = build_explain_batch(
             &query_plan.logical_plan,
@@ -176,56 +227,62 @@ impl QueryPlanner {
             self.codec.as_ref(),
         )?;
         let physical_plan = Arc::new(RecordBatchExec::new(batch));
+        query_plan.physical_plan = physical_plan.clone();
 
-        self.send_it(
-            query_plan.logical_plan,
-            physical_plan,
-            query_plan.session_context,
-        )
-        .await
+        Ok(query_plan)
     }
 
-    async fn send_it(
+
+    async fn build_initial_plan(
         &self,
+        distributed_plan: Arc<dyn ExecutionPlan>,
+        distributed_stages: Vec<DDStage>,
+        ctx: SessionContext,
         logical_plan: LogicalPlan,
         physical_plan: Arc<dyn ExecutionPlan>,
-        ctx: SessionContext,
     ) -> Result<QueryPlan> {
         let query_id = uuid::Uuid::new_v4().to_string();
 
-        // divide the physical plan into chunks (tasks) that we can distribute to workers
-        let (distributed_plan, distributed_stages) =
-            execution_planning(physical_plan.clone(), 8192, Some(2)).await?;
-
-        let worker_addrs = get_worker_addresses()?;
-
         // gather some information we need to send back such that
         // we can send a ticket to the client
         let final_stage = &distributed_stages[distributed_stages.len() - 1];
         let schema = Arc::clone(&final_stage.plan.schema());
         let final_stage_id = final_stage.stage_id;
 
-        // distribute the stages to workers, further dividing them up
-        // into chunks of partitions (partition_groups)
-        let (final_workers, tasks) = distribute_stages(
-            &query_id,
-            distributed_stages,
-            worker_addrs,
-            self.codec.as_ref(),
-        )
-        .await?;
-
-        let qp = QueryPlan {
+        Ok(QueryPlan {
             query_id,
             session_context: ctx,
-            worker_addresses: final_workers,
             final_stage_id,
             schema,
             logical_plan,
             physical_plan,
             distributed_plan,
-            distributed_tasks: tasks,
-        };
+            distributed_stages,
+            // will be populated on distribute_plan
+            worker_addresses: Addrs::default(),
+            distributed_tasks: Vec::new(),
+        })
+    }
+
+    /// Performs worker discovery and distributes the query plan to workers,
+    /// setting the final worker addresses and distributed tasks on the query plan.
+    pub async fn distribute_plan(&self, initial_plan: &mut QueryPlan) -> Result<()> {
+        // Perform worker discovery
+        let worker_addrs = get_worker_addresses()?;
+
+        // Distribute the stages to workers, further dividing them up
+        // into chunks of partitions (partition_groups)
+        let (final_workers, tasks) = distribute_stages(
+            &initial_plan.query_id,
+            &initial_plan.distributed_stages,
+            worker_addrs,
+            self.codec.as_ref(),
+        )
+        .await?;
+
+        // set the distributed tasks and final worker addresses
+        initial_plan.worker_addresses = final_workers;
+        initial_plan.distributed_tasks = tasks;
 
-        Ok(qp)
+        Ok(())
     }
 }
diff --git a/src/util.rs b/src/util.rs
index 0d9bd97..fc0883b 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -54,7 +54,6 @@ use tonic::transport::Channel;
 use url::Url;
 
 use crate::{
-    codec::DDCodec,
     logging::{debug, error, trace},
     protobuf::StageAddrs,
     result::Result,
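
Note for reviewers: the net effect of this change is to split planning into two phases. Below is a minimal sketch of the resulting call flow; the driver function itself is hypothetical, and only `prepare`, `prepare_substrait`, and `distribute_plan` are defined by this diff.

    // Hypothetical caller, mirroring what DDProxyHandler now does for
    // CommandStatementQuery: plan first, then distribute.
    async fn plan_and_distribute(planner: &QueryPlanner, sql: &str) -> Result<QueryPlan> {
        // Phase 1: SQL -> logical plan -> physical plan -> distributed plan/stages.
        // At this point query_plan.worker_addresses and query_plan.distributed_tasks
        // are still empty (Addrs::default() / Vec::new()).
        let mut query_plan = planner.prepare(sql).await?;

        // Phase 2: discover workers and distribute the stages; this fills in
        // worker_addresses and distributed_tasks on the plan.
        planner.distribute_plan(&mut query_plan).await?;

        Ok(query_plan)
    }

This split also explains why `DDStage` gains `Clone` and why `distribute_stages` now takes `&[DDStage]` instead of `Vec<DDStage>`: the stages are stored on the `QueryPlan` between the two phases, so distribution borrows them rather than consuming them.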