<a href="https://colab.research.google.com/github/elangbijak4/Humanoid-Robot/blob/main/Demo4%20-%203%20Agent%20vs%202%20Join%20Sendi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# CELL 1/2
# Setup, kinematics, dataset, train per-joint agents (numpy), save weights,
# then construct equivalent PyTorch agent modules (weights copied) for differentiable pipeline.

# NOTE: This cell may take ~1-2 minutes depending on runtime.

import numpy as np
np.random.seed(0)
import os
os.makedirs('models', exist_ok=True)

# ---------- Kinematics (2-DoF planar) ----------
DOF = 2
LINKS = np.array([1.0, 0.8], dtype=float)

def forward_kinematics_np(q):
    # q: (..., DOF)
    qs = np.cumsum(q, axis=-1)
    x = np.cumsum(np.cos(qs) * LINKS, axis=-1)
    y = np.cumsum(np.sin(qs) * LINKS, axis=-1)
    pts = np.stack([x, y], axis=-1)  # (..., DOF, 2)
    origin = np.zeros_like(pts[..., :1, :])
    joints = np.concatenate([origin, pts], axis=-2)
    ee = joints[..., -1, :]
    return ee, joints

def jacobian_np(q):
    J = np.zeros((2, DOF))
    angles = np.cumsum(q)
    for i in range(DOF):
        s = np.sum(np.sin(angles[i:]) * LINKS[i:])
        c = np.sum(np.cos(angles[i:]) * LINKS[i:])
        J[0,i] = -s
        J[1,i] = c
    return J

def jacobian_transpose_control(q, target, alpha=0.9, max_vel=2.0):
    ee, _ = forward_kinematics_np(q[None,:])
    ee = ee[0]
    err = target - ee
    J = jacobian_np(q)
    qdot = alpha * J.T.dot(err)
    qdot = np.clip(qdot, -max_vel, max_vel)
    return qdot

# ---------- Dataset ----------
def generate_dataset(n=3000):
    Q = np.random.uniform(-np.pi/2, np.pi/2, size=(n, DOF)).astype(np.float32)
    angles = np.random.uniform(0, 2*np.pi, size=(n,))
    radii = np.random.uniform(0.2, LINKS.sum()*0.9, size=(n,))
    T = np.stack([radii*np.cos(angles), radii*np.sin(angles)], axis=-1).astype(np.float32)
    QDOT = np.array([jacobian_transpose_control(q, t, alpha=0.9) for q,t in zip(Q,T)], dtype=np.float32)
    return Q, T, QDOT

Q, T, QDOT = generate_dataset(3000)
print("Dataset shapes:", Q.shape, T.shape, QDOT.shape)

# ---------- Numpy MLP for agents ----------
class NumpyMLP:
    def __init__(self, in_dim, hid_dim, out_dim, seed=0):
        rng = np.random.RandomState(seed)
        self.W1 = rng.randn(in_dim, hid_dim) * np.sqrt(2/(in_dim+hid_dim))
        self.b1 = np.zeros(hid_dim, dtype=float)
        self.W2 = rng.randn(hid_dim, out_dim) * np.sqrt(2/(hid_dim+out_dim))
        self.b2 = np.zeros(out_dim, dtype=float)
    def forward(self, x):
        z1 = x.dot(self.W1) + self.b1
        a1 = np.tanh(z1)
        z2 = a1.dot(self.W2) + self.b2
        return z2, a1
    def predict(self, x):
        z2, _ = self.forward(x)
        return z2
    def train(self, X, Y, epochs=200, lr=1e-3, batch=64, verbose=False):
        n = X.shape[0]
        for ep in range(epochs):
            perm = np.random.permutation(n)
            Xs = X[perm]; Ys = Y[perm]
            total_loss = 0.0
            for i in range(0, n, batch):
                xb = Xs[i:i+batch]; yb = Ys[i:i+batch]
                z1 = xb.dot(self.W1) + self.b1
                a1 = np.tanh(z1)
                out = a1.dot(self.W2) + self.b2
                err = out - yb
                loss = np.mean(err**2)
                total_loss += loss * xb.shape[0]
                # backprop
                d_out = 2 * err / xb.shape[0]
                dW2 = a1.T.dot(d_out)
                db2 = d_out.sum(axis=0)
                da1 = d_out.dot(self.W2.T)
                dz1 = da1 * (1 - np.tanh(z1)**2)
                dW1 = xb.T.dot(dz1)
                db1 = dz1.sum(axis=0)
                # gradient step
                self.W2 -= lr * dW2
                self.b2 -= lr * db2
                self.W1 -= lr * dW1
                self.b1 -= lr * db1
            avg = total_loss / n
            if verbose and (ep % 50 == 0 or ep==epochs-1):
                print(f"Epoch {ep+1}/{epochs}, loss {avg:.6f}")

