In [0]:
# Ingest the cleaned course-engagement CSV from the volume. The first row
# supplies the column names and Spark infers each column's type from the data.
csv_path = "/Volumes/trial/default/course_engagement/online_course_engagement_data_cleaned.csv"
df_raw = spark.read.options(header=True, inferSchema=True).csv(csv_path)

# Bronze layer: store the data exactly as read in a managed Delta table,
# replacing whatever the table held before.
bronze_writer = df_raw.write.format("delta").mode("overwrite")
bronze_writer.saveAsTable("bronze_ecourse_engagement")

In [0]:
from pyspark.sql.functions import when, col, sum as spark_sum

# Inspect the inferred column names and types
df_raw.printSchema()

# Render the raw rows with the notebook's display helper
display(df_raw)

# Null profile: one output column per source column, holding its count of null entries
null_count_exprs = []
for name in df_raw.columns:
    null_flag = col(name).isNull().cast("int")
    null_count_exprs.append(spark_sum(null_flag).alias(name))
df_raw.select(null_count_exprs).show()

# Cardinality profile: number of distinct values per column
# (one count() action is executed for every column)
for column in df_raw.columns:
    n_distinct = df_raw.select(column).distinct().count()
    print(f"{column}: {n_distinct} unique values")

# Source column -> snake_case name. Only the columns listed here are kept,
# in this order.
snake_case_names = {
    "UserID": "user_id",
    "CourseCategory": "course_category",
    "TimeSpentOnCourse": "time_spent_on_course",
    "NumberOfVideosWatched": "number_of_videos_watched",
    "NumberOfQuizzesTaken": "number_of_quizzes_taken",
    "QuizScores": "quiz_scores",
    "CompletionRate": "completion_rate",
    "DeviceType": "device_type",
    "CourseCompletion": "course_completion",
}
df_renamed = df_raw.select(
    [col(source).alias(target) for source, target in snake_case_names.items()]
)

# Add human-readable label columns next to the numeric codes.
# Every code is mapped explicitly so that a null or unexpected code is labelled
# "Unknown" instead of silently landing in the "Mobile" / "Not Completed" bucket
# (which is what a bare .otherwise(...) catch-all would do).
# NOTE(review): assumes both columns are binary-coded (device_type: 0 = Desktop,
# 1 = Mobile; course_completion: 0 = Not Completed, 1 = Completed) — confirm
# against the dataset's data dictionary.
device_type_label_expr = (
    when(col("device_type") == 0, "Desktop")
    .when(col("device_type") == 1, "Mobile")
    .otherwise("Unknown")
)
course_completion_label_expr = (
    when(col("course_completion") == 1, "Completed")
    .when(col("course_completion") == 0, "Not Completed")
    .otherwise("Unknown")
)
df_labeled = (
    df_renamed
    .withColumn("device_type_label", device_type_label_expr)
    .withColumn("course_completion_label", course_completion_label_expr)
)

# Show each code beside its label to spot-check the mapping
display(df_labeled.select("device_type", "device_type_label", "course_completion", "course_completion_label"))

# Silver layer: persist the renamed and labelled frame as a managed Delta table,
# replacing whatever the table held before.
(
    df_labeled.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_ecourse_engagement")
)

In [0]:
# Basic data quality check on quiz scores.
# A row is flagged when its score is below 0 or above 100; the flagged rows are
# kept in `invalid_quiz_scores` and their count is printed.
quiz_score_col = col("quiz_scores")
score_out_of_range = (quiz_score_col < 0) | (quiz_score_col > 100)
invalid_quiz_scores = df_renamed.where(score_out_of_range)
print(f"Invalid quiz score rows: {invalid_quiz_scores.count()}")