In [18]:
import sys
sys.path.append('../')

import numpy as np
import os
from PIL import Image
import json
import math
import copy
from sklearn.model_selection import train_test_split
import ast

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel
import torchvision
from tqdm.notebook import tqdm

from cognitive.task_bank import CompareLocTemporal
from cognitive import task_generator as tg
from cognitive import constants as const
from cognitive import stim_generator as sg
from cognitive import info_generator as ig
import random

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC, LinearSVC
from sklearn.metrics import accuracy_score

TRAIN_DIR = 'datasets/train_2'  # Training Dataset Directory
VAL_DIR = 'datasets/val_2'  # Validation Dataset Directory
IMGM_PATH = 'offline_models/resnet/resnet'
MAX_FRAMES = 2
EMB_DIR = 'datasets/embeddings'
IMGM_OUT_DIM = 2048 # vision transformer output dimension

device = torch.device("mps")

# Experiment 1 (SVM on Coords)

In [2]:
def read_trials(path):
    infos = []
    imgs = []

    for trial_fp in os.listdir(path):
        if 'trial' not in trial_fp:
            continue

        trial_fp = os.path.join(path, trial_fp)
        
        info = None
        
        for fp in os.listdir(trial_fp):
            fp = os.path.join(trial_fp, fp)
            
            if 'epoch1.png' in fp:
                img = np.rollaxis(np.array(Image.open(fp), dtype=np.float32),2,0)
                imgs.append(img)
            elif 'trial_info' in fp:
                info = json.load(open(fp))
                infos.append(info)

    return imgs, infos

train_frames, train_infos = read_trials(TRAIN_DIR)
val_frames, val_infos = read_trials(VAL_DIR)

In [3]:
def map_coord(coord):
    if coord[0] < 0.5:
        if coord[1] < 0.5:
            return np.array([1])
        else:
            return np.array([2])
    else:
        if coord[1] < 0.5:
            return np.array([3])
        else:
            return np.array([4])

train_targets = [map_coord(ast.literal_eval(x['objects'][1]['location'])) for x in train_infos]
val_targets = [map_coord(ast.literal_eval(x['objects'][1]['location'])) for x in val_infos]

In [4]:
img_encoder = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.IMAGENET1K_V2.transforms).to(device).eval()

def img_embedder(frames, encoder):
    activation = {}
    def get_activation(name):
        def hook(model, input, output):
            activation[name] = output.detach()
        return hook
    
    encoder.avgpool.register_forward_hook(get_activation('layer'))

    with torch.no_grad():
        out = encoder(torch.tensor(frames).unsqueeze(0).to(device))
        out = torch.squeeze(activation['layer']) #torch.flatten(, start_dim=1, end_dim=2)
    return out.cpu().numpy()



In [5]:
train_frames_embs = []
val_frames_embs = []

for train_f in train_frames:             
    train_frames_embs.append(img_embedder(train_f, img_encoder))

for val_f in val_frames:
    val_frames_embs.append(img_embedder(val_f, img_encoder))

In [9]:
train_targets

