In [1]:
from pyspark.sql import SparkSession

# One SparkSession for the whole notebook, running in local mode ("local[*]").
# getOrCreate() returns an already-active session instead of building a second one.
spark_builder = SparkSession.builder.appName("Joins and Data Partitions").master("local[*]")
spark = spark_builder.getOrCreate()

# Last expression -> Jupyter renders the session's rich repr.
spark

In [2]:
# Employee fixture. Every column (including age, salary and date) is declared as
# a string, so cast before doing any numeric or date work on these columns.
emp_schema = (
    "employee_id string, department_id string, name string, "
    "age string, gender string, salary string, date string"
)

emp_data = [
    # (employee_id, department_id, name, age, gender, salary, date)
    ("E01", "D01", "John", "30", "M", "3000", "2022-01-01"),
    ("E02", "D02", "Jane", "25", "F", "4000", "2022-02-15"),
    ("E03", "D01", "Jake", "28", "M", "3500", "2022-03-10"),
    ("E04", "D03", "Jill", "35", "F", "5000", "2022-04-05"),
    ("E05", "D02", "Bill", "29", "M", "3800", "2022-05-20"),
]

emp_df = spark.createDataFrame(emp_data, emp_schema)
emp_df.show()

# Department fixture; department_id is the join key shared with emp_df.
# budget is stored as a string, like the numeric-looking employee columns.
dept_schema = (
    "department_id string, department_name string, "
    "city string, country string, budget string"
)

dept_data = [
    # (department_id, department_name, city, country, budget)
    ("D01", "HR", "New York", "USA", "100000"),
    ("D02", "Engineering", "San Francisco", "USA", "200000"),
    ("D03", "Sales", "London", "UK", "150000"),
]

dept_df = spark.createDataFrame(dept_data, dept_schema)
dept_df.show()

+-----------+-------------+----+---+------+------+----------+
|employee_id|department_id|name|age|gender|salary|      date|
+-----------+-------------+----+---+------+------+----------+
|        E01|          D01|John| 30|     M|  3000|2022-01-01|
|        E02|          D02|Jane| 25|     F|  4000|2022-02-15|
|        E03|          D01|Jake| 28|     M|  3500|2022-03-10|
|        E04|          D03|Jill| 35|     F|  5000|2022-04-05|
|        E05|          D02|Bill| 29|     M|  3800|2022-05-20|
+-----------+-------------+----+---+------+------+----------+

+-------------+---------------+-------------+-------+------+
|department_id|department_name|         city|country|budget|
+-------------+---------------+-------------+-------+------+
|          D01|             HR|     New York|    USA|100000|
|          D02|    Engineering|San Francisco|    USA|200000|
|          D03|          Sales|       London|     UK|150000|
+-------------+---------------+-------------+-------+------+



In [3]:
# Partition count of the employee DataFrame's underlying RDD.
# emp_df holds only 5 rows, so most of these partitions must be empty.
emp_num_partitions = emp_df.rdd.getNumPartitions()
emp_num_partitions

20

In [4]:
# Same check for the department DataFrame (3 rows, so most partitions are empty).
dept_num_partitions = dept_df.rdd.getNumPartitions()
dept_num_partitions

20

In [6]:
# repartition(n) can both increase and decrease the number of partitions;
# it always redistributes the rows through a data shuffle.
target_partitions = 40
emp_par = emp_df.repartition(target_partitions)
emp_par.rdd.getNumPartitions()

40

In [7]:
# coalesce(n) can only DECREASE the number of partitions. It merges existing
# partitions rather than doing a full shuffle, which makes it cheaper than
# repartition. Requesting more partitions than currently exist leaves the
# count unchanged.
#
# Start from a 40-partition frame (as in the previous cell) and merge it down to
# 20; only the repartition step shuffles, the coalesce step does not.
emp_par = emp_df.repartition(40).coalesce(20)
emp_par.rdd.getNumPartitions()

20

In [8]:
# Hash-partition into 4 partitions keyed on department_id, so rows that share a
# department_id are placed in the same partition.
partition_col = "department_id"
emp_part = emp_df.repartition(4, partition_col)
emp_part.rdd.getNumPartitions()

4

In [9]:
from pyspark.sql.functions import spark_partition_id

# Partition info: stamp each row with the id of the partition that holds it.
# emp_df has many more partitions than rows, so the 5 rows land in different ones.
partition_id_col = spark_partition_id()
emp1 = emp_df.withColumn("partition_num", partition_id_col)
emp1.show()

+-----------+-------------+----+---+------+------+----------+-------------+
|employee_id|department_id|name|age|gender|salary|      date|partition_num|
+-----------+-------------+----+---+------+------+----------+-------------+
|        E01|          D01|John| 30|     M|  3000|2022-01-01|            3|
|        E02|          D02|Jane| 25|     F|  4000|2022-02-15|            7|
|        E03|          D01|Jake| 28|     M|  3500|2022-03-10|           11|
|        E04|          D03|Jill| 35|     F|  5000|2022-04-05|           15|
|        E05|          D02|Bill| 29|     M|  3800|2022-05-20|           19|
+-----------+-------------+----+---+------+------+----------+-------------+



