#  PSO-SGD 

## Com Modelos Sem Pré-Treinamento

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, Subset
import copy
import torchvision.models as models
import random

# Seed Python's `random` module and PyTorch's RNGs with a fixed value.
random.seed(123)
torch.manual_seed(123)
torch.cuda.manual_seed(123)

# Device configuration: use CUDA when it is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")



# Build a torchvision model without pre-trained weights and resize its head.
def select_model(architecture, num_classes=10):
    """Create an untrained torchvision model whose output layer has `num_classes` units.

    Every model is created with ``weights=None`` and its final classification
    layer is replaced by a freshly initialised layer of the requested width.
    The input width of the new layer is read from the layer it replaces.

    Args:
        architecture: One of 'alexnet', 'vgg11', 'resnet18', 'mobilenet_v2'
            or 'squeezenet'.
        num_classes: Number of output classes. Defaults to 10, the value that
            was previously hard-coded.

    Returns:
        The model, moved to the module-level `device`.

    Raises:
        ValueError: If `architecture` is not one of the supported names.
    """
    if architecture == 'alexnet':
        model = models.alexnet(weights=None)
        model.classifier[6] = nn.Linear(model.classifier[6].in_features, num_classes)
    elif architecture == 'vgg11':
        model = models.vgg11(weights=None)
        model.classifier[6] = nn.Linear(model.classifier[6].in_features, num_classes)
    elif architecture == 'resnet18':
        model = models.resnet18(weights=None)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
    elif architecture == 'mobilenet_v2':
        model = models.mobilenet_v2(weights=None)
        model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)
    elif architecture == 'squeezenet':
        model = models.squeezenet1_1(weights=None)
        # SqueezeNet classifies with a 1x1 convolution rather than a linear layer;
        # take the channel count from the existing layer instead of hard-coding it.
        old_head = model.classifier[1]
        model.classifier[1] = nn.Conv2d(old_head.in_channels, num_classes, kernel_size=(1, 1), stride=(1, 1))
        model.num_classes = num_classes
    else:
        raise ValueError("Arquitetura não suportada: escolha 'alexnet', 'vgg11', 'resnet18', 'mobilenet_v2' ou 'squeezenet'")

    return model.to(device)