# Prepare per-joint data
X1 = np.concatenate([Q[:,0:1], T], axis=1)  # (n,3)
X2 = np.concatenate([Q[:,1:2], T], axis=1)
Y1 = QDOT[:,0:1]
Y2 = QDOT[:,1:2]

agent1_np = NumpyMLP(3, 32, 1, seed=1)
agent2_np = NumpyMLP(3, 32, 1, seed=2)

print("Training agent1 (numpy)...")
agent1_np.train(X1, Y1, epochs=300, lr=5e-4, batch=128, verbose=True)
print("Training agent2 (numpy)...")
agent2_np.train(X2, Y2, epochs=300, lr=5e-4, batch=128, verbose=True)

# Save weights (numpy)
np.savez('models/agent1.npz', W1=agent1_np.W1, b1=agent1_np.b1, W2=agent1_np.W2, b2=agent1_np.b2)
np.savez('models/agent2.npz', W1=agent2_np.W1, b1=agent2_np.b1, W2=agent2_np.W2, b2=agent2_np.b2)
print("Saved numpy agent weights to ./models")

# ---------- Build equivalent PyTorch agents (for differentiable pipeline) ----------
import torch, torch.nn as nn
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("PyTorch device:", device)

class AgentTorch(nn.Module):
    def __init__(self, W1, b1, W2, b2):
        super().__init__()
        # create Linear layers with given weights (transpose because torch Linear uses out x in)
        in_dim = W1.shape[0]
        hid = W1.shape[1]
        out = W2.shape[1]
        self.l1 = nn.Linear(in_dim, hid)
        self.l2 = nn.Linear(hid, out)
        # copy weights
        with torch.no_grad():
            self.l1.weight.copy_(torch.from_numpy(W1.T))
            self.l1.bias.copy_(torch.from_numpy(b1))
            self.l2.weight.copy_(torch.from_numpy(W2.T))
            self.l2.bias.copy_(torch.from_numpy(b2))
    def forward(self, x):
        # x: (batch, in_dim)
        a = torch.tanh(self.l1(x))
        return self.l2(a)

# load saved numpy weights
d1 = np.load('models/agent1.npz')
d2 = np.load('models/agent2.npz')
agent1_torch = AgentTorch(d1['W1'], d1['b1'], d1['W2'], d1['b2']).to(device)
agent2_torch = AgentTorch(d2['W1'], d2['b1'], d2['W2'], d2['b2']).to(device)
agent1_torch.eval(); agent2_torch.eval()
print("Constructed PyTorch agents from numpy weights. Agents ready.")


Dataset shapes: (3000, 2) (3000, 2) (3000, 2)
Training agent1 (numpy)...
Epoch 1/300, loss 1.199614
Epoch 51/300, loss 0.707639
Epoch 101/300, loss 0.686887
Epoch 151/300, loss 0.681517
Epoch 201/300, loss 0.676376
Epoch 251/300, loss 0.670831
Epoch 300/300, loss 0.664815
Training agent2 (numpy)...
Epoch 1/300, loss 1.247843
Epoch 51/300, loss 0.227582
Epoch 101/300, loss 0.224382
Epoch 151/300, loss 0.223228
Epoch 201/300, loss 0.222127
Epoch 251/300, loss 0.221042
Epoch 300/300, loss 0.219991
Saved numpy agent weights to ./models
PyTorch device: cuda
Constructed PyTorch agents from numpy weights. Agents ready.


