In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

In [3]:
from pyspark.sql import functions as F
from pyspark.sql import types as T


# Raw Data

In [14]:
raw_users = [
    ("U001","Amit","28","Hyderabad","AI,ML,Cloud"),
    ("U002","Neha","Thirty","Delhi","Testing"),
    ("U003","Ravi",None,"Bangalore",["Data","Spark"]),
    ("U004","Pooja","29","Mumbai","AI|ML"),
    ("U005","", "31","Chennai",None)
]

# Schema

In [15]:
schema = T.StructType([
    T.StructField("user_id", T.StringType(), True),
    T.StructField("name", T.StringType(), True),
    T.StructField("age_raw", T.StringType(), True),
    T.StructField("city", T.StringType(), True),
    T.StructField("skills_raw", T.StringType(), True)
])

# Clean the Data

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

def coerce_skills(value):
    if isinstance(value, list):
        return ",".join([str(x) for x in value])
    return value

rows = [(u, n, a if a is not None else None, c, coerce_skills(s))
        for (u, n, a, c, s) in raw_users]

df = spark.createDataFrame(rows, schema=schema)



In [17]:
df = df.withColumn(
    "age",
    F.when(F.col("age_raw").rlike("^[0-9]+$"), F.col("age_raw").cast("int"))
     .when(F.lower(F.col("age_raw")) == "thirty", F.lit(30))
     .when(F.col("age_raw").isNull(), None)
     .otherwise(None)
)

In [18]:
df = df.withColumn(
    "skills",
    F.when(
        F.col("skills_raw").isNotNull(),
        F.expr("filter(transform(split(regexp_replace(skills_raw, '\\\\|', ','), ','), x -> lower(trim(x))), x -> x != '')")
    ).otherwise(F.array())
)

In [19]:

df = df.withColumn(
    "name",
    F.when(F.trim(F.col("name")) == "", F.lit("Unknown")).otherwise(F.col("name"))
)


In [20]:
clean = df.select("user_id", "name", "age", "city", "skills")

clean.show(truncate=False)
clean.printSchema()

+-------+-------+----+---------+---------------+
|user_id|name   |age |city     |skills         |
+-------+-------+----+---------+---------------+
|U001   |Amit   |28  |Hyderabad|[ai, ml, cloud]|
|U002   |Neha   |30  |Delhi    |[testing]      |
|U003   |Ravi   |NULL|Bangalore|[data, spark]  |
|U004   |Pooja  |29  |Mumbai   |[ai, ml]       |
|U005   |Unknown|31  |Chennai  |[]             |
+-------+-------+----+---------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = false)



# Dataset 2

In [21]:
raw_courses = [
    ("C001","PySpark Mastery","Data Engineering","Advanced","₹9999"),
    ("C002","AI for Testers","QA","Beginner","8999"),
    ("C003","ML Foundations","AI","Intermediate",None),
    ("C004","Data Engineering Bootcamp","Data","Advanced","₹14999")
]


In [22]:
schema = T.StructType([
    T.StructField("course_id", T.StringType(), True),
    T.StructField("course_name", T.StringType(), True),
    T.StructField("domain", T.StringType(), True),
    T.StructField("level", T.StringType(), True),
    T.StructField("price_raw", T.StringType(), True)
])


In [23]:
df = spark.createDataFrame(raw_courses, schema=schema)


In [24]:
df = df.withColumn(
    "price",
    F.when(F.col("price_raw").isNotNull(),
           F.regexp_replace(F.col("price_raw"), "[^0-9]", "").cast("int")
    ).otherwise(None)
)


In [25]:
clean_courses = df.select("course_id", "course_name", "domain", "level", "price")


In [26]:
clean_courses.show(truncate=False)
clean_courses.printSchema()


