# PySpark Intermediate Level Questions for Practice

In [1]:
# Initialize Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
import datetime
import random

spark = SparkSession.builder \
    .appName("Data Processing Application") \
    .master("local[*]") \
    .getOrCreate()


## 1. Customer Insights - Top 5 Cities by Number of Customers

In [2]:
# Sample Data
customers_data = [
    (1, "john@example.com", "New York"),
    (2, "sarah@gmail.com", "London"),
    (3, "mike@yahoo.com", "New York"),
    (4, "lisa@example.com", "Paris"),
    (5, "dave@outlook.com", "London"),
    (6, "anna@gmail.com", "Berlin")
]
customers = spark.createDataFrame(customers_data, ["customer_id", "email", "city"])
customers.show(5)

+-----------+----------------+--------+
|customer_id|           email|    city|
+-----------+----------------+--------+
|          1|john@example.com|New York|
|          2| sarah@gmail.com|  London|
|          3|  mike@yahoo.com|New York|
|          4|lisa@example.com|   Paris|
|          5|dave@outlook.com|  London|
+-----------+----------------+--------+
only showing top 5 rows



In [3]:
top_cities = customers.groupBy("city").count().orderBy(desc("count")).limit(5)
top_cities.show(5)

+--------+-----+
|    city|count|
+--------+-----+
|  London|    2|
|New York|    2|
|  Berlin|    1|
|   Paris|    1|
+--------+-----+



## 2. Sales Analysis - Remove Rows with Null Invoice Numbers

In [4]:
# Sample Data
sales_data = [
    (1001, "A101", 25.99),
    (None, "B202", 49.99),
    (1003, None, 15.49),
    (1004, "C303", 75.00)
]
sales = spark.createDataFrame(sales_data, ["invoice_id", "product_id", "amount"])
sales.show()

+----------+----------+------+
|invoice_id|product_id|amount|
+----------+----------+------+
|      1001|      A101| 25.99|
|      NULL|      B202| 49.99|
|      1003|      NULL| 15.49|
|      1004|      C303|  75.0|
+----------+----------+------+



In [5]:
clean_sales = sales.dropna(subset=["invoice_id"])
clean_sales.show()

+----------+----------+------+
|invoice_id|product_id|amount|
+----------+----------+------+
|      1001|      A101| 25.99|
|      1003|      NULL| 15.49|
|      1004|      C303|  75.0|
+----------+----------+------+



## 3. Marketing - Extract Email Domain from Email Addresses

In [6]:
email_domains = customers.withColumn("domain", split(col("email"), "@").getItem(1))
email_domains.select("email", "domain").show()

+----------------+-----------+
|           email|     domain|
+----------------+-----------+
|john@example.com|example.com|
| sarah@gmail.com|  gmail.com|
|  mike@yahoo.com|  yahoo.com|
|lisa@example.com|example.com|
|dave@outlook.com|outlook.com|
|  anna@gmail.com|  gmail.com|
+----------------+-----------+



In [7]:
user_name  = customers.withColumn("user_name",split(col("email"),"@").getItem(0))
user_name.show()

+-----------+----------------+--------+---------+
|customer_id|           email|    city|user_name|
+-----------+----------------+--------+---------+
|          1|john@example.com|New York|     john|
|          2| sarah@gmail.com|  London|    sarah|
|          3|  mike@yahoo.com|New York|     mike|
|          4|lisa@example.com|   Paris|     lisa|
|          5|dave@outlook.com|  London|     dave|
|          6|  anna@gmail.com|  Berlin|     anna|
+-----------+----------------+--------+---------+



## 4. Retail: Products with Zero Quantity Sold

In [8]:
# Sample Data
inventory_data = [
    ("P100", "Phone", 0),
    ("P200", "Laptop", 12),
    ("P300", "Tablet", 0),
    ("P400", "Monitor", 5)
]
inventory = spark.createDataFrame(inventory_data, ["product_id", "product_name", "quantity_sold"])
inventory.show()

+----------+------------+-------------+
|product_id|product_name|quantity_sold|
+----------+------------+-------------+
|      P100|       Phone|            0|
|      P200|      Laptop|           12|
|      P300|      Tablet|            0|
|      P400|     Monitor|            5|
+----------+------------+-------------+



In [9]:
inventory.select("product_name").where(col("quantity_sold") == 0).show()

+------------+
|product_name|
+------------+
|       Phone|
|      Tablet|
+------------+



## 5. E-commerce: Customers with No Orders

In [10]:
# Sample Data
orders_data = [(101, 1, "2023-10-01"), (102, 3, "2023-10-05")]
orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "order_date"])
orders.show()

+--------+-----------+----------+
|order_id|customer_id|order_date|
+--------+-----------+----------+
|     101|          1|2023-10-01|
|     102|          3|2023-10-05|
+--------+-----------+----------+



In [11]:
customers.join(orders, "customer_id","left_anti").select("customer_id").show()

+-----------+
|customer_id|
+-----------+
|          2|
|          6|
|          5|
|          4|
+-----------+



## 6. Finance: High-Value Transactions (>$10K)

In [12]:
# Sample Data
transactions_data = [
    (5001, 1, 9500.0),
    (5002, 2, 12000.0),
    (5003, 3, 8500.0),
    (5004, 4, 15000.0)
]
transactions = spark.createDataFrame(transactions_data, ["txn_id", "customer_id", "amount"])
transactions.show()

+------+-----------+-------+
|txn_id|customer_id| amount|
+------+-----------+-------+
|  5001|          1| 9500.0|
|  5002|          2|12000.0|
|  5003|          3| 8500.0|
|  5004|          4|15000.0|
+------+-----------+-------+



In [13]:
transactions.select("txn_id","customer_id").where(col("amount")>10000).show()

+------+-----------+
|txn_id|customer_id|
+------+-----------+
|  5002|          2|
|  5004|          4|
+------+-----------+



## 7. Product Analysis: Top Products per Region

In [14]:
# Sample Data
region_sales_data = [
    ("North", "Phone", 150),
    ("North", "Laptop", 200),
    ("South", "Phone", 300),
    ("South", "Tablet", 100),
    ("East", "Laptop", 250),
    ("West", "Phone", 180)
]
region_sales = spark.createDataFrame(region_sales_data, ["region", "product", "sales"])
region_sales.show()

+------+-------+-----+
|region|product|sales|
+------+-------+-----+
| North|  Phone|  150|
| North| Laptop|  200|
| South|  Phone|  300|
| South| Tablet|  100|
|  East| Laptop|  250|
|  West|  Phone|  180|
+------+-------+-----+



In [15]:
window_spec = Window.partitionBy("region").orderBy(desc("sales"))
top_products = region_sales.withColumn("rank", row_number().over(window_spec)).filter(col("rank")==1)
top_products.show()

