# Installing PySpark in Google Colab

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .config('spark.ui.port', '4050') \
       .getOrCreate()

spark

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [75.2 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,604 kB]
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Hit:12 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/mai

In [2]:
from google.colab import output
output.serve_kernel_port_as_window(4050, path ='/jobs/index.html')

Try `serve_kernel_port_as_iframe` instead. [0m


<IPython.core.display.Javascript object>

# Reading Data

For this example, I am going to use a data set from this [github repo](https://github.com/afaqueahmad7117/spark-experiments.git)


In [None]:
# Clone the repo
!git clone https://github.com/afaqueahmad7117/spark-experiments.git

# Load datasets from the cloned repo
transactions_df = spark.read.parquet("spark-experiments/data/data_skew/transactions.parquet")
customers_df = spark.read.parquet("spark-experiments/data/data_skew/customers.parquet")

print("Transactions Dataset Schema:")
transactions_df.printSchema()
print("Customers Dataset Schema:")
customers_df.printSchema()

Cloning into 'spark-experiments'...
remote: Enumerating objects: 544, done.[K
remote: Counting objects: 100% (8/8), done.[K
remote: Compressing objects: 100% (8/8), done.[K
remote: Total 544 (delta 7), reused 0 (delta 0), pack-reused 536 (from 1)[K
Receiving objects: 100% (544/544), 702.60 MiB | 16.88 MiB/s, done.
Resolving deltas: 100% (112/112), done.
Updating files: 100% (351/351), done.


NameError: name 'spark' is not defined

# 🚩 **Day 1 - 2025/04/14**

**1. Schema Validation & Type Conversion (Easy)**

Your ETL pipeline ingests raw data with all columns as strings. Convert the `amt` (transaction amount) to DoubleType and `age` to IntegerType. Validate by showing the schema post-conversion.

In [None]:
transactions_df.show(5)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|   cust_id|start_date|  end_date|         txn_id|      date|year|month|day| expense_type|   amt|       city|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|   10|  7|Entertainment| 10.42|     boston|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|    3| 27| Motor/Travel| 44.34|   portland|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|    4| 11|Entertainment|  3.18|    chicago|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|    2| 22|    Groceries|268.97|los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|   10| 16|Entertainment|  2.66|    chicago|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
only showi

In [None]:
transactions_df = transactions_df.withColumn("amt", F.col("amt").cast(DoubleType()))
transactions_df.printSchema()

transactions_df.show(3) # this triggers the transformation

root
 |-- cust_id: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- txn_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- expense_type: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- city: string (nullable = true)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+-----+--------+
|   cust_id|start_date|  end_date|         txn_id|      date|year|month|day| expense_type|  amt|    city|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+-----+--------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|   10|  7|Entertainment|10.42|  boston|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|    3| 27| Motor/Travel|44.34|portland|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXH

**2. Time-Based Aggregations (Medium)**
Scenario: The business wants monthly expense reports. Calculate total monthly expenses per customer, preserving the original schema's year and month columns. Handle potential nulls in amt.

```
# Expected output schema
# |-- cust_id: string
# |-- year: string
# |-- month: string
# |-- total_expense: double
```

In [None]:
monthly_transactions_per_customer = transactions_df.groupBy("cust_id","year","month").agg(F.sum("amt").alias("total_expense"))\
                                                    .orderBy("year","month","total_expense")
monthly_transactions_per_customer.show(5)

+----------+----+-----+------------------+
|   cust_id|year|month|     total_expense|
+----------+----+-----+------------------+
|C42POJ8QKI|2010|    1|298.94000000000005|
|CC5E1YOY7N|2010|    1|302.53000000000003|
|CCQ557SM5V|2010|    1|332.96000000000004|
|CLESRZVUWU|2010|    1|334.14000000000004|
|COV6YRAYE9|2010|    1|362.21999999999997|
+----------+----+-----+------------------+
only showing top 5 rows



**3. Data Quality Check (Medium)**
Scenario: Your team needs to validate customer data. Find:

1. Customers with invalid ZIP codes (non-5-digit format)
2. Transactions with future dates (dates beyond today's date)

Return counts for both anomalies.

In [None]:
customers_df.select("cust_id", "zip")\
            .filter(F.length(F.col("zip")) != 5)\
            .show()

+-------+---+
|cust_id|zip|
+-------+---+
+-------+---+



*There are no records with zip codes with not equal to 5 digits*

**4. Customer-Transaction Enrichment (Hard)**

Scenario: Create a master dataset showing each transaction enriched with customer demographics. Optimize for:

Fast joins using broadcast join where appropriate

Handling null values in customer data

Preserving original transaction order*

```
# Target schema
# |-- txn_id: string
# |-- date: string
# |-- expense_type: string
# |-- amt: double
# |-- city: string
# |-- name: string
# |-- age: int
# |-- gender: string
```

In [None]:
transactions_df_for_join = transactions_df.select("txn_id", "cust_id", "date", "expense_type", "amt", F.col("city").alias("txn_city"))

customers_df_for_join = customers_df.select("cust_id", "name", "age", "gender")

master_df = transactions_df_for_join.join(F.broadcast(customers_df_for_join),
                              customers_df_for_join.cust_id == transactions_df_for_join.cust_id,
                              how='inner')

master_df.show(5)

+---------------+----------+----------+-------------+------+-----------+----------+--------+---+------+
|         txn_id|   cust_id|      date| expense_type|   amt|   txn_city|   cust_id|    name|age|gender|
+---------------+----------+----------+-------------+------+-----------+----------+--------+---+------+
|TZ5SMKZY9S03OQJ|C0YDPQWPBJ|2018-10-07|Entertainment| 10.42|     boston|C0YDPQWPBJ|Ada Lamb| 32|Female|
|TYIAPPNU066CJ5R|C0YDPQWPBJ|2016-03-27| Motor/Travel| 44.34|   portland|C0YDPQWPBJ|Ada Lamb| 32|Female|
|TETSXIK4BLXHJ6W|C0YDPQWPBJ|2011-04-11|Entertainment|  3.18|    chicago|C0YDPQWPBJ|Ada Lamb| 32|Female|
|TQKL1QFJY3EM8LO|C0YDPQWPBJ|2018-02-22|    Groceries|268.97|los_angeles|C0YDPQWPBJ|Ada Lamb| 32|Female|
|TYL6DFP09PPXMVB|C0YDPQWPBJ|2010-10-16|Entertainment|  2.66|    chicago|C0YDPQWPBJ|Ada Lamb| 32|Female|
+---------------+----------+----------+-------------+------+-----------+----------+--------+---+------+
only showing top 5 rows



**5. Window Function Analysis (Hard)**

Scenario: For customer retention analysis, calculate:

a) Days since previous transaction per customer

b) Rolling 30-day average spend per customer

Use appropriate window functions and handle partition boundaries.

In [None]:
# Days since previous transaction per customer
# I'm assuming we are comparing it to today's date

# group by cust_id, get last transaction date, check the difference between today's date and last transaction date
transactions_df.groupBy("cust_id").agg(F.max("date").alias("last_transaction_date"))\
                .withColumn("days_since_last_txn", F.date_diff(F.current_date(), F.col("last_transaction_date")))\
                .show(5)

+----------+---------------------+-------------------+
|   cust_id|last_transaction_date|days_since_last_txn|
+----------+---------------------+-------------------+
|C007YEYTX9|           2020-09-27|               1662|
|C00B971T1J|           2020-12-27|               1571|
|C00WRSJF1Q|           2020-12-27|               1571|
|C01AZWQMF3|           2019-03-27|               2212|
|C01BKUFRHA|           2020-09-27|               1662|
+----------+---------------------+-------------------+
only showing top 5 rows



In [None]:
# Ensure 'date' is in timestamp format
transactions_df = transactions_df.withColumn("date_ts", F.col("date").cast("timestamp"))

# Create a time-based window: last 30 days per customer
window_30_days = Window.partitionBy("cust_id")\
                       .orderBy(F.col("date_ts").cast("long"))\
                       .rangeBetween(-30 * 86400, 0)  # last 30 days in seconds

# First aggregate daily total per customer
daily_total_df = transactions_df.groupBy("cust_id", "date_ts").agg(F.sum("amt").alias("daily_total"))

# Apply rolling average over the 30-day window
result_df = daily_total_df.withColumn("rolling_30_days_avg", F.avg("daily_total").over(window_30_days))

result_df.show(5)



+----------+-------------------+-----------+-------------------+
|   cust_id|            date_ts|daily_total|rolling_30_days_avg|
+----------+-------------------+-----------+-------------------+
|C007YEYTX9|2012-02-01 00:00:00|      74.62|              74.62|
|C007YEYTX9|2012-02-02 00:00:00|     293.11|            183.865|
|C007YEYTX9|2012-02-03 00:00:00|      146.7|  171.4766666666667|
|C007YEYTX9|2012-02-04 00:00:00|     3647.9|          1040.5825|
|C007YEYTX9|2012-02-05 00:00:00|     261.46|            884.758|
+----------+-------------------+-----------+-------------------+
only showing top 5 rows



# 🚩 **Day 2 - 2025/04/16**

**1. Incremental Data Load Strategy (Medium)**

Scenario: New transactions arrive daily. Write a PySpark job to:

Load only new transactions (those with date > last_processed_date)

Deduplicate using txn_id (keep latest record if duplicates exist)

Update a master transactions table without reprocessing old data

In [None]:
# latest transaction date in dataset is 2020-12-27

# let's assume we only want to process records after 2020-09-01 and none before it

# transactions_df is the raw data set that keeps populating everyday
# master_transactions is the cleaned version of transactions_df with column last_processed_date
# so we check what was the last_processed_date and filter using that value in the transactions_df


# Cast date column to date type
transactions_df = transactions_df.withColumn("date", F.col("date").cast("date"))

# Define last processed date
last_processed_date = "2020-09-01" # or max value from master_transactions

# Filter only new transactions
new_transactions = transactions_df.filter(F.col("date") > F.lit(last_processed_date))

# Deduplicate on txn_id by keeping the latest record per txn_id
window_spec = Window.partitionBy("txn_id").orderBy(F.col("date").desc())

deduped_new_txns = new_transactions.withColumn("rank", F.row_number().over(window_spec))\
                                   .filter(F.col("rank") == 1)\
                                   .drop("rank")

# Load master table as example
master_transactions = transactions_df.filter(F.col("date") <= F.lit(last_processed_date))

# Append new deduped data to master
updated_master = master_transactions.unionByName(deduped_new_txns)

updated_master.orderBy("date", ascending=False).show()


+----------+----------+--------+---------------+----------+----+-----+---+-------------+-----+-------------+-------------------+
|   cust_id|start_date|end_date|         txn_id|      date|year|month|day| expense_type|  amt|         city|            date_ts|
+----------+----------+--------+---------------+----------+----+-----+---+-------------+-----+-------------+-------------------+
|CO0JUS2B89|2013-03-01|    NULL|T09MG32WEEJYDTX|2020-12-27|2020|   12| 27|    Groceries|42.43|     portland|2020-12-27 00:00:00|
|C4XSZX5AEQ|2012-11-01|    NULL|T093856A0IX0QKY|2020-12-27|2020|   12| 27|Entertainment|26.19|san_francisco|2020-12-27 00:00:00|
|C0YDPQWPBJ|2011-10-01|    NULL|T0I1IFIJMRCAPLO|2020-12-27|2020|   12| 27|Entertainment|25.56|    san_diego|2020-12-27 00:00:00|
|CW57H4XRF4|2012-12-01|    NULL|T0INHCOKS6AYENZ|2020-12-27|2020|   12| 27|Entertainment|25.18|       boston|2020-12-27 00:00:00|
|CRO4QOP409|2013-12-01|    NULL|T0A1STBKX7DM2GK|2020-12-27|2020|   12| 27|Entertainment|76.77|   

**2. Geospatial UDF Optimization (Hard)**
Scenario: Calculate distances between customer ZIP codes and transaction cities.

Create a UDF to convert ZIP codes to latitude/longitude (mock this)

In [None]:
def to_lat_long(zip_code):
  zip_int = int(str(zip_code).zfill(5))  # pad zeros if needed
  latitude = 30.0 + (zip_int % 1000) * 0.01
  longitude = -100.0 - (zip_int % 1000) * 0.01
  return (latitude, longitude)

udf_schema = StructType([StructField("latitude", DoubleType()),
                         StructField("longitude", DoubleType())
                         ])

zip_to_lat_long_udf = F.udf(to_lat_long, udf_schema)

# Apply the UDF
df_with_coords = customers_df.withColumn("coords", zip_to_lat_long_udf(F.col("zip")))

# Split into separate lat/lon columns
df_with_coords = df_with_coords.withColumn("latitude", F.col("coords.latitude"))\
                               .withColumn("longitude", F.col("coords.longitude"))\
                               .drop("coords")

df_with_coords.show()

+----------+--------------+---+------+----------+-----+------------+------------------+---------+
|   cust_id|          name|age|gender|  birthday|  zip|        city|          latitude|longitude|
+----------+--------------+---+------+----------+-----+------------+------------------+---------+
|C007YEYTX9|  Aaron Abbott| 34|Female| 7/13/1991|97823|      boston|38.230000000000004|  -108.23|
|C00B971T1J|  Aaron Austin| 37|Female|12/16/2004|30332|     chicago|             33.32|  -103.32|
|C00WRSJF1Q|  Aaron Barnes| 29|Female| 3/11/1977|23451|      denver|             34.51|  -104.51|
|C01AZWQMF3| Aaron Barrett| 31|  Male|  7/9/1998|46613| los_angeles|             36.13|  -106.13|
|C01BKUFRHA|  Aaron Becker| 54|  Male|11/24/1979|40284|   san_diego|             32.84|  -102.84|
|C01RGUNJV9|    Aaron Bell| 24|Female| 8/16/1968|86331|      denver|             33.31|  -103.31|
|C01USDV4EE|   Aaron Blair| 35|Female|  9/9/1974|80078|    new_york|             30.78|  -100.78|
|C01WMZQ7PN|   Aaron

**3. Skewed Join Mitigation (Hard)**

Optimize this join between transactions_df and customers_df, where one of the customers has massive amounts of transactions as compared to others

```
+----------+--------+
|   cust_id|   count|
+----------+--------+
|C0YDPQWPBJ|17539732|
|C3KUDEN3KO|    7999|
|CBW3FMEAU7|    7999|
|C89FCEGPJP|    7999|
|CHNFNR89ZV|    7998|
+----------+--------+
only showing top 5 rows
```

In [None]:
# first approach to rely on Spark's adaptive query execution to take the best route for the join
joined_df_aqe = transactions_df.join(customers_df, customers_df.cust_id == transactions_df.cust_id, how='inner')

joined_df_aqe.explain(mode="formatted")
joined_df_aqe.show(5)

== Physical Plan ==
AdaptiveSparkPlan (7)
+- BroadcastHashJoin Inner BuildRight (6)
   :- Filter (2)
   :  +- Scan parquet  (1)
   +- BroadcastExchange (5)
      +- Filter (4)
         +- Scan parquet  (3)


(1) Scan parquet 
Output [11]: [cust_id#0, start_date#1, end_date#2, txn_id#3, date#4, year#5, month#6, day#7, expense_type#8, amt#9, city#10]
Batched: true
Location: InMemoryFileIndex [file:/content/spark-experiments/data/data_skew/transactions.parquet]
PushedFilters: [IsNotNull(cust_id)]
ReadSchema: struct<cust_id:string,start_date:string,end_date:string,txn_id:string,date:string,year:string,month:string,day:string,expense_type:string,amt:string,city:string>

(2) Filter
Input [11]: [cust_id#0, start_date#1, end_date#2, txn_id#3, date#4, year#5, month#6, day#7, expense_type#8, amt#9, city#10]
Condition : isnotnull(cust_id#0)

(3) Scan parquet 
Output [7]: [cust_id#22, name#23, age#24, gender#25, birthday#26, zip#27, city#28]
Batched: true
Location: InMemoryFileIndex [file:/content

In [None]:
# second approach we can use salting

# Add salt only for the skewed customer
num_salts = 10  # number of salts for distributing skew

transactions_df_salted = transactions_df.withColumn(
    "salt",
    F.when(F.col("cust_id") == "C0YDPQWPBJ", F.floor(F.rand(seed=42) * num_salts).cast("int"))
    .otherwise(F.lit(None))
)

In [None]:
from pyspark.sql import Row

# Expand the skewed customer row
skewed_rows = customers_df.filter(F.col("cust_id") == "C0YDPQWPBJ").collect()
salted_customers = []

for row in skewed_rows:
    for salt_val in range(num_salts):
        row_dict = row.asDict()
        row_dict["salt"] = salt_val
        salted_customers.append(Row(**row_dict))

# Combine with rest of customers (unsalted)
non_skewed_customers = customers_df.filter(F.col("cust_id") != "C0YDPQWPBJ")
salted_skewed_customers_df = spark.createDataFrame(salted_customers)

# Add salt column with null to non-skewed customers
non_skewed_customers = non_skewed_customers.withColumn("salt", F.lit(None).cast("int"))

customers_df_salted = non_skewed_customers.unionByName(salted_skewed_customers_df)

# Join using both cust_id and salt
joined_df_salted = transactions_df_salted.join(
    customers_df_salted,
    on=["cust_id", "salt"],
    how="inner"
)

joined_df_salted.explain(mode="formatted")
joined_df_salted.show(5)

== Physical Plan ==
AdaptiveSparkPlan (12)
+- Project (11)
   +- SortMergeJoin Inner (10)
      :- Sort (5)
      :  +- Exchange (4)
      :     +- Filter (3)
      :        +- Project (2)
      :           +- Scan parquet  (1)
      +- Sort (9)
         +- Exchange (8)
            +- Filter (7)
               +- Scan ExistingRDD (6)


(1) Scan parquet 
Output [11]: [cust_id#0, start_date#1, end_date#2, txn_id#3, date#4, year#5, month#6, day#7, expense_type#8, amt#9, city#10]
Batched: true
Location: InMemoryFileIndex [file:/content/spark-experiments/data/data_skew/transactions.parquet]
ReadSchema: struct<cust_id:string,start_date:string,end_date:string,txn_id:string,date:string,year:string,month:string,day:string,expense_type:string,amt:string,city:string>

(2) Project
Output [12]: [cust_id#0, start_date#1, end_date#2, txn_id#3, date#4, year#5, month#6, day#7, expense_type#8, amt#9, city#10, CASE WHEN (cust_id#0 = C0YDPQWPBJ) THEN cast(FLOOR((rand(42) * 10.0)) as int) END AS salt#325]


In [None]:
import time

start = time.time()
joined_df_aqe.cache().count()
print("AQE Join Time:", time.time() - start)

start = time.time()
joined_df_salted.cache().count()
print("Salted Join Time:", time.time() - start)

AQE Join Time: 428.58775758743286
Salted Join Time: 256.59440326690674


# 🚩 **Day 3 - 2025/04/18**

### **Q1: Basic Transformation**
**User Full Name Creator**

You are provided with a DataFrame of user information. Write a PySpark function to create a new column called `full_name`, which is a concatenation of the first name and last name, with a space in between.

**Input DataFrame:**
`users`
```
+----------+-----------+----------+
| user_id  | first_name| last_name|
+----------+-----------+----------+
| 1        | Alice     | Cooper   |
| 2        | Bob       | Marley   |
+----------+-----------+----------+
```

**Output DataFrame:**
```
+----------+-----------+----------+-------------+
| user_id  | first_name| last_name| full_name   |
+----------+-----------+----------+-------------+
| 1        | Alice     | Cooper   | Alice Cooper|
| 2        | Bob       | Marley   | Bob Marley  |
+----------+-----------+----------+-------------+
```

In [None]:
data = [
         (1, "Alice", "Cooper"),
         (2, "Bob", "Marley")
         ]

df = spark.createDataFrame(data, ["user_id", "first_name", "last_name"])

# full name column
df_with_fullname = df.withColumn("full_name", F.concat(F.col("first_name"), F.lit(" "), F.col("last_name")))

df_with_fullname.show()

+-------+----------+---------+------------+
|user_id|first_name|last_name|   full_name|
+-------+----------+---------+------------+
|      1|     Alice|   Cooper|Alice Cooper|
|      2|       Bob|   Marley|  Bob Marley|
+-------+----------+---------+------------+



### **Q2: Filter Orders by Value**

**Scenario:**  
You work at an e-commerce company and want to analyze high-value orders.

**Task:**  
Add a column `total_amount` (quantity × unit_price) and return only the orders where `total_amount >= 100`.

```python
orders = spark.createDataFrame([
    (1001, 1, 1, 50.0),
    (1002, 2, 5, 20.0),
    (1003, 3, 1, 200.0)
], ["order_id", "product_id", "quantity", "unit_price"])
```

**Expected Output:**

```
+--------+-----------+--------+-----------+-------------+
|order_id|product_id |quantity|unit_price |total_amount |
+--------+-----------+--------+-----------+-------------+
|1002    |2          |5       |20.0       |100.0        |
|1003    |3          |1       |200.0      |200.0        |
+--------+-----------+--------+-----------+-------------+
```


In [None]:
orders = spark.createDataFrame([
    (1001, 1, 1, 50.0),
    (1002, 2, 5, 20.0),
    (1003, 3, 1, 200.0)
], ["order_id", "product_id", "quantity", "unit_price"])


high_value_orders = orders.withColumn("total_amount", F.col("quantity") * F.col("unit_price"))\
                            .filter(F.col("total_amount") >=  100.0)

high_value_orders.show()

+--------+----------+--------+----------+------------+
|order_id|product_id|quantity|unit_price|total_amount|
+--------+----------+--------+----------+------------+
|    1002|         2|       5|      20.0|       100.0|
|    1003|         3|       1|     200.0|       200.0|
+--------+----------+--------+----------+------------+



### **Q3: Join with Aggregation**

**Scenario:**  
You're helping HR compute the total hours each employee worked this month.

**Task:**  
Join `employees` and `timesheets`, and calculate the `total_hours` each employee has worked.

```python
employees = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
], ["employee_id", "name"])

timesheets = spark.createDataFrame([
    (1, "2024-04-01", 8),
    (1, "2024-04-02", 7),
    (2, "2024-04-01", 6),
    (2, "2024-04-02", 5),
    (3, "2024-04-01", 9)
], ["employee_id", "date", "hours"])
```

**Expected Output:**

```
+-----------+--------+
|name       |total_hours|
+-----------+------------+
|Alice      |15          |
|Bob        |11          |
|Charlie    |9           |
+-----------+------------+
```

In [None]:
employees = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
], ["employee_id", "name"])

timesheets = spark.createDataFrame([
    (1, "2024-04-01", 8),
    (1, "2024-04-02", 7),
    (2, "2024-04-01", 6),
    (2, "2024-04-02", 5),
    (3, "2024-04-01", 9)
], ["employee_id", "date", "hours"])


total_hours_per_employee = employees.join(timesheets, employees.employee_id==timesheets.employee_id, how = 'inner')\
                                    .groupBy(employees.employee_id, employees.name).agg(F.sum(F.col("hours")).alias("total_hours"))\
                                    .select("name", "total_hours")

total_hours_per_employee.show()

+-------+-----------+
|   name|total_hours|
+-------+-----------+
|  Alice|         15|
|    Bob|         11|
|Charlie|          9|
+-------+-----------+



### **Q4: Window Functions**

**Scenario:**  
You are analyzing monthly sales and want to know the top-selling product each month.

**Task:**  
Use a window function to rank products by `units_sold` per month and return only those with rank = 1.

```python
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

sales = spark.createDataFrame([
    ("2024-01", "Product A", 100),
    ("2024-01", "Product B", 120),
    ("2024-01", "Product C", 90),
    ("2024-02", "Product A", 150),
    ("2024-02", "Product B", 140),
    ("2024-02", "Product C", 160)
], ["month", "product", "units_sold"])
```

**Expected Output:**

```
+--------+----------+-----------+
|month   |product   |units_sold |
+--------+----------+-----------+
|2024-01 |Product B |120        |
|2024-02 |Product C |160        |
+--------+----------+-----------+
```


In [None]:
sales = spark.createDataFrame([
    ("2024-01", "Product A", 100),
    ("2024-01", "Product B", 120),
    ("2024-01", "Product C", 90),
    ("2024-02", "Product A", 150),
    ("2024-02", "Product B", 140),
    ("2024-02", "Product C", 160)
], ["month", "product", "units_sold"])

window_spec = Window.partitionBy("month").orderBy(F.col("units_sold").desc())

top_selling_product_per_month = sales.withColumn("rank", F.row_number().over(window_spec))\
                                      .filter(F.col("rank") == 1)\
                                      .select("month", "product", "units_sold")
top_selling_product_per_month.show()

+-------+---------+----------+
|  month|  product|units_sold|
+-------+---------+----------+
|2024-01|Product B|       120|
|2024-02|Product C|       160|
+-------+---------+----------+



### **Q5: Multi-Join with Filtering and Aggregation**

**Scenario:**  
You’re working at a logistics company. You need to compute how many packages each customer received in February 2024 **only from active couriers**.

**Task:**  
Join `customers`, `shipments`, and `couriers` to return a DataFrame with `customer_name`, `courier_name`, and the number of shipments in Feb 2024 only from active couriers.

```python
customers = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
], ["customer_id", "customer_name"])

couriers = spark.createDataFrame([
    (101, "FastX", True),
    (102, "GoExpress", False),
    (103, "QuickShip", True)
], ["courier_id", "courier_name", "is_active"])

shipments = spark.createDataFrame([
    (1001, 1, 101, "2024-02-01"),
    (1002, 1, 101, "2024-02-15"),
    (1003, 2, 102, "2024-01-10"),
    (1004, 3, 103, "2024-01-30"),
    (1005, 3, 103, "2024-02-12")
], ["shipment_id", "customer_id", "courier_id", "shipment_date"])
```

**Expected Output:**

```
+-------------+-------------+-------------------+
|customer_name|courier_name |num_shipments_feb  |
+-------------+-------------+-------------------+
|Alice        |FastX        |2                  |
|Charlie      |QuickShip    |1                  |
+-------------+-------------+-------------------+
```

In [None]:
customers = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
], ["customer_id", "customer_name"])

couriers = spark.createDataFrame([
    (101, "FastX", True),
    (102, "GoExpress", False),
    (103, "QuickShip", True)
], ["courier_id", "courier_name", "is_active"])

shipments = spark.createDataFrame([
    (1001, 1, 101, "2024-02-01"),
    (1002, 1, 101, "2024-02-15"),
    (1003, 2, 102, "2024-01-10"),
    (1004, 3, 103, "2024-01-30"),
    (1005, 3, 103, "2024-02-12")
], ["shipment_id", "customer_id", "courier_id", "shipment_date"])

customers.join(shipments, shipments.customer_id == customers.customer_id, how='inner')\
          .join(couriers, shipments.courier_id == couriers.courier_id, how = 'inner')\
          .filter(F.col("shipment_date").between("2024-02-01", "2024-02-29"))\
          .groupBy(customers.customer_name, couriers.courier_name).count()\
          .show()

+-------------+------------+-----+
|customer_name|courier_name|count|
+-------------+------------+-----+
|      Charlie|   QuickShip|    1|
|        Alice|       FastX|    2|
+-------------+------------+-----+



# 🚩 **Day 4 - 2025/04/19**

### Q1: Product Sales per Category and Price Tier

You're analyzing product sales performance. Group products into price tiers:  
- "Low" for price < 50  
- "Mid" for 50 <= price < 150  
- "High" for price >= 150  

Then compute the total units sold per category and price tier.

```python
products = spark.createDataFrame([
    (101, "Gadget", "Electronics", 49.99),
    (102, "Smartphone", "Electronics", 199.99),
    (103, "Blender", "Home Appliances", 89.99),
    (104, "Microwave", "Home Appliances", 149.99),
    (105, "Notebook", "Stationery", 5.99)
], ["product_id", "product_name", "category", "price"])

sales = spark.createDataFrame([
    (101, 200),
    (102, 150),
    (103, 100),
    (104, 120),
    (105, 300)
], ["product_id", "units_sold"])
```

Expected output:

```
+------------------+----------+------------+
|category          |price_tier|total_units |
+------------------+----------+------------+
|Electronics       |Low       |200         |
|Electronics       |High      |150         |
|Home Appliances   |Mid       |100         |
|Home Appliances   |High      |120         |
|Stationery        |Low       |300         |
+------------------+----------+------------+
```

In [None]:
products = spark.createDataFrame([
    (101, "Gadget", "Electronics", 49.99),
    (102, "Smartphone", "Electronics", 199.99),
    (103, "Blender", "Home Appliances", 89.99),
    (104, "Microwave", "Home Appliances", 149.99),
    (105, "Notebook", "Stationery", 5.99)
], ["product_id", "product_name", "category", "price"])

sales = spark.createDataFrame([
    (101, 200),
    (102, 150),
    (103, 100),
    (104, 120),
    (105, 300)
], ["product_id", "units_sold"])

products_with_tier = products.withColumn("price_tier",
                                                      F.when(F.col("price")<50, "Low")\
                                                      .when(F.col("price").between(50, 150), "Mid")\
                                                      .otherwise("High")
                                                      )

sales.join(products_with_tier, products_with_tier.product_id == sales.product_id, how='inner')\
      .groupBy("category", "price_tier").agg(F.sum(F.col("units_sold")).alias("total_units"))\
      .show()

+---------------+----------+-----------+
|       category|price_tier|total_units|
+---------------+----------+-----------+
|    Electronics|      High|        150|
|    Electronics|       Low|        200|
|     Stationery|       Low|        300|
|Home Appliances|       Mid|        220|
+---------------+----------+-----------+



### Q7: Latest Login per User

You’re maintaining a system that logs user logins. Return only the latest login for each user, along with their device.

```python
logins = spark.createDataFrame([
    (1, "2024-04-01 10:00:00", "Chrome"),
    (1, "2024-04-01 18:00:00", "Firefox"),
    (2, "2024-04-02 09:00:00", "Safari"),
    (3, "2024-04-01 20:00:00", "Edge"),
    (3, "2024-04-03 08:30:00", "Chrome")
], ["user_id", "login_time", "device"])
```

Expected output:

```
+--------+-------------------+--------+
|user_id |login_time         |device  |
+--------+-------------------+--------+
|1       |2024-04-01 18:00:00|Firefox |
|2       |2024-04-02 09:00:00|Safari  |
|3       |2024-04-03 08:30:00|Chrome  |
+--------+-------------------+--------+
```

In [6]:
logins = spark.createDataFrame([
    (1, "2024-04-01 10:00:00", "Chrome"),
    (1, "2024-04-01 18:00:00", "Firefox"),
    (2, "2024-04-02 09:00:00", "Safari"),
    (3, "2024-04-01 20:00:00", "Edge"),
    (3, "2024-04-03 08:30:00", "Chrome")
], ["user_id", "login_time", "device"])


window_spec = Window.partitionBy("user_id").orderBy(F.col("login_time").desc())

latest_logins = logins.withColumn("row_num", F.row_number().over(window_spec))\
                      .filter(F.col("row_num")==1)\
                      .select("user_id", "login_time", "device")

latest_logins.show()

+-------+-------------------+-------+
|user_id|         login_time| device|
+-------+-------------------+-------+
|      1|2024-04-01 18:00:00|Firefox|
|      2|2024-04-02 09:00:00| Safari|
|      3|2024-04-03 08:30:00| Chrome|
+-------+-------------------+-------+



### Q8: Customer Retention Calculation

You’re analyzing whether customers placed orders in multiple months. For each customer, return the number of distinct months in which they placed orders.

```python
orders = spark.createDataFrame([
    (1, "2024-01-15"),
    (1, "2024-02-20"),
    (2, "2024-01-10"),
    (3, "2024-02-12"),
    (3, "2024-02-28"),
    (3, "2024-03-01")
], ["customer_id", "order_date"])
```

Expected output:

```
+-------------+------------------+
|customer_id  |months_active     |
+-------------+------------------+
|1            |2                 |
|2            |1                 |
|3            |2                 |
+-------------+------------------+
```


In [9]:
orders = spark.createDataFrame([
    (1, "2024-01-15"),
    (1, "2024-02-20"),
    (2, "2024-01-10"),
    (3, "2024-02-12"),
    (3, "2024-02-28"),
    (3, "2024-03-01")
], ["customer_id", "order_date"])

orders_with_date_dims = orders.withColumn("month", F.month(F.col("order_date")))\
                              .withColumn("year", F.year(F.col("order_date")))

months_active = orders_with_date_dims.groupBy("customer_id").agg(F.count_distinct("month").alias("months_active"))

months_active.show()

+-----------+-------------+
|customer_id|months_active|
+-----------+-------------+
|          1|            2|
|          3|            2|
|          2|            1|
+-----------+-------------+



### Q9: Top 2 Products by Revenue per Category

You need to extract the top 2 products in each category based on total revenue (`price × units_sold`).

```python
products = spark.createDataFrame([
    (1, "Product A", "Books", 10),
    (2, "Product B", "Books", 15),
    (3, "Product C", "Books", 20),
    (4, "Product D", "Electronics", 200),
    (5, "Product E", "Electronics", 180)
], ["product_id", "product_name", "category", "price"])

sales = spark.createDataFrame([
    (1, 100),
    (2, 150),
    (3, 50),
    (4, 10),
    (5, 15)
], ["product_id", "units_sold"])
```

Expected output:

```
+----------+------------+---------+
|category  |product_name|revenue  |
+----------+------------+---------+
|Books     |Product B   |2250     |
|Books     |Product A   |1000     |
|Electronics|Product E  |2700     |
|Electronics|Product D  |2000     |
+----------+------------+---------+
```


In [14]:
products = spark.createDataFrame([
    (1, "Product A", "Books", 10),
    (2, "Product B", "Books", 15),
    (3, "Product C", "Books", 20),
    (4, "Product D", "Electronics", 200),
    (5, "Product E", "Electronics", 180)
], ["product_id", "product_name", "category", "price"])

sales = spark.createDataFrame([
    (1, 100),
    (2, 150),
    (3, 50),
    (4, 10),
    (5, 15)
], ["product_id", "units_sold"])

# rename to avoid ambigious reference
products_for_join = products.withColumnRenamed("product_id", "id")

window_spec = Window.orderBy(F.col("revenue").desc()).partitionBy("category")

revenue = products_for_join.join(sales, sales.product_id == products_for_join.id, how = 'inner')\
                  .withColumn("revenue", F.col("units_sold") * F.col("price"))\
                  .withColumn("revenue_rank", F.row_number().over(window_spec))\
                  .filter(F.col("revenue_rank").between(1,2))\
                  .select("category", "product_name", "revenue")

revenue.show()

+-----------+------------+-------+
|   category|product_name|revenue|
+-----------+------------+-------+
|      Books|   Product B|   2250|
|      Books|   Product A|   1000|
|Electronics|   Product E|   2700|
|Electronics|   Product D|   2000|
+-----------+------------+-------+



### Q10: Booking Gap Analysis

A hotel wants to analyze gaps in room bookings. For each room, return the number of days between consecutive bookings (ordered by start date). Include bookings that have gaps only (i.e., if the next booking doesn't start the day after the previous ends).

```python
from pyspark.sql.functions import to_date

bookings = spark.createDataFrame([
    (101, "2024-04-01", "2024-04-03"),
    (101, "2024-04-05", "2024-04-07"),
    (101, "2024-04-07", "2024-04-09"),
    (102, "2024-04-01", "2024-04-02"),
    (102, "2024-04-10", "2024-04-12")
], ["room_id", "start_date", "end_date"])
```

Expected output:

```
+--------+----------------+----------------+-----------+
|room_id |prev_end_date   |next_start_date |gap_days   |
+--------+----------------+----------------+-----------+
|101     |2024-04-03      |2024-04-05      |2          |
|102     |2024-04-02      |2024-04-10      |8          |
+--------+----------------+----------------+-----------+
```

In [24]:
bookings = spark.createDataFrame([
    (101, "2024-04-01", "2024-04-03"),
    (101, "2024-04-05", "2024-04-07"),
    (101, "2024-04-07", "2024-04-09"),
    (102, "2024-04-01", "2024-04-02"),
    (102, "2024-04-10", "2024-04-12")
], ["room_id", "start_date", "end_date"])

window_spec = Window.orderBy(F.col("start_date")).partitionBy("room_id")

bookings.withColumn("next_start_date", F.lead(F.col("start_date"), 1).over(window_spec))\
        .withColumn("days_gap", F.date_diff("next_start_date", "end_date"))\
        .filter((F.col("days_gap")!=0)&((F.col("days_gap").isNotNull())))\
        .select("room_id", F.col("end_date").alias("prev_end_date"), "next_start_date", "days_gap")\
        .show()

+-------+-------------+---------------+--------+
|room_id|prev_end_date|next_start_date|days_gap|
+-------+-------------+---------------+--------+
|    101|   2024-04-03|     2024-04-05|       2|
|    102|   2024-04-02|     2024-04-10|       8|
+-------+-------------+---------------+--------+

