In [1]:
import torch
import torch.nn as nn
import torchvision.models as models

def get_pretrained_regression_model(output_size):
    # Load the pretrained ResNet18 model
    pretrained_model = models.resnet18(weights='DEFAULT')
    
    # Modify the last fully connected layer for regression with custom output size
    in_features = pretrained_model.fc.in_features
    pretrained_model.fc = nn.Linear(in_features, output_size)
    

    return pretrained_model

model = get_pretrained_regression_model(100)

In [2]:
pretrained_model = models.resnet18(weights='DEFAULT')

In [8]:
pretrained_model.avgpool.output_size

(1, 1)

In [3]:
from torch.utils.data import Dataset, DataLoader
from sklearn.decomposition import PCA 
from torchvision import transforms
from PIL import Image
import numpy as np

# Define an imaginary dataset class
class ImaginaryDataset(Dataset):
    def __init__(self, num_samples, transform=None, PCA = None):
        self.num_samples = num_samples
        self.transform = transform
        self.PCA = PCA
        self.data, self.output = self.generate_data()

    def generate_data(self):
        data = []
        output_concat = []
        for _ in range(self.num_samples):
            # Generate random image data (3 channels, 128x128 pixels)
            image = np.random.randint(0, 256, size=(3, 128, 128), dtype=np.uint8)
            image = Image.fromarray(np.transpose(image, (1, 2, 0)))

            # Generate random output array of length 1000
            output = np.random.rand(1000)

            data.append((image, output))
            output_concat.append(output)
        if self.PCA: self.PCA.fit(output_concat)
        return data, output_concat
    
    def give_output(self):
        return self.output
    
    def internal_PCA(self):
        if self.PCA:
            return self.PCA
        else:
            print(f'PCA is {self.PCA}')

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        image, output = self.data[idx]

        if self.transform:
            image = self.transform(image)
        if self.PCA:
            output = self.PCA.transform(output.reshape(1, -1))
        return image, torch.FloatTensor(output[0])

# Define data transformations (you can customize these as needed)
data_transform = transforms.Compose([
    transforms.ToTensor(),
])


# Create an instance of the ImaginaryDataset
num_samples = 1000  # You can adjust this based on your needs
imaginary_dataset = ImaginaryDataset(num_samples, transform= data_transform, PCA = PCA(n_components= 100))


# Create a DataLoader for batch processing
batch_size = 32
data_loader = DataLoader(imaginary_dataset, batch_size=batch_size, shuffle=True)

In [69]:
class ImaginaryDataset(Dataset):
    def __init__(self, images_list, outputs_list, transform=None, PCA=None):
        self.num_samples = len(images_list)
        self.transform = transform
        self.PCA = PCA
        self.data, self.output = self.load_data(images_list, outputs_list)

    def load_data(self, images_list, outputs_list):
        data = []
        output_concat = []

        for i in range(self.num_samples):
            # Load image from the given list
            image = Image.fromarray(images_list[i])

            # Load output array from the given list
            output = outputs_list[i]

            data.append((image, output))
            output_concat.append(output)

        if self.PCA:
            self.PCA.fit(output_concat)

        return data, output_concat

    def give_output(self):
        return self.output

    def internal_PCA(self):
        if self.PCA:
            return self.PCA
        else:
            print(f'PCA is {self.PCA}')

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        image, output = self.data[idx]

        if self.transform:
            image = self.transform(image)
        if self.PCA:
            output = self.PCA.transform(output.reshape(1, -1))

        return image, torch.FloatTensor(output[0])

In [68]:
Frozen_PCA = imaginary_dataset.internal_PCA()

PCA is None


In [65]:
len(Frozen_PCA.inverse_transform(imaginary_dataset[0][1]))

1000

In [72]:
from ..utils import yay

ImportError: attempted relative import with no known parent package

In [7]:
import torch

