Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 52 additions & 15 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::common::FileType;
use datafusion::sql::sqlparser;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::signal;
Expand Down Expand Up @@ -221,37 +222,73 @@ async fn exec_and_print(

let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let adjusted =
Copy link
Contributor Author

@alamb alamb Apr 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I extracted the existing code to update the print options based on statement into its own structure which I think makes it clearer what is going on here

AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);

let plan = create_plan(ctx, statement).await?;
let adjusted = adjusted.with_plan(&plan);

// For plans like `Explain` ignore `MaxRows` option and always display all rows
let should_ignore_maxrows = matches!(
plan,
LogicalPlan::Explain(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
);
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

if physical_plan.execution_mode().is_unbounded() {
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
let mut print_options = print_options.clone();
if should_ignore_maxrows {
print_options.maxrows = MaxRows::Unlimited;
}
if print_options.format == PrintFormat::Automatic {
print_options.format = PrintFormat::Table;
}
let results = collect(physical_plan, task_ctx.clone()).await?;
print_options.print_batches(&results, now)?;
adjusted.into_inner().print_batches(&results, now)?;
}
}

Ok(())
}

/// Track adjustments to the print options based on the plan / statement being executed
#[derive(Debug)]
struct AdjustedPrintOptions {
inner: PrintOptions,
}

impl AdjustedPrintOptions {
fn new(inner: PrintOptions) -> Self {
Self { inner }
}
/// Adjust print options based on any statement specific requirements
fn with_statement(mut self, statement: &Statement) -> Self {
if let Statement::Statement(sql_stmt) = statement {
// SHOW / SHOW ALL
if let sqlparser::ast::Statement::ShowVariable { .. } = sql_stmt.as_ref() {
self.inner.maxrows = MaxRows::Unlimited
}
}
self
}

/// Adjust print options based on any plan specific requirements
fn with_plan(mut self, plan: &LogicalPlan) -> Self {
// For plans like `Explain` ignore `MaxRows` option and always display
// all rows
if matches!(
plan,
LogicalPlan::Explain(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
) {
self.inner.maxrows = MaxRows::Unlimited;
}
self
}

/// Finalize and return the inner `PrintOptions`
fn into_inner(mut self) -> PrintOptions {
if self.inner.format == PrintFormat::Automatic {
self.inner.format = PrintFormat::Table;
}

self.inner
}
}

async fn create_plan(
ctx: &mut SessionContext,
statement: Statement,
Expand Down