Skip to content

Commit

Permalink
ARROW-9762: [Rust] [DataFusion] ExecutionContext::sql now returns Dat…
Browse files Browse the repository at this point in the history
…aFrame

I need this change so that I can have Ballista use the DataFusion DataFrame trait and start testing the extension points for the physical planner.

Closes #8027 from andygrove/ARROW-9762

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
  • Loading branch information
andygrove committed Aug 22, 2020
1 parent 0a698c0 commit 2e8fcd4
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 22 deletions.
3 changes: 2 additions & 1 deletion rust/datafusion/benches/aggregate_query_sql.rs
Expand Up @@ -32,7 +32,8 @@ use datafusion::execution::context::ExecutionContext;

fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
// execute the query
let results = ctx.sql(&sql).unwrap();
let df = ctx.sql(&sql).unwrap();
let results = df.collect().unwrap();

// display the relation
for _batch in results {}
Expand Down
3 changes: 2 additions & 1 deletion rust/datafusion/examples/csv_sql.rs
Expand Up @@ -36,12 +36,13 @@ fn main() -> Result<()> {
)?;

// execute the query
let results = ctx.sql(
let df = ctx.sql(
"SELECT c1, MIN(c12), MAX(c12) \
FROM aggregate_test_100 \
WHERE c11 > 0.1 AND c11 < 0.9 \
GROUP BY c1",
)?;
let results = df.collect()?;

// print the results
pretty::print_batches(&results)?;
Expand Down
3 changes: 2 additions & 1 deletion rust/datafusion/examples/parquet_sql.rs
Expand Up @@ -36,11 +36,12 @@ fn main() -> Result<()> {
)?;

// execute the query
let results = ctx.sql(
let df = ctx.sql(
"SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
FROM alltypes_plain \
WHERE id > 1 AND tinyint_col < double_col",
)?;
let results = df.collect()?;

// print the results
pretty::print_batches(&results)?;
Expand Down
3 changes: 2 additions & 1 deletion rust/datafusion/src/bin/repl.rs
Expand Up @@ -103,7 +103,8 @@ fn is_exit_command(line: &str) -> bool {
fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
let now = Instant::now();

let results = ctx.sql(&sql)?;
let df = ctx.sql(&sql)?;
let results = df.collect()?;

if results.is_empty() {
println!(
Expand Down
24 changes: 8 additions & 16 deletions rust/datafusion/src/execution/context.rs
Expand Up @@ -133,15 +133,8 @@ impl ExecutionContext {

/// Execute a SQL query and produce a Relation (a schema-aware iterator over a series
/// of RecordBatch instances)
pub fn sql(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
pub fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let plan = self.create_logical_plan(sql)?;
return self.collect_plan(&plan);
}

/// Executes a logical plan and produce a Relation (a schema-aware iterator over a series
/// of RecordBatch instances). This function is intended for internal use and should not be
/// called directly.
pub fn collect_plan(&mut self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
match plan {
LogicalPlan::CreateExternalTable {
ref schema,
Expand All @@ -158,23 +151,21 @@ impl ExecutionContext {
.schema(&schema)
.has_header(*has_header),
)?;
Ok(vec![])
let plan = LogicalPlanBuilder::empty().build()?;
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
FileType::Parquet => {
self.register_parquet(name, location)?;
Ok(vec![])
let plan = LogicalPlanBuilder::empty().build()?;
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
_ => Err(ExecutionError::ExecutionError(format!(
"Unsupported file type {:?}.",
file_type
))),
},

plan => {
let plan = self.optimize(&plan)?;
let plan = self.create_physical_plan(&plan)?;
Ok(self.collect(plan.as_ref())?)
}
plan => Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))),
}
}

Expand Down Expand Up @@ -1095,7 +1086,8 @@ mod tests {
let mut ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner {})),
);
ctx.sql("SELECT 1").expect_err("query not supported");
let df = ctx.sql("SELECT 1")?;
df.collect().expect_err("query not supported");
Ok(())
}

Expand Down
8 changes: 6 additions & 2 deletions rust/datafusion/src/execution/dataframe_impl.rs
Expand Up @@ -104,9 +104,13 @@ impl DataFrame for DataFrameImpl {
self.plan.clone()
}

// Convert the logical plan represented by this DataFrame into a physical plan and
// execute it
fn collect(&self) -> Result<Vec<RecordBatch>> {
let mut ctx = ExecutionContext::from(self.ctx_state.clone());
ctx.collect_plan(&self.plan.clone())
let ctx = ExecutionContext::from(self.ctx_state.clone());
let plan = ctx.optimize(&self.plan)?;
let plan = ctx.create_physical_plan(&plan)?;
Ok(ctx.collect(plan.as_ref())?)
}

/// Returns the schema from the logical plan
Expand Down

0 comments on commit 2e8fcd4

Please sign in to comment.