#1 DATA

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark=SparkSession.builder.appName('Struct Type').getOrCreate()

raw_users = [
("U001","Amit","28","Hyderabad","['AI','ML','Cloud']"),
("U002","Neha","Thirty","Delhi","AI,Testing"),
("U003","Ravi",None,"Bangalore",["Data","Spark"]),
("U004","Pooja","29","Mumbai",None),
("U005","", "31","Chennai","['DevOps']")
]


In [8]:
from pyspark.sql.types import(
    StructType,
    StructField,
    StringType,
    IntegerType,
    LongType
)

users_schema=StructType([
    StructField("user_id",StringType(),True),
    StructField("name",StringType(),True),
    StructField("age",StringType(),True),
    StructField("city",StringType(),True),
    StructField("interests",StringType(),True)
])

In [9]:
from pyspark.sql.functions import col,when,regexp_replace
from pyspark.sql.types import IntegerType

df_users=spark.createDataFrame(raw_users,users_schema)
df_users.show()
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType

df_users_age_normalized = df_users.withColumn(
    "age",
    when(col("age").rlike("^[0-9]+$"), col("age").cast(IntegerType()))
    .otherwise(None)
)

df_users_age_normalized.show()
df_users_age_normalized.printSchema()

+-------+-----+------+---------+-------------------+
|user_id| name|   age|     city|          interests|
+-------+-----+------+---------+-------------------+
|   U001| Amit|    28|Hyderabad|['AI','ML','Cloud']|
|   U002| Neha|Thirty|    Delhi|         AI,Testing|
|   U003| Ravi|  NULL|Bangalore|      [Data, Spark]|
|   U004|Pooja|    29|   Mumbai|               NULL|
|   U005|     |    31|  Chennai|         ['DevOps']|
+-------+-----+------+---------+-------------------+

+-------+-----+----+---------+-------------------+
|user_id| name| age|     city|          interests|
+-------+-----+----+---------+-------------------+
|   U001| Amit|  28|Hyderabad|['AI','ML','Cloud']|
|   U002| Neha|NULL|    Delhi|         AI,Testing|
|   U003| Ravi|NULL|Bangalore|      [Data, Spark]|
|   U004|Pooja|  29|   Mumbai|               NULL|
|   U005|     |  31|  Chennai|         ['DevOps']|
+-------+-----+----+---------+-------------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string 

In [10]:
from pyspark.sql.functions import col, split, regexp_replace, when, trim, expr
from pyspark.sql.types import ArrayType, StringType

df_users_skills_normalized = df_users_age_normalized.withColumn(
    "cleaned_interests_str",
    when(
        col("interests").isNotNull(),
        regexp_replace(
            regexp_replace(
                col("interests"),
                "^\[|\]$", ""
            ),
            "'", ""
        )
    ).otherwise(None)
).withColumn(
    "interests",
    when(
        col("cleaned_interests_str").isNotNull(),
        split(col("cleaned_interests_str"), ",")
    ).otherwise(None)
).withColumn(
    "interests",
    when(
        col("interests").isNotNull(),

        expr("filter(transform(interests, x -> trim(x)), x -> x != '')")
    ).otherwise(None)
).drop("cleaned_interests_str")

df_users_skills_normalized.show()
df_users_skills_normalized.printSchema()

  "^\[|\]$", ""


+-------+-----+----+---------+---------------+
|user_id| name| age|     city|      interests|
+-------+-----+----+---------+---------------+
|   U001| Amit|  28|Hyderabad|[AI, ML, Cloud]|
|   U002| Neha|NULL|    Delhi|  [AI, Testing]|
|   U003| Ravi|NULL|Bangalore|  [Data, Spark]|
|   U004|Pooja|  29|   Mumbai|           NULL|
|   U005|     |  31|  Chennai|       [DevOps]|
+-------+-----+----+---------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- interests: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [11]:
from pyspark.sql.functions import col, when

df_users_names_handled = df_users_skills_normalized.withColumn(
    "name",
    when(col("name").isNull() | (col("name") == ""), "Unknown")
    .otherwise(col("name"))
)

df_users_names_handled.show()
df_users_names_handled.printSchema()

