In [1]:
import json
import os
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import RobustScaler

# Severity score for every proctoring activity the pipeline recognises
# (values here range from 10 to 95; presumably a 0-100 scale -- confirm).
#
# create_features_with_moderate_duplication weights each logged occurrence by
# ``1 + severity / 15`` and iterates this dict in insertion order to lay out
# the feature vector, so reordering, adding or removing entries changes the
# feature layout. Activities that are not listed here contribute nothing to
# the per-activity features.
ACTIVITY_SEVERITY = {
    "Browser window swapped": 60,
    "Candidate iris looking left": 15,
    "Candidate iris looking right": 15,
    "Candidate looking down": 30,
    "Candidate looking left": 20,
    "Candidate looking right": 20,
    "Candidate looking up": 20,
    "Cell phone detected": 90,
    "Copy": 10,
    "Cut": 15,
    "Display change detected": 60,
    "Laptop detected": 95,
    "No face detected": 90,
    "Paste": 10,
    "Tab change detected": 75,
    "Window change detected": 65,
    "Window focus changed": 70
}


In [None]:

# Activities treated as critical during feature extraction: every such event is
# recorded for the first/last-critical features, and the weighted summaries of
# these activities are appended to the feature vector a second time, in this
# order. Each name must also be a key of ACTIVITY_SEVERITY.
CRITICAL_ACTIVITIES = ["Cell phone detected", "Laptop detected", "No face detected"]

def convert_timestamp(ts_str):
    """Convert an ``H:M:S`` timestamp string into a number of seconds.

    Parameters
    ----------
    ts_str : str
        Timestamp with exactly three ``:``-separated fields, e.g.
        ``"01:02:03.5"``. Hours and minutes must parse as integers; seconds
        may be fractional.

    Returns
    -------
    float or None
        ``hours * 3600 + minutes * 60 + seconds``, or ``None`` when the value
        is not a string, contains ``"NaN"``, does not have three fields, has a
        field that is not a number, or produces a non-finite result.
    """
    if not isinstance(ts_str, str) or "NaN" in ts_str:
        return None
    try:
        h, m, s = ts_str.split(":")
        seconds = int(h) * 3600 + int(m) * 60 + float(s)
    except (ValueError, OverflowError):
        # Wrong number of fields, a non-numeric field, or an integer part too
        # large to be combined with the float seconds.
        return None
    # float() happily parses "nan", "inf" and "infinity" (any case), which the
    # case-sensitive "NaN" test above does not catch. Such values cannot be
    # assigned to a time bin, so treat them as invalid as well.
    if not np.isfinite(seconds):
        return None
    return seconds

