Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,7 @@ impl CustomExec {
schema: SchemaRef,
db: CustomDataSource,
) -> Self {
let projected_schema =
project_schema(&schema, projections.map(|v| v.as_ref())).unwrap();
let projected_schema = project_schema(&schema, projections).unwrap();
let cache = Self::compute_properties(projected_schema.clone());
Self {
db,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ impl TableProvider for ListingTable {

// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let projected_schema = project_schema(&self.schema(), projection.as_deref())?;
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
}

Expand Down
18 changes: 11 additions & 7 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,12 @@ impl Statistics {
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
/// "b"}`.
pub fn project(mut self, projection: Option<&[usize]>) -> Self {
let Some(projection) = projection.map(AsRef::as_ref) else {
pub fn project<P: AsRef<[usize]>>(self, p: Option<&P>) -> Self {
self.project_inner(p.as_ref().map(|p| p.as_ref()))
}

pub fn project_inner(mut self, projection: Option<&[usize]>) -> Self {
let Some(projection) = projection else {
return self;
};

Expand Down Expand Up @@ -1066,29 +1070,29 @@ mod tests {

#[test]
fn test_project_none() {
let projection: Option<&[usize]> = None;
let stats = make_stats(vec![10, 20, 30]).project(projection);
let projection: Option<Vec<usize>> = None;
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
assert_eq!(stats, make_stats(vec![10, 20, 30]));
}

#[test]
fn test_project_empty() {
let projection = Some(vec![]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref());
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
assert_eq!(stats, make_stats(vec![]));
}

#[test]
fn test_project_swap() {
let projection = Some(vec![2, 1]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref());
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
assert_eq!(stats, make_stats(vec![30, 20]));
}

#[test]
fn test_project_repeated() {
let projection = Some(vec![1, 2, 1, 1, 0, 2]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref());
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
}

Expand Down
10 changes: 5 additions & 5 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use std::thread::available_parallelism;
///
/// // Pick columns 'c' and 'b'
/// let projection = Some(vec![2, 1]);
/// let projected_schema = project_schema(&schema, projection.as_deref()).unwrap();
/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap();
///
/// let expected_schema = SchemaRef::new(Schema::new(vec![
/// Field::new("c", DataType::Utf8, true),
Expand All @@ -68,12 +68,12 @@ use std::thread::available_parallelism;
///
/// assert_eq!(projected_schema, expected_schema);
/// ```
pub fn project_schema(
pub fn project_schema<P: AsRef<[usize]>>(
schema: &SchemaRef,
projection: Option<&[usize]>,
projection: Option<P>,
) -> Result<SchemaRef> {
let schema = match projection.map(AsRef::as_ref) {
Some(columns) => Arc::new(schema.project(columns)?),
let schema = match projection {
Some(columns) => Arc::new(schema.project(columns.as_ref())?),
None => Arc::clone(schema),
};
Ok(schema)
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ impl TableProvider for EmptyTable {
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// even though there is no data, projections apply
let projected_schema =
project_schema(&self.schema, projection.map(AsRef::as_ref))?;
let projected_schema = project_schema(&self.schema, projection)?;
Ok(Arc::new(
EmptyExec::new(projected_schema).with_partitions(self.partitions),
))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl CustomExecutionPlan {
fn new(projection: Option<Vec<usize>>) -> Self {
let schema = TEST_CUSTOM_SCHEMA_REF!();
let schema =
project_schema(&schema, projection.as_deref()).expect("projected schema");
project_schema(&schema, projection.as_ref()).expect("projected schema");
let cache = Self::compute_properties(schema);
Self { projection, cache }
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections(
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
);

assert_eq!(swapped_join.projection.as_ref().unwrap(), [0_usize]);
assert_eq!(swapped_join.projection, Some(vec![0_usize]));
assert_eq!(swapped.schema().fields.len(), 1);
assert_eq!(swapped.schema().fields[0].name(), "small_col");
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl MemorySourceConfig {
schema: SchemaRef,
projection: Option<Vec<usize>>,
) -> Result<Self> {
let projected_schema = project_schema(&schema, projection.as_deref())?;
let projected_schema = project_schema(&schema, projection.as_ref())?;
Ok(Self {
partitions: partitions.to_vec(),
schema,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,13 +855,13 @@ impl OptionProjectionRef {
/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema.
pub fn project_schema(&self, schema: &SchemaRef) -> Result<SchemaRef> {
project_schema(schema, self.inner.as_deref())
project_schema(schema, self.inner.as_ref())
}

/// Applies an optional projection to a [`Statistics`], returning the
/// projected stats.
pub fn project_statistics(&self, stats: Statistics) -> Statistics {
stats.project(self.inner.as_deref())
stats.project(self.inner.as_ref())
}
}

Expand Down
13 changes: 0 additions & 13 deletions docs/source/library-user-guide/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,6 @@

**Note:** DataFusion `53.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version.

### Schema, statistics project fn take an option slice instead of Vec ref

`project_schema` and `Statistics::project` now take `Option<&[usize]>` instead of `Option<&Vec<usize>>`.

To convert `Option<&Vec<usize>>` into `Option<&[usize]>` you can use `map(|v| v.as_ref())` call,
for example:

```diff
- let projected_schema = project_schema(&schema, projections)?;
+ let projected_schema =
+ project_schema(&schema, projections.map(|v| v.as_ref()))?;
```

### `SimplifyInfo` trait removed, `SimplifyContext` now uses builder-style API

The `SimplifyInfo` trait has been removed and replaced with the concrete `SimplifyContext` struct. This simplifies the expression simplification API and removes the need for trait objects.
Expand Down