In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType, LongType)
spark = SparkSession.builder.appName("Digital Learning Platform").getOrCreate()

#DATASET 1 — USER REGISTRATION (CORRUPTED SCHEMA)

Exercises

1. Design an explicit schema using StructType
2. Normalize age into IntegerType
3. Normalize skills into ArrayType
4. Handle empty or missing names
5. Produce a clean users_df

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
#1
raw_users = [
("U001","Amit","28","Hyderabad","['AI','ML','Cloud']"),
("U002","Neha","Thirty","Delhi","AI,Testing"),
("U003","Ravi",None,"Bangalore",["Data","Spark"]),
("U004","Pooja","29","Mumbai",None),
("U005","", "31","Chennai","['DevOps']")]

user_schema = StructType([
StructField("user_id", StringType(), True),
StructField("name", StringType(), True),
StructField("age_raw", StringType(), True),
StructField("city", StringType(), True),
StructField("skills_raw", StringType(), True)])
df_raw = spark.createDataFrame(raw_users, user_schema)
df_raw.printSchema()
df_raw.show()

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age_raw: string (nullable = true)
 |-- city: string (nullable = true)
 |-- skills_raw: string (nullable = true)

+-------+-----+-------+---------+-------------------+
|user_id| name|age_raw|     city|         skills_raw|
+-------+-----+-------+---------+-------------------+
|   U001| Amit|     28|Hyderabad|['AI','ML','Cloud']|
|   U002| Neha| Thirty|    Delhi|         AI,Testing|
|   U003| Ravi|   NULL|Bangalore|      [Data, Spark]|
|   U004|Pooja|     29|   Mumbai|               NULL|
|   U005|     |     31|  Chennai|         ['DevOps']|
+-------+-----+-------+---------+-------------------+



In [4]:
#2
df_age = df_raw.withColumn("age", when(col("age_raw").rlike("^[0-9]+$"),
col("age_raw").cast(IntegerType())).otherwise(None))
df_age.show()

+-------+-----+-------+---------+-------------------+----+
|user_id| name|age_raw|     city|         skills_raw| age|
+-------+-----+-------+---------+-------------------+----+
|   U001| Amit|     28|Hyderabad|['AI','ML','Cloud']|  28|
|   U002| Neha| Thirty|    Delhi|         AI,Testing|NULL|
|   U003| Ravi|   NULL|Bangalore|      [Data, Spark]|NULL|
|   U004|Pooja|     29|   Mumbai|               NULL|  29|
|   U005|     |     31|  Chennai|         ['DevOps']|  31|
+-------+-----+-------+---------+-------------------+----+



In [5]:
#3
df_skills = df_age.withColumn("skills", when(col("skills_raw").isNull(), array())
.when(col("skills_raw").startswith("["), split(regexp_replace(col("skills_raw"), "[\[\]']", ""), ",")).otherwise(split(col("skills_raw"), ",\s")))
df_skills.show()

  .when(col("skills_raw").startswith("["), split(regexp_replace(col("skills_raw"), "[\[\]']", ""), ",")).otherwise(split(col("skills_raw"), ",\s")))
  .when(col("skills_raw").startswith("["), split(regexp_replace(col("skills_raw"), "[\[\]']", ""), ",")).otherwise(split(col("skills_raw"), ",\s")))


+-------+-----+-------+---------+-------------------+----+---------------+
|user_id| name|age_raw|     city|         skills_raw| age|         skills|
+-------+-----+-------+---------+-------------------+----+---------------+
|   U001| Amit|     28|Hyderabad|['AI','ML','Cloud']|  28|[AI, ML, Cloud]|
|   U002| Neha| Thirty|    Delhi|         AI,Testing|NULL|   [AI,Testing]|
|   U003| Ravi|   NULL|Bangalore|      [Data, Spark]|NULL| [Data,  Spark]|
|   U004|Pooja|     29|   Mumbai|               NULL|  29|             []|
|   U005|     |     31|  Chennai|         ['DevOps']|  31|       [DevOps]|
+-------+-----+-------+---------+-------------------+----+---------------+



In [6]:
#4
df_name = df_skills.withColumn("name", when(col("name").isNull() | (trim(col("name")) == ""), lit("Unknown")).otherwise(col("name")))
df_name.show()

+-------+-------+-------+---------+-------------------+----+---------------+
|user_id|   name|age_raw|     city|         skills_raw| age|         skills|
+-------+-------+-------+---------+-------------------+----+---------------+
|   U001|   Amit|     28|Hyderabad|['AI','ML','Cloud']|  28|[AI, ML, Cloud]|
|   U002|   Neha| Thirty|    Delhi|         AI,Testing|NULL|   [AI,Testing]|
|   U003|   Ravi|   NULL|Bangalore|      [Data, Spark]|NULL| [Data,  Spark]|
|   U004|  Pooja|     29|   Mumbai|               NULL|  29|             []|
|   U005|Unknown|     31|  Chennai|         ['DevOps']|  31|       [DevOps]|
+-------+-------+-------+---------+-------------------+----+---------------+