In [10]:
# Same tagging after hash-partitioning on department_id into 4 partitions:
# every row of a given department now reports the same partition_num. Hashing can
# also place several departments in one partition, leaving other partitions empty.
emp1 = (
    emp_df
    .repartition(4, "department_id")
    .withColumn("partition_num", spark_partition_id())
)
emp1.show()

+-----------+-------------+----+---+------+------+----------+-------------+
|employee_id|department_id|name|age|gender|salary|      date|partition_num|
+-----------+-------------+----+---+------+------+----------+-------------+
|        E01|          D01|John| 30|     M|  3000|2022-01-01|            0|
|        E03|          D01|Jake| 28|     M|  3500|2022-03-10|            0|
|        E02|          D02|Jane| 25|     F|  4000|2022-02-15|            1|
|        E04|          D03|Jill| 35|     F|  5000|2022-04-05|            1|
|        E05|          D02|Bill| 29|     M|  3800|2022-05-20|            1|
+-----------+-------------+----+---+------+------+----------+-------------+



In [15]:
# Inner join employees to departments on department_id.
# SQL equivalent:
#   select e.name, d.department_name, d.department_id, e.salary
#   from emp e inner join dept d on e.department_id = d.department_id
# The joined frame keeps BOTH department_id columns (one from each side), so
# refer to them through the "e" / "d" aliases to avoid ambiguous references.

df_joined = emp_df.alias("e").join(
    dept_df.alias("d"),
    on=emp_df.department_id == dept_df.department_id,
    how="inner",
)
df_joined.show()

+-----------+-------------+----+---+------+------+----------+-------------+---------------+-------------+-------+------+
|employee_id|department_id|name|age|gender|salary|      date|department_id|department_name|         city|country|budget|
+-----------+-------------+----+---+------+------+----------+-------------+---------------+-------------+-------+------+
|        E01|          D01|John| 30|     M|  3000|2022-01-01|          D01|             HR|     New York|    USA|100000|
|        E03|          D01|Jake| 28|     M|  3500|2022-03-10|          D01|             HR|     New York|    USA|100000|
|        E02|          D02|Jane| 25|     F|  4000|2022-02-15|          D02|    Engineering|San Francisco|    USA|200000|
|        E05|          D02|Bill| 29|     M|  3800|2022-05-20|          D02|    Engineering|San Francisco|    USA|200000|
|        E04|          D03|Jill| 35|     F|  5000|2022-04-05|          D03|          Sales|       London|     UK|150000|
+-----------+-------------+----+

In [17]:
# Project the columns from the SQL sketch. The alias prefixes pick which side's
# department_id is returned, since the join result contains two of them.
selected_cols = ["e.name", "d.department_id", "d.department_name", "e.salary"]
df_joined.select(*selected_cols).show()

+----+-------------+---------------+------+
|name|department_id|department_name|salary|
+----+-------------+---------------+------+
|John|          D01|             HR|  3000|
|Jake|          D01|             HR|  3500|
|Jane|          D02|    Engineering|  4000|
|Bill|          D02|    Engineering|  3800|
|Jill|          D03|          Sales|  5000|
+----+-------------+---------------+------+



In [20]:
# Left outer join: keep every employee row; department columns are null for any
# employee whose department_id has no match in dept_df.
# SQL equivalent:
#   select e.name, d.department_id, e.salary
#   from emp e left outer join dept d on e.department_id = d.department_id
# Every employee's department_id (D01-D03) exists in dept_df, so with this data
# the result has the same rows as the inner join above.

df_joined = emp_df.alias("e").join(
    dept_df.alias("d"),
    on=emp_df.department_id == dept_df.department_id,
    how="left_outer",
)
df_joined.show()

+-----------+-------------+----+---+------+------+----------+-------------+---------------+-------------+-------+------+
|employee_id|department_id|name|age|gender|salary|      date|department_id|department_name|         city|country|budget|
+-----------+-------------+----+---+------+------+----------+-------------+---------------+-------------+-------+------+
|        E01|          D01|John| 30|     M|  3000|2022-01-01|          D01|             HR|     New York|    USA|100000|
|        E02|          D02|Jane| 25|     F|  4000|2022-02-15|          D02|    Engineering|San Francisco|    USA|200000|
|        E03|          D01|Jake| 28|     M|  3500|2022-03-10|          D01|             HR|     New York|    USA|100000|
|        E04|          D03|Jill| 35|     F|  5000|2022-04-05|          D03|          Sales|       London|     UK|150000|
|        E05|          D02|Bill| 29|     M|  3800|2022-05-20|          D02|    Engineering|San Francisco|    USA|200000|
+-----------+-------------+----+