## Joins

In [1]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("day3").getOrCreate()


23/09/04 15:17:49 WARN Utils: Your hostname, anujs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.68 instead (on interface en0)
23/09/04 15:17:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/04 15:17:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/04 15:17:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


23/09/04 15:18:06 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors



Question: You are given two DataFrames: employees_df and departments_df, which contain information about employees and their respective departments. The schema for the DataFrames is as follows:

employees_df schema:
|-- employee_id: integer (nullable = true)
|-- employee_name: string (nullable = true)
|-- department_id: integer (nullable = true)

departments_df schema:

|-- department_id: integer (nullable = true)
|-- department_name: string (nullable = true)

Employees DataFrame:
                                                                                
+-----------+-------------+-------------+
|employee_id|employee_name|department_id|
+-----------+-------------+-------------+
|1          |Pallavi mam  |101          |
|2          |Bob          |102          |
|3          |Cathy        |101          |
|4          |David        |103          |
|5          |Amrit Sir    |104          |
|6          |Alice        |null         |
|7          |Eva          |null         |
|8          |Frank        |110          |
|9          |Grace        |109          |
|10         |Henry        |null         |
+-----------+-------------+-------------+



Departments DataFrame:
+-------------+------------------------+
|department_id|department_name         |
+-------------+------------------------+
|101          |HR                      |
|102          |Engineering             |
|103          |Finance                 |
|104          |Marketing               |
|105          |Operations              |
|106          |null                    |
|107          |Operations              |
|108          |Production              |
|null         |Finance                 |
|110          |Research and Development|
+-------------+----------------------


In [2]:

#Importing necessary library 
from pyspark.sql.types import StructField,StructType,StringType,IntegerType

employee_data = [(1,'Pallavi mam',101),
                 (2,'Bob',102),
                 (3,'Cathy',101),
                 (4,'David',103),
                 (5,'Amrit Sir',104),
                 (6,'Alice',None),
                 (7,'Eva',None),
                 (8,'Frank',110),
                 (9,'Grace',109),
                 (10,'Henry',None)]

Department_Data = [(101,'Hr'),
                   (102,'Engineering'),
                   (103,'Finance'),
                   (104,'Marketing'),
                   (105,'Operation'),
                   (106,None),
                   (107,'Operations'),
                   (108,'Production'),
                   (None,'Finance'),
                   (110,'Research and Development')]

employee_schema = StructType([
    StructField("Employee_Id",IntegerType(),True),
    StructField("Employee_name",StringType(),True),
    StructField("department_id",IntegerType(),True)
])

department_schema = StructType([
    StructField("department_id",IntegerType(),True),
    StructField("department_name",StringType(),True)
])

employee_df  = spark.createDataFrame(data=employee_data,schema=employee_schema)
department_df = spark.createDataFrame(data=Department_Data, schema=department_schema)

print("Employee Dataframe")
employee_df.printSchema()
employee_df.show()

print("Department Dataframe")
department_df.printSchema()
department_df.show(truncate=False)

Employee Dataframe
root
 |-- Employee_Id: integer (nullable = true)
 |-- Employee_name: string (nullable = true)
 |-- department_id: integer (nullable = true)



                                                                                

+-----------+-------------+-------------+
|Employee_Id|Employee_name|department_id|
+-----------+-------------+-------------+
|          1|  Pallavi mam|          101|
|          2|          Bob|          102|
|          3|        Cathy|          101|
|          4|        David|          103|
|          5|    Amrit Sir|          104|
|          6|        Alice|         null|
|          7|          Eva|         null|
|          8|        Frank|          110|
|          9|        Grace|          109|
|         10|        Henry|         null|
+-----------+-------------+-------------+

Department Dataframe
root
 |-- department_id: integer (nullable = true)
 |-- department_name: string (nullable = true)

+-------------+------------------------+
|department_id|department_name         |
+-------------+------------------------+
|101          |Hr                      |
|102          |Engineering             |
|103          |Finance                 |
|104          |Marketing               |
|105

### Join Expressions

Question: How can you combine the employees_df and departments_df DataFrames based on the common "department_id" column to get a combined DataFrame with employee names and their respective department names?

In [3]:
department_df.createOrReplaceTempView("department")
employee_df.createOrReplaceTempView("employee")

In [4]:
join_exp = spark.sql("""
                    select d.department_id, e.employee_id, e.employee_name, d.department_name
                    from department d
                    join employee e
                    on d.department_id = e.department_id
""")
                     
join_exp.show()

+-------------+-----------+-------------+--------------------+
|department_id|employee_id|employee_name|     department_name|
+-------------+-----------+-------------+--------------------+
|          101|          1|  Pallavi mam|                  Hr|
|          101|          3|        Cathy|                  Hr|
|          102|          2|          Bob|         Engineering|
|          103|          4|        David|             Finance|
|          104|          5|    Amrit Sir|           Marketing|
|          110|          8|        Frank|Research and Deve...|
+-------------+-----------+-------------+--------------------+



### Inner Joins

Question: How can you retrieve employee names and their respective department names for employees belonging to the "Engineering" department?

In [5]:

inner_join_sql = spark.sql("""
                            select e.employee_name, d.department_name
                           from department d
                           join employee e
                           on e.department_id = d.department_id
                           where d.department_name = 'Engineering' 
 """)

inner_join_sql.show()

+-------------+---------------+
|employee_name|department_name|
+-------------+---------------+
|          Bob|    Engineering|
+-------------+---------------+



### Outer Joins

Question: Retrieve a DataFrame that contains all employees along with their department names. If an employee doesn't have a department assigned, display "No Department".

In [6]:
outer_join_sql = spark.sql("""
                            select coalesce(e.employee_name, 'no Employee') as employee_name, 
                                   coalesce(d.department_name, 'no department') as department_name
                           from employee e
                           full outer join department d
                           on d.department_id = e.department_id
"""
)

outer_join_sql.show()

+-------------+--------------------+
|employee_name|     department_name|
+-------------+--------------------+
|        Alice|       no department|
|          Eva|       no department|
|        Henry|       no department|
|  no Employee|             Finance|
|  Pallavi mam|                  Hr|
|        Cathy|                  Hr|
|          Bob|         Engineering|
|        David|             Finance|
|    Amrit Sir|           Marketing|
|  no Employee|           Operation|
|  no Employee|       no department|
|  no Employee|          Operations|
|  no Employee|          Production|
|        Grace|       no department|
|        Frank|Research and Deve...|
+-------------+--------------------+



### Left Outer Joins

Question: List all employees along with their department names. If an employee doesn't have a department assigned, display "No Department".

In [7]:
left_outer_join_sql = spark.sql("""
                                    select e.employee_name,
                                            coalesce(d.department_name, 'no department') as department_name
                                    from employee e
                                    left outer join department d
                                    on e.department_id = d.department_id       

                                """)

left_outer_join_sql.show()

+-------------+--------------------+
|employee_name|     department_name|
+-------------+--------------------+
|  Pallavi mam|                  Hr|
|          Bob|         Engineering|
|        Cathy|                  Hr|
|        David|             Finance|
|    Amrit Sir|           Marketing|
|        Alice|       no department|
|          Eva|       no department|
|        Frank|Research and Deve...|
|        Henry|       no department|
|        Grace|       no department|
+-------------+--------------------+



### Right Outer Joins

Question: Display a list of departments along with employee names. If a department has no employees, display "No Employees".



In [8]:
right_outer_join_sql = spark.sql("""
                                select d.department_name as department_name,
                                       coalesce(e.employee_name,'No employee') as employee_name
                                 from employee e
                                 right outer join department d
                                 on e.department_id = d.department_id
                                """)

right_outer_join_sql.show()

+--------------------+-------------+
|     department_name|employee_name|
+--------------------+-------------+
|                  Hr|        Cathy|
|                  Hr|  Pallavi mam|
|         Engineering|          Bob|
|             Finance|        David|
|           Operation|  No employee|
|           Marketing|    Amrit Sir|
|                null|  No employee|
|          Operations|  No employee|
|          Production|  No employee|
|             Finance|  No employee|
|Research and Deve...|        Frank|
+--------------------+-------------+



### Left Semi Joins

Question: Retrieve a DataFrame that includes employee names for departments that have employees.



In [9]:
left_semi_join_sql = spark.sql("""
                                select e.employee_name
                               from employee e
                               left semi join department d
                               on e.department_id = d.department_id
                                
                               """ )

left_semi_join_sql.show()

+-------------+
|employee_name|
+-------------+
|  Pallavi mam|
|        Cathy|
|          Bob|
|        David|
|    Amrit Sir|
|        Frank|
+-------------+



### Left Anti Joins

Question: Find the employees who don't belong to any department.

In [10]:
left_anti_join_sql = spark.sql("""
                                select e.employee_name
                               from employee e
                               left anti join department d
                               on e.department_id = d.department_id
                                
                               """ )

left_anti_join_sql.show()

+-------------+
|employee_name|
+-------------+
|        Alice|
|          Eva|
|        Henry|
|        Grace|
+-------------+



### Cross (Cartesian) Joins

Question: Create a DataFrame that contains all possible combinations of employees and departments.

In [11]:
cross_join_sql = spark.sql("""
                            select * 
                           from employee
                           cross join department
""")
                           
cross_join_sql.show()



+-----------+-------------+-------------+-------------+--------------------+
|Employee_Id|Employee_name|department_id|department_id|     department_name|
+-----------+-------------+-------------+-------------+--------------------+
|          1|  Pallavi mam|          101|          101|                  Hr|
|          1|  Pallavi mam|          101|          102|         Engineering|
|          1|  Pallavi mam|          101|          103|             Finance|
|          1|  Pallavi mam|          101|          104|           Marketing|
|          1|  Pallavi mam|          101|          105|           Operation|
|          1|  Pallavi mam|          101|          106|                null|
|          1|  Pallavi mam|          101|          107|          Operations|
|          1|  Pallavi mam|          101|          108|          Production|
|          1|  Pallavi mam|          101|         null|             Finance|
|          1|  Pallavi mam|          101|          110|Research and Deve...|

                                                                                