In [7]:
#5
df_clean_users = df_name.select("user_id", "name", "age", "city", "skills")
df_clean_users.show(truncate=False)
df_clean_users.printSchema()

+-------+-------+----+---------+---------------+
|user_id|name   |age |city     |skills         |
+-------+-------+----+---------+---------------+
|U001   |Amit   |28  |Hyderabad|[AI, ML, Cloud]|
|U002   |Neha   |NULL|Delhi    |[AI,Testing]   |
|U003   |Ravi   |NULL|Bangalore|[Data,  Spark] |
|U004   |Pooja  |29  |Mumbai   |[]             |
|U005   |Unknown|31  |Chennai  |[DevOps]       |
+-------+-------+----+---------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = false)



#DATASET 2 — COURSE CATALOG (NESTED STRUCT)

Exercises

1. Create nested StructType for course metadata
2. Normalize domain and level
3. Convert price to IntegerType
4. Handle missing prices
5. Produce courses_df

In [8]:
#1
raw_courses = [
("C001","PySpark Mastery",("Data Engineering","Advanced"),"₹9999"),
("C002","AI for Testers",{"domain":"QA","level":"Beginner"},"8999"),
("C003","ML Foundations",("AI","Intermediate"),None),
("C004","Data Engineering Bootcamp","Data|Advanced","₹14999")
]
course_schema = StructType([
    StructField("course_id", StringType(), True),
    StructField("course_name", StringType(), True),
    StructField("metadata_raw", StringType(), True),
    StructField("price_raw", StringType(), True)
])
df_courses_raw = spark.createDataFrame(raw_courses, course_schema)
df_courses_raw.printSchema()

root
 |-- course_id: string (nullable = true)
 |-- course_name: string (nullable = true)
 |-- metadata_raw: string (nullable = true)
 |-- price_raw: string (nullable = true)



In [9]:
#2
df_courses_normalized = df_courses_raw.withColumn("domain", \
    when(col("metadata_raw").startswith("("), regexp_extract(col("metadata_raw"), "\('([^']+)',\s*'([^']+)'\)", 1)) \
    .when(col("metadata_raw").startswith("{"), regexp_extract(col("metadata_raw"), "'domain':\s*'([^']+)'", 1)) \
    .when(col("metadata_raw").contains("|"), split(col("metadata_raw"), "\|").getItem(0)) \
    .otherwise(lit(None))) \
.withColumn("level", \
    when(col("metadata_raw").startswith("("), regexp_extract(col("metadata_raw"), "\('([^']+)',\s*'([^']+)'\)", 2)) \
    .when(col("metadata_raw").startswith("{"), regexp_extract(col("metadata_raw"), "'level':\s*'([^']+)'", 1)) \
    .when(col("metadata_raw").contains("|"), split(col("metadata_raw"), "\|").getItem(1)) \
    .otherwise(lit(None)))

df_courses_normalized.show(truncate=False)
df_courses_normalized.printSchema()

  when(col("metadata_raw").startswith("("), regexp_extract(col("metadata_raw"), "\('([^']+)',\s*'([^']+)'\)", 1)) \
  .when(col("metadata_raw").startswith("{"), regexp_extract(col("metadata_raw"), "'domain':\s*'([^']+)'", 1)) \
  .when(col("metadata_raw").contains("|"), split(col("metadata_raw"), "\|").getItem(0)) \
  when(col("metadata_raw").startswith("("), regexp_extract(col("metadata_raw"), "\('([^']+)',\s*'([^']+)'\)", 2)) \
  .when(col("metadata_raw").startswith("{"), regexp_extract(col("metadata_raw"), "'level':\s*'([^']+)'", 1)) \
  .when(col("metadata_raw").contains("|"), split(col("metadata_raw"), "\|").getItem(1)) \