In [5]:
# CELL 2/2
# Blackboard implemented in PyTorch (differentiable).
# - optional batch training of blackboard to minimize multi-step EE error
# - run demo episode with finetune per-step (gradient-based)
# - interactive widgets: Play/Slider, Random Target, Recompute, two bias sliders that modulate blackboard outputs in real-time

# Install widgets if needed (Colab)
try:
    import ipywidgets as widgets
    from IPython.display import display, clear_output
except Exception:
    !pip install ipywidgets
    import ipywidgets as widgets
    from IPython.display import display, clear_output

# imports
import torch, torch.nn as nn, torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
import time

# ensure device from previous cell
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ---------- Torch forward kinematics (differentiable) ----------
LINKS_t = torch.tensor(LINKS, dtype=torch.float32, device=device)

def fk_torch(q):
    # q: (batch, DOF)
    # returns ee: (batch,2)
    angles = torch.cumsum(q, dim=-1)
    x = torch.cumsum(torch.cos(angles) * LINKS_t, dim=-1)
    y = torch.cumsum(torch.sin(angles) * LINKS_t, dim=-1)
    ee = torch.stack([x[..., -1], y[..., -1]], dim=-1)
    return ee

# ---------- Blackboard PyTorch module ----------
class BlackboardTorch(nn.Module):
    def __init__(self, in_dim=2, hid=32, out_dim=2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, hid),
            nn.Tanh(),
            nn.Linear(hid, hid),
            nn.Tanh(),
            nn.Linear(hid, out_dim)
        )
    def forward(self, err):  # err: (batch,2)
        z = self.net(err)
        mod = 1.0 + torch.tanh(z)   # range (0,2)
        return mod

blackboard_t = BlackboardTorch().to(device)

# ---------- Helper: convert numpy agent inputs to torch and run agents to get qdot (differentiable) ----------
def agent_outputs_torch(q_batch, target_batch):
    # q_batch: (batch, DOF)
    # target_batch: (batch, 2)
    # For per-joint agent networks, inputs are [q_i, tx, ty]
    b = q_batch.shape[0]
    q1 = q_batch[:,0:1]
    q2 = q_batch[:,1:2]
    inp1 = torch.cat([q1, target_batch], dim=-1).to(device).float()
    inp2 = torch.cat([q2, target_batch], dim=-1).to(device).float()
    out1 = agent1_torch(inp1)  # (batch,1)
    out2 = agent2_torch(inp2)
    out = torch.cat([out1, out2], dim=-1)  # (batch,2)
    return out

# ---------- Simulation (torch) for multiple steps (differentiable) ----------
# --- REPLACEMENT: differentiable simulation + safe blackboard trainer ---

import torch, torch.optim as optim

# Ensure agent params are frozen (we do not train agents here)
for p in agent1_torch.parameters():
    p.requires_grad = False
for p in agent2_torch.parameters():
    p.requires_grad = False

# Ensure blackboard params are trainable
for p in blackboard_t.parameters():
    p.requires_grad = True

def simulate_multi_step_torch(q0, target, blackboard, steps=20, dt=0.05, finetune_blackboard=False, optimizer=None):
    """
    Fully differentiable simulation in torch.
    q0: tensor (batch, DOF), target: tensor (batch,2)
    returns: traj (steps+1, batch, DOF), mods (steps, batch, 2), errs (steps, batch)
    All tensors kept as torch tensors (no detach) so gradients can flow to blackboard params.
    """
    q = q0.clone()            # tensor
    traj = [q]
    mods = []
    errs = []
    for t in range(steps):
        ee = fk_torch(q)                 # (batch,2)
        err = target - ee                # (batch,2)
        mod = blackboard(err)            # (batch,2) depends on blackboard params
        out = agent_outputs_torch(q, target)  # (batch,2), depends on agent params but agents frozen
        qdot = out * mod                 # (batch,2)
        q = q + qdot * dt                # state updated; graph preserved
        traj.append(q)
        mods.append(mod)
        errs.append(torch.norm(err, dim=-1))
        # optional online finetune: if requested, perform optimizer step on blackboard to reduce immediate next-step error
        if finetune_blackboard and (optimizer is not None):
            optimizer.zero_grad()
            ee_next = fk_torch(q)   # next EE (differentiable)
            loss_step = torch.mean((target - ee_next)**2)
            loss_step.backward()
            optimizer.step()
    traj = torch.stack(traj, dim=0)    # (steps+1, batch, DOF)
    if len(mods)>0:
        mods = torch.stack(mods, dim=0)    # (steps, batch, 2)
    else:
        mods = torch.zeros((0, q0.shape[0], 2), device=q0.device)
    if len(errs)>0:
        errs = torch.stack(errs, dim=0)    # (steps, batch)
    else:
        errs = torch.zeros((0, q0.shape[0]), device=q0.device)
    return traj, mods, errs

