In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast

spark = SparkSession.builder.appName("join_practice").getOrCreate()

employees = [
(1,"Manasa",10,50000,None),
(2,"Mevin",20,60000,1),
(3,"Pavithra",10,45000,1),
(4,"Menakha",30,70000,2),
(5,"Arun",None,40000,None)
]

departments = [
(10,"HR"),
(20,"IT"),
(30,"Finance"),
(40,"Admin")
]

df1 = spark.createDataFrame(employees,
["emp_id","name","dept_id","salary","manager_id"])

df2 = spark.createDataFrame(departments,
["dept_id","dept_name"])


In [7]:
df1.join(df2 , "dept_id" , "left").show()


+-------+------+--------+------+----------+---------+
|dept_id|emp_id|    name|salary|manager_id|dept_name|
+-------+------+--------+------+----------+---------+
|     10|     1|  Manasa| 50000|      NULL|       HR|
|     20|     2|   Mevin| 60000|         1|       IT|
|   NULL|     5|    Arun| 40000|      NULL|     NULL|
|     10|     3|Pavithra| 45000|         1|       HR|
|     30|     4| Menakha| 70000|         2|  Finance|
+-------+------+--------+------+----------+---------+



In [22]:
# df1.join(df2 ,(df1.dept_id == df2.dept_id) & (df1.salary > 500000), "inner" ).show()
df_emp.join(
    df_dept,
    (df_emp.dept_id == df_dept.dept_id) & (df_emp.salary > 50000),
    "inner"
).show()

+------+-------+-------+------+----------+-------+---------+
|emp_id|   name|dept_id|salary|manager_id|dept_id|dept_name|
+------+-------+-------+------+----------+-------+---------+
|     2|  Mevin|     20| 60000|         1|     20|       IT|
|     4|Menakha|     30| 70000|         2|     30|  Finance|
+------+-------+-------+------+----------+-------+---------+



In [13]:
# DIFFERENT COLUMN NAME JOIN

df_dept2 = df_dept.withColumnRenamed("dept_id","id")

df_emp.join(
    df_dept2,
    df_emp.dept_id == df_dept2.id,
    "inner"
).show()

+------+--------+-------+------+----------+---+---------+
|emp_id|    name|dept_id|salary|manager_id| id|dept_name|
+------+--------+-------+------+----------+---+---------+
|     1|  Manasa|     10| 50000|      NULL| 10|       HR|
|     3|Pavithra|     10| 45000|         1| 10|       HR|
|     2|   Mevin|     20| 60000|         1| 20|       IT|
|     4| Menakha|     30| 70000|         2| 30|  Finance|
+------+--------+-------+------+----------+---+---------+



In [19]:
# self join:

df1.alias("e").join(
    df1.alias("m") , col("e.emp_id")==col("m.manager_id") , "left").select(col("e.name").alias("employee"),
            col("m.name").alias("manager")
      ).show()


+--------+--------+
|employee| manager|
+--------+--------+
|  Manasa|Pavithra|
|  Manasa|   Mevin|
|   Mevin| Menakha|
|    Arun|    NULL|
|Pavithra|    NULL|
| Menakha|    NULL|
+--------+--------+



In [28]:
# braod cast join :
# Rule: small table < ~10MB (default auto threshold)
# This avoids shuffle of the big table â†’ faster performance.

# Basic usage:
df1.join(broadcast(df2),"dept_id","inner").show()

# With condition:
df1.join(
    broadcast(df2),
    df1.dept_id == df2.dept_id,
    "left"
).show()

+-------+------+--------+------+----------+---------+
|dept_id|emp_id|    name|salary|manager_id|dept_name|
+-------+------+--------+------+----------+---------+
|     10|     1|  Manasa| 50000|      NULL|       HR|
|     20|     2|   Mevin| 60000|         1|       IT|
|     10|     3|Pavithra| 45000|         1|       HR|
|     30|     4| Menakha| 70000|         2|  Finance|
+-------+------+--------+------+----------+---------+

+------+--------+-------+------+----------+-------+---------+
|emp_id|    name|dept_id|salary|manager_id|dept_id|dept_name|
+------+--------+-------+------+----------+-------+---------+
|     1|  Manasa|     10| 50000|      NULL|     10|       HR|
|     2|   Mevin|     20| 60000|         1|     20|       IT|
|     3|Pavithra|     10| 45000|         1|     10|       HR|
|     4| Menakha|     30| 70000|         2|     30|  Finance|
|     5|    Arun|   NULL| 40000|      NULL|   NULL|     NULL|
+------+--------+-------+------+----------+-------+---------+



In [21]:
# Find employees earning more than their manager

# df1.join(df2."")

# Find departments with no employees (anti join)

# Count employees per department (join + groupBy)

# Show only employees working in HR

# Find employees without manager

df_emp.filter(
    col("manager_id").isNull()
).select(
    "name","salary"
).show()

+------+------+
|  name|salary|
+------+------+
|Manasa| 50000|
|  Arun| 40000|
+------+------+

