# Spark RAPIDS - Existence Join Demo

In [None]:
# The PySpark kernel provides `spark` (a SparkSession). Bind `sql` explicitly so the
# cells below do not rely on the shell also injecting a global `sql` helper.
sql = spark.sql
spark

In [None]:
# Start from a CPU baseline: no RAPIDS plan explanations, RAPIDS SQL acceleration off.
for conf_key, conf_value in (('spark.rapids.sql.explain', 'NONE'),
                             ('spark.rapids.sql.enabled', False)):
    spark.conf.set(conf_key, conf_value)

## Create Data

In [None]:
# Build the small payments table and register it as the `payments` temp view.
payment_rows = [
    [1, 101, 2500],
    [2, 102, 1110],
    [3, 103, 500],
    [4, 104, 400],
    [5, 105, 150],
    [6, 106, 450],
]
payments_df = spark.createDataFrame(payment_rows, schema='paymentId byte, cId int, amount long')
payments_df.createOrReplaceTempView('payments')

In [None]:
# Print every row of the payments view.
payments_preview = sql("select * from payments")
payments_preview.show()

In [None]:
# Build the small customers table and register it as the `customers` temp view.
customer_rows = [
    [101, 'Jon'],
    [102, 'Aron'],
    [103, 'Sam'],
    [107, 'Jack'],
]
customers_df = spark.createDataFrame(customer_rows, schema='cId int, name string')
customers_df.createOrReplaceTempView('customers')

In [None]:
# Print every row of the customers view.
customers_preview = sql("select * from customers")
customers_preview.show()

### What's ExistenceJoin?

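An existential subquery (`[NOT] EXISTS`, `IN`) is normally rewritten into a LeftSemi join (LeftAnti for `NOT`). When the subquery predicate is combined with an unrelated **disjunctive (OR)** condition, that rewrite is not possible, because rows that fail the subquery may still satisfy the other condition. In that case the query is rewritten into an **ExistenceJoin** instead: it keeps every left-side row and appends a Boolean `exists` column for the subsequent filter to evaluate.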

In [None]:
# Baseline: an IN subquery with no other predicate in the WHERE clause.
query_plain_exists = """
SELECT *
  FROM payments p
 WHERE p.cId IN
         (SELECT c.cId
            FROM customers c)
"""
plain_exists_df = sql(query_plain_exists)
plain_exists_df.explain()

In [None]:
# The same IN subquery, now AND-ed with an unrelated predicate on amount.
query_and_exists = """
SELECT *
  FROM payments p
 WHERE p.cId IN
         (SELECT c.cId
            FROM customers c)
   AND p.amount >= 400
"""
and_exists_df = sql(query_and_exists)
and_exists_df.explain()

In [None]:
# The same IN subquery, now OR-ed with an unrelated predicate; this is the shape
# whose plan shows an ExistenceJoin.
query_or_exists = """
SELECT *
  FROM payments p
 WHERE p.cId IN
         (SELECT c.cId
            FROM customers c)
    OR p.amount >= 400
"""
or_exists_plan_df = sql(query_or_exists)
or_exists_plan_df.explain()

So the query boils down to filtering an intermediate table that carries a Boolean `exists` column:

Filter condition from the plan: `(exists#202 OR (amount#2L >= 400)), true`

|paymentId|cId|amount|exists|
|---------|---|------|------|
|        1|101|  2500|true  |
|        2|102|  1110|true  |
|        3|103|   500|true  |
|        4|104|   400|false |
|        5|105|   150|false | 
|        6|106|   450|false |

In [None]:
# Turn RAPIDS SQL acceleration (spark.rapids.sql.enabled) back on for the cells below.
rapids_sql_enabled = True
spark.conf.set('spark.rapids.sql.enabled', rapids_sql_enabled)

### ExistenceJoin on top of a hash-based join

In [None]:
# Re-run the disjunctive query with RAPIDS enabled: show its plan, then its rows.
gpu_or_exists_df = sql(query_or_exists)
gpu_or_exists_df.explain()
gpu_or_exists_df.show()

## BroadcastNestedLoopJoin still falls back to the CPU: equi-join (`=`) vs. non-equi join (`<`, `>`)

In [None]:
# Correlated EXISTS with a non-equality predicate (c.cId > p.cId), OR-ed with an
# unrelated condition on amount.
query_bnl_exists="""
SELECT *
  FROM payments p
 WHERE EXISTS
         (SELECT c.cId
            FROM customers c    
           WHERE c.cId > p.cId)
    OR p.amount <= 400
"""
# Enable the RAPIDS plugin's 'ALL' explain output only while planning this query.
# Restore the previous setting in `finally` so later cells never inherit the verbose
# mode, even if explain() raises.
previous_rapids_explain = spark.conf.get('spark.rapids.sql.explain', 'NONE')
spark.conf.set('spark.rapids.sql.explain', 'ALL')
try:
    sql(query_bnl_exists).explain()
finally:
    spark.conf.set('spark.rapids.sql.explain', previous_rapids_explain)
sql(query_bnl_exists).show()