[array([1]),
 array([3]),
 array([1]),
 array([3]),
 array([3]),
 array([1]),
 array([2]),
 array([1]),
 array([1]),
 array([1]),
 array([3]),
 array([3]),
 array([2]),
 array([3]),
 array([2]),
 array([1]),
 array([3]),
 array([1]),
 array([3]),
 array([2]),
 array([1]),
 array([1]),
 array([4]),
 array([1]),
 array([4]),
 array([2]),
 array([4]),
 array([3]),
 array([1]),
 array([4]),
 array([1]),
 array([4]),
 array([3]),
 array([3]),
 array([4]),
 array([4]),
 array([4]),
 array([3]),
 array([4]),
 array([2]),
 array([1]),
 array([1]),
 array([3]),
 array([3]),
 array([2]),
 array([3]),
 array([1]),
 array([4]),
 array([3]),
 array([4]),
 array([2]),
 array([1]),
 array([4]),
 array([3]),
 array([1]),
 array([3]),
 array([2]),
 array([2]),
 array([4]),
 array([2]),
 array([4]),
 array([3]),
 array([2]),
 array([1]),
 array([1]),
 array([1]),
 array([3]),
 array([4]),
 array([2]),
 array([1]),
 array([1]),
 array([4]),
 array([4]),
 array([1]),
 array([1]),
 array([3]),
 array([2]),

In [73]:
# Step 3: Create an SVM classifier
# You can choose different types of SVM (linear, rbf, polynomial, etc.) by changing the kernel parameter.
# C is the regularization parameter; you can adjust it for your specific problem.
svm_classifier = SVC(kernel='linear', C=1.0)

# Step 4: Train the SVM classifier on the training data
svm_classifier.fit(train_frames_embs, train_targets)

  y = column_or_1d(y, warn=True)


In [74]:
# Step 5: Make predictions on the test data
y_pred = svm_classifier.predict(val_frames_embs)

# Step 6: Evaluate the model
accuracy = accuracy_score(val_targets, y_pred)
print(f"Accuracy: {accuracy * 100:.2f}%")

Accuracy: 98.77%


# Experiment 2 (SVM on Whole Sequence Action)

In [19]:
def read_trials(path):
    frames = []
    infos = []

    for trial_fp in os.listdir(path):
        if 'trial' not in trial_fp:
            continue

        trial_fp = os.path.join(path, trial_fp)
        imgs = []
        info = None
        
        for fp in os.listdir(trial_fp):
            fp = os.path.join(trial_fp, fp)
            
            if fp[-4:] == '.png':
                img = np.rollaxis(np.array(Image.open(fp), dtype=np.float32),2,0)
                imgs.append(img)
            elif 'trial_info' in fp:
                info = json.load(open(fp))
                infos.append(info)
                
        if len(imgs) > MAX_FRAMES:
            raise Exception(trial_fp + " contains more frames than the set maximum (MAX_FRAMES) !!!")
        elif len(imgs) != len(info['answers']):
            raise Exception(trial_fp + " numbers of frames does not match number of actions")
            
        frames.append(np.array(imgs))

    return frames, infos

train_frames, train_infos = read_trials(TRAIN_DIR)
val_frames, val_infos = read_trials(VAL_DIR)

In [20]:
train_frames[0].shape

(2, 3, 224, 224)

In [21]:
train_raw_targets = [x['answers'] for x in train_infos]
val_raw_targets = [x['answers'] for x in val_infos]

In [22]:
action_map = {'true': 0, 'false': 1}

def map_actions(amap, raw_actions):
    count = {'true': 0, 'false': 0}
    target_actions = []

    for actions in raw_actions:
        encoded = []
        count[actions[-1]] += 1
        encoded.append(amap[actions[1]])
        target_actions.append(np.array(encoded))
    
    return target_actions, count

train_targets, train_targets_count = map_actions(action_map, train_raw_targets)
val_targets, val_targets_count = map_actions(action_map, val_raw_targets)

print(train_targets_count)
print(val_targets_count)

{'true': 2022, 'false': 1978}
{'true': 297, 'false': 303}


In [40]:
# img_encoder = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.IMAGENET1K_V2.transforms).to(device)
# torch.save(img_encoder, 'tutorials/offline_models/resnet/resnet' )

In [41]:
# 

img_encoder = torch.load('tutorials/offline_models/resnet/resnet').to(device)
img_encoder.eval()

def img_embedder(frames, encoder):
    activation = {}
    def get_activation(name):
        def hook(model, input, output):
            activation[name] = output.detach()
        return hook
    
    encoder.avgpool.register_forward_hook(get_activation('layer'))

    with torch.no_grad():
        out = encoder(torch.tensor(frames).to(device))
        out = torch.squeeze(activation['layer']) #torch.flatten(, start_dim=1, end_dim=2)
        out = F.normalize(out, dim=1).flatten()
    return out.cpu().numpy()

In [48]:
# np.save('datasets/train_frames', np.array(train_frames))
# np.save('datasets/val_frames', np.array(val_frames))

# np.save('datasets/train_targets', np.array(train_targets))
# np.save('datasets/val_targets', np.array(val_targets))

train_frames = np.load('datasets/train_frames.npy') 
val_frames = np.load('datasets/val_frames.npy') 

In [49]:
train_frames_embs = []
val_frames_embs = []

for train_f in train_frames:             
    train_frames_embs.append(img_embedder(train_f, img_encoder))

for val_f in val_frames:
    val_frames_embs.append(img_embedder(val_f, img_encoder))

In [50]:
from sklearn.preprocessing import StandardScaler
sc = StandardScaler()
train_frames_embs = sc.fit_transform(train_frames_embs)
val_frames_embs = sc.transform(val_frames_embs)

In [51]:
# Step 3: Create an SVM classifier
# You can choose different types of SVM (linear, rbf, polynomial, etc.) by changing the kernel parameter.
# C is the regularization parameter; you can adjust it for your specific problem.
svm_classifier = SVC(kernel='rbf', C=1.5, cache_size=8000, gamma='scale')

# Step 4: Train the SVM classifier on the training data
svm_classifier.fit(train_frames_embs, train_targets)

  y = column_or_1d(y, warn=True)


In [52]:
# Step 5: Make predictions on the test data
y_pred = svm_classifier.predict(val_frames_embs)

# Step 6: Evaluate the model
accuracy = accuracy_score(val_targets, y_pred)
print(f"Accuracy: {accuracy * 100:.2f}%")

Accuracy: 80.00%


In [53]:
np.save('datasets/train_embs', np.array(train_frames_embs))
np.save('datasets/val_embs', np.array(val_frames_embs))

np.save('datasets/train_targets', np.array(train_targets))
np.save('datasets/val_targets', np.array(val_targets))

# Dataset Quality Control

In [2]:
def read_trials(path):
    frames = []
    infos = []

    for trial_fp in os.listdir(path):
        if 'trial' not in trial_fp:
            continue

        trial_fp = os.path.join(path, trial_fp)
        imgs = []
        info = None
        
        for fp in os.listdir(trial_fp):
            fp = os.path.join(trial_fp, fp)
            
            if fp[-4:] == '.png':
                img = np.rollaxis(np.array(Image.open(fp), dtype=np.float32),2,0)
                imgs.append(img)
            elif 'trial_info' in fp:
                info = json.load(open(fp))
                infos.append(info)
                
        if len(imgs) > MAX_FRAMES:
            raise Exception(trial_fp + " contains more frames than the set maximum (MAX_FRAMES) !!!")
        elif len(imgs) != len(info['answers']):
            raise Exception(trial_fp + " numbers of frames does not match number of actions")
            
        frames.append(np.array(imgs))

    return frames, infos

train_frames, train_infos = read_trials(TRAIN_DIR)
val_frames, val_infos = read_trials(VAL_DIR)

In [3]:
train_ins = [x['instruction'] for x in train_infos]
train_raw_targets = [x['answers'] for x in train_infos]

val_ins = [x['instruction'] for x in val_infos]
val_raw_targets = [x['answers'] for x in val_infos]

In [4]:
action_map = {'true': 0, 'false': 1, 'null': 2}

def map_actions(amap, raw_actions):
    count = {'true': 0, 'false': 0, 'null': 0}
    target_actions = []

    for actions in raw_actions:
        encoded = []
        for action in actions:
            count[action] += 1
            encoded.append(amap[action])
        target_actions.append(encoded)
    
    return target_actions, count

train_targets, train_targets_count = map_actions(action_map, train_raw_targets)
val_targets, val_targets_count = map_actions(action_map, val_raw_targets)

In [8]:
lm_encoder = AutoModel.from_pretrained('sentence-transformers/all-mpnet-base-v2').to(device).eval()
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-mpnet-base-v2')

def lm_embedder(instruction, tokenizer, encoder):

    instruction = tokenizer(instruction, padding=True, truncation=True, return_tensors='pt').to(device)

    #Mean Pooling - Take attention mask into account for correct averaging
    def mean_pooling(model_output, attention_mask):
        token_embeddings = model_output[0] # First element of model_output contains all token embeddings
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    # Compute token embeddings
    with torch.no_grad():
        lm_output = encoder(**instruction)

    # Perform pooling
    sentence_embeddings = mean_pooling(lm_output, instruction['attention_mask'])
    
    # Normalize embeddings
    sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
        
    return sentence_embeddings.repeat(2,1).cpu().numpy()



In [9]:
img_encoder = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.IMAGENET1K_V2.transforms).to(device).eval()

