4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -314,7 +314,7 @@ mod sql_tests {
FROM test
LIMIT 5".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Window Error".to_string()
error_operator: "Sort Error".to_string()

LGTM, thanks for taking this. Why did the error message change to "Sort Error"? I don't see the respective logic change.


I think this happens because the task of erroring out is now left to PipelineChecker, which is the last rule, so the sorts are already in place by then. While checking, the algorithm first encounters the sort (before the window), which is pipeline-breaking. Hence the error message.
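As a rough illustration (a hand-drawn sketch of the expected plan shape, not actual optimizer output from this PR): the sort enforced for the window sits below the window operator, so a check that walks the plan from the leaves upward reaches the SortExec before the window and reports it:

WindowAggExec: [SUM(c9) ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]
  SortExec: [c9 ASC]            <-- pipeline-breaking on unbounded input, reported first
    CsvExec: ... (infinite source)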

};

case.run().await?;
@@ -337,7 +337,7 @@ mod sql_tests {
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
FROM test".to_string(),
cases: vec![Arc::new(test1), Arc::new(test2)],
error_operator: "Window Error".to_string()
error_operator: "Sort Error".to_string()
};
case.run().await?;
Ok(())
13 changes: 8 additions & 5 deletions datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -229,13 +229,16 @@ fn apply_subrules_and_check_finiteness_requirements(
input = value;
}
}
- input
+ let is_unbounded = input
.plan
.unbounded_output(&input.children_unbounded)
- .map(|value| {
- input.unbounded = value;
- Transformed::Yes(input)
- })
+ // Treat cases where the executor cannot run on unbounded data as if they
+ // generate unbounded data. Such executors may be fixed during optimization
+ // (the sort will be removed, the window will be swapped, etc.); if they
+ // cannot be fixed, the PipelineChecker rule will generate an error anyway.
+ .unwrap_or(true);
+ input.unbounded = is_unbounded;
+ Ok(Transformed::Yes(input))
}
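To make the new fallback concrete, here is a minimal standalone sketch of the same "treat an error as pessimistically unbounded" convention; the `unbounded_output` function below is a hypothetical stand-in, not the actual `ExecutionPlan` API:

// Hypothetical stand-in: Ok(bool) reports whether the output is unbounded,
// Err(..) means the operator cannot run on unbounded input at all.
fn unbounded_output(children_unbounded: &[bool]) -> Result<bool, String> {
    if children_unbounded.iter().any(|&c| c) {
        // e.g. a full sort cannot stream an unbounded child
        Err("operator does not support unbounded input".to_string())
    } else {
        Ok(false)
    }
}

fn main() {
    // Mirrors the change above: on error, mark the node unbounded and let a
    // later rule fix the plan, or let PipelineChecker reject it at the end.
    let is_unbounded = unbounded_output(&[true]).unwrap_or(true);
    assert!(is_unbounded);
}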

#[cfg(test)]
109 changes: 108 additions & 1 deletion datafusion/core/src/test_util/mod.rs
@@ -19,8 +19,10 @@

pub mod parquet;

+ use arrow::array::Int32Array;
use std::any::Any;
use std::collections::HashMap;
+ use std::fs::File;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -40,10 +42,13 @@ use crate::prelude::{CsvReadOptions, SessionContext};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
+ use datafusion_common::from_slice::FromSlice;
use datafusion_common::{Statistics, TableReference};
- use datafusion_expr::{CreateExternalTable, Expr, TableType};
+ use datafusion_execution::config::SessionConfig;
+ use datafusion_expr::{col, CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
use futures::Stream;
+ use tempfile::TempDir;

/// Compares formatted output of a record batch with an expected
/// vector of strings, with the result of pretty formatting record
@@ -288,6 +293,108 @@ pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
Arc::new(schema)
}

// Returns a static RecordBatch and its ordering for tests. The RecordBatch is ordered by the `ts` column.
fn get_test_data() -> Result<(RecordBatch, Vec<Expr>)> {
let ts_field = Field::new("ts", DataType::Int32, false);
let inc_field = Field::new("inc_col", DataType::Int32, false);
let desc_field = Field::new("desc_col", DataType::Int32, false);

let schema = Arc::new(Schema::new(vec![ts_field, inc_field, desc_field]));

let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int32Array::from_slice([
1, 1, 5, 9, 10, 11, 16, 21, 22, 26, 26, 28, 31, 33, 38, 42, 47, 51, 53,
53, 58, 63, 67, 68, 70, 72, 72, 76, 81, 85, 86, 88, 91, 96, 97, 98, 100,
101, 102, 104, 104, 108, 112, 113, 113, 114, 114, 117, 122, 126, 131,
131, 136, 136, 136, 139, 141, 146, 147, 147, 152, 154, 159, 161, 163,
164, 167, 172, 173, 177, 180, 185, 186, 191, 195, 195, 199, 203, 207,
210, 213, 218, 221, 224, 226, 230, 232, 235, 238, 238, 239, 244, 245,
247, 250, 254, 258, 262, 264, 264,
])),
Arc::new(Int32Array::from_slice([
1, 5, 10, 15, 20, 21, 26, 29, 30, 33, 37, 40, 43, 44, 45, 49, 51, 53, 58,
61, 65, 70, 75, 78, 83, 88, 90, 91, 95, 97, 100, 105, 109, 111, 115, 119,
120, 124, 126, 129, 131, 135, 140, 143, 144, 147, 148, 149, 151, 155,
156, 159, 160, 163, 165, 170, 172, 177, 181, 182, 186, 187, 192, 196,
197, 199, 203, 207, 209, 213, 214, 216, 219, 221, 222, 225, 226, 231,
236, 237, 242, 245, 247, 248, 253, 254, 259, 261, 266, 269, 272, 275,
278, 283, 286, 289, 291, 296, 301, 305,
])),
Arc::new(Int32Array::from_slice([
100, 98, 93, 91, 86, 84, 81, 77, 75, 71, 70, 69, 64, 62, 59, 55, 50, 45,
41, 40, 39, 36, 31, 28, 23, 22, 17, 13, 10, 6, 5, 2, 1, -1, -4, -5, -6,
-8, -12, -16, -17, -19, -24, -25, -29, -34, -37, -42, -47, -48, -49, -53,
-57, -58, -61, -65, -67, -68, -71, -73, -75, -76, -78, -83, -87, -91,
-95, -98, -101, -105, -106, -111, -114, -116, -120, -125, -128, -129,
-134, -139, -142, -143, -146, -150, -154, -158, -163, -168, -172, -176,
-181, -184, -189, -193, -196, -201, -203, -208, -210, -213,
])),
],
)?;
let file_sort_order = vec![col("ts").sort(true, false)];
Ok((batch, file_sort_order))
}

/// Creates a test context with a table named `annotated_data` that has 100 rows.
/// Columns in the table are `ts`, `inc_col` and `desc_col`. The source is a `CsvExec`
/// ordered by the `ts` column.
pub async fn get_test_context(
tmpdir: &TempDir,
infinite_source: bool,
session_config: SessionConfig,
) -> Result<SessionContext> {
get_test_context_helper(tmpdir, infinite_source, session_config, get_test_data).await
}

async fn get_test_context_helper(
tmpdir: &TempDir,
infinite_source: bool,
session_config: SessionConfig,
data_receiver: fn() -> Result<(RecordBatch, Vec<Expr>)>,
) -> Result<SessionContext> {
let ctx = SessionContext::with_config(session_config);

let csv_read_options = CsvReadOptions::default();
let (batch, file_sort_order) = data_receiver()?;

let options_sort = csv_read_options
.to_listing_options(&ctx.copied_config())
.with_file_sort_order(Some(file_sort_order))
.with_infinite_source(infinite_source);

write_test_data_to_csv(tmpdir, 1, &batch)?;
let sql_definition = None;
ctx.register_listing_table(
"annotated_data",
tmpdir.path().to_string_lossy(),
options_sort.clone(),
Some(batch.schema()),
sql_definition,
)
.await
.unwrap();
Ok(ctx)
}

fn write_test_data_to_csv(
tmpdir: &TempDir,
n_file: usize,
batch: &RecordBatch,
) -> Result<()> {
let n_chunk = batch.num_rows() / n_file;
for i in 0..n_file {
let target_file = tmpdir.path().join(format!("{i}.csv"));
let file = File::create(target_file)?;
let chunks_start = i * n_chunk;
let cur_batch = batch.slice(chunks_start, n_chunk);
let mut writer = arrow::csv::Writer::new(file);
writer.write(&cur_batch)?;
}
Ok(())
}
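For context, a caller of these helpers might look like the hypothetical test below (an illustrative sketch relying on the imports already in this module; the query and its output are assumptions for demonstration, not part of this diff):

// Hypothetical usage sketch, not part of this PR.
#[tokio::test]
async fn query_annotated_data() -> Result<()> {
    let tmpdir = TempDir::new()?;
    let ctx = get_test_context(&tmpdir, false, SessionConfig::new()).await?;
    // `annotated_data` is declared as ordered by `ts`, so this ORDER BY should
    // be satisfiable from the file sort order rather than a new sort.
    let df = ctx
        .sql("SELECT ts, inc_col FROM annotated_data ORDER BY ts LIMIT 5")
        .await?;
    df.show().await?;
    Ok(())
}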

/// TableFactory for tests
pub struct TestTableFactory {}
