Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement StreamTable and StreamTableProvider (#7994) #8021

Merged
merged 15 commits into from
Nov 15, 2023
37 changes: 5 additions & 32 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::PartitionedFile;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::{
create_ordering,
file_format::{
arrow::ArrowFormat,
avro::AvroFormat,
Expand All @@ -40,15 +41,13 @@ use crate::datasource::{
TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
logical_expr::Expr,
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use datafusion_common::{
Expand All @@ -57,10 +56,9 @@ use datafusion_common::{
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
create_physical_expr, LexOrdering, PhysicalSortRequirement,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -677,34 +675,7 @@ impl ListingTable {

/// If file_sort_order is specified, creates the appropriate physical expressions
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
let mut all_sort_orders = vec![];

for exprs in &self.options.file_sort_order {
// Construct PhsyicalSortExpr objects from Expr objects:
let sort_exprs = exprs
.iter()
.map(|expr| {
if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
if let Expr::Column(col) = expr.as_ref() {
let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
Ok(PhysicalSortExpr {
expr,
options: SortOptions {
descending: !asc,
nulls_first: *nulls_first,
},
})
} else {
plan_err!("Expected single column references in output_ordering, got {expr}")
}
} else {
plan_err!("Expected Expr::Sort in output_ordering, but got {expr}")
}
})
.collect::<Result<Vec<_>>>()?;
all_sort_orders.push(sort_exprs);
}
Ok(all_sort_orders)
create_ordering(&self.table_schema, &self.options.file_sort_order)
}
}

Expand Down Expand Up @@ -1040,9 +1011,11 @@ mod tests {

use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use arrow_schema::SortOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{assert_contains, GetExt, ScalarValue};
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
use datafusion_physical_expr::PhysicalSortExpr;
use rstest::*;
use tempfile::TempDir;

Expand Down
9 changes: 2 additions & 7 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,13 @@ use datafusion_expr::CreateExternalTable;
use async_trait::async_trait;

/// A `TableProviderFactory` capable of creating new `ListingTable`s
#[derive(Debug, Default)]
pub struct ListingTableFactory {}

impl ListingTableFactory {
/// Creates a new `ListingTableFactory`
pub fn new() -> Self {
Self {}
}
}

impl Default for ListingTableFactory {
fn default() -> Self {
Self::new()
Self::default()
}
}

Expand Down
44 changes: 44 additions & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod memory;
pub mod physical_plan;
pub mod provider;
mod statistics;
pub mod stream;
pub mod streaming;
pub mod view;

Expand All @@ -43,3 +44,46 @@ pub use self::provider::TableProvider;
pub use self::view::ViewTable;
pub use crate::logical_expr::TableType;
pub use statistics::get_statistics_with_limit;

use arrow_schema::{Schema, SortOptions};
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::Expr;
use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};

/// Converts logical sort expressions into physical lexicographical orderings.
///
/// Each inner `Vec<Expr>` of `sort_order` describes one ordering. Every
/// expression in it must be an `Expr::Sort` wrapping a plain `Expr::Column`;
/// the column is resolved by name against `schema`.
///
/// # Errors
///
/// Returns a plan error if an expression is not an `Expr::Sort`, or if the
/// sorted expression is not a single column reference.
///
/// A column that cannot be resolved against `schema` is *not* an error: the
/// ordering is truncated at that point and only the already-resolved prefix
/// is kept.
fn create_ordering(
schema: &Schema,
sort_order: &[Vec<Expr>],
) -> Result<Vec<LexOrdering>> {
let mut all_sort_orders = vec![];

for exprs in sort_order {
// Construct PhysicalSortExpr objects from Expr objects:
let mut sort_exprs = vec![];
for expr in exprs {
match expr {
Expr::Sort(sort) => match sort.expr.as_ref() {
Expr::Column(col) => match expressions::col(&col.name, schema) {
Ok(expr) => {
sort_exprs.push(PhysicalSortExpr {
expr,
options: SortOptions {
// `asc` is inverted because `SortOptions` stores a `descending` flag
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
});
}
// Cannot find expression in the projected_schema, stop iterating
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary as the sort key may reference columns not found in the projected schema.

This change matches get_projected_output_ordering

// since rest of the orderings are violated
Err(_) => break,
}
expr => return plan_err!("Expected single column references in output_ordering, got {expr}"),
}
expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"),
}
}
// Skip orderings for which no sort expression was produced
if !sort_exprs.is_empty() {
all_sort_orders.push(sort_exprs);
}
}
Ok(all_sort_orders)
}
40 changes: 40 additions & 0 deletions datafusion/core/src/datasource/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use datafusion_expr::{CreateExternalTable, LogicalPlan};
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};

use crate::arrow::datatypes::SchemaRef;
use crate::datasource::listing_table_factory::ListingTableFactory;
use crate::datasource::stream::StreamTableFactory;
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
Expand Down Expand Up @@ -214,3 +216,41 @@ pub trait TableProviderFactory: Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}

/// The default [`TableProviderFactory`]
///
/// If [`CreateExternalTable`] is unbounded calls [`StreamTableFactory::create`],
/// otherwise calls [`ListingTableFactory::create`]
///
/// A table is also treated as unbounded when its options contain an
/// `unbounded` key whose value is `true` (both compared ASCII
/// case-insensitively).
#[derive(Debug, Default)]
pub struct DefaultTableFactory {
/// Factory that handles unbounded tables
stream: StreamTableFactory,
/// Factory that handles all other tables
listing: ListingTableFactory,
}

impl DefaultTableFactory {
    /// Builds a [`DefaultTableFactory`] from the [`Default`] implementation
    /// derived for the struct.
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}

#[async_trait]
impl TableProviderFactory for DefaultTableFactory {
    /// Creates a table provider for `cmd`, delegating to the stream factory
    /// for unbounded tables and to the listing factory otherwise.
    ///
    /// A table counts as unbounded when `cmd.unbounded` is set, or when the
    /// command options contain an `unbounded` key with the value `true`
    /// (key and value are compared ASCII case-insensitively).
    async fn create(
        &self,
        state: &SessionState,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>> {
        // The options can request an unbounded table independently of the flag.
        let requested_via_options = cmd.options.iter().any(|(key, value)| {
            key.eq_ignore_ascii_case("unbounded") && value.eq_ignore_ascii_case("true")
        });

        if cmd.unbounded || requested_via_options {
            self.stream.create(state, cmd).await
        } else {
            self.listing.create(state, cmd).await
        }
    }
}