Skip to content

Commit

Permalink
fix(rust): describe/explain streaming plans
Browse files Browse the repository at this point in the history
This PR adds back the possibility to `describe` / `explain` streaming engine
plans. As these plans are not stored in the main `IRPlan`, there need to be
special cases for that in each `describe` format.

This specifically addresses it for:
- Base format: Adds streaming header, prints the underlying plan
- Tree format: Adds streaming header into top node, prints the underlying plan
- Dot format: Prints the underlying plan

Fixes pola-rs#16762
  • Loading branch information
coastalwhite committed Jun 6, 2024
1 parent 27abc9d commit 983e92e
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 44 deletions.
22 changes: 18 additions & 4 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,30 @@ impl LazyFrame {
Ok(self.clone().to_alp()?.describe_tree_format())
}

// @NOTE: this is used because we want to set the `enable_fmt` flag of `optimize_with_scratch`
// to `true` for describe.
fn _describe_to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
let (mut lp_arena, mut expr_arena) = self.get_arenas();
let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], true)?;

Ok(IRPlan::new(node, lp_arena, expr_arena))
}

/// Return a String describing the optimized logical plan.
///
/// Returns `Err` if optimizing the logical plan fails.
pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
Ok(self.clone().to_alp_optimized()?.describe())
Ok(self.clone()._describe_to_alp_optimized()?.describe())
}

/// Return a String describing the optimized logical plan in tree format.
///
/// Returns `Err` if optimizing the logical plan fails.
pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
Ok(self.clone().to_alp_optimized()?.describe_tree_format())
Ok(self
.clone()
._describe_to_alp_optimized()?
.describe_tree_format())
}

/// Return a String describing the logical plan.
Expand Down Expand Up @@ -551,7 +563,7 @@ impl LazyFrame {
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
scratch: &mut Vec<Node>,
_fmt: bool,
enable_fmt: bool,
) -> PolarsResult<Node> {
#[allow(unused_mut)]
let mut opt_state = self.opt_state;
Expand Down Expand Up @@ -591,16 +603,18 @@ impl LazyFrame {
lp_arena,
expr_arena,
scratch,
_fmt,
enable_fmt,
true,
opt_state.row_estimate,
)?;
}
#[cfg(not(feature = "streaming"))]
{
_ = enable_fmt;
panic!("activate feature 'streaming'")
}
}

Ok(lp_top)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ pub(super) fn construct(
};
// keep the original around for formatting purposes
let original_lp = if fmt {
let original_lp = node_to_lp_cloned(insertion_location, expr_arena, lp_arena);
let original_lp = IRPlan::new(insertion_location, lp_arena.clone(), expr_arena.clone());
Some(original_lp)
} else {
None
Expand All @@ -233,7 +233,7 @@ fn get_pipeline_node(
lp_arena: &mut Arena<IR>,
mut pipelines: Vec<PipeLine>,
schema: SchemaRef,
original_lp: Option<DslPlan>,
original_lp: Option<IRPlan>,
) -> IR {
// create a dummy input as the map function will call the input
// so we just create a scan that returns an empty df
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ pub(crate) fn insert_streaming_nodes(
},
}
}

let mut inserted = false;
for tree in pipeline_trees {
if is_valid_tree(&tree)
Expand Down
10 changes: 9 additions & 1 deletion crates/polars-plan/src/logical_plan/alp/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::constants::UNLIMITED_CACHE;
use crate::prelude::alp::format::ColumnsDisplay;
use crate::prelude::*;

pub struct IRDotDisplay<'a>(pub(crate) IRPlanRef<'a>);
pub struct IRDotDisplay<'a>(IRPlanRef<'a>);

const INDENT: &str = " ";

Expand Down Expand Up @@ -43,6 +43,14 @@ fn write_label<'a, 'b>(
}

impl<'a> IRDotDisplay<'a> {
pub fn new(lp: IRPlanRef<'a>) -> Self {
if let Some(streaming_lp) = lp.extract_streaming_plan() {
return Self(streaming_lp);
}

Self(lp)
}

fn with_root(&self, root: Node) -> Self {
Self(self.0.with_root(root))
}
Expand Down
84 changes: 55 additions & 29 deletions crates/polars-plan/src/logical_plan/alp/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use recursive::recursive;

use crate::prelude::*;

pub struct IRDisplay<'a>(pub(crate) IRPlanRef<'a>);
pub struct IRDisplay<'a> {
is_streaming: bool,
lp: IRPlanRef<'a>,
}

#[derive(Clone, Copy)]
pub struct ExprIRDisplay<'a> {
Expand Down Expand Up @@ -91,13 +94,62 @@ fn write_scan(
}

