Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-9762: [Rust] [DataFusion] ExecutionContext::sql now returns DataFrame #8027

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion rust/datafusion/benches/aggregate_query_sql.rs
Expand Up @@ -32,7 +32,8 @@ use datafusion::execution::context::ExecutionContext;

fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
// execute the query
let results = ctx.sql(&sql).unwrap();
let df = ctx.sql(&sql).unwrap();
let results = df.collect().unwrap();

// display the relation
for _batch in results {}
Expand Down
3 changes: 2 additions & 1 deletion rust/datafusion/examples/csv_sql.rs
Expand Up @@ -36,12 +36,13 @@ fn main() -> Result<()> {
)?;

// execute the query
let results = ctx.sql(
let df = ctx.sql(
"SELECT c1, MIN(c12), MAX(c12) \
FROM aggregate_test_100 \
WHERE c11 > 0.1 AND c11 < 0.9 \
GROUP BY c1",
)?;
let results = df.collect()?;

// print the results
pretty::print_batches(&results)?;
Expand Down
3 changes: 2 additions & 1 deletion rust/datafusion/examples/parquet_sql.rs
Expand Up @@ -36,11 +36,12 @@ fn main() -> Result<()> {
)?;

// execute the query
let results = ctx.sql(
let df = ctx.sql(
"SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
FROM alltypes_plain \
WHERE id > 1 AND tinyint_col < double_col",
)?;
let results = df.collect()?;

// print the results
pretty::print_batches(&results)?;
Expand Down
3 changes: 2 additions & 1 deletion rust/datafusion/src/bin/repl.rs
Expand Up @@ -103,7 +103,8 @@ fn is_exit_command(line: &str) -> bool {
fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
let now = Instant::now();

let results = ctx.sql(&sql)?;
let df = ctx.sql(&sql)?;
let results = df.collect()?;

if results.is_empty() {
println!(
Expand Down
24 changes: 8 additions & 16 deletions rust/datafusion/src/execution/context.rs
Expand Up @@ -133,15 +133,8 @@ impl ExecutionContext {

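/// Plan a SQL query and return it as a DataFrame. The query is not executed until the
/// DataFrame is collected; a `CreateExternalTable` plan instead registers the table as a
/// side effect and returns a DataFrame over an empty plan.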
pub fn sql(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
pub fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let plan = self.create_logical_plan(sql)?;
return self.collect_plan(&plan);
}

/// Executes a logical plan and produces a Relation (a schema-aware iterator over a series
/// of RecordBatch instances). This function is intended for internal use and should not be
/// called directly.
pub fn collect_plan(&mut self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
match plan {
LogicalPlan::CreateExternalTable {
ref schema,
Expand All @@ -158,23 +151,21 @@ impl ExecutionContext {
.schema(&schema)
.has_header(*has_header),
)?;
Ok(vec![])
let plan = LogicalPlanBuilder::empty().build()?;
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
FileType::Parquet => {
self.register_parquet(name, location)?;
Ok(vec![])
let plan = LogicalPlanBuilder::empty().build()?;
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
_ => Err(ExecutionError::ExecutionError(format!(
"Unsupported file type {:?}.",
file_type
))),
},

plan => {
let plan = self.optimize(&plan)?;
let plan = self.create_physical_plan(&plan)?;
Ok(self.collect(plan.as_ref())?)
}
plan => Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))),
}
}

Expand Down Expand Up @@ -1095,7 +1086,8 @@ mod tests {
let mut ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner {})),
);
ctx.sql("SELECT 1").expect_err("query not supported");
let df = ctx.sql("SELECT 1")?;
df.collect().expect_err("query not supported");
Ok(())
}

Expand Down
8 changes: 6 additions & 2 deletions rust/datafusion/src/execution/dataframe_impl.rs
Expand Up @@ -104,9 +104,13 @@ impl DataFrame for DataFrameImpl {
self.plan.clone()
}

// Convert the logical plan represented by this DataFrame into a physical plan and
// execute it
fn collect(&self) -> Result<Vec<RecordBatch>> {
let mut ctx = ExecutionContext::from(self.ctx_state.clone());
ctx.collect_plan(&self.plan.clone())
let ctx = ExecutionContext::from(self.ctx_state.clone());
let plan = ctx.optimize(&self.plan)?;
let plan = ctx.create_physical_plan(&plan)?;
Ok(ctx.collect(plan.as_ref())?)
}

/// Returns the schema from the logical plan
Expand Down