# Joins

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession\
   .builder\
   .getOrCreate()

First I create two DataFrames with student and university data, which I will use to check out Spark's join operations.

In [None]:
from pyspark.sql.types import NullType
leftDF = spark.createDataFrame(
    [
        (1, "Peter Pan", "1999-04-23", 90, 3),
        (2, "Alice Smith", "2000-05-15", 100, 2),
        (3, "Bob Miller", "2000-09-01", 80, 3),
        (4, "April May", "1999-09-30", 110, 1),
        (5, "Billie Jean", "1990-07-12", 360, None)
    ],
    ["leftid", "name", "date-of-birth", "creditscore", "universityid"]
)
leftDF.show()

In [None]:
rightDF = spark.createDataFrame(
    [
        (1, "Harvard", "Massachusetts", 1636),
        (2, "MIT", "Massachusetts", 1861),
        (3, "Stanford", "California", 1891),
        (4, "UC Berkeley", "California", 1868),
        (5, "Princeton", "New Jersey", 1746)
    ],
    ["rightid", "name", "state", "founded"]
)
rightDF.show()

## Join Types
To perform a join I need to define two things: a join expression and the join type. The join expression can be any expression, however complex, that evaluates to True or False.

In [None]:
joinExpression = leftDF["universityid"] == rightDF["rightid"]
joinType = "inner"

leftDF.join(rightDF, on=joinExpression, how=joinType).show()

In [None]:
joinType = "left_outer"
leftDF.join(rightDF, joinExpression, joinType).show()

In [None]:
joinType = "right_outer"
leftDF.join(rightDF, joinExpression, joinType).show()

In [None]:
joinType = "full_outer"
leftDF.join(rightDF, joinExpression, joinType).show()
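To make the roles of the join expression and the join type more tangible, here is a minimal plain-Python sketch. It is only an illustration of the semantics, not how Spark executes joins. The helper `nested_loop_join`, the predicate name, and the shortened sample rows are made up for this example. The predicate combines the key equality with a filter on the founding year, which in PySpark would roughly correspond to `(leftDF["universityid"] == rightDF["rightid"]) & (rightDF["founded"] < 1870)`.

```python
# Plain-Python illustration only; Spark does not execute joins this way.
students = [
    (1, "Peter Pan", 3),
    (2, "Alice Smith", 2),
    (5, "Billie Jean", None),
]
universities = [
    (1, "Harvard", 1636),
    (2, "MIT", 1861),
    (3, "Stanford", 1891),
]


def nested_loop_join(left, right, predicate, how="inner"):
    """Join two lists of tuples with an arbitrary boolean predicate."""
    right_width = len(right[0])
    result = []
    for l_row in left:
        matches = [r_row for r_row in right if predicate(l_row, r_row)]
        if matches:
            result.extend(l_row + r_row for r_row in matches)
        elif how == "left_outer":
            # No match: keep the left row and pad the right side with None.
            result.append(l_row + (None,) * right_width)
    return result


def same_university_founded_before_1870(student, university):
    # Compound condition: key equality AND a filter on the founding year.
    return student[2] == university[0] and university[2] < 1870


inner = nested_loop_join(students, universities, same_university_founded_before_1870)
left_outer = nested_loop_join(
    students, universities, same_university_founded_before_1870, how="left_outer"
)
```

With the inner join only rows that satisfy the predicate survive, while the left outer join keeps every student and fills the university columns with `None` where nothing matched.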

Join types:
* inner (is the default)
* cross 
* outer, full, full_outer (all are equivalent) 
* left, left_outer (both are equivalent) 
* right, right_outer (both are equivalent) 
* left_semi
* left_anti

The last two join types are especially interesting because they implement exists and not-exists logic: `left_semi` keeps the left rows that have a match on the right side, and `left_anti` keeps the left rows that have none. The columns of the right side do not appear in the result set; the right side is only used for the existence check. Using these join types saves me from dropping many columns from the result DataFrame when I'm not interested in the column values of the right side.

In [None]:
joinType = "left_semi"
leftDF.join(rightDF, joinExpression, joinType).show()

In [None]:
joinType = "left_anti"
leftDF.join(rightDF, joinExpression, joinType).show()
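The existence check behind these two join types can be sketched in plain Python. This is only an illustration of the logic, not Spark's implementation. The variable names and the shortened student tuples (id, name, universityid) are assumptions for this example.

```python
# Plain-Python illustration of semi and anti join logic.
students = [
    (1, "Peter Pan", 3),
    (2, "Alice Smith", 2),
    (4, "April May", 1),
    (5, "Billie Jean", None),
]
# Only the join keys of the right side are needed for the existence check.
university_ids = {1, 2, 3, 4, 5}

# left_semi: keep students whose universityid exists on the right side.
left_semi = [s for s in students if s[2] in university_ids]

# left_anti: keep students without a match; a None key never matches.
left_anti = [s for s in students if s[2] not in university_ids]
```

Both results contain student columns only, and every student ends up in exactly one of the two lists.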

I'm wondering why there is an additional `crossJoin()` function, even though I can use `join()` with join type `cross`. So I want to double-check whether both functions provide the same result.

In [None]:
leftDF.join(rightDF, joinExpression, how="cross").show()

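It looks to me that `join(..., how="cross")` returns the same result as an inner join here. That is because I passed a join expression: a cross join with a join condition is a Cartesian product filtered by that condition, which is exactly what an inner join is. What about `crossJoin()`?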

In [None]:
leftDF.crossJoin(rightDF).show()

Yes, `crossJoin()` works properly: it takes no join expression and pairs every student row with every university row.
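A plain-Python sketch of why the earlier `how="cross"` call looked like an inner join: a Cartesian product pairs every left row with every right row, and applying a join condition on top keeps only the matching pairs. The shortened sample rows and variable names are assumptions for this illustration.

```python
from itertools import product

students = [
    (1, "Peter Pan", 3),
    (2, "Alice Smith", 2),
    (5, "Billie Jean", None),
]
universities = [
    (1, "Harvard"),
    (2, "MIT"),
    (3, "Stanford"),
]

# Cross join: every student combined with every university.
cross = [s + u for s, u in product(students, universities)]

# Applying the join condition (universityid == rightid) to the product
# leaves exactly the rows an inner join would return.
filtered = [row for row in cross if row[2] == row[3]]
```

The unfiltered product has `len(students) * len(universities)` rows, which is why cross joins get expensive quickly on large inputs.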

Just to remind myself: I can also do all of this joining with the SQL I already know well.

In [None]:
leftDF.createOrReplaceTempView("leftTable")
rightDF.createOrReplaceTempView("rightTable")

spark.sql("""
    SELECT *
    FROM leftTable INNER JOIN rightTable ON universityid = rightid"""
).show()

## Shuffle Joins vs. Broadcast Joins
To handle very large data sets, DataFrames get partitioned and these partitions are spread across the Spark cluster. Therefore join operations are likely to be wide transformations, where Spark has to shuffle the data between the nodes to bring the matching rows of the joined DataFrames together. As I learned on day 3, wide transformations require a shuffle, which cannot be performed entirely in memory. Additionally, this all-to-all communication between the participating nodes grows heavily as more nodes get involved. Such *shuffle joins* can cause performance issues in the data pipeline, so I would like to avoid them whenever possible.
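The idea behind a shuffle join can be sketched in plain Python. This is a strongly simplified illustration under my own assumptions, not Spark's implementation: `NUM_PARTITIONS`, `partition_of` and `shuffle` are hypothetical names, the partition is chosen with a simple modulo instead of a real hash function, rows with a missing key are left out, and the per-partition join is a nested loop instead of a sort-merge.

```python
from collections import defaultdict

NUM_PARTITIONS = 3  # hypothetical number of shuffle partitions


def partition_of(key):
    # Stand-in for hash partitioning; the keys here are small integers.
    return key % NUM_PARTITIONS


def shuffle(rows, key_index):
    """Redistribute rows so that equal keys land in the same partition."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_of(row[key_index])].append(row)
    return partitions


students = [
    (1, "Peter Pan", 3),
    (2, "Alice Smith", 2),
    (3, "Bob Miller", 3),
    (4, "April May", 1),
]
universities = [
    (1, "Harvard"),
    (2, "MIT"),
    (3, "Stanford"),
    (4, "UC Berkeley"),
]

# Both sides are shuffled by their join key.
left_parts = shuffle(students, key_index=2)
right_parts = shuffle(universities, key_index=0)

# Afterwards each partition can be joined independently of the others.
joined = []
for p in range(NUM_PARTITIONS):
    for s in left_parts[p]:
        for u in right_parts[p]:
            if s[2] == u[0]:
                joined.append(s + u)
```

The costly part is the `shuffle` step: in a real cluster every row of both DataFrames may have to travel to a different node before the local joins can start.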

There is actually a chance to avoid a shuffle join when at least one of the joined DataFrames is small enough to fit into each node's memory. In that case it can be broadcast to all nodes, so that the join can be applied locally and in memory without further shuffling. To initiate a broadcast join, I just have to apply the `broadcast()` function to the small table.

In [None]:
from pyspark.sql.functions import broadcast

joinExpression = leftDF["universityid"] == rightDF["rightid"]
joinType = "inner"

leftDF.join(broadcast(rightDF), on=joinExpression, how=joinType).explain()

The explain plan shows the two steps of the join transformation: one `BroadcastExchange` step followed by one `BroadcastHashJoin` step.

Without the `broadcast()` function, the explain plan shows the shuffle join, comprising two parallel `Exchange hashpartitioning` steps and one final `SortMergeJoin` step.

In [None]:
leftDF.join(rightDF, on=joinExpression, how=joinType).explain()
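For comparison, the broadcast variant can be pictured with a plain-Python sketch. Again this is only an illustration under my own assumptions, not Spark's implementation: `broadcast_table`, `student_partitions` and `join_partition` are hypothetical names, and the two lists stand in for partitions living on two different nodes.

```python
# The small side becomes a lookup table that every "node" gets a full copy of.
universities = [
    (1, "Harvard"),
    (2, "MIT"),
    (3, "Stanford"),
]
broadcast_table = {u[0]: u for u in universities}

# The large side stays where it is, already split into partitions.
student_partitions = [
    [(1, "Peter Pan", 3), (2, "Alice Smith", 2)],
    [(3, "Bob Miller", 3), (4, "April May", 1)],
]


def join_partition(partition, lookup):
    """Hash join of one partition against the local copy of the small table."""
    return [row + lookup[row[2]] for row in partition if row[2] in lookup]


# Each partition is joined locally; no student row has to move.
joined = [join_partition(p, broadcast_table) for p in student_partitions]
```

Only the small table is copied around, while the rows of the large side never leave their partition.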