Skip to content

A bug from EnforceDistribution. #18202

@baiguoname

Description

@baiguoname

Describe the bug

In the context of unbounded streaming data, there is bug I thought. If I skip the EnforceDistribution like this

// phsical_planners.rs, when i==5, the optimizer is the `EnforceDistribution`
        for (i, optimizer) in optimizers.iter().enumerate() {
            if i == 5 {
                continue;
            }

everything works fine.
But if I do not skip, then I get this error:

called `Result::unwrap()` on an `Err` value: Execution("Non Panic Task error: task 42 was cancelled\n\nbacktrace:    0: datafusion_common::error::DataFusionError::get_back_trace\n             at /root/datafusion/datafusion/common/src/error.rs:478:30\n   1: datafusion_physical_plan::stream::ReceiverStreamBuilder<O>::build::{{closure}}\n  

which comes from the source code:

// stream.rs
        let check = async move {
            while let Some(result) = join_set.join_next().await {
                match result {
                    Ok(task_result) => {
                        match task_result {
                            // Nothing to report
                            Ok(_) => continue,
                            // This means a blocking task error
                            Err(error) => return Some(Err(error)),
                        }
                    }
                    // This means a tokio task error, likely a panic
                    Err(e) => {
                        if e.is_panic() {
                            // resume on the main thread
                            std::panic::resume_unwind(e.into_panic());
                        } else {
                            // This should only occur if the task is
                            // cancelled, which would only occur if
                            // the JoinSet were aborted, which in turn
                            // would imply that the receiver has been
                            // dropped and this code is not running
                            return Some(exec_err!("Non Panic Task error: {e}"));
                        }
                    }
                }
            }
            None
        };

Here is the whole code:

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use arrow::util::pretty::{self, pretty_format_batches};
use arrow::array::{Array, Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_schema::{SchemaBuilder, SchemaRef, SortOptions};
use datafusion::catalog::{ScanArgs, ScanResult, Session};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::functions_aggregate::first_last::{FirstValue, LastValue};
use datafusion::functions_aggregate::stddev::Stddev;
use datafusion::functions_aggregate::sum::Sum;
use datafusion::logical_expr::expr::WindowFunction;
use datafusion::logical_expr::{AggregateUDF, AggregateUDFImpl, Literal, Sort, SortExpr, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition};
use datafusion::physical_expr::{EquivalenceProperties, PhysicalSortExpr};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::prelude::{Expr, ExprFunctionExt, SessionContext, col};
use datafusion::datasource::{TableProvider, TableType};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream};
use datafusion::error::DataFusionError;
use datafusion::scalar::ScalarValue;
use futures_util::Stream;
use polars::frame::DataFrame as PDF;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use datafusion::functions_aggregate::average::{Avg, AvgAccumulator};
use datafusion::functions_aggregate::expr_fn::{avg, first_value, last_value, stddev};
use datafusion::error::Result;
use futures_util::StreamExt;
use qust_ds::prelude::*;
use datafusion::prelude::lit;

use crate::arrow_ffi::FfiConvert;

pub fn runtime_async<F: Future<Output = N>, N>(f: F) -> N {
    tokio::task::block_in_place(|| {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(f)
    })
}

// A custom TableProvider that consumes from a channel.
#[derive(Debug, Clone)]
pub struct ChannelTableProvider {
    schema: Arc<Schema>,
    receiver: Arc<tokio::sync::Mutex<Receiver<RecordBatch>>>,
}

#[async_trait::async_trait]
impl TableProvider for ChannelTableProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> Arc<Schema> {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = match projection {
            Some(n) => {
                self.schema.project(n)?.into()
            }
            None => {
                self.schema.clone()
            }
        };
        let stream = DynamicRecordBatchStream::new(schema, self.receiver.clone());
        Ok(Arc::new(DynamicExecutionPlan::new(self.schema.clone(), stream)))
    }
}

// Custom ExecutionPlan to produce batches dynamically from a channel.
#[derive(Debug)]
pub struct DynamicExecutionPlan {
    properties: PlanProperties,
    stream: DynamicRecordBatchStream, // You might store the stream or a way to create it.
}

impl DynamicExecutionPlan {
    pub fn new(schema: Arc<Schema>, stream: DynamicRecordBatchStream) -> Self {
        use datafusion::physical_plan::expressions::Column;
        let g1 = PhysicalSortExpr::new(
            Arc::new(Column::new("row_index", 0)), 
            SortOptions { descending: false, nulls_first: false }
        );
        let g2 = PhysicalSortExpr::new(
            Arc::new(Column::new("code", 1)), 
            SortOptions { descending: false, nulls_first: false }
        );
        let properties = PlanProperties::new(
            {
                EquivalenceProperties::new_with_orderings(
                    schema.clone(),
                    [[g1]]
                )

            },
            // {
            //     Partitioning::Hash(vec![Arc::new(Column::new("code", 1))], 3)
            // },
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Unbounded { requires_infinite_memory: false },
        );

        Self {
            properties,
            stream,
        }
    }
}


impl DisplayAs for DynamicExecutionPlan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "bbb")
    }
}

