Skip to content

Anti join ignores join filters #2842

@andygrove

Description

@andygrove

Describe the bug
We currently have incorrect behavior for Anti joins where there is a join filter. Here is a repro case demonstrated in Spark:

scala> val df = spark.sql("""select * from (values (1,2,3), (3,4,5)) a
| where not exists (select col1 from (values (1,2,3), (3,4,5)) b where b.col1 = a.col1 and b.col2 <> a.col2)
| """)
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 1 more field]

scala> df.show
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   3|   4|   5|
+----+----+----+

The following unit test can be added to hash_join.rs to attempt the same query.

    /// Anti join of a two-row table with a copy of itself, keyed on `col1` and
    /// carrying the additional join filter `col2 <> col2` across the two sides.
    ///
    /// For every pair of rows with equal `col1` the `col2` values are also
    /// equal, so the filter rejects each candidate match and the anti join is
    /// expected to return both left rows.
    #[tokio::test]
    async fn join_anti_with_filter() -> Result<()> {
        let ctx = SessionContext::new();
        let task_ctx = ctx.task_ctx();

        // Both inputs contain the rows (1, 2, 3) and (3, 4, 5).
        let left = build_table(
            ("col1", &vec![1, 3]),
            ("col2", &vec![2, 4]),
            ("col3", &vec![3, 5]),
        );
        let right = left.clone();

        // Equi-join key: left.col1 = right.col1.
        let left_key = Column::new_with_schema("col1", &left.schema())?;
        let right_key = Column::new_with_schema("col1", &right.schema())?;
        let on = vec![(left_key, right_key)];

        // The filter reads column 1 (`col2`) from each side; those two values
        // are exposed to the filter expression as positions 0 and 1 of the
        // intermediate schema below.
        let left_col2 = ColumnIndex {
            index: 1,
            side: JoinSide::Left,
        };
        let right_col2 = ColumnIndex {
            index: 1,
            side: JoinSide::Right,
        };
        let nullable_int = || Field::new("x", DataType::Int32, true);
        let intermediate_schema = Schema::new(vec![nullable_int(), nullable_int()]);

        // b.col2 <> a.col2
        let filter_expression: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Column::new("x", 1)),
        ));
        let filter = JoinFilter::new(
            filter_expression,
            vec![left_col2, right_col2],
            intermediate_schema,
        );

        let join = join_with_filter(left, right, on, filter, &JoinType::Anti, false)?;

        // Only the three left-side columns appear in the join's output schema.
        assert_eq!(columns(&join.schema()), vec!["col1", "col2", "col3"]);

        let batches = common::collect(join.execute(0, task_ctx)?).await?;

        let expected = vec![
            "+------+------+------+",
            "| col1 | col2 | col3 |",
            "+------+------+------+",
            "| 1    | 2    | 3    |",
            "| 3    | 4    | 5    |",
            "+------+------+------+",
        ];
        assert_batches_sorted_eq!(expected, &batches);
        Ok(())
    }

The test fails and produces an empty batch.

To Reproduce
See above

Expected behavior
Test should pass

Additional context
None

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions