# Classificador de tênis (Nike vs Adidas)

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import seaborn as sns
import numpy as np
import torchvision
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Subset

## Carregamento de dados dos datasets.

Separa em treino e teste

In [None]:
from sklearn.model_selection import train_test_split

def datasetLoader(data_dir, batch_size):
    transform = transforms.Compose([transforms.Resize((256, 256)), transforms.ToTensor()])
    train_dir = os.path.join(data_dir, "Training")
    train_data = ImageFolder(root=train_dir, transform=transform)

    train_indices, test_indices = train_test_split(
        range(len(train_data)), test_size=0.33, random_state=42
    )

    train_subset = Subset(train_data, train_indices)
    test_subset = Subset(train_data, test_indices)

    training_loader = DataLoader(
        train_subset, batch_size=batch_size, shuffle=True
    )

    test_loader = DataLoader(
        test_subset, batch_size=batch_size, shuffle=False
    )

    return training_loader, test_loader

batch size e device

In [None]:
data_dir = "Dataset"
train_loader, test_loader = datasetLoader(data_dir, batch_size=12)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
print(f"Número total de amostras: {len(test_loader.dataset)+len(train_loader.dataset)}")
print(f"Número de amostras de treinamento: {len(train_loader.dataset)}")
print(f"Número de amostras de teste: {len(test_loader.dataset)}")

 O dropout será ativado entre a camada convolucional final da ResNet50 e a camada linear que produzirá a saída final do modelo. Durante o treinamento, o dropout aleatoriamente "desliga" neurônios, reduzindo a dependência entre eles, o que ajuda a prevenir o overfitting. Dropout ajudou muito para diminuir a valor da função custo de validação.

In [None]:
resnet = torchvision.models.resnet50(weights = True)
resnet.fc = torch.nn.Sequential(
    torch.nn.Dropout(0.6),  # Adicionando dropout com probabilidade 0.5
    torch.nn.Linear(2048, 2)
)
resnet = resnet.to(device)

## Métodos 

### Matriz de confusão

In [None]:
def confusion_matrix(model, loader):
    model.eval()
    confusion_matrix = np.zeros((2,2))
    with torch.no_grad():
        for data in loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _,predicted = torch.max(outputs, 1)
            for i in range(labels.size(0)):
                confusion_matrix[labels[i].item()][predicted[i].item()] += 1
    ax = sns.heatmap(confusion_matrix, annot=True, cmap='Blues', fmt='g', xticklabels=['Adidas', 'Nike'], yticklabels=['Adidas', 'Nike'])
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Label')
    return ax

### Acurácia

In [None]:
def accuracy(model, loader):
    model.eval()
    corrected = 0
    total = 0
    with torch.no_grad():
        for data in loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _,predicted = torch.max(outputs, 1)
            total += labels.size(0)
            corrected += (predicted == labels).sum().item()
    return corrected * 100 // total

### F1 Score

In [None]:
from sklearn.metrics import f1_score

def calculate_f1_score(model, loader):
    model.eval()
    true_labels = []
    predicted_labels = []
    with torch.no_grad():
        for data in loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            true_labels.extend(labels.cpu().numpy())
            predicted_labels.extend(predicted.cpu().numpy())
    return f1_score(true_labels, predicted_labels, average='macro')

### ROC AUC métrica

In [None]:
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import label_binarize

def calculate_roc_auc_score(model, loader, n_classes):
    model.eval()
    true_labels = []
    predicted_probs = []
    with torch.no_grad():
        for data in loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            probs = torch.nn.functional.softmax(outputs, dim=1)
            true_labels.extend(labels.cpu().numpy())
            predicted_probs.extend(probs.cpu().numpy())
    true_labels = label_binarize(true_labels, classes=[i for i in range(n_classes)])
    return roc_auc_score(true_labels, predicted_probs, multi_class='ovr')

### Validação

In [None]:
def validation(model, loader, criterion):
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for data in loader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs,labels)
            val_loss += loss.item()
    return val_loss/len(loader)

### Regularização

l1_lambda e l2 lambda são os pesos/intensidade que a regularização aplicará no modelo.

In [None]:
def l1_regularization(model, l1_lambda, device):
    l1_reg = torch.tensor(0., device=device)
    for param in model.parameters():
        # Calculando a norma L1 dos parâmetros e somando-as
        l1_reg += torch.norm(param, 1)
    # Multiplicando pela lambda para obter o termo de regularização L1
    return l1_lambda * l1_reg

In [None]:
def l2_regularization(model, l2_lambda, device):
    l2_reg = torch.tensor(0., device=device)
    for param in model.parameters():
        # Calculando a norma L2 dos parâmetros e somando suas raízes quadradas
        l2_reg += torch.norm(param, 2) ** 2
    # Multiplicando pela lambda e raiz quadrada para obter o termo de regularização L2
    return l2_lambda * torch.sqrt(l2_reg)

### Treinamento

In [None]:
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import ReduceLROnPlateau

