Skip to content

Commit

Permalink
Replace &Option<T> with Option<&T> (#4446)
Browse files Browse the repository at this point in the history
* Replace &Option<T> with Option<&T>

* empty commit to start build

* Change &Option<Vec> to Option<&Vec>

Co-authored-by: askoa <askoa@local>
  • Loading branch information
askoa and askoa committed Dec 1, 2022
1 parent 38e9f30 commit bde3c91
Show file tree
Hide file tree
Showing 22 changed files with 62 additions and 61 deletions.
8 changes: 4 additions & 4 deletions datafusion-examples/examples/custom_datasource.rs
Expand Up @@ -118,7 +118,7 @@ impl Debug for CustomDataSource {
impl CustomDataSource {
pub(crate) async fn create_physical_plan(
&self,
projections: &Option<Vec<usize>>,
projections: Option<&Vec<usize>>,
schema: SchemaRef,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
Expand Down Expand Up @@ -177,7 +177,7 @@ impl TableProvider for CustomDataSource {
async fn scan(
&self,
_state: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[Expr],
_limit: Option<usize>,
Expand All @@ -194,11 +194,11 @@ struct CustomExec {

impl CustomExec {
fn new(
projections: &Option<Vec<usize>>,
projections: Option<&Vec<usize>>,
schema: SchemaRef,
db: CustomDataSource,
) -> Self {
let projected_schema = project_schema(&schema, projections.as_ref()).unwrap();
let projected_schema = project_schema(&schema, projections).unwrap();
Self {
db,
projected_schema,
Expand Down
12 changes: 9 additions & 3 deletions datafusion/common/src/scalar.rs
Expand Up @@ -2160,7 +2160,7 @@ impl ScalarValue {
fn eq_array_decimal(
array: &ArrayRef,
index: usize,
value: &Option<i128>,
value: Option<&i128>,
precision: u8,
scale: i8,
) -> Result<bool> {
Expand Down Expand Up @@ -2196,8 +2196,14 @@ impl ScalarValue {
pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool {
match self {
ScalarValue::Decimal128(v, precision, scale) => {
ScalarValue::eq_array_decimal(array, index, v, *precision, *scale)
.unwrap()
ScalarValue::eq_array_decimal(
array,
index,
v.as_ref(),
*precision,
*scale,
)
.unwrap()
}
ScalarValue::Boolean(val) => {
eq_array_primitive!(array, index, BooleanArray, val)
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/dataframe.rs
Expand Up @@ -795,12 +795,11 @@ impl TableProvider for DataFrame {
async fn scan(
&self,
_ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut expr = projection
.as_ref()
// construct projections
.map_or_else(
|| {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/datasource.rs
Expand Up @@ -61,7 +61,7 @@ pub trait TableProvider: Sync + Send {
async fn scan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
// limit can be used to reduce the amount scanned
// from the datasource as a performance optimization.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/empty.rs
Expand Up @@ -69,12 +69,12 @@ impl TableProvider for EmptyTable {
async fn scan(
&self,
_ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema, projection.as_ref())?;
let projected_schema = project_schema(&self.schema, projection)?;
Ok(Arc::new(
EmptyExec::new(false, projected_schema).with_partitions(self.partitions),
))
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/listing/table.rs
Expand Up @@ -523,7 +523,7 @@ impl TableProvider for ListingTable {
async fn scan(
&self,
ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand All @@ -533,7 +533,7 @@ impl TableProvider for ListingTable {
// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection.as_ref())?;
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
}

Expand Down Expand Up @@ -562,7 +562,7 @@ impl TableProvider for ListingTable {
file_schema: Arc::clone(&self.file_schema),
file_groups: partitioned_file_lists,
statistics,
projection: projection.clone(),
projection: projection.cloned(),
limit,
output_ordering: self.try_create_output_ordering()?,
table_partition_cols,
Expand Down Expand Up @@ -686,7 +686,7 @@ mod tests {
let table = load_table(&ctx, "alltypes_plain.parquet").await?;
let projection = None;
let exec = table
.scan(&ctx.state(), &projection, &[], None)
.scan(&ctx.state(), projection, &[], None)
.await
.expect("Scan table");

Expand Down Expand Up @@ -716,7 +716,7 @@ mod tests {
.with_schema(schema);
let table = ListingTable::try_new(config)?;

let exec = table.scan(&state, &None, &[], None).await?;
let exec = table.scan(&state, None, &[], None).await?;
assert_eq!(exec.statistics().num_rows, Some(8));
assert_eq!(exec.statistics().total_byte_size, Some(671));

Expand Down Expand Up @@ -855,7 +855,7 @@ mod tests {
let filter = Expr::not_eq(col("p1"), lit("v1"));

let scan = table
.scan(&ctx.state(), &None, &[filter], None)
.scan(&ctx.state(), None, &[filter], None)
.await
.expect("Empty execution plan");

Expand Down
18 changes: 7 additions & 11 deletions datafusion/core/src/datasource/memory.rs
Expand Up @@ -69,7 +69,7 @@ impl MemTable {
ctx: &SessionState,
) -> Result<Self> {
let schema = t.schema();
let exec = t.scan(ctx, &None, &[], None).await?;
let exec = t.scan(ctx, None, &[], None).await?;
let partition_count = exec.output_partitioning().partition_count();

let tasks = (0..partition_count)
Expand Down Expand Up @@ -136,14 +136,14 @@ impl TableProvider for MemTable {
async fn scan(
&self,
_ctx: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(MemoryExec::try_new(
&self.batches.clone(),
self.schema(),
projection.clone(),
projection.cloned(),
)?))
}
}
Expand Down Expand Up @@ -184,7 +184,7 @@ mod tests {

// scan with projection
let exec = provider
.scan(&session_ctx.state(), &Some(vec![2, 1]), &[], None)
.scan(&session_ctx.state(), Some(&vec![2, 1]), &[], None)
.await?;

let mut it = exec.execute(0, task_ctx)?;
Expand Down Expand Up @@ -218,9 +218,7 @@ mod tests {

let provider = MemTable::try_new(schema, vec![vec![batch]])?;

let exec = provider
.scan(&session_ctx.state(), &None, &[], None)
.await?;
let exec = provider.scan(&session_ctx.state(), None, &[], None).await?;
let mut it = exec.execute(0, task_ctx)?;
let batch1 = it.next().await.unwrap()?;
assert_eq!(3, batch1.schema().fields().len());
Expand Down Expand Up @@ -253,7 +251,7 @@ mod tests {
let projection: Vec<usize> = vec![0, 4];

match provider
.scan(&session_ctx.state(), &Some(projection), &[], None)
.scan(&session_ctx.state(), Some(&projection), &[], None)
.await
{
Err(DataFusionError::ArrowError(ArrowError::SchemaError(e))) => {
Expand Down Expand Up @@ -381,9 +379,7 @@ mod tests {
let provider =
MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;

let exec = provider
.scan(&session_ctx.state(), &None, &[], None)
.await?;
let exec = provider.scan(&session_ctx.state(), None, &[], None).await?;
let mut it = exec.execute(0, task_ctx)?;
let batch1 = it.next().await.unwrap()?;
assert_eq!(3, batch1.schema().fields().len());
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/view.rs
Expand Up @@ -61,8 +61,8 @@ impl ViewTable {
}

/// Get definition ref
pub fn definition(&self) -> &Option<String> {
&self.definition
pub fn definition(&self) -> Option<&String> {
self.definition.as_ref()
}

/// Get logical_plan ref
Expand Down Expand Up @@ -104,7 +104,7 @@ impl TableProvider for ViewTable {
async fn scan(
&self,
state: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Expand Up @@ -181,8 +181,8 @@ fn swap_reverting_projection(
}

/// Swaps join sides for filter column indices and produces new JoinFilter
fn swap_join_filter(filter: &Option<JoinFilter>) -> Option<JoinFilter> {
filter.as_ref().map(|filter| {
fn swap_join_filter(filter: Option<&JoinFilter>) -> Option<JoinFilter> {
filter.map(|filter| {
let column_indices = filter
.column_indices()
.iter()
Expand Down Expand Up @@ -334,7 +334,7 @@ fn try_collect_left(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().clone(),
hash_join.filter().cloned(),
hash_join.join_type(),
PartitionMode::CollectLeft,
hash_join.null_equals_null(),
Expand All @@ -345,7 +345,7 @@ fn try_collect_left(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().clone(),
hash_join.filter().cloned(),
hash_join.join_type(),
PartitionMode::CollectLeft,
hash_join.null_equals_null(),
Expand Down Expand Up @@ -377,7 +377,7 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPl
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
hash_join.filter().clone(),
hash_join.filter().cloned(),
hash_join.join_type(),
PartitionMode::Partitioned,
hash_join.null_equals_null(),
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Expand Up @@ -248,8 +248,8 @@ impl HashJoinExec {
}

/// Filters applied before join output
pub fn filter(&self) -> &Option<JoinFilter> {
&self.filter
pub fn filter(&self) -> Option<&JoinFilter> {
self.filter.as_ref()
}

/// How the join is performed
Expand Down Expand Up @@ -698,7 +698,7 @@ fn build_join_indices(
left_data: &JoinLeftData,
on_left: &[Column],
on_right: &[Column],
filter: &Option<JoinFilter>,
filter: Option<&JoinFilter>,
random_state: &RandomState,
null_equals_null: &bool,
) -> Result<(UInt64Array, UInt32Array)> {
Expand Down Expand Up @@ -1363,7 +1363,7 @@ impl HashJoinStream {
left_data,
&self.on_left,
&self.on_right,
&self.filter,
self.filter.as_ref(),
&self.random_state,
&self.null_equals_null,
);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/planner.rs
Expand Up @@ -480,7 +480,7 @@ impl DefaultPhysicalPlanner {
// referred to in the query
let filters = unnormalize_cols(filters.iter().cloned());
let unaliased: Vec<Expr> = filters.into_iter().map(unalias).collect();
source.scan(session_state, projection, &unaliased, *fetch).await
source.scan(session_state, projection.as_ref(), &unaliased, *fetch).await
}
LogicalPlan::Values(Values {
values,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util.rs
Expand Up @@ -321,7 +321,7 @@ impl TableProvider for TestTableProvider {
async fn scan(
&self,
_ctx: &SessionState,
_projection: &Option<Vec<usize>>,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/custom_sources.rs
Expand Up @@ -193,12 +193,12 @@ impl TableProvider for CustomTableProvider {
async fn scan(
&self,
_state: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(CustomExecutionPlan {
projection: projection.clone(),
projection: projection.cloned(),
}))
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/provider_filter_pushdown.rs
Expand Up @@ -143,7 +143,7 @@ impl TableProvider for CustomProvider {
async fn scan(
&self,
_state: &SessionState,
_: &Option<Vec<usize>>,
_: Option<&Vec<usize>>,
filters: &[Expr],
_: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/row.rs
Expand Up @@ -34,7 +34,7 @@ async fn test_with_parquet() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?;
let schema = exec.schema().clone();

let batches = collect(exec, task_ctx).await?;
Expand All @@ -55,7 +55,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?;
let schema = exec.schema().clone();

let batches = collect(exec, task_ctx).await?;
Expand All @@ -73,7 +73,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> {

async fn get_exec(
file_name: &str,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = datafusion::test_util::parquet_test_data();
Expand Down Expand Up @@ -103,7 +103,7 @@ async fn get_exec(
file_schema,
file_groups,
statistics,
projection: projection.clone(),
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/information_schema.rs
Expand Up @@ -191,7 +191,7 @@ async fn information_schema_tables_table_types() {
async fn scan(
&self,
_ctx: &SessionState,
_: &Option<Vec<usize>>,
_: Option<&Vec<usize>>,
_: &[Expr],
_: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down

0 comments on commit bde3c91

Please sign in to comment.