class Trainer:
    def __init__(self):
        self.model = None
        self.optimizer = None
        self.loss_fn = None
        self.train_loader = None
        self.val_loader = None
        self.history = {'train_loss': [], 'val_loss': []}

    def compile(self, model, optimizer, learning_rate, loss_fn):
        self.model = model
        self.optimizer = optimizer(self.model.parameters(), lr=learning_rate)
        self.loss_fn = loss_fn

    def fit(self, num_epochs, train_loader, val_loader=None):
        self.train_loader = train_loader
        self.val_loader = val_loader

        for epoch in range(num_epochs):
            self.model.train()
            total_loss = 0.0
            for inputs, targets in self.train_loader:
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.loss_fn(outputs, targets)
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()

            avg_loss = total_loss / len(self.train_loader)
            self.history['train_loss'].append(avg_loss)
            print(f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {avg_loss}")

            if self.val_loader is not None:
                val_loss = self.evaluate(self.val_loader, "Validation")
                self.history['val_loss'].append(val_loss)
                print(f"Validation Loss: {val_loss}")

    def evaluate(self, data_loader, mode="Test"):
        self.model.eval()
        total_loss = 0.0
        with torch.no_grad():
            for inputs, targets in data_loader:
                outputs = self.model(inputs)
                loss = self.loss_fn(outputs, targets)
                total_loss += loss.item()

        avg_loss = total_loss / len(data_loader)
        print(f"{mode} Loss: {avg_loss}")
        return avg_loss

    def save(self, filepath):
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'history': self.history
        }, filepath)
        print(f"Model and training history saved to {filepath}")

    def load(self, filepath):
        checkpoint = torch.load(filepath)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.history = checkpoint['history']
        print(f"Model and training history loaded from {filepath}")


In [8]:
trainer = Trainer()
trainer.compile(model = model, optimizer= torch.optim.Adam, learning_rate= 0.0001, loss_fn= torch.nn.MSELoss())
trainer.fit(train_loader= data_loader, num_epochs= 2)

Epoch [1/2], Training Loss: 0.12307136203162372
Epoch [2/2], Training Loss: 0.10246795928105712


In [9]:
trainer.history

{'train_loss': [0.12307136203162372, 0.10246795928105712], 'val_loss': []}

In [10]:
trainer.save('woah')

Model and training history saved to woah


In [11]:
trainer.load(r'C:\Users\rvacher\Downloads\aml_project_2023\src\notebooks\woah')

Model and training history loaded from C:\Users\rvacher\Downloads\aml_project_2023\src\notebooks\woah


In [1]:
from torch.utils.data import Dataset, DataLoader
from sklearn.decomposition import PCA 
from torchvision import transforms
import numpy as np
from PIL import Image
import torch
import os

def load_images_from_folder(folder_path, start=None, end=None):
    images = []
    for filename in os.listdir(folder_path)[start:end]:
        img_path = os.path.join(folder_path, filename)
        img = Image.open(img_path)
        img_array = np.array(img)
        # .transpose((2, 0, 1))
        images.append(img_array)
    return images

def load_subject_data(subject, index_start=None, index_end=None):
    path = '../../data/algonauts/subj0' + str(subject)
    data_lh = np.load(path + '/training_split/training_fmri/lh_training_fmri.npy')[index_start : index_end]
    data_rh = np.load(path + '/training_split/training_fmri/rh_training_fmri.npy')[index_start : index_end]
    folder_path = path+"/training_split/training_images/"
    image_data = load_images_from_folder(folder_path, index_start, index_end)
    return data_lh, data_rh, image_data

class CustomDataset(Dataset):
    def __init__(self, images_list, outputs_list, transform=None, PCA=None):
        self.num_samples = len(images_list)
        self.transform = transform
        self.PCA = PCA
        self.data, self.output = self.load_data(images_list, outputs_list)

    def load_data(self, images_list, outputs_list):
        data = []
        output_concat = []

        for i in range(self.num_samples):
            # Load image from the given list
            image = Image.fromarray(images_list[i])

            # Load output array from the given list
            output = outputs_list[i]

            data.append((image, output))
            output_concat.append(output)

        if self.PCA:
            self.PCA.fit(output_concat)

        return data, output_concat

    def give_output(self):
        return self.output

    def internal_PCA(self):
        if self.PCA:
            return self.PCA
        else:
            print(f'PCA is {self.PCA}')

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        image, output = self.data[idx]

        if self.transform:
            image = self.transform(image)
        if self.PCA:
            output = self.PCA.transform(output.reshape(1, -1))

        return image, torch.FloatTensor(output[0])

In [3]:
import torch
import torch.nn as nn
import torchvision.models as models

def get_pretrained_regression_model(output_size):
    # Load the pretrained ResNet18 model
    pretrained_model = models.resnet18(weights='DEFAULT')
    
    # Modify the last fully connected layer for regression with custom output size
    in_features = pretrained_model.fc.in_features
    pretrained_model.fc = nn.Linear(in_features, output_size)
    
    return pretrained_model