+-------+-------+----+---------+---------------+
|user_id|   name| age|     city|      interests|
+-------+-------+----+---------+---------------+
|   U001|   Amit|  28|Hyderabad|[AI, ML, Cloud]|
|   U002|   Neha|NULL|    Delhi|  [AI, Testing]|
|   U003|   Ravi|NULL|Bangalore|  [Data, Spark]|
|   U004|  Pooja|  29|   Mumbai|           NULL|
|   U005|Unknown|  31|  Chennai|       [DevOps]|
+-------+-------+----+---------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- interests: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [12]:
users_df = df_users_names_handled

users_df.show()
users_df.printSchema()

+-------+-------+----+---------+---------------+
|user_id|   name| age|     city|      interests|
+-------+-------+----+---------+---------------+
|   U001|   Amit|  28|Hyderabad|[AI, ML, Cloud]|
|   U002|   Neha|NULL|    Delhi|  [AI, Testing]|
|   U003|   Ravi|NULL|Bangalore|  [Data, Spark]|
|   U004|  Pooja|  29|   Mumbai|           NULL|
|   U005|Unknown|  31|  Chennai|       [DevOps]|
+-------+-------+----+---------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- interests: array (nullable = true)
 |    |-- element: string (containsNull = false)



#2 Data

In [13]:
raw_courses = [
("C001","PySpark Mastery",("Data Engineering","Advanced"),"₹9999"),
("C002","AI for Testers",{"domain":"QA","level":"Beginner"},"8999"),
("C003","ML Foundations",("AI","Intermediate"),None),
("C004","Data Engineering Bootcamp","Data|Advanced","₹14999")
]

In [14]:
from pyspark.sql.types import StructType, StructField, StringType

course_metadata_schema = StructType([
    StructField("domain", StringType(), True),
    StructField("level", StringType(), True)
])