def train_blackboard_batch_torch(blackboard, epochs=100, lr=5e-3, steps=20, batch_size=64):
    """
    Train only blackboard parameters using fully-differentiable simulation.
    Uses dataset Q, T from earlier (numpy).
    """
    optimizer_bb = optim.Adam(blackboard.parameters(), lr=lr)
    Q_np_local = Q   # make sure Q (numpy) exists from Cell1
    T_np_local = T
    n = Q_np_local.shape[0]
    for ep in range(1, epochs+1):
        perm = np.random.permutation(n)
        total_loss = 0.0
        for i in range(0, n, batch_size):
            idx = perm[i:i+batch_size]
            q_batch = torch.from_numpy(Q_np_local[idx]).to(device).float()      # (batch, DOF)
            target_batch = torch.from_numpy(T_np_local[idx]).to(device).float() # (batch, 2)
            # differentiable simulate
            traj_t, mods_t, errs_t = simulate_multi_step_torch(q_batch, target_batch, blackboard, steps=steps, dt=0.05, finetune_blackboard=False, optimizer=None)
            q_final = traj_t[-1]   # tensor still attached to graph
            ee_final = fk_torch(q_final)
            loss = torch.mean((target_batch - ee_final)**2)
            optimizer_bb.zero_grad()
            loss.backward()
            optimizer_bb.step()
            total_loss += loss.item() * q_batch.size(0)
        avg_loss = total_loss / n
        if ep % 10 == 0 or ep==1 or ep==epochs:
            print(f"[Blackboard train] epoch {ep}/{epochs}, avg_loss={avg_loss:.6f}")
    return

# ---- USAGE: call the corrected trainer instead of the old one ----
# Example warm-start (short):
train_blackboard_batch_torch(blackboard_t, epochs=60, lr=5e-3, steps=20, batch_size=128)


# ---------- Prepare demo initial states (numpy) ----------
initial_q_np = np.array([0.1, -0.2], dtype=np.float32)
target_np = np.array([1.2, 0.2], dtype=np.float32)

# Convert to torch for simulation
q0_t = torch.from_numpy(initial_q_np[None,:]).to(device).float()
target_t = torch.from_numpy(target_np[None,:]).to(device).float()

# Optional: quick batch-train blackboard for a few epochs to initialize policy
print("Running short batch training for blackboard (optional warm-start)...")
train_blackboard_batch(blackboard_t, epochs=60, lr=5e-3, steps=20, batch_size=128)
print("Blackboard warm-start complete.")

# ---------- Demo: run a differentiable episode with optional finetune (online adaptation) ----------
def run_demo_and_return_numpy(initial_q_np, target_np, blackboard, finetune_online=False, finetune_lr=1e-2, finetune_steps=1):
    q0 = torch.from_numpy(initial_q_np[None,:]).to(device).float()
    tgt = torch.from_numpy(target_np[None,:]).to(device).float()
    # if online finetune, we create optimizer for blackboard
    optim_bb = None
    if finetune_online:
        optim_bb = optim.Adam(blackboard.parameters(), lr=finetune_lr)
    traj, mods, errs = simulate_multi_step(q0, tgt, blackboard, steps=300, dt=0.05, finetune_blackboard=finetune_online, finetune_steps=finetune_steps, optimizer=optim_bb)
    # traj shape (steps+1, batch, DOF) -> squeeze batch
    traj = traj[:,0,:]
    if mods.shape[0]>0:
        mods = mods[:,0,:]
    errs = errs[:,0]
    return traj, mods, errs

