In [1]:
# Some useful settings for interactive work
# Load IPython's autoreload extension and set mode 2, which re-imports edited
# modules before each cell executes (so changes to project .py files are
# picked up without restarting the kernel).
%load_ext autoreload
%autoreload 2

# Interactive (pan/zoom) matplotlib figures; this backend needs the ipympl package.
%matplotlib widget

import torch
# Allow float32 matrix multiplications to use faster, reduced-precision
# internal kernels where the hardware supports them (see the PyTorch docs for
# the exact meaning of 'high').
torch.set_float32_matmul_precision('high')

In [2]:
# Import the relevant modules
import os
import torch
import albumentations as A
import figs.visualize.generate_videos as gv
import sousvide.control.networks.feature_extractors as fe
import numpy as np
import matplotlib.pyplot as plt
import cv2
from PIL import Image

from sklearn.decomposition import PCA
from albumentations.pytorch import ToTensorV2
from sousvide.control.pilot import Pilot

# Image transformation pipeline: resize to 256x256, centre-crop to 224x224,
# normalise each channel with the given mean/std, then convert to a torch tensor.
transform = A.Compose([
    A.Resize(256, 256),
    A.CenterCrop(224, 224),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])


def process_image(x):
    """
    Run a single image through `transform` and return the transformed image.

    Args:
        x: input image, passed to the albumentations pipeline as `image=`
           (presumably an (H, W, 3) uint8 numpy array -- confirm against caller)

    Returns:
        the "image" entry of the pipeline output (a tensor produced by ToTensorV2)
    """
    return transform(image=x)["image"]

def overlay_heatmap_on_image(heatmap, image, colormap='viridis', alpha=0.5):
    """
    Blend a colour-mapped heatmap on top of an RGB image.

    The heatmap is min-max rescaled, upsampled with bicubic interpolation to
    the image's height and width, coloured with a matplotlib colormap and then
    alpha-blended with the image.

    Args:
        heatmap: 2-D array of scores (e.g. 16x16), float or uint8
        image: (H, W, 3) RGB image (uint8), e.g. 360x640
        colormap: name of a matplotlib colormap
        alpha: blending factor [0 = only image, 1 = only heatmap]

    Returns:
        overlayed RGB image (uint8) with the same height and width as `image`
    """
    target_h, target_w = image.shape[0], image.shape[1]

    # Min-max rescale on a float32 copy (the caller's array is left untouched);
    # the epsilon keeps a constant heatmap from dividing by zero.
    scaled = heatmap.astype(np.float32)
    scaled -= scaled.min()
    scaled /= (scaled.max() + 1e-8)

    # cv2.resize takes the target size as (width, height)
    upsampled = cv2.resize(scaled, (target_w, target_h), interpolation=cv2.INTER_CUBIC)

    # Colormap lookup yields RGBA floats; keep RGB and convert to bytes
    rgba = plt.get_cmap(colormap)(upsampled)
    heat_rgb = (rgba[:, :, :3] * 255).astype(np.uint8)

    # Weighted sum: (1 - alpha) * image + alpha * heatmap colours
    return cv2.addWeighted(image, 1 - alpha, heat_rgb, alpha, 0)

In [3]:
# Experiment selection for this notebook.
cohort = "experimental"

# Data and evaluation method selectors (currently identical).
data_method = eval_method = "eval_single"

scene = "mid_gate"
courses = ["traverse"]
roster = ["clanGhostBear"]

In [4]:
# Rollout data lives under ../cohorts/<cohort>/rollout_data/<course>.
# The path is built from the config cell (`cohort`, first entry of `courses`)
# so that editing the config actually changes what is loaded here.
workspace = os.path.join("..", "cohorts", cohort, "rollout_data", courses[0])

# Load the Rollout Data
trajs_path = os.path.join(workspace,"trajectories")
imgs_path = os.path.join(workspace,"images")

# Extract the image and trajectory files, sorted by path.
# NOTE(review): pairing the two lists by index assumes the folders contain
# matching, identically ordered file names -- confirm.
trajs_files = sorted(os.path.join(trajs_path,f) for f in os.listdir(trajs_path) if f.endswith('.pt'))
imgs_files = sorted(os.path.join(imgs_path,f) for f in os.listdir(imgs_path) if f.endswith('.pt'))

# Fail with a readable message instead of a bare IndexError further down.
if not trajs_files or not imgs_files:
    raise FileNotFoundError(f"No .pt rollout files found under {workspace!r}")

# Only the first file of each kind is inspected in this notebook.
trajs_file = trajs_files[:1]
imgs_file = imgs_files[:1]

# Load the data (first entry of each file).
# NOTE: torch.load unpickles its input -- only load files from a trusted source.
traj = torch.load(trajs_file[0])[0]
imgs = torch.load(imgs_file[0])[0]

# Unpack the trajectory record by key.
Tro,Xro,Uro = traj["Tro"],traj["Xro"],traj["Uro"]
Fro,tXUd,obj = traj["Fro"], traj["tXUd"], traj["obj"]

Iro = imgs["images"]
# img = Image.open("cow.jpg")
# img_array = np.array(img, dtype=np.uint8)
# Iro = np.expand_dims(img_array, axis=0)

# Iro is indexed as (frame, height, width, ...): axis 0 counts frames.
height,width = Iro.shape[1], Iro.shape[2]
Nro = Iro.shape[0]