impl ExecutionPlan for DynamicExecutionPlan {
    fn name(&self) -> &str {
        "gg"
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    fn schema(&self) -> Arc<Schema> {
        self.stream.schema.clone()
    }
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.is_empty() {
            Ok(self)
        } else {
            Err(DataFusionError::Internal(
                "DynamicExecutionPlan can't have children".to_string(),
            ))
        } 
    }

    fn execute(&self, _partition: usize, _context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(self.stream.clone()))
    }


}

// Custom Stream for RecordBatches coming from the channel.
#[derive(Debug, Clone)]
pub struct DynamicRecordBatchStream {
    schema: Arc<Schema>,
    receiver: Arc<tokio::sync::Mutex<Receiver<RecordBatch>>>,
}

impl DynamicRecordBatchStream {
    pub fn new(schema: Arc<Schema>, receiver: Arc<tokio::sync::Mutex<Receiver<RecordBatch>>>) -> Self {
        Self { schema, receiver }
    }
}

impl Stream for DynamicRecordBatchStream {
    type Item = Result<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // let mut receiver;
        // loop {
        //     match self.receiver.try_lock() {
        //         Ok(k) => {
        //             receiver = k;
        //             break;
        //         }
        //         Err(e) => {
        //             println!("{:?}", e);
        //             continue;
        //         }
        //     }
        // };
        let mut receiver = self.receiver.try_lock().unwrap();
        match receiver.poll_recv(cx) {
            Poll::Ready(Some(batch)) => {
                println!("111");
                Poll::Ready(Some(Ok(batch)))
            },
            Poll::Ready(None) => {
                println!("222");
                Poll::Ready(None)
            }
            Poll::Pending => {
                println!("333");
                Poll::Pending
            }
        }


    }

}

impl RecordBatchStream for DynamicRecordBatchStream {
    fn schema(&self) -> Arc<Schema> {
        self.schema.clone()
    }
}

pub struct DataCalc {
    pub stream: Pin<Box<dyn RecordBatchStream>>,
    pub sender: Sender<RecordBatch>,
}

impl DataCalc {
    pub async fn calc_data_async(&mut self, data: RecordBatch) -> Result<Option<RecordBatch>> {
        self
            .sender
            .send(data)
            .await
            .map_err(|e| {
                eprintln!(" eeeeee {}", e);
                datafusion::error::DataFusionError::External(Box::new(e))
            })?;
        let st = std::time::Instant::now();
        let res: Option<Result<RecordBatch>> = self.stream.next().await;
        println!("-- {:?} --", st.elapsed());
        res.transpose()
    }

    pub fn calc_data(&mut self, data: RecordBatch) -> Result<Option<RecordBatch>> {
        runtime_async(async {
            self.calc_data_async(data).await
        })
    }

    pub fn calc_df(&mut self, data: PDF) -> Result<PDF> {
        let (_, rb): (_, RecordBatch) = data.ffi_convert().unwrap();
        let res: RecordBatch = self.calc_data(rb)?.unwrap();
        let g: PDF = res.ffi_convert().unwrap();
        Ok(g)
    }
}

#[derive(Clone)]
pub struct DfPro {
    pub schema: SchemaRef,
    pub exprs: Vec<Expr>,
    ctx: SessionContext,
}

impl DfPro {

    pub fn new(data: PDF, exprs: Vec<Expr>) -> Self {
        let (schema, _): (SchemaRef, RecordBatch) = data.ffi_convert().unwrap();
        Self {
            schema,
            exprs,
            ctx: SessionContext::new(),
        }
    }

