In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt


In [21]:
class FFNN(nn.Module):
    def __init__(self):
        super(FFNN, self).__init__()
        self.fc1 = nn.Linear(28*28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 28*28)
        x = torch.sigmoid(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        x = self.fc3(x)
        return x


In [29]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class PSO:
    def __init__(self, particle_size, dimensions, inertia_weight=0.9, cognitive_coeff=1.5, social_coeff=1.5):
        self.particle_size = particle_size
        self.dimensions = dimensions
        self.inertia_weight = inertia_weight
        self.cognitive_coeff = cognitive_coeff
        self.social_coeff = social_coeff
        self.particles = torch.randn((particle_size, dimensions), device=device)
        self.velocities = torch.zeros((particle_size, dimensions), device=device)
        self.best_particles = self.particles.clone()
        self.global_best = self.particles[0].clone()
        self.best_scores = torch.full((particle_size,), float('inf'), device=device)

    def update_particles(self, net, criterion, data_loader):
        for i, particle in enumerate(self.particles):
            net.load_state_dict(vec_to_state_dict(particle, net))
            loss, _ = evaluate(net, criterion, data_loader)
            if loss < self.best_scores[i]:
                self.best_scores[i] = loss
                self.best_particles[i] = particle.clone()
                if loss < self.best_scores.min():
                    self.global_best = particle.clone()

            cognitive = self.cognitive_coeff * torch.rand(self.dimensions, device=device) * (self.best_particles[i] - particle)
            social = self.social_coeff * torch.rand(self.dimensions, device=device) * (self.global_best - particle)
            self.velocities[i] = self.inertia_weight * self.velocities[i] + cognitive + social
            self.particles[i] += self.velocities[i]


def vec_to_state_dict(vec, model):
    state_dict = model.state_dict()
    idx = 0
    for param in state_dict:
        numel = state_dict[param].numel()
        state_dict[param] = vec[idx:idx+numel].reshape(state_dict[param].shape).to(device)
        idx += numel
    return state_dict

def evaluate_model(model, criterion, data_loader):
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for inputs, targets in data_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            total_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            correct += (predicted == targets).sum().item()
    model.train()
    return total_loss / len(data_loader.dataset), 100 * correct / len(data_loader.dataset)


In [23]:
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.FashionMNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.FashionMNIST(root='./data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [31]:
model = FFNN().to(device)
criterion = nn.CrossEntropyLoss()

# Calculate the dimensions needed for PSO particles based on the model's parameters.
dimensions = sum(p.numel() for p in model.parameters())

pso = PSO(particle_size=20, dimensions=dimensions)

# Train and plot setup
epoch_losses = []
epoch_accuracies = []

# Train using PSO
for epoch in range(10):  # Run for 10 epochs
    train_loss, train_accuracy = [], []
    pso.update_particles(model, criterion, train_loader)
    loss, accuracy = evaluate_model(model, criterion, train_loader)  # Evaluate the current global best model
    epoch_losses.append(loss)
    epoch_accuracies.append(accuracy)
    avg_loss = sum(epoch_losses) / len(epoch_losses)
    avg_accuracy = sum(epoch_accuracies) / len(epoch_accuracies)
    print(f"Epoch {epoch+1}: Avg Train Loss: {avg_loss:.4f}, Avg Train Accuracy: {avg_accuracy:.2f}%")


# Evaluate the model
model.load_state_dict(vec_to_state_dict(pso.global_best, model))
test_loss, test_accuracy = evaluate(model, criterion, test_loader)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")


# Plot the loss and accuracy



0
1


KeyboardInterrupt: 