# Quick run (no online finetune)
traj_np, mods_np, errs_np = run_demo_and_return_numpy(initial_q_np, target_np, blackboard_t, finetune_online=False)
print("Demo run complete: steps:", traj_np.shape[0], "final error:", float(errs_np[-1]) if len(errs_np)>0 else 0.0)

# ---------- Interactive visualization + UI ----------
# We'll provide Play/Slider, Random Target, Recompute, and two sliders bias_m1 and bias_m2 that add offsets to blackboard output in real-time.

# Prepare matplotlib figure (inline widget mode may be used in Colab)
%matplotlib widget
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(9,5))
ax = fig.add_subplot(1,2,1)
ax.set_aspect('equal', 'box')
lim = float(LINKS.sum() + 0.2)
ax.set_xlim(-lim, lim); ax.set_ylim(-lim, lim)
ax.set_title("2-DoF Arm (Agents + Blackboard)")

line, = ax.plot([], [], lw=3)
pts, = ax.plot([], [], 'o')
target_plot, = ax.plot([], [], marker='x', markersize=10, color='k')
txt = ax.text(-lim+0.05, lim-0.15, '', fontsize=10)

ax2 = fig.add_subplot(2,2,2)
line_m1, = ax2.plot([], [], label='m1')
line_m2, = ax2.plot([], [], label='m2')
ax2.legend()
ax2.set_xlim(0, 300)
ax2.set_ylim(0, 2.2)
ax2.set_xlabel('Timestep')

ax3 = fig.add_subplot(2,2,4)
line_err, = ax3.plot([], [], color='r')
ax3.set_xlim(0, 300)
ax3.set_ylim(0, max(0.1, errs_np.max()*1.1 if len(errs_np)>0 else 0.1))
ax3.set_xlabel('Timestep')

# UI widgets
play = widgets.Play(value=0, min=0, max=max(1, traj_np.shape[0]-1), step=1, interval=40)
slider = widgets.IntSlider(value=0, min=0, max=max(1, traj_np.shape[0]-1), step=1)
widgets.jslink((play, 'value'), (slider, 'value'))

btn_random = widgets.Button(description='Random Target')
btn_recompute = widgets.Button(description='Recompute Traj')
btn_train_bb = widgets.Button(description='Train Blackboard (batch)')
btn_toggle_finetune = widgets.ToggleButton(value=False, description='Online Finetune')
bias_m1 = widgets.FloatSlider(min=-0.5, max=0.5, step=0.01, value=0.0, description='bias_m1')
bias_m2 = widgets.FloatSlider(min=-0.5, max=0.5, step=0.01, value=0.0, description='bias_m2')
status = widgets.Output()

ui_top = widgets.HBox([play, slider, btn_random, btn_recompute, btn_train_bb, btn_toggle_finetune])
ui_bot = widgets.HBox([bias_m1, bias_m2])
display(ui_top, ui_bot, status)

# functions to recompute trajectories with current blackboard and biases
current_initial = initial_q_np.copy()
current_target = target_np.copy()

def compute_traj_with_biases():
    # create a wrapper blackboard that adds biases to output (non-destructive)
    def blackboard_with_bias(err_np):
        # err_np: (batch,2) numpy
        err_t = torch.from_numpy(err_np).to(device).float()
        mod_t = blackboard_t(err_t).detach().cpu().numpy()
        mod_t[:,0] += bias_m1.value
        mod_t[:,1] += bias_m2.value
        # clamp to [0,2]
        mod_t = np.clip(mod_t, 0.0, 2.0)
        return mod_t
    # Instead of modifying blackboard internals, we will call the torch simulate but then multiply by bias factors externally:
    # Simpler: run run_demo_and_return_numpy (which uses blackboard_t), then apply biases to mods sequence
    traj, mods, errs = run_demo_and_return_numpy(current_initial, current_target, blackboard_t, finetune_online=btn_toggle_finetune.value)
    # apply bias
    if mods is not None and mods.shape[0]>0:
        mods[:,0] = np.clip(mods[:,0] + bias_m1.value, 0.0, 2.0)
        mods[:,1] = np.clip(mods[:,1] + bias_m2.value, 0.0, 2.0)
    return traj, mods, errs