In [None]:
# Particle class for PSO adapted to neural-network training
class Particle:
    """One swarm member: a working model, its personal-best copy and PSO state.

    Attributes:
        model: Deep copy of the model passed to the constructor; this is the
            network that is moved and trained.
        best_model: Deep copy holding the personal-best weights (replaced by
            the caller when the particle improves).
        position: Dict mapping parameter name to the last position tensor.
        velocity: Dict mapping parameter name to the current velocity tensor.
        best_score: Personal-best score, initially ``float('inf')``.
        device: Device the copies and the PSO tensors are placed on.
        optimizer: Adam optimizer over `model`'s parameters, used by
            `evaluate_train`.
    """

    def __init__(self, model, device):
        """Copy `model` twice (working and personal best) and initialise PSO state.

        Args:
            model: Network to copy; it is not modified.
            device: Device on which the copies and PSO tensors are placed.
        """
        self.model = copy.deepcopy(model).to(device)
        self.best_model = copy.deepcopy(model).to(device)

        # Bounds of the search space and scale of the initial velocities.
        low = -10.0
        high = 10.0
        velocity_scale = 0.1

        # Uniform random values in [low, high). pso_sgd overwrites each entry
        # from the model's parameters before reading it, so these values are
        # never applied to the network weights.
        self.position = {name: torch.rand_like(param).to(device) * (high - low) + low for name, param in model.named_parameters()}

        # Small normally distributed initial velocities.
        self.velocity = {name: torch.randn_like(param).to(device) * velocity_scale for name, param in model.named_parameters()}

        self.best_score = float('inf')
        self.device = device

        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001, weight_decay=1e-5)

    def pso_sgd(self, global_best_model, inertia, c1, c2, learning_rate, beta1, beta2, epsilon, m, v, t):
        """Move every parameter by a PSO step, then by a gradient step when available.

        For each parameter the velocity is updated from the inertia term plus
        the attraction towards the personal best and the global best, and the
        parameter is moved by that velocity. If the parameter currently has a
        gradient, the moment buffers are updated and a plain gradient-descent
        step ``- learning_rate * grad`` is applied on top of the new position.
        Parameters without a gradient receive only the PSO move.

        Args:
            global_best_model: Model holding the swarm's best weights.
            inertia: Weight of the previous velocity.
            c1: Coefficient of the personal-best attraction.
            c2: Coefficient of the global-best attraction.
            learning_rate: Step size of the gradient step.
            beta1: Decay rate of the first-moment buffer `m`.
            beta2: Decay rate of the second-moment buffer `v`.
            epsilon: Unused; kept so existing callers keep working.
            m: Dict of first-moment tensors keyed by parameter name; entries
                are replaced in place.
            v: Dict of second-moment tensors keyed by parameter name; entries
                are replaced in place.
            t: Unused; kept so existing callers keep working.
        """
        # state_dict() rebuilds its mapping on every call, so fetch both
        # snapshots once instead of twice per parameter.
        personal_best = self.best_model.state_dict()
        global_best = global_best_model.state_dict()

        for name, param in self.model.named_parameters():
            local_rand = random.random()
            global_rand = random.random()

            # Velocity update
            self.velocity[name] = (
                inertia * self.velocity[name]
                + c1 * local_rand * (personal_best[name].to(self.device) - param.data)
                + c2 * global_rand * (global_best[name].to(self.device) - param.data)
            )

            self.position[name] = param.data + self.velocity[name]
            param.data = self.position[name]

            # Keep the moment buffers on the same device as the parameter.
            m[name] = m[name].to(param.device)
            v[name] = v[name].to(param.device)

            # The gradient step needs a gradient. The previous check was
            # inverted and did arithmetic on None, raising TypeError.
            if param.grad is None:
                continue

            m[name] = beta1 * m[name] + (1 - beta1) * param.grad
            v[name] = beta2 * v[name] + (1 - beta2) * (param.grad ** 2)

            # Plain gradient descent from the PSO position; the moments above
            # are tracked but do not scale this step.
            param.data = self.position[name] - learning_rate * param.grad

    def evaluate_test(self, dataloader, criterion):
        """Evaluate the working model on `dataloader` without changing its weights.

        Args:
            dataloader: Iterable of ``(images, labels)`` batches supporting `len`.
            criterion: Loss function called as ``criterion(outputs, labels)``.

        Returns:
            Tuple ``(avg_loss, accuracy)``: the mean of the per-batch losses and
            the fraction of correctly classified samples.
        """
        self.model.eval()
        total_loss = 0
        correct = 0
        total = 0

        with torch.no_grad():
            for images, labels in dataloader:
                images, labels = images.to(self.device), labels.to(self.device)
                outputs = self.model(images)
                total_loss += criterion(outputs, labels).item()

                # Predicted class is the index of the largest output.
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        avg_loss = total_loss / len(dataloader)
        accuracy = correct / total
        return avg_loss, accuracy

    def evaluate_train(self, dataloader, criterion):
        """Train the working model for one pass over `dataloader` with Adam.

        Each batch performs zero_grad, forward, backward and an optimizer step,
        so the weights change while the statistics are being collected.

        Args:
            dataloader: Iterable of ``(images, labels)`` batches supporting `len`.
            criterion: Loss function called as ``criterion(outputs, labels)``.

        Returns:
            Tuple ``(avg_loss, accuracy)``: the mean of the per-batch losses and
            the fraction of correctly classified samples seen during the pass.
        """
        self.model.train()
        total_loss = 0
        correct = 0
        total = 0

        for images, labels in dataloader:
            images, labels = images.to(self.device), labels.to(self.device)

            # Clear gradients accumulated by the previous batch.
            self.optimizer.zero_grad()

            outputs = self.model(images)
            loss = criterion(outputs, labels)
            loss.backward()

            # Update the weights with Adam.
            self.optimizer.step()

            total_loss += loss.item()

            # Predicted class is the index of the largest output.
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        avg_loss = total_loss / len(dataloader)
        accuracy = correct / total

        return avg_loss, accuracy

