<a href="https://colab.research.google.com/github/codeprakash309/PySparkCodeHub/blob/PySparkCodeHub(Prakash)/Advanced_PySpark_Transformations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

spark = SparkSession.builder.appName("PracticeTransformations").getOrCreate()

data = [
    ("Alice", 30, "HR", 40000, "New York"),
    ("Bob", 35, "Finance", 60000, "Chicago"),
    ("Cathy", 28, "IT", 75000, "San Francisco"),
    ("David", 45, "Finance", 80000, "New York"),
    ("Eva", 32, "IT", 72000, "Boston"),
    ("Frank", 29, "HR", 42000, "Chicago"),
    ("Grace", 41, "Marketing", 52000, "Boston"),
    ("Henry", 36, "Finance", 70000, "San Francisco"),
    ("Ivy", 27, "IT", 68000, "New York"),
    ("Jake", 39, "Marketing", 55000, "Chicago"),
    ("Karen", 31, "HR", 45000, "Boston"),
    ("Leo", 26, "Finance", 58000, "New York"),
    ("Mona", 33, "IT", 76000, "Chicago"),
    ("Nick", 40, "Marketing", 60000, "San Francisco")
]

columns = ["Name", "Age", "Department", "Salary", "City"]

df = spark.createDataFrame(data, columns)
df.show()


+-----+---+----------+------+-------------+
| Name|Age|Department|Salary|         City|
+-----+---+----------+------+-------------+
|Alice| 30|        HR| 40000|     New York|
|  Bob| 35|   Finance| 60000|      Chicago|
|Cathy| 28|        IT| 75000|San Francisco|
|David| 45|   Finance| 80000|     New York|
|  Eva| 32|        IT| 72000|       Boston|
|Frank| 29|        HR| 42000|      Chicago|
|Grace| 41| Marketing| 52000|       Boston|
|Henry| 36|   Finance| 70000|San Francisco|
|  Ivy| 27|        IT| 68000|     New York|
| Jake| 39| Marketing| 55000|      Chicago|
|Karen| 31|        HR| 45000|       Boston|
|  Leo| 26|   Finance| 58000|     New York|
| Mona| 33|        IT| 76000|      Chicago|
| Nick| 40| Marketing| 60000|San Francisco|
+-----+---+----------+------+-------------+



In [5]:
#Window Functions (e.g., Rank employees by salary within department)

windowSpec = Window.partitionBy("Department").orderBy(df["Salary"].desc())

df.withColumn("Rank_in_Department", rank().over(windowSpec)).show()


+-----+---+----------+------+-------------+------------------+
| Name|Age|Department|Salary|         City|Rank_in_Department|
+-----+---+----------+------+-------------+------------------+
|David| 45|   Finance| 80000|     New York|                 1|
|Henry| 36|   Finance| 70000|San Francisco|                 2|
|  Bob| 35|   Finance| 60000|      Chicago|                 3|
|  Leo| 26|   Finance| 58000|     New York|                 4|
|Karen| 31|        HR| 45000|       Boston|                 1|
|Frank| 29|        HR| 42000|      Chicago|                 2|
|Alice| 30|        HR| 40000|     New York|                 3|
| Mona| 33|        IT| 76000|      Chicago|                 1|
|Cathy| 28|        IT| 75000|San Francisco|                 2|
|  Eva| 32|        IT| 72000|       Boston|                 3|
|  Ivy| 27|        IT| 68000|     New York|                 4|
| Nick| 40| Marketing| 60000|San Francisco|                 1|
| Jake| 39| Marketing| 55000|      Chicago|            

In [7]:
#Pivot Table (e.g., Salary per department per city)
df.groupBy("City").pivot("Department").agg({"Salary": "avg"}).show()


+-------------+-------+-------+-------+---------+
|         City|Finance|     HR|     IT|Marketing|
+-------------+-------+-------+-------+---------+
|San Francisco|70000.0|   NULL|75000.0|  60000.0|
|      Chicago|60000.0|42000.0|76000.0|  55000.0|
|     New York|69000.0|40000.0|68000.0|     NULL|
|       Boston|   NULL|45000.0|72000.0|  52000.0|
+-------------+-------+-------+-------+---------+



In [10]:
# Handling Null Values
from pyspark.sql.functions import when

#Introduce some nulls first for testing:
df_nulls = df.withColumn("Bonus", when(df["Department"] == "HR", None).otherwise(1000))
df_nulls.show()


+-----+---+----------+------+-------------+-----+
| Name|Age|Department|Salary|         City|Bonus|
+-----+---+----------+------+-------------+-----+
|Alice| 30|        HR| 40000|     New York| NULL|
|  Bob| 35|   Finance| 60000|      Chicago| 1000|
|Cathy| 28|        IT| 75000|San Francisco| 1000|
|David| 45|   Finance| 80000|     New York| 1000|
|  Eva| 32|        IT| 72000|       Boston| 1000|
|Frank| 29|        HR| 42000|      Chicago| NULL|
|Grace| 41| Marketing| 52000|       Boston| 1000|
|Henry| 36|   Finance| 70000|San Francisco| 1000|
|  Ivy| 27|        IT| 68000|     New York| 1000|
| Jake| 39| Marketing| 55000|      Chicago| 1000|
|Karen| 31|        HR| 45000|       Boston| NULL|
|  Leo| 26|   Finance| 58000|     New York| 1000|
| Mona| 33|        IT| 76000|      Chicago| 1000|
| Nick| 40| Marketing| 60000|San Francisco| 1000|
+-----+---+----------+------+-------------+-----+



