# Module 07b - Advanced Operations - Window Functions & UDFs - Exercises

## Instructions

This notebook contains exercises based on the concepts learned in Module 07b.

- Complete each exercise in the provided code cells
- Run the data setup cells first to create the data the exercises need
- Test your solutions by running the verification cells (if provided)
- Refer back to the main module notebook if you need help


## Data Setup

Run the cells below to set up the data needed for the exercises.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
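# Note: sum, max and min imported here shadow Python's built-ins for the rest of the notebook
from pyspark.sql.functions import col, sum, avg, rank, dense_rank, row_number, percent_rank, lead, lag, first, last, max, min, count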
from pyspark.sql.window import Window
import os

# Create SparkSession
spark = SparkSession.builder \
    .appName("Module 07b Exercises") \
    .master("local[*]") \
    .getOrCreate()

# Set data directory
data_dir = "../data"
os.makedirs(data_dir, exist_ok=True)
print("SparkSession created successfully!")

# Create sample DataFrame with time series data
data = [
    ("Alice", "Sales", 50000, "2024-01"),
    ("Bob", "IT", 60000, "2024-01"),
    ("Charlie", "Sales", 70000, "2024-01"),
    ("Diana", "IT", 55000, "2024-01"),
    ("Eve", "HR", 65000, "2024-01"),
    ("Alice", "Sales", 52000, "2024-02"),
    ("Bob", "IT", 61000, "2024-02"),
    ("Charlie", "Sales", 72000, "2024-02"),
    ("Diana", "IT", 56000, "2024-02"),
    ("Eve", "HR", 66000, "2024-02"),
    ("Alice", "Sales", 53000, "2024-03"),
    ("Bob", "IT", 62000, "2024-03"),
    ("Charlie", "Sales", 73000, "2024-03"),
    ("Diana", "IT", 57000, "2024-03"),
    ("Eve", "HR", 67000, "2024-03")
]

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Department", StringType(), True),
    StructField("Salary", IntegerType(), True),
    StructField("Month", StringType(), True)
])

df = spark.createDataFrame(data, schema)

print("Sample DataFrame:")
df.show()

### Exercise 1: ROW_NUMBER Window Function

Complete the operation described in the code comments.

In [0]:
from pyspark.sql.functions import rank, dense_rank, row_number
from pyspark.sql.window import Window

# Use ROW_NUMBER() to rank employees within each department by salary
window_aspec = Window.partitionBy("Department").orderBy("Salary")
result_1 = df.withColumn("Rank",row_number().over(window_aspec))
result_1.show()

### Exercise 2: RANK Window Function

Complete the operation described in the code comments.

In [0]:
# Your code here
# Use RANK() to rank employees within each department by salary
window_aspec = Window.partitionBy("Department").orderBy("Salary")
result_2 = df.withColumn("Rank",rank().over(window_aspec))
result_2.show()

### Exercise 3: DENSE_RANK Window Function

Complete the operation described in the code comments.

In [0]:
# Your code here
# Use DENSE_RANK() to rank employees within each department by salary
window_aspec = Window.partitionBy("Department").orderBy("Salary")
result_3 = df.withColumn("Rank",dense_rank().over(window_aspec))
result_3.show()
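
On the sample data, Exercises 1 to 3 produce identical output, because no two rows in a department share a salary. The three functions only differ when there are ties. The sketch below is a plain-Python model of that behaviour for a single partition, not Spark code; `rank_variants` and the tied salaries are hypothetical and used only for illustration.

```python
# Plain-Python model of ROW_NUMBER, RANK and DENSE_RANK for one partition,
# ordered ascending like orderBy("Salary"). Illustrative only, not Spark code.

def rank_variants(values):
    """Return (value, row_number, rank, dense_rank) tuples in ascending order."""
    ordered = sorted(values)
    rows = []
    rank = 0
    dense = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(ordered, start=1):
        if value != previous:
            rank = position  # RANK jumps to the row position, leaving gaps after ties
            dense += 1       # DENSE_RANK advances by one per distinct value
            previous = value
        rows.append((value, position, rank, dense))
    return rows


# Made-up salaries with a tie at 60000.
for row in rank_variants([50000, 60000, 60000, 70000]):
    print(row)
```

The tied rows get different row numbers but the same rank, and only `RANK` skips a value after the tie.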

### Exercise 4: Running Total

Complete the operation described in the code comments.

In [0]:
# Your code here
# Calculate running total of salary within each department ordered by month
# With no explicit frame, employees in the same month are peers and share one total
window_aspec = Window.partitionBy("Department").orderBy("Month")
result_4 = df.withColumn("RunningTotal", sum("Salary").over(window_aspec))
result_4.show()
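
When a window has an `orderBy` but no explicit frame, Spark uses `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so rows with the same `Month` receive the same running total. A `rowsBetween(Window.unboundedPreceding, Window.currentRow)` frame instead accumulates row by row. The plain-Python sketch below models both behaviours for the Sales rows of January and February; `running_totals` is a hypothetical helper, not a Spark API.

```python
# Plain-Python model of a running total under a ROWS frame and a RANGE frame.
# Illustrative only; the input must already be sorted by month.

def running_totals(rows):
    """rows: list of (month, salary). Returns a list of (rows_total, range_total)."""
    rows_totals = []
    total = 0
    for _, salary in rows:
        total += salary  # ROWS frame: add one row at a time
        rows_totals.append(total)

    # RANGE frame: every row takes the total reached at the last row of its month.
    total_at_end_of_month = {}
    for (month, _), running in zip(rows, rows_totals):
        total_at_end_of_month[month] = running

    range_totals = [total_at_end_of_month[month] for month, _ in rows]
    return list(zip(rows_totals, range_totals))


sales = [("2024-01", 50000), ("2024-01", 70000), ("2024-02", 52000), ("2024-02", 72000)]
for pair in running_totals(sales):
    print(pair)
```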

### Exercise 5: LEAD Function

Complete the operation described in the code comments.

In [0]:
# Your code here
# Use LEAD() to get next month's salary for each employee
# Partition by Name so LEAD stays within one employee's rows
window_aspec = Window.partitionBy("Name").orderBy("Month")
result_5 = df.withColumn("NextMonthSalary", lead("Salary", 1).over(window_aspec))
result_5.show()

### Exercise 6: LAG Function

Complete the operation described in the code comments.

In [0]:
# Your code here
# Use LAG() to get previous month's salary for each employee
# Partition by Name so LAG stays within one employee's rows
window_aspec = Window.partitionBy("Name").orderBy("Month")
result_6 = df.withColumn("PreviousMonthSalary", lag("Salary", 1).over(window_aspec))
result_6.show()
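
`LEAD` and `LAG` look at neighbouring rows inside the current partition only, and return null at the partition edges. For the per-employee comparisons in Exercises 5 and 6, the neighbouring rows must therefore belong to the same employee. The sketch below models that in plain Python, with `None` standing in for null; `lag_lead_by_name` is a hypothetical helper, not part of PySpark.

```python
# Plain-Python model of LAG and LEAD partitioned by name and ordered by month.
# Illustrative only, not Spark code.
from collections import defaultdict


def lag_lead_by_name(rows):
    """rows: list of (name, month, salary).

    Returns {(name, month): (previous_salary, next_salary)}.
    """
    by_name = defaultdict(list)
    for name, month, salary in rows:
        by_name[name].append((month, salary))

    result = {}
    for name, series in by_name.items():
        series.sort()  # order by month within the partition
        for i, (month, _) in enumerate(series):
            previous = series[i - 1][1] if i > 0 else None
            following = series[i + 1][1] if i + 1 < len(series) else None
            result[(name, month)] = (previous, following)
    return result


rows = [
    ("Alice", "2024-01", 50000), ("Bob", "2024-01", 60000),
    ("Alice", "2024-02", 52000), ("Bob", "2024-02", 61000),
    ("Alice", "2024-03", 53000), ("Bob", "2024-03", 62000),
]
print(lag_lead_by_name(rows)[("Alice", "2024-02")])
```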

### Exercise 7: Window with Partition and Order

Complete the operation described in the code comments.

In [0]:
# Your code here
# Create a window partitioned by Department and ordered by Salary descending
window_aspec = Window.partitionBy("Department").orderBy(col("Salary").desc())
# Keep the Rank column so the effect of the window is visible in the output
result_7 = df.withColumn("Rank", row_number().over(window_aspec))
result_7.orderBy("Department", "Rank").show()

### Exercise 8: Maximum Salary per Department

Complete the operation described in the code comments.

In [0]:
# Your code here
# Find the maximum salary in each department using window function
window_aspec = Window.partitionBy("Department")
result_8 = df.withColumn("MaxSalary", max("Salary").over(window_aspec))
result_8.show()

### Exercise 9: Average Salary per Department

Complete the operation described in the code comments.

In [0]:
# Your code here
# Calculate average salary per department using window function
window_aspec = Window.partitionBy("Department")
result_9 = df.withColumn("AvgSalary",avg("salary").over(window_aspec))
result_9.show()

### Exercise 10: Salary Difference from Department Average

Complete the operation described in the code comments.

In [0]:
# Your code here
# Calculate each employee's salary difference from their department average
window_aspec = Window.partitionBy("Department")
with_avg = df.withColumn("AvgSalary", avg("Salary").over(window_aspec))
# Positive values mean the salary is above the department average
result_10 = with_avg.withColumn("SalaryDiff", col("Salary") - col("AvgSalary"))
result_10.show()

### Exercise 11: Percent Rank

Complete the operation described in the code comments.

In [0]:
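# Your code here
# Use PERCENT_RANK() to compute each employee's relative rank within their department by salary
from pyspark.sql.functions import percent_rank

window_aspec = Window.partitionBy("Department").orderBy(col("Salary"))
result_11 = df.withColumn("PercentRank", percent_rank().over(window_aspec))
result_11.show()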
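
`percent_rank()` returns `(rank - 1) / (rows in partition - 1)`, so the lowest value in a partition gets 0.0 and the highest gets 1.0. The sketch below reproduces that formula in plain Python for the three HR salaries. `percent_rank_model` is a hypothetical name, chosen so that it does not clash with the PySpark function.

```python
# Plain-Python model of PERCENT_RANK for one partition, ordered ascending.
# Illustrative only, not Spark code.

def percent_rank_model(values):
    """Return (value, percent_rank) pairs in ascending order."""
    ordered = sorted(values)
    n = len(ordered)
    result = []
    rank = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(ordered, start=1):
        if value != previous:
            rank = position  # tied values keep the rank of the first tied row
            previous = value
        # A single-row partition has percent rank 0.0 by definition.
        result.append((value, 0.0 if n == 1 else (rank - 1) / (n - 1)))
    return result


for pair in percent_rank_model([65000, 66000, 67000]):
    print(pair)
```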


### Exercise 12: Cumulative Sum

Complete the operation described in the code comments.

In [0]:
# Your code here
# Calculate cumulative sum of salary for each employee over months
window_aspec = Window.partitionBy("Name").orderBy("Month")
result_12 = df.withColumn("CumulativeSalary", sum("Salary").over(window_aspec))
result_12.show()

### Exercise 13: Moving Average

Complete the operation described in the code comments.

In [0]:
# Your code here
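# Calculate 2-month moving average of salary for each employee
# Partition by Name so the frame (previous row + current row) never mixes employees
window_aspec = Window.partitionBy("Name").orderBy("Month").rowsBetween(-1, 0)
result_13 = df.withColumn("MovingAverage", avg("Salary").over(window_aspec))
result_13.show()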
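
A `rowsBetween(-1, 0)` frame covers the previous row and the current row. For the first row of a partition there is no previous row, so the frame shrinks to the current row alone. The sketch below models this in plain Python using Alice's three monthly salaries; `moving_average` and its `preceding` parameter are hypothetical names, not Spark APIs.

```python
# Plain-Python model of avg(...) over rowsBetween(-preceding, 0) for one
# partition that is already in order. Illustrative only, not Spark code.

def moving_average(values, preceding=1):
    """Average each value with up to `preceding` rows before it."""
    result = []
    for i in range(len(values)):
        start = i - preceding if i >= preceding else 0  # frame shrinks at the start
        frame = values[start:i + 1]
        total = 0
        for value in frame:
            total += value
        result.append(total / len(frame))
    return result


print(moving_average([50000, 52000, 53000]))
```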

### Exercise 14: First Value in Window

Complete the operation described in the code comments.

In [0]:
# Your code here
# Get the first salary value in each department (PySpark's first() corresponds to SQL FIRST_VALUE)
from pyspark.sql import Window
from pyspark.sql.functions import first, col

# Name breaks ties between employees in the same month, so the first row is deterministic
window_aspec = Window.partitionBy("Department").orderBy("Month", "Name") \
                     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)


result_14 = df.withColumn(
    "FirstSalary",
    first(col("Salary")).over(window_aspec)
)


result_14.show()


### Exercise 15: Last Value in Window

Complete the operation described in the code comments.

In [0]:
# Your code here
# Get the last salary value in each department (PySpark's last() corresponds to SQL LAST_VALUE)
from pyspark.sql import Window
from pyspark.sql.functions import last, col

# Name breaks ties between employees in the same month, so the last row is deterministic
window_aspec = Window.partitionBy("Department").orderBy("Month", "Name") \
                     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)


result_15 = df.withColumn(
    "LastSalary",
    last(col("Salary")).over(window_aspec)
)


result_15.show()


### Exercise 16: Count per Department

Complete the operation described in the code comments.

In [0]:
# Your code here
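# Count number of employees in each department using window function
from pyspark.sql.functions import collect_set, size

# count("*") would count rows (one per employee per month); collecting the
# distinct names first counts each employee once
window_aspec = Window.partitionBy("Department")
result_16 = df.withColumn("EmployeeCount", size(collect_set("Name").over(window_aspec)))
result_16.show()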

### Exercise 17: Salary Rank with Ties

Complete the operation described in the code comments.

In [0]:
# Your code here
# Rank employees by salary, handling ties appropriately
window_aspec = Window.orderBy(col("Salary").desc())
result_17 = df.withColumn("Rank",dense_rank().over(window_aspec))
result_17.show()

### Exercise 18: Window with Rows Between

Complete the operation described in the code comments.

In [0]:
# Your code here
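# Create a window with rowsBetween to calculate sum of current and previous row
# A row-based frame needs an ordering to be deterministic; Month then Name is assumed here
window_aspec = Window.orderBy("Month", "Name").rowsBetween(-1, 0)
result_18 = df.withColumn("SumCurrentAndPrevious", sum("Salary").over(window_aspec))
result_18.show()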

### Exercise 19: Multiple Window Functions

Complete the operation described in the code comments.

In [0]:
# Your code here
# Apply multiple window functions (rank, avg, max) in a single query
from pyspark.sql import Window
from pyspark.sql.functions import rank, avg, max, col


window_rank = Window.partitionBy("Department").orderBy("Salary")


window_agg = Window.partitionBy("Department")


result_19 = df.withColumn("Rank", rank().over(window_rank)) \
              .withColumn("AvgSalary", avg(col("Salary")).over(window_agg)) \
              .withColumn("MaxSalary", max(col("Salary")).over(window_agg))


result_19.show()


### Exercise 20: Complex Window with Multiple Partitions

Complete the operation described in the code comments.

In [0]:
# Your code here
# Create a complex window partitioned by Department, ordered by Month, with specific range
from pyspark.sql import Window
from pyspark.sql.functions import avg, col


window_spec = Window.partitionBy("Department") \
                    .orderBy("Month") \
                    .rowsBetween(-2, 0)

# Name the result result_20 to match the other exercises
result_20 = df.withColumn(
    "Salary_MA_3",
    avg(col("Salary")).over(window_spec)
)

result_20.show()


## Summary

Great job completing the exercises! Review your solutions and compare them with the solutions notebook if needed.
