# Educationnal TBRS + MPC (Human-in-the-Loop)

This notebook demonstrates a **Trajectory‑Based Recommender System (TBRS)** using **Model Predictive Control (MPC)** within the framework of **control theory**.

We explore a scenario for developing cybersecurity skills:
- State $x \in \mathbb{R}^4$ represents skill levels in phishing, firewall, passwords, and incident response,
- The system follows linear dynamics $x_{t+1}=Ax_t + Bu_t$,
- The objective is to guide a student to a **target state** $x_{\text{target}}$ by recommending appropriate **educational resources**.

In this **human‑in‑the‑loop** approach, at each step, the system suggests a **Top‑k** list of items, and the learner selects an item based on a configurable choice policy, affecting the learning trajectory.


## 1) Imports and Utilities

In [None]:

import matplotlib.pyplot as plt
import numpy as np
import cvxpy as cp

np.set_printoptions(precision=3, suppress=True)

def colvec(x):
    """Force into a numpy column vector of shape (n,1)."""
    x = np.asarray(x).reshape(-1, 1)
    return x

def topk_indices(vec, k=3):
    """Returns indices sorted by descending value (Top-k) of a 1D vector."""
    v = np.asarray(vec).ravel()
    order = np.argsort(-v)  # descending sort
    return order[:k], v[order[:k]]

def generate_B(n, m, scenario=1, seed=None):
    """Generate matrix B based on the chosen scenario."""
    levels = np.random.choice([0,1,2], size=m, p=[0.4,0.4,0.2])
    if seed is not None:
        np.random.seed(seed)

    B = np.zeros((n, m))
    if scenario == 1:
        # Mono-compétence: chaque ressource agit sur une seule compétence
        for j in range(m):
            i = j % n
            lvl = levels[j]
            low, high = [(0.01,0.05), (0.05,0.1), (0.1,0.2)][lvl]
            B[i, j] = np.random.uniform(low, high)
    elif scenario == 2:
        # Intermédiaire: structure bandée
        for j in range(m):
            base_i = j % n
            lvl = levels[j]
            low, high = [(0.01,0.05), (0.05,0.1), (0.1,0.2)][lvl]
            width = 1 + lvl
            for i in range(n):
                if abs(i - base_i) <= width // 2:
                    B[i, j] = np.random.uniform(low, high)
    elif scenario == 3:
        # Multi-compétences: matrice dense
        for j in range(m):
            lvl = levels[j]
            low, high = [(0.01,0.05), (0.05,0.1), (0.1,0.2)][lvl]
            B[:, j] = np.random.uniform(low, high, n)
    else:
        raise ValueError("Scénario non reconnu. Choisir 1, 2 ou 3.")

    print(f"Generated B with shape {B.shape} for scenario {scenario}")
    return B, levels




## 2) Educational Model (A, B), Initial and Target State
In our example, we take:
- **A**: retention (99%) per skill, without cross-transfer,
- **B**: 80 resources
- **$x_0$** and **$x_{\text{target}}$** determined.
- **m**: Number of resources (80),
- **n**: Number of skills (4),
- **k**: Top-k construction parameters (8),
- **T_p**: MPC prediction horizon (10),
- **T_s**: Simulation horizon (50),
- **simulate_user**: Boolean to simulate user interaction (True),
- **Q_mpc**: State cost matrix,
- **Qf_mpc**: Final state cost matrix.

In [None]:

A = np.eye(4) * 0.99 # Matrix A: 99% retention without cross-skill coupling
m = 80  # Number of resources
n = 4  # Number of skills
k = 8 # Top-k construction parameters
T_p = 10 # T_p: MPC prediction horizon (e.g., 10)
T_s = 50 # T_s: Simulation horizon (e.g., 50)
simulate_user = True
Q_mpc = np.diag([1.0, 1.0, 1.0, 5.0])
Qf_mpc = np.diag([1.0, 1.0, 1.0, 5.0])

def R_t(t, levels):
    R = np.eye(m)
    for j, lvl in enumerate(levels):
        R[j, j] = [0.01, 0.05, 0.1][lvl]
    return R


# Initial state of student (x0) and target (x_target)
x0 = np.array([0.70, 0.40, 0.60, 0.30]).reshape(-1,1)
x_target = np.array([0.95, 0.80, 0.90, 0.70]).reshape(-1,1)