+------+-------+-----+----+
|region|product|sales|rank|
+------+-------+-----+----+
|  East| Laptop|  250|   1|
| North| Laptop|  200|   1|
| South|  Phone|  300|   1|
|  West|  Phone|  180|   1|
+------+-------+-----+----+



## 8. Logistics: Total Weight by Country

In [16]:
# Sample Data
shipments_data = [
    ("USA", "S1001", 120),
    ("USA", "S1002", 85),
    ("UK", "S2001", 45),
    ("DE", "S3001", 210)
]
shipments = spark.createDataFrame(shipments_data, ["country", "shipment_id", "weight_kg"])
shipments.show()

+-------+-----------+---------+
|country|shipment_id|weight_kg|
+-------+-----------+---------+
|    USA|      S1001|      120|
|    USA|      S1002|       85|
|     UK|      S2001|       45|
|     DE|      S3001|      210|
+-------+-----------+---------+



In [17]:
total_weight = shipments.groupBy("country").agg(sum("weight_kg").alias("total_weight"))
total_weight.show()

+-------+------------+
|country|total_weight|
+-------+------------+
|    USA|         205|
|     DE|         210|
|     UK|          45|
+-------+------------+



## 9. Customer Behavior: Age Group Segmentation

In [18]:
# Sample Data
users_data = [
    (1, 17), (2, 25), (3, 32), (4, 16),
    (5, 45), (6, 67), (7, 28), (8, 14)
]
users = spark.createDataFrame(users_data, ["user_id", "age"])
users.show()

+-------+---+
|user_id|age|
+-------+---+
|      1| 17|
|      2| 25|
|      3| 32|
|      4| 16|
|      5| 45|
|      6| 67|
|      7| 28|
|      8| 14|
+-------+---+



In [19]:
segment_df = users.withColumn("age_group",when(col('age')<12,"Child").when(col('age')<21,"Teen").otherwise("adult"))
segment_df.show()

+-------+---+---------+
|user_id|age|age_group|
+-------+---+---------+
|      1| 17|     Teen|
|      2| 25|    adult|
|      3| 32|    adult|
|      4| 16|     Teen|
|      5| 45|    adult|
|      6| 67|    adult|
|      7| 28|    adult|
|      8| 14|     Teen|
+-------+---+---------+



## 10. Support Analysis: Tickets Resolved in 24h

In [20]:
# Sample Data
tickets_data = [
    ("T001", "2023-10-01 09:00:00", "2023-10-01 10:30:00"),
    ("T002", "2023-10-02 14:00:00", "2023-10-03 15:00:00"),
    ("T003", "2023-10-03 11:00:00", "2023-10-03 11:45:00")
]
tickets = spark.createDataFrame(tickets_data, ["ticket_id", "created_time", "resolved_time"]) \
    .withColumn("created_time", to_timestamp(col("created_time"))) \
    .withColumn("resolved_time", to_timestamp(col("resolved_time")))
tickets.show()

+---------+-------------------+-------------------+
|ticket_id|       created_time|      resolved_time|
+---------+-------------------+-------------------+
|     T001|2023-10-01 09:00:00|2023-10-01 10:30:00|
|     T002|2023-10-02 14:00:00|2023-10-03 15:00:00|
|     T003|2023-10-03 11:00:00|2023-10-03 11:45:00|
+---------+-------------------+-------------------+



In [23]:
tickets = tickets.withColumn("resolution_time",datediff(col("resolved_time"),col("created_time"))).filter(col("resolution_time")<=24)
tickets.show()

+---------+-------------------+-------------------+---------------+
|ticket_id|       created_time|      resolved_time|resolution_time|
+---------+-------------------+-------------------+---------------+
|     T001|2023-10-01 09:00:00|2023-10-01 10:30:00|              0|
|     T003|2023-10-03 11:00:00|2023-10-03 11:45:00|              0|
+---------+-------------------+-------------------+---------------+



## 11. Store Analytics: Average Basket Size

In [24]:
# Sample Data
transactions_data = [
    ("StoreA", "T1001", 5),
    ("StoreA", "T1002", 8),
    ("StoreB", "T2001", 3),
    ("StoreB", "T2002", 12)
]
transactions = spark.createDataFrame(transactions_data, ["store_id", "txn_id", "items"])
transactions.show()


+--------+------+-----+
|store_id|txn_id|items|
+--------+------+-----+
|  StoreA| T1001|    5|
|  StoreA| T1002|    8|
|  StoreB| T2001|    3|
|  StoreB| T2002|   12|
+--------+------+-----+



In [25]:
transactions.groupBy("store_id").agg(avg("items").alias("avg_basket_size")).show()

+--------+---------------+
|store_id|avg_basket_size|
+--------+---------------+
|  StoreA|            6.5|
|  StoreB|            7.5|
+--------+---------------+



## 12. Healthcare: Patients with Missing Insurance

In [26]:
# Sample Data
patients_data = [
    ("P001", "John Doe", "ABC Insurance"),
    ("P002", "Jane Smith", None),
    ("P003", "Bob Lee", "XYZ Insurance"),
    ("P004", "Alice Kim", None)
]
patients = spark.createDataFrame(patients_data, ["patient_id", "name", "insurance"])
patients.show()


+----------+----------+-------------+
|patient_id|      name|    insurance|
+----------+----------+-------------+
|      P001|  John Doe|ABC Insurance|
|      P002|Jane Smith|         NULL|
|      P003|   Bob Lee|XYZ Insurance|
|      P004| Alice Kim|         NULL|
+----------+----------+-------------+



In [27]:
patients.filter(col("insurance").isNull()).select("patient_id","name").show()

+----------+----------+
|patient_id|      name|
+----------+----------+
|      P002|Jane Smith|
|      P004| Alice Kim|
+----------+----------+



## 13. Education: Average Score by Subject

In [28]:
# Sample Data
scores_data = [
    ("Math", "SchoolA", 85),
    ("Math", "SchoolB", 78),
    ("Science", "SchoolA", 92),
    ("Science", "SchoolB", 88)
]
scores = spark.createDataFrame(scores_data, ["subject", "school", "avg_score"])
scores.show()


+-------+-------+---------+
|subject| school|avg_score|
+-------+-------+---------+
|   Math|SchoolA|       85|
|   Math|SchoolB|       78|
|Science|SchoolA|       92|
|Science|SchoolB|       88|
+-------+-------+---------+



In [29]:
scores.groupBy('subject').agg(avg("avg_score").alias("avg_score")).show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|   Math|     81.5|
|Science|     90.0|
+-------+---------+



## 14. Employee Management: Small Departments

