#REALTIME CASE STUDY

In [None]:

from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook (an existing session is reused
# by getOrCreate; otherwise a new one is started under this app name).
spark = (
    SparkSession.builder
    .appName("LearningPlatformAnalytics")
    .getOrCreate()
)

from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window

#Part A - Ex 1

In [None]:

# Dataset 1 — USER MASTER (Raw)
# Every column is declared as a string; only user_id is non-nullable.
users_schema = StructType(
    [
        StructField(column_name, StringType(), is_nullable)
        for column_name, is_nullable in (
            ("user_id", False),
            ("name", True),
            ("age_raw", True),
            ("city", True),
            ("skills_raw", True),
        )
    ]
)

# Deliberately messy sample rows: (user_id, name, age_raw, city, skills_raw).
raw_users = [
    ("U001", "Amit", "28", "Hyderabad", "AI,ML,Cloud"),
    # age written as a word instead of digits
    ("U002", "Neha", "Thirty", "Delhi", "Testing"),
    # missing age; skills supplied as a Python list rather than a string
    ("U003", "Ravi", None, "Bangalore", ["Data", "Spark"]),
    # skills separated by a pipe instead of a comma
    ("U004", "Pooja", "29", "Mumbai", "AI|ML"),
    # empty name and missing skills
    ("U005", "", "31", "Chennai", None),
]

# Dataset 2 — COURSE CATALOG
# Every column is declared as a string (including price); only course_id is
# non-nullable.
courses_schema = StructType(
    [
        StructField(column_name, StringType(), is_nullable)
        for column_name, is_nullable in (
            ("course_id", False),
            ("course_name", True),
            ("domain", True),
            ("level", True),
            ("price", True),
        )
    ]
)

# Sample rows: (course_id, course_name, domain, level, price).
# Prices appear with a currency symbol, as bare digits, or missing.
raw_courses = [
    ("C001", "PySpark Mastery", "Data Engineering", "Advanced", "₹9999"),
    ("C002", "AI for Testers", "QA", "Beginner", "8999"),
    ("C003", "ML Foundations", "AI", "Intermediate", None),
    ("C004", "Data Engineering Bootcamp", "Data", "Advanced", "₹14999"),
]

# Dataset 3 — USER ENROLLMENTS
# user_id and course_id are required; the enrollment date is kept as a raw,
# nullable string.
enrollments_schema = StructType(
    [
        StructField(column_name, StringType(), is_nullable)
        for column_name, is_nullable in (
            ("user_id", False),
            ("course_id", False),
            ("enrollment_date_raw", True),
        )
    ]
)

# Sample rows: (user_id, course_id, enrollment_date_raw).
# The date strings use several layouts, and one is not a date at all.
raw_enrollments = [
    ("U001", "C001", "2024-01-05"),
    ("U002", "C002", "05/01/2024"),
    ("U003", "C001", "2024/01/06"),
    ("U004", "C003", "invalid_date"),
    ("U001", "C004", "2024-01-10"),
    ("U005", "C002", "2024-01-12"),
]

# Dataset 4 — USER ACTIVITY LOGS
# actions and metadata are raw nullable strings; duration_mins is the only
# integer column; user_id is the only non-nullable column.
activity_schema = StructType(
    [
        StructField(column_name, column_type, is_nullable)
        for column_name, column_type, is_nullable in (
            ("user_id", StringType(), False),
            ("actions_raw", StringType(), True),
            ("metadata_raw", StringType(), True),
            ("duration_mins", IntegerType(), True),
        )
    ]
)

# Sample rows: (user_id, actions_raw, metadata_raw, duration_mins).
# Actions are comma- or pipe-separated or missing; metadata is either a
# dict-like string, a key=value string, or missing.
raw_activity = [
    ("U001", "login,watch,logout", "{'device':'mobile'}", 120),
    ("U002", None, "device=laptop", 90),
    ("U003", "login|logout", None, 30),
    ("U004", None, "{'device':'tablet'}", 60),
    ("U005", "login", "{'device':'mobile'}", 15),
]




In [None]:

# Materialise each raw Python dataset as a DataFrame using its explicit schema.
users_raw_df = spark.createDataFrame(data=raw_users, schema=users_schema)
courses_raw_df = spark.createDataFrame(data=raw_courses, schema=courses_schema)
enrollments_raw_df = spark.createDataFrame(
    data=raw_enrollments, schema=enrollments_schema
)
activity_raw_df = spark.createDataFrame(data=raw_activity, schema=activity_schema)


#Part A - Ex 2

In [None]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# Derive an integer "age" column from the first run of digits in age_raw.
# The cast is guarded by a digit check: without it, a value with no digits
# (e.g. "Thirty") makes regexp_extract return "" and the cast is applied to an
# empty string, which yields NULL only when ANSI mode is off and raises when
# it is on. With the guard, such values (and NULL input) become NULL because
# the when() has no otherwise branch.
users_df = users_raw_df.withColumn(
    "age",
    F.when(
        F.col("age_raw").rlike(r"\d"),
        F.regexp_extract(F.col("age_raw"), r"\d+", 0).cast("int"),
    ),
)


# Replace the raw string "price" with a double by stripping every non-digit
# character (currency symbols, separators) before casting.
# The cast is guarded by a digit check so that a price containing no digits
# becomes NULL instead of casting an empty string (which raises when ANSI
# mode is on). NULL prices stay NULL.
# NOTE(review): the pattern also removes decimal points, so "99.50" would
# become 9950 — acceptable for the whole-number prices in this dataset.
courses_df = courses_raw_df.withColumn(
    "price",
    F.when(
        F.col("price").rlike(r"\d"),
        F.regexp_replace(F.col("price"), r"[^\d]", "").cast("double"),
    ),
)


# Parse enrollment_date_raw into a proper date column "enrollment_dt",
# accepting three layouts: yyyy-MM-dd, dd/MM/yyyy and yyyy/MM/dd.
# Each to_date call is wrapped in a shape check so it is only evaluated on
# strings that already look like its pattern. Depending on the Spark version
# and settings (ANSI mode, spark.sql.legacy.timeParserPolicy), calling to_date
# on a string that does not fit the pattern can raise instead of returning
# NULL, which would abort the whole job on the first odd value.
# Rows whose date matches none of the layouts (e.g. "invalid_date") or is NULL
# end up with a NULL enrollment_dt and are dropped by the final filter.
enrollments_df = (
    enrollments_raw_df
    .withColumn(
        "enrollment_dt",
        F.coalesce(
            F.when(
                F.col("enrollment_date_raw").rlike(r"^\d{4}-\d{2}-\d{2}$"),
                F.to_date("enrollment_date_raw", "yyyy-MM-dd"),
            ),
            F.when(
                F.col("enrollment_date_raw").rlike(r"^\d{2}/\d{2}/\d{4}$"),
                F.to_date("enrollment_date_raw", "dd/MM/yyyy"),
            ),
            F.when(
                F.col("enrollment_date_raw").rlike(r"^\d{4}/\d{2}/\d{2}$"),
                F.to_date("enrollment_date_raw", "yyyy/MM/dd"),
            ),
        ),
    )
    .drop("enrollment_date_raw")
    .filter(F.col("enrollment_dt").isNotNull())
)

#Part A - Ex 3
Convert skills and actions into arrays

In [None]:

from pyspark.sql import functions as F
# Build an array<string> "skills" column from the raw skills string.
# - NULL or blank skills_raw -> empty array.
# - Otherwise the string is split on commas or pipes (both occur in the raw
#   data), ignoring whitespace around the delimiter.
# Previously the when() had no otherwise(), so every non-null skills_raw was
# turned into NULL and no skills were ever parsed.
# Brackets and quotes are stripped first: the U003 row supplies a Python list
# for this string column, which presumably reaches Spark as its string
# rendering (e.g. "[Data, Spark]") — TODO confirm on the target runtime.
users_df = users_df.withColumn(
    "skills",
    F.when(
        F.col("skills_raw").isNull() | (F.trim(F.col("skills_raw")) == ""),
        # cast so both branches share the array<string> type
        F.array().cast("array<string>"),
    ).otherwise(
        F.split(
            F.regexp_replace(F.col("skills_raw"), r"[\[\]'\"]", ""),
            r"\s*[,|]\s*",
        )
    ),
)


#Part A - Ex 4
Handle missing and invalid records gracefully

In [None]:
# Keep only activity rows that actually carry actions, and expose them as an
# array<string> column "actions".
# actions_raw is a StringType column, so F.size() cannot be applied to it
# directly (size() accepts only arrays and maps). Rows with a NULL or blank
# actions_raw are dropped first; the remaining strings are split on commas or
# pipes (both delimiters occur in the raw data).
valid_activity_df = (
    activity_raw_df
    .filter(
        F.col("actions_raw").isNotNull()
        & (F.trim(F.col("actions_raw")) != "")
    )
    .withColumn("actions", F.split(F.col("actions_raw"), r"\s*[,|]\s*"))
)

#Part B - Ex 1
Join users with enrollments

In [None]:
# Left join keeps every user, including those without any valid enrollment;
# joining on the column name yields a single user_id column in the result.
users_enrollments_df = users_df.join(
    other=enrollments_df,
    on=["user_id"],
    how="left",
)


#Part B - Ex 2
Join enrollments with courses

In [None]:
# Left join keeps every enrollment even if its course_id has no match in the
# catalog; joining on the column name yields a single course_id column.
enrollments_courses_df = enrollments_df.join(
    other=courses_df,
    on=["course_id"],
    how="left",
)

#Part B - Ex 3
Decide which table(s) should be broadcast