In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Run banner, stamped with the local wall-clock time at start-up.
banner_rule = "=" * 70
print(banner_rule)
print("OULAD ETL PIPELINE - ONLINE EDUCATION ANALYTICS")
print(banner_rule)
print("Run time: " + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

# =============================================================================
# 1. EXTRACT
# =============================================================================
print("\n[EXTRACT] Loading OULAD datasets...")

# Each CSV is read from the current working directory, in the order listed.
# vle.csv is loaded for its resource metadata only.
(students, courses, assessments,
 student_assessment, vle, student_registration) = (
    pd.read_csv(csv_name)
    for csv_name in (
        "studentInfo.csv",
        "courses.csv",
        "assessments.csv",
        "studentAssessment.csv",
        "vle.csv",
        "studentRegistration.csv",
    )
)

print("âœ“ All required files loaded successfully")
# Report the raw row count of every frame that was just loaded.
for label, frame in (
    ("Students", students),
    ("Courses", courses),
    ("Assessments", assessments),
    ("Student Assessments", student_assessment),
    ("VLE Resources", vle),
    ("Registrations", student_registration),
):
    print(f"   {label}: {len(frame)}")

# =============================================================================
# 2. TRANSFORM - DATA QUALITY
# =============================================================================
print("\n[TRANSFORM] Data quality checks...")

# Drop exact-duplicate rows from the raw frames. Only the student and
# student-assessment removals are reported; courses and registrations are
# de-duplicated silently.
initial_students = len(students)
students = students.drop_duplicates()
removed_students = initial_students - len(students)
print(f"âœ“ Students: Removed {removed_students} duplicates")

initial_assessments = len(student_assessment)
student_assessment = student_assessment.drop_duplicates()
removed_assessments = initial_assessments - len(student_assessment)
print(f"âœ“ Assessments: Removed {removed_assessments} duplicates")

courses = courses.drop_duplicates()
student_registration = student_registration.drop_duplicates()

# Per-column null counts for the student frame, keeping only the columns that
# actually contain missing values.
print("\nâœ“ Missing values check:")
null_counts = students.isnull().sum()
missing_cols = null_counts[null_counts > 0]
if missing_cols.empty:
    print("   No missing values found")
else:
    print(missing_cols)

# =============================================================================
# 3. DIMENSION TABLES
# =============================================================================

# ============= DIM_STUDENT =============
print("\n[TRANSFORM] Creating dim_student...")

# One row per id_student: when a student appears on several rows of
# `students`, the attributes of the first occurrence are kept.
student_attributes = [
    'id_student', 'gender', 'region', 'highest_education',
    'imd_band', 'age_band', 'num_of_prev_attempts', 'disability'
]
dim_student = students[student_attributes].drop_duplicates(subset=['id_student'])

# Placeholder values used wherever an attribute is missing.
missing_defaults = {
    'gender': 'Unknown',
    'region': 'Unknown',
    'highest_education': 'Unknown',
    'imd_band': 'Unknown',
    'age_band': 'Unknown',
    'disability': 'N',
    'num_of_prev_attempts': 0
}
dim_student = dim_student.fillna(missing_defaults)

# Sequential surrogate key starting at 1, then the final column order with the
# key in front.
dim_student['StudentSK'] = range(1, len(dim_student) + 1)
dim_student = dim_student[[
    'StudentSK', 'id_student', 'gender', 'age_band',
    'highest_education', 'region', 'imd_band',
    'num_of_prev_attempts', 'disability'
]]

print(f"âœ“ dim_student created: {len(dim_student)} unique students")

# ============= DIM_COURSE =============
print("\n[TRANSFORM] Creating dim_course...")

# Lookup tables for the descriptive fields. Presentation codes that are not
# listed map to NaN.
semester_names = {
    '2013B': 'February 2013',
    '2013J': 'October 2013',
    '2014B': 'February 2014',
    '2014J': 'October 2014'
}
semester_types = {
    'B': 'February',
    'J': 'October'
}

presentation_codes = courses['code_presentation']

# Derive every new column from the raw course frame in one step:
#   course_presentation_id -> "<module>_<presentation>" natural key
#   year                   -> first four characters of the presentation code
#   semester_type          -> looked up from whatever follows those characters
dim_course = courses.assign(
    course_presentation_id=courses['code_module'] + "_" + presentation_codes,
    semester_name=presentation_codes.map(semester_names),
    year=presentation_codes.str[:4],
    semester_type=presentation_codes.str[4:].map(semester_types),
)

# Sequential surrogate key starting at 1, then the final column selection.
dim_course['CourseSK'] = range(1, len(dim_course) + 1)
dim_course = dim_course[[
    'CourseSK', 'course_presentation_id', 'code_module',
    'code_presentation', 'semester_name', 'year',
    'semester_type', 'module_presentation_length'
]]

print(f"âœ“ dim_course created: {len(dim_course)} course presentations")

# ============= DIM_ASSESSMENT =============
print("\n[TRANSFORM] Creating dim_assessment...")

# Work on a fresh frame so `assessments` itself is left untouched:
#   assessment_full_id -> "<module>_<presentation>_<id_assessment>"
#   assessment_type    -> missing values replaced with 'Unknown'
dim_assessment = assessments.assign(
    assessment_full_id=(
        assessments['code_module'] + "_" +
        assessments['code_presentation'] + "_" +
        assessments['id_assessment'].astype(str)
    ),
    assessment_type=assessments['assessment_type'].fillna('Unknown'),
)

# Categorize assessment types
def categorize_assessment(atype):
    """Return a descriptive category for an assessment type code.

    Missing values and the literal 'Unknown' give 'Unknown'. Otherwise the
    value is upper-cased and the first marker it contains decides the
    category, checked in the order TMA, CMA, EXAM; anything else is 'Other'.
    """
    if pd.isna(atype) or atype == 'Unknown':
        return 'Unknown'

    code = str(atype).upper()
    # Order matters: the first marker contained in the code wins.
    marker_labels = (
        ('TMA', 'Tutor Marked Assignment'),
        ('CMA', 'Computer Marked Assignment'),
        ('EXAM', 'Final Exam'),
    )
    for marker, label in marker_labels:
        if marker in code:
            return label
    return 'Other'

# Label every assessment with its descriptive category.
dim_assessment['assessment_category'] = (
    dim_assessment['assessment_type'].map(categorize_assessment)
)

# Sequential surrogate key starting at 1, placed as the first column.
assessment_keys = range(1, len(dim_assessment) + 1)
dim_assessment.insert(0, 'AssessmentSK', assessment_keys)

print(f"âœ“ dim_assessment created: {len(dim_assessment)} assessments")

# ============= DIM_VLE_RESOURCE =============
print("\n[TRANSFORM] Creating dim_vle_resource...")

# Work on a fresh frame so `vle` itself is left untouched:
#   resource_full_id -> "<module>_<presentation>_<id_site>"
#   activity_type    -> missing values replaced with 'Unknown'
dim_vle_resource = vle.assign(
    resource_full_id=(
        vle['code_module'] + "_" +
        vle['code_presentation'] + "_" +
        vle['id_site'].astype(str)
    ),
    activity_type=vle['activity_type'].fillna('Unknown'),
)

# Categorize activities
def categorize_activity(activity):
    """Return a coarse category for a VLE activity type.

    The value is lower-cased and matched by substring:

    * 'homepage'                      -> 'Navigation'
    * 'resource', 'page' or 'url'     -> 'Content'
    * 'forum' or 'discussion'         -> 'Discussion'
    * 'quiz' or 'questionnaire'       -> 'Assessment'
    * anything else, or a missing value -> 'Other'
    """
    if pd.isna(activity):
        return 'Other'
    activity = str(activity).lower()
    # 'homepage' must be tested before the generic 'page' substring: every
    # string containing 'homepage' also contains 'page', so checking 'page'
    # first would make the Navigation category unreachable.
    if 'homepage' in activity:
        return 'Navigation'
    if 'resource' in activity or 'page' in activity or 'url' in activity:
        return 'Content'
    if 'forum' in activity or 'discussion' in activity:
        return 'Discussion'
    if 'quiz' in activity or 'questionnaire' in activity:
        return 'Assessment'
    return 'Other'

# Label every resource with its coarse activity category.
dim_vle_resource['activity_category'] = (
    dim_vle_resource['activity_type'].map(categorize_activity)
)

# Sequential surrogate key starting at 1, placed as the first column.
resource_keys = range(1, len(dim_vle_resource) + 1)
dim_vle_resource.insert(0, 'ResourceSK', resource_keys)

print(f"âœ“ dim_vle_resource created: {len(dim_vle_resource)} VLE resources")

# ============= DIM_TIME =============
print("\n[TRANSFORM] Creating dim_time...")

# Gather the distinct, non-null day offsets found in the registration and
# unregistration columns (whichever of the two are present).
all_dates = []
for offset_column in ('date_registration', 'date_unregistration'):
    if offset_column in student_registration.columns:
        all_dates.extend(student_registration[offset_column].dropna().unique())

# Turn each offset into a calendar date by adding it to a single base date.
# NOTE(review): the same 2013-10-01 base is applied to every offset,
# whichever presentation it came from -- confirm this is intended.
base_date = datetime(2013, 10, 1)
shifted_dates = (
    base_date + timedelta(days=int(day_offset))
    for day_offset in all_dates
    if pd.notna(day_offset)
)
# Keep only dates in the years 2012-2015, de-duplicated and ascending.
actual_dates = sorted(
    {shifted for shifted in shifted_dates if 2012 <= shifted.year <= 2015}
)

# With fewer than 100 distinct dates, fall back to every day of 2013-2014.
if len(actual_dates) < 100:
    actual_dates = pd.date_range(start='2013-01-01', end='2014-12-31', freq='D').tolist()

# One row per distinct date, ascending, with a clean 0..n-1 index.
dim_time = (
    pd.DataFrame({'FullDate': pd.to_datetime(actual_dates)})
    .drop_duplicates()
    .sort_values('FullDate')
    .reset_index(drop=True)
)

# Sequential surrogate key plus the calendar attributes of each date.
calendar = dim_time['FullDate']
dim_time['DateSK'] = range(1, len(dim_time) + 1)
dim_time['Year'] = calendar.dt.year
dim_time['Quarter'] = calendar.dt.quarter
dim_time['Month'] = calendar.dt.month
dim_time['MonthName'] = calendar.dt.month_name()
dim_time['Week'] = calendar.dt.isocalendar().week
dim_time['DayOfWeek'] = calendar.dt.dayofweek
dim_time['DayName'] = calendar.dt.day_name()
# dayofweek counts from Monday=0, so 5 and 6 are Saturday and Sunday.
dim_time['IsWeekend'] = dim_time['DayOfWeek'].isin([5, 6])

# Add semester indicator
def get_semester(month):
    """Return the season label for a month number.

    Months 9 and above are 'Fall', 6-8 'Summer', 3-5 'Spring'; everything
    below 3 is 'Winter'.
    """
    # Thresholds are tried from the highest downwards; the first one the
    # month reaches decides the label.
    season_starts = ((9, 'Fall'), (6, 'Summer'), (3, 'Spring'))
    for first_month, season in season_starts:
        if month >= first_month:
            return season
    return 'Winter'

# Season label derived from the month number of each date.
dim_time['Semester'] = dim_time['Month'].map(get_semester)

# Final column order, surrogate key first.
time_columns = [
    'DateSK', 'FullDate', 'Year', 'Quarter', 'Month', 'MonthName',
    'Week', 'DayOfWeek', 'DayName', 'IsWeekend', 'Semester'
]
dim_time = dim_time[time_columns]

print(f"âœ“ dim_time created: {len(dim_time)} unique dates")

# =============================================================================
# 4. FACT TABLES
# =============================================================================

# ============= FACT_STUDENT_ASSESSMENT =============
print("\n[TRANSFORM] Creating fact_student_assessment...")

# Resolve the course and assessment attributes of every submission through
# id_assessment. Joining to the student table on id_student alone would emit
# one row per registration the student holds (a many-to-many fan-out) and
# depend on the final dropna to throw the mismatched rows away again.
# validate='many_to_one' makes the merge fail loudly if id_assessment is ever
# not unique in dim_assessment instead of silently duplicating fact rows.
fact_assess = student_assessment.merge(
    dim_assessment[[
        'id_assessment', 'code_module', 'code_presentation',
        'assessment_full_id', 'AssessmentSK', 'weight', 'date'
    ]],
    on='id_assessment',
    how='left',
    validate='many_to_one'
)

# Student surrogate key.
fact_assess = fact_assess.merge(
    dim_student[['id_student', 'StudentSK']],
    on='id_student',
    how='left'
)

# Course surrogate key, via the "<module>_<presentation>" natural key.
fact_assess['course_presentation_id'] = (
    fact_assess['code_module'] + "_" + fact_assess['code_presentation']
)
fact_assess = fact_assess.merge(
    dim_course[['course_presentation_id', 'CourseSK']],
    on='course_presentation_id',
    how='left'
)

# Score metrics. is_submitted must be derived BEFORE missing scores are
# replaced with 0; computed afterwards it is 1 on every row.
numeric_score = pd.to_numeric(fact_assess['score'], errors='coerce')
fact_assess['is_submitted'] = numeric_score.notna().astype(int)
# Rows without a numeric score are stored as 0 and therefore count as not
# passed (pass mark: 40).
fact_assess['score'] = numeric_score.fillna(0)
fact_assess['is_passed'] = (fact_assess['score'] >= 40).astype(int)

# Days between submission and the assessment date; 0 when either side is
# missing. Both columns are coerced to numbers first so a non-numeric
# placeholder becomes NaN rather than raising on the subtraction.
fact_assess['days_late'] = (
    pd.to_numeric(fact_assess['date_submitted'], errors='coerce')
    - pd.to_numeric(fact_assess['date'], errors='coerce')
).fillna(0)
fact_assess['is_late'] = (fact_assess['days_late'] > 0).astype(int)

# Keep only rows whose three surrogate keys all resolved. Left merges turn a
# key column into float as soon as one lookup misses, so the keys are cast
# back to integers to match the dimension tables and the exported CSV.
key_columns = ['StudentSK', 'CourseSK', 'AssessmentSK']
fact_student_assessment = fact_assess[[
    'StudentSK', 'CourseSK', 'AssessmentSK',
    'score', 'weight', 'is_submitted', 'is_passed',
    'is_late', 'days_late', 'is_banked'
]].dropna(subset=key_columns)
fact_student_assessment = fact_student_assessment.astype(
    {key: 'int64' for key in key_columns}
)

print(f"âœ“ fact_student_assessment created: {len(fact_student_assessment)} records")

# ============= FACT_STUDENT_COURSE =============
print("\n[TRANSFORM] Creating fact_student_course...")

# Start from every row of `students` and attach the student surrogate key.
fact_course = students.merge(
    dim_student[['id_student', 'StudentSK']],
    on='id_student',
    how='left'
)

# Course surrogate key, via the "<module>_<presentation>" natural key.
fact_course['course_presentation_id'] = (
    fact_course['code_module'] + "_" + fact_course['code_presentation']
)
fact_course = fact_course.merge(
    dim_course[['course_presentation_id', 'CourseSK']],
    on='course_presentation_id',
    how='left'
)

# Registration / unregistration day offsets, matched on student + module +
# presentation.
enrolment_key = ['id_student', 'code_module', 'code_presentation']
registration_dates = student_registration[
    enrolment_key + ['date_registration', 'date_unregistration']
]
fact_course = fact_course.merge(registration_dates, on=enrolment_key, how='left')

# Outcome metrics derived from final_result. Outcomes missing from the code
# table map to NaN.
outcome = fact_course['final_result']
outcome_codes = {
    'Distinction': 4,
    'Pass': 3,
    'Fail': 2,
    'Withdrawn': 1
}
fact_course['final_result_code'] = outcome.map(outcome_codes)
fact_course['is_completed'] = outcome.isin(['Pass', 'Distinction']).astype(int)
fact_course['is_distinction'] = outcome.eq('Distinction').astype(int)
fact_course['is_withdrawn'] = outcome.eq('Withdrawn').astype(int)
fact_course['studied_credits'] = fact_course['studied_credits'].fillna(0)

# Final column selection; rows where either surrogate key failed to resolve
# are dropped.
fact_columns = [
    'StudentSK', 'CourseSK',
    'studied_credits', 'num_of_prev_attempts',
    'date_registration', 'date_unregistration',
    'final_result', 'final_result_code',
    'is_completed', 'is_distinction', 'is_withdrawn'
]
fact_student_course = fact_course[fact_columns].dropna(subset=['StudentSK', 'CourseSK'])

print(f"âœ“ fact_student_course created: {len(fact_student_course)} records")

# =============================================================================
# 5. DATA VALIDATION
# =============================================================================
print("\n[VALIDATE] Running data quality checks...")


def _require(condition, message):
    """Raise AssertionError(message) unless `condition` is truthy.

    A bare ``assert`` statement is removed when Python runs with -O, which
    would silently disable every check below. Raising explicitly keeps the
    checks active while preserving the AssertionError type.
    """
    if not condition:
        raise AssertionError(message)


# Every dimension's surrogate key must be unique.
_require(dim_student['StudentSK'].is_unique, "ERROR: StudentSK not unique!")
_require(dim_course['CourseSK'].is_unique, "ERROR: CourseSK not unique!")
_require(dim_assessment['AssessmentSK'].is_unique, "ERROR: AssessmentSK not unique!")
_require(dim_vle_resource['ResourceSK'].is_unique, "ERROR: ResourceSK not unique!")
_require(dim_time['DateSK'].is_unique, "ERROR: DateSK not unique!")
print("âœ“ All surrogate keys are unique")

# Every foreign key stored in a fact table must exist in its dimension. The
# AssessmentSK check covers the third key of fact_student_assessment.
_require(
    fact_student_assessment['StudentSK'].isin(dim_student['StudentSK']).all(),
    "ERROR: Invalid StudentSK in fact_student_assessment"
)
_require(
    fact_student_assessment['CourseSK'].isin(dim_course['CourseSK']).all(),
    "ERROR: Invalid CourseSK in fact_student_assessment"
)
_require(
    fact_student_assessment['AssessmentSK'].isin(dim_assessment['AssessmentSK']).all(),
    "ERROR: Invalid AssessmentSK in fact_student_assessment"
)
_require(
    fact_student_course['StudentSK'].isin(dim_student['StudentSK']).all(),
    "ERROR: Invalid StudentSK in fact_student_course"
)
_require(
    fact_student_course['CourseSK'].isin(dim_course['CourseSK']).all(),
    "ERROR: Invalid CourseSK in fact_student_course"
)
print("âœ“ Referential integrity validated")

# Scores must lie within 0..100. A NaN score fails both comparisons and is
# therefore reported as well.
_require((fact_student_assessment['score'] >= 0).all(), "ERROR: Negative scores found")
_require((fact_student_assessment['score'] <= 100).all(), "ERROR: Scores > 100 found")
print("âœ“ Data ranges validated")

print("\nâœ“ All validation checks passed!")

# =============================================================================
# 6. LOAD - SAVE TO CSV
# =============================================================================
print("\n[LOAD] Saving cleaned datasets...")

# Every table is written to "<table name>.csv" in the current working
# directory, in the order listed, without the DataFrame index.
output_files = {
    f'{table_name}.csv': table
    for table_name, table in (
        ('dim_student', dim_student),
        ('dim_course', dim_course),
        ('dim_assessment', dim_assessment),
        ('dim_vle_resource', dim_vle_resource),
        ('dim_time', dim_time),
        ('fact_student_assessment', fact_student_assessment),
        ('fact_student_course', fact_student_course),
    )
}

for csv_path, table in output_files.items():
    table.to_csv(csv_path, index=False)
    row_count, column_count = table.shape
    print(f"âœ“ {csv_path}: {row_count} rows, {column_count} columns")

# =============================================================================
# 7. SUMMARY STATISTICS
# =============================================================================
# Closing banner.
summary_rule = "=" * 70
print("\n" + summary_rule)
print("ETL PIPELINE COMPLETED SUCCESSFULLY")
print(summary_rule)

# Row counts of the dimension tables.
print("\nðŸ“Š DIMENSION TABLES:")
print(f"   Students: {len(dim_student):,}")
print(f"   Courses: {len(dim_course)}")
print(f"   Assessments: {len(dim_assessment)}")
print(f"   VLE Resources: {len(dim_vle_resource):,}")
print(f"   Time Periods: {len(dim_time):,}")

# Row counts of the fact tables.
print("\nðŸ“ˆ FACT TABLES:")
print(f"   Assessment Records: {len(fact_student_assessment):,}")
print(f"   Course Enrollments: {len(fact_student_course):,}")

# Headline rates: each one is the share of enrolment rows carrying the flag,
# expressed as a percentage.
print("\nðŸŽ“ KEY METRICS:")
enrolment_count = len(fact_student_course)
completion_rate = fact_student_course['is_completed'].sum() / enrolment_count * 100
distinction_rate = fact_student_course['is_distinction'].sum() / enrolment_count * 100
withdrawal_rate = fact_student_course['is_withdrawn'].sum() / enrolment_count * 100
avg_score = fact_student_assessment['score'].mean()
print(f"   Overall Completion Rate: {completion_rate:.1f}%")
print(f"   Distinction Rate: {distinction_rate:.1f}%")
print(f"   Withdrawal Rate: {withdrawal_rate:.1f}%")
print(f"   Average Assessment Score: {avg_score:.1f}")

# First and last calendar date held in dim_time.
print("\nðŸ“… TIME RANGE:")
earliest_date = dim_time['FullDate'].min()
latest_date = dim_time['FullDate'].max()
print(f"   From: {earliest_date.strftime('%Y-%m-%d')}")
print(f"   To: {latest_date.strftime('%Y-%m-%d')}")

print("\n" + summary_rule)
print("âœ… READY FOR POWER BI / TABLEAU IMPORT!")
print(summary_rule)

OULAD ETL PIPELINE - ONLINE EDUCATION ANALYTICS
Run time: 2025-12-21 15:22:02

[EXTRACT] Loading OULAD datasets...
âœ“ All required files loaded successfully
   Students: 32593
   Courses: 22
   Assessments: 206
   Student Assessments: 173912
   VLE Resources: 6364
   Registrations: 32593

[TRANSFORM] Data quality checks...
âœ“ Students: Removed 0 duplicates
âœ“ Assessments: Removed 0 duplicates

âœ“ Missing values check:
imd_band    1111
dtype: int64

[TRANSFORM] Creating dim_student...
âœ“ dim_student created: 28785 unique students

[TRANSFORM] Creating dim_course...
âœ“ dim_course created: 22 course presentations

[TRANSFORM] Creating dim_assessment...
âœ“ dim_assessment created: 206 assessments

[TRANSFORM] Creating dim_vle_resource...
âœ“ dim_vle_resource created: 6364 VLE resources

[TRANSFORM] Creating dim_time...
âœ“ dim_time created: 541 unique dates

[TRANSFORM] Creating fact_student_assessment...
âœ“ fact_student_assessment created: 173912 records

[TRANSFORM] Creating fact_