# Classroom Observation (COPUS‑Style, 2‑Minute Intervals)
This notebook runs a **privacy‑aware** classroom observation pipeline on a classroom video:

- **Person detection + tracking** (YOLOv8 + ByteTrack via `ultralytics`)
- **Face blurring by default** (OpenCV Haar cascade)
- **COPUS‑style codes** aggregated **per 2‑minute interval** (heuristic baseline; replaceable with trained classifiers)
- Exports **dashboard‑ready JSON** and **CSV**
- Generates **plots** for quick analysis

⚠️ **Important:** COPUS is typically human‑coded. This notebook produces **COPUS‑style proxy codes** based on measurable visual cues (movement, clustering, instructor dominance). For publication‑grade validity, calibrate with labeled COPUS data and report inter‑rater / model agreement.


In [None]:
# If needed, install dependencies (run once)
# !pip -q install ultralytics opencv-python pandas numpy matplotlib

import os, json, math
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from ultralytics import YOLO


## 1) Configuration
Set your video path and output directory below. The pipeline will produce:
- `annotated_blurred.mp4`
- `copus_intervals.csv`
- `copus_intervals.json`
- plots saved in `plots/`


In [None]:
# === USER SETTINGS ===
VIDEO_PATH = "classroom.mp4"   # <-- change this to your classroom video file
OUT_DIR = "outputs_copus"

# Analysis settings
INTERVAL_SEC = 120            # COPUS convention: 2-minute intervals
WINDOW_SEC = 5                # short window for smoothing
CONF_THRES = 0.35             # YOLO confidence
MAX_DET = 60                  # max detections per frame
MODEL_WEIGHTS = "yolov8n.pt"  # switch to "yolov8s.pt" for improved accuracy (slower)

# Privacy settings
BLUR_FACES = True
FACE_BLUR_KERNEL = (31, 31)   # larger => stronger blur

os.makedirs(OUT_DIR, exist_ok=True)
print("Output directory:", os.path.abspath(OUT_DIR))


## 2) Helper functions (tracking, privacy blurring, COPUS-style coding)
These heuristics are designed to be **transparent** and easy to replace with trained models later.

In [None]:
@dataclass
class TrackState:
    last_center: Tuple[float, float]
    ema_speed: float = 0.0  # exponential moving average of pixel speed


def pick_instructor(person_boxes: List[Tuple[float, float, float, float]], frame_w: int, frame_h: int):
    """Heuristic: instructor tends to be near the 'front' (top of frame) and/or larger."""
    if not person_boxes:
        return None, -1

    scores = []
    for b in person_boxes:
        x1, y1, x2, y2 = b
        area = max(0, x2 - x1) * max(0, y2 - y1)
        frontness = 1.0 - (y2 / (frame_h + 1e-9))  # higher if nearer top
        s = 0.6 * (area / (frame_w * frame_h + 1e-9)) + 0.4 * frontness
        scores.append(s)

    idx = int(np.argmax(scores))
    return person_boxes[idx], idx


def cluster_score(centers: np.ndarray, eps_px: float = 90.0) -> float:
    """Grouping proxy: closer nearest-neighbor distances => higher group-work likelihood."""
    if len(centers) < 2:
        return 0.0
    dists = []
    for i in range(len(centers)):
        diffs = centers - centers[i]
        dd = np.sqrt((diffs[:, 0] ** 2 + diffs[:, 1] ** 2) + 1e-9)
        dd_sorted = np.sort(dd)
        dists.append(dd_sorted[1])  # nearest neighbor (excluding self)
    nn = float(np.mean(dists))
    return float(np.clip((eps_px - nn) / eps_px, 0.0, 1.0))


def init_face_detector():
    """OpenCV Haar cascade face detector (fast baseline)."""
    cascade_path = os.path.join(cv2.data.haarcascades, "haarcascade_frontalface_default.xml")
    if not os.path.exists(cascade_path):
        raise FileNotFoundError(f"Could not find Haar cascade at: {cascade_path}")
    return cv2.CascadeClassifier(cascade_path)


