diff --git a/rust/datafusion/benches/aggregate_query_sql.rs b/rust/datafusion/benches/aggregate_query_sql.rs index b42e7fca91afb..d4a82c8ed6f4b 100644 --- a/rust/datafusion/benches/aggregate_query_sql.rs +++ b/rust/datafusion/benches/aggregate_query_sql.rs @@ -32,7 +32,8 @@ use datafusion::execution::context::ExecutionContext; fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) { // execute the query - let results = ctx.sql(&sql).unwrap(); + let df = ctx.sql(&sql).unwrap(); + let results = df.collect().unwrap(); // display the relation for _batch in results {} diff --git a/rust/datafusion/examples/csv_sql.rs b/rust/datafusion/examples/csv_sql.rs index 771d99bfc39b3..97085f85806dc 100644 --- a/rust/datafusion/examples/csv_sql.rs +++ b/rust/datafusion/examples/csv_sql.rs @@ -36,12 +36,13 @@ fn main() -> Result<()> { )?; // execute the query - let results = ctx.sql( + let df = ctx.sql( "SELECT c1, MIN(c12), MAX(c12) \ FROM aggregate_test_100 \ WHERE c11 > 0.1 AND c11 < 0.9 \ GROUP BY c1", )?; + let results = df.collect()?; // print the results pretty::print_batches(&results)?; diff --git a/rust/datafusion/examples/parquet_sql.rs b/rust/datafusion/examples/parquet_sql.rs index 6359023dd8296..cc9ab968b2247 100644 --- a/rust/datafusion/examples/parquet_sql.rs +++ b/rust/datafusion/examples/parquet_sql.rs @@ -36,11 +36,12 @@ fn main() -> Result<()> { )?; // execute the query - let results = ctx.sql( + let df = ctx.sql( "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \ FROM alltypes_plain \ WHERE id > 1 AND tinyint_col < double_col", )?; + let results = df.collect()?; // print the results pretty::print_batches(&results)?; diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs index 74d4320061b53..d93401b8787aa 100644 --- a/rust/datafusion/src/bin/repl.rs +++ b/rust/datafusion/src/bin/repl.rs @@ -103,7 +103,8 @@ fn is_exit_command(line: &str) -> bool { fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> { let now = Instant::now(); - let results = ctx.sql(&sql)?; + let df = ctx.sql(&sql)?; + let results = df.collect()?; if results.is_empty() { println!( diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 6a0b1506792f9..ca87c2e75a111 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -133,15 +133,8 @@ impl ExecutionContext { /// Execute a SQL query and produce a Relation (a schema-aware iterator over a series /// of RecordBatch instances) - pub fn sql(&mut self, sql: &str) -> Result> { + pub fn sql(&mut self, sql: &str) -> Result> { let plan = self.create_logical_plan(sql)?; - return self.collect_plan(&plan); - } - - /// Executes a logical plan and produce a Relation (a schema-aware iterator over a series - /// of RecordBatch instances). This function is intended for internal use and should not be - /// called directly. - pub fn collect_plan(&mut self, plan: &LogicalPlan) -> Result> { match plan { LogicalPlan::CreateExternalTable { ref schema, @@ -158,11 +151,13 @@ impl ExecutionContext { .schema(&schema) .has_header(*has_header), )?; - Ok(vec![]) + let plan = LogicalPlanBuilder::empty().build()?; + Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) } FileType::Parquet => { self.register_parquet(name, location)?; - Ok(vec![]) + let plan = LogicalPlanBuilder::empty().build()?; + Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) } _ => Err(ExecutionError::ExecutionError(format!( "Unsupported file type {:?}.", @@ -170,11 +165,7 @@ impl ExecutionContext { ))), }, - plan => { - let plan = self.optimize(&plan)?; - let plan = self.create_physical_plan(&plan)?; - Ok(self.collect(plan.as_ref())?) - } + plan => Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))), } } @@ -1095,7 +1086,8 @@ mod tests { let mut ctx = ExecutionContext::with_config( ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner {})), ); - ctx.sql("SELECT 1").expect_err("query not supported"); + let df = ctx.sql("SELECT 1")?; + df.collect().expect_err("query not supported"); Ok(()) } diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs index 4698a84d9b0f6..d53fb380e3a32 100644 --- a/rust/datafusion/src/execution/dataframe_impl.rs +++ b/rust/datafusion/src/execution/dataframe_impl.rs @@ -104,9 +104,13 @@ impl DataFrame for DataFrameImpl { self.plan.clone() } + // Convert the logical plan represented by this DataFrame into a physical plan and + // execute it fn collect(&self) -> Result> { - let mut ctx = ExecutionContext::from(self.ctx_state.clone()); - ctx.collect_plan(&self.plan.clone()) + let ctx = ExecutionContext::from(self.ctx_state.clone()); + let plan = ctx.optimize(&self.plan)?; + let plan = ctx.create_physical_plan(&plan)?; + Ok(ctx.collect(plan.as_ref())?) } /// Returns the schema from the logical plan