In [2]:
from pyspark.sql import SparkSession
# Build (or reuse) a Spark session that runs locally on two threads.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("SGD_Chapter08")
    .getOrCreate()
)

In [3]:
# People: (id, name, graduate-program id, list of Spark status ids).
person = (
    spark.createDataFrame(
        [
            (0, "Bill Chambers", 0, [100]),
            (1, "Matei Zaharia", 1, [500, 250, 100]),
            (2, "Michael Armbrust", 1, [250, 100]),
        ]
    )
    .toDF("id", "name", "gradute_program", "spark_status")
)

In [4]:
# Graduate programs: (id, degree, department, school).
graduateProgram = (
    spark.createDataFrame(
        [
            (0, "Masters", "School of Information", "UC Berkeley"),
            (2, "Masters", "EECS", "UC Berkeley"),
            (1, "Ph.D.", "EECS", "UC Berkeley"),
        ]
    )
    .toDF("id", "degree", "department", "school")
)

In [5]:
# Lookup table mapping a Spark status id to its human-readable title.
sparkStatus = (
    spark.createDataFrame(
        [
            (500, "Vice President"),
            (250, "PMC Member"),
            (100, "Contributor"),
        ]
    )
    .toDF("id", "status")
)

In [6]:
# Register each DataFrame as a temp view named after its Python variable,
# so the same tables can be queried through spark.sql.
for _viewName, _frame in (
    ("person", person),
    ("graduateProgram", graduateProgram),
    ("sparkStatus", sparkStatus),
):
    _frame.createOrReplaceTempView(_viewName)

In [7]:
# Join condition: a person's graduate-program id equals the program's id.
joinExpression = person.gradute_program == graduateProgram["id"]

In [8]:
# Display the join condition; it is an unevaluated Column expression, not a boolean.
joinExpression

Column<b'(gradute_program = id)'>

In [9]:
# Deliberately meaningless condition: compares a person's name with a school name.
wrongJoinExpression = (person["name"] == graduateProgram["school"])

In [10]:
# Display the deliberately wrong join condition (also just a Column expression).
wrongJoinExpression

Column<b'(name = school)'>

In [11]:
# Inner joins evaluate the keys in both DataFrames or tables and keep
# (and join together) only the rows for which the condition is true.
# Same query text as before; adjacent literals are concatenated by Python.
spark.sql(
    "SELECT * FROM person JOIN graduateProgram"
    "   ON person.gradute_program = graduateProgram.id"
).show()

+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
| id|            name|gradute_program|   spark_status| id| degree|          department|     school|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+



In [12]:
# DataFrame-API inner join: only rows whose keys match on both sides are kept.
joinType = "inner"
(person
 .join(graduateProgram, on=joinExpression, how=joinType)
 .show())

+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
| id|            name|gradute_program|   spark_status| id| degree|          department|     school|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+



In [13]:
# No person's name equals a school name, so this inner join returns no rows.
joinType = "inner"
(person
 .join(graduateProgram, on=wrongJoinExpression, how=joinType)
 .show())

+---+----+---------------+------------+---+------+----------+------+
| id|name|gradute_program|spark_status| id|degree|department|school|
+---+----+---------------+------------+---+------+----------+------+
+---+----+---------------+------------+---+------+----------+------+



In [30]:
# Inner join with the inputs swapped: same matches, graduateProgram columns come first.
joinType = "inner"
(graduateProgram
 .join(person, on=joinExpression, how=joinType)
 .show())

+---+-------+--------------------+-----------+---+----------------+---------------+---------------+
| id| degree|          department|     school| id|            name|gradute_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+---------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Bill Chambers|              0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  1|   Matei Zaharia|              1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  2|Michael Armbrust|              1|     [250, 100]|
+---+-------+--------------------+-----------+---+----------------+---------------+---------------+



<h3>Outer joins evaluate the keys in both of the DataFrames or tables and include (and join together)
the rows that evaluate to true or false. If there is no equivalent row in either the left or right
DataFrame, Spark will insert null.</h3>

In [31]:
# Full outer join: unmatched rows from either side are kept and padded with nulls.
joinType = "outer"
(person
 .join(graduateProgram, on=joinExpression, how=joinType)
 .show())

+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  id|            name|gradute_program|   spark_status| id| degree|          department|     school|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|           null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+



<h3>Left outer joins evaluate the keys in both of the DataFrames or tables and include all rows from
    the left DataFrame as well as any rows in the right DataFrame that have a match in the left DataFrame. If there is no equivalent row in the right DataFrame, Spark will insert null.</h3>

In [32]:
# Left outer join with graduateProgram on the left: every program is kept,
# and a program with no matching person gets nulls in the person columns.
joinType = "left_outer"
(graduateProgram
 .join(person, on=joinExpression, how=joinType)
 .show())

+---+-------+--------------------+-----------+----+----------------+---------------+---------------+
| id| degree|          department|     school|  id|            name|gradute_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+---------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|              0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|              1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|              1|     [250, 100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|           null|           null|
+---+-------+--------------------+-----------+----+----------------+---------------+---------------+



In [33]:
# Every person matches a graduate program here, so with person on the left
# the left outer join produces no null-padded rows.
joinType = "left_outer"
(person
 .join(graduateProgram, on=joinExpression, how=joinType)
 .show())

+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
| id|            name|gradute_program|   spark_status| id| degree|          department|     school|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+



<h3> Right outer joins evaluate the keys in both of the DataFrames or tables and include all rows
    from the right DataFrame as well as any rows in the left DataFrame that have a match in the right DataFrame. If there is no equivalent row in the left DataFrame, Spark will insert null.</h3>

In [34]:
# Right outer join: every graduateProgram row (the right side) is kept;
# a program with no matching person gets nulls in the person columns.
joinType = "right_outer"
(person
 .join(graduateProgram, on=joinExpression, how=joinType)
 .show())

+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  id|            name|gradute_program|   spark_status| id| degree|          department|     school|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|           null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+



In [35]:
# Right outer join with person on the right: every person is kept, and since
# each one has a matching program no null-padded rows appear.
joinType = "right_outer"
(graduateProgram
 .join(person, on=joinExpression, how=joinType)
 .show())

+---+-------+--------------------+-----------+---+----------------+---------------+---------------+
| id| degree|          department|     school| id|            name|gradute_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+---------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Bill Chambers|              0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  1|   Matei Zaharia|              1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  2|Michael Armbrust|              1|     [250, 100]|
+---+-------+--------------------+-----------+---+----------------+---------------+---------------+



<h4>Semi joins are a bit of a departure from the other joins. They do not actually include any values from the right DataFrame.
    They only compare values to see if the value exists in the second DataFrame. If the value does exist, those rows will be
    kept in the result, even if there are duplicate keys in the left DataFrame. Left semi joins act as a filter on a DataFrame.</h4>

In [36]:
# Left semi join: keep the graduateProgram rows that have at least one
# matching person; no person columns are returned.
joinType = "left_semi"
(graduateProgram
 .join(person, on=joinExpression, how=joinType)
 .show())

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [37]:
# Left semi join from the person side: keep the people whose program exists;
# no graduateProgram columns are returned.
joinType = "left_semi"
(person
 .join(graduateProgram, on=joinExpression, how=joinType)
 .show())

+---+----------------+---------------+---------------+
| id|            name|gradute_program|   spark_status|
+---+----------------+---------------+---------------+
|  0|   Bill Chambers|              0|          [100]|
|  1|   Matei Zaharia|              1|[500, 250, 100]|
|  2|Michael Armbrust|              1|     [250, 100]|
+---+----------------+---------------+---------------+



<h2>Left anti joins are the opposite of left semi joins: they keep only the rows of the left DataFrame that have no match in the right DataFrame.</h2>

In [39]:
# Left anti join: keep only the graduateProgram rows that have no matching person.
joinType = "left_anti"
(graduateProgram
 .join(person, on=joinExpression, how=joinType)
 .show())

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+



<h4>Natural joins make implicit guesses at the columns on which you would like to join. They find matching columns and return the results.
    Left, right and outer natural joins are all supported. Warning: implicit is always dangerous!</h4>

<h4>Cross joins, or Cartesian products, will join every single row in the left DataFrame to every single row in the right DataFrame.
Ex: if two tables have 1,000 rows each, the result will have 1,000 x 1,000 = 1,000,000 rows.</h4>

In [41]:
# Equivalent to an inner join: because a join expression is supplied,
# the "cross" join only keeps the pairs that satisfy it.
joinType = "cross"
(graduateProgram
 .join(person, on=joinExpression, how=joinType)
 .show())

+---+-------+--------------------+-----------+---+----------------+---------------+---------------+
| id| degree|          department|     school| id|            name|gradute_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+---------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Bill Chambers|              0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  1|   Matei Zaharia|              1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  2|Michael Armbrust|              1|     [250, 100]|
+---+-------+--------------------+-----------+---+----------------+---------------+---------------+



In [42]:
# Explicit Cartesian product: every graduateProgram row paired with every person row.
# To allow cross joins without warnings, and without Spark trying to perform another
# kind of join instead, set spark.sql.crossJoin.enabled to true in the session-level conf.
graduateProgram.crossJoin(person).show()

+---+-------+--------------------+-----------+---+----------------+---------------+---------------+
| id| degree|          department|     school| id|            name|gradute_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+---------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Bill Chambers|              0|          [100]|
|  0|Masters|School of Informa...|UC Berkeley|  1|   Matei Zaharia|              1|[500, 250, 100]|
|  0|Masters|School of Informa...|UC Berkeley|  2|Michael Armbrust|              1|     [250, 100]|
|  2|Masters|                EECS|UC Berkeley|  0|   Bill Chambers|              0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  0|   Bill Chambers|              0|          [100]|
|  2|Masters|                EECS|UC Berkeley|  1|   Matei Zaharia|              1|[500, 250, 100]|
|  2|Masters|                EECS|UC Berkeley|  2|Michael Armbrust|              1|     [250, 100]|


<h1> Challenges when using joins </h1>
    <h4>Joins on Complex Types</h4>

In [62]:
from pyspark.sql.functions import expr

# Join on a complex type: a person matches every status whose id appears in the
# person's spark_status array. person.id is renamed first so the bare "id" in
# the expression refers unambiguously to sparkStatus.id.
(
    person
    .withColumnRenamed("id", "personId")
    .join(sparkStatus, on=expr("array_contains(spark_status, id)"))
    .show()
)

+--------+----------------+---------------+---------------+---+--------------+
|personId|            name|gradute_program|   spark_status| id|        status|
+--------+----------------+---------------+---------------+---+--------------+
|       0|   Bill Chambers|              0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|              1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|              1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|              1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|              1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|              1|     [250, 100]|100|   Contributor|
+--------+----------------+---------------+---------------+---+--------------+



In [52]:
# Show the full status lookup table for reference.
sparkStatus.select("*").show()

+---+--------------+
| id|        status|
+---+--------------+
|500|Vice President|
|250|    PMC Member|
|100|   Contributor|
+---+--------------+



# Handling duplicate columns
In a DataFrame, each column has a unique ID within Spark's SQL engine, Catalyst. This unique ID is purely internal and not something
that you can directly reference.
You can end up with a DataFrame that has duplicate column names in two distinct situations:
 1. The join expression that you specify does not remove one key from one of the input DataFrames, and the keys have the same column name.
 2. Two columns on which you are not performing the join have the same name.

### Approach 1: Different join expression
### Approach 2: Dropping the column after the join
### Approach 3: Renaming a column before the join

In [72]:
# Duplicate-column join: rename the program key so both inputs expose a
# "gradute_program" column, then join on equality of those two columns.
# The joined result carries the column name twice.
gradProgramDupe = graduateProgram.withColumnRenamed("id", "gradute_program")
joinExpr = (gradProgramDupe["gradute_program"] == person["gradute_program"])
(person
 .join(gradProgramDupe, on=joinExpr)
 .show())

+---+----------------+---------------+---------------+---------------+-------+--------------------+-----------+
| id|            name|gradute_program|   spark_status|gradute_program| degree|          department|     school|
+---+----------------+---------------+---------------+---------------+-------+--------------------+-----------+
|  0|   Bill Chambers|              0|          [100]|              0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|              1|[500, 250, 100]|              1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|              1|     [250, 100]|              1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+---------------+---------------+-------+--------------------+-----------+



In [76]:
# Raises an error: after the join both inputs contribute a "gradute_program"
# column, so selecting that name is ambiguous. The AnalysisException is the
# point of this cell, so it is caught and printed instead of being left
# uncaught, which would abort a "Restart & Run All".
from pyspark.sql.utils import AnalysisException

try:
    person.join(gradProgramDupe, joinExpr).select("gradute_program").show()
except AnalysisException as ambiguousRef:
    print("AnalysisException:", ambiguousRef)

AnalysisException: "Reference 'gradute_program' is ambiguous, could be: gradute_program, gradute_program.;"

In [74]:
# Approach 1: different join expression. Joining on the column *name* (a string)
# rather than a boolean expression lets the select below resolve
# "gradute_program" without ambiguity.
(person
 .join(gradProgramDupe, on="gradute_program")
 .select("gradute_program")
 .show())

+---------------+
|gradute_program|
+---------------+
|              0|
|              1|
|              1|
+---------------+



In [77]:
# Approach 2: drop one of the duplicated columns after the join. The column is
# referenced through its source DataFrame (person), so Spark knows which of
# the two same-named columns to remove.
(person
 .join(gradProgramDupe, on=joinExpr)
 .drop(person["gradute_program"])
 .select("gradute_program")
 .show())

+---------------+
|gradute_program|
+---------------+
|              0|
|              1|
|              1|
+---------------+



In [78]:
# Print the physical plan Spark chose for the default (inner) join on joinExpression.
person.join(graduateProgram, joinExpression).explain()

== Physical Plan ==
*(5) SortMergeJoin [gradute_program#66L], [id#40L], Inner
:- *(2) Sort [gradute_program#66L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(gradute_program#66L, 200)
:     +- *(1) Project [_1#56L AS id#64L, _2#57 AS name#65, _3#58L AS gradute_program#66L, _4#59 AS spark_status#67]
:        +- *(1) Filter isnotnull(_3#58L)
:           +- Scan ExistingRDD[_1#56L,_2#57,_3#58L,_4#59]
+- *(4) Sort [id#40L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#40L, 200)
      +- *(3) Project [_1#32L AS id#40L, _2#33 AS degree#41, _3#34 AS department#42, _4#35 AS school#43]
         +- *(3) Filter isnotnull(_1#32L)
            +- Scan ExistingRDD[_1#32L,_2#33,_3#34,_4#35]


In [79]:
# Print the physical plan for the join against the renamed-key DataFrame,
# for comparison with the broadcast-hinted version of the same join.
person.join(gradProgramDupe, joinExpr).explain()

== Physical Plan ==
*(5) SortMergeJoin [gradute_program#66L], [gradute_program#1195L], Inner
:- *(2) Sort [gradute_program#66L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(gradute_program#66L, 200)
:     +- *(1) Project [_1#56L AS id#64L, _2#57 AS name#65, _3#58L AS gradute_program#66L, _4#59 AS spark_status#67]
:        +- *(1) Filter isnotnull(_3#58L)
:           +- Scan ExistingRDD[_1#56L,_2#57,_3#58L,_4#59]
+- *(4) Sort [gradute_program#1195L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(gradute_program#1195L, 200)
      +- *(3) Project [_1#32L AS gradute_program#1195L, _2#33 AS degree#41, _3#34 AS department#42, _4#35 AS school#43]
         +- *(3) Filter isnotnull(_1#32L)
            +- Scan ExistingRDD[_1#32L,_2#33,_3#34,_4#35]


In [81]:
from pyspark.sql.functions import broadcast

# Hint that gradProgramDupe should be broadcast, then print the resulting physical plan.
(person
 .join(broadcast(gradProgramDupe), on=joinExpr)
 .explain())

== Physical Plan ==
*(2) BroadcastHashJoin [gradute_program#66L], [gradute_program#1195L], Inner, BuildRight
:- *(2) Project [_1#56L AS id#64L, _2#57 AS name#65, _3#58L AS gradute_program#66L, _4#59 AS spark_status#67]
:  +- *(2) Filter isnotnull(_3#58L)
:     +- Scan ExistingRDD[_1#56L,_2#57,_3#58L,_4#59]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
   +- *(1) Project [_1#32L AS gradute_program#1195L, _2#33 AS degree#41, _3#34 AS department#42, _4#35 AS school#43]
      +- *(1) Filter isnotnull(_1#32L)
         +- Scan ExistingRDD[_1#32L,_2#33,_3#34,_4#35]


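## How Spark performs joins
Two things determine how a join is executed:
  1. The node-to-node communication strategy: a shuffle join (all-to-all communication) or a broadcast join
  2. The per-node computation strategy
### Big table to big table join
Shuffle join: every node communicates with every other node (all-to-all communication).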
### Big table to small table