course_schema = StructType([
    StructField("course_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("metadata", course_metadata_schema, True),
    StructField("price", StringType(), True)
])

In [16]:
from pyspark.sql.functions import col, when, split, struct, lit
from pyspark.sql.types import StringType, StructType, StructField, Row

processed_raw_courses = []
for course_id, title, metadata_raw, price in raw_courses:
    domain = None
    level = None

    if isinstance(metadata_raw, tuple) and len(metadata_raw) == 2:
        domain = metadata_raw[0]
        level = metadata_raw[1]
    elif isinstance(metadata_raw, dict):
        domain = metadata_raw.get("domain")
        level = metadata_raw.get("level")
    elif isinstance(metadata_raw, str):
        parts = metadata_raw.split('|')
        if len(parts) == 2:
            domain = parts[0]
            level = parts[1]

    metadata_struct_row = Row(domain=domain, level=level) if (domain is not None or level is not None) else None

    processed_raw_courses.append(Row(course_id=course_id, title=title, metadata=metadata_struct_row, price=price))

df_courses_normalized = spark.createDataFrame(processed_raw_courses, course_schema)

df_courses_normalized.show(truncate=False)
df_courses_normalized.printSchema()

+---------+-------------------------+----------------------------+------+
|course_id|title                    |metadata                    |price |
+---------+-------------------------+----------------------------+------+
|C001     |PySpark Mastery          |{Data Engineering, Advanced}|₹9999 |
|C002     |AI for Testers           |{QA, Beginner}              |8999  |
|C003     |ML Foundations           |{AI, Intermediate}          |NULL  |
|C004     |Data Engineering Bootcamp|{Data, Advanced}            |₹14999|
+---------+-------------------------+----------------------------+------+

root
 |-- course_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- level: string (nullable = true)
 |-- price: string (nullable = true)



In [17]:
from pyspark.sql.functions import col, regexp_replace, when
from pyspark.sql.types import IntegerType

df_courses_final = df_courses_normalized.withColumn(
    "price",
    when(col("price").isNotNull(),
         regexp_replace(col("price"), "₹", "").cast(IntegerType())
    ).otherwise(None)
)

df_courses_final.show()
df_courses_final.printSchema()

+---------+--------------------+--------------------+-----+
|course_id|               title|            metadata|price|
+---------+--------------------+--------------------+-----+
|     C001|     PySpark Mastery|{Data Engineering...| 9999|
|     C002|      AI for Testers|      {QA, Beginner}| 8999|
|     C003|      ML Foundations|  {AI, Intermediate}| NULL|
|     C004|Data Engineering ...|    {Data, Advanced}|14999|
+---------+--------------------+--------------------+-----+

root
 |-- course_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- level: string (nullable = true)
 |-- price: integer (nullable = true)



In [18]:
from pyspark.sql.functions import col
df_courses_final = df_courses_final.withColumn(
    "price",
    when(col("price").isNull(), 0).otherwise(col("price"))
)


df_courses_final.show()
df_courses_final.printSchema()

+---------+--------------------+--------------------+-----+
|course_id|               title|            metadata|price|
+---------+--------------------+--------------------+-----+
|     C001|     PySpark Mastery|{Data Engineering...| 9999|
|     C002|      AI for Testers|      {QA, Beginner}| 8999|
|     C003|      ML Foundations|  {AI, Intermediate}|    0|
|     C004|Data Engineering ...|    {Data, Advanced}|14999|
+---------+--------------------+--------------------+-----+

root
 |-- course_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- level: string (nullable = true)
 |-- price: integer (nullable = true)



In [19]:
courses_df = df_courses_final

courses_df.show()
courses_df.printSchema()

+---------+--------------------+--------------------+-----+
|course_id|               title|            metadata|price|
+---------+--------------------+--------------------+-----+
|     C001|     PySpark Mastery|{Data Engineering...| 9999|
|     C002|      AI for Testers|      {QA, Beginner}| 8999|
|     C003|      ML Foundations|  {AI, Intermediate}|    0|
|     C004|Data Engineering ...|    {Data, Advanced}|14999|
+---------+--------------------+--------------------+-----+

root
 |-- course_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- level: string (nullable = true)
 |-- price: integer (nullable = true)



#3 DATASET

In [22]:
raw_enrollments = [
("U001","C001","2024-01-05"),
("U002","C002","05/01/2024"),
("U003","C001","2024/01/06"),
("U004","C003","invalid_date"),
("U001","C004","2024-01-10")
]

enroll_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("course_id", StringType(), True),
    StructField("enrollment_date_raw", StringType(), True)
])
df_enrollments_raw = spark.createDataFrame(raw_enrollments, enroll_schema)
df_enrollments_raw.printSchema()
df_enrollments_raw.show()

root
 |-- user_id: string (nullable = true)
 |-- course_id: string (nullable = true)
 |-- enrollment_date_raw: string (nullable = true)

+-------+---------+-------------------+
|user_id|course_id|enrollment_date_raw|
+-------+---------+-------------------+
|   U001|     C001|         2024-01-05|
|   U002|     C002|         05/01/2024|
|   U003|     C001|         2024/01/06|
|   U004|     C003|       invalid_date|
|   U001|     C004|         2024-01-10|
+-------+---------+-------------------+



In [27]:
from pyspark.sql.functions import coalesce, col, when, broadcast, to_date

df_enrollments_raw = df_enrollments_raw.withColumn(
    "enrollment_date",
    coalesce(
        when(col("enrollment_date_raw").rlike("^\\d{4}-\\d{2}-\\d{2}$|^\\d{4}-\\d{2}-\\d{2}$"), to_date(col("enrollment_date_raw"), "yyyy-MM-dd")),
        when(col("enrollment_date_raw").rlike("^\\d{2}/\\d{2}/\\d{4}$|^\\d{2}/\\d{2}/\\d{4}$"), to_date(col("enrollment_date_raw"), "dd/MM/yyyy")),
        when(col("enrollment_date_raw").rlike("^\\d{4}/\\d{2}/\\d{2}$|^\\d{4}/\\d{2}/\\d{2}$"), to_date(col("enrollment_date_raw"), "yyyy/MM/dd"))
    )
)
df_enrollments_raw.show()

df_enrollments_processed = df_enrollments_raw.drop("enrollment_date_raw")
df_enriched = df_enrollments_processed.join(broadcast(courses_df), on="course_id", how="left")
df_enriched.show()

df_enriched.show(truncate=False)
df_enriched.printSchema()

# Decision: Broadcast df_courses_clean
# Reasoning: The `df_courses_clean` (course catalog) is expected to be significantly smaller than `df_enrollments_processed` (user enrollments).
# Broadcasting the smaller table to all worker nodes during a join optimizes performance by avoiding a shuffle of the larger DataFrame and reducing network I/O.
# This was already implemented in the previous join: `df_enrollments_processed.join(broadcast(df_courses_clean), on="course_id", how="left")`

df_enriched.explain(True)

+-------+---------+-------------------+---------------+
|user_id|course_id|enrollment_date_raw|enrollment_date|
+-------+---------+-------------------+---------------+
|   U001|     C001|         2024-01-05|     2024-01-05|
|   U002|     C002|         05/01/2024|     2024-01-05|
|   U003|     C001|         2024/01/06|     2024-01-06|
|   U004|     C003|       invalid_date|           NULL|
|   U001|     C004|         2024-01-10|     2024-01-10|
+-------+---------+-------------------+---------------+

+---------+-------+---------------+--------------------+--------------------+-----+
|course_id|user_id|enrollment_date|               title|            metadata|price|
+---------+-------+---------------+--------------------+--------------------+-----+
|     C001|   U001|     2024-01-05|     PySpark Mastery|{Data Engineering...| 9999|
|     C002|   U002|     2024-01-05|      AI for Testers|      {QA, Beginner}| 8999|
|     C001|   U003|     2024-01-06|     PySpark Mastery|{Data Engineering..

#4 DATASET

In [31]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
from pyspark.sql import Row
from pyspark.sql.functions import col, when, split
raw_activity = [
("U001","login,watch,logout","{'device':'mobile','ip':'1.1.1.1'}",120),
("U002",["login","watch"],"device=laptop;ip=2.2.2.2",90),
("U003","login|logout",None,30),
("U004",None,"{'device':'tablet'}",60)
]
# 1. Define the schema for the activity data
activity_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("actions", ArrayType(StringType()), True), # Normalize actions to ArrayType
    StructField("properties", StringType(), True),
    StructField("duration", IntegerType(), True)
])

In [32]:
processed_raw_activity = []
for user_id, actions_raw, properties, duration in raw_activity:
    normalized_actions = None
    if actions_raw is None:
        normalized_actions = None
    elif isinstance(actions_raw, list):
        normalized_actions = actions_raw
    elif isinstance(actions_raw, str):
        if ',' in actions_raw:
            normalized_actions = [a.strip() for a in actions_raw.split(',')]
        elif '|' in actions_raw:
            normalized_actions = [a.strip() for a in actions_raw.split('|')]

    processed_raw_activity.append(Row(user_id=user_id, actions=normalized_actions, properties=properties, duration=duration))

In [33]:
df_activity = spark.createDataFrame(processed_raw_activity, activity_schema)

# Display the DataFrame and its schema
df_activity.show(truncate=False)
df_activity.printSchema()

+-------+----------------------+----------------------------------+--------+
|user_id|actions               |properties                        |duration|
+-------+----------------------+----------------------------------+--------+
|U001   |[login, watch, logout]|{'device':'mobile','ip':'1.1.1.1'}|120     |
|U002   |[login, watch]        |device=laptop;ip=2.2.2.2          |90      |
|U003   |[login, logout]       |NULL                              |30      |
|U004   |NULL                  |{'device':'tablet'}               |60      |
+-------+----------------------+----------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: string (nullable = true)
 |-- duration: integer (nullable = true)



In [34]:
from pyspark.sql.functions import col, when, from_json, regexp_replace, udf
from pyspark.sql.types import MapType, StringType

# Define a UDF to parse custom key-value strings like "device=laptop;ip=2.2.2.2"
def parse_custom_properties(s):
    if s is None:
        return None
    try:
        parts = s.split(';')
        result_map = {}
        for part in parts:
            if '=' in part:
                key, value = part.split('=', 1)
                result_map[key.strip()] = value.strip()
        return result_map
    except Exception:
        return None # Return None for malformed strings

# Register the UDF
parse_custom_properties_udf = udf(parse_custom_properties, MapType(StringType(), StringType()))

df_activity_normalized = df_activity.withColumn(
    "properties",
    when(col("properties").isNull(), None)
    .when(col("properties").startswith("{"), # Check for JSON-like strings (start with '{')
          from_json(regexp_replace(col("properties"), "'", "\""), MapType(StringType(), StringType())))
    .otherwise(parse_custom_properties_udf(col("properties"))) # Handle custom format for others
)

df_activity_normalized.show(truncate=False)
df_activity_normalized.printSchema()

+-------+----------------------+---------------------------------+--------+
|user_id|actions               |properties                       |duration|
+-------+----------------------+---------------------------------+--------+
|U001   |[login, watch, logout]|{device -> mobile, ip -> 1.1.1.1}|120     |
|U002   |[login, watch]        |{device -> laptop, ip -> 2.2.2.2}|90      |
|U003   |[login, logout]       |NULL                             |30      |
|U004   |NULL                  |{device -> tablet}               |60      |
+-------+----------------------+---------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- duration: integer (nullable = true)



In [35]:
from pyspark.sql.functions import col, when, array

# Handle missing actions safely by replacing NULL with empty array
df_activity_normalized = df_activity_normalized.withColumn(
    "actions",
    when(col("actions").isNull(), array()).otherwise(col("actions"))
)

df_activity_normalized.show(truncate=False)
df_activity_normalized.printSchema()

+-------+----------------------+---------------------------------+--------+
|user_id|actions               |properties                       |duration|
+-------+----------------------+---------------------------------+--------+
|U001   |[login, watch, logout]|{device -> mobile, ip -> 1.1.1.1}|120     |
|U002   |[login, watch]        |{device -> laptop, ip -> 2.2.2.2}|90      |
|U003   |[login, logout]       |NULL                             |30      |
|U004   |[]                    |{device -> tablet}               |60      |
+-------+----------------------+---------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- duration: integer (nullable = true)



In [36]:
from pyspark.sql.functions import explode, col, count

# Explode the 'actions' array to create a new row for each action
df_exploded_actions = df_activity_normalized.select(col("user_id"), explode(col("actions")).alias("action"))

# Count the frequency of each action
action_frequency = df_exploded_actions.groupBy("action").agg(count("action").alias("frequency"))

# Show the results, ordered by frequency
action_frequency.orderBy(col("frequency").desc()).show()

+------+---------+
|action|frequency|
+------+---------+
| login|        3|
| watch|        2|
|logout|        2|
+------+---------+



In [37]:
activity_df = df_activity_normalized

activity_df.show(truncate=False)
activity_df.printSchema()

+-------+----------------------+---------------------------------+--------+
|user_id|actions               |properties                       |duration|
+-------+----------------------+---------------------------------+--------+
|U001   |[login, watch, logout]|{device -> mobile, ip -> 1.1.1.1}|120     |
|U002   |[login, watch]        |{device -> laptop, ip -> 2.2.2.2}|90      |
|U003   |[login, logout]       |NULL                             |30      |
|U004   |[]                    |{device -> tablet}               |60      |
+-------+----------------------+---------------------------------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- duration: integer (nullable = true)



#5 DATASET

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import sum
spark=SparkSession.builder.getOrCreate()

data = [
("U001","2024-01-05",9999),
("U001","2024-01-10",14999),
("U002","2024-01-06",8999),
("U003","2024-01-07",0),
("U004","2024-01-08",7999),
("U001","2024-01-15",1999)
]

columns = ["user_id","date","amount"]

df=spark.createDataFrame(data,columns)
df.show()

+-------+----------+------+
|user_id|      date|amount|
+-------+----------+------+
|   U001|2024-01-05|  9999|
|   U001|2024-01-10| 14999|
|   U002|2024-01-06|  8999|
|   U003|2024-01-07|     0|
|   U004|2024-01-08|  7999|
|   U001|2024-01-15|  1999|
+-------+----------+------+



In [3]:
from pyspark.sql.functions import to_date

df = df.withColumn("date", to_date(df["date"], "yyyy-MM-dd"))

df.printSchema()
df.show()

root
 |-- user_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)

+-------+----------+------+
|user_id|      date|amount|
+-------+----------+------+
|   U001|2024-01-05|  9999|
|   U001|2024-01-10| 14999|
|   U002|2024-01-06|  8999|
|   U003|2024-01-07|     0|
|   U004|2024-01-08|  7999|
|   U001|2024-01-15|  1999|
+-------+----------+------+



In [4]:
total_spend_per_user = df.groupBy("user_id").sum("amount")
total_spend_per_user.show()

+-------+-----------+
|user_id|sum(amount)|
+-------+-----------+
|   U002|       8999|
|   U001|      26997|
|   U004|       7999|
|   U003|          0|
+-------+-----------+



In [5]:
window_spec = Window.partitionBy("user_id").orderBy("date")
running_spend_per_user = df.withColumn("running_spend", sum("amount").over(window_spec))
running_spend_per_user.show()

+-------+----------+------+-------------+
|user_id|      date|amount|running_spend|
+-------+----------+------+-------------+
|   U001|2024-01-05|  9999|         9999|
|   U001|2024-01-10| 14999|        24998|
|   U001|2024-01-15|  1999|        26997|
|   U002|2024-01-06|  8999|         8999|
|   U003|2024-01-07|     0|            0|
|   U004|2024-01-08|  7999|         7999|
+-------+----------+------+-------------+



In [6]:
from pyspark.sql.functions import rank, desc
from pyspark.sql.window import Window

window_spec_rank = Window.orderBy(desc("sum(amount)"))

ranked_users_by_total_spend = total_spend_per_user.withColumn("rank", rank().over(window_spec_rank))

ranked_users_by_total_spend.show()

+-------+-----------+----+
|user_id|sum(amount)|rank|
+-------+-----------+----+
|   U001|      26997|   1|
|   U002|       8999|   2|
|   U004|       7999|   3|
|   U003|          0|   4|
+-------+-----------+----+



### Comparing `GroupBy` and `Window` Outputs

Both `groupBy` and `Window` functions are powerful tools in PySpark for data aggregation and analysis, but they serve different purposes and produce distinct outputs.

#### 1. `GroupBy` Output: `total_spend_per_user`

- **Purpose**: `GroupBy` operations are used for **aggregation**, where you want to collapse multiple rows into a single summary row based on one or more grouping keys. It answers questions like 'What is the total spend for each user?' or 'How many items did each category sell?'
- **Output Characteristics**: The output of a `groupBy` operation typically has fewer rows than the input DataFrame, as it aggregates data based on the unique values of the grouping keys. For each group, it provides a single aggregated value (e.g., sum, count, average).
- **Example Output (`total_spend_per_user`):**
```
+-------+-----------+
|user_id|sum(amount)|
+-------+-----------+
|   U002|       8999|
|   U001|      26997|
|   U004|       7999|
|   U003|          0|
+-------+-----------+
```
Here, you get one row per `user_id` showing their total spend.

#### 2. `Window` Output: `running_spend_per_user` and `ranked_users_by_total_spend`

- **Purpose**: `Window` functions perform calculations across a set of DataFrame rows that are related to the current row. Unlike `groupBy`, `Window` functions **do not collapse rows**. Instead, they add new columns to the DataFrame, providing context-sensitive calculations (like running totals, moving averages, or rankings) for each original row.
- **Output Characteristics**: The output of a `Window` function typically has the **same number of rows** as the input DataFrame. It adds one or more new columns containing the results of the window calculation for each row.

##### a. Running Spend per User (`running_spend_per_user`)

- **Type**: Cumulative aggregate within a partition.
- **Output Characteristics**: For each transaction, it shows the cumulative spend up to that point for that specific user.
- **Example Output (`running_spend_per_user`):**
```
+-------+----------+------+-------------+
|user_id|      date|amount|running_spend|
+-------+----------+------+-------------+
|   U001|2024-01-05|  9999|         9999|
|   U001|2024-01-10| 14999|        24998|
|   U001|2024-01-15|  1999|        26997|
|   U002|2024-01-06|  8999|         8999|
|   U003|2024-01-07|     0|            0|
|   U004|2024-01-08|  7999|         7999|
+-------+----------+------+-------------+
```
Here, each row of the original `df` is preserved, and a new column `running_spend` is added, showing the sum of `amount` up to that date for each user.

##### b. Ranked Users by Total Spend (`ranked_users_by_total_spend`)

- **Type**: Ranking function applied to an aggregated DataFrame.
- **Output Characteristics**: It assigns a rank to each user based on their total spend. It operates on the `total_spend_per_user` (an aggregated DataFrame) and adds a rank column, keeping one row per user.
- **Example Output (`ranked_users_by_total_spend`):**
```
+-------+-----------+----+
|user_id|sum(amount)|rank|
+-------+-----------+----+
|   U001|      26997|   1|
|   U002|       8999|   2|
|   U004|       7999|   3|
|   U003|          0|   4|
+-------+-----------+----+
```
This output shows the ranking of users based on the total spend, which was initially computed using `groupBy`.

#### Key Differences Summarized:

| Feature         | `GroupBy`                               | `Window` (e.g., `running_spend`)                  | `Window` (e.g., `rank` on aggregated data)      |
| :-------------- | :-------------------------------------- | :------------------------------------------------ | :---------------------------------------------- |
| **Row Count**   | Reduces rows (aggregates)               | Preserves original rows                           | Preserves rows of the input (often aggregated)  |
| **Output**      | Summarized data per group               | Adds new column(s) with contextual calculations   | Adds new column(s) with ranking                 |
| **Use Case**    | Overall aggregates (total, count, avg)  | Running totals, moving averages, row comparisons  | Ranking within partitions or across the whole DF|

In essence, `groupBy` is about summarization and reducing granularity, while `Window` functions are about enriching the existing data with new calculations that consider a defined 'window' of related rows.

#6 DataSET

In [None]:
#1.
dataframes_to_check = {
    "users_df": users_df,
    "courses_df": courses_df,
    "activity_df": activity_df,
    "df_enrollments_processed": df_enrollments_processed,
    "df_enriched": df_enriched,
    "df": df,
    "total_spend_per_user": total_spend_per_user,
    "running_spend_per_user": running_spend_per_user,
    "ranked_users_by_total_spend": ranked_users_by_total_spend
}

print("--- Number of Partitions for DataFrames ---")
for df_name, df_obj in dataframes_to_check.items():
    print(f"DataFrame: {df_name}, Partitions: {df_obj.rdd.getNumPartitions()}")

#2.
df_enrollments_repartitioned = df_enrollments_processed.repartition('course_id')

print(f"Original df_enrollments_processed partitions: {df_enrollments_processed.rdd.getNumPartitions()}")
print(f"Repartitioned df_enrollments_repartitioned partitions: {df_enrollments_repartitioned.rdd.getNumPartitions()}")

#3
df_enrollments_coalesced = df_enrollments_repartitioned.coalesce(1)

print(f"Coalesced df_enrollments_coalesced partitions: {df_enrollments_coalesced.rdd.getNumPartitions()}")

output_path = "/tmp/enrollments_single_partition"

df_enrollments_coalesced.write.mode("overwrite").parquet(output_path)

print(f"Coalesced DataFrame written to {output_path} in Parquet format.")


#7 DATASET

In [None]:
dataframes_to_explain = {
    "df_clean_users": df_clean_users,
    "df_courses_clean": df_courses_clean,
    "activity_df": activity_df,
    "df_enrollments_processed": df_enrollments_processed,
    "df_enriched": df_enriched,
    "df_payments": df_payments,
    "total_spend_per_user": total_spend_per_user,
    "running_spend_per_user": running_spend_per_user,
    "ranked_users_by_total_spend": ranked_users_by_total_spend
}

for df_name, df_obj in dataframes_to_explain.items():
    print(f"\n--- Explain for DataFrame: {df_name} ---")
    df_obj.explain(True)

In [None]:
# Bad DAG identified in `ranked_users_by_total_spend`:
# The physical plan for `ranked_users_by_total_spend` includes an `Exchange SinglePartition` followed by a global `Sort`.
# This means that after computing the total spend per user (which already involves a shuffle for aggregation),
# Spark then gathers ALL the aggregated data into a single partition (`Exchange SinglePartition`) to perform a global sort (`Sort`).
# This design choice, while correct for achieving a global rank, is highly inefficient and becomes a major bottleneck
# for large datasets as it eliminates parallelism and forces all data processing onto a single executor.