In [1]:
# Data repartitioning with repartition() and coalesce()
# Joins in PySpark

In [2]:
from pyspark.sql import SparkSession


# Start (or reuse) a SparkSession named "spark introduction" running in
# local mode; "local[*]" requests one worker thread per available core.
spark = SparkSession.builder.appName("spark introduction").master("local[*]").getOrCreate()

In [51]:
# DDL-style schema strings handed to spark.createDataFrame().
#
# Employee schema (includes gender and hire_date). hire_date is declared as
# a string, so Spark stores it as text rather than as a date type.
emp_schema = """
emp_id int,
emp_name string,
gender string,
department string,
dept_id int,
role string,
salary int,
hire_date string
"""

# Department schema; dept_id is the column the employee rows are joined on.
dept_schema = """
dept_id int,
department string,
location string,
budget int
"""


# Sample employee rows. Each inner list follows the emp_schema column order:
# emp_id, emp_name, gender, department, dept_id, role, salary, hire_date.
# dept_id values here range from 1 to 5.
emp_data = [
    [101, "Alice", "Female", "HR", 1, "Manager", 60000, "2018-03-15"],
    [102, "Bob", "Male", "IT", 2, "Developer", 75000, "2019-07-22"],
    [103, "Charlie", "Male", "Finance", 3, "Analyst", 65000, "2020-01-10"],
    [104, "Diana", "Female", "IT", 2, "Tester", 55000, "2021-05-18"],
    [105, "Evan", "Male", "Sales", 4, "Executive", 50000, "2022-02-01"],
    [106, "Fiona", "Female", "Marketing", 5, "Coordinator", 48000, "2020-09-12"],
    # NOTE(review): gender is an empty string for this row, not null — confirm this is intentional.
    [107, "George", "", "IT", 2, "DevOps Engineer", 82000, "2017-11-30"],
    [108, "Hannah", "Female", "Finance", 3, "Accountant", 62000, "2019-04-25"],
    [109, "Ian", "Male", "HR", 1, "Recruiter", 52000, "2021-08-09"],
    [110, "Julia", "Female", "Sales", 4, "Manager", 70000, "2016-06-17"],

    [111, "Kevin", "Male", "IT", 2, "Architect", 90000, "2015-02-12"],
    [112, "Laura", "Female", "Finance", 3, "Controller", 85000, "2014-10-05"],
    [113, "Mike", "Male", "Marketing", 5, "SEO Specialist", 56000, "2021-03-11"],
    [114, "Nina", "Female", "HR", 1, "HR Executive", 47000, "2022-07-19"],
    [115, "Oscar", "Male", "Sales", 4, "Sales Lead", 72000, "2018-11-23"],
    [116, "Paula", "Female", "IT", 2, "Data Engineer", 88000, "2019-08-14"],
    [117, "Quinn", "Male", "Finance", 3, "Risk Analyst", 67000, "2020-12-01"],
    [118, "Rachel", "Female", "Marketing", 5, "Brand Manager", 73000, "2017-06-08"],
    [119, "Steve", "Male", "IT", 2, "Support Engineer", 52000, "2021-09-30"],
    [120, "Tina", "Female", "HR", 1, "Trainer", 49000, "2020-02-18"],

    [121, "Uma", "Female", "Sales", 4, "Account Manager", 68000, "2019-01-27"],
    [122, "Victor", "Male", "IT", 2, "Security Analyst", 81000, "2018-04-06"],
    [123, "Wendy", "Female", "Finance", 3, "Auditor", 64000, "2021-11-22"],
    [124, "Xavier", "Male", "Marketing", 5, "Content Writer", 45000, "2022-05-10"],
    [125, "Yara", "Female", "Sales", 4, "Sales Analyst", 59000, "2020-08-03"],
    [126, "Zack", "Male", "IT", 2, "Cloud Engineer", 87000, "2016-12-15"]
]

# Build the employee DataFrame from the in-memory rows using the DDL schema.
emp1 = spark.createDataFrame(emp_data, schema=emp_schema)