In [None]:
# Helper that draws a random subset of a dataset
def create_subset(dataset, subset_size):
    """Return a `Subset` of `dataset` made of `subset_size` randomly chosen items.

    The indices are drawn without replacement with `random.sample`, so the
    selection depends on the state of the global `random` generator.
    """
    picked = random.sample(range(len(dataset)), subset_size)
    return Subset(dataset, picked)

# Input transforms: resize to 224x224, convert to a tensor, then normalise each
# channel with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    # Alternative normalisation constants, currently disabled:
    #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Number of examples drawn for training and testing.
# NOTE(review): these look like the full CIFAR-10 split sizes, in which case no
# reduction actually happens - confirm.
train_subset_size = 50000  # number of training examples to sample
test_subset_size = 10000    # number of test examples to sample

# Training data: download CIFAR-10 if needed, sample a subset, reshuffle every epoch.
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
train_subset = create_subset(trainset, train_subset_size)
trainloader = DataLoader(train_subset, batch_size=64, shuffle=True, num_workers=2)  # batches of 64

# Test data: same procedure, iterated in a fixed order.
testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
test_subset = create_subset(testset, test_subset_size)
testloader = DataLoader(test_subset, batch_size=64, shuffle=False, num_workers=2)  # batches of 64


# Carregar o conjunto de dados CIFAR-10
#trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
#testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# Criar DataLoader
#trainloader = DataLoader(trainset, batch_size=10, shuffle=True, num_workers=2)
#testloader = DataLoader(testset, batch_size=10, shuffle=False, num_workers=2)


In [None]:
# Hyperparameters of the PSO update and of the Adam-style moment buffers
pop_size = 10  # number of particles in the swarm
num_epochs = 30
# The inertia weight is not a constant: the training loops compute it per epoch.
#inertia = 0.9
c1, c2 = 0.8, 0.9  # personal-best and global-best attraction coefficients
learning_rate = 0.0001
beta1, beta2 = 0.9, 0.999  # decay rates of the first and second moment buffers
epsilon = 1e-8

## AlexNet

In [None]:
# Select the model
architecture = 'alexnet'  # one of 'alexnet', 'vgg11', 'resnet18', 'mobilenet_v2' or 'squeezenet'
model = select_model(architecture)

# Initialise the particles; each one deep-copies `model`
particles = [Particle(model, device) for _ in range(pop_size)]

# The global best starts as a copy of the first particle's model with an infinite score
global_best_model = copy.deepcopy(particles[0].model)
global_best_score = float('inf')

criterion = nn.CrossEntropyLoss()

# Zero-initialised moment buffers m and v handed to pso_sgd.
# NOTE(review): this single pair of dicts is passed to every particle - confirm
# that sharing the buffers across particles is intended.
m = {name: torch.zeros_like(param) for name, param in model.named_parameters()}
v = {name: torch.zeros_like(param) for name, param in model.named_parameters()}

# PSO training loop
for epoch in range(num_epochs):
    # Inertia starts at 0.9 and drops by (0.9 - 0.4) / num_epochs each epoch
    inertia = 0.9 - ((0.9-0.4)/num_epochs)*epoch
    for particle in particles:
        # Put the model in training mode
        particle.model.train()

        # NOTE(review): gradients are cleared right before pso_sgd reads
        # param.grad - confirm this ordering is intended.
        particle.optimizer.zero_grad()

        # Move the particle (position update)
        particle.pso_sgd(global_best_model, inertia, c1, c2, learning_rate, beta1, beta2, epsilon, m, v, epoch + 1)

        # One optimizer pass over trainloader; despite the val_ names these are
        # the training loss and accuracy returned by evaluate_train
        val_loss, val_accuracy = particle.evaluate_train(trainloader, criterion)

        # Update the particle's personal best
        if val_loss < particle.best_score:
            particle.best_score = val_loss
            particle.best_model = copy.deepcopy(particle.model)

    # Determine and update the g-best (global model)
    best_particle = min(particles, key=lambda p: p.best_score)
    if best_particle.best_score < global_best_score:
        global_best_score = best_particle.best_score
        global_best_model = copy.deepcopy(best_particle.best_model)

    # Evaluate and print every epoch (the modulo-1 condition is always true).
    # NOTE(review): evaluate_test scores best_particle's current model on
    # testloader, not global_best_model - confirm.
    if (epoch + 1) % 1 == 0:
        val_loss, val_accuracy = best_particle.evaluate_test(testloader, criterion)
        print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

