# DATASET 1 — USER REGISTRATION (CORRUPTED SCHEMA)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Reuse the active Spark session if there is one; otherwise start one for this notebook.
spark = (
    SparkSession.builder
    .appName('Struct Type')
    .getOrCreate()
)

In [1]:
# Deliberately messy registration records: (user_id, name, age, city, interests).
raw_users = [
    ("U001", "Amit", "28", "Hyderabad", "['AI','ML','Cloud']"),  # list serialised as a string
    ("U002", "Neha", "Thirty", "Delhi", "AI,Testing"),           # non-numeric age, bare CSV interests
    ("U003", "Ravi", None, "Bangalore", ["Data", "Spark"]),      # missing age, genuine Python list
    ("U004", "Pooja", "29", "Mumbai", None),                     # missing interests
    ("U005", "", "31", "Chennai", "['DevOps']"),                 # empty name
]

# 1. Design an explicit schema using StructType

In [3]:
from pyspark.sql.types import(
    StructType,
    StructField,
    StringType,
    IntegerType,
    LongType
)

In [4]:
# Every raw column is ingested as a nullable string; real typing happens in later steps.
users_schema = (
    StructType()
    .add("user_id", StringType(), True)
    .add("name", StringType(), True)
    .add("age", StringType(), True)
    .add("city", StringType(), True)
    .add("interests", StringType(), True)
)

# 2. Normalize age into IntegerType

In [5]:
from pyspark.sql.functions import col,when,regexp_replace
from pyspark.sql.types import IntegerType

# Load the raw tuples under the all-string schema and preview them.
df_users = spark.createDataFrame(data=raw_users, schema=users_schema)
df_users.show()

+-------+-----+------+---------+-------------------+
|user_id| name|   age|     city|          interests|
+-------+-----+------+---------+-------------------+
|   U001| Amit|    28|Hyderabad|['AI','ML','Cloud']|
|   U002| Neha|Thirty|    Delhi|         AI,Testing|
|   U003| Ravi|  NULL|Bangalore|      [Data, Spark]|
|   U004|Pooja|    29|   Mumbai|               NULL|
|   U005|     |    31|  Chennai|         ['DevOps']|
+-------+-----+------+---------+-------------------+



In [8]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType

# Only values made up entirely of digits are cast to integer;
# anything else (e.g. "Thirty" or NULL) ends up as NULL.
age_is_numeric = col("age").rlike("^[0-9]+$")
age_as_int = when(age_is_numeric, col("age").cast(IntegerType())).otherwise(None)

df_users_age_normalized = df_users.withColumn("age", age_as_int)

df_users_age_normalized.show()
df_users_age_normalized.printSchema()

+-------+-----+----+---------+-------------------+
|user_id| name| age|     city|          interests|
+-------+-----+----+---------+-------------------+
|   U001| Amit|  28|Hyderabad|['AI','ML','Cloud']|
|   U002| Neha|NULL|    Delhi|         AI,Testing|
|   U003| Ravi|NULL|Bangalore|      [Data, Spark]|
|   U004|Pooja|  29|   Mumbai|               NULL|
|   U005|     |  31|  Chennai|         ['DevOps']|
+-------+-----+----+---------+-------------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- interests: string (nullable = true)



# 3. Normalize skills into ArrayType

In [9]:
from pyspark.sql.functions import col, split, regexp_replace, when, trim, expr
from pyspark.sql.types import ArrayType, StringType

# Turn the free-form `interests` strings into array<string>:
#   1. strip one leading "[" / one trailing "]" and every single quote,
#   2. split the remainder on commas,
#   3. trim each element and drop the empty ones.
# A NULL `interests` value stays NULL through every step.
df_users_skills_normalized = df_users_age_normalized.withColumn(
    "cleaned_interests_str",
    when(
        col("interests").isNotNull(),
        regexp_replace(
            regexp_replace(
                col("interests"),
                # Raw string: "\[" and "\]" are regex escapes, not Python escapes.
                # A plain (non-raw) literal here is an invalid escape sequence in Python.
                r"^\[|\]$", ""
            ),
            "'", ""
        )
    ).otherwise(None)
).withColumn(
    "interests",
    when(
        col("cleaned_interests_str").isNotNull(),
        split(col("cleaned_interests_str"), ",")
    ).otherwise(None)
).withColumn(
    "interests",
    when(
        col("interests").isNotNull(),
        # Higher-order SQL functions: trim every element, then remove blanks.
        expr("filter(transform(interests, x -> trim(x)), x -> x != '')")
    ).otherwise(None)
).drop("cleaned_interests_str")  # the helper column is only needed inside this chain

df_users_skills_normalized.show()
df_users_skills_normalized.printSchema()

  "^\[|\]$", ""


+-------+-----+----+---------+---------------+
|user_id| name| age|     city|      interests|
+-------+-----+----+---------+---------------+
|   U001| Amit|  28|Hyderabad|[AI, ML, Cloud]|
|   U002| Neha|NULL|    Delhi|  [AI, Testing]|
|   U003| Ravi|NULL|Bangalore|  [Data, Spark]|
|   U004|Pooja|  29|   Mumbai|           NULL|
|   U005|     |  31|  Chennai|       [DevOps]|
+-------+-----+----+---------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- interests: array (nullable = true)
 |    |-- element: string (containsNull = false)



# 4. Handle empty or missing names

In [10]:
from pyspark.sql.functions import col, trim, when

# Replace unusable names with the placeholder "Unknown".
# A name counts as unusable when it is NULL, empty, or consists only of whitespace
# (hence the trim before the comparison). Valid names are kept exactly as stored.
df_users_names_handled = df_users_skills_normalized.withColumn(
    "name",
    when(col("name").isNull() | (trim(col("name")) == ""), "Unknown")
    .otherwise(col("name"))
)

df_users_names_handled.show()
df_users_names_handled.printSchema()