    pub async fn get_data_calc_async(&self) -> Result<DataCalc> {
        let (sender, rx) = channel::<RecordBatch>(100);
        let rx_mutex = Arc::new(tokio::sync::Mutex::new(rx));
        let table_provider = ChannelTableProvider {
            schema: self.schema.clone(),
            receiver: rx_mutex,
        };
        self.ctx.register_table("data", Arc::new(table_provider))?;
        let df = self.ctx.table("data").await?;
        let df = df
            .window(vec![
                col("factor_value")
                    .rolling_mean(2)
                    .partition_by(vec![col("code")])
                    .order_by(vec![SortExpr::new(col("row_index"), true, false)])
                    .window_frame({
                        let mut window_frame = WindowFrame::new(Some(true));
                        window_frame.start_bound = WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1)));
                        window_frame
                    })
                    .build()
                    .unwrap()
                    .alias("mean")
                    // .filter(col("mean").gt(lit(0.2)))
                    // .build()
                    // .unwrap()
            ])?;
        let stream = df.execute_stream().await?;
        let data_calc = DataCalc { stream, sender };
        Ok(data_calc)
    }

    pub fn get_data_calc(&self) -> Result<DataCalc> {
        runtime_async(async {
            self.get_data_calc_async().await
        })
    }
}

pub trait RollingG: Sized {
    fn rolling(self, method: impl AggregateUDFImpl + 'static, n: Option<u64>) -> Expr;

    fn rolling_mean(self, n: u64) -> Expr {
        self.rolling(Avg::new(), Some(n))
    }

    fn rolling_first(self, n: u64) -> Expr {
        self.rolling(FirstValue::new(), Some(n))
    }

    fn rolling_last(self, n: u64) -> Expr {
        self.rolling(LastValue::new(), Some(n))
    }

    fn rolling_sum(self, n: u64) -> Expr {
        self.rolling(Sum::new(), Some(n))
    }

    fn rolling_std(self, n: u64) -> Expr {
        self.rolling(Stddev::new(), Some(n))
    }

    fn mean(self) -> Expr {
        self.rolling(Avg::new(), None)
    }

    fn std(self) -> Expr {
        self.rolling(Stddev::new(), None)
    }

    fn first(self) -> Expr {
        self.rolling(FirstValue::new(), None)
    }

    fn last(self) -> Expr {
        self.rolling(FirstValue::new(), None)
    }
}

impl RollingG for Expr {
    fn rolling(self, method: impl AggregateUDFImpl + 'static, n: Option<u64>) -> Expr {
        let g = AggregateUDF::new_from_impl(method);
        let mut g1 = WindowFunction::new(Arc::new(g), vec![self]);
        let mut window_frame = WindowFrame::new(Some(true));
        if let Some(n) = n {
            window_frame.start_bound = WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n - 1)));
        }
        g1.params.window_frame = window_frame;
        Expr::WindowFunction(Box::new(g1))
    }
}

impl RollingG for &str {
    fn rolling(self, method: impl AggregateUDFImpl + 'static, n: Option<u64>) -> Expr {
        col(self).rolling(method, n)
    }
}

pub async fn test_data() -> Result<()> {
    let data1 = df!(
        "row_index" => [0, 1, 2],
        "code" => ["A", "B", "C"],
        "factor_value" => [0.0, 0.1, 0.2],
    ).unwrap();

    let data2 = df!(
        "row_index" => [3, 4, 5],
        "code" => ["A", "B", "C"],
        "factor_value" => [0.3, 0.4, 0.5],
    ).unwrap();

    let data3 = df!(
        "row_index" => [6, 7, 8],
        "code" => ["A", "B", "C"],
        "factor_value" => [0.6, 0.7, 0.8],
    ).unwrap();

    let data4 = df!(
        "row_index" => [9, 10, 11],
        "code" => ["A", "B", "C"],
        "factor_value" => [0.9, 1.0, 1.1],
    ).unwrap();
    let df_pro = DfPro::new(
        data1.clone(),
        vec![
            "factor_value".rolling_mean(30).alias("mean"),
        ]
    );
    let mut data_calc: DataCalc = df_pro.get_data_calc().unwrap();
    let (_, rb): (_, RecordBatch) = data1.clone().ffi_convert().unwrap();
    let res = data_calc.calc_data_async(rb).await.unwrap();
    // let res = data_calc.calc_df(data1.clone()).unwrap();
    println!("{:?}", res);
    // sleep(std::time::Duration::from_secs(3)).await;
    Ok(())
}

To Reproduce

like above

Expected behavior

There must be a bug I guess?

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions