# Final Project NeuroAI 
## Team Number 1: Femke, Tikva, Gabriela

In [19]:
# Importing all important packages
import numpy as np 
import pandas as pd 
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import os
import matplotlib.pyplot as plt


In [12]:
# print("PyTorch version:", torch.__version__)
# print("MPS available:", torch.backends.mps.is_available())
# print("MPS built:", torch.backends.mps.is_built())

PyTorch version: 2.5.1
MPS available: True
MPS built: True


In [18]:

for dirname, _, filenames in os.walk('data'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

data/test/dogs/dog_147.jpg
data/test/dogs/dog_219.jpg
data/test/dogs/dog_191.jpg
data/test/dogs/dog_344.jpg
data/test/dogs/dog_150.jpg
data/test/dogs/dog_227.jpg
data/test/dogs/dog_421.jpg
data/test/dogs/dog_380.jpg
data/test/dogs/dog_155.jpg
data/test/dogs/dog_141.jpg
data/test/dogs/dog_196.jpg
data/test/dogs/dog_551.jpg
data/test/dogs/dog_237.jpg
data/test/dogs/dog_236.jpg
data/test/dogs/dog_197.jpg
data/test/dogs/dog_168.jpg
data/test/dogs/dog_28.jpg
data/test/dogs/dog_354.jpg
data/test/dogs/dog_142.jpg
data/test/dogs/dog_181.jpg
data/test/dogs/dog_194.jpg
data/test/dogs/dog_369.jpg
data/test/dogs/dog_355.jpg
data/test/dogs/dog_124.jpg
data/test/dogs/dog_130.jpg
data/test/dogs/dog_534.jpg
data/test/dogs/dog_520.jpg
data/test/dogs/dog_521.jpg
data/test/dogs/dog_482.jpg
data/test/dogs/dog_59.jpg
data/test/dogs/dog_327.jpg
data/test/dogs/dog_443.jpg
data/test/dogs/dog_536.jpg
data/test/dogs/dog_244.jpg
data/test/dogs/dog_522.jpg
data/test/dogs/dog_442.jpg
data/test/dogs/dog_89.jpg
data

### Original Data augmentation kept in tact

In [31]:
# ==========================
# 1. Data Augmentation
# ==========================
image_size = (32, 32)  # was originally (128, 128)
batch_size = 32
train_dir = "data/train"
test_dir = "data/test"

# Define transforms (like ImageDataGenerator in Keras)
train_transforms = transforms.Compose([
    transforms.Resize(image_size),
    transforms.RandomRotation(20),
    transforms.RandomHorizontalFlip(),
    transforms.RandomResizedCrop(image_size, scale=(0.8, 1.0)),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5], [0.5, 0.5])  # rescale to [-1,1]
])

val_test_transforms = transforms.Compose([
    transforms.Resize(image_size),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5], [0.5, 0.5])
])

# Load datasets with ImageFolder
train_dataset = datasets.ImageFolder(root=train_dir, transform=train_transforms)
val_size = int(0.2 * len(train_dataset))
train_size = len(train_dataset) - val_size

train_dataset, val_dataset = torch.utils.data.random_split(train_dataset, [train_size, val_size])

test_dataset = datasets.ImageFolder(root=test_dir, transform=val_test_transforms)

# Data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

### Original Model Definition

In [7]:
# ==========================
# # 2. Model Definition
# # ==========================
# class SimpleNN(nn.Module):
#     def __init__(self):
#         super(SimpleNN, self).__init__()
#         self.flatten = nn.Flatten()
#         self.fc1 = nn.Linear(128*128*3, 256)
#         self.fc2 = nn.Linear(256, 128)
#         self.fc3 = nn.Linear(128, 1)
#         self.relu = nn.ReLU()
#         self.sigmoid = nn.Sigmoid()

#     def forward(self, x):
#         x = self.flatten(x)
#         x = self.relu(self.fc1(x))
#         x = self.relu(self.fc2(x))
#         x = self.sigmoid(self.fc3(x))
#         return x

# model = SimpleNN()

### Model definition of AI