def blur_faces_in_frame(frame_bgr: np.ndarray, face_cascade, blur_kernel=(31, 31)) -> np.ndarray:
    """Detects faces and blurs them. This is a baseline privacy step."""
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    # scaleFactor/minNeighbors tuned for classroom-ish videos; adjust if needed
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    out = frame_bgr.copy()
    for (x, y, w, h) in faces:
        roi = out[y:y+h, x:x+w]
        if roi.size == 0:
            continue
        roi_blur = cv2.GaussianBlur(roi, blur_kernel, 0)
        out[y:y+h, x:x+w] = roi_blur
    return out


def copus_style_codes_from_signals(
    people_count: float,
    instructor_move: float,
    student_move: float,
    grouping: float,
    lecture_score: float,
    groupwork_score: float
) -> Dict[str, List[str]]:
    """Map signals -> COPUS-style code sets (proxy).

    Instructor COPUS-style codes (subset):
      - 'Lec'   (Lecturing)
      - 'FUp'   (Follow-up / feedback; proxy via moderate instructor movement with low student movement)
      - 'MG'    (Moving & guiding; proxy via high instructor movement + moderate student movement)
      - 'Adm'   (Administration/management; proxy via low overall motion)
      - '1o1'   (One-on-one; proxy via low people_count + localized clusters; rough)

    Student COPUS-style codes (subset):
      - 'Lis'   (Listening)
      - 'Ind'   (Individual thinking/working; proxy via low movement + low clustering)
      - 'Grp'   (Group work)
      - 'AsQ'   (Asking questions; proxy via moderate student movement spikes; very rough)

    Note: This is *not* a validated COPUS coder. It is a transparent baseline.
    """
    I, S = [], []

    # Instructor codes
    if lecture_score >= 0.55 and student_move <= 10:
        I.append("Lec")
    if 0.35 <= lecture_score < 0.55 and student_move <= 12:
        I.append("FUp")
    if instructor_move >= 18 and student_move >= 8:
        I.append("MG")  # moving & guiding
    if instructor_move < 8 and student_move < 6:
        I.append("Adm")  # low activity / transitions / admin

    # Student codes
    if groupwork_score >= 0.60 and grouping >= 0.40:
        S.append("Grp")
    if lecture_score >= 0.50 and student_move < 10:
        S.append("Lis")
    if student_move < 8 and grouping < 0.25 and lecture_score < 0.55:
        S.append("Ind")
    if 10 <= student_move <= 16 and lecture_score < 0.55:
        S.append("AsQ")  # weak proxy

    # Ensure at least one code each (helps dashboards)
    if not I:
        I = ["Unk"]
    if not S:
        S = ["Unk"]

    return {"instructor_codes": sorted(list(set(I))), "student_codes": sorted(list(set(S)))}


## 3) Core analyzer
This function processes the video, applies face blurring, tracks people, computes signals, aggregates **2‑minute COPUS‑style intervals**, and exports artifacts.