In [30]:
# Sample Data
employees_data = [
    ("IT", "E001"), ("IT", "E002"), ("HR", "E003"),
    ("Finance", "E004"), ("Finance", "E005"), ("Finance", "E006")
]
employees = spark.createDataFrame(employees_data, ["dept", "employee_id"])
employees.show()

+-------+-----------+
|   dept|employee_id|
+-------+-----------+
|     IT|       E001|
|     IT|       E002|
|     HR|       E003|
|Finance|       E004|
|Finance|       E005|
|Finance|       E006|
+-------+-----------+



In [31]:
employees.groupBy('dept').agg(count("employee_id").alias('total_employees')).filter(col("total_employees")<3).show()

+----+---------------+
|dept|total_employees|
+----+---------------+
|  HR|              1|
|  IT|              2|
+----+---------------+



## 15. Subscription Services: Expiring Subscriptions

In [32]:
from datetime import date

# Sample Data
subscriptions_data = [
    ("U001", date(2023, 10, 20)),
    ("U002", date(2024, 11, 15)),
    ("U003", date(2025, 10, 30)),
    ("U004", date(2025, 11, 5))
]
subscriptions = spark.createDataFrame(subscriptions_data, ["user_id", "expiry_date"])
subscriptions.show()

+-------+-----------+
|user_id|expiry_date|
+-------+-----------+
|   U001| 2023-10-20|
|   U002| 2024-11-15|
|   U003| 2025-10-30|
|   U004| 2025-11-05|
+-------+-----------+



In [33]:
subscriptions.filter(col("expiry_date")<=date.today()).show()

+-------+-----------+
|user_id|expiry_date|
+-------+-----------+
|   U001| 2023-10-20|
|   U002| 2024-11-15|
+-------+-----------+



## 16. Sales Forecasting: 7-Day Moving Average

In [34]:
# Sample Data
daily_sales_data = [
    ("2023-10-01", 120), ("2023-10-02", 150), ("2023-10-03", 180),
    ("2023-10-04", 90), ("2023-10-05", 200), ("2023-10-06", 170),
    ("2023-10-07", 210), ("2023-10-08", 190)
]
daily_sales = spark.createDataFrame(daily_sales_data, ["date", "sales"])
daily_sales.show()

+----------+-----+
|      date|sales|
+----------+-----+
|2023-10-01|  120|
|2023-10-02|  150|
|2023-10-03|  180|
|2023-10-04|   90|
|2023-10-05|  200|
|2023-10-06|  170|
|2023-10-07|  210|
|2023-10-08|  190|
+----------+-----+



In [35]:
window_spec = Window.orderBy("date").rowsBetween(-6, 0)
moving_avg = daily_sales.withColumn("7d_moving_Avg", round(avg("sales").over(window_spec), 2))
moving_avg.show()

+----------+-----+-------------+
|      date|sales|7d_moving_Avg|
+----------+-----+-------------+
|2023-10-01|  120|        120.0|
|2023-10-02|  150|        135.0|
|2023-10-03|  180|        150.0|
|2023-10-04|   90|        135.0|
|2023-10-05|  200|        148.0|
|2023-10-06|  170|       151.67|
|2023-10-07|  210|        160.0|
|2023-10-08|  190|        170.0|
+----------+-----+-------------+



## 17. Web Analytics: Top Pages per Session

In [36]:
# Sample Data
web_logs_data = [
    ("session1", "/home", 10), ("session1", "/products", 25),
    ("session1", "/cart", 30), ("session2", "/home", 5),
    ("session2", "/about", 45), ("session3", "/home", 8)
]
web_logs = spark.createDataFrame(web_logs_data, ["session_id", "page", "duration_sec"])
web_logs.show()

+----------+---------+------------+
|session_id|     page|duration_sec|
+----------+---------+------------+
|  session1|    /home|          10|
|  session1|/products|          25|
|  session1|    /cart|          30|
|  session2|    /home|           5|
|  session2|   /about|          45|
|  session3|    /home|           8|
+----------+---------+------------+



In [37]:
window_spec = Window.partitionBy("session_id").orderBy(desc("duration_sec"))
top_pages = web_logs.withColumn("rank", rank().over(window_spec)).filter(col("rank")==1)
top_pages.show()

+----------+------+------------+----+
|session_id|  page|duration_sec|rank|
+----------+------+------------+----+
|  session1| /cart|          30|   1|
|  session2|/about|          45|   1|
|  session3| /home|           8|   1|
+----------+------+------------+----+



## 18. Streaming Platform: Total Watch Time

In [38]:
# Sample Data
watch_data = [
    ("user1", 45), ("user1", 30), ("user2", 120),
    ("user3", 90), ("user3", 60), ("user3", 45)
]
watch_logs = spark.createDataFrame(watch_data, ["user_id", "duration_min"])
watch_logs.show()

+-------+------------+
|user_id|duration_min|
+-------+------------+
|  user1|          45|
|  user1|          30|
|  user2|         120|
|  user3|          90|
|  user3|          60|
|  user3|          45|
+-------+------------+



In [39]:
watch_logs.groupBy("user_id").agg(sum("duration_min").alias("total_watch_time_in_min")).show()

+-------+-----------------------+
|user_id|total_watch_time_in_min|
+-------+-----------------------+
|  user1|                     75|
|  user2|                    120|
|  user3|                    195|
+-------+-----------------------+



## 19. B2B SaaS: Churned Customers

In [40]:
# Sample Data
customers_data = [
    ("C001", date(2025, 4, 1)),
    ("C002", date(2025, 5, 15)),
    ("C003", date(2025, 6, 5)),
    ("C004", date(2025, 7, 20))
]
customers = spark.createDataFrame(customers_data, ["customer_id", "last_login"])
customers.show()

+-----------+----------+
|customer_id|last_login|
+-----------+----------+
|       C001|2025-04-01|
|       C002|2025-05-15|
|       C003|2025-06-05|
|       C004|2025-07-20|
+-----------+----------+



In [41]:
customers.filter(datediff(current_date(),col("last_login"))>=90).show()

+-----------+----------+
|customer_id|last_login|
+-----------+----------+
|       C001|2025-04-01|
+-----------+----------+



## 20. Job Portal: Location-Based Matching

In [42]:
# Sample Data
job_seekers_data = [("JS1", "NY"), ("JS2", "CA"), ("JS3", "TX")]
job_postings_data = [("JP1", "NY"), ("JP2", "CA"), ("JP3", "FL")]

job_seekers = spark.createDataFrame(job_seekers_data, ["seeker_id", "location"])
job_postings = spark.createDataFrame(job_postings_data, ["job_id", "location"])

job_seekers.show()

+---------+--------+
|seeker_id|location|
+---------+--------+
|      JS1|      NY|
|      JS2|      CA|
|      JS3|      TX|
+---------+--------+



In [43]:
job_postings.show()