# Allocation of arrays for simulation
x_sim = np.zeros((T_s + 1, n))
x_sim[0] = x0.ravel()
u_applied = np.zeros((T_s, m))



## 3) Exploitation of U* to Build the Top-k and Human-in-the-Loop Simulation

In this cell, we show:
- how to build an instantaneous Top-k (u0);
- how to build a weighted Top-k over the horizon (short-term vs long-term policy);
- optionally, how to simulate the human‑in‑the‑loop by applying a one-hot choice at each step.


In [None]:

def build_weight_vector(N, slope='short'):
    """Returns a normalized weight vector for horizon weighting."""
    if slope == 'short':
        w = np.linspace(1.0, 0.1, N)
    elif slope == 'long':
        w = np.linspace(0.1, 1.0, N)
    else:
        w = np.ones(N)
    return w / np.sum(w)


def ask_user_choice(topk_idxs, topk_vals, simulate_user=True, randomness=0.2):

    if simulate_user:
        weights = np.linspace(1.0, 0.1, len(topk_idxs))
        weights = (1 - randomness) * weights + randomness * np.ones_like(weights)
        weights /= weights.sum()
        return int(np.random.choice(topk_idxs, p=weights))

    print('Suggestions (Top-k):')
    for i, v in enumerate(topk_vals):
        print(f'  [{i}] {resources[topk_idxs[i]]} (score={v:.3f})')
    while True:
        try:
            s = input(f"Enter local index (0..{len(topk_idxs)-1}) or Enter for top-1: ").strip()
            if s == '':
                return int(topk_idxs[0])
            k_choice = int(s)
            if 0 <= k_choice < len(topk_idxs):
                return int(topk_idxs[k_choice])
        except Exception:
            print('Invalid entry, please try again.')


def solve_mpc(x_current, T_p):
    U = cp.Variable((m, T_p))
    X = cp.Variable((n, T_p + 1))
    constraints = [X[:, 0] == x_current.ravel()]
    for t in range(T_p):
        constraints += [X[:, t + 1] == A @ X[:, t] + B @ U[:, t]]
        constraints += [U[:, t] >= 0, U[:, t] <= 1, cp.sum(U[:, t]) <= 1]
    cost = 0
    for t in range(T_p):
        dx = X[:, t + 1] - x_target.ravel()
        cost += cp.quad_form(dx, Q_mpc) + cp.quad_form(U[:, t], R_t(t, levels))
    prob = cp.Problem(cp.Minimize(cost), constraints)
    prob.solve(
        solver=cp.OSQP,
        verbose=False,
        eps_abs=1e-5,
        eps_rel=1e-5,
        max_iter=20000
    )
    if prob.status not in ['optimal', 'optimal_inaccurate']:
        raise RuntimeError('MPC not solved correctly: ' + str(prob.status))
    return np.array(U.value)


# ==== Simulation Loop ====
U_applied = np.zeros((m, T_s))
chosen_idx = []

scenarios = [1, 2, 3]
policies = ["instant", "short", "long"]

results = {}