+---------+-------------------------+----------------+------------+-----+
|course_id|course_name              |domain          |level       |price|
+---------+-------------------------+----------------+------------+-----+
|C001     |PySpark Mastery          |Data Engineering|Advanced    |9999 |
|C002     |AI for Testers           |QA              |Beginner    |8999 |
|C003     |ML Foundations           |AI              |Intermediate|NULL |
|C004     |Data Engineering Bootcamp|Data            |Advanced    |14999|
+---------+-------------------------+----------------+------------+-----+

root
 |-- course_id: string (nullable = true)
 |-- course_name: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- level: string (nullable = true)
 |-- price: integer (nullable = true)



# Dataset 4

In [42]:
raw_enrollments = [
    ("U001","C001","2024-01-05"),
    ("U002","C002","05/01/2024"),
    ("U003","C001","2024/01/06"),
    ("U004","C003","invalid_date"),
    ("U001","C004","2024-01-10"),
    ("U005","C002","2024-01-12")
]


In [43]:
schema = T.StructType([
    T.StructField("user_id", T.StringType(), True),
    T.StructField("course_id", T.StringType(), True),
    T.StructField("date_raw", T.StringType(), True)
])


In [44]:
df = spark.createDataFrame(raw_enrollments, schema=schema)


In [45]:
df = df.withColumn("date_str", F.regexp_replace("date_raw", "/", "-"))

In [46]:
df = df.withColumn(
    "enrollment_date",
    F.when(F.col("date_str").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2}$"),
           F.to_date("date_str", "yyyy-MM-dd"))
     .when(F.col("date_str").rlike("^[0-9]{2}-[0-9]{2}-[0-9]{4}$"),
           F.to_date("date_str", "dd-MM-yyyy"))
     .otherwise(None)
)

In [47]:
df = df.withColumn("is_valid_date", F.col("enrollment_date").isNotNull())


In [49]:

clean_df=df
clean_df.show()

+-------+---------+------------+------------+---------------+-------------+
|user_id|course_id|    date_raw|    date_str|enrollment_date|is_valid_date|
+-------+---------+------------+------------+---------------+-------------+
|   U001|     C001|  2024-01-05|  2024-01-05|     2024-01-05|         true|
|   U002|     C002|  05/01/2024|  05-01-2024|     2024-01-05|         true|
|   U003|     C001|  2024/01/06|  2024-01-06|     2024-01-06|         true|
|   U004|     C003|invalid_date|invalid_date|           NULL|        false|
|   U001|     C004|  2024-01-10|  2024-01-10|     2024-01-10|         true|
|   U005|     C002|  2024-01-12|  2024-01-12|     2024-01-12|         true|
+-------+---------+------------+------------+---------------+-------------+



In [62]:
clean_df=clean_df.drop("is_valid_date")

In [63]:
clean_df.show()

+-------+---------+------------+------------+---------------+
|user_id|course_id|    date_raw|    date_str|enrollment_date|
+-------+---------+------------+------------+---------------+
|   U001|     C001|  2024-01-05|  2024-01-05|     2024-01-05|
|   U002|     C002|  05/01/2024|  05-01-2024|     2024-01-05|
|   U003|     C001|  2024/01/06|  2024-01-06|     2024-01-06|
|   U004|     C003|invalid_date|invalid_date|           NULL|
|   U001|     C004|  2024-01-10|  2024-01-10|     2024-01-10|
|   U005|     C002|  2024-01-12|  2024-01-12|     2024-01-12|
+-------+---------+------------+------------+---------------+



In [65]:
clean_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- course_id: string (nullable = true)
 |-- date_raw: string (nullable = true)
 |-- date_str: string (nullable = true)
 |-- enrollment_date: date (nullable = true)



# Dataset 4

In [50]:
raw_activity = [
    ("U001","login,watch,logout","{'device':'mobile'}",120),
    ("U002",["login","watch"],"device=laptop",90),
    ("U003","login|logout",None,30),
    ("U004",None,"{'device':'tablet'}",60),
    ("U005","login","{'device':'mobile'}",15)
]