+------+--------+
|job_id|location|
+------+--------+
|   JP1|      NY|
|   JP2|      CA|
|   JP3|      FL|
+------+--------+



In [44]:
job_postings.join(job_seekers,on='location',how='inner').show()

+--------+------+---------+
|location|job_id|seeker_id|
+--------+------+---------+
|      CA|   JP2|      JS2|
|      NY|   JP1|      JS1|
+--------+------+---------+



## 21. Manufacturing: Machine Failure Detection

In [45]:
# Sample Data
machine_logs_data = [
    ("M001", "2025-06-22 08:00:00", "failure"),
    ("M001", "2025-06-22 12:00:00", "failure"),
    ("M001", "2025-06-21 09:00:00", "failure"),
    ("M002", "2025-06-22 10:00:00", "failure")
]
machine_logs = spark.createDataFrame(machine_logs_data, ["machine_id", "timestamp", "status"])
machine_logs.show()


+----------+-------------------+-------+
|machine_id|          timestamp| status|
+----------+-------------------+-------+
|      M001|2025-06-22 08:00:00|failure|
|      M001|2025-06-22 12:00:00|failure|
|      M001|2025-06-21 09:00:00|failure|
|      M002|2025-06-22 10:00:00|failure|
+----------+-------------------+-------+



In [46]:
# Solution
window_spec = Window.partitionBy("machine_id").orderBy("timestamp")
failure_counts = machine_logs.withColumn("prev_failure", lag("timestamp").over(window_spec)) \
    .withColumn("time_diff",
        (col("timestamp").cast("long") - col("prev_failure").cast("long")) / 3600) \
    .filter(col("time_diff") <= 24) \
    .groupBy("machine_id") \
    .agg(count("*").alias("failure_count")) \
    .filter(col("failure_count") >= 2)
failure_counts.show()

+----------+-------------+
|machine_id|failure_count|
+----------+-------------+
+----------+-------------+



## 22. Insurance: Claim-to-Premium Ratio

In [47]:
# Sample Data
insurance_data = [
    ("Health", 1000, 5000), ("Health", 1500, 5000),
    ("Auto", 2000, 3000), ("Life", 3000, 10000)
]
insurance = spark.createDataFrame(insurance_data, ["policy_type", "claim_amount", "premium"])
insurance.show()

+-----------+------------+-------+
|policy_type|claim_amount|premium|
+-----------+------------+-------+
|     Health|        1000|   5000|
|     Health|        1500|   5000|
|       Auto|        2000|   3000|
|       Life|        3000|  10000|
+-----------+------------+-------+



In [48]:
insurance.withColumn("claim-to-premium_ration",round(col("claim_amount")/col("premium"),2)).show()

+-----------+------------+-------+-----------------------+
|policy_type|claim_amount|premium|claim-to-premium_ration|
+-----------+------------+-------+-----------------------+
|     Health|        1000|   5000|                    0.2|
|     Health|        1500|   5000|                    0.3|
|       Auto|        2000|   3000|                   0.67|
|       Life|        3000|  10000|                    0.3|
+-----------+------------+-------+-----------------------+



In [49]:
# according to policy type
insurance.groupBy("policy_type").agg(round(sum("claim_amount")/sum("premium"),2).alias("claim_to_premium_ratio")).show()

+-----------+----------------------+
|policy_type|claim_to_premium_ratio|
+-----------+----------------------+
|     Health|                  0.25|
|       Life|                   0.3|
|       Auto|                  0.67|
+-----------+----------------------+



## 23. Content Platform: Binge Watchers

In [51]:
# Sample Data
watch_data = [
    ("user1", "showA", 1, "2023-10-01 20:00:00"),
    ("user1", "showA", 2, "2023-10-01 20:30:00"),
    ("user1", "showA", 3, "2023-10-01 21:00:00"),
    ("user2", "showB", 1, "2023-10-01 19:00:00"),
    ("user2", "showB", 2, "2023-10-02 19:30:00")
]
watch_logs = spark.createDataFrame(watch_data, ["user_id", "show_id", "episode", "timestamp"])

In [52]:
watch_logs.show()

+-------+-------+-------+-------------------+
|user_id|show_id|episode|          timestamp|
+-------+-------+-------+-------------------+
|  user1|  showA|      1|2023-10-01 20:00:00|
|  user1|  showA|      2|2023-10-01 20:30:00|
|  user1|  showA|      3|2023-10-01 21:00:00|
|  user2|  showB|      1|2023-10-01 19:00:00|
|  user2|  showB|      2|2023-10-02 19:30:00|
+-------+-------+-------+-------------------+



In [53]:
window_spec = Window.partitionBy("user_id", "show_id").orderBy("timestamp")
binge = watch_logs.withColumn("prev_episode_time", lag("timestamp").over(window_spec))
binge = binge.withColumn("time_diff_hours", (col("timestamp").cast("timestamp").cast("long") - col("prev_episode_time").cast("timestamp").cast("long")) / 3600)
binge.show()

+-------+-------+-------+-------------------+-------------------+---------------+
|user_id|show_id|episode|          timestamp|  prev_episode_time|time_diff_hours|
+-------+-------+-------+-------------------+-------------------+---------------+
|  user1|  showA|      1|2023-10-01 20:00:00|               NULL|           NULL|
|  user1|  showA|      2|2023-10-01 20:30:00|2023-10-01 20:00:00|            0.5|
|  user1|  showA|      3|2023-10-01 21:00:00|2023-10-01 20:30:00|            0.5|
|  user2|  showB|      1|2023-10-01 19:00:00|               NULL|           NULL|
|  user2|  showB|      2|2023-10-02 19:30:00|2023-10-01 19:00:00|           24.5|
+-------+-------+-------+-------------------+-------------------+---------------+



In [54]:
binge.filter(col("time_diff_hours")<=1).select("user_id","show_id").distinct().show()

+-------+-------+
|user_id|show_id|
+-------+-------+
|  user1|  showA|
+-------+-------+



## 24. Healthcare: Inconsistent Records

In [55]:
# Sample Data
visits_data = [
    ("P001", "2023-10-01 10:00:00", "2023-10-01 15:00:00"),
    ("P002", "2023-10-02 09:00:00", "2023-10-01 14:00:00"),  # Inconsistent
    ("P003", "2023-10-03 11:00:00", "2023-10-03 16:00:00")
]
visits = spark.createDataFrame(visits_data, ["patient_id", "admit_time", "discharge_time"]) \
    .withColumn("admit_time", to_timestamp(col("admit_time"))) \
    .withColumn("discharge_time", to_timestamp(col("discharge_time")))

visits.show()

