In [62]:
import os, sys
import torch
import matplotlib.pyplot as plt
from PIL import Image
sys.path.append('./perception_models')
import core.vision_encoder.pe as pe
import core.vision_encoder.transforms as transforms
from huggingface_hub import hf_hub_download
import cv2


def preprocess_video(video_path, num_frames=30, transform=None, return_first_frame_for_demo=True):
    print(video_path)
    cap = cv2.VideoCapture(video_path, apiPreference=cv2.CAP_ANY)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    frame_indices = [int(i * (total_frames / num_frames)) for i in range(num_frames)]
    preprocessed_frames = []
    first_frame = None

    current_index = 0
    grabbed_index = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if current_index == frame_indices[grabbed_index]:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(frame_rgb)
            if transform:
                frame_tensor = transform(pil_img)
            else:
                frame_tensor = transforms.ToTensor()(pil_img)

            preprocessed_frames.append(frame_tensor)

            if grabbed_index == 0 and return_first_frame_for_demo:
                first_frame = frame_rgb

            grabbed_index += 1
            if grabbed_index >= len(frame_indices):
                break

        current_index += 1

    cap.release()
    return torch.stack(preprocessed_frames, dim=0), first_frame

local_ckpt_path = hf_hub_download(
    repo_id="facebook/PE-Core-B16-224",
    filename="PE-Core-B16-224.pt"
)
device = torch.device("mps")
model_name = 'PE-Core-B16-224'

model = pe.CLIP.from_config(model_name, pretrained=True, checkpoint_path=local_ckpt_path)
model = model.to(device)

preprocess = transforms.get_image_transform(model.image_size)
tokenizer = transforms.get_text_tokenizer(model.context_length)


Missing keys for loading model: []
Unexpected keys for loading model: []


In [30]:
print(dir(model))
print(model.image_size)
# Freeze all initially
for param in model.parameters():
    param.requires_grad = False


model.add_module('classifier')