In [32]:
# ==========================
# 2. Hebbian Model (Oja’s Rule)
# ==========================
class OjaLayer(nn.Module):
    def __init__(self, input_size, output_size, eta=0.001):
        super(OjaLayer, self).__init__()
        self.weights = nn.Parameter(torch.randn(output_size, input_size) * 0.01, requires_grad=False)
        self.eta = eta

    def forward(self, x):
        # x shape: [batch, input_size]
        y = x @ self.weights.t()  # [batch, output_size]
        return y

    def oja_update(self, x):
        x = x - x.mean(dim=0, keepdim=True)  # center inputs
        y = x @ self.weights.t()
        for i in range(y.size(0)):
            xi = x[i].unsqueeze(0)
            yi = y[i].unsqueeze(1)
            dw = self.eta * (yi @ (xi - yi.t() @ self.weights))
            self.weights.data += dw

class HebbianNet(nn.Module):
    def __init__(self):
        super(HebbianNet, self).__init__()
        self.flatten = nn.Flatten()
        self.hebb1 = OjaLayer(32*32*3, 128, eta=0.01)  
        self.fc = nn.Linear(128, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.flatten(x)
        x = self.hebb1(x)
        x = torch.relu(x)
        x = self.fc(x)
        x = self.sigmoid(x)
        return x
    

### Original compile setup and model training

In [None]:
# # ==========================
# # 3. Compile setup
# # ==========================
# criterion = nn.BCELoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)

# # ==========================
# # 4. Train the model
# # ==========================
# if torch.backends.mps.is_available():
#     device = torch.device("mps")
# elif torch.cuda.is_available():
#     device = torch.device("cuda")
# else:
#     device = torch.device("cpu")

# model.to(device)

# epochs = 10
# for epoch in range(epochs):
#     model.train()
#     running_loss, correct, total = 0.0, 0, 0
#     for inputs, labels in train_loader:
#         inputs, labels = inputs.to(device), labels.float().to(device)

#         optimizer.zero_grad()
#         outputs = model(inputs).squeeze()  # shape [batch_size]
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()

#         running_loss += loss.item() * inputs.size(0)
#         preds = (outputs > 0.5).float()
#         correct += (preds == labels).sum().item()
#         total += labels.size(0)

#     train_acc = 100 * correct / total
#     train_loss = running_loss / total

#     # Validation
#     model.eval()
#     val_correct, val_total, val_loss = 0, 0, 0.0
#     with torch.no_grad():
#         for inputs, labels in val_loader:
#             inputs, labels = inputs.to(device), labels.float().to(device)
#             outputs = model(inputs).squeeze()
#             loss = criterion(outputs, labels)

#             val_loss += loss.item() * inputs.size(0)
#             preds = (outputs > 0.5).float()
#             val_correct += (preds == labels).sum().item()
#             val_total += labels.size(0)

#     val_acc = 100 * val_correct / val_total
#     val_loss = val_loss / val_total

#     print(f"Epoch [{epoch+1}/{epochs}] "
#           f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}% "
#           f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")


### Compile Setting and model training according to chat.

In [33]:
# ==========================
# 3. Training Loop with Oja’s Learning
# ==========================
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

model = HebbianNet().to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.fc.parameters(), lr=0.001)  # only train last layer via gradient descent

In [35]:
epochs = 10
for epoch in range(epochs):
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.float().to(device)

        # Oja update (unsupervised Hebbian)
        flat_inputs = inputs.view(inputs.size(0), -1)
        model.hebb1.oja_update(flat_inputs)

        # Standard supervised pass for output layer
        optimizer.zero_grad()
        outputs = model(inputs).squeeze()
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        preds = (outputs > 0.5).float()
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    train_loss = running_loss / total
    train_acc = 100 * correct / total
    print(f"Epoch [{epoch+1}/{epochs}] Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")


    if (epoch + 1) % 2 == 0:  # visualize every 2 epochs
        w = model.hebb1.weights.cpu().detach().numpy()
        w_min, w_max = w.min(), w.max()
        w = (w - w_min) / (w_max - w_min)
        num_filters = min(8, w.shape[0])
        cols = 4
        rows = int(np.ceil(num_filters / cols))
        plt.figure(figsize=(8, 8))
        for i in range(num_filters):
            plt.subplot(rows, cols, i + 1)
            # plt.imshow(w[i].reshape(32, 32, 3))
            plt.imshow(w[i].reshape(32, 32), cmap='gray', vmin=0, vmax=1)
            plt.axis('off')
        plt.suptitle(f"Epoch {epoch+1}: Hebbian Filters")
        plt.show()

RuntimeError: The size of tensor a (3) must match the size of tensor b (2) at non-singleton dimension 0