+----------+-------------------+-------------------+
|patient_id|         admit_time|     discharge_time|
+----------+-------------------+-------------------+
|      P001|2023-10-01 10:00:00|2023-10-01 15:00:00|
|      P002|2023-10-02 09:00:00|2023-10-01 14:00:00|
|      P003|2023-10-03 11:00:00|2023-10-03 16:00:00|
+----------+-------------------+-------------------+



In [56]:
visits.filter(col("admit_time") > col("discharge_time")).show()

+----------+-------------------+-------------------+
|patient_id|         admit_time|     discharge_time|
+----------+-------------------+-------------------+
|      P002|2023-10-02 09:00:00|2023-10-01 14:00:00|
+----------+-------------------+-------------------+



## 25. Retail: Weekly Repeat Customers

In [57]:
# Sample Data
purchases_data = [
    ("C001", "2023-10-01"), ("C001", "2023-10-06"),
    ("C002", "2023-10-02"), ("C003", "2023-10-01"),
    ("C003", "2023-10-06"), ("C003", "2023-10-08")
]
purchases = spark.createDataFrame(purchases_data, ["customer_id", "purchase_date"]) \
    .withColumn("purchase_date", to_date(col("purchase_date")))

purchases.show()

+-----------+-------------+
|customer_id|purchase_date|
+-----------+-------------+
|       C001|   2023-10-01|
|       C001|   2023-10-06|
|       C002|   2023-10-02|
|       C003|   2023-10-01|
|       C003|   2023-10-06|
|       C003|   2023-10-08|
+-----------+-------------+



In [58]:
purchases = purchases.withColumn("week",weekofyear(col("purchase_date")))
purchases.groupBy("customer_id","week").agg(count("*").alias("visits")).filter(col("visits")>1).show()

+-----------+----+------+
|customer_id|week|visits|
+-----------+----+------+
|       C003|  40|     2|
+-----------+----+------+



## 26. HR Analytics: Employee Tenure

In [59]:
# Sample Data
employees_data = [
    ("E001", date(2020, 5, 15)),
    ("E002", date(2022, 1, 10)),
    ("E003", date(2018, 11, 20))
]
employees = spark.createDataFrame(employees_data, ["employee_id", "join_date"])
employees.show()

+-----------+----------+
|employee_id| join_date|
+-----------+----------+
|       E001|2020-05-15|
|       E002|2022-01-10|
|       E003|2018-11-20|
+-----------+----------+



In [60]:
employees.withColumn("Employee_Tenure",year(current_date()) - year(col("join_date"))).show()

+-----------+----------+---------------+
|employee_id| join_date|Employee_Tenure|
+-----------+----------+---------------+
|       E001|2020-05-15|              5|
|       E002|2022-01-10|              3|
|       E003|2018-11-20|              7|
+-----------+----------+---------------+



In [61]:
# Solution
tenure = employees.withColumn("tenure_months",
    months_between(current_date(), col("join_date")))
tenure.show()

+-----------+----------+-------------+
|employee_id| join_date|tenure_months|
+-----------+----------+-------------+
|       E001|2020-05-15|  62.77419355|
|       E002|2022-01-10|  42.93548387|
|       E003|2018-11-20|  80.61290323|
+-----------+----------+-------------+



## 27. Travel Agency: Destination Revenue

In [62]:
# Sample Data
bookings_data = [("B001", "Paris", 2), ("B002", "Tokyo", 3)]
packages_data = [("Paris", 1200), ("Tokyo", 1500)]

bookings = spark.createDataFrame(bookings_data, ["booking_id", "destination", "num_travelers"])
packages = spark.createDataFrame(packages_data, ["destination", "price_per_person"])



In [63]:
bookings.show()

+----------+-----------+-------------+
|booking_id|destination|num_travelers|
+----------+-----------+-------------+
|      B001|      Paris|            2|
|      B002|      Tokyo|            3|
+----------+-----------+-------------+



In [64]:
packages.show()

+-----------+----------------+
|destination|price_per_person|
+-----------+----------------+
|      Paris|            1200|
|      Tokyo|            1500|
+-----------+----------------+



In [65]:
revenue = bookings.join(packages,on='destination',how='inner').withColumn("revenue",col("num_travelers")*col("price_per_person")).groupBy("destination").agg(sum("revenue").alias("total_revenue"))
revenue.show()

+-----------+-------------+
|destination|total_revenue|
+-----------+-------------+
|      Paris|         2400|
|      Tokyo|         4500|
+-----------+-------------+



## 28. Ride-Sharing: Driver Cancellation Rate

In [66]:
# Sample Data
trips_data = [
    ("D001", "completed"), ("D001", "canceled"), ("D001", "completed"),
    ("D002", "completed"), ("D002", "completed"), ("D003", "canceled")
]
trips = spark.createDataFrame(trips_data, ["driver_id", "status"])
trips.show()

+---------+---------+
|driver_id|   status|
+---------+---------+
|     D001|completed|
|     D001| canceled|
|     D001|completed|
|     D002|completed|
|     D002|completed|
|     D003| canceled|
+---------+---------+



In [67]:
total_trips = trips.groupBy("driver_id").agg(count("*").alias("total_trips"))

In [68]:
canceled_tirps = trips.groupBy("driver_id").agg(count(when(col("status")=='canceled', 1)).alias("canceled_trips"))

In [69]:
cancellation_rates = total_trips.join(canceled_tirps,on='driver_id',how='inner').withColumn('cancelllation_rate',round(100 * col("canceled_trips")/col("total_trips"),2))
cancellation_rates.show()

+---------+-----------+--------------+------------------+
|driver_id|total_trips|canceled_trips|cancelllation_rate|
+---------+-----------+--------------+------------------+
|     D001|          3|             1|             33.33|
|     D002|          2|             0|               0.0|
|     D003|          1|             1|             100.0|
+---------+-----------+--------------+------------------+



## 29. Warehouse: Inventory Change Tracking

In [70]:
# Sample Data
inventory_data = [
    ("2023-10-01", "P100", 50),
    ("2023-10-02", "P100", 45),
    ("2023-10-03", "P100", 40),
    ("2023-10-01", "P200", 100),
    ("2023-10-02", "P200", 120),
    ("2023-10-03", "P200", 110)
]
inventory = spark.createDataFrame(inventory_data, ["date", "product_id", "stock"])
inventory.show()

+----------+----------+-----+
|      date|product_id|stock|
+----------+----------+-----+
|2023-10-01|      P100|   50|
|2023-10-02|      P100|   45|
|2023-10-03|      P100|   40|
|2023-10-01|      P200|  100|
|2023-10-02|      P200|  120|
|2023-10-03|      P200|  110|
+----------+----------+-----+



In [71]:
window_spec = Window.partitionBy("product_id").orderBy("date")
invendtory_change = inventory.withColumn("prev_stock",lag("stock").over(window_spec)).withColumn("stock_change",col("stock")-col("prev_stock"))
invendtory_change.show()