['T_destination', '__annotations__', '__call__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_apply', '_backward_hooks', '_backward_pre_hooks', '_buffers', '_call_impl', '_compiled_call_impl', '_forward_hooks', '_forward_hooks_always_called', '_forward_hooks_with_kwargs', '_forward_pre_hooks', '_forward_pre_hooks_with_kwargs', '_get_backward_hooks', '_get_backward_pre_hooks', '_get_name', '_is_full_backward_hook', '_load_from_state_dict', '_load_state_dict_post_hooks', '_load_state_dict_pre_hooks', '_maybe_warn_non_full_backward_hook', '_modules', '_named_members', '_non_persistent_buffers_set', '_parameters', '_register_load_state_dict_pre_hoo

TypeError: Module.add_module() missing 1 required positional argument: 'module'

In [40]:
import numpy as np
data = np.load('/Users/michaelrice/Documents/GitHub/Thesis/MSc_AI_Thesis/sampledata/proper_cartesting/1/1.npy', allow_pickle=True).item()



frames = data['rgb']
frame_ts = list(frames.keys())
frames = list(frames.values())
import pandas as pd

action_intervals = pd.read_csv('/Users/michaelrice/Documents/GitHub/Thesis/MSc_AI_Thesis/notebooks_and_scripts/actions.csv')

print(action_intervals)

    start_frame  end_frame                      action
0             0         63                     nothing
1            63         70   checking rear view mirror
2            70         85   checking left wing mirror
3            85        128                     nothing
4           129        144  checking right wing mirror
..          ...        ...                         ...
95         9965       9978   checking left wing mirror
96         9979      10025   checking rear view mirror
97        10026      10034                     nothing
98        10035      10065   checking rear view mirror
99        10066      10180                     nothing

[100 rows x 3 columns]


In [None]:
lwm = frames[7980:8010]


for i, frame in enumerate(lwm):
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    cv2.imshow("Gaze Visualization", frame_rgb)
    key = cv2.waitKey(250)
    if key == 27:
        break

output_path = "/Users/michaelrice/Documents/GitHub/Thesis/MSc_AI_Thesis/sampledata/vid_test/action_videos/rwm_4.mp4"
height, width, layers = lwm[0].shape
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(output_path, fourcc, 15, (width, height))

for frame in lwm:
    video_writer.write(frame)

video_writer.release()
cv2.destroyAllWindows()


In [None]:
paths = os.listdir('/Users/michaelrice/Documents/GitHub/Thesis/MSc_AI_Thesis/sampledata/vid_test/action_videos/')
for path in paths:
    if path.endswith('.mp4'):
        video, first_frame = preprocess_video(f'/Users/michaelrice/Documents/GitHub/Thesis/MSc_AI_Thesis/sampledata/vid_test/action_videos/{path}', 30, transform=preprocess)
        video = video.unsqueeze(0).to(device)
        text = tokenizer(["checking left wing mirror ", "checking right wing mirror", 'checking rearview mirror', "driving"]).to(device)
        captions = ["checking left wing mirror ", "checking right wing mirror", 'checking rearview mirror', "driving"]

        with torch.no_grad():
            image_features = model.encode_video(video)
            text_features = model.encode_text(text)
            image_features /= image_features.norm(dim=-1, keepdim=True)
            text_features /= text_features.norm(dim=-1, keepdim=True)
            text_probs = (100.0 * image_features @ text_features.T).softmax(dim=-1).cpu().numpy()[0]


        print("Captions:", captions)
        print("Label probs:", ' '.join(['{:.2f}'.format(prob) for prob in text_probs]))  # prints: [[0.00, 1.00, 0.00]]
        print(f"This video is about {captions[text_probs.argmax()]}")


/Users/michaelrice/Documents/GitHub/Thesis/MSc_AI_Thesis/sampledata/vid_test/action_videos/nothing_5.mp4
Captions: ['checking left wing mirror ', 'checking right wing mirror', 'checking rearview mirror', 'driving']
Label probs: 0.08 0.07 0.74 0.11
This video is about checking rearview mirror
/Users/michaelrice/Documents/GitHub/Thesis/MSc_AI_Thesis/sampledata/vid_test/action_videos/rvm_4.mp4
Captions: ['checking left wing mirror ', 'checking right wing mirror', 'checking rearview mirror', 'driving']
Label probs: 0.15 0.10 0.65 0.09
This video is about checking rearview mirror
/Users/michaelrice/Documents/GitHub/Thesis/MSc_AI_Thesis/sampledata/vid_test/action_videos/rvm_5.mp4
Captions: ['checking left wing mirror ', 'checking right wing mirror', 'checking rearview mirror', 'driving']
Label probs: 0.12 0.09 0.70 0.09
This video is about checking rearview mirror
/Users/michaelrice/Documents/GitHub/Thesis/MSc_AI_Thesis/sampledata/vid_test/action_videos/nothing_4.mp4
Captions: ['checking lef

: 

In [None]:
# Suppose model.encode_video outputs features of shape [B, D]
from torch import nn


class VideoActionClassifier(nn.Module):
    def __init__(self, base_model, embed_dim, num_classes):
        super().__init__()
        self.base_model = base_model
        self.classifier = nn.Linear(embed_dim, num_classes)
    
    def forward(self, video):
        with torch.no_grad():
            features = self.base_model.encode_video(video)  # freeze video encoder initially
        features = features / features.norm(dim=-1, keepdim=True)
        logits = self.classifier(features)  # [B, num_classes]
        return logits

# Instantiate
num_classes = 3
num_epochs = 5
embed_dim = 224  # adjust to your model's output dim
classifier_model = VideoActionClassifier(model, embed_dim, num_classes).to(device)



optimizer = torch.optim.AdamW(classifier_model.classifier.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

# Training loop (simplified)
for epoch in range(num_epochs):
    for video_batch, labels in train_loader:
        video_batch = video_batch.to(device)
        labels = labels.to(device)
        
        logits = classifier_model(video_batch)
        loss = criterion(logits, labels)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
