# ADS-507 Final Project — OULAD Learning Analytics Pipeline
**University of San Diego | Applied Data Science**

This notebook implements a production-ready **ELT pipeline** over the
[Open University Learning Analytics Dataset (OULAD)](https://analyse.kmi.open.ac.uk/open_dataset).
Raw CSV files are **Extracted** from disk, **Loaded** into MySQL in simulated
incremental chunks, and **Transformed** using SQL into analytics-ready tables
that surface early-risk indicators for instructors.

| Stage | What happens |
|---|---|
| **Extract** | Read 7 OULAD CSVs; validate schemas and null rates |
| **Load** | Write raw tables to MySQL; `studentVle` loaded in 500 k-row chunks to simulate a triggered incremental feed |
| **Transform** | Four SQL tables (`CREATE TABLE AS SELECT`): weekly engagement → outcomes join → early-risk flags → ranked review queue |
| **Monitor** | `run_health_checks()` raises `RuntimeError` if any check fails |
| **Analyze** | KPI summary, flag precision, boxplot visualization |
| **Test** | Inline pytest-style tests, also saved to `tests/test_pipeline.py` |

**To reproduce on a fresh machine:**
1. Install dependencies: `pip install -r requirements.txt`
2. Start MySQL and create the `oulad_db` schema
3. Copy `.env.example` → `.env` and fill in your credentials
4. Run **Kernel → Restart & Run All**

## Section 0 — Environment Setup

In [None]:
# Uncomment and run once if packages are missing
# import subprocess, sys
# subprocess.check_call([sys.executable, "-m", "pip", "install",
#     "mysql-connector-python", "sqlalchemy", "python-dotenv", "pytest"])


In [None]:
import os
import warnings
from pathlib import Path

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

warnings.filterwarnings("ignore")

# ── Paths ──────────────────────────────────────────────────────────────────────
NOTEBOOK_DIR = Path().resolve()
DATA_DIR     = NOTEBOOK_DIR / "open+university+learning+analytics+dataset"
OUTPUTS_DIR  = NOTEBOOK_DIR / "outputs"
OUTPUTS_DIR.mkdir(exist_ok=True)

# ── Credentials (from .env) ────────────────────────────────────────────────────
load_dotenv()
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "3306")
DB_USER = os.getenv("DB_USER", "root")
DB_PASS = os.getenv("DB_PASSWORD", "")
DB_NAME = os.getenv("DB_NAME",     "oulad_db")

print(f"Data directory   : {DATA_DIR}")
print(f"Data dir exists  : {DATA_DIR.exists()}")
print(f"MySQL target     : {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}")


## Section 1 — Extract

We read all seven OULAD CSV files into pandas DataFrames.
`?` is the sentinel for missing values in this dataset, so we map it to `NaN`.

The three join keys (`code_module`, `code_presentation`, `id_student`) form a
**composite natural key** across every student-level table; we assert their
presence before any data reaches the database.
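
The sentinel mapping and the key check can be illustrated without pandas. The following is a minimal sketch using only the standard library; `read_with_sentinels` and `null_key_counts` are hypothetical helper names, and the two CSV rows are made up. In the notebook itself, `pd.read_csv(na_values=...)` and `DataFrame.isnull()` do this work.

```python
import csv
import io

# Hypothetical helpers: '?' and empty cells become None, like na_values=["?", ""].
NA_SENTINELS = {"?", ""}
KEY_COLS = ("code_module", "code_presentation", "id_student")


def read_with_sentinels(text):
    """Parse CSV text, mapping OULAD's '?' sentinel (and empty cells) to None."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        rows.append({k: (None if v in NA_SENTINELS else v) for k, v in row.items()})
    return rows


def null_key_counts(rows, keys=KEY_COLS):
    """Count missing values per composite-key column; all zeros means the check passes."""
    return {k: sum(1 for r in rows if r.get(k) is None) for k in keys}


sample = (
    "code_module,code_presentation,id_student,imd_band\n"
    "AAA,2013J,11391,90-100%\n"
    "AAA,2013J,28400,?\n"
)
rows = read_with_sentinels(sample)
print(rows[1]["imd_band"])     # None: '?' became a missing value
print(null_key_counts(rows))   # {'code_module': 0, 'code_presentation': 0, 'id_student': 0}
```

A missing `imd_band` is tolerable. A missing key column is not, because every downstream join depends on the full three-part key.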

In [None]:
NA_VALUES = ["?", ""]

RAW_FILES = {
    "courses":              "courses.csv",
    "assessments":          "assessments.csv",
    "vle":                  "vle.csv",
    "student_info":         "studentInfo.csv",
    "student_registration": "studentRegistration.csv",
    "student_assessment":   "studentAssessment.csv",
    "student_vle":          "studentVle.csv",
}

raw = {}
for name, fname in RAW_FILES.items():
    path = DATA_DIR / fname
    raw[name] = pd.read_csv(path, na_values=NA_VALUES, low_memory=False)
    print(f"  {name:25s}  {raw[name].shape[0]:>10,} rows  {raw[name].shape[1]} cols")

print(f"\nTotal rows loaded: {sum(df.shape[0] for df in raw.values()):,}")


In [None]:
# ── Schema + null-rate summary ──────────────────────────────────────────────
print("=== NULL RATES (%) ===\n")
for name, df in raw.items():
    null_pct = (df.isnull().mean() * 100).round(1)
    flagged  = null_pct[null_pct > 0]
    if flagged.empty:
        print(f"  {name}: no nulls")
    else:
        print(f"  {name}:")
        for col, pct in flagged.items():
            print(f"    {col}: {pct}%")


In [None]:
# ── Key-field assertions (fail fast before loading) ─────────────────────────
KEY_FIELDS = {
    "student_info":         ["code_module", "code_presentation", "id_student"],
    "student_registration": ["code_module", "code_presentation", "id_student"],
    "student_vle":          ["code_module", "code_presentation", "id_student"],
}

for table, keys in KEY_FIELDS.items():
    nulls = raw[table][keys].isnull().sum()
    if nulls.any():
        raise ValueError(f"NULL found in key fields of {table}:\n{nulls[nulls > 0]}")

print("All key-field assertions passed — no NULLs in composite keys.")


## Section 2 — Load to MySQL

### Why MySQL?
MySQL is a durable, production-grade relational store with native SQL window
functions (8.0+), full transaction support, and straightforward hosting options
(local, RDS, Cloud SQL).  Credentials are injected via environment variables —
no passwords in source control.
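
When credentials come from the environment, the password must be URL-escaped before it is embedded in a connection URL. Otherwise characters such as `@` or `/` can change how the URL is parsed. The sketch below uses only `urllib.parse` to show the failure and the fix. The password is made up, and `naive_url` and `escaped_url` are hypothetical helpers. SQLAlchemy's `URL.create` performs this escaping when it is given the raw password.

```python
from urllib.parse import quote_plus, unquote_plus, urlsplit


def naive_url(user, password, host, port):
    # Plain f-string interpolation, with no escaping of credentials
    return f"mysql+mysqlconnector://{user}:{password}@{host}:{port}"


def escaped_url(user, password, host, port):
    # Percent-encode credentials so '@', ':' and '/' cannot break the URL
    return f"mysql+mysqlconnector://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}"


pw = "p@ss/word"  # made-up password containing URL metacharacters
bad = urlsplit(naive_url("root", pw, "localhost", 3306))
good = urlsplit(escaped_url("root", pw, "localhost", 3306))
print(bad.hostname)                     # ss: the '/' ended the host part early
print(good.hostname, good.port)         # localhost 3306
print(unquote_plus(good.password) == pw)  # True: the password round-trips
```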

### Simulated incremental ingestion
Because OULAD is a static research snapshot, we simulate a **triggered
incremental feed** by loading `student_vle` (10 M rows) in 500 k-row chunks.
In production this would be replaced by a Kafka consumer or an Airflow
`FileSensor`; the chunk boundary logic stays identical.
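
The chunk-boundary logic does not depend on pandas or MySQL, so it can be sketched on its own. In this minimal sketch, `deliveries` stands in for `pd.read_csv(chunksize=...)` and a Python list stands in for the `student_vle` table. Both function names are hypothetical.

```python
from itertools import islice


def deliveries(rows, chunk_size):
    """Yield successive lists of at most `chunk_size` rows, one per simulated feed delivery."""
    it = iter(rows)
    while True:
        batch = list(islice(it, chunk_size))
        if not batch:
            return
        yield batch


def ingest(rows, chunk_size, sink):
    """Append each delivery to `sink`; return (n_chunks, n_rows) for the run log."""
    n_chunks = n_rows = 0
    for batch in deliveries(rows, chunk_size):
        sink.extend(batch)  # stand-in for DataFrame.to_sql(..., if_exists="append")
        n_chunks += 1
        n_rows += len(batch)
    return n_chunks, n_rows


table = []
print(ingest(range(12), 5, table))                   # (3, 12)
print([len(b) for b in deliveries(range(12), 5)])    # [5, 5, 2]
```

Only the final chunk can be short. The cumulative row count is therefore the natural value to log and compare against the source after each run.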

In [None]:
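# ── Create engine + ensure database exists ──────────────────────────────────
from sqlalchemy.engine import URL

# URL.create escapes special characters (e.g. "@", "/") in the password,
# which plain f-string interpolation would not.
_base_url = URL.create(
    "mysql+mysqlconnector",
    username=DB_USER,
    password=DB_PASS,
    host=DB_HOST,
    port=int(DB_PORT),
)
base_engine = create_engine(_base_url)
with base_engine.begin() as conn:  # begin() commits on exit
    conn.execute(text(f"CREATE DATABASE IF NOT EXISTS `{DB_NAME}`"))
print(f"Database '{DB_NAME}' ready.")

engine = create_engine(_base_url.set(database=DB_NAME))
print("Engine connected:", engine.url)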


In [None]:
# ── DDL: drop and recreate raw tables (idempotent) ──────────────────────────
DDL = """
DROP TABLE IF EXISTS student_vle;
DROP TABLE IF EXISTS student_assessment;
DROP TABLE IF EXISTS student_registration;
DROP TABLE IF EXISTS student_info;
DROP TABLE IF EXISTS vle;
DROP TABLE IF EXISTS assessments;
DROP TABLE IF EXISTS courses;

CREATE TABLE courses (
    code_module                VARCHAR(10)  NOT NULL,
    code_presentation          VARCHAR(10)  NOT NULL,
    module_presentation_length INT,
    PRIMARY KEY (code_module, code_presentation)
);

CREATE TABLE assessments (
    id_assessment    INT          PRIMARY KEY,
    code_module      VARCHAR(10),
    code_presentation VARCHAR(10),
    assessment_type  VARCHAR(20),
    date             INT,
    weight           FLOAT
);

CREATE TABLE vle (
    id_site           INT          PRIMARY KEY,
    code_module       VARCHAR(10),
    code_presentation VARCHAR(10),
    activity_type     VARCHAR(50),
    week_from         INT,
    week_to           INT
);

CREATE TABLE student_info (
    code_module          VARCHAR(10)  NOT NULL,
    code_presentation    VARCHAR(10)  NOT NULL,
    id_student           INT          NOT NULL,
    gender               VARCHAR(5),
    region               VARCHAR(100),
    highest_education    VARCHAR(100),
    imd_band             VARCHAR(20),
    age_band             VARCHAR(20),
    num_of_prev_attempts INT,
    studied_credits      INT,
    disability           VARCHAR(5),
    final_result         VARCHAR(20),
    PRIMARY KEY (code_module, code_presentation, id_student)
);

CREATE TABLE student_registration (
    code_module          VARCHAR(10) NOT NULL,
    code_presentation    VARCHAR(10) NOT NULL,
    id_student           INT         NOT NULL,
    date_registration    INT,
    date_unregistration  INT,
    PRIMARY KEY (code_module, code_presentation, id_student)
);

CREATE TABLE student_assessment (
    id_assessment   INT  NOT NULL,
    id_student      INT  NOT NULL,
    date_submitted  INT,
    is_banked       TINYINT,
    score           FLOAT,
    PRIMARY KEY (id_assessment, id_student)
);

CREATE TABLE student_vle (
    code_module       VARCHAR(10),
    code_presentation VARCHAR(10),
    id_student        INT,
    id_site           INT,
    date              INT,
    sum_click         INT,
    INDEX idx_svle_student (code_module, code_presentation, id_student),
    INDEX idx_svle_date    (date)
);
"""

with engine.begin() as conn:
    for stmt in DDL.strip().split(";"):
        stmt = stmt.strip()
        if stmt:
            conn.execute(text(stmt))

print("All raw tables created.")


In [None]:
# ── Load small tables (single batch) ────────────────────────────────────────
SMALL_TABLES = ["courses", "assessments", "vle",
                "student_info", "student_registration", "student_assessment"]

for name in SMALL_TABLES:
    raw[name].to_sql(name, engine, if_exists="append", index=False)
    print(f"  Loaded {name}: {len(raw[name]):,} rows")


In [None]:
# ── Load student_vle in 500 k-row chunks (simulates triggered ingestion) ─────
# Each chunk stands in for one incremental data delivery from the LMS API.
# In a live system a scheduler (Airflow, cron) or an event (S3 upload) would
# trigger each chunk; the per-chunk append below is what each run would do.

CHUNK_SIZE  = 500_000
VLE_COLUMNS = ["code_module", "code_presentation",
               "id_student", "id_site", "date", "sum_click"]
vle_path    = DATA_DIR / "studentVle.csv"
total_rows  = 0
n_chunks    = 0

for chunk in pd.read_csv(vle_path, chunksize=CHUNK_SIZE,
                         na_values=NA_VALUES, low_memory=False):
    # Fail fast on header drift instead of silently renaming columns by position
    if list(chunk.columns) != VLE_COLUMNS:
        raise ValueError(f"Unexpected studentVle columns: {list(chunk.columns)}")
    chunk.to_sql("student_vle", engine, if_exists="append", index=False)
    n_chunks   += 1
    total_rows += len(chunk)
    print(f"  Chunk {n_chunks:02d}: {len(chunk):>7,} rows loaded  (cumulative: {total_rows:,})")

print(f"\nstudent_vle fully loaded: {total_rows:,} rows across {n_chunks} chunks")


## Section 3 — Transform (SQL in MySQL)

All four transformations run as `DROP TABLE` / `CREATE TABLE AS SELECT` pairs
directly in MySQL. Each pair is idempotent, so re-running the notebook always
rebuilds a fresh, consistent result.

| Table | Logic |
|---|---|
| `fact_weekly_engagement` | Aggregate `student_vle` clicks by student × week (`FLOOR(date/7)`); `date` counts days from the module start, so pre-start activity falls in negative weeks |
| `engagement_with_outcomes` | Join weekly engagement to `student_info` to attach `final_result` |
| `early_risk_flags` | Sum clicks over weeks 0–2 (days 0–20); flag if the total is < 50. Students with no VLE activity in that window get no row |
| `instructor_review_queue` | Rank every student within each module/presentation by weeks 0–2 clicks (ascending) |

The 50-click threshold and the weeks 0–2 window are heuristics (see
Limitations). They rest on the premise that early VLE engagement is a leading
indicator of withdrawal. The dataset itself is described in Kuzilek et al. (2017).
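
To make the week binning, the weeks 0–2 window, and the strict `< 50` boundary concrete, here is a small pure-Python reference for T1 followed by T3. This is a sketch for reasoning and testing only. `early_risk_flags` here is a hypothetical function, not the MySQL table, and the input tuples are made up. Python's `//` floors negative numbers the same way `FLOOR(date / 7)` does.

```python
from collections import defaultdict

THRESHOLD = 50        # clicks; same heuristic as the SQL
WINDOW = range(0, 3)  # weeks 0, 1, 2  (BETWEEN 0 AND 2)


def early_risk_flags(vle_rows):
    """Reference T1 + T3 over (module, presentation, student, date, clicks) tuples."""
    clicks = defaultdict(int)
    for module, pres, student, date, n in vle_rows:
        week = date // 7  # FLOOR(date / 7); -3 // 7 == -1
        if week in WINDOW:
            clicks[(module, pres, student)] += n
    # Only keys with activity inside the window appear, as with the SQL GROUP BY
    return {key: (total, int(total < THRESHOLD)) for key, total in clicks.items()}


rows = [
    ("AAA", "2013J", 1, -3, 100),  # week -1 (pre-start): outside the window
    ("AAA", "2013J", 1, 0, 30),
    ("AAA", "2013J", 1, 20, 15),   # day 20 is still week 2
    ("AAA", "2013J", 2, 5, 60),
    ("AAA", "2013J", 3, 21, 500),  # day 21 is week 3: student 3 gets no row
]
for key, (total, flag) in sorted(early_risk_flags(rows).items()):
    print(key, total, flag)
# ('AAA', '2013J', 1) 45 1
# ('AAA', '2013J', 2) 60 0
```

Student 3 is the case to watch. A student who never clicks in weeks 0–2 is arguably the highest risk, yet produces no flag row under this logic.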

In [None]:
def run_sql(sql_block):
    """Execute a multi-statement SQL block in MySQL."""
    with engine.begin() as conn:
        for stmt in sql_block.strip().split(";"):
            stmt = stmt.strip()
            if stmt:
                conn.execute(text(stmt))


# ── T1: Weekly engagement per student per course-presentation ────────────────
run_sql("""
DROP TABLE IF EXISTS fact_weekly_engagement;

CREATE TABLE fact_weekly_engagement AS
SELECT
    code_module,
    code_presentation,
    id_student,
    FLOOR(date / 7)    AS week_num,
    SUM(sum_click)     AS total_clicks,
    COUNT(*)           AS n_events
FROM student_vle
GROUP BY
    code_module,
    code_presentation,
    id_student,
    FLOOR(date / 7)
""")

count = pd.read_sql("SELECT COUNT(*) AS n FROM fact_weekly_engagement", engine).iloc[0, 0]
print(f"fact_weekly_engagement: {count:,} rows")


In [None]:
# ── T2: Attach final_result via three-part composite key ────────────────────
# The composite key (code_module, code_presentation, id_student) is critical:
# a student can enrol in several modules, and in the same module across
# presentations. Joining on id_student alone would pair each weekly row with
# every one of that student's enrolments, duplicating rows and attaching the
# wrong final_result.

run_sql("""
DROP TABLE IF EXISTS engagement_with_outcomes;

CREATE TABLE engagement_with_outcomes AS
SELECT
    e.code_module,
    e.code_presentation,
    e.id_student,
    e.week_num,
    e.total_clicks,
    e.n_events,
    s.final_result
FROM fact_weekly_engagement e
JOIN student_info s
  ON  e.id_student        = s.id_student
  AND e.code_module        = s.code_module
  AND e.code_presentation  = s.code_presentation
""")

count = pd.read_sql("SELECT COUNT(*) AS n FROM engagement_with_outcomes", engine).iloc[0, 0]
print(f"engagement_with_outcomes: {count:,} rows")
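
The fan-out that the composite key prevents is easy to reproduce with two made-up enrolments for the same student. This nested-loop sketch compares an `id_student`-only join with the three-part join used in T2. Both function names are hypothetical.

```python
# Toy rows (made up): student 7 took module AAA in two presentations.
weekly = [("AAA", "2013J", 7, 0, 40), ("AAA", "2014J", 7, 0, 90)]     # module, pres, student, week, clicks
info = [("AAA", "2013J", 7, "Withdrawn"), ("AAA", "2014J", 7, "Pass")]  # module, pres, student, final_result


def join_on_student_only(weekly, info):
    """Nested-loop join on id_student alone."""
    return [(w, i[3]) for w in weekly for i in info if w[2] == i[2]]


def join_on_composite_key(weekly, info):
    """Nested-loop join on (code_module, code_presentation, id_student)."""
    return [(w, i[3]) for w in weekly for i in info if w[:3] == i[:3]]


print(len(join_on_student_only(weekly, info)))   # 4: each week row paired with both outcomes
print(len(join_on_composite_key(weekly, info)))  # 2: one outcome per week row
```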


In [None]:
# ── T3: Early-risk flags (weeks 0–2, threshold < 50 clicks) ─────────────────
run_sql("""
DROP TABLE IF EXISTS early_risk_flags;

CREATE TABLE early_risk_flags AS
SELECT
    code_module,
    code_presentation,
    id_student,
    SUM(total_clicks)   AS clicks_weeks_0_2,
    MAX(final_result)   AS final_result,
    CASE
        WHEN SUM(total_clicks) < 50 THEN 1
        ELSE 0
    END                 AS low_engagement_flag
FROM engagement_with_outcomes
WHERE week_num BETWEEN 0 AND 2
GROUP BY
    code_module,
    code_presentation,
    id_student
""")

count = pd.read_sql("SELECT COUNT(*) AS n FROM early_risk_flags", engine).iloc[0, 0]
print(f"early_risk_flags: {count:,} rows")


In [None]:
# ── T4: Rank each student within their module-presentation ───────────────────
# RANK() is a MySQL 8.0+ window function. Ties share a rank and the next rank
# is skipped (e.g. three students with 1 click all get rank 1; the next gets 4).

run_sql("""
DROP TABLE IF EXISTS instructor_review_queue;

CREATE TABLE instructor_review_queue AS
SELECT
    code_module,
    code_presentation,
    id_student,
    clicks_weeks_0_2,
    final_result,
    low_engagement_flag,
    RANK() OVER (
        PARTITION BY code_module, code_presentation
        ORDER BY clicks_weeks_0_2 ASC
    ) AS engagement_rank
FROM early_risk_flags
""")

count = pd.read_sql("SELECT COUNT(*) AS n FROM instructor_review_queue", engine).iloc[0, 0]
print(f"instructor_review_queue: {count:,} rows")
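
Tie semantics determine how many students the export cell's `engagement_rank <= 5` filter returns. The sketch below reproduces `RANK()` and `DENSE_RANK()` for an ascending sort in plain Python on made-up click totals. The helper names are hypothetical.

```python
def rank_asc(values):
    """SQL RANK() with ORDER BY value ASC: 1 + count of strictly smaller values."""
    return [1 + sum(v < x for v in values) for x in values]


def dense_rank_asc(values):
    """SQL DENSE_RANK() with ORDER BY value ASC: position among distinct values."""
    distinct = sorted(set(values))
    return [distinct.index(x) + 1 for x in values]


clicks = [1, 1, 1, 4, 9, 9, 12]                # made-up weeks 0–2 totals for one course
print(rank_asc(clicks))                        # [1, 1, 1, 4, 5, 5, 7]
print(dense_rank_asc(clicks))                  # [1, 1, 1, 2, 3, 3, 4]
print(sum(r <= 5 for r in rank_asc(clicks)))   # 6 students pass a "rank <= 5" filter
```

For exactly five rows per course, `ROW_NUMBER()` is the usual alternative. It assigns arbitrary order among ties unless the `ORDER BY` includes a tiebreaker such as `id_student`.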


## Section 4 — Pipeline Monitoring

`run_health_checks()` validates the pipeline output after every run.
If any check fails it raises a `RuntimeError`, which stops the notebook
and surfaces a clear message — no silent bad data downstream.

Checks:
1. **Table existence** — all seven raw tables and four transformed tables present
2. **Row counts** — key tables have > 0 rows
3. **NULL key fields** — `id_student`, `code_module`, `code_presentation` must be non-null in the review queue
4. **Flag column** — `low_engagement_flag` contains only 0 or 1
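
Checks 2–4 reduce to a few `COUNT(*)` queries followed by a raise. To show that pattern without a MySQL server, the sketch below runs the same queries against an in-memory `sqlite3` table. SQLite is a stand-in here and is not the pipeline's database. `run_basic_checks` is a hypothetical name, and the rows are made up.

```python
import sqlite3


def run_basic_checks(conn):
    """Raise RuntimeError on an empty table, NULL keys, or non-binary flags."""
    n = conn.execute("SELECT COUNT(*) FROM early_risk_flags").fetchone()[0]
    if n == 0:
        raise RuntimeError("early_risk_flags is empty")
    null_keys = conn.execute(
        "SELECT COUNT(*) FROM early_risk_flags "
        "WHERE id_student IS NULL OR code_module IS NULL OR code_presentation IS NULL"
    ).fetchone()[0]
    if null_keys:
        raise RuntimeError(f"{null_keys} rows with NULL key fields")
    bad_flags = conn.execute(
        "SELECT COUNT(*) FROM early_risk_flags WHERE low_engagement_flag NOT IN (0, 1)"
    ).fetchone()[0]
    if bad_flags:
        raise RuntimeError(f"{bad_flags} invalid flag values")
    return {"rows": n, "null_key_fields": 0, "invalid_flags": 0}


demo = sqlite3.connect(":memory:")
demo.execute(
    "CREATE TABLE early_risk_flags (code_module TEXT, code_presentation TEXT, "
    "id_student INTEGER, clicks_weeks_0_2 INTEGER, low_engagement_flag INTEGER)"
)
demo.executemany("INSERT INTO early_risk_flags VALUES (?, ?, ?, ?, ?)",
                 [("AAA", "2013J", 1, 45, 1), ("AAA", "2013J", 2, 60, 0)])
clean = run_basic_checks(demo)
print(clean)  # {'rows': 2, 'null_key_fields': 0, 'invalid_flags': 0}

demo.execute("INSERT INTO early_risk_flags VALUES ('AAA', '2013J', 3, 10, 2)")
failure = None
try:
    run_basic_checks(demo)
except RuntimeError as exc:
    failure = str(exc)
print("FAILED:", failure)  # FAILED: 1 invalid flag values
```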

In [None]:
def run_health_checks(engine):
    """
    Validate pipeline outputs.  Raises RuntimeError if any check fails.
    Returns a dict of check results on success.
    """
    results = {}

    with engine.connect() as conn:
        # 1) Table existence
        existing = {r[0] for r in conn.execute(text("SHOW TABLES")).fetchall()}
        required = {
            "courses", "assessments", "vle",
            "student_info", "student_registration", "student_assessment", "student_vle",
            "fact_weekly_engagement", "engagement_with_outcomes",
            "early_risk_flags", "instructor_review_queue",
        }
        missing = required - existing
        if missing:
            raise RuntimeError(f"Health check FAILED — missing tables: {missing}")
        results["missing_tables"] = []

        # 2) Row counts
        count_sql = {
            "fact_weekly_engagement":  "SELECT COUNT(*) FROM fact_weekly_engagement",
            "early_risk_flags":        "SELECT COUNT(*) FROM early_risk_flags",
            "instructor_review_queue": "SELECT COUNT(*) FROM instructor_review_queue",
        }
        row_counts = {}
        for tbl, sql in count_sql.items():
            n = conn.execute(text(sql)).scalar()
            if n == 0:
                raise RuntimeError(f"Health check FAILED — {tbl} is empty")
            row_counts[tbl] = n
        results["row_counts"] = row_counts

        # 3) NULL key fields in review queue
        null_check = conn.execute(text("""
            SELECT
                SUM(id_student       IS NULL) AS null_student,
                SUM(code_module      IS NULL) AS null_module,
                SUM(code_presentation IS NULL) AS null_pres
            FROM instructor_review_queue
        """)).fetchone()
        if any(v > 0 for v in null_check):
            raise RuntimeError(
                f"Health check FAILED — NULL key fields: "
                f"id_student={null_check[0]}, code_module={null_check[1]}, "
                f"code_presentation={null_check[2]}"
            )
        results["null_key_fields"] = 0

        # 4) Flag column validity
        invalid_flags = conn.execute(text("""
            SELECT COUNT(*) FROM early_risk_flags
            WHERE low_engagement_flag NOT IN (0, 1)
        """)).scalar()
        if invalid_flags > 0:
            raise RuntimeError(
                f"Health check FAILED — {invalid_flags} invalid flag values"
            )
        results["invalid_flags"] = 0

    return results


health = run_health_checks(engine)
print("All health checks PASSED")
for k, v in health.items():
    print(f"  {k}: {v}")


## Section 5 — Descriptive Analytics & Output

### Interpreting the flag
We treat `low_engagement_flag = 1` as a positive prediction that a student is
"at risk" (will Fail or Withdraw).  **Precision** measures how often the flag is
correct — it answers *"of the students we flag, how many actually struggle?"*

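A precision of ~0.63 means the alert is directionally useful for triage, while
acknowledging false positives (students who clicked < 50 times and still passed
or earned a distinction). Precision does not measure recall, i.e. how many
at-risk students the rule misses; the Flag × Outcome breakdown printed below
covers both.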
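
The Flag × Outcome breakdown maps directly onto a 2×2 confusion matrix. `(low_engagement_flag, risk_outcome)` = (1, 1) is a true positive, (1, 0) a false positive, (0, 1) a false negative, and (0, 0) a true negative. The minimal sketch below derives precision, recall, and the share of students flagged from those four counts. `flag_metrics` is a hypothetical helper, and the counts are illustrative only, not pipeline results.

```python
def flag_metrics(tp, fp, fn, tn):
    """Precision, recall and flag rate for a binary alert (flag=1 means 'predicted at risk')."""
    nan = float("nan")
    precision = tp / (tp + fp) if tp + fp else nan
    recall = tp / (tp + fn) if tp + fn else nan
    total = tp + fp + fn + tn
    flag_rate = (tp + fp) / total if total else nan
    return precision, recall, flag_rate


# Illustrative counts only:
# tp = flagged & Fail/Withdrawn,     fp = flagged & Pass/Distinction,
# fn = not flagged & Fail/Withdrawn, tn = not flagged & Pass/Distinction
prec, rec, rate = flag_metrics(tp=60, fp=40, fn=90, tn=310)
print(f"precision={prec:.2f} recall={rec:.2f} flagged={rate:.0%}")
# precision=0.60 recall=0.40 flagged=20%
```

A rule can have high precision and still miss most at-risk students. Reporting both numbers keeps the triage claim honest.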

In [None]:
# ── KPI summary ─────────────────────────────────────────────────────────────
df_kpi = pd.read_sql("""
SELECT
    COUNT(*)                                                         AS students_in_early_window,
    SUM(low_engagement_flag)                                         AS flagged_students,
    SUM(low_engagement_flag = 1 AND final_result IN ('Fail','Withdrawn')) AS flagged_and_at_risk
FROM early_risk_flags
""", engine)

print("=== Pipeline KPIs ===")
print(df_kpi.T.to_string(header=False))

# ── Confusion-style breakdown ────────────────────────────────────────────────
df_conf = pd.read_sql("""
SELECT
    low_engagement_flag,
    CASE WHEN final_result IN ('Fail','Withdrawn') THEN 1 ELSE 0 END AS risk_outcome,
    COUNT(*) AS n_students
FROM early_risk_flags
GROUP BY 1, 2
ORDER BY 1, 2
""", engine)

print("\n=== Flag × Outcome breakdown ===")
print(df_conf.to_string(index=False))

# ── Precision of the flag ────────────────────────────────────────────────────
df_prec = pd.read_sql("""
SELECT
    SUM(low_engagement_flag = 1 AND final_result IN ('Fail','Withdrawn'))
      / NULLIF(SUM(low_engagement_flag = 1), 0) AS precision_flagged
FROM early_risk_flags
""", engine)

print(f"\nFlag precision: {df_prec.iloc[0,0]:.1%}")
print("Interpretation: of students flagged by the low-engagement rule,")
print(f"  {df_prec.iloc[0,0]:.1%} ultimately failed or withdrew.")


In [None]:
# ── Boxplot: early engagement by outcome ────────────────────────────────────
df_plot = pd.read_sql("""
SELECT
    clicks_weeks_0_2,
    CASE
        WHEN final_result IN ('Fail','Withdrawn') THEN 'At Risk'
        ELSE 'Not At Risk'
    END AS risk_group
FROM early_risk_flags
""", engine)

fig, ax = plt.subplots(figsize=(7, 5))
groups = [
    df_plot.loc[df_plot.risk_group == g, "clicks_weeks_0_2"].dropna()
    for g in ["At Risk", "Not At Risk"]
]
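ax.boxplot(groups, patch_artist=True,
           boxprops=dict(facecolor="#AED6F1"),
           medianprops=dict(color="#1A5276", linewidth=2))
# Label ticks separately: boxplot's `labels=` was renamed `tick_labels=` in
# Matplotlib 3.9, while set_xticks(ticks, labels) works from 3.5 onward.
ax.set_xticks([1, 2], ["At Risk", "Not At Risk"])
ax.set_title("Early VLE Engagement (Weeks 0–2) by Final Outcome", fontsize=13)
ax.set_xlabel("Outcome Group")
ax.set_ylabel("Total Clicks (Weeks 0–2)")
# Clip the view at the 97th percentile; box statistics still use all data
ax.set_ylim(0, df_plot.clicks_weeks_0_2.quantile(0.97))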
plt.tight_layout()
plt.savefig(OUTPUTS_DIR / "engagement_by_outcome.png", dpi=150)
plt.show()
print("Saved: outputs/engagement_by_outcome.png")


In [None]:
# ── Export outputs ───────────────────────────────────────────────────────────
from datetime import datetime

# Students with engagement_rank <= 5 per module-presentation (ties can add more than five)
df_queue = pd.read_sql("""
SELECT *
FROM instructor_review_queue
WHERE engagement_rank <= 5
ORDER BY code_module, code_presentation, engagement_rank
""", engine)

queue_path = OUTPUTS_DIR / "instructor_review_queue_top5_per_course.csv"
df_queue.to_csv(queue_path, index=False)
print(f"Wrote {len(df_queue):,} rows → {queue_path.name}")

# KPI CSV
kpi_path = OUTPUTS_DIR / "pipeline_kpis.csv"
df_kpi.to_csv(kpi_path, index=False)
print(f"Wrote pipeline KPIs → {kpi_path.name}")

# Run log
log_path = OUTPUTS_DIR / "pipeline_run.log"
with open(log_path, "a") as f:
    ts = datetime.now().isoformat(timespec="seconds")
    f.write(f"\n=== RUN {ts} ===\n")
    f.write(f"  fact_weekly_engagement  : {health['row_counts']['fact_weekly_engagement']:,}\n")
    f.write(f"  early_risk_flags        : {health['row_counts']['early_risk_flags']:,}\n")
    f.write(f"  instructor_review_queue : {health['row_counts']['instructor_review_queue']:,}\n")
    f.write("  health_checks           : PASSED\n")

print(f"Appended run entry → {log_path.name}")
print("\nOutputs folder:", sorted(p.name for p in OUTPUTS_DIR.iterdir()))


## Section 6 — Tests

Tests run inline here and are also saved to `tests/test_pipeline.py` for
**pytest** and the CI pipeline.

**Unit tests** verify transformation logic in isolation (no DB required).
**Integration tests** query MySQL and confirm table integrity.

In [None]:
import traceback

# ── Shared fixtures ──────────────────────────────────────────────────────────
def make_mini_vle():
    """Minimal student_vle-like DataFrame for unit tests."""
    return pd.DataFrame({
        "code_module":       ["AAA", "AAA", "AAA"],
        "code_presentation": ["2013J", "2013J", "2013J"],
        "id_student":        [1, 1, 2],
        "id_site":           [10, 10, 11],
        "date":              [0, 7, 0],
        "sum_click":         [30, 20, 10],
    })

def make_mini_flags():
    """Minimal early_risk_flags-like DataFrame for unit tests."""
    return pd.DataFrame({
        "id_student":         [1, 2, 3, 4],
        "clicks_weeks_0_2":   [10.0, 80.0, 49.0, 50.0],
        "final_result":       ["Fail", "Pass", "Withdrawn", "Pass"],
        "low_engagement_flag":[1, 0, 1, 0],
    })

# ── Unit tests ───────────────────────────────────────────────────────────────
# Pure-Python mirrors of the SQL rules, so the unit tests exercise real logic
# rather than literal arithmetic.
def week_num(date):
    """Mirror of FLOOR(date / 7) in T1 (// also floors negative dates)."""
    return date // 7

def flag_for_clicks(clicks, threshold=50):
    """Mirror of the T3 CASE expression: 1 if clicks < threshold else 0."""
    return 1 if clicks < threshold else 0

def test_week_binning_day_0():
    """date=0 → week 0."""
    assert week_num(0) == 0

def test_week_binning_day_6():
    """date=6 → week 0 (same week as day 0)."""
    assert week_num(6) == 0

def test_week_binning_day_7():
    """date=7 → week 1."""
    assert week_num(7) == 1

def test_week_binning_day_13():
    """date=13 → week 1."""
    assert week_num(13) == 1

def test_week_binning_day_14():
    """date=14 → week 2."""
    assert week_num(14) == 2

def test_flag_below_threshold():
    """49 clicks → flagged."""
    assert flag_for_clicks(49) == 1

def test_flag_at_threshold():
    """50 clicks → NOT flagged (boundary is strictly less-than)."""
    assert flag_for_clicks(50) == 0

def test_flag_above_threshold():
    """80 clicks → not flagged."""
    assert flag_for_clicks(80) == 0

def test_null_handling_in_vle():
    """'?' → NaN after CSV load."""
    import io
    csv = "code_module,date,sum_click\nAAA,?,5\n"
    df = pd.read_csv(io.StringIO(csv), na_values=["?"])
    assert pd.isna(df.loc[0, "date"])

def test_flag_column_is_binary():
    """low_engagement_flag must be 0 or 1 and agree with the threshold rule."""
    df = make_mini_flags()
    assert set(df.low_engagement_flag.unique()).issubset({0, 1})
    assert df.low_engagement_flag.tolist() == [flag_for_clicks(c) for c in df.clicks_weeks_0_2]

def test_precision_calculation():
    """Precision = flagged & at-risk / flagged."""
    df = make_mini_flags()
    flagged   = df[df.low_engagement_flag == 1]
    at_risk   = flagged[flagged.final_result.isin(["Fail", "Withdrawn"])]
    precision = len(at_risk) / len(flagged)
    assert abs(precision - 1.0) < 1e-9  # both flagged students are at-risk in mini fixture

def test_week_numbers_are_non_negative():
    """FLOOR(date/7) ≥ 0 for all non-negative dates."""
    df = make_mini_vle()
    week_nums = df["date"].map(week_num)
    assert (week_nums >= 0).all()


# ── Integration tests ────────────────────────────────────────────────────────
def test_integration_row_counts():
    """All transformed tables must be non-empty."""
    for tbl in ["fact_weekly_engagement", "early_risk_flags", "instructor_review_queue"]:
        n = pd.read_sql(f"SELECT COUNT(*) AS n FROM {tbl}", engine).iloc[0, 0]
        assert n > 0, f"{tbl} is empty"

def test_integration_no_null_keys():
    """Composite keys in review queue must be non-null."""
    df = pd.read_sql("""
        SELECT id_student, code_module, code_presentation
        FROM instructor_review_queue
        WHERE id_student IS NULL OR code_module IS NULL OR code_presentation IS NULL
    """, engine)
    assert len(df) == 0, f"Found {len(df)} rows with null key fields"

def test_integration_flag_values():
    """low_engagement_flag in DB must be 0 or 1."""
    df = pd.read_sql("""
        SELECT DISTINCT low_engagement_flag FROM early_risk_flags
    """, engine)
    assert set(df.low_engagement_flag.unique()).issubset({0, 1})

def test_integration_rank_starts_at_one():
    """Minimum engagement_rank must be 1 in every module-presentation."""
    df = pd.read_sql("""
        SELECT COUNT(*) AS n_bad
        FROM (
            SELECT MIN(engagement_rank) AS min_rank
            FROM instructor_review_queue
            GROUP BY code_module, code_presentation
        ) per_course
        WHERE min_rank <> 1
    """, engine)
    assert df.iloc[0, 0] == 0

def test_integration_week_0_2_only_in_flags():
    """clicks_weeks_0_2 must equal weeks 0–2 clicks recomputed from fact_weekly_engagement."""
    df = pd.read_sql("""
        SELECT COUNT(*) AS n_mismatch
        FROM early_risk_flags f
        JOIN (
            SELECT code_module, code_presentation, id_student,
                   SUM(total_clicks) AS clicks_0_2
            FROM fact_weekly_engagement
            WHERE week_num BETWEEN 0 AND 2
            GROUP BY code_module, code_presentation, id_student
        ) w
          ON  f.code_module       = w.code_module
          AND f.code_presentation = w.code_presentation
          AND f.id_student        = w.id_student
        WHERE f.clicks_weeks_0_2 <> w.clicks_0_2
    """, engine)
    assert df.iloc[0, 0] == 0


In [None]:
# ── Run all tests ────────────────────────────────────────────────────────────
unit_tests = [
    test_week_binning_day_0, test_week_binning_day_6, test_week_binning_day_7,
    test_week_binning_day_13, test_week_binning_day_14,
    test_flag_below_threshold, test_flag_at_threshold, test_flag_above_threshold,
    test_null_handling_in_vle, test_flag_column_is_binary,
    test_precision_calculation, test_week_numbers_are_non_negative,
]
integration_tests = [
    test_integration_row_counts, test_integration_no_null_keys,
    test_integration_flag_values, test_integration_rank_starts_at_one,
    test_integration_week_0_2_only_in_flags,
]

passed = failed = 0
for fn in unit_tests + integration_tests:
    kind = "UNIT" if fn in unit_tests else "INTG"
    try:
        fn()
        print(f"  [PASS] [{kind}] {fn.__name__}")
        passed += 1
    except Exception as e:
        # Include the exception type: a bare failed `assert` has an empty message
        print(f"  [FAIL] [{kind}] {fn.__name__}: {type(e).__name__}: {e}")
        failed += 1

print(f"\n{'='*50}")
print(f"  {passed} passed  |  {failed} failed  |  {passed+failed} total")
if failed:
    raise RuntimeError(f"{failed} test(s) failed — review output above.")


## Section 7 — Continuous Integration (GitHub Actions)

The file `.github/workflows/ci.yml` (in this repository) runs on every push and
pull request targeting `main`. It spins up a MySQL 8 service container, installs
dependencies, lints `tests/` with flake8, and runs the pytest suite in `tests/`.

```yaml
# .github/workflows/ci.yml
name: OULAD Pipeline CI

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      mysql:
        image: mysql:8.0
        env:
          MYSQL_ROOT_PASSWORD: root
          MYSQL_DATABASE: oulad_db
        ports: ["3306:3306"]
        options: >-
          --health-cmd="mysqladmin ping"
          --health-interval=10s
          --health-timeout=5s
          --health-retries=5

    env:
      DB_HOST: 127.0.0.1
      DB_PORT: 3306
      DB_USER: root
      DB_PASSWORD: root
      DB_NAME: oulad_db

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Lint with flake8
        run: flake8 tests/ --max-line-length=100

      - name: Run pipeline tests
        run: pytest tests/ -v
```

> **Note:** The integration tests in CI use a synthetic `studentVle` fixture
> (see `tests/conftest.py`) so the 10 M-row CSV is not required in CI.
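
`tests/conftest.py` is not reproduced in this notebook. The following is only a sketch of how a deterministic, `studentVle`-shaped fixture could be generated with the standard library. The function name, seed, value ranges, and row counts are all assumptions. In pytest, such a function could be wrapped in a `@pytest.fixture` and loaded into the CI MySQL container before the integration tests run.

```python
import random

VLE_COLUMNS = ["code_module", "code_presentation", "id_student", "id_site", "date", "sum_click"]


def synthetic_student_vle(n_students=20, days=(-10, 40), seed=507):
    """Deterministic toy rows shaped like studentVle.csv (all values are synthetic)."""
    rng = random.Random(seed)  # fixed seed: identical rows on every CI run
    rows = []
    for student in range(1, n_students + 1):
        for _ in range(rng.randint(0, 8)):  # some students may get zero events
            rows.append({
                "code_module": "AAA",
                "code_presentation": "2013J",
                "id_student": student,
                "id_site": rng.randint(1, 50),
                "date": rng.randint(*days),      # includes pre-start (negative) days
                "sum_click": rng.randint(1, 20),
            })
    return rows


rows = synthetic_student_vle()
print(len(rows), "synthetic rows")
```

A seeded generator keeps CI runs reproducible. The date range straddling day 0 exercises both the negative-week and the weeks 0–2 branches of the SQL.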

## Limitations & Next Steps

### Current limitations
- **Static dataset** — OULAD is a research archive; real-time LMS feeds would replace the chunk-loading simulation.
- **Rule-based flag** — the 50-click threshold is a heuristic; a logistic regression or gradient-boosted model trained on the same features could improve recall, though this has not been evaluated here.
- **Single-node MySQL** — does not scale horizontally; production would migrate to a managed service (RDS, Cloud SQL) with read replicas.
- **No authentication layer** — pipeline assumes local trusted MySQL; production needs IAM-based credential rotation.

### Next steps
1. Replace chunk simulation with an Airflow DAG + FileSensor or Kafka consumer.
2. Add a predictive model cell (logistic regression on weeks 0–2 features).
3. Connect the instructor review queue to a live dashboard (Streamlit or Tableau).
4. Parameterize the 50-click threshold via a config file for A/B testing alert rules.
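
Step 4 could start from something as small as a JSON list of rule overrides. In the sketch below, the `AlertRule` schema, its field names, and the JSON layout are hypothetical. Unknown keys raise `TypeError`, which doubles as basic config validation. On the SQL side, the threshold could then be passed as a bound parameter (e.g. `text("... < :threshold")` with `{"threshold": rule.click_threshold}`) rather than hard-coded.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class AlertRule:
    """Hypothetical config schema for the early-risk rule."""
    name: str = "baseline"
    max_week: int = 2          # window is weeks 0..max_week
    click_threshold: int = 50  # flag when clicks < threshold


def load_rules(text):
    """Parse a JSON list of rule overrides; missing keys fall back to the defaults."""
    return [AlertRule(**entry) for entry in json.loads(text)]


def flag(clicks, rule):
    """Apply one rule's strict less-than threshold."""
    return int(clicks < rule.click_threshold)


config = '[{"name": "baseline"}, {"name": "stricter", "click_threshold": 30}]'
rules = load_rules(config)
print([(r.name, flag(40, r)) for r in rules])  # [('baseline', 1), ('stricter', 0)]
```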