+---------+-------------------------+----------------------------+---------+------+--------+
|course_id|course_name              |metadata_raw                |price_raw|domain|level   |
+---------+-------------------------+----------------------------+---------+------+--------+
|C001     |PySpark Mastery          |[Ljava.lang.Object;@7b0e82a2|₹9999    |NULL  |NULL    |
|C002     |AI for Testers           |{level=Beginner, domain=QA} |8999     |      |        |
|C003     |ML Foundations           |[Ljava.lang.Object;@73c75bf3|NULL     |NULL  |NULL    |
|C004     |Data Engineering Bootcamp|Data|Advanced               |₹14999   |Data  |Advanced|
+---------+-------------------------+----------------------------+---------+------+--------+

root
 |-- course_id: string (nullable = true)
 |-- course_name: string (nullable = true)
 |-- metadata_raw: string (nullable = true)
 |-- price_raw: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- level: string (nullable = true)



In [10]:
#3
df_price = df_courses_normalized.withColumn("price", regexp_replace(col("price_raw"), "[^0-9]", "").cast(IntegerType()))
df_price.show()

+---------+--------------------+--------------------+---------+------+--------+-----+
|course_id|         course_name|        metadata_raw|price_raw|domain|   level|price|
+---------+--------------------+--------------------+---------+------+--------+-----+
|     C001|     PySpark Mastery|[Ljava.lang.Objec...|    ₹9999|  NULL|    NULL| 9999|
|     C002|      AI for Testers|{level=Beginner, ...|     8999|      |        | 8999|
|     C003|      ML Foundations|[Ljava.lang.Objec...|     NULL|  NULL|    NULL| NULL|
|     C004|Data Engineering ...|       Data|Advanced|   ₹14999|  Data|Advanced|14999|
+---------+--------------------+--------------------+---------+------+--------+-----+



In [11]:
#4
df_price = df_price.fillna({"price":0})
df_price.show()

+---------+--------------------+--------------------+---------+------+--------+-----+
|course_id|         course_name|        metadata_raw|price_raw|domain|   level|price|
+---------+--------------------+--------------------+---------+------+--------+-----+
|     C001|     PySpark Mastery|[Ljava.lang.Objec...|    ₹9999|  NULL|    NULL| 9999|
|     C002|      AI for Testers|{level=Beginner, ...|     8999|      |        | 8999|
|     C003|      ML Foundations|[Ljava.lang.Objec...|     NULL|  NULL|    NULL|    0|
|     C004|Data Engineering ...|       Data|Advanced|   ₹14999|  Data|Advanced|14999|
+---------+--------------------+--------------------+---------+------+--------+-----+



In [12]:
#5
df_courses_clean = df_price.select("course_id", "course_name", "domain", "level", "price")
df_courses_clean.show(truncate=False)
df_courses_clean.printSchema()

+---------+-------------------------+------+--------+-----+
|course_id|course_name              |domain|level   |price|
+---------+-------------------------+------+--------+-----+
|C001     |PySpark Mastery          |NULL  |NULL    |9999 |
|C002     |AI for Testers           |      |        |8999 |
|C003     |ML Foundations           |NULL  |NULL    |0    |
|C004     |Data Engineering Bootcamp|Data  |Advanced|14999|
+---------+-------------------------+------+--------+-----+

root
 |-- course_id: string (nullable = true)
 |-- course_name: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- level: string (nullable = true)
 |-- price: integer (nullable = false)



#DATASET 3 — USER COURSE ENROLLMENTS (JOIN + BROADCAST)

Exercises
1. Normalize enrollment dates
2. Identify invalid enrollments
3. Join with users_df
4. Join with courses_df
5. Decide which table should be broadcast
6. Prove your choice using explain(True)

In [13]:
#1
raw_enrollments = [
("U001","C001","2024-01-05"),
("U002","C002","05/01/2024"),
("U003","C001","2024/01/06"),
("U004","C003","invalid_date"),
("U001","C004","2024-01-10")
]
enroll_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("course_id", StringType(), True),
    StructField("enrollment_date_raw", StringType(), True)
])
df_enrollments_raw = spark.createDataFrame(raw_enrollments, enroll_schema)
df_enrollments_raw.printSchema()
df_enrollments_raw.show()

root
 |-- user_id: string (nullable = true)
 |-- course_id: string (nullable = true)
 |-- enrollment_date_raw: string (nullable = true)

+-------+---------+-------------------+
|user_id|course_id|enrollment_date_raw|
+-------+---------+-------------------+
|   U001|     C001|         2024-01-05|
|   U002|     C002|         05/01/2024|
|   U003|     C001|         2024/01/06|
|   U004|     C003|       invalid_date|
|   U001|     C004|         2024-01-10|
+-------+---------+-------------------+



In [14]:
#2
from pyspark.sql.functions import coalesce, col, to_date, when

df_enrollments_raw = df_enrollments_raw.withColumn(
    "enrollment_date",
    coalesce(
        when(col("enrollment_date_raw").rlike("^\d{4}-\d{2}-\d{2}$"), to_date(col("enrollment_date_raw"), "yyyy-MM-dd")),
        when(col("enrollment_date_raw").rlike("^\d{2}/\d{2}/\d{4}$"), to_date(col("enrollment_date_raw"), "dd/MM/yyyy")),
        when(col("enrollment_date_raw").rlike("^\d{4}/\d{2}/\d{2}$"), to_date(col("enrollment_date_raw"), "yyyy/MM/dd"))
    )
)
df_enrollments_raw.show()

  when(col("enrollment_date_raw").rlike("^\d{4}-\d{2}-\d{2}$"), to_date(col("enrollment_date_raw"), "yyyy-MM-dd")),
  when(col("enrollment_date_raw").rlike("^\d{2}/\d{2}/\d{4}$"), to_date(col("enrollment_date_raw"), "dd/MM/yyyy")),
  when(col("enrollment_date_raw").rlike("^\d{4}/\d{2}/\d{2}$"), to_date(col("enrollment_date_raw"), "yyyy/MM/dd"))


+-------+---------+-------------------+---------------+
|user_id|course_id|enrollment_date_raw|enrollment_date|
+-------+---------+-------------------+---------------+
|   U001|     C001|         2024-01-05|     2024-01-05|
|   U002|     C002|         05/01/2024|     2024-01-05|
|   U003|     C001|         2024/01/06|     2024-01-06|
|   U004|     C003|       invalid_date|           NULL|
|   U001|     C004|         2024-01-10|     2024-01-10|
+-------+---------+-------------------+---------------+



In [15]:
#3
df_enrollments_processed = df_enrollments_raw.drop("enrollment_date_raw")
df_enriched = df_enrollments_processed.join(broadcast(df_courses_clean), on="course_id", how="left")
df_enriched.show()

+---------+-------+---------------+--------------------+------+--------+-----+
|course_id|user_id|enrollment_date|         course_name|domain|   level|price|
+---------+-------+---------------+--------------------+------+--------+-----+
|     C001|   U001|     2024-01-05|     PySpark Mastery|  NULL|    NULL| 9999|
|     C002|   U002|     2024-01-05|      AI for Testers|      |        | 8999|
|     C001|   U003|     2024-01-06|     PySpark Mastery|  NULL|    NULL| 9999|
|     C003|   U004|           NULL|      ML Foundations|  NULL|    NULL|    0|
|     C004|   U001|     2024-01-10|Data Engineering ...|  Data|Advanced|14999|
+---------+-------+---------------+--------------------+------+--------+-----+



In [16]:
#4
df_enriched.show(truncate=False)
df_enriched.printSchema()

+---------+-------+---------------+-------------------------+------+--------+-----+
|course_id|user_id|enrollment_date|course_name              |domain|level   |price|
+---------+-------+---------------+-------------------------+------+--------+-----+
|C001     |U001   |2024-01-05     |PySpark Mastery          |NULL  |NULL    |9999 |
|C002     |U002   |2024-01-05     |AI for Testers           |      |        |8999 |
|C001     |U003   |2024-01-06     |PySpark Mastery          |NULL  |NULL    |9999 |
|C003     |U004   |NULL           |ML Foundations           |NULL  |NULL    |0    |
|C004     |U001   |2024-01-10     |Data Engineering Bootcamp|Data  |Advanced|14999|
+---------+-------+---------------+-------------------------+------+--------+-----+

root
 |-- course_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- enrollment_date: date (nullable = true)
 |-- course_name: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- level: string (nullable = t

In [None]:
#5
# Decision: Broadcast df_courses_clean
# Reasoning: The `df_courses_clean` (course catalog) is expected to be significantly smaller than `df_enrollments_processed` (user enrollments).
# Broadcasting the smaller table to all worker nodes during a join optimizes performance by avoiding a shuffle of the larger DataFrame and reducing network I/O.
# This was already implemented in the previous join: `df_enrollments_processed.join(broadcast(df_courses_clean), on="course_id", how="left")`

In [17]:
#6
df_enriched.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [course_id])
:- Project [user_id#190, course_id#191, enrollment_date#203]
:  +- Project [user_id#190, course_id#191, enrollment_date_raw#192, coalesce(CASE WHEN RLIKE(enrollment_date_raw#192, ^\d{4}-\d{2}-\d{2}$) THEN to_date(enrollment_date_raw#192, Some(yyyy-MM-dd), Some(Etc/UTC), true) END, CASE WHEN RLIKE(enrollment_date_raw#192, ^\d{2}/\d{2}/\d{4}$) THEN to_date(enrollment_date_raw#192, Some(dd/MM/yyyy), Some(Etc/UTC), true) END, CASE WHEN RLIKE(enrollment_date_raw#192, ^\d{4}/\d{2}/\d{2}$) THEN to_date(enrollment_date_raw#192, Some(yyyy/MM/dd), Some(Etc/UTC), true) END) AS enrollment_date#203]
:     +- LogicalRDD [user_id#190, course_id#191, enrollment_date_raw#192], false
+- ResolvedHint (strategy=broadcast)
   +- Project [course_id#103, course_name#104, domain#107, level#108, price#151]
      +- Project [course_id#103, course_name#104, metadata_raw#105, price_raw#106, domain#107, level#108, coalesce(price#128, cast(0 as int))

#DATASET 4 — USER ACTIVITY LOGS (ARRAY + MAP)

Exercises
1. Normalize actions into ArrayType
2. Normalize metadata into MapType
3. Handle missing actions safely
4. Explode actions and count frequency
5. Produce activity_df

In [18]:
#1
raw_activity = [
("U001","login,watch,logout","{'device':'mobile','ip':'1.1.1.1'}",120),
("U002",["login","watch"],"device=laptop;ip=2.2.2.2",90),
("U003","login|logout",None,30),
("U004",None,"{'device':'tablet'}",60)
]
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
from pyspark.sql import Row
from pyspark.sql.functions import col, when, split

# 1. Defining the schema for the activity data
activity_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("actions", ArrayType(StringType()), True), # Normalize actions to ArrayType
    StructField("properties", StringType(), True),
    StructField("duration", IntegerType(), True)
])

# 2. Preprocess raw_activity to normalize the 'actions' field
processed_raw_activity = []
for user_id, actions_raw, properties, duration in raw_activity:
    normalized_actions = None
    if actions_raw is None:
        normalized_actions = None
    elif isinstance(actions_raw, list):
        normalized_actions = actions_raw
    elif isinstance(actions_raw, str):
        if ',' in actions_raw:
            normalized_actions = [a.strip() for a in actions_raw.split(',')]
        elif '|' in actions_raw:
            normalized_actions = [a.strip() for a in actions_raw.split('|')]

    processed_raw_activity.append(Row(user_id=user_id, actions=normalized_actions, properties=properties, duration=duration))

# 3. Creating the DataFrame
df_activity = spark.createDataFrame(processed_raw_activity, activity_schema)

df_activity.show(truncate=False)
df_activity.printSchema()

+-------+----------------------+----------------------------------+--------+
|user_id|actions               |properties                        |duration|
+-------+----------------------+----------------------------------+--------+
|U001   |[login, watch, logout]|{'device':'mobile','ip':'1.1.1.1'}|120     |
|U002   |[login, watch]        |device=laptop;ip=2.2.2.2          |90      |
|U003   |[login, logout]       |NULL                              |30      |
|U004   |NULL                  |{'device':'tablet'}               |60      |
+-------+----------------------+----------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: string (nullable = true)
 |-- duration: integer (nullable = true)



In [19]:
#2
from pyspark.sql.functions import col, when, from_json, regexp_replace, udf
from pyspark.sql.types import MapType, StringType

# Define a UDF to parse custom key-value strings like "device=laptop;ip=2.2.2.2"
def parse_custom_properties(s):
    if s is None:
        return None
    try:
        parts = s.split(';')
        result_map = {}
        for part in parts:
            if '=' in part:
                key, value = part.split('=', 1)
                result_map[key.strip()] = value.strip()
        return result_map
    except Exception:
        return None # Return None for malformed strings

# Register the UDF
parse_custom_properties_udf = udf(parse_custom_properties, MapType(StringType(), StringType()))

df_activity_normalized = df_activity.withColumn(
    "properties",
    when(col("properties").isNull(), None)
    .when(col("properties").startswith("{"), # Check for JSON-like strings (start with '{')
          from_json(regexp_replace(col("properties"), "'", "\""), MapType(StringType(), StringType())))
    .otherwise(parse_custom_properties_udf(col("properties"))) # Handle custom format for others
)

df_activity_normalized.show(truncate=False)
df_activity_normalized.printSchema()

+-------+----------------------+---------------------------------+--------+
|user_id|actions               |properties                       |duration|
+-------+----------------------+---------------------------------+--------+
|U001   |[login, watch, logout]|{device -> mobile, ip -> 1.1.1.1}|120     |
|U002   |[login, watch]        |{device -> laptop, ip -> 2.2.2.2}|90      |
|U003   |[login, logout]       |NULL                             |30      |
|U004   |NULL                  |{device -> tablet}               |60      |
+-------+----------------------+---------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- duration: integer (nullable = true)



In [20]:
#3
from pyspark.sql.functions import col, when, array

# Handle missing actions safely by replacing NULL with empty array
df_activity_normalized = df_activity_normalized.withColumn(
    "actions",
    when(col("actions").isNull(), array()).otherwise(col("actions"))
)

df_activity_normalized.show(truncate=False)
df_activity_normalized.printSchema()


+-------+----------------------+---------------------------------+--------+
|user_id|actions               |properties                       |duration|
+-------+----------------------+---------------------------------+--------+
|U001   |[login, watch, logout]|{device -> mobile, ip -> 1.1.1.1}|120     |
|U002   |[login, watch]        |{device -> laptop, ip -> 2.2.2.2}|90      |
|U003   |[login, logout]       |NULL                             |30      |
|U004   |[]                    |{device -> tablet}               |60      |
+-------+----------------------+---------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- duration: integer (nullable = true)



In [21]:
#4
from pyspark.sql.functions import explode, col, count

# Explode the 'actions' array to create a new row for each action
df_exploded_actions = df_activity_normalized.select(col("user_id"), explode(col("actions")).alias("action"))

# Count the frequency of each action
action_frequency = df_exploded_actions.groupBy("action").agg(count("action").alias("frequency"))

# Show the results, ordered by frequency
action_frequency.orderBy(col("frequency").desc()).show()

+------+---------+
|action|frequency|
+------+---------+
| login|        3|
| watch|        2|
|logout|        2|
+------+---------+



In [22]:
#5
activity_df = df_activity_normalized

activity_df.show(truncate=False)
activity_df.printSchema()

+-------+----------------------+---------------------------------+--------+
|user_id|actions               |properties                       |duration|
+-------+----------------------+---------------------------------+--------+
|U001   |[login, watch, logout]|{device -> mobile, ip -> 1.1.1.1}|120     |
|U002   |[login, watch]        |{device -> laptop, ip -> 2.2.2.2}|90      |
|U003   |[login, logout]       |NULL                             |30      |
|U004   |[]                    |{device -> tablet}               |60      |
+-------+----------------------+---------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- duration: integer (nullable = true)



#DATASET 5 — PAYMENTS (WINDOW + AGGREGATES)

Exercises
1. Convert dates properly
2. Compute total spend per user (GroupBy)
3. Compute running spend per user (Window)
4. Rank users by total spend
5. Compare GroupBy vs Window outputs

In [23]:
#1
raw_payments = [
("U001","2024-01-05",9999),
("U001","2024-01-10",14999),
("U002","2024-01-06",8999),
("U003","2024-01-07",0),
("U004","2024-01-08",7999),
("U001","2024-01-15",1999)
]
from pyspark.sql.functions import to_date, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema for raw_payments
payment_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("payment_date_raw", StringType(), True),
    StructField("amount", IntegerType(), True)
])

# Create DataFrame from raw_payments
df_payments = spark.createDataFrame(raw_payments, payment_schema)

# Convert dates properly and drop the raw column
df_payments = df_payments.withColumn("payment_date", to_date(col("payment_date_raw"), "yyyy-MM-dd")) \
                         .drop("payment_date_raw")

df_payments.printSchema()
df_payments.show()

root
 |-- user_id: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- payment_date: date (nullable = true)

+-------+------+------------+
|user_id|amount|payment_date|
+-------+------+------------+
|   U001|  9999|  2024-01-05|
|   U001| 14999|  2024-01-10|
|   U002|  8999|  2024-01-06|
|   U003|     0|  2024-01-07|
|   U004|  7999|  2024-01-08|
|   U001|  1999|  2024-01-15|
+-------+------+------------+



In [24]:
#2
total_spend_per_user = df_payments.groupBy("user_id").sum("amount")
total_spend_per_user.show()

+-------+-----------+
|user_id|sum(amount)|
+-------+-----------+
|   U002|       8999|
|   U001|      26997|
|   U004|       7999|
|   U003|          0|
+-------+-----------+



In [25]:
#3
from pyspark.sql.window import Window

window_spec = Window.partitionBy("user_id").orderBy("payment_date")
running_spend_per_user = df_payments.withColumn("running_spend", sum("amount").over(window_spec))
running_spend_per_user.show()

+-------+------+------------+-------------+
|user_id|amount|payment_date|running_spend|
+-------+------+------------+-------------+
|   U001|  9999|  2024-01-05|         9999|
|   U001| 14999|  2024-01-10|        24998|
|   U001|  1999|  2024-01-15|        26997|
|   U002|  8999|  2024-01-06|         8999|
|   U003|     0|  2024-01-07|            0|
|   U004|  7999|  2024-01-08|         7999|
+-------+------+------------+-------------+



In [26]:
#4
from pyspark.sql.functions import rank, desc
from pyspark.sql.window import Window

window_spec_rank = Window.orderBy(desc("sum(amount)"))

ranked_users_by_total_spend = total_spend_per_user.withColumn("rank", rank().over(window_spec_rank))

ranked_users_by_total_spend.show()

+-------+-----------+----+
|user_id|sum(amount)|rank|
+-------+-----------+----+
|   U001|      26997|   1|
|   U002|       8999|   2|
|   U004|       7999|   3|
|   U003|          0|   4|
+-------+-----------+----+



In [27]:
#5
print("Total Spend Per User (GroupBy):")
total_spend_per_user.show()

print("\nRunning Spend Per User (Window Function):")
running_spend_per_user.show()

Total Spend Per User (GroupBy):
+-------+-----------+
|user_id|sum(amount)|
+-------+-----------+
|   U002|       8999|
|   U001|      26997|
|   U004|       7999|
|   U003|          0|
+-------+-----------+


Running Spend Per User (Window Function):
+-------+------+------------+-------------+
|user_id|amount|payment_date|running_spend|
+-------+------+------------+-------------+
|   U001|  9999|  2024-01-05|         9999|
|   U001| 14999|  2024-01-10|        24998|
|   U001|  1999|  2024-01-15|        26997|
|   U002|  8999|  2024-01-06|         8999|
|   U003|     0|  2024-01-07|            0|
|   U004|  7999|  2024-01-08|         7999|
+-------+------+------------+-------------+



#DATASET 6 — PARTITIONS & PERFORMANCE

Exercises
1. Check default partitions for all DataFrames
2. Repartition enrollments by course_id
3. Coalesce results before writing
4. Write outputs and inspect file counts
5. Explain why repartition caused shuffle

In [28]:
#1
dataframes_to_check = {
    "users_df": df_clean_users,
    "courses_df": df_courses_clean,
    "activity_df": activity_df,
    "df_enrollments_processed": df_enrollments_processed,
    "df_enriched": df_enriched,
    "df_payments": df_payments,
    "total_spend_per_user": total_spend_per_user,
    "running_spend_per_user": running_spend_per_user,
    "ranked_users_by_total_spend": ranked_users_by_total_spend
}

print("--- Number of Partitions for DataFrames ---")
for df_name, df_obj in dataframes_to_check.items():
    print(f"DataFrame: {df_name}, Partitions: {df_obj.rdd.getNumPartitions()}")

--- Number of Partitions for DataFrames ---
DataFrame: users_df, Partitions: 2
DataFrame: courses_df, Partitions: 2
DataFrame: activity_df, Partitions: 2
DataFrame: df_enrollments_processed, Partitions: 2
DataFrame: df_enriched, Partitions: 2
DataFrame: df_payments, Partitions: 2
DataFrame: total_spend_per_user, Partitions: 1
DataFrame: running_spend_per_user, Partitions: 1
DataFrame: ranked_users_by_total_spend, Partitions: 1


In [29]:
#2
repartitioned_enrollments = df_enrollments_processed.repartition("course_id")
print(f"Original df_enrollments_processed partitions: {df_enrollments_processed.rdd.getNumPartitions()}")
print(f"Repartitioned enrollments partitions: {repartitioned_enrollments.rdd.getNumPartitions()}")
repartitioned_enrollments.show()

Original df_enrollments_processed partitions: 2
Repartitioned enrollments partitions: 1
+-------+---------+---------------+
|user_id|course_id|enrollment_date|
+-------+---------+---------------+
|   U001|     C001|     2024-01-05|
|   U002|     C002|     2024-01-05|
|   U004|     C003|           NULL|
|   U001|     C004|     2024-01-10|
|   U003|     C001|     2024-01-06|
+-------+---------+---------------+



In [30]:
#3
coalesced_enrollments = repartitioned_enrollments.coalesce(1)
print(f"Repartitioned enrollments partitions: {repartitioned_enrollments.rdd.getNumPartitions()}")
print(f"Coalesced enrollments partitions: {coalesced_enrollments.rdd.getNumPartitions()}")
coalesced_enrollments.show()

Repartitioned enrollments partitions: 1
Coalesced enrollments partitions: 1
+-------+---------+---------------+
|user_id|course_id|enrollment_date|
+-------+---------+---------------+
|   U001|     C001|     2024-01-05|
|   U002|     C002|     2024-01-05|
|   U004|     C003|           NULL|
|   U001|     C004|     2024-01-10|
|   U003|     C001|     2024-01-06|
+-------+---------+---------------+



In [31]:
#4
import os
import shutil

output_path = "/tmp/coalesced_enrollments_output"

# Clean up previous runs if any
if os.path.exists(output_path):
    shutil.rmtree(output_path)

# Write the coalesced DataFrame to a single Parquet file
# Using mode("overwrite") to handle re-runs smoothly
coalesced_enrollments.write.mode("overwrite").parquet(output_path)

# Inspect file counts
print(f"Contents of {output_path}:")
files = os.listdir(output_path)
for f in files:
    print(f)

# Count only the data files (e.g., .parquet files), excluding _SUCCESS and other metadata
data_files = [f for f in files if f.endswith(".parquet")]
print(f"Number of data files written: {len(data_files)}")

Contents of /tmp/coalesced_enrollments_output:
_SUCCESS
._SUCCESS.crc
.part-00000-8c546d88-ba30-4fa2-8997-0d2a908eae37-c000.snappy.parquet.crc
part-00000-8c546d88-ba30-4fa2-8997-0d2a908eae37-c000.snappy.parquet
Number of data files written: 1


In [None]:
# Repartitioning, especially by a key (like 'course_id' in our case), inherently involves a 'shuffle' operation in Spark.
# A shuffle is a costly operation where data needs to be redistributed across the network among different executors or even within the same executor.
# When you repartition by 'course_id', Spark needs to ensure that all rows with the same 'course_id' are moved to the same new partition.
# To achieve this, it must read all the data, hash the 'course_id' for each row, and then send the row to the appropriate target partition.
# This involves:
# 1. Serialization: Converting data to a format that can be sent over the network.
# 2. Network I/O: Transferring data between different nodes.
# 3. Deserialization: Converting data back into an in-memory format on the receiving side.
# 4. Disk I/O: Often, data is spilled to disk during shuffling if it doesn't fit in memory.
# This is in contrast to transformations like `filter` or `select`, which are 'narrow transformations' and can often be performed on data within existing partitions without moving it.

#DATASET 7 — DAG & OPTIMIZATION

Exercises

1. For each major transformation, run explain(True)
2. Identify:
*  Shuffles
* Sorts
*  Broadcast joins
3. Identify one bad DAG
4. Rewrite pipeline to improve it
5. Justify improvements using physical plan

In [32]:
#1
dataframes_to_explain = {
    "df_clean_users": df_clean_users,
    "df_courses_clean": df_courses_clean,
    "activity_df": activity_df,
    "df_enrollments_processed": df_enrollments_processed,
    "df_enriched": df_enriched,
    "df_payments": df_payments,
    "total_spend_per_user": total_spend_per_user,
    "running_spend_per_user": running_spend_per_user,
    "ranked_users_by_total_spend": ranked_users_by_total_spend
}

for df_name, df_obj in dataframes_to_explain.items():
    print(f"\n--- Explain for DataFrame: {df_name} ---")
    df_obj.explain(True)


--- Explain for DataFrame: df_clean_users ---
== Parsed Logical Plan ==
'Project ['user_id, 'name, 'age, 'city, 'skills]
+- Project [user_id#0, CASE WHEN (isnull(name#1) OR (trim(name#1, None) = )) THEN Unknown ELSE name#1 END AS name#64, age_raw#2, city#3, skills_raw#4, age#21, skills#41]
   +- Project [user_id#0, name#1, age_raw#2, city#3, skills_raw#4, age#21, CASE WHEN isnull(skills_raw#4) THEN cast(array() as array<string>) WHEN StartsWith(skills_raw#4, [) THEN split(regexp_replace(skills_raw#4, [\[\]'], , 1), ,, -1) ELSE split(skills_raw#4, ,\s, -1) END AS skills#41]
      +- Project [user_id#0, name#1, age_raw#2, city#3, skills_raw#4, CASE WHEN RLIKE(age_raw#2, ^[0-9]+$) THEN cast(age_raw#2 as int) ELSE cast(null as int) END AS age#21]
         +- LogicalRDD [user_id#0, name#1, age_raw#2, city#3, skills_raw#4], false

== Analyzed Logical Plan ==
user_id: string, name: string, age: int, city: string, skills: array<string>
Project [user_id#0, name#64, age#21, city#3, skills#41]
+

df_clean_users, df_courses_clean, df_enrollments_processed, df_payments: These DataFrames generally show straightforward Project and Scan ExistingRDD operations. There are no explicit shuffles, sorts, or broadcast joins at these initial transformation stages.

activity_df: This DataFrame uses a Python UDF. While UDFs introduce their own overhead, the core Spark physical plan for this DataFrame doesn't show shuffles, sorts, or broadcast joins.

df_enriched: This is where we clearly see a Broadcast Join! The physical plan contains BroadcastHashJoin (indicating the join strategy) and BroadcastExchange (confirming that df_courses_clean, the smaller table, was sent to all worker nodes).

total_spend_per_user: This DataFrame involves a groupBy operation, which necessitates a Shuffle. You'll see Exchange hashpartitioning in its physical plan.

running_spend_per_user: This DataFrame uses a window function with partitionBy and orderBy. This leads to both a Shuffle (Exchange hashpartitioning to group by user_id) and a Sort (Sort to order by payment_date within each user's partition).

ranked_users_by_total_spend: This DataFrame is notable for multiple performance-impacting operations. It involves:

Shuffles: An Exchange hashpartitioning for the initial aggregation (calculating total spend) and then critically, an Exchange SinglePartition to gather all aggregated data onto a single executor before global ranking. Sorts: A Sort operation is performed on this single partition to establish the global order required for the rank window function.

In [None]:
#3
# Bad DAG identified in `ranked_users_by_total_spend`:
# The physical plan for `ranked_users_by_total_spend` includes an `Exchange SinglePartition` followed by a global `Sort`.
# This means that after computing the total spend per user (which already involves a shuffle for aggregation),
# Spark then gathers ALL the aggregated data into a single partition (`Exchange SinglePartition`) to perform a global sort (`Sort`).
# This design choice, while correct for achieving a global rank, is highly inefficient and becomes a major bottleneck
# for large datasets as it eliminates parallelism and forces all data processing onto a single executor.

In [37]:
#4
good_df = df_enrollments_processed \
.filter(col("enrollment_date").isNotNull())\
.join(broadcast(df_clean_users), "user_id")\
.groupBy("course_id")\
.count()
good_df.show()

+---------+-----+
|course_id|count|
+---------+-----+
|     C001|    2|
|     C002|    1|
|     C004|    1|
+---------+-----+



In [38]:
#5
good_df.explain(True)

== Parsed Logical Plan ==
'Aggregate ['course_id], ['course_id, 'count(1) AS count#489]
+- Project [user_id#190, course_id#191, enrollment_date#203, name#64, age#21, city#3, skills#41]
   +- Join Inner, (user_id#190 = user_id#0)
      :- Filter isnotnull(enrollment_date#203)
      :  +- Project [user_id#190, course_id#191, enrollment_date#203]
      :     +- Project [user_id#190, course_id#191, enrollment_date_raw#192, coalesce(CASE WHEN RLIKE(enrollment_date_raw#192, ^\d{4}-\d{2}-\d{2}$) THEN to_date(enrollment_date_raw#192, Some(yyyy-MM-dd), Some(Etc/UTC), true) END, CASE WHEN RLIKE(enrollment_date_raw#192, ^\d{2}/\d{2}/\d{4}$) THEN to_date(enrollment_date_raw#192, Some(dd/MM/yyyy), Some(Etc/UTC), true) END, CASE WHEN RLIKE(enrollment_date_raw#192, ^\d{4}/\d{2}/\d{2}$) THEN to_date(enrollment_date_raw#192, Some(yyyy/MM/dd), Some(Etc/UTC), true) END) AS enrollment_date#203]
      :        +- LogicalRDD [user_id#190, course_id#191, enrollment_date_raw#192], false
      +- ResolvedHint 