# Loop over scenarios 1–3
for scenario in scenarios:
    B, levels = generate_B(4, m, scenario)

    print("Mean coupling per skill in B:", np.round(B.mean(axis=1), 3))
    print("x_sim shape =", x_sim.shape)
    print("A shape =", A.shape)
    print("B shape =", B.shape)

    print(f"\n=== Scenario {scenario} ===")
    print(f"Mean coupling per skill: {np.round(B.mean(axis=1), 3)}")

    print(f"Generated B with shape {B.shape} for scenario {scenario}")
    print("Matrix B:")
    print(B)
    print("Resource levels:")
    print(levels)

    skills = ["Phishing", "Firewall", "Passwords", "Incident"]
    levels_names = ["easy", "medium", "hard"]
    levels_values = [0.25, 0.50, 0.75]

    resources = []
    for j in range(m):
        dominant_skill_idx = np.argmax(B[:, j])
        dominant_skill = skills[dominant_skill_idx]

        lvl_idx = levels[j]
        lvl_name = levels_names[lvl_idx]
        lvl_val  = levels[lvl_idx]

        resources.append(f"r{j+1:03d}: {dominant_skill} ({lvl_name})")

    print(f"{len(resources)} resources generated.")
    print(resources[:8])

    print("\n=== DEBUG: Matrix B and Resource Mapping ===")
    header = "      " + " ".join([f"{s[:8]:>10}" for s in skills])
    print(header)
    print("-" * len(header))

    n, m = B.shape
    n, m

    for chosen_policy in policies:
        print(f"\n--- Policy: {chosen_policy} ---")

        for t in range(T_s):
            # --- MPC solve for current state ---
            U_pred = solve_mpc(x_sim[t], T_p)
            u0 = U_pred[:, 0]

            # --- Apply projection policy (Hadamard mask) ---
            P_instant = np.zeros_like(U_pred); P_instant[:, 0] = 1
            P_short   = np.zeros_like(U_pred); P_short[:, :T_p//2] = 1
            P_long    = np.zeros_like(U_pred); P_long[:, T_p//2:]  = 1

            # --- Policy selection ---
            if chosen_policy == 'instant':
                P = P_instant
            elif chosen_policy == 'short':
                P = P_short
            elif chosen_policy == 'long':
                P = P_long
            else:
                P = np.ones_like(U_pred)

            U_masked = U_pred * P  # ⊙ Hadamard

            # --- Build global scores by concatenating horizon vectors ---
            scores_concat = U_masked.flatten(order='F')  # column-major flatten (u0,u1,...)
            indices_concat = np.arange(U_masked.size)    # each corresponds to (resource, step)

            # --- Sort to get global Top-k ---
            topk_idx_flat = np.argsort(scores_concat)[::-1][:k]
            topk_vals_flat = scores_concat[topk_idx_flat]

            # --- Map back to (resource, step) coordinates ---
            res_idx, step_idx = np.unravel_index(topk_idx_flat, U_masked.shape)

            # --- Display Top-k aggregated over horizon ---
            print(f"\nTop-{k} global (policy={chosen_policy}):")
            for r, s, v in zip(res_idx, step_idx, topk_vals_flat):
                print(f"  t={s:<2} | {resources[r]:<35} (score={v:.3f})")

            idx_order = res_idx[:k]
            vals = topk_vals_flat[:k]

            # --- User (simulated or manual) choice ---
            choice = ask_user_choice(idx_order, vals, simulate_user=simulate_user)
            chosen_idx.append(choice)

            # --- Apply chosen resource effect ---
            a_t = np.zeros_like(u0)
            a_t[choice] = 1

            # The control is conceptual only (used for ranking), the actual effect comes from B
            # We apply directly the column of B for the chosen resource
            b_effect = B[:, choice]

            # --- Debug: visualize the actual control applied ---
            print(f"\n[DEBUG] Step {t} — Applied resource: {resources[choice]}")
            print(f"  a(t):            {a_t.astype(int)}")                 # one-hot control
            print(f"  B[:, choice]:    {np.round(b_effect, 3)}")           # resource effect on each skill
            print(f"  x(t):            {np.round(x_sim[t], 3)}")           # current user state
            print(f"  A @ x(t):        {np.round(A @ x_sim[t], 3)}")       # retained state
            print(f"  Δx:              {np.round(b_effect, 3)}")           # change due to resource
            print(f"  x(t+1) predicted:{np.round(A @ x_sim[t] + b_effect, 3)}")  # new state before clipping

            # Optional: scaling factor (learning rate, exposure, etc.)
            alpha = 1.0

            x_sim[t + 1] = (A @ x_sim[t] + alpha * b_effect).clip(0, 1)
            U_applied[:, t] = u0  # still store u0 for logs

            print(f"  x(t+1) applied:  {np.round(x_sim[t+1], 3)}")         # actual next state

            # --- Step diagnostics ---
            dist_to_target = np.linalg.norm(x_sim[t] - x_target.ravel())
            control_norm = np.linalg.norm(u0)
            if t % 5 == 0:
                print(f"[Step {t}] ||x-x*||={dist_to_target:.3f}  ||u||={control_norm:.3f}  mean(R)={np.mean(np.diag(R_t(t, levels))):.3f}")

        print('\nIndices chosen per time step:', chosen_idx)
        print('Chosen resources:', [resources[i] for i in chosen_idx])

        from collections import Counter
        skill_stats = Counter([r.split(":")[1].split("(")[0].strip() for r in [resources[i] for i in chosen_idx]])
        print("\nResource choice distribution:")
        for skill, count in skill_stats.items():
            print(f"  {skill:<10}: {count:3d} times")

        incident_idx = [i for i, r in enumerate(resources) if "Incident" in r]
        print(f"Incident resources: {len(incident_idx)}")
        for i in incident_idx[:10]:
            print(f"{resources[i]} — B[:,{i}] = {np.round(B[:, i], 3)}")

        # --- Monochrome-friendly skill plot ---
        fig, ax = plt.subplots(figsize=(8, 4))
        times = np.arange(T_s + 1)

        # Define line styles for clarity in black & white
        line_styles = ['-', '--', '-.', ':']
        target_styles = ['--', ':', '-.', '--']

        for i in range(n):
            style = line_styles[i % len(line_styles)]
            ax.plot(times, x_sim[:, i], linestyle=style, linewidth=1.8, label=f"Skill {skills[i]}")
            ax.axhline(y=x_target[i], linestyle=target_styles[i % len(target_styles)], linewidth=1)

        # --- Info box in grayscale (no color) ---
        avg_retention = np.mean(np.diag(A)) * 100
        has_coupling = not np.allclose(A, np.diag(np.diag(A)))

        A_summary = (
            f"A: {avg_retention:.0f}% retention, "
            f"{'cross-skill coupling' if has_coupling else 'diagonal'}"
        )

        info_text = (
            f"Policy: {chosen_policy}\n"
            f"Tₚ={T_p}, Tₛ={T_s}\n"
            f"{A_summary}\n"
            f"Scenario: {scenario if 'scenario' in locals() else 'N/A'}"
        )

        ax.text(
            1.02, 0.5, info_text,
            transform=ax.transAxes,
            fontsize=9,
            va='center', ha='left',
            bbox=dict(boxstyle="round,pad=0.3", fc='white', ec='black', lw=0.6, alpha=0.9)
        )

        ax.set_xlabel("Simulation step")
        ax.set_ylabel("Skill level")
        ax.set_ylim(0, 1.05)
        ax.set_title("MPC Simulation — Skill trajectories and targets")
        ax.legend(loc='lower right', fontsize=8, frameon=False)
        plt.tight_layout()
        plt.show()


        # Save results
        key = f"S{scenario}_{chosen_policy}"
        results[key] = {
            "x_sim": x_sim.copy(),
            "U": U_pred.copy(),
            "policy": chosen_policy,
            "scenario": scenario,
            "label": f"S{scenario}-{chosen_policy}",
            "final": x_sim[-1],
        }





## 4) Display of results

In this cell we display the result of the simulation

In [None]:

# Styles for B/W rendering
styles = {
    "instant": (0, (1, 3)),   # very dotted line
    "short":   (0, (5, 3)),   # medium dash
    "long":    (0, (1, 1)),   # continuous line
}
markers = {
    1: "o",    # scenario 1 = single-skill
    2: "s",    # scenario 2 = mixed
    3: "x",    # scenario 3 = multi
}

# === Global convergence plot ===
plt.figure(figsize=(8, 4))
times = np.arange(T_s + 1)

for key, res in results.items():
    scen = res["scenario"]
    pol  = res["policy"]

    # Euclidean distance to the target over the entire horizon
    d = np.linalg.norm(res["x_sim"] - x_target.T, axis=1)

    plt.plot(
        times,
        d,
        linestyle=styles[pol],
        marker=markers[scen],
        markevery=5,
        linewidth=1.1,
        label=f"S{scen}-{pol}"
    )

plt.title("Convergence Speed — Distance to Target over Time")
plt.xlabel("Step")
plt.ylabel("‖x - x*‖₂ (global skill gap)")
plt.yscale("log")  # highlights speed differences
plt.grid(True, linestyle=':')
plt.legend(ncol=3, fontsize=8)
plt.tight_layout()
plt.show()



print("\n=== Summary Table ===")
print("Scenario | Policy   | Final State (x)")
print("-" * 45)
for key, res in results.items():
    scen, pol = res["scenario"], res["policy"]
    xf = np.round(res["x_sim"][-1], 3)
    print(f"{scen:^8} | {pol:<8} | {xf}")