+----------+----------+-----+----------+------------+
|      date|product_id|stock|prev_stock|stock_change|
+----------+----------+-----+----------+------------+
|2023-10-01|      P100|   50|      NULL|        NULL|
|2023-10-02|      P100|   45|        50|          -5|
|2023-10-03|      P100|   40|        45|          -5|
|2023-10-01|      P200|  100|      NULL|        NULL|
|2023-10-02|      P200|  120|       100|          20|
|2023-10-03|      P200|  110|       120|         -10|
+----------+----------+-----+----------+------------+



## 30. Education: Performance Improvement

In [72]:
# Sample Data
scores_data = [
    ("S001", "Math", 1, 75), ("S001", "Math", 2, 85),
    ("S002", "Science", 1, 80), ("S002", "Science", 2, 70),
    ("S003", "Math", 1, 60), ("S003", "Math", 2, 75)
]
scores = spark.createDataFrame(scores_data, ["student_id", "subject", "exam", "score"])
scores.show()


+----------+-------+----+-----+
|student_id|subject|exam|score|
+----------+-------+----+-----+
|      S001|   Math|   1|   75|
|      S001|   Math|   2|   85|
|      S002|Science|   1|   80|
|      S002|Science|   2|   70|
|      S003|   Math|   1|   60|
|      S003|   Math|   2|   75|
+----------+-------+----+-----+



In [73]:
window_spec = Window.partitionBy("student_id","subject").orderBy("exam")
score_change = scores.withColumn("prev_score",lag("score").over(window_spec)).withColumn("improvement",col("score")-col("prev_score"))
score_change.show()

+----------+-------+----+-----+----------+-----------+
|student_id|subject|exam|score|prev_score|improvement|
+----------+-------+----+-----+----------+-----------+
|      S001|   Math|   1|   75|      NULL|       NULL|
|      S001|   Math|   2|   85|        75|         10|
|      S002|Science|   1|   80|      NULL|       NULL|
|      S002|Science|   2|   70|        80|        -10|
|      S003|   Math|   1|   60|      NULL|       NULL|
|      S003|   Math|   2|   75|        60|         15|
+----------+-------+----+-----+----------+-----------+



In [74]:
score_change.filter(col("improvement")>0).show()

+----------+-------+----+-----+----------+-----------+
|student_id|subject|exam|score|prev_score|improvement|
+----------+-------+----+-----+----------+-----------+
|      S001|   Math|   2|   85|        75|         10|
|      S003|   Math|   2|   75|        60|         15|
+----------+-------+----+-----+----------+-----------+



## 31. Retail Banking: Monthly Spend Increase

In [75]:
# Sample Data
spend_data = [
    ("U100", "2023-01", 1000), ("U100", "2023-02", 1800),
    ("U200", "2023-01", 500), ("U200", "2023-02", 700),
    ("U300", "2023-01", 2000), ("U300", "2023-02", 2100)
]
spend = spark.createDataFrame(spend_data, ["user_id", "month", "amount"])

spend.show()

+-------+-------+------+
|user_id|  month|amount|
+-------+-------+------+
|   U100|2023-01|  1000|
|   U100|2023-02|  1800|
|   U200|2023-01|   500|
|   U200|2023-02|   700|
|   U300|2023-01|  2000|
|   U300|2023-02|  2100|
+-------+-------+------+



In [76]:
window_spec = Window.partitionBy("user_id").orderBy("month")
spend_increase = spend.withColumn("prev_month_amount",lag("amount").over(window_spec)).withColumn("change",col("amount")-col("prev_month_amount")).withColumn("pct_change",100*round(col("change")/col("prev_month_amount"),2))
spend_increase.filter(col("pct_change")>50).show()

+-------+-------+------+-----------------+------+----------+
|user_id|  month|amount|prev_month_amount|change|pct_change|
+-------+-------+------+-----------------+------+----------+
|   U100|2023-02|  1800|             1000|   800|      80.0|
+-------+-------+------+-----------------+------+----------+



## 32. IoT Sensors: Out-of-Range Detection

In [77]:
# Sample Data
sensor_data = [
    ("S001", 25.5), ("S002", 65.2), ("S003", 42.0),
    ("S004", 75.5), ("S005", 18.0), ("S006", 95.0)
]
sensors = spark.createDataFrame(sensor_data, ["sensor_id", "temperature"])
sensors.show()

+---------+-----------+
|sensor_id|temperature|
+---------+-----------+
|     S001|       25.5|
|     S002|       65.2|
|     S003|       42.0|
|     S004|       75.5|
|     S005|       18.0|
|     S006|       95.0|
+---------+-----------+



In [79]:
sensor_out_range = sensors.filter((col("temperature") > 60) | (col("temperature") < 20))
sensor_out_range.show()

+---------+-----------+
|sensor_id|temperature|
+---------+-----------+
|     S002|       65.2|
|     S004|       75.5|
|     S005|       18.0|
|     S006|       95.0|
+---------+-----------+



## 33. Sales Analysis: Duplicate Product Listings

In [80]:
# Sample Data
products_data = [
    ("Phone X", 999, "Latest model"),
    ("Phone X", 999, "Latest model"),
    ("Laptop Pro", 1500, "High performance"),
    ("Phone X", 899, "Refurbished model")
]
products = spark.createDataFrame(products_data, ["name", "price", "description"])

products.show()

+----------+-----+-----------------+
|      name|price|      description|
+----------+-----+-----------------+
|   Phone X|  999|     Latest model|
|   Phone X|  999|     Latest model|
|Laptop Pro| 1500| High performance|
|   Phone X|  899|Refurbished model|
+----------+-----+-----------------+



In [81]:
products.groupBy("name","price","description").count().show()

+----------+-----+-----------------+-----+
|      name|price|      description|count|
+----------+-----+-----------------+-----+
|   Phone X|  999|     Latest model|    2|
|Laptop Pro| 1500| High performance|    1|
|   Phone X|  899|Refurbished model|    1|
+----------+-----+-----------------+-----+



## 34. Finance: PAN Masking UDF

In [82]:
# Sample Data
users_data = [("U001", "ABCDE1234F"), ("U002", "FGHIJ5678K")]
users = spark.createDataFrame(users_data, ["user_id", "pan"])
users.show()


+-------+----------+
|user_id|       pan|
+-------+----------+
|   U001|ABCDE1234F|
|   U002|FGHIJ5678K|
+-------+----------+



In [83]:
def pan_mask(pan):
  return pan[:2]+"XXXXXX"+pan[-2:]


mask_udf = udf(pan_mask,StringType()) # save udf name along with input parameter data type
masked_users = users.withColumn("masked_pan",mask_udf(col("pan"))) # call the variable with passing value to input parameter
masked_users.show()