In [51]:
schema = T.StructType([
    T.StructField("user_id", T.StringType(), True),
    T.StructField("actions_raw", T.StringType(), True),
    T.StructField("metadata_raw", T.StringType(), True),
    T.StructField("duration", T.IntegerType(), True)
])


In [52]:
def coerce_actions(value):
    if isinstance(value, list):
        return ",".join(value)
    return value

rows = [(u, coerce_actions(a), m, d) for (u,a,m,d) in raw_activity]
df = spark.createDataFrame(rows, schema=schema)


In [55]:
df = df.withColumn(
    "actions",
    F.when(F.col("actions_raw").isNotNull(),
           F.expr("filter(transform(split(regexp_replace(actions_raw, '\\\\|', ','), ','), x -> lower(trim(x))), x -> x != '')")
    ).otherwise(F.array())
)


In [56]:
df = df.withColumn(
    "device",
    F.when(F.col("metadata_raw").rlike("mobile"), F.lit("mobile"))
     .when(F.col("metadata_raw").rlike("laptop"), F.lit("laptop"))
     .when(F.col("metadata_raw").rlike("tablet"), F.lit("tablet"))
     .otherwise(None)
)

In [57]:
clean_activity = df.select("user_id", "actions", "device", "duration")

clean_activity.show(truncate=False)
clean_activity.printSchema()

+-------+----------------------+------+--------+
|user_id|actions               |device|duration|
+-------+----------------------+------+--------+
|U001   |[login, watch, logout]|mobile|120     |
|U002   |[login, watch]        |laptop|90      |
|U003   |[login, logout]       |NULL  |30      |
|U004   |[]                    |tablet|60      |
|U005   |[login]               |mobile|15      |
+-------+----------------------+------+--------+

root
 |-- user_id: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- device: string (nullable = true)
 |-- duration: integer (nullable = true)



# Join users with enrollments

In [66]:
users_intersect_enrollments = clean_df.select("user_id").intersect(
    clean.select("user_id")
)

users_with_enrollments = clean_df.join(
    users_intersect_enrollments, on="user_id", how="inner"
)

In [67]:
users_with_enrollments.show()

+-------+---------+------------+------------+---------------+
|user_id|course_id|    date_raw|    date_str|enrollment_date|
+-------+---------+------------+------------+---------------+
|   U002|     C002|  05/01/2024|  05-01-2024|     2024-01-05|
|   U003|     C001|  2024/01/06|  2024-01-06|     2024-01-06|
|   U001|     C001|  2024-01-05|  2024-01-05|     2024-01-05|
|   U004|     C003|invalid_date|invalid_date|           NULL|
|   U005|     C002|  2024-01-12|  2024-01-12|     2024-01-12|
|   U001|     C004|  2024-01-10|  2024-01-10|     2024-01-10|
+-------+---------+------------+------------+---------------+



# Broadcast

In [68]:
from pyspark.sql.functions import broadcast

enrollments_users = clean_df.join(broadcast(clean), "user_id", "left")
enrollments_courses = enrollments_users.join(broadcast(clean_courses), "course_id", "left")