def train(model, trainloader, testloader, optimizer, criterion, epochs, l1_lambda, l2_lambda, device, lr_patience, early_stop_patience):
    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    epochs_without_improvement = 0

    lr_scheduler = ReduceLROnPlateau(optimizer, patience=lr_patience, verbose=True)  # Se a perda de validação não melhorar por x épocas, reduz a taxa de aprendizado em 0,1

    for epoch in range(epochs):
        model.train()
        running_loss = 0
        for data in tqdm(trainloader):
            images, labels = data
            optimizer.zero_grad()
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            l1_reg = l1_regularization(model, l1_lambda, device)    ##  L1 e L2 regularization  ##
            l2_reg = l2_regularization(model, l2_lambda, device)    ##  L1 e L2 regularization  ##     
            loss += l1_reg + l2_reg                                 ##  L1 e L2 regularization  ##
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        val_loss = validation(model, testloader, criterion)
        lr_scheduler.step(val_loss)                                 ##  Learning Rate Scheduler  ##
        train_losses.append(running_loss/len(trainloader))
        val_losses.append(val_loss)
        print(f'Epoch: {epoch+1} | Train Loss: {train_losses[-1]} | Val Loss: {val_loss}')

        if val_loss < best_val_loss:                                ##  Early Stopping  ##
            best_val_loss = val_loss                                
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= early_stop_patience:
                print(f'Parada antecipada na época {epoch+1}, pois a loss na validação não apresentou melhora.')
                break
        
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Training Loss', marker='o')
    plt.plot(val_losses, label='Validation Loss', marker='x')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss over Epochs')
    plt.grid(True)
    plt.legend()
    plt.show()

## Registro de evoluções nos modelos
- 1º modelo era treinado com apenas 3 épocas e com learning rate de 0,001. **Acurácia de 70%**.
- 2º modelo subimos o número de épocas para 30 (número baseado nos modelos analisados da referência 1) e ajustamos o learning rate para 0,0001. **Acurácia de 88%**.
- 3º modelo inserimos regularização L1 e L2 com peso da regularização de 0,01. **Acurácia caiu para 76%**
- 4º modelo ajustamos os parâmetros de pesos da regularização L1 e L2 para de 0,00001. **Acurácia de 90%**

Referências:  
1 - https://www.kaggle.com/datasets/ifeanyinneji/nike-adidas-shoes-for-image-classification-dataset/code

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(resnet.parameters(), lr=0.0001)
l1_lambda = 0.001
l2_lambda = 0.01
epochs = 60 
lr_patience = 3
early_stop_patience = 5

## Treinamento

In [None]:
train(resnet, train_loader, test_loader, optimizer, criterion, epochs=epochs, l1_lambda=l1_lambda, l2_lambda=l2_lambda, device=device, lr_patience=lr_patience, early_stop_patience=early_stop_patience)

In [None]:
resnet

## Métricas de avaliação

### Validação

In [None]:
print(f'A rede atinge: {accuracy(resnet, test_loader)}% de acurácia')
print(f'A rede atinge: {round(calculate_f1_score(resnet, test_loader)*100,2)}% de f1 Score')
print(f'A rede atinge: {round(calculate_roc_auc_score(resnet, test_loader, 50)*100,2)}% de ROC AUC Score')
conf_mat = confusion_matrix(resnet, test_loader)

### Treino

In [None]:
print(f'A rede atinge: {accuracy(resnet, train_loader)}% de acurácia')
print(f'A rede atinge: {round(calculate_f1_score(resnet, train_loader)*100,2)}% de recall')
print(f'A rede atinge: {round(calculate_roc_auc_score(resnet, train_loader, 50)*100,2)}% de ROC AUC Score')
conf_mat = confusion_matrix(resnet, train_loader)

In [None]:
torch.save(resnet.state_dict(), 'model.pth')

## Usabilidade de webcam

In [None]:
import cv2

cap = cv2.VideoCapture(0)

img_counter = 0

while True:
    ret, frame = cap.read()
    cv2.imshow('Webcam', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    elif cv2.waitKey(1) & 0xFF == ord('e'):
            img_name = "WebcamImages/print_{}.png".format(img_counter)
            cv2.imwrite(img_name, frame)
            print("{} written!".format(img_name))
            img_counter += 1

cap.release()
cv2.destroyAllWindows()

### Predição

In [None]:
def predict(model, image):
    prediction = model(torch.unsqueeze(image, 0).to(device))
    result = torch.argmax(prediction)
    return 'Adidas' if result == 0 else 'Nike'

### Predição das classes da webcam

In [None]:
cnn = torch.load('model.pth')
resnet.load_state_dict(cnn)
from PIL import Image
for filename in os.listdir('WebcamImages'):
    if filename.endswith(".png"):
        x = Image.open('WebcamImages/' + filename).convert('RGB')
        x = transforms.Compose([transforms.Resize((256, 256)), transforms.ToTensor()])(x)
        print(f'Image: {filename} | Resnet Prediction: {predict(resnet, x)}')