def img_embedder(frames, encoder):
    activation = {}
    def get_activation(name):
        def hook(model, input, output):
            activation[name] = output.detach()
        return hook
    
    encoder.avgpool.register_forward_hook(get_activation('layer'))

    with torch.no_grad():
        out = encoder(torch.tensor(frames).to(device))
        out = torch.squeeze(activation['layer']) #torch.flatten(, start_dim=1, end_dim=2)
        out = F.normalize(out, dim=1)
    return out.cpu().numpy()



In [10]:
train_frames_embs = []
train_ins_embs = []
val_frames_embs = []
val_ins_embs = []

for train_f, train_i in zip(train_frames, train_ins):             
    train_frames_embs.append(img_embedder(train_f, img_encoder))
    train_ins_embs.append(lm_embedder(train_i, tokenizer, lm_encoder))

for val_f, val_i in zip(val_frames, train_ins):
    val_frames_embs.append(img_embedder(val_f, img_encoder))
    val_ins_embs.append(lm_embedder(val_i, tokenizer, lm_encoder))

In [16]:
np.save('datasets/embeddings/train_frames_embs', np.array(train_frames_embs))
np.save('datasets/embeddings/train_ins_embs', np.array(train_ins_embs))
np.save('datasets/embeddings/val_frames_embs', np.array(val_frames_embs))
np.save('datasets/embeddings/val_ins_embs', np.array(val_ins_embs))

np.save('datasets/embeddings/train_targets', np.array(train_targets))
np.save('datasets/embeddings/val_targets', np.array(val_targets))