class ResNet2HeadModel(nn.Module):
    def __init__(self, output_size):
        super(ResNet2HeadModel, self).__init__()
        
        # Load the pretrained ResNet18 model
        self.pretrained_model = models.resnet18(weights='DEFAULT')
        
        # Remove the last fully connected layer
        self.pretrained_model.fc = nn.Identity()
        
        # Add two new linear layers for regression with custom output size
        self.fc1 = nn.Linear(self.pretrained_model.fc.in_features, output_size)
        self.fc2 = nn.Linear(self.pretrained_model.fc.in_features, output_size)

    def forward(self, x):
        # Forward pass through the pretrained ResNet18 model
        x = self.pretrained_model(x)
        
        # Forward pass through the first linear layer
        output1 = self.fc1(x)
        
        # Forward pass through the second linear layer
        output2 = self.fc2(x)
        
        return output1, output2

class Trainer:
    def __init__(self):
        self.model = None
        self.optimizer = None
        self.loss_fn = None
        self.train_loader = None
        self.val_loader = None
        self.history = {'train_loss': [], 'val_loss': []}

    def compile(self, model, optimizer, learning_rate, loss_fn):
        self.model = model
        self.optimizer = optimizer(self.model.parameters(), lr=learning_rate)
        self.loss_fn = loss_fn

    def fit(self, num_epochs, train_loader, val_loader=None):
        self.train_loader = train_loader
        self.val_loader = val_loader

        for epoch in range(num_epochs):
            self.model.train()
            total_loss = 0.0
            for inputs, targets in self.train_loader:
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.loss_fn(outputs, targets)
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()

            avg_loss = total_loss / len(self.train_loader)
            self.history['train_loss'].append(avg_loss)
            print(f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {avg_loss}")

            if self.val_loader is not None:
                val_loss = self.evaluate(self.val_loader, "Validation")
                self.history['val_loss'].append(val_loss)

    def evaluate(self, data_loader, mode="Test"):
        self.model.eval()
        total_loss = 0.0
        with torch.no_grad():
            for inputs, targets in data_loader:
                outputs = self.model(inputs)
                loss = self.loss_fn(outputs, targets)
                total_loss += loss.item()

        avg_loss = total_loss / len(data_loader)
        print(f"{mode} Loss: {avg_loss}")
        return avg_loss

    def save(self, filepath):
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'history': self.history
        }, filepath)
        print(f"Model and training history saved to {filepath}")

    def load(self, filepath):
        checkpoint = torch.load(filepath)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.history = checkpoint['history']
        print(f"Model and training history loaded from {filepath}")


In [1]:
import torch

In [55]:
x = (torch.rand(1, 3, 224, 224), [1])
y = torch.rand(3, 3)


In [28]:
torch.stack((x, y))

tensor([[[0.0507, 0.3095, 0.8162],
         [0.7257, 0.2786, 0.1846],
         [0.6987, 0.4213, 0.8925]],

        [[0.4104, 0.5804, 0.5016],
         [0.5148, 0.8272, 0.0081],
         [0.1956, 0.8005, 0.2048]]])

In [29]:
x, y

(tensor([[0.0507, 0.3095, 0.8162],
         [0.7257, 0.2786, 0.1846],
         [0.6987, 0.4213, 0.8925]]),
 tensor([[0.4104, 0.5804, 0.5016],
         [0.5148, 0.8272, 0.0081],
         [0.1956, 0.8005, 0.2048]]))

In [81]:
import torch
import torch.nn as nn
import torchvision.models as models
from tqdm import tqdm
import torch.nn.functional as F