In [None]:
def analyze_classroom_video(
    video_path: str,
    out_dir: str,
    interval_sec: int = 120,
    window_sec: int = 5,
    conf_thres: float = 0.35,
    max_det: int = 60,
    model_weights: str = "yolov8n.pt",
    blur_faces: bool = True,
    face_blur_kernel=(31, 31),
) -> Dict[str, str]:
    os.makedirs(out_dir, exist_ok=True)
    plots_dir = os.path.join(out_dir, "plots")
    os.makedirs(plots_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Could not open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)

    annotated_path = os.path.join(out_dir, "annotated_blurred.mp4")
    csv_path = os.path.join(out_dir, "copus_intervals.csv")
    json_path = os.path.join(out_dir, "copus_intervals.json")

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(annotated_path, fourcc, fps, (W, H))

    model = YOLO(model_weights)

    face_cascade = init_face_detector() if blur_faces else None

    # tracking/motion
    track_state: Dict[int, TrackState] = {}

    window_frames = max(1, int(window_sec * fps))
    interval_frames = max(1, int(interval_sec * fps))

    # collect per-window signals then aggregate into 2-minute intervals
    cur_interval = {
        "people_count": [],
        "student_move": [],
        "instructor_move": [],
        "grouping": [],
        "lecture_score": [],
        "groupwork_score": [],
    }

    intervals = []
    frame_idx = 0

    def finalize_interval(start_frame: int, end_frame: int):
        if len(cur_interval["people_count"]) == 0:
            return None

        t_start = start_frame / fps
        t_end = end_frame / fps

        # averages
        people_avg = float(np.mean(cur_interval["people_count"]))
        stud_move_avg = float(np.mean(cur_interval["student_move"]))
        instr_move_avg = float(np.mean(cur_interval["instructor_move"]))
        grouping_avg = float(np.mean(cur_interval["grouping"]))
        lecture_avg = float(np.mean(cur_interval["lecture_score"]))
        groupwork_avg = float(np.mean(cur_interval["groupwork_score"]))

        codes = copus_style_codes_from_signals(
            people_count=people_avg,
            instructor_move=instr_move_avg,
            student_move=stud_move_avg,
            grouping=grouping_avg,
            lecture_score=lecture_avg,
            groupwork_score=groupwork_avg
        )

        row = {
            "interval_index": len(intervals),
            "t_start_sec": round(t_start, 3),
            "t_end_sec": round(t_end, 3),
            "people_avg": round(people_avg, 3),
            "student_movement_avg": round(stud_move_avg, 3),
            "instructor_movement_avg": round(instr_move_avg, 3),
            "cluster_score_avg": round(grouping_avg, 3),
            "lecture_score_avg": round(lecture_avg, 3),
            "groupwork_score_avg": round(groupwork_avg, 3),
            "instructor_codes": ",".join(codes["instructor_codes"]),
            "student_codes": ",".join(codes["student_codes"]),
        }
        return row

    while True:
        ok, frame = cap.read()
        if not ok:
            break

        # privacy first: blur faces in the working frame (and consequently in annotated export)
        if blur_faces and face_cascade is not None:
            frame_proc = blur_faces_in_frame(frame, face_cascade, blur_kernel=face_blur_kernel)
        else:
            frame_proc = frame

        # YOLO person tracking
        results = model.track(
            source=frame_proc,
            persist=True,
            conf=conf_thres,
            iou=0.5,
            classes=[0],  # person only
            max_det=max_det,
            verbose=False,
        )
        r = results[0]

        boxes_xyxy, track_ids = [], []
        if r.boxes is not None and len(r.boxes) > 0:
            b = r.boxes.xyxy.cpu().numpy()
            tid = r.boxes.id.cpu().numpy() if r.boxes.id is not None else None
            boxes_xyxy = [tuple(map(float, bb)) for bb in b]
            track_ids = [int(x) for x in tid] if tid is not None else list(range(len(boxes_xyxy)))

        instructor_box, instructor_idx = pick_instructor(boxes_xyxy, W, H)

        # movement estimation (EMA of pixel speed per track)
        centers = []
        movement_by_id = {}
        for bb, tid in zip(boxes_xyxy, track_ids):
            x1, y1, x2, y2 = bb
            cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0
            centers.append((cx, cy))

            if tid not in track_state:
                track_state[tid] = TrackState(last_center=(cx, cy), ema_speed=0.0)
            else:
                lx, ly = track_state[tid].last_center
                speed = float(np.sqrt((cx - lx) ** 2 + (cy - ly) ** 2))
                track_state[tid].ema_speed = 0.7 * track_state[tid].ema_speed + 0.3 * speed
                track_state[tid].last_center = (cx, cy)

            movement_by_id[tid] = track_state[tid].ema_speed

        centers_np = np.array(centers, dtype=np.float32) if centers else np.zeros((0, 2), dtype=np.float32)

        people_count = len(boxes_xyxy)
        instructor_move = 0.0
        student_moves = []

        for i, tid in enumerate(track_ids):
            m = movement_by_id.get(tid, 0.0)
            if instructor_box is not None and i == instructor_idx:
                instructor_move = m
            else:
                student_moves.append(m)

        student_move = float(np.mean(student_moves)) if student_moves else 0.0
        grouping = cluster_score(centers_np, eps_px=90.0)

        # Transparent "scores" for dashboards
        lecture_score = float(np.clip((instructor_move / 30.0) - (student_move / 40.0) + 0.25, 0.0, 1.0))
        groupwork_score = float(np.clip((student_move / 35.0) + 0.75 * grouping, 0.0, 1.0))

        # record per-window
        cur_interval["people_count"].append(people_count)
        cur_interval["student_move"].append(student_move)
        cur_interval["instructor_move"].append(instructor_move)
        cur_interval["grouping"].append(grouping)
        cur_interval["lecture_score"].append(lecture_score)
        cur_interval["groupwork_score"].append(groupwork_score)

        # Annotate frame (boxes only; faces are already blurred)
        vis = frame_proc.copy()
        for i, (bb, tid) in enumerate(zip(boxes_xyxy, track_ids)):
            x1, y1, x2, y2 = map(int, bb)
            is_instructor = (instructor_box is not None and i == instructor_idx)
            label = f"{'Instructor' if is_instructor else 'Student'} ID:{tid} v:{movement_by_id.get(tid, 0.0):.1f}"
            cv2.rectangle(vis, (x1, y1), (x2, y2), (0, 255, 0) if is_instructor else (255, 255, 255), 2)
            cv2.putText(vis, label, (x1, max(20, y1 - 6)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 3)
            cv2.putText(vis, label, (x1, max(20, y1 - 6)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        header = (
            f"COPUS-style interval:{len(intervals)} | people:{people_count} "            f"| lec:{lecture_score:.2f} grp:{groupwork_score:.2f} | cluster:{grouping:.2f}"
        )
        cv2.putText(vis, header, (15, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 0), 4)
        cv2.putText(vis, header, (15, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2)

        writer.write(vis)

        # finalize interval every interval_frames
        if (frame_idx + 1) % interval_frames == 0:
            start_frame = frame_idx + 1 - interval_frames
            end_frame = frame_idx + 1
            row = finalize_interval(start_frame, end_frame)
            if row is not None:
                intervals.append(row)
            # reset
            for k in cur_interval:
                cur_interval[k] = []

        frame_idx += 1
        if total_frames and frame_idx % int(max(1, fps * 10)) == 0:
            print(f"Processed {frame_idx}/{total_frames} frames..." հաստատ=True)

    # finalize tail interval if remaining
    if len(cur_interval["people_count"]) > 0:
        # last partial interval
        start_frame = frame_idx - len(cur_interval["people_count"])
        end_frame = frame_idx
        row = finalize_interval(start_frame, end_frame)
        if row is not None:
            intervals.append(row)

    cap.release()
    writer.release()

    df = pd.DataFrame(intervals)
    df.to_csv(csv_path, index=False)

    dashboard = {
        "schema_version": "1.0",
        "video": {
            "path": os.path.abspath(video_path),
            "fps": float(fps),
            "width": int(W),
            "height": int(H),
            "total_frames": int(total_frames),
        },
        "settings": {
            "interval_sec": int(interval_sec),
            "window_sec": float(window_sec),
            "conf_thres": float(conf_thres),
            "max_det": int(max_det),
            "model_weights": model_weights,
            "blur_faces": bool(blur_faces),
            "face_blur_kernel": list(face_blur_kernel),
        },
        "intervals": intervals,
    }
    with open(json_path, "w") as f:
        json.dump(dashboard, f, indent=2)

    # ---- Plots (matplotlib; no custom colors specified) ----
    if len(df) > 0:
        t_mid = (df["t_start_sec"].values + df["t_end_sec"].values) / 2.0 / 60.0  # minutes

        # Plot 1: People and motion
        plt.figure()
        plt.plot(t_mid, df["people_avg"].values, label="People avg")
        plt.plot(t_mid, df["student_movement_avg"].values, label="Student movement avg")
        plt.plot(t_mid, df["instructor_movement_avg"].values, label="Instructor movement avg")
        plt.xlabel("Time (minutes)")
        plt.ylabel("Value")
        plt.title("Classroom signals over time (2-minute intervals)")
        plt.legend()
        p1 = os.path.join(plots_dir, "signals_over_time.png")
        plt.savefig(p1, dpi=200, bbox_inches="tight")
        plt.show()

        # Plot 2: Lecture vs Groupwork score
        plt.figure()
        plt.plot(t_mid, df["lecture_score_avg"].values, label="Lecture score")
        plt.plot(t_mid, df["groupwork_score_avg"].values, label="Groupwork score")
        plt.xlabel("Time (minutes)")
        plt.ylabel("Score (0-1)")
        plt.title("Lecture vs Groupwork scores (2-minute intervals)")
        plt.legend()
        p2 = os.path.join(plots_dir, "lecture_vs_groupwork.png")
        plt.savefig(p2, dpi=200, bbox_inches="tight")
        plt.show()

        # Plot 3: COPUS-style code counts
        def explode_codes(col: str) -> pd.Series:
            codes = []
            for s in df[col].fillna(""):
                parts = [p.strip() for p in str(s).split(",") if p.strip()]
                codes.extend(parts)
            return pd.Series(codes)

        instr_codes = explode_codes("instructor_codes")
        stud_codes = explode_codes("student_codes")

        instr_counts = instr_codes.value_counts().sort_index()
        stud_counts = stud_codes.value_counts().sort_index()

        plt.figure()
        instr_counts.plot(kind="bar")
        plt.xlabel("Instructor code")
        plt.ylabel("Count")
        plt.title("Instructor COPUS-style code frequency")
        p3 = os.path.join(plots_dir, "instructor_code_frequency.png")
        plt.savefig(p3, dpi=200, bbox_inches="tight")
        plt.show()

        plt.figure()
        stud_counts.plot(kind="bar")
        plt.xlabel("Student code")
        plt.ylabel("Count")
        plt.title("Student COPUS-style code frequency")
        p4 = os.path.join(plots_dir, "student_code_frequency.png")
        plt.savefig(p4, dpi=200, bbox_inches="tight")
        plt.show()

    return {
        "annotated_video": annotated_path,
        "csv": csv_path,
        "json": json_path,
        "plots_dir": plots_dir,
    }


## 4) Run the analysis
Run this cell after setting `VIDEO_PATH`. If your video is large, start with a short clip first.

In [None]:
artifacts = analyze_classroom_video(
    video_path=VIDEO_PATH,
    out_dir=OUT_DIR,
    interval_sec=INTERVAL_SEC,
    window_sec=WINDOW_SEC,
    conf_thres=CONF_THRES,
    max_det=MAX_DET,
    model_weights=MODEL_WEIGHTS,
    blur_faces=BLUR_FACES,
    face_blur_kernel=FACE_BLUR_KERNEL,
)

artifacts


## 5) Load exports (CSV/JSON) for dashboard use
- The CSV is convenient for spreadsheets.
- The JSON is ready for a web dashboard (e.g., React, Streamlit, or Plotly Dash).

In [None]:
df = pd.read_csv(artifacts["csv"])
df.head(10)


In [None]:
with open(artifacts["json"], "r") as f:
    dashboard = json.load(f)

# Show JSON keys + first interval
list(dashboard.keys()), dashboard["intervals"][0] if dashboard["intervals"] else None