In [69]:
enrollments_courses.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [course_id])
:- Project [user_id#130, course_id#131, date_raw#132, date_str#133, enrollment_date#134, name#40, age#36, city#34, skills#37]
:  +- Join LeftOuter, (user_id#130 = user_id#31)
:     :- Project [user_id#130, course_id#131, date_raw#132, date_str#133, enrollment_date#134]
:     :  +- Project [user_id#130, course_id#131, date_raw#132, date_str#133, enrollment_date#134, isnotnull(enrollment_date#134) AS is_valid_date#135]
:     :     +- Project [user_id#130, course_id#131, date_raw#132, date_str#133, CASE WHEN RLIKE(date_str#133, ^[0-9]{4}-[0-9]{2}-[0-9]{2}$) THEN to_date(date_str#133, Some(yyyy-MM-dd), Some(Etc/UTC), true) WHEN RLIKE(date_str#133, ^[0-9]{2}-[0-9]{2}-[0-9]{4}$) THEN to_date(date_str#133, Some(dd-MM-yyyy), Some(Etc/UTC), true) ELSE cast(null as date) END AS enrollment_date#134]
:     :        +- Project [user_id#130, course_id#131, date_raw#132, regexp_replace(date_raw#132, /, -, 1) AS date_str#133]
:     :   

# Eliminate Orphan Records

In [70]:
final = (
    enrollments_courses
    .withColumn("is_orphan",
                (F.col("name").isNull()) | (F.col("course_name").isNull()))
    .filter(~F.col("is_orphan"))
)

# Total Enrollments per Course




In [71]:
total_enrollments_per_course = clean_df.groupBy("course_id") \
    .agg(F.countDistinct("user_id").alias("total_enrollments"))

In [73]:
total_enrollments_per_course.show()

+---------+-----------------+
|course_id|total_enrollments|
+---------+-----------------+
|     C003|                1|
|     C004|                1|
|     C001|                2|
|     C002|                2|
+---------+-----------------+



# Total Revenue per Course





In [72]:
total_revenue_per_course = enrollments_courses.groupBy("course_id", "course_name") \
    .agg((F.count("user_id") * F.first("price")).alias("total_revenue"))

In [74]:
total_revenue_per_course.show()

+---------+--------------------+-------------+
|course_id|         course_name|total_revenue|
+---------+--------------------+-------------+
|     C001|     PySpark Mastery|        19998|
|     C002|      AI for Testers|        17998|
|     C003|      ML Foundations|         NULL|
|     C004|Data Engineering ...|        14999|
+---------+--------------------+-------------+



# Average Engagement Time per Course

In [75]:
enrollments_activity = enrollments_courses.join(clean_activity, "user_id", "left")

avg_engagement_per_course = enrollments_activity.groupBy("course_id", "course_name") \
    .agg(F.avg("duration").alias("avg_engagement_time"))

In [76]:
avg_engagement_per_course.show()

+---------+--------------------+-------------------+
|course_id|         course_name|avg_engagement_time|
+---------+--------------------+-------------------+
|     C001|     PySpark Mastery|               75.0|
|     C002|      AI for Testers|               52.5|
|     C003|      ML Foundations|               60.0|
|     C004|Data Engineering ...|              120.0|
+---------+--------------------+-------------------+



# Total Courses Enrolled per User



In [78]:
courses_per_user = clean_df.groupBy("user_id") \
    .agg(F.countDistinct("course_id").alias("total_courses_enrolled"))

In [79]:
courses_per_user.show()

+-------+----------------------+
|user_id|total_courses_enrolled|
+-------+----------------------+
|   U004|                     1|
|   U005|                     1|
|   U002|                     1|
|   U003|                     1|
|   U001|                     2|
+-------+----------------------+



#  Identify Users with Zero Activity


In [83]:
clean_df.printSchema()


root
 |-- user_id: string (nullable = true)
 |-- course_id: string (nullable = true)
 |-- date_raw: string (nullable = true)
 |-- date_str: string (nullable = true)
 |-- enrollment_date: date (nullable = true)



In [84]:
users_with_activity = clean_activity.select("user_id").distinct()


In [85]:
users_with_activity.printSchema()

root
 |-- user_id: string (nullable = true)



In [88]:
users_with_zero_activity = clean.select("user_id", "name") \
    .join(users_with_activity, "user_id", "left_anti")


In [89]:
users_with_zero_activity.show()

+-------+----+
|user_id|name|
+-------+----+
+-------+----+



# Rank Users by Total Time Spent


In [91]:
from pyspark.sql import Window
import pyspark.sql.functions as F

user_time = clean_activity.groupBy("user_id").agg(F.sum("duration").alias("total_time"))

w = Window.orderBy(F.desc("total_time"))

ranked_users = user_time.withColumn("rank", F.rank().over(w))

In [94]:
ranked_users.show()

+-------+----------+----+
|user_id|total_time|rank|
+-------+----------+----+
|   U001|       120|   1|
|   U002|        90|   2|
|   U004|        60|   3|
|   U003|        30|   4|
|   U005|        15|   5|
+-------+----------+----+



# Running Revenue per Course by Enrollment Date

In [93]:
enrollments_courses = clean_df.join(clean_courses, "course_id", "left")

enrollments_courses = enrollments_courses.withColumn("revenue", F.col("price"))

w = Window.partitionBy("course_id").orderBy("enrollment_date") \
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)

running_revenue = enrollments_courses.withColumn("running_revenue", F.sum("revenue").over(w))

In [95]:
running_revenue.show()

+---------+-------+------------+------------+---------------+--------------------+----------------+------------+-----+-------+---------------+
|course_id|user_id|    date_raw|    date_str|enrollment_date|         course_name|          domain|       level|price|revenue|running_revenue|
+---------+-------+------------+------------+---------------+--------------------+----------------+------------+-----+-------+---------------+
|     C001|   U001|  2024-01-05|  2024-01-05|     2024-01-05|     PySpark Mastery|Data Engineering|    Advanced| 9999|   9999|           9999|
|     C001|   U003|  2024/01/06|  2024-01-06|     2024-01-06|     PySpark Mastery|Data Engineering|    Advanced| 9999|   9999|          19998|
|     C002|   U002|  05/01/2024|  05-01-2024|     2024-01-05|      AI for Testers|              QA|    Beginner| 8999|   8999|           8999|
|     C002|   U005|  2024-01-12|  2024-01-12|     2024-01-12|      AI for Testers|              QA|    Beginner| 8999|   8999|          17998|

# Identify Top 2 Users per Course by Engagement

In [96]:
enrollments_activity = clean_df.join(clean_activity, "user_id", "left")

user_course_time = enrollments_activity.groupBy("course_id", "user_id") \
    .agg(F.sum("duration").alias("total_time"))

w = Window.partitionBy("course_id").orderBy(F.desc("total_time"))

top_users_per_course = user_course_time.withColumn("rank", F.rank().over(w)) \
    .filter(F.col("rank") <= 2)

# Compare GroupBy vs Window Results

In [98]:
user_course_time.explain(True)


top_users_per_course.explain(True)


== Parsed Logical Plan ==
'Aggregate ['course_id, 'user_id], ['course_id, 'user_id, 'sum('duration) AS total_time#523]
+- Project [user_id#130, course_id#131, date_raw#132, date_str#133, enrollment_date#134, actions#198, device#201, duration#177]
   +- Join LeftOuter, (user_id#130 = user_id#174)
      :- Project [user_id#130, course_id#131, date_raw#132, date_str#133, enrollment_date#134]
      :  +- Project [user_id#130, course_id#131, date_raw#132, date_str#133, enrollment_date#134, isnotnull(enrollment_date#134) AS is_valid_date#135]
      :     +- Project [user_id#130, course_id#131, date_raw#132, date_str#133, CASE WHEN RLIKE(date_str#133, ^[0-9]{4}-[0-9]{2}-[0-9]{2}$) THEN to_date(date_str#133, Some(yyyy-MM-dd), Some(Etc/UTC), true) WHEN RLIKE(date_str#133, ^[0-9]{2}-[0-9]{2}-[0-9]{4}$) THEN to_date(date_str#133, Some(dd-MM-yyyy), Some(Etc/UTC), true) ELSE cast(null as date) END AS enrollment_date#134]
      :        +- Project [user_id#130, course_id#131, date_raw#132, regexp_re

# Aggregate Total Time


In [99]:
from pyspark.sql import functions as F

user_engagement = clean_activity.groupBy("user_id") \
    .agg(F.sum("duration").alias("total_time"))

# Classify with Built‑in Functions

In [100]:
user_engagement = user_engagement.withColumn(
    "engagement_level",
    F.when(F.col("total_time") >= 100, "High")
     .when((F.col("total_time") >= 50) & (F.col("total_time") < 100), "Medium")
     .otherwise("Low")
)

In [101]:
user_engagement.show()

+-------+----------+----------------+
|user_id|total_time|engagement_level|
+-------+----------+----------------+
|   U002|        90|          Medium|
|   U001|       120|            High|
|   U004|        60|          Medium|
|   U005|        15|             Low|
|   U003|        30|             Low|
+-------+----------+----------------+



# Sort courses by total revenue

In [102]:
from pyspark.sql import functions as F

enrollments_courses = clean_df.join(clean_courses, "course_id", "left")

total_revenue_per_course = (
    enrollments_courses
    .groupBy("course_id", "course_name")
    .agg((F.count("user_id") * F.first("price")).alias("total_revenue"))
    .orderBy(F.desc("total_revenue"))
)

# Sort users by engagement within each city

In [103]:

user_time = clean_activity.groupBy("user_id").agg(F.sum("duration").alias("total_time"))


user_time_city = user_time.join(clean.select("user_id", "city"), "user_id", "left")


users_sorted_within_city = user_time_city.orderBy("city", F.desc("total_time"))

# Set operations

In [104]:
users_enrolled = clean_df.select("user_id").distinct()

users_active = clean_activity.select("user_id").distinct()

In [105]:
users_enrolled.show()
users_active.show()

+-------+
|user_id|
+-------+
|   U002|
|   U003|
|   U001|
|   U004|
|   U005|
+-------+

+-------+
|user_id|
+-------+
|   U002|
|   U001|
|   U004|
|   U005|
|   U003|
+-------+



# Find users who enrolled but never became active

In [106]:
enrolled_not_active = users_enrolled.join(users_active, "user_id", "left_anti")

In [107]:
enrolled_not_active.show()

+-------+
|user_id|
+-------+
+-------+



# Find users who are both enrolled and active

In [108]:
enrolled_and_active = users_enrolled.intersect(users_active)

In [109]:
enrolled_and_active.show()

+-------+
|user_id|
+-------+
|   U004|
|   U005|
|   U002|
|   U003|
|   U001|
+-------+



 # DAG and performance analysis

In [111]:

total_revenue_per_course.explain(True)

users_sorted_within_city.explain(True)

enrollments_courses.explain(True)

== Parsed Logical Plan ==
'Sort ['total_revenue DESC NULLS LAST], true
+- Aggregate [course_id#131, course_name#58], [course_id#131, course_name#58, (count(user_id#130) * cast(first(price#62, false) as bigint)) AS total_revenue#565L]
   +- Project [course_id#131, user_id#130, date_raw#132, date_str#133, enrollment_date#134, course_name#58, domain#59, level#60, price#62]
      +- Join LeftOuter, (course_id#131 = course_id#57)
         :- Project [user_id#130, course_id#131, date_raw#132, date_str#133, enrollment_date#134]
         :  +- Project [user_id#130, course_id#131, date_raw#132, date_str#133, enrollment_date#134, isnotnull(enrollment_date#134) AS is_valid_date#135]
         :     +- Project [user_id#130, course_id#131, date_raw#132, date_str#133, CASE WHEN RLIKE(date_str#133, ^[0-9]{4}-[0-9]{2}-[0-9]{2}$) THEN to_date(date_str#133, Some(yyyy-MM-dd), Some(Etc/UTC), true) WHEN RLIKE(date_str#133, ^[0-9]{2}-[0-9]{2}-[0-9]{4}$) THEN to_date(date_str#133, Some(dd-MM-yyyy), Some(Etc/U