## Joins

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("day3").getOrCreate()


your 131072x1 screen size is bogus. expect trouble
23/09/04 15:07:48 WARN Utils: Your hostname, SUSHAN resolves to a loopback address: 127.0.1.1; using 172.31.76.59 instead (on interface eth0)
23/09/04 15:07:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/04 15:07:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/04 15:07:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.



Question: You are given two DataFrames: employees_df and departments_df, which contain information about employees and their respective departments. The schema for the DataFrames is as follows:

employees_df schema:
|-- employee_id: integer (nullable = true)
|-- employee_name: string (nullable = true)
|-- department_id: integer (nullable = true)

departments_df schema:

|-- department_id: integer (nullable = true)
|-- department_name: string (nullable = true)

Employees DataFrame:
                                                                                
+-----------+-------------+-------------+
|employee_id|employee_name|department_id|
+-----------+-------------+-------------+
|1          |Pallavi mam  |101          |
|2          |Bob          |102          |
|3          |Cathy        |101          |
|4          |David        |103          |
|5          |Amrit Sir    |104          |
|6          |Alice        |null         |
|7          |Eva          |null         |
|8          |Frank        |110          |
|9          |Grace        |109          |
|10         |Henry        |null         |
+-----------+-------------+-------------+



Departments DataFrame:
+-------------+------------------------+
|department_id|department_name         |
+-------------+------------------------+
|101          |HR                      |
|102          |Engineering             |
|103          |Finance                 |
|104          |Marketing               |
|105          |Operations              |
|106          |null                    |
|107          |Operations              |
|108          |Production              |
|null         |Finance                 |
|110          |Research and Development|
+-------------+----------------------


In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

employees_schema = StructType([
    StructField("employee_id", IntegerType(), True),
    StructField("employee_name", StringType(), True),
    StructField("department_id", IntegerType(), True)
])

departments_schema = StructType([
    StructField("department_id", IntegerType(), True),
    StructField("department_name", StringType(), True)
])


In [3]:
employees_data = [
    (1, "Pallavi mam", 101),
    (2, "Bob", 102),
    (3, "Cathy", 101),
    (4, "David", 103),
    (5, "Amrit Sir", 104),
    (6, "Alice", None),
    (7, "Eva", None),
    (8, "Frank", 110),
    (9, "Grace", 109),
    (10, "Henry", None)
]

departments_data = [
    (101, "HR"),
    (102, "Engineering"),
    (103, "Finance"),
    (104, "Marketing"),
    (105, "Operations"),
    (106, None),
    (107, "Operations"),
    (108, "Production"),
    (None, "Finance"),
    (110, "Research and Development")
]

In [4]:
employees_df = spark.createDataFrame(employees_data, schema=employees_schema)
departments_df = spark.createDataFrame(departments_data, schema=departments_schema)

In [5]:
employees_df.show()
employees_df.printSchema()
departments_df.show()
departments_df.printSchema()

                                                                                

+-----------+-------------+-------------+
|employee_id|employee_name|department_id|
+-----------+-------------+-------------+
|          1|  Pallavi mam|          101|
|          2|          Bob|          102|
|          3|        Cathy|          101|
|          4|        David|          103|
|          5|    Amrit Sir|          104|
|          6|        Alice|         null|
|          7|          Eva|         null|
|          8|        Frank|          110|
|          9|        Grace|          109|
|         10|        Henry|         null|
+-----------+-------------+-------------+