def get_time_bin(seconds, bin_size=300, max_bins=50):
    """Map a time offset in seconds to the index of its fixed-width time bin.

    Parameters
    ----------
    seconds : float or None
        Offset from the start of the recording.
    bin_size : float, optional
        Width of one bin in seconds (default 300, i.e. five minutes).
    max_bins : int, optional
        Number of bins available; valid indices are ``0 .. max_bins - 1``.

    Returns
    -------
    int or None
        ``None`` when ``seconds`` is ``None``; otherwise the bin index, clamped
        into ``[0, max_bins - 1]``.
    """
    if seconds is None:
        return None
    idx = int(seconds // bin_size)
    # Clamp on both sides. Times past the last bin fall into the last bin, and
    # negative times fall into the first bin instead of yielding a negative
    # index, which would silently address bins from the END of a NumPy array.
    return max(0, min(idx, max_bins - 1))

def load_json_data(path):
    """Load one candidate's activity log from a JSON file (best effort).

    Parameters
    ----------
    path : str or os.PathLike
        Location of the JSON file.

    Returns
    -------
    dict
        The parsed JSON object. When the file cannot be opened, cannot be
        decoded or parsed, or its top-level value is not a JSON object, a
        warning is printed and ``{"activityLog": []}`` is returned instead so
        that the caller can still build an (empty) feature vector.
    """
    fallback = {"activityLog": []}
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        print(f"Warning: could not load {path!r} ({exc}); using an empty activity log.")
        return fallback
    if not isinstance(data, dict):
        # Callers use data.get(...); a top-level list/number would crash them.
        print(f"Warning: {path!r} does not contain a JSON object; using an empty activity log.")
        return fallback
    return data


In [None]:

def create_features_with_moderate_duplication(data, num_bins=50):
    """Turn one candidate's activity log into a flat numeric feature vector.

    Parameters
    ----------
    data : dict
        Parsed candidate JSON. Events are read from ``data["activityLog"]``;
        each event may provide ``timestampInVideo`` (``"H:M:S"``),
        ``activityDescription`` and ``count`` (defaults to 1).
    num_bins : int, optional
        Number of time bins per activity (bin width is ``get_time_bin``'s
        default).

    Returns
    -------
    numpy.ndarray
        1-D vector with, in order:

        1. for every activity in ``ACTIVITY_SEVERITY`` (dict order):
           ``num_bins`` raw counts, ``num_bins`` weighted counts, then the
           weighted sum, weighted max and fraction of non-empty bins;
        2. for every activity in ``CRITICAL_ACTIVITIES``: the same three
           summaries once more;
        3. earliest critical time (hours), lowest severity among critical
           events, latest critical time (hours) -- zeros when there are none;
        4. sum, max and standard deviation of the per-bin weight totalled
           over all activities;
        5. three flags (severity if seen, else 0) for "Laptop detected",
           "Cell phone detected", "No face detected";
        6. total event count in the mid-exam bins;
        7. number of Copy/Cut/Paste events after the last browser swap.

        Its length is ``len(ACTIVITY_SEVERITY) * (2 * num_bins + 3)
        + 3 * len(CRITICAL_ACTIVITIES) + 11``. The final five entries are read
        by negative index in ``adjust_anomaly_scores``, so their order must
        not change.

    Notes
    -----
    Malformed events are skipped rather than raising: non-dict entries,
    missing / non-string / ``NaN`` / unparsable / negative / non-finite
    timestamps, and non-string activity names. A missing, null, non-numeric
    or non-finite ``count`` is treated as 1.
    """
    counts = {a: np.zeros(num_bins) for a in ACTIVITY_SEVERITY}
    weights = {a: np.zeros(num_bins) for a in ACTIVITY_SEVERITY}
    critical = []      # (seconds, severity) of every critical event
    last_swap = -1.0   # time of the latest browser swap; -1 means "none seen"
    ccp_times = []     # times of every Copy / Cut / Paste event

    # Tolerate a non-dict payload or a null "activityLog" instead of crashing.
    log = data.get("activityLog") if isinstance(data, dict) else None

    for e in log or []:
        if not isinstance(e, dict):
            continue
        ts = e.get("timestampInVideo", "")
        # A JSON null (or any non-string) timestamp would make `"NaN" in ts`
        # raise TypeError, so check the type first.
        if not isinstance(ts, str) or "NaN" in ts:
            continue
        sec = convert_timestamp(ts)
        # Negative or non-finite times cannot be placed on the exam timeline.
        if sec is None or not np.isfinite(sec) or sec < 0:
            continue
        b = get_time_bin(sec, max_bins=num_bins)
        # Reject indices on either side; a negative index would silently
        # write into a bin counted from the end of the array.
        if b is None or not 0 <= b < num_bins:
            continue

        act = e.get("activityDescription", "")
        if not isinstance(act, str):
            continue

        cnt = e.get("count", 1)
        try:
            cnt = float(cnt)
        except (TypeError, ValueError):
            cnt = 1.0  # same default as a missing "count" key
        if not np.isfinite(cnt):
            cnt = 1.0

        if act == "Browser window swapped":
            last_swap = max(last_swap, sec)
        if act in {"Copy", "Cut", "Paste"}:
            ccp_times.append(sec)

        if act in ACTIVITY_SEVERITY:
            counts[act][b] += cnt
            sev = ACTIVITY_SEVERITY[act]
            # Each occurrence is up-weighted in proportion to its severity.
            dup = 1 + sev/15
            weights[act][b] += cnt * dup
            if act in CRITICAL_ACTIVITIES:
                critical.append((sec, sev))

    feats = []
    # per-activity bins + summaries
    for act in ACTIVITY_SEVERITY:
        c = counts[act]
        w = weights[act]
        feats.extend(c)
        feats.extend(w)
        feats.append(w.sum())
        feats.append(w.max())
        feats.append((w > 0).sum()/num_bins)

    # critical summaries (repeats the summaries already emitted above)
    for act in CRITICAL_ACTIVITIES:
        w = weights[act]
        feats.append(w.sum())
        feats.append(w.max())
        feats.append((w > 0).sum()/num_bins)

    # first/last critical
    if critical:
        times, sevs = zip(*critical)
        feats.append(min(times)/3600)
        # NOTE(review): this is the lowest severity among all critical events,
        # not the severity of the earliest one -- confirm that is intended.
        feats.append(min(sevs))
        feats.append(max(times)/3600)
    else:
        feats.extend([0, 0, 0])

    # global stats over the per-bin weight summed across all activities
    all_w = np.sum(np.vstack(list(weights.values())), axis=0)
    feats.append(all_w.sum())
    feats.append(all_w.max())
    feats.append(all_w.std())

    # flags: the activity's severity when it occurred at least once, else 0
    def severity_if_seen(name):
        return int(counts[name].sum() > 0) * ACTIVITY_SEVERITY[name]

    feats.append(severity_if_seen("Laptop detected"))
    feats.append(severity_if_seen("Cell phone detected"))
    feats.append(severity_if_seen("No face detected"))

    # mid-exam count: all events from the bin holding the 5-minute mark up to
    # and INCLUDING the bin holding the 55-minute mark. With the default
    # 300 s bins that is 300 s <= t < 3600 s.
    sb = get_time_bin(300, max_bins=num_bins)
    eb = get_time_bin(3300, max_bins=num_bins)
    mid_count = sum(counts[a][sb:eb+1].sum() for a in ACTIVITY_SEVERITY)
    feats.append(mid_count)

    # Copy/Cut/Paste events after the last browser swap. When no swap was
    # seen, last_swap is still -1, so every such event is counted.
    feats.append(sum(1 for t in ccp_times if t > last_swap))

    return np.array(feats)


In [None]:

def build_ensemble_model(features, train_idx, n_models=5):
    """Fit a robust scaler and an ensemble of isolation forests.

    Parameters
    ----------
    features : sequence of 1-D arrays
        One feature vector per candidate.
    train_idx : iterable of int
        Row indices to train on; indices not below ``len(features)`` are
        ignored.
    n_models : int, optional
        Number of forests to train. Forest ``k`` uses ``random_state=42 + k``;
        all other hyper-parameters are shared.

    Returns
    -------
    tuple
        ``(models, scaler)``: the list of fitted ``IsolationForest`` objects
        and the ``RobustScaler`` fitted on the selected training rows.
    """
    row_count = len(features)
    training_rows = [features[k] for k in train_idx if k < row_count]
    X = np.vstack(training_rows)

    scaler = RobustScaler()
    scaler.fit(X)
    Xs = scaler.transform(X)

    # Hyper-parameters common to every forest; only the seed varies.
    shared_params = dict(
        n_estimators=500,
        max_samples=0.5,
        contamination=0.05,
        max_features=0.8,
        bootstrap=True,
        n_jobs=-1,
    )

    models = []
    for offset in range(n_models):
        forest = IsolationForest(random_state=42 + offset, **shared_params)
        forest.fit(Xs)
        models.append(forest)
    return models, scaler

def adjust_anomaly_scores(raw, feats, valid):
    """Convert raw decision values into 0-100 scores with rule-based floors.

    Parameters
    ----------
    raw : sequence of float
        One averaged decision value per candidate.
    feats : sequence of sequences
        Unscaled feature vectors, aligned with ``raw``. Only trailing entries
        are read: ``[-5]`` laptop flag, ``[-4]`` phone flag and ``[-2]``
        mid-exam event count.
    valid : sequence
        Candidate ids; accepted for interface compatibility and not used.

    Returns
    -------
    list of int
        One score per entry of ``raw``, clamped to ``[0, 100]``. The base
        score is ``int(max(0, 100 * (1 - (r + 1) / 2) - 10))`` and is then
        raised to every floor whose rule matches.
    """
    # (index from the end of the vector, strict lower threshold, minimum score)
    # NOTE(review): the laptop/phone flags built by
    # create_features_with_moderate_duplication are either 0 or the activity's
    # severity (95 / 90), so the "> 0" and "> 1" rules always fire together and
    # the 83 / 82 floors only matter for values in (0, 1]. Confirm whether a
    # detection count was intended here.
    floor_rules = (
        (-5, 0, 83),
        (-5, 1, 95),
        (-4, 0, 82),
        (-4, 1, 93),
        (-2, 100, 70),
    )

    results = []
    for row, decision in enumerate(raw):
        score = int(max(0, 100 * (1 - (decision + 1) / 2) - 10))
        vector = feats[row]
        for position, threshold, floor in floor_rules:
            if vector[position] > threshold:
                score = max(score, floor)
        results.append(min(100, max(0, score)))
    return results


In [None]:

def main():
    """Score candidates 1-50 and write the result to ``ml_based_proctoring.json``.

    Steps:

    1. Build a feature vector for every ``candidate{i}.json`` found in the
       current working directory. Candidates without a file receive an
       all-zero vector, so every id from 1 to 50 is scored.
    2. Train the isolation-forest ensemble on up to 35 randomly chosen rows.
    3. Average the forests' decision values over all rows, add seeded
       Gaussian noise, and convert them to 0-100 scores.
    4. Print each score and dump ``[{"id": ..., "score": ...}, ...]`` as JSON.

    Prints ``No data.`` and returns without writing anything when no
    candidate file exists.
    """
    np.random.seed(69)

    valid = list(range(1, 51))

    # Collect features per id first. Filling in missing candidates only after
    # the vector length is known keeps `valid` and the feature matrix aligned
    # even when the first candidate files are the missing ones.
    feats_by_id = {}
    for cid in valid:
        fn = f"candidate{cid}.json"
        if os.path.exists(fn):
            data = load_json_data(fn)
            feats_by_id[cid] = create_features_with_moderate_duplication(data)

    if not feats_by_id:
        print("No data.")
        return

    # empty feature vector fallback for candidates without a file
    blank = np.zeros_like(next(iter(feats_by_id.values())))
    all_feats = np.array([feats_by_id.get(cid, blank) for cid in valid])

    idx = np.arange(len(valid))
    np.random.shuffle(idx)
    train = idx[:min(35, len(idx))]

    models, scaler = build_ensemble_model(all_feats, train, n_models=5)
    Xs = scaler.transform(all_feats)

    # Average the decision values of all forests for every candidate.
    decs = np.vstack([m.decision_function(Xs) for m in models])
    raw_scores = decs.mean(axis=0)

    # Seeded Gaussian jitter (sigma = 0.03) on the averaged decision values.
    raw_scores += np.random.normal(loc=0, scale=0.03, size=raw_scores.shape)

    final = adjust_anomaly_scores(raw_scores, all_feats, valid)
    out = []
    for cid, score in zip(valid, final):
        out.append({"id": cid, "score": score})
        print(f"{cid} scored {score}")

    with open("ml_based_proctoring.json","w") as f:
        json.dump(out, f, indent=2)

# Run the whole scoring pipeline when this code is executed as the main module
# (which includes running the cell in a notebook), but not when it is imported.
if __name__ == "__main__":
    main()
