Skip to content

Conversation

@jackwener
Copy link
Member

Which issue does this PR close?

Closes #6237.

Rationale for this change

projection_push_down don't consider VarProvider in columns.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate optimizer Optimizer rules labels May 5, 2023
}

#[tokio::test]
async fn use_var_provider() -> Result<()> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how to set skip_failed_rule = false in dataframe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        let runtime = Arc::new(RuntimeEnv::default());
        let config = SessionConfig::new().with_target_partitions(4);
        let config = config.set_bool("datafusion.optimizer.skip_failed_rules", false);
        SessionState::with_config_rt(config, runtime)
        let ctx = SessionContext::with_config(config);
        let df = ctx.sql(sql) 

?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the version I got working:

#[derive(Debug)]
struct HardcodedIntProvider {}

impl VarProvider for HardcodedIntProvider {
    fn get_value(&self, _var_names: Vec<String>) -> Result<ScalarValue, DataFusionError> {
        Ok(ScalarValue::Int64(Some(1234)))
    }

    fn get_type(&self, _: &[String]) -> Option<DataType> {
        Some(DataType::Int64)
    }
}

#[tokio::test]
async fn use_var_provider() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("foo", DataType::Int64, false),
        Field::new("bar", DataType::Int64, false),
    ]));

    let mem_table = Arc::new(MemTable::try_new(schema, vec![])?);

    let config = SessionConfig::new()
        .with_target_partitions(4)
        .set_bool("datafusion.optimizer.skip_failed_rules", false);

    let ctx = SessionContext::with_config(config);

    ctx.register_table("csv_table", mem_table)?;
    ctx.register_variable(VarType::UserDefined, Arc::new(HardcodedIntProvider {}));

    let dataframe = ctx
        .sql("SELECT foo FROM csv_table WHERE bar > @var")
        .await?;
    dataframe.collect().await?;
    Ok(())
}

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jackwener --- it is great to be so responsive

})
.collect::<Vec<Expr>>();
if columns.len() != expr.len() {
// Because columns may contain VarProvider, so the length of expr may be less than columns
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is some way to strip out the columns earlier (like for example, perhaps we could skip pulling out ScalarVariable here)

https://github.com/apache/arrow-datafusion/blob/cf233c855f46fad8355342c581ab55f610758b6c/datafusion/expr/src/utils.rs#L273-L275

I tried this locally:

diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 7babd659e..00e1d0769 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -270,13 +270,11 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
             Expr::Column(qc) => {
                 accum.insert(qc.clone());
             }
-            Expr::ScalarVariable(_, var_names) => {
-                accum.insert(Column::from_name(var_names.join(".")));
-            }
             // Use explicit pattern match instead of a default
             // implementation, so that in the future if someone adds
             // new Expr types, they will check here as well
-            Expr::Alias(_, _)
+            Expr::ScalarVariable(_, _)
+            | Expr::Alias(_, _)
             | Expr::Literal(_)
             | Expr::BinaryExpr { .. }
             | Expr::Like { .. }

and it seems promising

}

#[tokio::test]
async fn use_var_provider() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the version I got working:

#[derive(Debug)]
struct HardcodedIntProvider {}

impl VarProvider for HardcodedIntProvider {
    fn get_value(&self, _var_names: Vec<String>) -> Result<ScalarValue, DataFusionError> {
        Ok(ScalarValue::Int64(Some(1234)))
    }

    fn get_type(&self, _: &[String]) -> Option<DataType> {
        Some(DataType::Int64)
    }
}

#[tokio::test]
async fn use_var_provider() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("foo", DataType::Int64, false),
        Field::new("bar", DataType::Int64, false),
    ]));

    let mem_table = Arc::new(MemTable::try_new(schema, vec![])?);

    let config = SessionConfig::new()
        .with_target_partitions(4)
        .set_bool("datafusion.optimizer.skip_failed_rules", false);

    let ctx = SessionContext::with_config(config);

    ctx.register_table("csv_table", mem_table)?;
    ctx.register_variable(VarType::UserDefined, Arc::new(HardcodedIntProvider {}));

    let dataframe = ctx
        .sql("SELECT foo FROM csv_table WHERE bar > @var")
        .await?;
    dataframe.collect().await?;
    Ok(())
}

@jackwener
Copy link
Member Author

Thanks @comphead @alamb

@github-actions github-actions bot added logical-expr Logical plan and expressions and removed optimizer Optimizer rules labels May 6, 2023
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beautiful. Thank you @jackwener !

@alamb alamb merged commit 7760191 into apache:main May 8, 2023
@jackwener jackwener deleted the same_expr branch January 9, 2024 06:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate logical-expr Logical plan and expressions

Projects

None yet

Development

Successfully merging this pull request may close these issues.

push_down_projection optimization fails when using variables

3 participants