+-------+-------+----+---------+---------------+
|user_id|   name| age|     city|      interests|
+-------+-------+----+---------+---------------+
|   U001|   Amit|  28|Hyderabad|[AI, ML, Cloud]|
|   U002|   Neha|NULL|    Delhi|  [AI, Testing]|
|   U003|   Ravi|NULL|Bangalore|  [Data, Spark]|
|   U004|  Pooja|  29|   Mumbai|           NULL|
|   U005|Unknown|  31|  Chennai|       [DevOps]|
+-------+-------+----+---------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- interests: array (nullable = true)
 |    |-- element: string (containsNull = false)



# 5. Produce a clean users_df

In [11]:
# Publish the fully cleaned registration table under its final name.
users_df = df_users_names_handled

users_df.show(n=20, truncate=True)
users_df.printSchema()

+-------+-------+----+---------+---------------+
|user_id|   name| age|     city|      interests|
+-------+-------+----+---------+---------------+
|   U001|   Amit|  28|Hyderabad|[AI, ML, Cloud]|
|   U002|   Neha|NULL|    Delhi|  [AI, Testing]|
|   U003|   Ravi|NULL|Bangalore|  [Data, Spark]|
|   U004|  Pooja|  29|   Mumbai|           NULL|
|   U005|Unknown|  31|  Chennai|       [DevOps]|
+-------+-------+----+---------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- interests: array (nullable = true)
 |    |-- element: string (containsNull = false)



# DATASET 2 — COURSE CATALOG (NESTED STRUCT)

In [12]:
# Course catalog rows: (course_id, title, metadata, price).
# The metadata field arrives in three different shapes on purpose.
raw_courses = [
    ("C001", "PySpark Mastery", ("Data Engineering", "Advanced"), "₹9999"),     # tuple metadata
    ("C002", "AI for Testers", {"domain": "QA", "level": "Beginner"}, "8999"),  # dict metadata
    ("C003", "ML Foundations", ("AI", "Intermediate"), None),                   # missing price
    ("C004", "Data Engineering Bootcamp", "Data|Advanced", "₹14999"),           # pipe-separated metadata
]

# 1. Create nested StructType for course metadata

In [13]:
from pyspark.sql.types import StructType, StructField, StringType

# Nested struct that holds the two metadata attributes of a course.
course_metadata_schema = (
    StructType()
    .add("domain", StringType(), True)
    .add("level", StringType(), True)
)

# Top-level course record; price stays a string until it is cleaned later on.
course_schema = (
    StructType()
    .add("course_id", StringType(), True)
    .add("title", StringType(), True)
    .add("metadata", course_metadata_schema, True)
    .add("price", StringType(), True)
)

# 2. Normalize domain and level

In [14]:
from pyspark.sql.functions import col, when, split, struct, lit
from pyspark.sql.types import StringType, StructType, StructField, Row

def _split_course_metadata(metadata_raw):
    """Return ``(domain, level)`` extracted from one raw metadata value.

    Understood shapes are a 2-tuple, a dict with "domain"/"level" keys, and a
    "domain|level" string. Anything else yields ``(None, None)``.
    """
    if isinstance(metadata_raw, tuple) and len(metadata_raw) == 2:
        # Tuple format: ("Data Engineering","Advanced")
        return metadata_raw[0], metadata_raw[1]
    if isinstance(metadata_raw, dict):
        # Dictionary format: {"domain":"QA","level":"Beginner"}
        return metadata_raw.get("domain"), metadata_raw.get("level")
    if isinstance(metadata_raw, str):
        # Pipe-separated string format: "Data|Advanced"
        pieces = metadata_raw.split('|')
        if len(pieces) == 2:
            return pieces[0], pieces[1]
    return None, None


# Rebuild every course as a Row whose metadata is itself a Row (or None),
# so that it lines up with the nested struct in course_schema.
processed_raw_courses = []
for course_id, title, metadata_raw, price in raw_courses:
    domain, level = _split_course_metadata(metadata_raw)

    # Keep the struct only when at least one of its fields was recovered.
    has_metadata = domain is not None or level is not None
    metadata_struct_row = Row(domain=domain, level=level) if has_metadata else None

    processed_raw_courses.append(
        Row(course_id=course_id, title=title, metadata=metadata_struct_row, price=price)
    )

# Build the DataFrame from the pre-shaped rows under the explicit nested schema.
df_courses_normalized = spark.createDataFrame(
    data=processed_raw_courses,
    schema=course_schema,
)

df_courses_normalized.show(truncate=False)
df_courses_normalized.printSchema()

+---------+-------------------------+----------------------------+------+
|course_id|title                    |metadata                    |price |
+---------+-------------------------+----------------------------+------+
|C001     |PySpark Mastery          |{Data Engineering, Advanced}|₹9999 |
|C002     |AI for Testers           |{QA, Beginner}              |8999  |
|C003     |ML Foundations           |{AI, Intermediate}          |NULL  |
|C004     |Data Engineering Bootcamp|{Data, Advanced}            |₹14999|
+---------+-------------------------+----------------------------+------+

root
 |-- course_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- level: string (nullable = true)
 |-- price: string (nullable = true)



# 3. Convert price to IntegerType

In [15]:
from pyspark.sql.functions import col, regexp_replace, when
from pyspark.sql.types import IntegerType

# Drop the rupee sign and cast what is left to integer; a NULL price stays NULL.
price_without_symbol = regexp_replace(col("price"), "₹", "")
price_as_int = when(
    col("price").isNotNull(),
    price_without_symbol.cast(IntegerType())
).otherwise(None)

df_courses_final = df_courses_normalized.withColumn("price", price_as_int)

df_courses_final.show()
df_courses_final.printSchema()