print("Treinamento completo.")
print("Melhor pontuação de g-best:", global_best_score)

# Final evaluation on the test set, using the best particle of the last epoch
test_loss, test_accuracy = best_particle.evaluate_test(testloader, criterion)
print(f'Avaliação final no conjunto de teste - Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}')


## MobileNet_v2

In [None]:
# Select the model
architecture = 'mobilenet_v2'  # one of 'alexnet', 'vgg11', 'resnet18', 'mobilenet_v2' or 'squeezenet'
model = select_model(architecture)

# Initialise the particles; each one deep-copies `model`
particles = [Particle(model, device) for _ in range(pop_size)]

# The global best starts as a copy of the first particle's model with an infinite score
global_best_model = copy.deepcopy(particles[0].model)
global_best_score = float('inf')

criterion = nn.CrossEntropyLoss()

# Zero-initialised moment buffers m and v handed to pso_sgd.
# NOTE(review): this single pair of dicts is passed to every particle - confirm
# that sharing the buffers across particles is intended.
m = {name: torch.zeros_like(param) for name, param in model.named_parameters()}
v = {name: torch.zeros_like(param) for name, param in model.named_parameters()}

# PSO training loop
for epoch in range(num_epochs):
    # Inertia starts at 0.9 and drops by (0.9 - 0.4) / num_epochs each epoch
    inertia = 0.9 - ((0.9-0.4)/num_epochs)*epoch
    for particle in particles:
        # Put the model in training mode
        particle.model.train()

        # NOTE(review): gradients are cleared right before pso_sgd reads
        # param.grad - confirm this ordering is intended.
        particle.optimizer.zero_grad()

        # Move the particle (position update)
        particle.pso_sgd(global_best_model, inertia, c1, c2, learning_rate, beta1, beta2, epsilon, m, v, epoch + 1)

        # One optimizer pass over trainloader; despite the val_ names these are
        # the training loss and accuracy returned by evaluate_train
        val_loss, val_accuracy = particle.evaluate_train(trainloader, criterion)

        # Update the particle's personal best
        if val_loss < particle.best_score:
            particle.best_score = val_loss
            particle.best_model = copy.deepcopy(particle.model)

    # Determine and update the g-best (global model)
    best_particle = min(particles, key=lambda p: p.best_score)
    if best_particle.best_score < global_best_score:
        global_best_score = best_particle.best_score
        global_best_model = copy.deepcopy(best_particle.best_model)

    # Evaluate and print every epoch (the modulo-1 condition is always true).
    # NOTE(review): evaluate_test scores best_particle's current model on
    # testloader, not global_best_model - confirm.
    if (epoch + 1) % 1 == 0:
        val_loss, val_accuracy = best_particle.evaluate_test(testloader, criterion)
        print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

print("Treinamento completo.")
print("Melhor pontuação de g-best:", global_best_score)

# Final evaluation on the test set, using the best particle of the last epoch
test_loss, test_accuracy = best_particle.evaluate_test(testloader, criterion)
print(f'Avaliação final no conjunto de teste - Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}')


## ResNet18

In [None]:
# Select the model
architecture = 'resnet18'  # one of 'alexnet', 'vgg11', 'resnet18', 'mobilenet_v2' or 'squeezenet'
model = select_model(architecture)