In [12]:
#Fill Nulls
df_nulls.fillna({'Bonus': 500}).show()


+-----+---+----------+------+-------------+-----+
| Name|Age|Department|Salary|         City|Bonus|
+-----+---+----------+------+-------------+-----+
|Alice| 30|        HR| 40000|     New York|  500|
|  Bob| 35|   Finance| 60000|      Chicago| 1000|
|Cathy| 28|        IT| 75000|San Francisco| 1000|
|David| 45|   Finance| 80000|     New York| 1000|
|  Eva| 32|        IT| 72000|       Boston| 1000|
|Frank| 29|        HR| 42000|      Chicago|  500|
|Grace| 41| Marketing| 52000|       Boston| 1000|
|Henry| 36|   Finance| 70000|San Francisco| 1000|
|  Ivy| 27|        IT| 68000|     New York| 1000|
| Jake| 39| Marketing| 55000|      Chicago| 1000|
|Karen| 31|        HR| 45000|       Boston|  500|
|  Leo| 26|   Finance| 58000|     New York| 1000|
| Mona| 33|        IT| 76000|      Chicago| 1000|
| Nick| 40| Marketing| 60000|San Francisco| 1000|
+-----+---+----------+------+-------------+-----+



In [14]:
#Drop Rows with Nulls:
df_nulls.dropna().show()


+-----+---+----------+------+-------------+-----+
| Name|Age|Department|Salary|         City|Bonus|
+-----+---+----------+------+-------------+-----+
|  Bob| 35|   Finance| 60000|      Chicago| 1000|
|Cathy| 28|        IT| 75000|San Francisco| 1000|
|David| 45|   Finance| 80000|     New York| 1000|
|  Eva| 32|        IT| 72000|       Boston| 1000|
|Grace| 41| Marketing| 52000|       Boston| 1000|
|Henry| 36|   Finance| 70000|San Francisco| 1000|
|  Ivy| 27|        IT| 68000|     New York| 1000|
| Jake| 39| Marketing| 55000|      Chicago| 1000|
|  Leo| 26|   Finance| 58000|     New York| 1000|
| Mona| 33|        IT| 76000|      Chicago| 1000|
| Nick| 40| Marketing| 60000|San Francisco| 1000|
+-----+---+----------+------+-------------+-----+



In [16]:
#Date/Time Columns
#Let's say we add a Joining_Date column:
from pyspark.sql.functions import to_date, current_date, datediff

data_with_dates = df.withColumn("Joining_Date", to_date(
    when(df.Name == "Alice", "2021-01-10")
    .when(df.Name == "Bob", "2019-07-15")
    .otherwise("2020-03-01")
))

data_with_dates.show()


+-----+---+----------+------+-------------+------------+
| Name|Age|Department|Salary|         City|Joining_Date|
+-----+---+----------+------+-------------+------------+
|Alice| 30|        HR| 40000|     New York|  2021-01-10|
|  Bob| 35|   Finance| 60000|      Chicago|  2019-07-15|
|Cathy| 28|        IT| 75000|San Francisco|  2020-03-01|
|David| 45|   Finance| 80000|     New York|  2020-03-01|
|  Eva| 32|        IT| 72000|       Boston|  2020-03-01|
|Frank| 29|        HR| 42000|      Chicago|  2020-03-01|
|Grace| 41| Marketing| 52000|       Boston|  2020-03-01|
|Henry| 36|   Finance| 70000|San Francisco|  2020-03-01|
|  Ivy| 27|        IT| 68000|     New York|  2020-03-01|
| Jake| 39| Marketing| 55000|      Chicago|  2020-03-01|
|Karen| 31|        HR| 45000|       Boston|  2020-03-01|
|  Leo| 26|   Finance| 58000|     New York|  2020-03-01|
| Mona| 33|        IT| 76000|      Chicago|  2020-03-01|
| Nick| 40| Marketing| 60000|San Francisco|  2020-03-01|
+-----+---+----------+------+--

In [17]:
#Find how long the employee has been with the company:
df_with_duration = data_with_dates.withColumn("Days_In_Company", datediff(current_date(), "Joining_Date"))
df_with_duration.select("Name", "Joining_Date", "Days_In_Company").show()


+-----+------------+---------------+
| Name|Joining_Date|Days_In_Company|
+-----+------------+---------------+
|Alice|  2021-01-10|           1654|
|  Bob|  2019-07-15|           2199|
|Cathy|  2020-03-01|           1969|
|David|  2020-03-01|           1969|
|  Eva|  2020-03-01|           1969|
|Frank|  2020-03-01|           1969|
|Grace|  2020-03-01|           1969|
|Henry|  2020-03-01|           1969|
|  Ivy|  2020-03-01|           1969|
| Jake|  2020-03-01|           1969|
|Karen|  2020-03-01|           1969|
|  Leo|  2020-03-01|           1969|
| Mona|  2020-03-01|           1969|
| Nick|  2020-03-01|           1969|
+-----+------------+---------------+