+-------+----------+----------+
|user_id|       pan|masked_pan|
+-------+----------+----------+
|   U001|ABCDE1234F|ABXXXXXX4F|
|   U002|FGHIJ5678K|FGXXXXXX8K|
+-------+----------+----------+



## 35. Energy Sector: Daily Consumption

In [84]:
# Sample Data
energy_data = [
    ("2023-10-01", "North", 2500),
    ("2023-10-01", "South", 3200),
    ("2023-10-02", "North", 2600),
    ("2023-10-02", "South", 3100)
]
energy = spark.createDataFrame(energy_data, ["date", "region", "kwh"])

energy.show()

+----------+------+----+
|      date|region| kwh|
+----------+------+----+
|2023-10-01| North|2500|
|2023-10-01| South|3200|
|2023-10-02| North|2600|
|2023-10-02| South|3100|
+----------+------+----+



In [85]:
energy.groupBy("date").agg(sum("kwh").alias("daily_consumption")).show()

+----------+-----------------+
|      date|daily_consumption|
+----------+-----------------+
|2023-10-01|             5700|
|2023-10-02|             5700|
+----------+-----------------+



## 36. Data Governance: Schema Drift Detection

In [None]:
# Sample schemas
schema_v1 = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType())
])

schema_v2 = StructType([
    StructField("id", IntegerType()),
    StructField("full_name", StringType()),  # Changed column
    StructField("email", StringType())  # New column
])

# Detection function
def detect_schema_drift(current_schema, new_schema):
    current_fields = {f.name: f.dataType for f in current_schema}
    new_fields = {f.name: f.dataType for f in new_schema}

    added = [col for col in new_fields if col not in current_fields]
    removed = [col for col in current_fields if col not in new_fields]
    changed = [
        col for col in current_fields
        if col in new_fields and current_fields[col] != new_fields[col]
    ]

    return {"added": added, "removed": removed, "changed": changed}

# Test detection
drift_report = detect_schema_drift(schema_v1, schema_v2)
print(f"Schema Drift Report: {drift_report}")

Schema Drift Report: {'added': ['full_name', 'email'], 'removed': ['name'], 'changed': []}


## 37. Power BI: Sales Aggregation Pipeline

In [87]:
# Raw data
raw_sales_data = [
    (1001, "2023-10-01", "P100", 2, 25.99),
    (1002, "2023-10-01", "P200", 1, 49.99),
    (1003, "2023-10-02", "P100", 3, 25.99)
]
raw_sales = spark.createDataFrame(raw_sales_data, ["order_id", "date", "product_id", "qty", "price"])

# Silver layer (cleaned)
silver_sales = raw_sales.dropna().filter(col("qty") > 0)

# Gold layer (aggregated)
gold_sales = silver_sales.groupBy("date", "product_id").agg(
    sum("qty").alias("total_qty"),
    sum(col("qty") * col("price")).alias("total_sales")
)

# Show results
print("Gold Layer:")
gold_sales.show()

Gold Layer:
+----------+----------+---------+-----------+
|      date|product_id|total_qty|total_sales|
+----------+----------+---------+-----------+
|2023-10-01|      P100|        2|      51.98|
|2023-10-02|      P100|        3|      77.97|
|2023-10-01|      P200|        1|      49.99|
+----------+----------+---------+-----------+



## 38. Security Monitoring: Failed Login Detection

In [88]:
# Sample Data
login_data = [
    ("user1", "2023-10-01 09:00:00", "success"),
    ("user1", "2023-10-01 09:00:05", "success"),
    ("user1", "2023-10-01 09:00:10", "fail"),
    ("user1", "2023-10-01 09:00:15", "fail"),
    ("user2", "2023-10-01 10:00:00", "success")
]
logins = spark.createDataFrame(login_data, ["username", "timestamp", "status"]) \
    .withColumn("timestamp", to_timestamp(col("timestamp")))

# Solution
window_spec = Window.partitionBy("username").orderBy("timestamp").rowsBetween(-2, 0)
failed_logins = logins.filter(col("status") == "fail") \
    .withColumn("fail_count", count("*").over(window_spec)) \
    .filter(col("fail_count") >= 2)
failed_logins.show()

+--------+-------------------+------+----------+
|username|          timestamp|status|fail_count|
+--------+-------------------+------+----------+
|   user1|2023-10-01 09:00:15|  fail|         2|
+--------+-------------------+------+----------+



## 39. IoT Analytics: 15-min Aggregations

In [89]:
# Sample Data
sensor_data = [
    ("S001", "2023-10-01 08:05:00", 25.5),
    ("S001", "2023-10-01 08:10:00", 26.0),
    ("S001", "2023-10-01 08:15:00", 25.8),
    ("S001", "2023-10-01 08:20:00", 26.2)
]
sensor_logs = spark.createDataFrame(sensor_data, ["sensor_id", "timestamp", "value"]) \
    .withColumn("timestamp", to_timestamp(col("timestamp")))

# Solution
agg_15min = sensor_logs.groupBy(
    "sensor_id",
    window("timestamp", "15 minutes")
).agg(avg("value").alias("avg_value"))

agg_15min.select("sensor_id", "window.start", "window.end", "avg_value").show()

+---------+-------------------+-------------------+---------+
|sensor_id|              start|                end|avg_value|
+---------+-------------------+-------------------+---------+
|     S001|2023-10-01 08:00:00|2023-10-01 08:15:00|    25.75|
|     S001|2023-10-01 08:15:00|2023-10-01 08:30:00|     26.0|
+---------+-------------------+-------------------+---------+



## 40. Call Center: Peak Call Hours

In [90]:
from pyspark.sql.functions import *

# Corrected Sample Data
calls_data = [
    ("2023-10-01 09:15:00",), ("2023-10-01 09:30:00",),
    ("2023-10-01 10:00:00",), ("2023-10-02 14:15:00",),
    ("2023-10-02 14:30:00",), ("2023-10-02 14:45:00",)
]

calls = spark.createDataFrame(calls_data, ["call_time"]) \
    .withColumn("call_time", to_timestamp(col("call_time")))

# Solution
peak_hours = calls.groupBy(
    dayofweek("call_time").alias("day_of_week"),
    hour("call_time").alias("hour")
).count().orderBy(desc("count"))

peak_hours.show()


+-----------+----+-----+
|day_of_week|hour|count|
+-----------+----+-----+
|          2|  14|    3|
|          1|   9|    2|
|          1|  10|    1|
+-----------+----+-----+



## 41. Marketing Funnel: User Journey

