In [7]:
import os
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, ConcatDataset
import glob
from PIL import Image
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import random
import torchvision.models as models
from peft import get_peft_model, LoraConfig, TaskType
from collections import defaultdict
from tqdm import tqdm

In [8]:
# Run on the first CUDA device when PyTorch can see one, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Deterministic preprocessing: resize to 256, take the central 224x224 crop,
# convert to a tensor and normalise each channel with the mean/std below.
transform = transforms.Compose(
    [
        transforms.Resize(size=256),
        transforms.CenterCrop(size=224),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [9]:
class EADataDataset(Dataset):
    """Map-style dataset over a pre-built list of image samples.

    Every sample is a ``(path, label, subfolder)`` tuple. Indexing opens the
    image at ``path``, converts it to RGB, applies the optional transform and
    returns ``(image, label, subfolder)``.
    """

    def __init__(self, samples, transform=None):
        """Store the sample list and the optional per-image transform.

        Args:
            samples: Sequence of ``(path, label, subfolder)`` tuples.
            transform: Optional callable applied to the RGB image before it
                is returned.
        """
        self.samples = samples
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label, subfolder)``."""
        path, label, subfolder = self.samples[idx]
        # Image.open is lazy and keeps the underlying file open. The context
        # manager closes it as soon as convert() has decoded the pixels,
        # instead of leaving the handle to the garbage collector.
        with Image.open(path) as raw_image:
            img = raw_image.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)

        return img, label, subfolder


def create_train_test_split(
    root, class_to_idx, transform=None, ratio=0.8, subfolder_categories=None,
    seed=None
):
    """Build train/test datasets from a ``root/<class>/<subfolder>/<file>`` tree.

    Samples are grouped by ``(class, subfolder)`` and each group is shuffled
    and split independently, so every group contributes the same fraction of
    its files to the training set.

    Args:
        root: Directory containing one sub-directory per class.
        class_to_idx: Mapping from class directory name to integer label.
            Class directories missing from this mapping are ignored.
        transform: Optional transform forwarded to both datasets.
        ratio: Fraction (between 0 and 1) of each group assigned to the
            training set; the remainder goes to the test set.
        subfolder_categories: Optional collection of sub-folder names to keep.
            ``None`` keeps every sub-folder.
        seed: Optional seed for a private random generator, making the split
            reproducible. ``None`` (the default) shuffles with the global
            ``random`` module state, as before.

    Returns:
        A ``(train_dataset, test_dataset)`` tuple of ``EADataDataset``.

    Raises:
        ValueError: If ``ratio`` is outside ``[0, 1]``.
    """
    if not 0.0 <= ratio <= 1.0:
        raise ValueError(f"ratio must be in [0, 1], got {ratio!r}")

    # A private generator keeps a seeded split independent of whatever else
    # consumes the global random state.
    rng = random if seed is None else random.Random(seed)

    data_dict = {}

    # os.listdir returns entries in arbitrary order; sorting makes the
    # traversal (and therefore a seeded shuffle) deterministic.
    for class_name in sorted(os.listdir(root)):
        class_path = os.path.join(root, class_name)
        if not os.path.isdir(class_path) or class_name not in class_to_idx:
            continue

        label = class_to_idx[class_name]

        for subfolder in sorted(os.listdir(class_path)):
            if subfolder_categories is not None and subfolder not in subfolder_categories:
                continue

            subfolder_path = os.path.join(class_path, subfolder)
            if not os.path.isdir(subfolder_path):
                continue

            group = data_dict.setdefault((class_name, subfolder), [])

            # Walk the files of this sub-folder.
            for img_file in sorted(os.listdir(subfolder_path)):
                full_path = os.path.join(subfolder_path, img_file)
                # Nested directories are not samples; keeping them would only
                # fail later when the dataset tries to open them as images.
                if not os.path.isfile(full_path):
                    continue
                group.append((full_path, label, subfolder))

    train_samples = []
    test_samples = []

    for sample_list in data_dict.values():
        rng.shuffle(sample_list)
        split_index = int(ratio * len(sample_list))
        train_samples.extend(sample_list[:split_index])
        test_samples.extend(sample_list[split_index:])

    train_dataset = EADataDataset(train_samples, transform=transform)
    test_dataset = EADataDataset(test_samples, transform=transform)

    return train_dataset, test_dataset


# Class directory name -> integer label used as the classification target.
class_to_idx = {
    "bird": 0,
    "reptile": 1,
    "carnivore": 2,
    "dog": 3,
    "insect": 4,
    "primate": 5,
    "fish": 6,
    "instrument": 7,
    "vehicle": 8
}

# Sub-folders of each class directory to keep (currently only "mixed_rand").
subfolder_categories = ["mixed_rand"]

# Fix the RNG state so the train/test partition and the training shuffle order
# are the same after every kernel restart. Without this, a checkpoint reloaded
# in a new session would be evaluated on a different test split that may
# contain images it was trained on.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

train_dataset, test_dataset = create_train_test_split(
    root="/Data/ea_data",
    class_to_idx=class_to_idx,
    transform=transform,
    ratio=0.8,  # 80% train, 20% test
    subfolder_categories=subfolder_categories
)

print("Taille du train_dataset :", len(train_dataset))
print("Taille du test_dataset :", len(test_dataset))

# Only the training loader is shuffled; evaluation order is kept fixed.
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

Taille du train_dataset : 11066
Taille du test_dataset : 2771


In [10]:

# ImageNet-pretrained ResNet-50. The explicit weights enum replaces the
# deprecated `pretrained=True` flag and selects the same checkpoint.
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

# Replace the classification head; its width follows the label map so the two
# cannot drift apart.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(class_to_idx))

# Freeze every parameter; trainable ones are re-enabled selectively below.
for param in model.parameters():
    param.requires_grad = False

# LoRA adapters are attached to the 3x3 convolution of each layer4 bottleneck.
target_modules = ["layer4.0.conv2", "layer4.1.conv2", "layer4.2.conv2"]

lora_config = LoraConfig(
    r=4,
    lora_alpha=1,
    lora_dropout=0.0,
    target_modules=target_modules,
)

model = get_peft_model(model, lora_config)

# The new head is frozen at this point (by the loop above), so re-enable it.
for param in model.fc.parameters():
    param.requires_grad = True

# List what will actually be optimised (LoRA A/B matrices plus the head).
for name, param in model.named_parameters():
    if param.requires_grad:
        print(name)

# Reuse the notebook-wide `device` instead of re-deriving it here.
model = model.to(device)
print(model)
optimizer = optim.SGD(
    [p for p in model.parameters() if p.requires_grad], lr=0.001, momentum=0.9
)

trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Total number of trainable parameters: {trainable_params}")

base_model.model.layer4.0.conv2.lora_A.default.weight
base_model.model.layer4.0.conv2.lora_B.default.weight
base_model.model.layer4.1.conv2.lora_A.default.weight
base_model.model.layer4.1.conv2.lora_B.default.weight
base_model.model.layer4.2.conv2.lora_A.default.weight
base_model.model.layer4.2.conv2.lora_B.default.weight
base_model.model.fc.weight
base_model.model.fc.bias
PeftModel(
  (base_model): LoraModel(
    (model): ResNet(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv2): Conv2

In [11]:
criterion = nn.CrossEntropyLoss()
num_epochs = 50
# Upper bound on the number of batches processed within ONE epoch: the step
# counter below restarts at every epoch.
max_steps = 10000

for epoch in range(num_epochs):
    ######################################
    # Training phase
    ######################################
    # NOTE(review): train() also switches the frozen backbone's BatchNorm
    # layers to training mode, so their running statistics keep updating even
    # though their weights are frozen - confirm this is intended.
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    current_step = 0
    for images, labels, subfolder in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Training]"):
        current_step += 1
        if current_step > max_steps:
            break
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # The criterion returns the batch mean; weight it by the batch size so
        # the epoch figure is a true per-sample average.
        running_loss += loss.item() * images.size(0)
        preds = outputs.argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    # max(..., 1) guards against an empty loader or max_steps == 0.
    epoch_loss = running_loss / max(total, 1)
    epoch_acc = correct / max(total, 1)
    print(f"[Epoch {epoch+1}/{num_epochs}] Train Loss: {epoch_loss:.4f} - Train Acc: {epoch_acc:.4f}")

    ######################################
    # Evaluation phase
    ######################################
    model.eval()
    test_loss = 0.0
    correct_test = 0
    total_test = 0

    # Per-sub-folder counters.
    subfolder_correct = defaultdict(int)
    subfolder_total = defaultdict(int)

    with torch.no_grad():
        for images, labels, subfolder in tqdm(test_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Testing]"):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            test_loss += loss.item() * images.size(0)
            preds = outputs.argmax(dim=1)
            hits = preds == labels
            correct_test += hits.sum().item()
            total_test += labels.size(0)

            # Update the per-sub-folder counters. A single tolist() transfers
            # the whole batch at once instead of one .item() call per sample.
            for sf, hit in zip(subfolder, hits.tolist()):
                subfolder_total[sf] += 1
                if hit:
                    subfolder_correct[sf] += 1

    test_loss /= max(total_test, 1)
    test_acc = correct_test / max(total_test, 1)
    print(f"[Epoch {epoch+1}/{num_epochs}] Test Loss: {test_loss:.4f} - Test Acc: {test_acc:.4f}")

    # Per-sub-folder accuracy. Iterate over the totals: a sub-folder with no
    # correct prediction has no entry in subfolder_correct and would otherwise
    # be silently left out of the report.
    print("Accuracy par sous-dossier :")
    for sf in subfolder_total:
        acc_sf = subfolder_correct[sf] / subfolder_total[sf]
        print(f"  {sf} -> {acc_sf:.4f}")
    print()

    ######################################
    # Save a checkpoint every 10 epochs
    ######################################
    if (epoch + 1) % 10 == 0:
        checkpoint = {
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'train_loss': epoch_loss,
            'train_acc': epoch_acc,
            'test_loss': test_loss,
            'test_acc': test_acc,
        }
        checkpoint_path = f"checkpoint_epoch_mixed_rand_{epoch+1}.pth"
        torch.save(checkpoint, checkpoint_path)
        print(f"Checkpoint sauvegardé à l'epoch {epoch+1} sous '{checkpoint_path}'")

# Optional: final save of the model weights. The message is built from the
# same path so it always names the file that was actually written.
final_model_path = "model_final_mixed_rand.pth"
torch.save(model.state_dict(), final_model_path)
print(f"Modèle final sauvegardé sous '{final_model_path}'")

Epoch 1/50 [Training]:   7%|▋         | 6/87 [00:08<01:57,  1.45s/it]


KeyboardInterrupt: 