root
 |-- employee_id: integer (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- department_id: integer (nullable = true)

+-------------+--------------------+
|department_id|     department_name|
+-------------+--------------------+
|          101|                  HR|
|          102|         Engineering|
|          103|             Finance|
|          104|           Marketing|
|          

In [6]:
departments_df.createOrReplaceTempView("department_table")
employees_df.createOrReplaceTempView("employee_table")

### Join Expressions

Question: How can you combine the employees_df and departments_df DataFrames based on the common "department_id" column to get a combined DataFrame with employee names and their respective department names?

In [7]:
# syntax: df1.join(df2, joinExpression, joinType)
employees_df.join(departments_df, "department_id", "inner").show(truncate = False)



+-------------+-----------+-------------+------------------------+
|department_id|employee_id|employee_name|department_name         |
+-------------+-----------+-------------+------------------------+
|101          |1          |Pallavi mam  |HR                      |
|101          |3          |Cathy        |HR                      |
|102          |2          |Bob          |Engineering             |
|103          |4          |David        |Finance                 |
|104          |5          |Amrit Sir    |Marketing               |
|110          |8          |Frank        |Research and Development|
+-------------+-----------+-------------+------------------------+



                                                                                

In [8]:
innerJoin_sql = spark.sql("""
                select  e.department_id, e.employee_id, e.employee_name, d.department_name 
                from employee_table e
               join department_table d
               on e.department_id = d.department_id

""")
innerJoin_sql.show()

+-------------+-----------+-------------+--------------------+
|department_id|employee_id|employee_name|     department_name|
+-------------+-----------+-------------+--------------------+
|          101|          1|  Pallavi mam|                  HR|
|          101|          3|        Cathy|                  HR|
|          102|          2|          Bob|         Engineering|
|          103|          4|        David|             Finance|
|          104|          5|    Amrit Sir|           Marketing|
|          110|          8|        Frank|Research and Deve...|
+-------------+-----------+-------------+--------------------+



### Inner Joins

Question: How can you retrieve employee names and their respective department names for employees belonging to the "Engineering" department?

In [9]:
join_expr = employees_df['department_id']==departments_df['department_id']

employees_df.join(departments_df,join_expr,'inner').select('employee_name','department_name')\
    .where(departments_df['department_name']=='Engineering').show()

+-------------+---------------+
|employee_name|department_name|
+-------------+---------------+
|          Bob|    Engineering|
+-------------+---------------+



In [10]:
inner_join_sql = spark.sql("""
                select  e.employee_name, d.department_name 
                from employee_table e
               join department_table d
               on e.department_id = d.department_id
               where d.department_name = "Engineering"

""")
inner_join_sql.show()

+-------------+---------------+
|employee_name|department_name|
+-------------+---------------+
|          Bob|    Engineering|
+-------------+---------------+



### Outer Joins

Question: Retrieve a DataFrame that contains all employees along with their department names. If an employee doesn't have a department assigned, display "No Department".

In [11]:
join_expr = employees_df['department_id']==departments_df['department_id']

employees_df.join(departments_df, join_expr, 'outer')\
            .select('employee_name', departments_df['department_name'])\
            .na.fill('No Department')\
            .show()

+-------------+--------------------+
|employee_name|     department_name|
+-------------+--------------------+
|        Alice|       No Department|
|          Eva|       No Department|
|        Henry|       No Department|
|No Department|             Finance|
|  Pallavi mam|                  HR|
|        Cathy|                  HR|
|          Bob|         Engineering|
|        David|             Finance|
|    Amrit Sir|           Marketing|
|No Department|          Operations|
|No Department|       No Department|
|No Department|          Operations|
|No Department|          Production|
|        Grace|       No Department|
|        Frank|Research and Deve...|
+-------------+--------------------+



In [12]:
outer_joins_sql = spark.sql("""
                select  coalesce(e.employee_name, "No Department") as employee_name,
                        coalesce(d.department_name, "No Department") as department_name
                from employee_table e
               full outer join department_table d
               on e.department_id = d.department_id
""")
outer_joins_sql.show()

+-------------+--------------------+
|employee_name|     department_name|
+-------------+--------------------+
|        Alice|       No Department|
|          Eva|       No Department|
|        Henry|       No Department|
|No Department|             Finance|
|  Pallavi mam|                  HR|
|        Cathy|                  HR|
|          Bob|         Engineering|
|        David|             Finance|
|    Amrit Sir|           Marketing|
|No Department|          Operations|
|No Department|       No Department|
|No Department|          Operations|
|No Department|          Production|
|        Grace|       No Department|
|        Frank|Research and Deve...|
+-------------+--------------------+



### Left Outer Joins

Question: List all employees along with their department names. If an employee doesn't have a department assigned, display "No Department".

In [13]:
employees_df.join(departments_df, join_expr, 'left_outer')\
            .select('employee_name',departments_df['department_name'])\
            .na.fill('No Department')\
            .show()

+-------------+--------------------+
|employee_name|     department_name|
+-------------+--------------------+
|  Pallavi mam|                  HR|
|          Bob|         Engineering|
|        Cathy|                  HR|
|        David|             Finance|
|    Amrit Sir|           Marketing|
|        Alice|       No Department|
|          Eva|       No Department|
|        Frank|Research and Deve...|
|        Henry|       No Department|
|        Grace|       No Department|
+-------------+--------------------+



In [14]:
left_outer_sql = spark.sql("""
                select  e.employee_name,
                        coalesce(d.department_name, "No Department") as department_name
                from employee_table e
               left outer join department_table d
               on e.department_id = d.department_id
""")
left_outer_sql.show()

+-------------+--------------------+
|employee_name|     department_name|
+-------------+--------------------+
|  Pallavi mam|                  HR|
|          Bob|         Engineering|
|        Cathy|                  HR|
|        David|             Finance|
|    Amrit Sir|           Marketing|
|        Alice|       No Department|
|          Eva|       No Department|
|        Frank|Research and Deve...|
|        Henry|       No Department|
|        Grace|       No Department|
+-------------+--------------------+



### Right Outer Joins

Question: Display a list of departments along with employee names. If a department has no employees, display "No Employees".



In [15]:
employees_df.join(departments_df, join_expr, 'right_outer')\
            .select(departments_df['department_name'], 'employee_name')\
            .na.fill('No Employee')\
            .show()

+--------------------+-------------+
|     department_name|employee_name|
+--------------------+-------------+
|                  HR|        Cathy|
|                  HR|  Pallavi mam|
|         Engineering|          Bob|
|             Finance|        David|
|          Operations|  No Employee|
|           Marketing|    Amrit Sir|
|         No Employee|  No Employee|
|          Operations|  No Employee|
|          Production|  No Employee|
|             Finance|  No Employee|
|Research and Deve...|        Frank|
+--------------------+-------------+



In [16]:
right_outer_sql = spark.sql("""
                select coalesce(d.department_name, "No Employee") as department_name, 
                        coalesce(e.employee_name, "No Employee") as employee_name
                from employee_table e
               right outer join department_table d
               on e.department_id = d.department_id
""")
right_outer_sql.show()

+--------------------+-------------+
|     department_name|employee_name|
+--------------------+-------------+
|                  HR|        Cathy|
|                  HR|  Pallavi mam|
|         Engineering|          Bob|
|             Finance|        David|
|          Operations|  No Employee|
|           Marketing|    Amrit Sir|
|         No Employee|  No Employee|
|          Operations|  No Employee|
|          Production|  No Employee|
|             Finance|  No Employee|
|Research and Deve...|        Frank|
+--------------------+-------------+



### Left Semi Joins

Question: Retrieve a DataFrame that includes employee names for departments that have employees.



In [17]:
employees_df.join(departments_df, join_expr, 'left_semi')\
            .select('employee_name')\
            .show()

+-------------+
|employee_name|
+-------------+
|  Pallavi mam|
|        Cathy|
|          Bob|
|        David|
|    Amrit Sir|
|        Frank|
+-------------+



In [18]:
left_semi_sql = spark.sql("""
                select  e.employee_name 
                from employee_table e
               left semi join department_table d
               on e.department_id = d.department_id

""")
left_semi_sql.show()

+-------------+
|employee_name|
+-------------+
|  Pallavi mam|
|        Cathy|
|          Bob|
|        David|
|    Amrit Sir|
|        Frank|
+-------------+



### Left Anti Joins

Question: Find the employees who don't belong to any department.

In [19]:
employees_df.join(departments_df, join_expr, 'left_anti')\
            .select('employee_name')\
            .show()

+-------------+
|employee_name|
+-------------+
|        Alice|
|          Eva|
|        Henry|
|        Grace|
+-------------+



In [20]:
left_anti_sql = spark.sql("""
                select e.employee_name 
                from employee_table e
               left anti join department_table d
               on e.department_id = d.department_id

""")
left_anti_sql.show()

+-------------+
|employee_name|
+-------------+
|        Alice|
|          Eva|
|        Henry|
|        Grace|
+-------------+



### Cross (Cartesian) Joins

Question: Create a DataFrame that contains all possible combinations of employees and departments.

In [21]:
employees_df.crossJoin(departments_df).show()

+-----------+-------------+-------------+-------------+--------------------+
|employee_id|employee_name|department_id|department_id|     department_name|
+-----------+-------------+-------------+-------------+--------------------+
|          1|  Pallavi mam|          101|          101|                  HR|
|          1|  Pallavi mam|          101|          102|         Engineering|
|          1|  Pallavi mam|          101|          103|             Finance|
|          1|  Pallavi mam|          101|          104|           Marketing|
|          1|  Pallavi mam|          101|          105|          Operations|
|          1|  Pallavi mam|          101|          106|                null|
|          1|  Pallavi mam|          101|          107|          Operations|
|          1|  Pallavi mam|          101|          108|          Production|
|          1|  Pallavi mam|          101|         null|             Finance|
|          1|  Pallavi mam|          101|          110|Research and Deve...|

                                                                                

In [22]:
cross_join_sql = spark.sql("""
                select * 
                from employee_table e
               cross join department_table d
""")
cross_join_sql.show()

+-----------+-------------+-------------+-------------+--------------------+
|employee_id|employee_name|department_id|department_id|     department_name|
+-----------+-------------+-------------+-------------+--------------------+
|          1|  Pallavi mam|          101|          101|                  HR|
|          1|  Pallavi mam|          101|          102|         Engineering|
|          1|  Pallavi mam|          101|          103|             Finance|
|          1|  Pallavi mam|          101|          104|           Marketing|
|          1|  Pallavi mam|          101|          105|          Operations|
|          1|  Pallavi mam|          101|          106|                null|
|          1|  Pallavi mam|          101|          107|          Operations|
|          1|  Pallavi mam|          101|          108|          Production|
|          1|  Pallavi mam|          101|         null|             Finance|
|          1|  Pallavi mam|          101|          110|Research and Deve...|