class ResNet1HeadID(nn.Module):
    def __init__(self, output_size):
        super(ResNet1HeadID, self).__init__()
        
        # Load the pretrained ResNet18 model
        self.pretrained_model = models.resnet18(weights='DEFAULT')
        in_features = 512 # in_features for first layer after CNN

        # Remove the last fully connected layer
        self.pretrained_model = torch.nn.Sequential(*(list(self.pretrained_model.children())[:-1]))
        
        # Add shared layer
        self.shared = nn.Linear(in_features, 256)

        # Add subject-specific layers
        self.sub1 = nn.Linear(in_features, 256)
        self.sub2 = nn.Linear(in_features, 256)
        self.sub3 = nn.Linear(in_features, 256)
        self.sub4 = nn.Linear(in_features, 256)
        self.sub5 = nn.Linear(in_features, 256)
        self.sub6 = nn.Linear(in_features, 256)
        self.sub7 = nn.Linear(in_features, 256)
        self.sub8 = nn.Linear(in_features, 256)

        # Combine shared and subject-specific layers
        self.head = nn.Linear(256, output_size)

    def forward(self, x):
        # Extract image and subject ID from the input
        if isinstance(x, tuple):
            images, ids = x
        else:
            images = x

        # Forward pass through the pretrained ResNet18 model
        features = self.pretrained_model(images)

        # Flatten the features
        flat_features = torch.flatten(features, 1)

        # Forward pass through the shared layer
        shared = self.shared(flat_features)

        # Forward pass through the subject-specific layers
        subjects = []
        # Loop through the batch, one sample at a time
        for i in range(len(ids)):
            if ids[i] == 1:
                subject = self.sub1(flat_features[i])
            elif ids[i] == 2:
                subject = self.sub2(flat_features[i])
            elif ids[i] == 3:
                subject = self.sub3(flat_features[i])
            elif ids[i] == 4:
                subject = self.sub4(flat_features[i])
            elif ids[i] == 5:
                subject = self.sub5(flat_features[i])
            elif ids[i] == 6:
                subject = self.sub6(flat_features[i])
            elif ids[i] == 7:
                subject = self.sub7(flat_features[i])
            elif ids[i] == 8:
                subject = self.sub8(flat_features[i])
            subjects.append(subject)
        
        # Add the shared and subject-specific layers
        combined = shared + torch.stack(subjects)

        # Forward pass through the first linear layer
        output = self.head(combined)

        return output

In [82]:
mod = ResNet1HeadID(100)

In [80]:
mod(x)

tensor([[-0.0601,  0.1089, -0.1437,  0.1525, -0.0716, -0.0025, -0.4145, -0.2707,
          0.4093, -0.3949,  0.5593, -0.3620, -0.1316,  0.4586,  0.2874,  0.2037,
         -0.4205,  0.2724, -0.6329, -0.5341, -0.1427,  0.4858,  0.2596, -0.0081,
          0.1731,  0.2704,  0.7015, -0.2749,  0.7827, -0.6425,  0.4789,  0.3481,
         -0.3662,  0.0111, -0.3362, -0.0721, -0.1263,  0.2133,  0.2006,  0.0561,
          0.1850, -0.0236,  0.5004,  0.3014,  0.1970,  0.2078, -0.1617, -0.0366,
         -0.9960,  0.5667,  0.4975,  0.1157, -0.0297,  0.4084,  0.5416,  0.1940,
         -0.4456, -0.1455,  0.3758, -0.4747,  0.1847,  0.2982,  0.3728,  0.0718,
          0.6179,  0.4347,  0.4040,  0.0990,  0.0104,  0.2588, -0.3225,  0.2256,
         -0.1816, -0.4968,  0.1319,  0.1268, -0.0192, -0.5750, -0.0277, -0.0844,
          0.0207,  0.0605, -0.1199, -0.0619, -0.7332, -0.4250,  0.1274,  0.2061,
         -0.5868,  0.5041,  0.3611, -0.1960,  0.1671, -0.2073, -0.1841,  0.4834,
          0.3887,  0.7876,  

In [83]:
mod(x)

tensor([[ 0.0044, -0.1590,  0.1581,  0.2252,  0.0524, -0.2350,  0.3702, -0.2658,
         -0.1770,  0.1410, -0.2528,  0.4067,  0.2504,  0.4881,  0.5289, -0.0141,
         -0.5821,  0.8918,  0.1390,  0.0808, -0.0523,  0.1360, -0.1956, -0.0195,
         -0.3305,  0.4626, -0.1085, -0.6898,  0.0526,  0.4327,  0.4977,  0.2247,
         -0.6784,  0.4940,  0.6999, -0.0847,  0.0211, -0.2923,  0.0829,  0.2277,
         -0.9396, -0.0971, -0.1146, -0.1839,  0.2156,  1.0705,  0.0989,  0.1656,
          0.5468,  0.0450, -0.7394, -0.4590, -0.1416, -0.7711,  0.0613,  0.0961,
         -0.4866,  0.1559,  0.3784,  0.0847, -0.1762, -0.3044, -0.5761,  0.2785,
         -0.2440, -0.2412,  0.2981, -0.2943, -0.1076,  0.2459, -0.3213, -0.0139,
          0.0727, -1.0446, -0.1758, -0.9760, -0.3012,  0.0366, -0.1409, -1.1384,
          0.0810,  0.3016, -0.2152,  0.2002, -0.0674,  0.2108, -0.2159,  0.5773,
          0.8046, -0.6388,  0.5346,  0.2537,  0.8645,  0.0937, -0.5181,  0.0146,
          0.2152, -0.5644,  