In [91]:
# Sample Data
events_data = [
    ("user1", "view", "2023-10-01 09:00:00"),
    ("user1", "click", "2023-10-01 09:01:00"),
    ("user1", "purchase", "2023-10-01 09:05:00"),
    ("user2", "view", "2023-10-01 10:00:00"),
    ("user3", "view", "2023-10-01 11:00:00"),
    ("user3", "click", "2023-10-01 11:02:00")
]
events = spark.createDataFrame(events_data, ["user_id", "event", "timestamp"])

# Solution - Funnel conversion
funnel_stages = ["view", "click", "purchase"]
funnel_counts = events.groupBy("event").agg(countDistinct("user_id").alias("users")) \
    .filter(col("event").isin(funnel_stages)) \
    .orderBy(expr(f"array_position(array('view','click','purchase'), event)"))

funnel_counts.show()

+--------+-----+
|   event|users|
+--------+-----+
|    view|    3|
|   click|    2|
|purchase|    1|
+--------+-----+



## 42. Sports Analytics: Performance Trends

In [92]:
# Sample Data
player_data = [
    ("PlayerA", "Season1", 85), ("PlayerA", "Season2", 88), ("PlayerA", "Season3", 92),
    ("PlayerB", "Season1", 78), ("PlayerB", "Season2", 75), ("PlayerB", "Season3", 80)
]
players = spark.createDataFrame(player_data, ["player", "season", "avg_score"])

# Solution
window_spec = Window.partitionBy("player").orderBy("season")
trend_analysis = players.withColumn("prev_score", lag("avg_score").over(window_spec)) \
    .filter(col("prev_score").isNotNull()) \
    .withColumn("improvement", col("avg_score") - col("prev_score")) \
    .groupBy("player") \
    .agg(
        avg("improvement").alias("avg_improvement"),
        min("improvement").alias("min_improvement"),
        max("improvement").alias("max_improvement")
    )
trend_analysis.show()

+-------+---------------+---------------+---------------+
| player|avg_improvement|min_improvement|max_improvement|
+-------+---------------+---------------+---------------+
|PlayerA|            3.5|              3|              4|
|PlayerB|            1.0|             -3|              5|
+-------+---------------+---------------+---------------+



## 43. Transportation: Bus Utilization

In [93]:
# Sample Data
bus_data = [
    ("Bus001", 50, 15), ("Bus001", 50, 20),
    ("Bus002", 40, 35), ("Bus003", 60, 10)
]
buses = spark.createDataFrame(bus_data, ["bus_id", "capacity", "passengers"])

# Solution
utilization = buses.withColumn("utilization", col("passengers") / col("capacity")) \
    .groupBy("bus_id") \
    .agg(avg("utilization").alias("avg_utilization")) \
    .filter(col("avg_utilization") < 0.3)
utilization.show()

+------+-------------------+
|bus_id|    avg_utilization|
+------+-------------------+
|Bus003|0.16666666666666666|
+------+-------------------+



## 44. Retail Chains: Price Variance

In [94]:
# Sample Data
pricing_data = [
    ("ProductA", "Store1", 10.99), ("ProductA", "Store2", 9.99),
    ("ProductA", "Store3", 11.49), ("ProductB", "Store1", 24.99),
    ("ProductB", "Store2", 25.49), ("ProductB", "Store3", 24.99)
]
pricing = spark.createDataFrame(pricing_data, ["product", "store", "price"])

pricing.show()

+--------+------+-----+
| product| store|price|
+--------+------+-----+
|ProductA|Store1|10.99|
|ProductA|Store2| 9.99|
|ProductA|Store3|11.49|
|ProductB|Store1|24.99|
|ProductB|Store2|25.49|
|ProductB|Store3|24.99|
+--------+------+-----+



In [95]:
# Solution
price_variance = pricing.groupBy("product").agg(
    stddev("price").alias("price_stddev"),
    (max("price") - min("price")).alias("price_range")
)
price_variance.show()

+--------+------------------+-----------+
| product|      price_stddev|price_range|
+--------+------------------+-----------+
|ProductA|0.7637626158259734|        1.5|
|ProductB|0.2886751345948129|        0.5|
+--------+------------------+-----------+



## 45. Cloud Billing: Cost Spikes

In [96]:
# Sample Data
billing_data = [
    ("2023-40", "Compute", 1000), ("2023-41", "Compute", 3000),
    ("2023-40", "Storage", 500), ("2023-41", "Storage", 550)
]
billing = spark.createDataFrame(billing_data, ["week", "service", "cost"])

billing.show()

+-------+-------+----+
|   week|service|cost|
+-------+-------+----+
|2023-40|Compute|1000|
|2023-41|Compute|3000|
|2023-40|Storage| 500|
|2023-41|Storage| 550|
+-------+-------+----+



In [97]:
# Solution
window_spec = Window.partitionBy("service").orderBy("week")
cost_spikes = billing.withColumn("prev_cost", lag("cost").over(window_spec)) \
    .filter(col("prev_cost").isNotNull()) \
    .withColumn("pct_change", (col("cost") - col("prev_cost")) / col("prev_cost")) \
    .filter(col("pct_change") > 1.0)  # >100% increase
cost_spikes.show()

+-------+-------+----+---------+----------+
|   week|service|cost|prev_cost|pct_change|
+-------+-------+----+---------+----------+
|2023-41|Compute|3000|     1000|       2.0|
+-------+-------+----+---------+----------+



## 46. Customer Experience: NPS Trends

In [98]:
# Sample Data
nps_data = [
    ("2023-10-01", "North", 8), ("2023-10-01", "South", 9),
    ("2023-10-08", "North", 7), ("2023-10-08", "South", 8),
    ("2023-10-15", "North", 9), ("2023-10-15", "South", 9)
]
nps = spark.createDataFrame(nps_data, ["date", "region", "score"])

nps.show()

+----------+------+-----+
|      date|region|score|
+----------+------+-----+
|2023-10-01| North|    8|
|2023-10-01| South|    9|
|2023-10-08| North|    7|
|2023-10-08| South|    8|
|2023-10-15| North|    9|
|2023-10-15| South|    9|
+----------+------+-----+



In [99]:
# Solution
nps_trends = nps.groupBy("region", window("date", "1 week")).agg(
    avg("score").alias("avg_nps")
).orderBy("region", "window.start")
nps_trends.select("region", "window.start", "avg_nps").show()

+------+-------------------+-------+
|region|              start|avg_nps|
+------+-------------------+-------+
| North|2023-09-28 00:00:00|    8.0|
| North|2023-10-05 00:00:00|    7.0|
| North|2023-10-12 00:00:00|    9.0|
| South|2023-09-28 00:00:00|    9.0|
| South|2023-10-05 00:00:00|    8.0|
| South|2023-10-12 00:00:00|    9.0|
+------+-------------------+-------+



In [100]:
spark.stop()