In [9]:
# Replay a short window of the recorded rollout through the pilot's controller.
# The pilot is chosen from the config cell (cohort + first roster entry) rather
# than hard-coded, so editing the config changes which pilot is evaluated.
pilot = Pilot(cohort, roster[0])

# Frame window to replay. The start must be >= 1: each step passes the previous
# column Uro[:, i-1], and i = 0 would silently wrap around to the last column.
i_start, i_stop = 20, 30
assert i_start >= 1, "i_start must be >= 1 (uses Uro[:, i-1])"

for i in range(i_start, i_stop):
    # Presumably time, state, previous input and image at step i -- confirm
    # against Pilot.control's signature.
    tcr = Tro[i]
    xcr = Xro[:,i]
    upr = Uro[:,i-1]
    icr = Iro[i]
    # `ucr` is overwritten every iteration; after the loop it holds only the
    # result for the last frame in the window.
    ucr = pilot.control(tcr,xcr,upr,obj,icr,{})

tensor([-3.5491, -1.7979, -1.0319, -0.1383,  0.5849], device='cuda:0')
ha tensor([[ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [-3.5491, -1.7979, -1.0319, -0.1383,  0.5849]], device='cuda:0')
tensor([-3.5563, -1.7674, -1.0366, -0.1490,  0.6348], device='cuda:0')
ha tensor([[ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [-3.5491, -1.7979, -1.0319, -0.1383,  0.5849],
        [-3.5563, -1.7674, -1.0366, -0.1490,  0.6348]], device='cuda:0')
tensor([-3.5640, -1.7344, -1.0417, -0.1595,  0.6849], device='cuda:0')
ha tensor([[ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000, 

In [6]:
# DISABLED: construction of the feature extractor `vit` (DINOv2, with VitB16
# as an alternative). The commented-out heatmap cells below call `vit`, so this
# line must be re-enabled before they can run.
# vit = fe.DINOv2()
# # vit = fe.VitB16()

In [7]:
# DISABLED experiment: per-frame PCA heatmap video with a smoothed direction.
# For every rollout frame: preprocess the image, run `vit` to get patch tokens,
# take the first PCA component, blend it with the previous frame's direction
# (weight `alpha` on the previous one), renormalise, project the tokens onto
# it, reshape the scores to 16x16, zero out negatives, overlay on the frame and
# finally write all frames to "output.mp4" at 20 fps.
# Requires `vit` to be defined (its construction is also commented out).
# NOTE(review): the sign of a PCA component can differ from frame to frame, so
# the blend may partially cancel -- verify before re-enabling.
# alpha = 0.5
# Iout = np.zeros((Nro,height,width,3),dtype=np.uint8)
# for i in range(Nro):
#     iro = process_image(Iro[i,:,:,:]).unsqueeze(0)
#     with torch.no_grad():
#         patch_tokens = vit(iro).squeeze(0)
        
#     X = patch_tokens.cpu().numpy()
#     pca = PCA(n_components=4)
#     X_pca = pca.fit_transform(X)

#     v1 = pca.components_[0]
#     if i == 0:
#         vp = v1
#     v = alpha*vp + (1-alpha)*v1
#     v = v / np.linalg.norm(v)
#     vp = v

#     scores = X@v
#     heatmap = scores.reshape(16,16)
#     heatmap[heatmap<0.0] = 0
#     Iout[i,:,:,:] = overlay_heatmap_on_image(heatmap, Iro[i,:,:,:])

# gv.images_to_mp4(Iout,"output.mp4",fps=20)

In [8]:
# DISABLED experiment: PCA heatmap video with one fixed direction.
# Pass 1 collects the first PCA component of the patch tokens for every frame
# into Fpc (Nro x 768; 768 is hard-coded and presumably the token dimension of
# `vit` -- confirm), then averages and normalises them into `v1`.
# Pass 2 re-runs `vit` on every frame, projects the tokens onto `v1`, reshapes
# the scores to 16x16, zeroes negatives, overlays on the frame and writes
# "output.mp4" at 20 fps (the same file name as the smoothed-PCA cell, so one
# would overwrite the other).
# Requires `vit` to be defined (its construction is also commented out).
# Fpc = np.zeros((Nro,768))
# for i in range(Nro):
#     iro = process_image(Iro[i,:,:,:]).unsqueeze(0)
#     with torch.no_grad():
#         patch_tokens = vit(iro).squeeze(0)
#     X = patch_tokens.cpu().numpy()
#     pca = PCA(n_components=4)
#     X_pca = pca.fit_transform(X)
#     first_pc = pca.components_[0]
#     Fpc[i,:] = first_pc
# v1 = np.mean(Fpc,axis=0)
# v1 = v1/np.linalg.norm(v1)

# Iout = np.zeros((Nro,height,width,3),dtype=np.uint8)
# for i in range(Nro):
#     iro = process_image(Iro[i,:,:,:]).unsqueeze(0)
#     with torch.no_grad():
#         patch_tokens = vit(iro).squeeze(0)
#     X = patch_tokens.cpu().numpy()

#     scores = X@v1
#     heatmap = scores.reshape(16,16)
#     heatmap[heatmap<0] = 0
#     Iout[i,:,:,:] = overlay_heatmap_on_image(heatmap, Iro[i,:,:,:])

# gv.images_to_mp4(Iout,"output.mp4",fps=20)