In [3]:
#imports
import torch
import torch.nn as nn
import torchvision.models as models
import pandas as pd
import torchvision
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize, RandomHorizontalFlip, RandomRotation, ColorJitter, RandomResizedCrop, RandomApply, RandomAffine
from os.path import join
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
import numpy as np


### define global variables

In [4]:
# Root directory of the stimulus images; getDataset passes it to TrifeatureDataset,
# which joins each record's file name onto it.
images_path = '/kaggle/input/trifeature/trifeature-dataset/color_texture_shape_stimuli/color_texture_shape_stimuli'

In [5]:
# Split version intended for training the base network.
version_network = 'version_0'
# Split version used when extracting embeddings and training/testing the decoders.
version_decoder = 'version_1'

In [6]:
# Names of the model sub-modules whose activations are captured and decoded.
layer_names = ['layer4','avgpool', 'fc']

## Helper Functions

#### get path of labels mapping and log files

In [7]:
def get_labels_logs_path(feature_name, version):
    """Return the split pickle path and the split log path for a feature.

    Args:
        feature_name: one of 'color', 'shape' or 'texture'.
        version: split version name (e.g. 'version_0'); selects the log file.

    Returns:
        Tuple ``(splits_pkl_path, split_log_path)``.

    Raises:
        ValueError: if ``feature_name`` is not a known feature. (Previously the
            function fell through and returned None, which only failed later
            when the caller tried to unpack the result.)
    """
    if feature_name not in ('color', 'shape', 'texture'):
        raise ValueError(
            f"unknown feature_name {feature_name!r}; expected 'color', 'shape' or 'texture'"
        )

    # The three features only differ in the name of their "<feature>_splits" folder.
    splits_dir = (
        '/kaggle/input/trifeature/trifeature-dataset/'
        'dataset_splits-20240426T140423Z-001/dataset_splits/'
        f'{feature_name}_splits'
    )
    return f'{splits_dir}/splits.pkl', f'{splits_dir}/logs/{version}_split.txt'


#### Extract ground-truth labels as a list of class indices (decoder `y_train` / `y_test`)

In [8]:
def get_Truthlabels_list(feature_name, version, train_val):
    """Return the numeric ground-truth labels of one split for one feature.

    Args:
        feature_name: feature whose label is read from every record.
        version: split version key inside the pickled splits object.
        train_val: split key inside that version (callers pass 'train' or 'val').

    Returns:
        List of integer class indices, one per record, in record order.
    """
    splits_path, split_log = get_labels_logs_path(feature_name, version)
    records = get_input_labels(splits_path)[version][train_val]
    names = [record[feature_name] for record in records]
    # Class name -> 0-based index over the classes that are not held out.
    name_to_index = get_class_labels_dict(split_log, feature_name)
    return [name_to_index[name] for name in names]

#### extract labels as dictionary from pickle file

In [11]:
import pickle

def get_input_labels(path):
    """Unpickle and return the object stored at ``path``.

    NOTE(review): ``pickle.load`` can execute arbitrary code; only use this on
    trusted split files.
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)

In [12]:
def get_classList(feature_name):
    """Return the full list of class names for a feature.

    Args:
        feature_name: 'color', 'shape' or 'texture'.

    Returns:
        A new list of the ten class names of that feature, or None when the
        feature name is not recognised.
    """
    # Rebuilt on every call so each caller gets its own list objects.
    classes_by_feature = {
        'color': ["red", "green", "blue", "yellow", "pink", "cyan", "purple", "ocean", "orange", "white"],
        'shape': ["triangle", "square", "plus", "circle", "tee", "rhombus", "pentagon", "star", "fivesquare", "trapezoid"],
        'texture': ["solid", "stripes", "grid", "hexgrid", "dots", "noise", "triangles", "zigzags", "rain", "pluses"],
    }
    return classes_by_feature.get(feature_name)

In [13]:
def get_class_labels_dict(txt_file, feature_name):
    """Map the non-held-out classes of a feature to 0-based indices.

    The split log ``txt_file`` is scanned for lines that contain
    ``feature_name`` and a ``key: [..]`` payload; the bracketed list on such a
    line is taken as the held-out classes. If several lines match, the last
    one wins (same as before).

    Args:
        txt_file: path of the split log file.
        feature_name: 'color', 'shape' or 'texture'.

    Returns:
        Dict mapping each remaining class name to its index, following the
        order of ``get_classList(feature_name)``.

    Raises:
        ValueError: if no line of the log describes ``feature_name``.
            (Previously this surfaced as an opaque TypeError further down.)
    """
    with open(txt_file, 'r') as file:
        content = file.readlines()

    # Find the held-out classes of the requested feature.
    classes = None
    for line in content:
        # Lines that merely mention the feature but carry no "key: value"
        # payload are skipped instead of crashing on the missing ':' part.
        if feature_name in line and ':' in line:
            # "...: ['a', 'b']" -> ["a", "b"]
            classes = line.split(':')[1].strip()[1:-1].split(', ')
            classes = [cls.strip().strip("'") for cls in classes]

    if classes is None:
        raise ValueError(
            f"no hold-out class list for feature {feature_name!r} found in {txt_file!r}"
        )

    print(f"List of {feature_name} hold out classes:", classes)

    original_class_list = get_classList(feature_name)
    # Drop the held-out classes, keeping the original ordering.
    remaining_items = [item for item in original_class_list if item not in classes]

    # 0-indexed lookup used as the target index of the classifier/decoder.
    indexed_dict = {item: index for index, item in enumerate(remaining_items)}

    print("0-indexed dictionary of remaining items:", indexed_dict)

    return indexed_dict



In [14]:
# train_val_labels_path - pickle file
def getAllDataLoders(feature_name, b_size=64, version = 'version_0', shuffle_train=True):
    """Build the train and validation loaders for one feature and split version.

    Args:
        feature_name: feature used as the classification target.
        b_size: batch size of both loaders.
        version: split version key inside the pickled splits file.
        shuffle_train: whether the training loader shuffles; the validation
            loader never does.

    Returns:
        Tuple ``(train_loader, val_loader)``.
    """
    splits_path, split_log = get_labels_logs_path(feature_name, version)

    # Records of the requested version, already divided into 'train' and 'val'.
    version_splits = get_input_labels(splits_path)[version]
    train_records, val_records = version_splits['train'], version_splits['val']

    train_dataset = getDataset(train_records, split_log, feature_name)
    val_dataset = getDataset(val_records, split_log, feature_name)

    return (
        DataLoader(train_dataset, batch_size=b_size, shuffle=shuffle_train),
        DataLoader(val_dataset, batch_size=b_size, shuffle=False),
    )


In [15]:
def getDataset(labels, txt_file, feature_name):
    """Create a TrifeatureDataset over ``labels`` for the given target feature.

    The class-name -> index mapping is derived from the split log ``txt_file``;
    images are read from the module-level ``images_path``.
    """
    name_to_index = get_class_labels_dict(txt_file, feature_name)
    return TrifeatureDataset(images_path, labels, name_to_index, feature_name)

### Define Model

In [16]:
# Module-level compute device (GPU when available, otherwise CPU); the training,
# embedding and decoder helpers move their batches onto it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [18]:
def getModel(num_classes=7):
    """Build a randomly initialised ResNet-18 classifier on the available device.

    Args:
        num_classes: number of outputs of the replaced final linear layer.

    Returns:
        The model, moved to CUDA when available and to the CPU otherwise.

    Side effects:
        Prints the chosen device and resets the global torch seed to 42.
    """
    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Device:", target_device)

    # Seed before constructing the network so the initial weights are reproducible.
    torch.manual_seed(42)

    # ResNet-18 backbone without pretrained weights.
    network = models.resnet18(pretrained=False)

    # Swap the classification head for one with `num_classes` outputs.
    network.fc = nn.Linear(network.fc.in_features, num_classes)

    return network.to(target_device)

    # model.eval()
    # Print model summary
    # print(model)


In [19]:
# Build a model and print its architecture as a sanity check.
model = getModel()
print(model)

Device: cuda




ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

### Make Dataset
* Requires `classes_dict`: a mapping from each of the 7 retained class names of the target feature to an integer index.

In [20]:
class TrifeatureDataset(torch.utils.data.Dataset):
    """Image dataset yielding ``(image_tensor, class_index)`` for one target feature.

    Args:
        img_path: directory that contains the image files.
        details_list: list of records; each record maps 'fname' to the image
            file name and the feature name to that image's label.
        classes_dict: maps a label of the target feature to its class index,
            e.g. ``{'red': 0, 'blue': 1}``; used as the model's target index.
        feature: name of the target feature (key looked up in every record).
    """

    def __init__(self, img_path, details_list, classes_dict, feature):
        super().__init__()

        self.img_path = img_path
        self.details_list = details_list
        self.feature = feature

        self.transform = self._transform(224)

        self.classes = classes_dict

    @staticmethod
    def _convert_image_to_rgb(image):
        """Return ``image`` converted to RGB mode."""
        return image.convert("RGB")

    def _transform(self, n_px):
        """Build the preprocessing pipeline: resize, RGB, tensor, normalise."""
        # Per-channel statistics; presumably computed on this dataset - TODO confirm.
        mean = [0.50190921, 0.50194219, 0.49818846]
        std =  [0.1426835,  0.1282568,  0.13595397]
        return Compose([
            Resize(n_px),
            self._convert_image_to_rgb,
            ToTensor(),
            Normalize(mean, std)
        ])

    def read_img(self, file_name):
        """Load ``file_name`` from ``img_path`` and return the transformed image."""
        im_path = join(self.img_path, file_name)
        # The context manager closes the underlying file as soon as the pixels
        # have been turned into a tensor, instead of leaving it to the GC.
        with Image.open(im_path) as img:
            return self.transform(img)

    def __getitem__(self, index):
        """Return ``(transformed_image, class_index)`` for record ``index``."""
        record = self.details_list[index]
        img = self.read_img(record['fname'])
        target_label = record[self.feature]
        return img, self.classes[target_label]

    def __len__(self):
        """Number of records in the dataset."""
        return len(self.details_list)

#### Make data loader

In [21]:
def getDataLoader(dataset, batch_size=64, shuffle = True):
    """Wrap ``dataset`` in a DataLoader with the given batch size and shuffling."""
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

### Train Model

In [23]:
# Checkpoint file that train_model writes whenever the validation loss improves.
best_val_path = 'best_val.pth'
# Initial "best so far" validation loss; whole_pipeline passes it to train_model.
best_val_loss = float('inf')
# NOTE(review): not referenced by any code visible in this notebook export.
best_train_loss = float('inf')

In [24]:
def train_model(model, train_loader, val_loader, optimizer, criterion, best_val_loss=float('inf'), num_epochs=30):
    """Train ``model`` and checkpoint it whenever the validation loss improves.

    Batches are moved to the module-level ``device``; the checkpoint is written
    to the module-level ``best_val_path``.

    Args:
        model: network to optimise (updated in place).
        train_loader: loader of ``(images, labels)`` training batches.
        val_loader: loader of ``(images, labels)`` validation batches.
        optimizer: optimiser bound to the model parameters.
        criterion: loss taking ``(outputs, labels)``.
        best_val_loss: validation loss a new epoch has to beat to be saved.
        num_epochs: number of passes over the training data.

    Returns:
        The lowest average validation loss seen (or the initial value when no
        epoch improved on it).
    """
    print(best_val_loss)
    for epoch_number in range(1, num_epochs + 1):
        # ---- training pass ----
        model.train()
        running_train_loss = 0
        for batch_images, batch_labels in train_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(batch_images), batch_labels)
            batch_loss.backward()
            optimizer.step()
            # Weight by batch size so the epoch average is per sample.
            running_train_loss += batch_loss.item() * batch_images.size(0)

        avg_train_loss = running_train_loss / len(train_loader.dataset)

        # ---- validation pass ----
        model.eval()
        running_val_loss = 0
        n_correct = 0
        n_seen = 0
        with torch.no_grad():
            for batch_images, batch_labels in val_loader:
                batch_images = batch_images.to(device)
                batch_labels = batch_labels.to(device)
                logits = model(batch_images)
                running_val_loss += criterion(logits, batch_labels).item() * batch_images.size(0)

                predicted = torch.max(logits.data, 1)[1]
                n_seen += batch_labels.size(0)
                n_correct += (predicted == batch_labels).sum().item()

        avg_val_loss = running_val_loss / len(val_loader.dataset)
        val_accuracy = n_correct / n_seen
        print("val accuracy:", val_accuracy)

        print(f'Epoch {epoch_number}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')

        # Keep only the weights of the best epoch so far.
        improved = avg_val_loss < best_val_loss
        if improved:
            print('Validation loss decreased ({:.4f} --> {:.4f}).  Saving model ...'.format(
                best_val_loss,
                avg_val_loss))
            torch.save(model.state_dict(), best_val_path)
            best_val_loss = avg_val_loss
        print(f"Epoch number{epoch_number}: Train Loss:{avg_train_loss}, Val Loss:{avg_val_loss}")
    return best_val_loss
        


### Get Embeddings from intermediate layers

In [26]:
# Hook function to store activations
def hook_fn(module, input, output, name, activations):
    """Forward-hook body: append ``output`` to ``activations[name]``.

    ``module`` and ``input`` are accepted to fit the forward-hook signature but
    are not used. The list for ``name`` is created on first use.
    """
    activations.setdefault(name, []).append(output)

In [27]:

def getEmbeddings(model, input_dataloader , layer_names = ['avgpool', 'fc']):
    """Run ``model`` over a loader and collect the outputs of selected layers.

    A forward hook is registered on every sub-module whose name (as reported by
    ``model.named_modules()``) is in ``layer_names``; each forward pass appends
    that module's output to the result.

    Args:
        model: network to probe; it is switched to eval mode.
        input_dataloader: iterable of ``(images, labels)`` batches; only the
            images are used. Images are moved to the module-level ``device``.
        layer_names: names of the sub-modules to record.

    Returns:
        Dict mapping layer name -> list with one output tensor per batch, in
        loader order. Tensors stay on the device the model ran on.
    """
    # Layer name -> list of captured outputs, filled by the hooks.
    activations = {}
    hook_handles = []

    try:
        # Register hooks on the selected layers. `name=name` freezes the loop
        # variable so every hook records under its own layer name.
        for name, module in model.named_modules():
            if name in layer_names:
                handle = module.register_forward_hook(
                    lambda m, i, o, name=name: hook_fn(m, i, o, name, activations))
                hook_handles.append(handle)

        model.eval()

        # Forward passes only; the hooks do the recording.
        with torch.no_grad():
            for images, _ in input_dataloader:
                model(images.to(device))
    finally:
        # Always detach the hooks, even when a forward pass fails; otherwise
        # they stay on the model and a later call records every output twice.
        for handle in hook_handles:
            handle.remove()

    # Report how many batches were captured per layer.
    for name, activation in activations.items():
        print(f'Activation of layer {name}: Shape={len(activation)}')

    return activations

In [28]:
# Instantiate a fresh (untrained) model for the layer-inspection cells below.
model = getModel()

Device: cuda


In [29]:
# Inspect the pooling layer and the classification head.
print(model.avgpool)
print(model.fc)
# NOTE(review): `named_modules` is not called here, so this prints the bound
# method's repr (which embeds the model repr) rather than the module names.
print(model.named_modules)

AdaptiveAvgPool2d(output_size=(1, 1))
Linear(in_features=512, out_features=7, bias=True)
<bound method Module.named_modules of ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
 

In [30]:
# Print the full module tree (presumably to look up layer names for hooking).
print(model)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [31]:
# Inspect the second convolution of block 1 inside layer4.
print(model.layer4[1].conv2)

Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)


## Defining a neural network for Logistic Regression

## Train Decoder

In [40]:
class LogisticRegression(nn.Module):
    """Single linear layer used as a decoder; returns raw logits.

    NOTE(review): this class shadows the ``LogisticRegression`` imported from
    ``sklearn.linear_model`` at the top of the notebook.
    """

    def __init__(self, input_dim, output_dim):
        """Create the ``input_dim -> output_dim`` linear map."""
        super().__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        """Return the logits for ``x`` (no softmax is applied)."""
        logits = self.linear(x)
        return logits

In [42]:
class DecoderDataset(Dataset):
    """Pairs stored decoder inputs with their integer labels.

    Args:
        inputs: indexable collection of per-sample feature vectors.
        labels: indexable collection of class indices, aligned with ``inputs``.
    """

    def __init__(self, inputs, labels):
        self.inputs = inputs
        self.labels = labels

    def __len__(self):
        """Number of samples (length of ``inputs``)."""
        return len(self.inputs)

    def __getitem__(self, idx):
        """Return sample ``idx`` as ``(float tensor, long tensor)``."""
        return (
            torch.tensor(self.inputs[idx], dtype=torch.float),
            torch.tensor(self.labels[idx], dtype=torch.long),
        )


In [43]:
def decoder_accuracy(model, dataloader=None, embeddings=None, labels=None):
    """Return the classification accuracy of a decoder.

    Either pass a ready ``dataloader`` of ``(inputs, labels)`` batches, or pass
    ``embeddings`` and ``labels`` and a loader is built from them.

    Args:
        model: decoder producing one row of class scores per input row.
        dataloader: optional iterable of ``(inputs, labels)`` batches.
        embeddings: optional sequence with one tensor per sample; each one is
            flattened to a feature vector. Used only when ``dataloader`` is None.
        labels: class indices aligned with ``embeddings``.

    Returns:
        Fraction of samples whose highest-scoring class equals the label.

    Raises:
        ValueError: if neither a dataloader nor embeddings+labels are given, or
            if no samples were evaluated.
    """
    # `is None` instead of truthiness: bool(DataLoader) calls len(), which is 0
    # for an empty loader and raises for loaders over iterable-style datasets.
    if dataloader is None:
        if embeddings is None or labels is None:
            raise ValueError(
                "decoder_accuracy needs either `dataloader` or both `embeddings` and `labels`"
            )
        embeddings_np = np.array([np.array(tensor.cpu()) for tensor in embeddings])
        # One flat feature vector per sample.
        embeddings_np = embeddings_np.reshape(embeddings_np.shape[0], -1)
        dataset = DecoderDataset(embeddings_np, labels)
        dataloader = DataLoader(dataset, batch_size=64, shuffle=False)

    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs.float())
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    if total == 0:
        raise ValueError("cannot compute accuracy: no samples were evaluated")
    return correct / total

In [45]:
# Checkpoint file train_custom_decoder writes whenever the decoder's epoch loss improves.
best_decoder_path = 'best_decoder.pth'

In [55]:
def train_custom_decoder(train_embeddings, train_labels, lr=0.001, num_epochs=200):
    """Fit a linear decoder (7 outputs) on stored embeddings.

    The weights of the epoch with the lowest average training loss are written
    to the module-level ``best_decoder_path`` and reloaded before returning.

    Args:
        train_embeddings: sequence with one tensor per sample; each tensor is
            flattened to a feature vector.
        train_labels: class indices aligned with ``train_embeddings``.
        lr: Adam learning rate.
        num_epochs: number of passes over the embeddings.

    Returns:
        Tuple ``(decoder, train_accuracy)`` where the accuracy is measured with
        ``decoder_accuracy`` on the training loader.
    """
    features = np.array([np.array(tensor.cpu()) for tensor in train_embeddings])
    # One flat feature vector per sample.
    features = features.reshape(features.shape[0], -1)
    n_inputs = features.shape[1]
    n_outputs = 7
    train_loader = DataLoader(DecoderDataset(features, train_labels), batch_size=64, shuffle=True)

    model = LogisticRegression(n_inputs, n_outputs).to(device)

    best_loss = float('inf')
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Number of checkpoints written since the last progress line.
    saves_since_report = 0

    for epoch_number in range(1, num_epochs + 1):
        running_loss = 0.0
        for batch_inputs, batch_labels in train_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            optimizer.zero_grad()
            batch_loss = criterion(model(batch_inputs.float()), batch_labels)
            batch_loss.backward()
            optimizer.step()
            running_loss += batch_loss.item() * batch_inputs.size(0)

        # Per-sample average over the epoch.
        epoch_loss = running_loss / len(train_loader.dataset)
        if epoch_loss < best_loss:
            saves_since_report += 1
            best_loss = epoch_loss
            torch.save(model.state_dict(), best_decoder_path)

        if epoch_number % 100 == 0:
            print(f'Epoch [{epoch_number}/{num_epochs}], Loss: {epoch_loss:.4f}, saved_times: {saves_since_report}')
            saves_since_report = 0

    print(f"Best model saved with loss: {best_loss:.4f}")
    model.load_state_dict(torch.load(best_decoder_path))
    return model, decoder_accuracy(model, dataloader = train_loader)

### Decoder Test accuracy

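# Whole pipeline
* Train the base network separately on each target feature (shape, color, texture), and also keep one untrained network as a baseline.
* For each of these four networks, train and test a linear decoder independently on each feature (shape, color, texture), using the activations of every layer in `layer_names`.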

In [49]:
# Decoder learning rate per probed layer; whole_pipeline looks it up by layer name.
# Earlier setting, kept for reference:
# learning_rate = {'avgpool': 0.001, 'fc': 0.000001}
learning_rate = {'layer4':0.01 , 'avgpool': 0.01, 'fc': 0.01}

# layer_names = ['avgpool', 'fc']

In [66]:
def whole_pipeline(target_feature, train_embeddings=None, test_embeddings=None):
    """Train (or skip training) one base network, then decode every feature from it.

    Args:
        target_feature: 'shape', 'color' or 'texture' to train the base network
            on that feature, or 'untrained' to keep the random initialisation.
        train_embeddings: ignored; overwritten inside the decoding loop. Kept
            for backward compatibility.
        test_embeddings: ignored; overwritten inside the decoding loop. Kept
            for backward compatibility.

    Returns:
        Tuple ``(accuracies, train_embeddings)`` where ``accuracies`` is
        ``{feature: {layer: (decoder_train_acc, decoder_test_acc)}}`` and
        ``train_embeddings`` are the training activations of the LAST decoded
        feature only ('texture').
    """
    model = getModel()

    if target_feature != 'untrained':
        print("training network model for target feature:", target_feature)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.0001)
        # Train on the network split explicitly rather than relying on the
        # default version argument happening to match `version_network`.
        train_loader, val_loader = getAllDataLoders(target_feature, version=version_network)
        train_model(model, train_loader, val_loader, optimizer, criterion, best_val_loss, num_epochs=12)
        # Continue with the best-validation checkpoint written by train_model.
        model.load_state_dict(torch.load(best_val_path))

    # Decode every feature from the (possibly trained) network on the decoder split.
    features = ['shape', 'color', 'texture']
    accuracies = {}

    for feature_name in features:
        print("decoding feature:", feature_name)
        # Batch size 1 and no shuffling: the decoder code treats every captured
        # activation as one sample, aligned by position with the label lists.
        decoder_train_loader, decoder_val_loader = getAllDataLoders( feature_name, 1, version = version_decoder, shuffle_train=False)
        train_embeddings = getEmbeddings(model, decoder_train_loader , layer_names = layer_names)
        test_embeddings = getEmbeddings(model, decoder_val_loader , layer_names = layer_names)

        train_labels = get_Truthlabels_list(feature_name, version_decoder, 'train')
        test_labels = get_Truthlabels_list(feature_name, version_decoder, 'val')

        layer_accuracies = {}
        for layer in layer_names:
            print(f"learning rate for {layer} =", learning_rate[layer])

            decoder, decoder_acc_train = train_custom_decoder(train_embeddings[layer] , train_labels, learning_rate[layer], num_epochs=300)

            print(f"decoder train Accuracy for layer {layer}: {decoder_acc_train:.2f}")
            # Evaluate the decoder on the held-out split.
            decoder_acc_test = decoder_accuracy(decoder, embeddings = test_embeddings[layer], labels = test_labels)
            print(f"test Accuracy for layer {layer}: {decoder_acc_test:.2f}")
            layer_accuracies[layer] = (decoder_acc_train, decoder_acc_test)
        accuracies[feature_name] = layer_accuracies

    return accuracies, train_embeddings
    
    

In [51]:
# Output pickle files written by main(): returned embeddings and decoding accuracies.
embeddings_path = 'embeddings_file.pkl'
decoding_acc_path = 'decoding_acc.pkl'

In [64]:
def main():
    """Run the pipeline for every base-network variant and persist the results.

    For each of 'shape', 'untrained', 'color' and 'texture' the whole pipeline
    is run; the returned embeddings are pickled to ``embeddings_path`` and the
    accuracies to ``decoding_acc_path``.

    Returns:
        Dict ``{target_feature: {feature_name: {layer_name: accuracies}}}``.
    """
    model_accuracies = {}
    embeddings = {}
    for target_feature in ('shape', 'untrained', 'color', 'texture'):
        # Decode all features from the network trained on `target_feature`.
        print("network model train target feature:", target_feature)
        model_accuracies[target_feature], embeddings[target_feature] = whole_pipeline(target_feature)

    # Embeddings first, then accuracies.
    outputs = ((embeddings_path, embeddings), (decoding_acc_path, model_accuracies))
    for out_path, payload in outputs:
        with open(out_path, 'wb') as f:
            pickle.dump(payload, f)

    print("decoding accuracies:", model_accuracies)
    return model_accuracies


In [67]:
# Run the full experiment; model_acc maps
# target feature -> decoded feature -> layer -> (decoder train acc, decoder test acc).
model_acc = main()

network model train target feature: shape
Device: cuda
training network model for target feature: shape
List of shape hold out classes: ['trapezoid', 'pentagon', 'square']
0-indexed dictionary of remaining items: {'triangle': 0, 'plus': 1, 'circle': 2, 'tee': 3, 'rhombus': 4, 'star': 5, 'fivesquare': 6}
List of shape hold out classes: ['trapezoid', 'pentagon', 'square']
0-indexed dictionary of remaining items: {'triangle': 0, 'plus': 1, 'circle': 2, 'tee': 3, 'rhombus': 4, 'star': 5, 'fivesquare': 6}
inf
val accuracy: 0.5397759103641456
Epoch 1/12, Train Loss: 1.0577, Val Loss: 1.6940
Validation loss decreased (inf --> 1.6940).  Saving model ...
Epoch number1: Train Loss:1.0576514619953779, Val Loss:1.693976469534118
val accuracy: 0.8518207282913165
Epoch 2/12, Train Loss: 0.2587, Val Loss: 0.4292
Validation loss decreased (1.6940 --> 0.4292).  Saving model ...
Epoch number2: Train Loss:0.2587496824671159, Val Loss:0.42923013711509916
val accuracy: 0.9635854341736695
Epoch 3/12, Train 