# Initialise the particles; each one deep-copies `model`
particles = [Particle(model, device) for _ in range(pop_size)]

# The global best starts as a copy of the first particle's model with an infinite score
global_best_model = copy.deepcopy(particles[0].model)
global_best_score = float('inf')

criterion = nn.CrossEntropyLoss()

# Zero-initialised moment buffers m and v handed to pso_sgd.
# NOTE(review): this single pair of dicts is passed to every particle - confirm
# that sharing the buffers across particles is intended.
m = {name: torch.zeros_like(param) for name, param in model.named_parameters()}
v = {name: torch.zeros_like(param) for name, param in model.named_parameters()}

# PSO training loop
for epoch in range(num_epochs):
    # Inertia starts at 0.9 and drops by (0.9 - 0.4) / num_epochs each epoch
    inertia = 0.9 - ((0.9-0.4)/num_epochs)*epoch
    for particle in particles:
        # Put the model in training mode
        particle.model.train()

        # NOTE(review): gradients are cleared right before pso_sgd reads
        # param.grad - confirm this ordering is intended.
        particle.optimizer.zero_grad()

        # Move the particle (position update)
        particle.pso_sgd(global_best_model, inertia, c1, c2, learning_rate, beta1, beta2, epsilon, m, v, epoch + 1)

        # One optimizer pass over trainloader; despite the val_ names these are
        # the training loss and accuracy returned by evaluate_train
        val_loss, val_accuracy = particle.evaluate_train(trainloader, criterion)

        # Update the particle's personal best
        if val_loss < particle.best_score:
            particle.best_score = val_loss
            particle.best_model = copy.deepcopy(particle.model)

    # Determine and update the g-best (global model)
    best_particle = min(particles, key=lambda p: p.best_score)
    if best_particle.best_score < global_best_score:
        global_best_score = best_particle.best_score
        global_best_model = copy.deepcopy(best_particle.best_model)

    # Evaluate and print every epoch (the modulo-1 condition is always true).
    # NOTE(review): evaluate_test scores best_particle's current model on
    # testloader, not global_best_model - confirm.
    if (epoch + 1) % 1 == 0:
        val_loss, val_accuracy = best_particle.evaluate_test(testloader, criterion)
        print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

print("Treinamento completo.")
print("Melhor pontuação de g-best:", global_best_score)

# Final evaluation on the test set, using the best particle of the last epoch
test_loss, test_accuracy = best_particle.evaluate_test(testloader, criterion)
print(f'Avaliação final no conjunto de teste - Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}')


## SqueezeNet

In [None]:
# Select the model
architecture = 'squeezenet'  # one of 'alexnet', 'vgg11', 'resnet18', 'mobilenet_v2' or 'squeezenet'
model = select_model(architecture)

# Initialise the particles; each one deep-copies `model`
particles = [Particle(model, device) for _ in range(pop_size)]

# The global best starts as a copy of the first particle's model with an infinite score
global_best_model = copy.deepcopy(particles[0].model)
global_best_score = float('inf')

criterion = nn.CrossEntropyLoss()

# Zero-initialised moment buffers m and v handed to pso_sgd.
# NOTE(review): this single pair of dicts is passed to every particle - confirm
# that sharing the buffers across particles is intended.
m = {name: torch.zeros_like(param) for name, param in model.named_parameters()}
v = {name: torch.zeros_like(param) for name, param in model.named_parameters()}