impl<'a> IRDisplay<'a> {
pub fn new(lp: IRPlanRef<'a>) -> Self {
if let Some(streaming_lp) = lp.extract_streaming_plan() {
return Self {
is_streaming: true,
lp: streaming_lp,
};
}

Self {
is_streaming: false,
lp,
}
}

fn root(&self) -> &IR {
self.lp.root()
}

fn with_root(&self, root: Node) -> Self {
Self {
is_streaming: false,
lp: self.lp.with_root(root),
}
}

fn display_expr(&self, root: &'a ExprIR) -> ExprIRDisplay<'a> {
ExprIRDisplay {
node: root.node(),
output_name: root.output_name_inner(),
expr_arena: self.lp.expr_arena,
}
}

fn display_expr_slice(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> {
ExprIRSliceDisplay {
exprs,
expr_arena: self.lp.expr_arena,
}
}

#[recursive]
fn _format(&self, f: &mut Formatter, indent: usize) -> fmt::Result {
if indent != 0 {
writeln!(f)?;
}

let indent = if self.is_streaming {
writeln!(f, "STREAMING:")?;
indent + 2
} else {
indent
};

let sub_indent = indent + 2;
use IR::*;

match self.root() {
#[cfg(feature = "python")]
PythonScan { options, predicate } => {
Expand Down Expand Up @@ -293,8 +345,7 @@ impl<'a> IRDisplay<'a> {
MapFunction {
input, function, ..
} => {
let function_fmt = format!("{function}");
write!(f, "{:indent$}{function_fmt}", "")?;
write!(f, "{:indent$}{function}", "")?;
self.with_root(*input)._format(f, sub_indent)
},
ExtContext { input, .. } => {
Expand All @@ -313,7 +364,7 @@ impl<'a> IRDisplay<'a> {
},
SimpleProjection { input, columns } => {
let num_columns = columns.as_ref().len();
let total_columns = self.0.lp_arena.get(*input).schema(self.0.lp_arena).len();
let total_columns = self.lp.lp_arena.get(*input).schema(self.lp.lp_arena).len();

let columns = ColumnsDisplay(columns.as_ref());
write!(
Expand All @@ -329,31 +380,6 @@ impl<'a> IRDisplay<'a> {
}
}

impl<'a> IRDisplay<'a> {
fn root(&self) -> &IR {
self.0.root()
}

fn with_root(&self, root: Node) -> Self {
Self(self.0.with_root(root))
}

fn display_expr(&self, root: &'a ExprIR) -> ExprIRDisplay<'a> {
ExprIRDisplay {
node: root.node(),
output_name: root.output_name_inner(),
expr_arena: self.0.expr_arena,
}
}

fn display_expr_slice(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> {
ExprIRSliceDisplay {
exprs,
expr_arena: self.0.expr_arena,
}
}
}

impl<'a> ExprIRDisplay<'a> {
fn with_slice<T: AsExpr>(&self, exprs: &'a [T]) -> ExprIRSliceDisplay<'a, T> {
ExprIRSliceDisplay {
Expand Down
29 changes: 25 additions & 4 deletions crates/polars-plan/src/logical_plan/alp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ impl IRPlan {
}
}

/// Extract the original logical plan if the plan is for the Streaming Engine
pub fn extract_streaming_plan(&self) -> Option<IRPlanRef> {
self.as_ref().extract_streaming_plan()
}

pub fn describe(&self) -> String {
self.as_ref().describe()
}
Expand All @@ -180,11 +185,11 @@ impl IRPlan {
}

pub fn display(&self) -> format::IRDisplay {
format::IRDisplay(self.as_ref())
self.as_ref().display()
}

pub fn display_dot(&self) -> dot::IRDotDisplay {
dot::IRDotDisplay(self.as_ref())
self.as_ref().display_dot()
}
}

Expand All @@ -201,12 +206,28 @@ impl<'a> IRPlanRef<'a> {
}
}

/// Extract the original logical plan if the plan is for the Streaming Engine
pub fn extract_streaming_plan(self) -> Option<IRPlanRef<'a>> {
// @NOTE: the streaming engine replaces the whole tree with a MapFunction { Pipeline, .. }
// and puts the original plan somewhere in there. This is how we extract it. Disguisting, I
// know.
let IR::MapFunction { input: _, function } = self.root() else {
return None;
};

let FunctionNode::Pipeline { original, .. } = function else {
return None;
};

Some(original.as_ref()?.as_ref().as_ref())
}

pub fn display(self) -> format::IRDisplay<'a> {
format::IRDisplay(self)
format::IRDisplay::new(self)
}

pub fn display_dot(self) -> dot::IRDotDisplay<'a> {
dot::IRDotDisplay(self)
dot::IRDotDisplay::new(self)
}

pub fn describe(self) -> String {
Expand Down
9 changes: 9 additions & 0 deletions crates/polars-plan/src/logical_plan/alp/tree_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ fn multiline_expression(expr: &str) -> std::borrow::Cow<'_, str> {

impl<'a> TreeFmtNode<'a> {
pub fn root_logical_plan(lp: IRPlanRef<'a>) -> Self {
if let Some(streaming_lp) = lp.extract_streaming_plan() {
return Self {
h: Some("Streaming".to_string()),
content: TreeFmtNodeContent::LogicalPlan(streaming_lp.lp_top),

lp: streaming_lp,
};
}

Self {
h: None,
content: TreeFmtNodeContent::LogicalPlan(lp.lp_top),
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-plan/src/logical_plan/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub enum FunctionNode {
Pipeline {
function: Arc<dyn DataFrameUdfMut>,
schema: SchemaRef,
original: Option<Arc<DslPlan>>,
original: Option<Arc<IRPlan>>,
},
Unnest {
columns: Arc<[Arc<str>]>,
Expand Down Expand Up @@ -327,8 +327,7 @@ impl Display for FunctionNode {
MergeSorted { .. } => write!(f, "MERGE SORTED"),
Pipeline { original, .. } => {
if let Some(original) = original {
let ir_plan = original.as_ref().clone().to_alp().unwrap();
let ir_display = ir_plan.display();
let ir_display = original.as_ref().display();

writeln!(f, "--- STREAMING")?;
write!(f, "{ir_display}")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-utils/src/arena.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Default for Node {

static ARENA_VERSION: AtomicU32 = AtomicU32::new(0);

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct Arena<T> {
version: u32,
items: Vec<T>,
Expand Down

0 comments on commit 983e92e

Please sign in to comment.