+---------+--------------------+--------------------+-----+
|course_id|               title|            metadata|price|
+---------+--------------------+--------------------+-----+
|     C001|     PySpark Mastery|{Data Engineering...| 9999|
|     C002|      AI for Testers|      {QA, Beginner}| 8999|
|     C003|      ML Foundations|  {AI, Intermediate}| NULL|
|     C004|Data Engineering ...|    {Data, Advanced}|14999|
+---------+--------------------+--------------------+-----+

root
 |-- course_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- level: string (nullable = true)
 |-- price: integer (nullable = true)



# 4. Handle missing prices

In [16]:
# `when` is imported here as well so the cell does not depend on an earlier cell's import.
from pyspark.sql.functions import col, when

# Fill missing prices (NULL values) with 0; existing prices are left unchanged.
df_courses_final = df_courses_final.withColumn(
    "price",
    when(col("price").isNull(), 0).otherwise(col("price"))
)

# A more concise alternative using fillna:
# df_courses_final = df_courses_final.fillna(0, subset=["price"])

df_courses_final.show()
df_courses_final.printSchema()

+---------+--------------------+--------------------+-----+
|course_id|               title|            metadata|price|
+---------+--------------------+--------------------+-----+
|     C001|     PySpark Mastery|{Data Engineering...| 9999|
|     C002|      AI for Testers|      {QA, Beginner}| 8999|
|     C003|      ML Foundations|  {AI, Intermediate}|    0|
|     C004|Data Engineering ...|    {Data, Advanced}|14999|
+---------+--------------------+--------------------+-----+

root
 |-- course_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- level: string (nullable = true)
 |-- price: integer (nullable = true)



# 5. Produce courses_df

In [17]:
# Publish the cleaned course catalog under its final name.
courses_df = df_courses_final

courses_df.show(n=20, truncate=True)
courses_df.printSchema()

+---------+--------------------+--------------------+-----+
|course_id|               title|            metadata|price|
+---------+--------------------+--------------------+-----+
|     C001|     PySpark Mastery|{Data Engineering...| 9999|
|     C002|      AI for Testers|      {QA, Beginner}| 8999|
|     C003|      ML Foundations|  {AI, Intermediate}|    0|
|     C004|Data Engineering ...|    {Data, Advanced}|14999|
+---------+--------------------+--------------------+-----+

root
 |-- course_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- level: string (nullable = true)
 |-- price: integer (nullable = true)



# DATASET 4 — USER ACTIVITY LOGS (ARRAY + MAP)

In [19]:
# Activity log rows: (user_id, actions, properties, duration).
# Both `actions` and `properties` come in several inconsistent encodings.
raw_activity = [
    ("U001", "login,watch,logout", "{'device':'mobile','ip':'1.1.1.1'}", 120),  # CSV actions, dict-like props
    ("U002", ["login", "watch"], "device=laptop;ip=2.2.2.2", 90),               # list actions, key=value props
    ("U003", "login|logout", None, 30),                                         # pipe actions, no props
    ("U004", None, "{'device':'tablet'}", 60),                                  # no actions
]

# 1. Normalize actions into ArrayType

In [20]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
from pyspark.sql import Row
from pyspark.sql.functions import col, when, split

# 1. Target schema for the activity data: `actions` is already typed as
#    array<string>, while `properties` is kept as a raw string for now.
activity_schema = (
    StructType()
    .add("user_id", StringType(), True)
    .add("actions", ArrayType(StringType()), True)
    .add("properties", StringType(), True)
    .add("duration", IntegerType(), True)
)

def _normalize_actions(actions_raw):
    """Coerce one raw `actions` value into a list of action names.

    - ``None`` stays ``None``.
    - A list is passed through unchanged.
    - A string is split on "," and "|" (either or both), each token is
      stripped, and empty tokens are dropped. A string with no delimiter is a
      single action, so "login" becomes ["login"]. A string that yields no
      tokens (e.g. "") is treated as missing and returns ``None``.
    - Any other type returns ``None``.
    """
    if actions_raw is None:
        return None
    if isinstance(actions_raw, list):
        return actions_raw
    if isinstance(actions_raw, str):
        # Map both delimiters onto "," so that one split handles every variant.
        tokens = [token.strip() for token in actions_raw.replace('|', ',').split(',')]
        tokens = [token for token in tokens if token]
        return tokens or None
    return None


# 2. Preprocess raw_activity to normalize the 'actions' field
processed_raw_activity = []
for user_id, actions_raw, properties, duration in raw_activity:
    normalized_actions = _normalize_actions(actions_raw)
    processed_raw_activity.append(
        Row(user_id=user_id, actions=normalized_actions, properties=properties, duration=duration)
    )

# 3. Materialise the normalized rows under the explicit activity schema
df_activity = spark.createDataFrame(
    data=processed_raw_activity,
    schema=activity_schema,
)

# Show the full cell contents (no truncation) together with the resulting schema
df_activity.show(truncate=False)
df_activity.printSchema()

+-------+----------------------+----------------------------------+--------+
|user_id|actions               |properties                        |duration|
+-------+----------------------+----------------------------------+--------+
|U001   |[login, watch, logout]|{'device':'mobile','ip':'1.1.1.1'}|120     |
|U002   |[login, watch]        |device=laptop;ip=2.2.2.2          |90      |
|U003   |[login, logout]       |NULL                              |30      |
|U004   |NULL                  |{'device':'tablet'}               |60      |
+-------+----------------------+----------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: string (nullable = true)
 |-- duration: integer (nullable = true)



# 2. Normalize metadata into MapType

In [22]:
from pyspark.sql.functions import col, when, from_json, regexp_replace, udf
from pyspark.sql.types import MapType, StringType

# Define a UDF to parse custom key-value strings like "device=laptop;ip=2.2.2.2"
def parse_custom_properties(s):
    """Parse a ``key=value;key=value`` string into a dict.

    Segments are separated by ";". A segment is used only if it contains "=",
    in which case it is split at the first "=" and both sides are stripped.
    When a key repeats, the last value wins.

    Returns ``None`` for ``None`` input and for any input that raises while
    being parsed (for example a non-string value); returns an empty dict when
    no segment contains "=".
    """
    if s is None:
        return None
    try:
        key_value_pairs = (
            segment.split('=', 1)
            for segment in s.split(';')
            if '=' in segment
        )
        return {key.strip(): value.strip() for key, value in key_value_pairs}
    except Exception:
        # Malformed input is reported as "no properties" instead of failing the row.
        return None

# Wrap the Python parser as a Spark UDF that returns map<string,string>.
string_map_type = MapType(StringType(), StringType())
parse_custom_properties_udf = udf(parse_custom_properties, string_map_type)

# Values starting with "{" are re-quoted (single -> double quotes) and parsed as JSON;
# every other non-NULL value goes through the key=value UDF. NULL stays NULL.
properties_raw = col("properties")
properties_from_json = from_json(regexp_replace(properties_raw, "'", "\""), string_map_type)
properties_from_pairs = parse_custom_properties_udf(properties_raw)

df_activity_normalized = df_activity.withColumn(
    "properties",
    when(properties_raw.isNull(), None)
    .when(properties_raw.startswith("{"), properties_from_json)
    .otherwise(properties_from_pairs)
)

df_activity_normalized.show(truncate=False)
df_activity_normalized.printSchema()

+-------+----------------------+---------------------------------+--------+
|user_id|actions               |properties                       |duration|
+-------+----------------------+---------------------------------+--------+
|U001   |[login, watch, logout]|{device -> mobile, ip -> 1.1.1.1}|120     |
|U002   |[login, watch]        |{device -> laptop, ip -> 2.2.2.2}|90      |
|U003   |[login, logout]       |NULL                             |30      |
|U004   |NULL                  |{device -> tablet}               |60      |
+-------+----------------------+---------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- duration: integer (nullable = true)



# 3. Handle missing actions safely

In [23]:
from pyspark.sql.functions import col, when, array

# A NULL `actions` value becomes an empty array, so downstream array functions
# never have to deal with NULL for this column.
actions_or_empty = when(col("actions").isNull(), array()).otherwise(col("actions"))

df_activity_normalized = df_activity_normalized.withColumn("actions", actions_or_empty)

df_activity_normalized.show(truncate=False)
df_activity_normalized.printSchema()

+-------+----------------------+---------------------------------+--------+
|user_id|actions               |properties                       |duration|
+-------+----------------------+---------------------------------+--------+
|U001   |[login, watch, logout]|{device -> mobile, ip -> 1.1.1.1}|120     |
|U002   |[login, watch]        |{device -> laptop, ip -> 2.2.2.2}|90      |
|U003   |[login, logout]       |NULL                             |30      |
|U004   |[]                    |{device -> tablet}               |60      |
+-------+----------------------+---------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- duration: integer (nullable = true)



# 4. Explode actions and count frequency

In [24]:
from pyspark.sql.functions import explode, col, count

# One output row per (user, action): explode unpacks each element of the array.
df_exploded_actions = df_activity_normalized.select(
    col("user_id"),
    explode(col("actions")).alias("action"),
)

# Number of occurrences of every distinct action across all users.
action_frequency = (
    df_exploded_actions
    .groupBy("action")
    .agg(count("action").alias("frequency"))
)

# Most frequent actions first.
action_frequency.orderBy(col("frequency").desc()).show()


+------+---------+
|action|frequency|
+------+---------+
| login|        3|
| watch|        2|
|logout|        2|
+------+---------+



# 5. Produce activity_df

In [25]:
# Publish the cleaned activity log under its final name.
activity_df = df_activity_normalized

activity_df.show(n=20, truncate=False)
activity_df.printSchema()

+-------+----------------------+---------------------------------+--------+
|user_id|actions               |properties                       |duration|
+-------+----------------------+---------------------------------+--------+
|U001   |[login, watch, logout]|{device -> mobile, ip -> 1.1.1.1}|120     |
|U002   |[login, watch]        |{device -> laptop, ip -> 2.2.2.2}|90      |
|U003   |[login, logout]       |NULL                             |30      |
|U004   |[]                    |{device -> tablet}               |60      |
+-------+----------------------+---------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- duration: integer (nullable = true)



# DATASET 3 — USER COURSE ENROLLMENTS (JOIN + BROADCAST)

In [26]:
# Enrollment rows: (user_id, course_id, enrollment date as free-form text).
raw_enrollments = [
    ("U001", "C001", "2024-01-05"),    # yyyy-MM-dd
    ("U002", "C002", "05/01/2024"),    # dd/MM/yyyy
    ("U003", "C001", "2024/01/06"),    # yyyy/MM/dd
    ("U004", "C003", "invalid_date"),  # unparseable
    ("U001", "C004", "2024-01-10"),    # yyyy-MM-dd
]

In [27]:
# The date is loaded as plain text first; it is parsed in the next step.
enroll_schema = (
    StructType()
    .add("user_id", StringType(), True)
    .add("course_id", StringType(), True)
    .add("enrollment_date_raw", StringType(), True)
)

df_enrollments_raw = spark.createDataFrame(data=raw_enrollments, schema=enroll_schema)
df_enrollments_raw.printSchema()
df_enrollments_raw.show()

root
 |-- user_id: string (nullable = true)
 |-- course_id: string (nullable = true)
 |-- enrollment_date_raw: string (nullable = true)

+-------+---------+-------------------+
|user_id|course_id|enrollment_date_raw|
+-------+---------+-------------------+
|   U001|     C001|         2024-01-05|
|   U002|     C002|         05/01/2024|
|   U003|     C001|         2024/01/06|
|   U004|     C003|       invalid_date|
|   U001|     C004|         2024-01-10|
+-------+---------+-------------------+



In [28]:
from pyspark.sql.functions import coalesce, col, to_date, when

# Accepted layouts as (shape regex, to_date pattern), tried in this order.
# The regex guard makes sure each string is parsed only with the pattern it matches.
date_layouts = [
    ("^\\d{4}-\\d{2}-\\d{2}$", "yyyy-MM-dd"),
    ("^\\d{2}/\\d{2}/\\d{4}$", "dd/MM/yyyy"),
    ("^\\d{4}/\\d{2}/\\d{2}$", "yyyy/MM/dd"),
]

raw_date = col("enrollment_date_raw")
parse_attempts = [
    when(raw_date.rlike(shape_regex), to_date(raw_date, date_pattern))
    for shape_regex, date_pattern in date_layouts
]

# coalesce keeps the first non-NULL attempt; a value matching no layout stays NULL.
df_enrollments_raw = df_enrollments_raw.withColumn(
    "enrollment_date",
    coalesce(*parse_attempts)
)
df_enrollments_raw.show()

+-------+---------+-------------------+---------------+
|user_id|course_id|enrollment_date_raw|enrollment_date|
+-------+---------+-------------------+---------------+
|   U001|     C001|         2024-01-05|     2024-01-05|
|   U002|     C002|         05/01/2024|     2024-01-05|
|   U003|     C001|         2024/01/06|     2024-01-06|
|   U004|     C003|       invalid_date|           NULL|
|   U001|     C004|         2024-01-10|     2024-01-10|
+-------+---------+-------------------+---------------+



In [30]:
from pyspark.sql.functions import broadcast

In [32]:
# Only the parsed date is needed from here on, so the raw text column is dropped.
df_enrollments_processed = df_enrollments_raw.drop("enrollment_date_raw")

# Left join keeps every enrollment; the course catalog side carries a broadcast hint.
df_enriched = df_enrollments_processed.join(
    broadcast(courses_df),
    "course_id",
    "left",
)
df_enriched.show()

+---------+-------+---------------+--------------------+--------------------+-----+
|course_id|user_id|enrollment_date|               title|            metadata|price|
+---------+-------+---------------+--------------------+--------------------+-----+
|     C001|   U001|     2024-01-05|     PySpark Mastery|{Data Engineering...| 9999|
|     C002|   U002|     2024-01-05|      AI for Testers|      {QA, Beginner}| 8999|
|     C001|   U003|     2024-01-06|     PySpark Mastery|{Data Engineering...| 9999|
|     C003|   U004|           NULL|      ML Foundations|  {AI, Intermediate}|    0|
|     C004|   U001|     2024-01-10|Data Engineering ...|    {Data, Advanced}|14999|
+---------+-------+---------------+--------------------+--------------------+-----+



In [33]:
# Untruncated view of the enriched enrollments, followed by the schema.
df_enriched.show(n=20, truncate=False)
df_enriched.printSchema()

+---------+-------+---------------+-------------------------+----------------------------+-----+
|course_id|user_id|enrollment_date|title                    |metadata                    |price|
+---------+-------+---------------+-------------------------+----------------------------+-----+
|C001     |U001   |2024-01-05     |PySpark Mastery          |{Data Engineering, Advanced}|9999 |
|C002     |U002   |2024-01-05     |AI for Testers           |{QA, Beginner}              |8999 |
|C001     |U003   |2024-01-06     |PySpark Mastery          |{Data Engineering, Advanced}|9999 |
|C003     |U004   |NULL           |ML Foundations           |{AI, Intermediate}          |0    |
|C004     |U001   |2024-01-10     |Data Engineering Bootcamp|{Data, Advanced}            |14999|
+---------+-------+---------------+-------------------------+----------------------------+-----+

root
 |-- course_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- enrollment_date: date (nullable = tr

In [34]:
# Decision: Broadcast courses_df
# Reasoning: `courses_df` (the course catalog) is expected to be significantly smaller than `df_enrollments_processed` (user enrollments).
# Broadcasting the smaller table to all worker nodes during a join optimizes performance by avoiding a shuffle of the larger DataFrame and reducing network I/O.
# This was already implemented in the previous join: `df_enrollments_processed.join(broadcast(courses_df), on="course_id", how="left")`

In [35]:
# Print the logical and physical plans for the enriched join.
df_enriched.explain(extended=True)

== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [course_id])
:- Project [user_id#245, course_id#246, enrollment_date#258]
:  +- Project [user_id#245, course_id#246, enrollment_date_raw#247, coalesce(CASE WHEN RLIKE(enrollment_date_raw#247, ^\d{4}-\d{2}-\d{2}$) THEN to_date(enrollment_date_raw#247, Some(yyyy-MM-dd), Some(Etc/UTC), true) END, CASE WHEN RLIKE(enrollment_date_raw#247, ^\d{2}/\d{2}/\d{4}$) THEN to_date(enrollment_date_raw#247, Some(dd/MM/yyyy), Some(Etc/UTC), true) END, CASE WHEN RLIKE(enrollment_date_raw#247, ^\d{4}/\d{2}/\d{2}$) THEN to_date(enrollment_date_raw#247, Some(yyyy/MM/dd), Some(Etc/UTC), true) END) AS enrollment_date#258]
:     +- LogicalRDD [user_id#245, course_id#246, enrollment_date_raw#247], false
+- ResolvedHint (strategy=broadcast)
   +- Project [course_id#109, title#110, metadata#111, CASE WHEN isnull(price#126) THEN 0 ELSE price#126 END AS price#140]
      +- Project [course_id#109, title#110, metadata#111, CASE WHEN isnotnull(price#112) THEN cast(

# DATASET 5 — PAYMENTS (WINDOW + AGGREGATES)

In [38]:
# Payment rows: (user_id, payment date as yyyy-MM-dd text, amount).
data = [
    ("U001", "2024-01-05", 9999),
    ("U001", "2024-01-10", 14999),
    ("U002", "2024-01-06", 8999),
    ("U003", "2024-01-07", 0),
    ("U004", "2024-01-08", 7999),
    ("U001", "2024-01-15", 1999),
]

In [39]:
# Only column names are supplied here, so Spark infers the column types from the data.
columns = ["user_id", "date", "amount"]

df = spark.createDataFrame(data, schema=columns)
df.show()

+-------+----------+------+
|user_id|      date|amount|
+-------+----------+------+
|   U001|2024-01-05|  9999|
|   U001|2024-01-10| 14999|
|   U002|2024-01-06|  8999|
|   U003|2024-01-07|     0|
|   U004|2024-01-08|  7999|
|   U001|2024-01-15|  1999|
+-------+----------+------+



In [40]:
from pyspark.sql.functions import to_date

# Replace the text `date` column with a DateType column parsed as yyyy-MM-dd.
payment_date = to_date(df["date"], "yyyy-MM-dd")
df = df.withColumn("date", payment_date)

df.printSchema()
df.show()

root
 |-- user_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)

+-------+----------+------+
|user_id|      date|amount|
+-------+----------+------+
|   U001|2024-01-05|  9999|
|   U001|2024-01-10| 14999|
|   U002|2024-01-06|  8999|
|   U003|2024-01-07|     0|
|   U004|2024-01-08|  7999|
|   U001|2024-01-15|  1999|
+-------+----------+------+



In [41]:
# One row per user; the aggregate column keeps Spark's default name `sum(amount)`.
total_spend_per_user = df.groupBy("user_id").agg({"amount": "sum"})
total_spend_per_user.show()

+-------+-----------+
|user_id|sum(amount)|
+-------+-----------+
|   U002|       8999|
|   U001|      26997|
|   U004|       7999|
|   U003|          0|
+-------+-----------+



In [44]:
from pyspark.sql.window import Window
# Spark's sum is imported under an alias so that it does not replace Python's
# builtin sum() for the rest of the notebook.
from pyspark.sql.functions import col, sum as spark_sum

# Per-user window ordered by payment date. With an ORDER BY and no explicit frame,
# the aggregate covers the rows from the start of the partition up to the current
# row, which makes the sum a running total.
window_spec = Window.partitionBy("user_id").orderBy("date")
running_spend_per_user = df.withColumn(
    "running_spend",
    spark_sum(col("amount")).over(window_spec)
)
running_spend_per_user.show()

+-------+----------+------+-------------+
|user_id|      date|amount|running_spend|
+-------+----------+------+-------------+
|   U001|2024-01-05|  9999|         9999|
|   U001|2024-01-10| 14999|        24998|
|   U001|2024-01-15|  1999|        26997|
|   U002|2024-01-06|  8999|         8999|
|   U003|2024-01-07|     0|            0|
|   U004|2024-01-08|  7999|         7999|
+-------+----------+------+-------------+



In [45]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

# No partitionBy: the ordering spans the whole aggregated DataFrame,
# with the highest total spend first.
window_spec_rank = Window.orderBy(desc("sum(amount)"))

# Rank expression evaluated over that global ordering.
spend_rank = rank().over(window_spec_rank)
ranked_users_by_total_spend = total_spend_per_user.withColumn("rank", spend_rank)

ranked_users_by_total_spend.show()

+-------+-----------+----+
|user_id|sum(amount)|rank|
+-------+-----------+----+
|   U001|      26997|   1|
|   U002|       8999|   2|
|   U004|       7999|   3|
|   U003|          0|   4|
+-------+-----------+----+



### Comparing `GroupBy` and `Window` Outputs

Both `groupBy` and `Window` functions are powerful tools in PySpark for data aggregation and analysis, but they serve different purposes and produce distinct outputs.

#### 1. `GroupBy` Output: `total_spend_per_user`

- **Purpose**: `GroupBy` operations are used for **aggregation**, where you want to collapse multiple rows into a single summary row based on one or more grouping keys. It answers questions like 'What is the total spend for each user?' or 'How many items did each category sell?'

- **Output Characteristics**: The output of a `groupBy` operation typically has fewer rows than the input DataFrame, as it aggregates data based on the unique values of the grouping keys. For each group, it provides a single aggregated value (e.g., sum, count, average).

- **Example Output (`total_spend_per_user`):**

```
+-------+-----------+
|user_id|sum(amount)|
+-------+-----------+
|   U002|       8999|
|   U001|      26997|
|   U004|       7999|
|   U003|          0|
+-------+-----------+
```

Here, you get one row per `user_id` showing their total spend.

#### 2. `Window` Output: `running_spend_per_user` and `ranked_users_by_total_spend`

- **Purpose**: `Window` functions perform calculations across a set of DataFrame rows that are related to the current row. Unlike `groupBy`, `Window` functions **do not collapse rows**. Instead, they add new columns to the DataFrame, providing context-sensitive calculations (like running totals, moving averages, or rankings) for each original row.

- **Output Characteristics**: The output of a `Window` function typically has the **same number of rows** as the input DataFrame. It adds one or more new columns containing the results of the window calculation for each row.

##### a. Running Spend per User (`running_spend_per_user`)

- **Type**: Cumulative aggregate within a partition.

- **Output Characteristics**: For each transaction, it shows the cumulative spend up to that point for that specific user.

- **Example Output (`running_spend_per_user`):**

```
+-------+----------+------+-------------+
|user_id|      date|amount|running_spend|
+-------+----------+------+-------------+
|   U001|2024-01-05|  9999|         9999|
|   U001|2024-01-10| 14999|        24998|
|   U001|2024-01-15|  1999|        26997|
|   U002|2024-01-06|  8999|         8999|
|   U003|2024-01-07|     0|            0|
|   U004|2024-01-08|  7999|         7999|
+-------+----------+------+-------------+
```

Here, each row of the original `df` is preserved, and a new column `running_spend` is added, showing the sum of `amount` up to that date for each user.

##### b. Ranked Users by Total Spend (`ranked_users_by_total_spend`)

- **Type**: Ranking function applied to an aggregated DataFrame.

- **Output Characteristics**: It assigns a rank to each user based on their total spend. It operates on the `total_spend_per_user` (an aggregated DataFrame) and adds a rank column, keeping one row per user.

- **Example Output (`ranked_users_by_total_spend`):**

```
+-------+-----------+----+
|user_id|sum(amount)|rank|
+-------+-----------+----+
|   U001|      26997|   1|
|   U002|       8999|   2|
|   U004|       7999|   3|
|   U003|          0|   4|
+-------+-----------+----+
```

This output shows the ranking of users based on the total spend, which was initially computed using `groupBy`.

#### Key Differences Summarized:

| Feature         | `GroupBy`                               | `Window` (e.g., `running_spend`)                  | `Window` (e.g., `rank` on aggregated data)      |
| :-------------- | :-------------------------------------- | :------------------------------------------------ | :---------------------------------------------- |
| **Row Count**   | Reduces rows (aggregates)               | Preserves original rows                           | Preserves rows of the input (often aggregated)  |
| **Output**      | Summarized data per group               | Adds new column(s) with contextual calculations   | Adds new column(s) with ranking                 |
| **Use Case**    | Overall aggregates (total, count, avg)  | Running totals, moving averages, row comparisons  | Ranking within partitions or across the whole DF |

In essence, `groupBy` is about summarization and reducing granularity, while `Window` functions are about enriching the existing data with new calculations that consider a defined 'window' of related rows.


# DATASET 6 — PARTITIONS & PERFORMANCE

In [46]:
# DataFrames to inspect, keyed by the variable name used in this notebook.
dataframes_to_check = dict(
    users_df=users_df,
    courses_df=courses_df,
    activity_df=activity_df,
    df_enrollments_processed=df_enrollments_processed,
    df_enriched=df_enriched,
    df=df,
    total_spend_per_user=total_spend_per_user,
    running_spend_per_user=running_spend_per_user,
    ranked_users_by_total_spend=ranked_users_by_total_spend,
)

print("--- Number of Partitions for DataFrames ---")
for frame_name in dataframes_to_check:
    # The partition count is read from the DataFrame's underlying RDD.
    partition_count = dataframes_to_check[frame_name].rdd.getNumPartitions()
    print(f"DataFrame: {frame_name}, Partitions: {partition_count}")

--- Number of Partitions for DataFrames ---
DataFrame: users_df, Partitions: 2
DataFrame: courses_df, Partitions: 2
DataFrame: activity_df, Partitions: 2
DataFrame: df_enrollments_processed, Partitions: 2
DataFrame: df_enriched, Partitions: 2
DataFrame: df, Partitions: 2
DataFrame: total_spend_per_user, Partitions: 1
DataFrame: running_spend_per_user, Partitions: 1
DataFrame: ranked_users_by_total_spend, Partitions: 1
-------------------------------------------


In [47]:
# Redistribute the enrollments so that rows sharing a course_id land in the same partition.
df_enrollments_repartitioned = df_enrollments_processed.repartition("course_id")

# Partition counts before and after the repartition, for comparison.
partitions_before = df_enrollments_processed.rdd.getNumPartitions()
partitions_after = df_enrollments_repartitioned.rdd.getNumPartitions()

print(f"Original df_enrollments_processed partitions: {partitions_before}")
print(f"Repartitioned df_enrollments_repartitioned partitions: {partitions_after}")

Original df_enrollments_processed partitions: 2
Repartitioned df_enrollments_repartitioned partitions: 1


In [48]:
# Destination directory for the Parquet export.
output_path = "/tmp/enrollments_single_partition"

# Narrow the repartitioned enrollments down to a single partition before writing.
df_enrollments_coalesced = df_enrollments_repartitioned.coalesce(1)
coalesced_partitions = df_enrollments_coalesced.rdd.getNumPartitions()
print(f"Coalesced df_enrollments_coalesced partitions: {coalesced_partitions}")

# "overwrite" replaces whatever already exists at output_path.
parquet_writer = df_enrollments_coalesced.write.mode("overwrite")
parquet_writer.parquet(output_path)

print(f"Coalesced DataFrame written to {output_path} in Parquet format.")

Coalesced df_enrollments_coalesced partitions: 1
Coalesced DataFrame written to /tmp/enrollments_single_partition in Parquet format.


In [49]:
# A repartition operation in Apache Spark causes a shuffle because it fundamentally changes how data is distributed across the partitions in your cluster. Here's a breakdown of why and its performance implications:

# Why repartition causes a shuffle:

# 1. Redistribution of Data: When you call repartition(), especially on a specific column (like repartition('course_id')), Spark needs to ensure that all rows with the same value for that column (e.g., all rows for 'C001') end up on the same partition. This often means data needs to be moved from its current location on one executor to a different executor and partition.
# 2. Network Transfer: To achieve this redistribution, data must be serialized, sent over the network, and then deserialized on the destination executor. This cross-network data movement is the core of what a "shuffle" is.
# 3. Intermediate Storage: During a shuffle, Spark might temporarily write data to local disk on the executors before it's sent to the final destination. This adds disk I/O to the process.

# Performance Implications of a Shuffle:

# * High Cost: Shuffles are generally the most expensive operations in Spark jobs. They are resource-intensive due to the significant network, disk I/O, and CPU usage.
# * Network Bandwidth: Moving large amounts of data across the network can saturate network links, leading to slow performance.
# * Disk I/O: If the data being shuffled is too large to fit in memory, Spark will spill it to disk, incurring additional disk read/write overhead.
# * CPU Overhead: Data needs to be serialized before being sent over the network and deserialized upon arrival, consuming valuable CPU cycles.
# * Memory Usage: Executors require memory to buffer shuffled data, and if this memory is insufficient, it can lead to out-of-memory errors or frequent spills to disk.
# * Garbage Collection: Heavy shuffling can increase garbage collection activity on the JVMs of the executors, further impacting performance.
# * Bottlenecks: Data skew (where a few partition keys have a disproportionately large amount of data) can exacerbate shuffle performance issues, creating bottlenecks at specific executors.

# In your case, when you repartitioned df_enrollments_processed by 'course_id', Spark hash-partitioned the rows on course_id and moved each row to the partition that owns its key (nothing is collected to the driver). Since the original partitions weren't organized by course_id, a shuffle was necessary to achieve the desired distribution. In this specific instance the repartitioned DataFrame even ended up with fewer partitions (1 instead of 2), most likely because adaptive query execution coalesces small shuffle partitions (TODO: confirm against the spark.sql.adaptive.* settings). Note that the BroadcastHashJoin shown by df_enriched.explain(True) is a separate matter: it comes from the broadcast hint on courses_df, which ships the small courses_df to every executor so the larger df_enrollments_processed can be joined without being shuffled, regardless of how it is partitioned.

# DATASET 7 — DAG & OPTIMIZATION

In [50]:
# (banner, DataFrame) pairs for the user DataFrames, in reporting order.
user_plan_targets = [
    ("\n--- Execution Plan for df_users_age_normalized ---", df_users_age_normalized),
    ("\n--- Execution Plan for df_users_skills_normalized ---", df_users_skills_normalized),
    ("\n--- Execution Plan for users_df (Final) ---", users_df),
]

for banner, frame in user_plan_targets:
    print(banner)
    # extended=True prints the logical plans as well as the physical plan.
    frame.explain(extended=True)


--- Execution Plan for df_users_age_normalized ---
== Parsed Logical Plan ==
'Project [unresolvedstarwithcolumns(age, CASE WHEN 'rlike('age, ^[0-9]+$) THEN cast('age as int) ELSE null END, None)]
+- LogicalRDD [user_id#0, name#1, age#2, city#3, interests#4], false

== Analyzed Logical Plan ==
user_id: string, name: string, age: int, city: string, interests: string
Project [user_id#0, name#1, CASE WHEN RLIKE(age#2, ^[0-9]+$) THEN cast(age#2 as int) ELSE cast(null as int) END AS age#38, city#3, interests#4]
+- LogicalRDD [user_id#0, name#1, age#2, city#3, interests#4], false

== Optimized Logical Plan ==
Project [user_id#0, name#1, CASE WHEN RLIKE(age#2, ^[0-9]+$) THEN cast(age#2 as int) END AS age#38, city#3, interests#4]
+- LogicalRDD [user_id#0, name#1, age#2, city#3, interests#4], false

== Physical Plan ==
*(1) Project [user_id#0, name#1, CASE WHEN RLIKE(age#2, ^[0-9]+$) THEN cast(age#2 as int) END AS age#38, city#3, interests#4]
+- *(1) Scan ExistingRDD[user_id#0,name#1,age#2,city

In [51]:
# (banner, DataFrame) pairs for the course DataFrames, in reporting order.
course_plan_targets = [
    ("\n--- Execution Plan for df_courses_normalized ---", df_courses_normalized),
    ("\n--- Execution Plan for df_courses_final ---", df_courses_final),
    ("\n--- Execution Plan for courses_df (Final) ---", courses_df),
]

for banner, frame in course_plan_targets:
    print(banner)
    # extended=True prints the logical plans as well as the physical plan.
    frame.explain(extended=True)


--- Execution Plan for df_courses_normalized ---
== Parsed Logical Plan ==
LogicalRDD [course_id#109, title#110, metadata#111, price#112], false

== Analyzed Logical Plan ==
course_id: string, title: string, metadata: struct<domain:string,level:string>, price: string
LogicalRDD [course_id#109, title#110, metadata#111, price#112], false

== Optimized Logical Plan ==
LogicalRDD [course_id#109, title#110, metadata#111, price#112], false

== Physical Plan ==
*(1) Scan ExistingRDD[course_id#109,title#110,metadata#111,price#112]


--- Execution Plan for df_courses_final ---
== Parsed Logical Plan ==
'Project [unresolvedstarwithcolumns(price, CASE WHEN 'isNull('price) THEN 0 ELSE 'price END, None)]
+- Project [course_id#109, title#110, metadata#111, CASE WHEN isnotnull(price#112) THEN cast(regexp_replace(price#112, ₹, , 1) as int) ELSE cast(null as int) END AS price#126]
   +- LogicalRDD [course_id#109, title#110, metadata#111, price#112], false

== Analyzed Logical Plan ==
course_id: string

In [52]:
# Extended plans (logical and physical) for the activity DataFrames.
print("\n--- Execution Plan for df_activity_normalized ---")
df_activity_normalized.explain(extended=True)

print("\n--- Execution Plan for activity_df (Final) ---")
activity_df.explain(extended=True)


--- Execution Plan for df_activity_normalized ---
== Parsed Logical Plan ==
'Project [unresolvedstarwithcolumns(actions, CASE WHEN 'isNull('actions) THEN 'array() ELSE 'actions END, None)]
+- Project [user_id#167, actions#168, CASE WHEN isnull(properties#169) THEN cast(null as map<string,string>) WHEN StartsWith(properties#169, {) THEN from_json(MapType(StringType,StringType,true), regexp_replace(properties#169, ', ", 1), Some(Etc/UTC), false) ELSE parse_custom_properties(properties#169)#184 END AS properties#185, duration#170]
   +- LogicalRDD [user_id#167, actions#168, properties#169, duration#170], false

== Analyzed Logical Plan ==
user_id: string, actions: array<string>, properties: map<string,string>, duration: int
Project [user_id#167, CASE WHEN isnull(actions#168) THEN cast(array() as array<string>) ELSE actions#168 END AS actions#200, properties#185, duration#170]
+- Project [user_id#167, actions#168, CASE WHEN isnull(properties#169) THEN cast(null as map<string,string>) WHEN

In [53]:
# Extended plan (logical and physical) for the enrollments-to-courses join.
print("\n--- Execution Plan for df_enriched (Joined Enrollments and Courses) ---")
df_enriched.explain(extended=True)


--- Execution Plan for df_enriched (Joined Enrollments and Courses) ---
== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [course_id])
:- Project [user_id#245, course_id#246, enrollment_date#258]
:  +- Project [user_id#245, course_id#246, enrollment_date_raw#247, coalesce(CASE WHEN RLIKE(enrollment_date_raw#247, ^\d{4}-\d{2}-\d{2}$) THEN to_date(enrollment_date_raw#247, Some(yyyy-MM-dd), Some(Etc/UTC), true) END, CASE WHEN RLIKE(enrollment_date_raw#247, ^\d{2}/\d{2}/\d{4}$) THEN to_date(enrollment_date_raw#247, Some(dd/MM/yyyy), Some(Etc/UTC), true) END, CASE WHEN RLIKE(enrollment_date_raw#247, ^\d{4}/\d{2}/\d{2}$) THEN to_date(enrollment_date_raw#247, Some(yyyy/MM/dd), Some(Etc/UTC), true) END) AS enrollment_date#258]
:     +- LogicalRDD [user_id#245, course_id#246, enrollment_date_raw#247], false
+- ResolvedHint (strategy=broadcast)
   +- Project [course_id#109, title#110, metadata#111, CASE WHEN isnull(price#126) THEN 0 ELSE price#126 END AS price#140]
      +- Project [course_i

 # 3. Bad DAG identified in `ranked_users_by_total_spend`:
# The physical plan for `ranked_users_by_total_spend` includes an `Exchange SinglePartition` followed by a global `Sort`.
# This means that after computing the total spend per user (which already involves a shuffle for aggregation),
# Spark then gathers ALL the aggregated data into a single partition (`Exchange SinglePartition`) to perform a global sort (`Sort`).
# This design choice, while correct for achieving a global rank, is highly inefficient and becomes a major bottleneck
# for large datasets: the ranking step loses all parallelism because every aggregated row must pass through a single task.