# initial compute
traj_np, mods_np, errs_np = compute_traj_with_biases()

# drawing helpers
def draw_frame(i):
    i = int(i)
    i = max(0, min(i, traj_np.shape[0]-1))
    j = traj_np[i]
    # compute joints via numpy fk
    ee, joints = forward_kinematics_np(j[None,:])
    joints = joints[0]
    line.set_data(joints[:,0], joints[:,1])
    pts.set_data(joints[:,0], joints[:,1])
    target_plot.set_data([current_target[0]], [current_target[1]])
    # update time plots
    if mods_np is not None and mods_np.shape[0]>0:
        line_m1.set_data(np.arange(min(i+1, mods_np.shape[0])), mods_np[:min(i+1, mods_np.shape[0]),0])
        line_m2.set_data(np.arange(min(i+1, mods_np.shape[0])), mods_np[:min(i+1, mods_np.shape[0]),1])
    if errs_np is not None and errs_np.shape[0]>0:
        line_err.set_data(np.arange(min(i+1, errs_np.shape[0])), errs_np[:min(i+1, errs_np.shape[0])])
    txt.set_text(f"step={i}, err={errs_np[min(i,len(errs_np)-1)]:.3f}" if len(errs_np)>0 else f"step={i}")
    fig.canvas.draw_idle()

# slider observer
def on_slider_change(change):
    draw_frame(change['new'])

slider.observe(on_slider_change, names='value')

# random target handler
def on_random_clicked(b):
    global current_target
    ang = np.random.uniform(0, 2*np.pi)
    r = np.random.uniform(0.2, LINKS.sum()*0.85)
    current_target = np.array([r*np.cos(ang), r*np.sin(ang)], dtype=np.float32)
    with status:
        status.clear_output()
        print("Random target:", current_target)
    recompute_traj(None)

btn_random.on_click(on_random_clicked)

# recompute handler
def recompute_traj(b):
    global traj_np, mods_np, errs_np
    with status:
        status.clear_output()
        print("Recomputing trajectory (this may take a few seconds)...")
    traj_np, mods_np, errs_np = compute_traj_with_biases()
    # update slider/play range
    length = traj_np.shape[0]
    play.max = max(0, length-1)
    slider.max = max(0, length-1)
    play.value = 0
    slider.value = 0
    with status:
        print("Recomputed. steps:", length, "final err:", float(errs_np[-1]) if len(errs_np)>0 else 0.0)

btn_recompute.on_click(recompute_traj)

# train blackboard batch handler
def on_train_bb(b):
    with status:
        status.clear_output()
        print("Training blackboard (batch) for 50 epochs (this may take time)...")
    train_blackboard_batch(blackboard_t, epochs=50, lr=5e-3, steps=20, batch_size=256)
    with status:
        print("Blackboard batch training finished. Recomputing trajectory...")
    recompute_traj(None)

btn_train_bb.on_click(on_train_bb)

# toggle online finetune: just recompute so it's used on next run
def on_toggle_finetune(change):
    with status:
        status.clear_output()
        print("Online finetune set to", change['new'])
    recompute_traj(None)

btn_toggle_finetune.observe(on_toggle_finetune, names='value')

# bias sliders: recompute on change
def on_bias_change(change):
    recompute_traj(None)

bias_m1.observe(on_bias_change, names='value')
bias_m2.observe(on_bias_change, names='value')

# initial draw
draw_frame(0)

print("Interactive demo ready. Use controls to change target, train blackboard, toggle online finetune, and adjust bias sliders (bias affect modulation in realtime).")


[Blackboard train] epoch 1/60, avg_loss=0.844182
[Blackboard train] epoch 10/60, avg_loss=0.719792
[Blackboard train] epoch 20/60, avg_loss=0.647405
[Blackboard train] epoch 30/60, avg_loss=0.642161
[Blackboard train] epoch 40/60, avg_loss=0.641224
[Blackboard train] epoch 50/60, avg_loss=0.641392
[Blackboard train] epoch 60/60, avg_loss=0.640584
Running short batch training for blackboard (optional warm-start)...


RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn