# Solve Dependencies

In [2]:
import os
from collections import Counter

import numpy as np
import pandas as pd
import typing as tp
from PIL import Image

from torch.utils.data import DataLoader
import torchvision.models as models

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms, models

from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt

import pytorch_lightning as pl

pl.seed_everything(seed=42, workers=True)


Seed set to 42


42

# Create CSV File to Store Filenames and Labels
<a id="create-labels-csv-file"></a>


In [3]:
def create_label_csv(root_dir, output_csv):
    # Initialize an empty list to hold the data
    data = []

    # Define class names and their corresponding labels
    class_names = ['CNV', 'DME', 'DRUSEN', 'NORMAL']

    # Loop through each class folder
    for class_name in class_names:
        class_folder = os.path.join(root_dir, class_name)
        
        # Check if the class folder exists
        if not os.path.exists(class_folder):
            print(f"Warning: {class_folder} does not exist.")
            continue
        
        # Loop through all files in the class folder
        for filename in os.listdir(class_folder):
            if filename.endswith(('.png', '.jpg', '.jpeg')):  # Add any other image formats if needed
                # Create a tuple of (filename, label)
                data.append((class_name+'/'+filename, class_name))
    
    # Create a DataFrame from the data
    df = pd.DataFrame(data, columns=['filename', 'label'])
    
    # Save the DataFrame to a CSV file
    df.to_csv(output_csv, index=False)
    print(f"CSV file created at: {output_csv}")

In [None]:
# Specify the directory and the output CSV file name
test_root_directory = '../dataset/test'  # Path to the train directory
test_output_csv_file = '../dataset/test.csv'  # Output CSV file name

create_label_csv(test_root_directory, test_output_csv_file)

In [4]:
class_map = {
    'CNV': 0,
    'DME': 1,
    'DRUSEN': 2,
    'NORMAL': 3
}

class OCTDataset(Dataset):
    def __init__(
        self,
        images_dir: str,
        labels_csv: str,
        indices: tp.Optional[tp.List[int]] = None,
        transform: tp.Optional[transforms.Compose] = None,
    ):
        self.images_dir = images_dir
        self.labels_csv = labels_csv
        self.transform = transform
        
        # Carrega o arquivo CSV e filtra pelos índices, se fornecidos
        labels_df = pd.read_csv(labels_csv)
        
        if indices is not None:
            # Seleciona apenas os índices fornecidos
            labels_df = labels_df.iloc[indices].reset_index(drop=True)
        
        # Armazena os caminhos das imagens e rótulos como listas
        self.image_paths = [os.path.join(images_dir, fname) for fname in labels_df['filename']]
        self.labels = labels_df['label'].tolist()

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Carrega a imagem a partir do caminho
        image = Image.open(self.image_paths[idx]).convert("L")
        
        # Aplica as transformações, se existirem
        if self.transform:
            image = self.transform(image)
        
        # Retorna a imagem e o rótulo
        label = torch.tensor(class_map[self.labels[idx]])
        return image, label

In [7]:
test_dataset = OCTDataset(
    images_dir=test_root_directory,
    labels_csv=test_output_csv_file,
)

# Avaliação do modelo no conjunto de teste
test_loader = DataLoader(test_dataset, batch_size=100, shuffle=False)

In [12]:

# Define o dispositivo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Caminho para salvar checkpoints
CHECKPOINT_PATH = "checkpoints"
os.makedirs(CHECKPOINT_PATH, exist_ok=True)

def load_checkpoint(filename, model, optimizer):
    """Carrega um checkpoint."""
    if os.path.isfile(filename):
        checkpoint = torch.load(filename, map_location=torch.device('cpu'))
        model.load_state_dict(checkpoint["model_state_dict"])
        optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
        epoch = checkpoint["epoch"]
        losses = checkpoint["losses"]
        accuracies = checkpoint["accuracies"]
        print(f"Checkpoint carregado de {filename}, retomando do epoch {epoch+1}")
        return epoch, losses, accuracies
    else:
        print(f"Nenhum checkpoint encontrado em {filename}, iniciando do zero.")
        return 0, {"train": [], "val": []}, {"train": [], "val": []}

def baseline_work(batch_size, lr, num_epochs):
    # Carrega o modelo pré-treinado SqueezeNet
    model = models.squeezenet1_1(pretrained=True)

    # Modifica a primeira camada para aceitar imagens de um canal
    model.features[0] = nn.Conv2d(
        in_channels=1,  # Aceitar 1 canal
        out_channels=64,
        kernel_size=(3, 3),
        stride=(2, 2),
        padding=(1, 1),
        bias=False
    )

    # Ajusta o classificador para 4 classes
    model.classifier = nn.Sequential(
        nn.Dropout(p=0.5),
        nn.Conv2d(512, 4, kernel_size=(1, 1)),
        nn.ReLU(inplace=True),
        nn.AdaptiveAvgPool2d((1, 1))
    )

    # Movendo para o dispositivo
    model = model.to(device)

    # Define o critério e otimizador
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr)

    # Transformação das imagens
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])

    # Carrega os dados (substitua supervised_train/val pelos seus datasets)
    # val_loader = DataLoader(supervised_val, batch_size, shuffle=False)

    # Checkpoints
    checkpoint_file = os.path.join(CHECKPOINT_PATH, "last_checkpoint.pth")
    start_epoch, losses, accuracies = load_checkpoint(checkpoint_file, model, optimizer)

    # Testes
    for epoch in range(start_epoch, num_epochs):
        # Modo de validação (teste)
        model.eval()
        running_loss_val = 0
        correct_val = 0
        total_val = 0
        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                running_loss_val += loss.item() * images.size(0)

                # Cálculo da acurácia de validação
                _, predicted = torch.max(outputs.data, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).sum().item()

        val_loss = running_loss_val / len(test_loader.dataset)
        val_accuracy = 100 * correct_val / total_val
        losses["val"].append(val_loss)
        accuracies["val"].append(val_accuracy)

        # Exibe métricas por época
        print(f"Época {epoch+1}/{num_epochs}, Loss Treino: {train_loss}, Acurácia Treino: {train_accuracy}")
        print(f"Época {epoch+1}/{num_epochs}, Loss Validação: {val_loss}, Acurácia Validação: {val_accuracy}")

base = baseline_work(batch_size=32, lr=1e-5, num_epochs=30)

  checkpoint = torch.load(filename, map_location=torch.device('cpu'))


Checkpoint carregado de checkpoints\last_checkpoint.pth, retomando do epoch 30


TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'PIL.Image.Image'>

In [None]:
# Instantiate the dataset
root_dir = '../dataset/test'  # Replace with the actual path to your dataset root
test_dataset = OCTDataset(
    images_dir=root_dir,
    labels_csv='../dataset/test.csv',
)

# Avaliação do modelo no conjunto de teste
test_loader = DataLoader(test_dataset, batch_size=100, shuffle=False)

model.eval()  # Coloca o modelo em modo de avaliação
correct = 0
total = 0
test_loss = 0

with torch.no_grad():  # Desabilita o cálculo do gradiente
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)  # Calcula a perda no conjunto de teste
        test_loss += loss.item() * images.size(0)

        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

test_loss = test_loss / len(test_loader.dataset)
accuracy = 100 * correct / total
print(f'Test Loss: {test_loss}, Test Accuracy: {accuracy}%')