# PSO training loop
for epoch in range(num_epochs):
    # Inertia starts at 0.9 and drops by (0.9 - 0.4) / num_epochs each epoch
    inertia = 0.9 - ((0.9-0.4)/num_epochs)*epoch
    for particle in particles:
        # Put the model in training mode
        particle.model.train()

        # NOTE(review): gradients are cleared right before pso_sgd reads
        # param.grad - confirm this ordering is intended.
        particle.optimizer.zero_grad()

        # Move the particle (position update)
        particle.pso_sgd(global_best_model, inertia, c1, c2, learning_rate, beta1, beta2, epsilon, m, v, epoch + 1)

        # One optimizer pass over trainloader; despite the val_ names these are
        # the training loss and accuracy returned by evaluate_train
        val_loss, val_accuracy = particle.evaluate_train(trainloader, criterion)

        # Update the particle's personal best
        if val_loss < particle.best_score:
            particle.best_score = val_loss
            particle.best_model = copy.deepcopy(particle.model)

    # Determine and update the g-best (global model)
    best_particle = min(particles, key=lambda p: p.best_score)
    if best_particle.best_score < global_best_score:
        global_best_score = best_particle.best_score
        global_best_model = copy.deepcopy(best_particle.best_model)

    # Evaluate and print every epoch (the modulo-1 condition is always true).
    # NOTE(review): evaluate_test scores best_particle's current model on
    # testloader, not global_best_model - confirm.
    if (epoch + 1) % 1 == 0:
        val_loss, val_accuracy = best_particle.evaluate_test(testloader, criterion)
        print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

print("Treinamento completo.")
print("Melhor pontuação de g-best:", global_best_score)

# Final evaluation on the test set, using the best particle of the last epoch
test_loss, test_accuracy = best_particle.evaluate_test(testloader, criterion)
print(f'Avaliação final no conjunto de teste - Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}')


## VGG11

In [None]:
# Select the model
architecture = 'vgg11'  # one of 'alexnet', 'vgg11', 'resnet18', 'mobilenet_v2' or 'squeezenet'
model = select_model(architecture)

# Initialise the particles; each one deep-copies `model`
particles = [Particle(model, device) for _ in range(pop_size)]

# The global best starts as a copy of the first particle's model with an infinite score
global_best_model = copy.deepcopy(particles[0].model)
global_best_score = float('inf')

criterion = nn.CrossEntropyLoss()

# Zero-initialised moment buffers m and v handed to pso_sgd.
# NOTE(review): this single pair of dicts is passed to every particle - confirm
# that sharing the buffers across particles is intended.
m = {name: torch.zeros_like(param) for name, param in model.named_parameters()}
v = {name: torch.zeros_like(param) for name, param in model.named_parameters()}

# PSO training loop
for epoch in range(num_epochs):
    # Inertia starts at 0.9 and drops by (0.9 - 0.4) / num_epochs each epoch
    inertia = 0.9 - ((0.9-0.4)/num_epochs)*epoch
    for particle in particles:
        # Put the model in training mode
        particle.model.train()

        # NOTE(review): gradients are cleared right before pso_sgd reads
        # param.grad - confirm this ordering is intended.
        particle.optimizer.zero_grad()

        # Move the particle (position update)
        particle.pso_sgd(global_best_model, inertia, c1, c2, learning_rate, beta1, beta2, epsilon, m, v, epoch + 1)

        # One optimizer pass over trainloader; despite the val_ names these are
        # the training loss and accuracy returned by evaluate_train
        val_loss, val_accuracy = particle.evaluate_train(trainloader, criterion)

        # Update the particle's personal best
        if val_loss < particle.best_score:
            particle.best_score = val_loss
            particle.best_model = copy.deepcopy(particle.model)

    # Determine and update the g-best (global model)
    best_particle = min(particles, key=lambda p: p.best_score)
    if best_particle.best_score < global_best_score:
        global_best_score = best_particle.best_score
        global_best_model = copy.deepcopy(best_particle.best_model)

    # Evaluate and print every epoch (the modulo-1 condition is always true).
    # NOTE(review): evaluate_test scores best_particle's current model on
    # testloader, not global_best_model - confirm.
    if (epoch + 1) % 1 == 0:
        val_loss, val_accuracy = best_particle.evaluate_test(testloader, criterion)
        print(f'Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

print("Treinamento completo.")
print("Melhor pontuação de g-best:", global_best_score)

# Final evaluation on the test set, using the best particle of the last epoch
test_loss, test_accuracy = best_particle.evaluate_test(testloader, criterion)
print(f'Avaliação final no conjunto de teste - Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}')