In [50]:
# Department rows in dept_schema column order:
# dept_id, department, location, budget.
dept_data = [
    [1, "HR", "New York", 500_000],
    [2, "IT", "San Francisco", 1_200_000],
    [3, "Finance", "Chicago", 900_000],
    [4, "Sales", "Dallas", 800_000],
    [5, "Marketing", "Boston", 600_000],
    [6, "Operations", "Seattle", 700_000],
    [7, "Support", "Phoenix", 400_000],
]

# Materialise the department DataFrame and print it without truncating
# long cell values.
dept_df = spark.createDataFrame(data=dept_data, schema=dept_schema)
dept_df.show(truncate=False)




+-------+----------+-------------+-------+
|dept_id|department|location     |budget |
+-------+----------+-------------+-------+
|1      |HR        |New York     |500000 |
|2      |IT        |San Francisco|1200000|
|3      |Finance   |Chicago      |900000 |
|4      |Sales     |Dallas       |800000 |
|5      |Marketing |Boston       |600000 |
|6      |Operations|Seattle      |700000 |
|7      |Support   |Phoenix      |400000 |
+-------+----------+-------------+-------+



In [52]:
# Print the column names, types and nullability of the employee DataFrame.
emp1.printSchema()

root
 |-- emp_id: integer (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- department: string (nullable = true)
 |-- dept_id: integer (nullable = true)
 |-- role: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: string (nullable = true)



In [53]:
# How many partitions the emp1 DataFrame is currently split into
emp1.rdd.getNumPartitions()

8

In [54]:
# How many partitions the dept_df DataFrame is currently split into
dept_df.rdd.getNumPartitions()

8

In [55]:
# Changing the partition count with repartition() and coalesce().
# Here: redistribute emp1 into 100 partitions with repartition().

emp_partitioned = emp1.repartition(100)

In [56]:
# Check the partition count after repartition(100)
emp_partitioned.rdd.getNumPartitions()

100

In [15]:
# coalesce vs repartition
#
# repartition involves a full data shuffle, whereas coalesce doesn't
# (it only merges existing partitions).
# repartition can increase or decrease the number of partitions, but
# coalesce can only decrease it, never increase it.
# repartition gives a roughly uniform data distribution, but coalesce
# cannot guarantee that.

In [57]:
# coalesce() cannot raise the partition count: requesting 100 here leaves
# the DataFrame at emp1's existing number of partitions.
emp_partitioned = emp1.coalesce(100)

In [58]:
# Partition count after coalesce(100): unchanged from emp1's count, not 100
emp_partitioned.rdd.getNumPartitions()

8

In [59]:
# repartition() also accepts column names: split emp1 into 100 partitions
# keyed on emp_id.
emp_partitioned = emp1.repartition(100,"emp_id")

In [60]:
# Inspect where rows land after repartitioning: spark_partition_id() yields
# the id of the partition each row belongs to.
from pyspark.sql.functions import spark_partition_id

emp_1 = (
    emp1
    .repartition(4, "emp_id")
    .withColumn("partition_num", spark_partition_id())
)

In [61]:
# Display the rows together with the partition_num column added above
emp_1.show()

+------+--------+------+----------+-------+----------------+------+----------+-------------+
|emp_id|emp_name|gender|department|dept_id|            role|salary| hire_date|partition_num|
+------+--------+------+----------+-------+----------------+------+----------+-------------+
|   101|   Alice|Female|        HR|      1|         Manager| 60000|2018-03-15|            0|
|   107|  George|      |        IT|      2| DevOps Engineer| 82000|2017-11-30|            0|
|   109|     Ian|  Male|        HR|      1|       Recruiter| 52000|2021-08-09|            0|
|   110|   Julia|Female|     Sales|      4|         Manager| 70000|2016-06-17|            0|
|   115|   Oscar|  Male|     Sales|      4|      Sales Lead| 72000|2018-11-23|            0|
|   126|    Zack|  Male|        IT|      2|  Cloud Engineer| 87000|2016-12-15|            0|
|   105|    Evan|  Male|     Sales|      4|       Executive| 50000|2022-02-01|            1|
|   106|   Fiona|Female| Marketing|      5|     Coordinator| 48000|202

In [70]:
# Inner join: keep only the employee rows whose dept_id has a match in
# dept_df. The "e" / "d" aliases let later cells pick columns unambiguously,
# since both sides carry dept_id and department.
df_joined = emp1.alias("e").join(
    dept_df.alias("d"),
    on=emp1.dept_id == dept_df.dept_id,
    how="inner",
)

In [71]:
# Pick columns through the join aliases (e = employees, d = departments)
# and preview the first five rows.
(
    df_joined
    .select("e.emp_id", "d.budget", "d.department", "e.salary")
    .show(5)
)

+------+-------+----------+------+
|emp_id| budget|department|salary|
+------+-------+----------+------+
|   101| 500000|        HR| 60000|
|   109| 500000|        HR| 52000|
|   114| 500000|        HR| 47000|
|   120| 500000|        HR| 49000|
|   102|1200000|        IT| 75000|
+------+-------+----------+------+
only showing top 5 rows



In [72]:
# Left join: keep every employee row; the department columns are null
# wherever no dept_id matches.
df_joined = emp1.alias("e").join(
    dept_df.alias("d"),
    on=emp1.dept_id == dept_df.dept_id,
    how="left",
)

In [73]:
# Display the left-join result (columns from both DataFrames, including
# the duplicated dept_id and department columns)
df_joined.show()

+------+--------+------+----------+-------+---------------+------+----------+-------+----------+-------------+-------+
|emp_id|emp_name|gender|department|dept_id|           role|salary| hire_date|dept_id|department|     location| budget|
+------+--------+------+----------+-------+---------------+------+----------+-------+----------+-------------+-------+
|   101|   Alice|Female|        HR|      1|        Manager| 60000|2018-03-15|      1|        HR|     New York| 500000|
|   103| Charlie|  Male|   Finance|      3|        Analyst| 65000|2020-01-10|      3|   Finance|      Chicago| 900000|
|   102|     Bob|  Male|        IT|      2|      Developer| 75000|2019-07-22|      2|        IT|San Francisco|1200000|
|   106|   Fiona|Female| Marketing|      5|    Coordinator| 48000|2020-09-12|      5| Marketing|       Boston| 600000|
|   105|    Evan|  Male|     Sales|      4|      Executive| 50000|2022-02-01|      4|     Sales|       Dallas| 800000|
|   104|   Diana|Female|        IT|      2|     

In [104]:
# Left outer join with a compound ("cascading") join condition:
#   1. the dept_id values match,
#   2. the employee is emp_id 101 or 102 (an employee filter, not a
#      department filter),
#   3. the department's dept_id is not null (already implied by the
#      equality in 1; kept to illustrate a not-null condition).
# Because this is a left join, employees that fail the condition are still
# returned, with nulls in every department column.

from pyspark.sql.functions import col

df_final = emp1.join(
    dept_df,
    on=(
        (emp1.dept_id == dept_df.dept_id)
        & ((emp1.emp_id == 101) | (emp1.emp_id == 102))
        & dept_df.dept_id.isNotNull()
    ),
    how="left_outer",
)


In [106]:
 # Display the result: only emp_id 101 and 102 carry department values
 df_final.show()

+------+--------+------+----------+-------+---------------+------+----------+-------+----------+-------------+-------+
|emp_id|emp_name|gender|department|dept_id|           role|salary| hire_date|dept_id|department|     location| budget|
+------+--------+------+----------+-------+---------------+------+----------+-------+----------+-------------+-------+
|   101|   Alice|Female|        HR|      1|        Manager| 60000|2018-03-15|      1|        HR|     New York| 500000|
|   103| Charlie|  Male|   Finance|      3|        Analyst| 65000|2020-01-10|   null|      null|         null|   null|
|   102|     Bob|  Male|        IT|      2|      Developer| 75000|2019-07-22|      2|        IT|San Francisco|1200000|
|   106|   Fiona|Female| Marketing|      5|    Coordinator| 48000|2020-09-12|   null|      null|         null|   null|
|   105|    Evan|  Male|     Sales|      4|      Executive| 50000|2022-02-01|   null|      null|         null|   null|
|   104|   Diana|Female|        IT|      2|     