# Analyse semi-supervisée

## Chargement des données

In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Chargement des features + labels forts
df_features = pd.read_parquet("../data/brain_features_resnet50.parquet")

# Données fortement labellisées (100 images)
df_strong = df_features[df_features["has_label"] == True].copy()
df_strong["target"] = df_strong["label_num"].astype(int)  # 0 normal, 1 cancer

# Données faiblement labellisées (pseudo-labels à partir de KMeans)
df_weak = pd.read_parquet("../data/brain_weak_labels_kmeans.parquet").copy()
df_weak.rename(columns={"weak_label_num": "target"}, inplace=True)
df_weak["target"] = df_weak["target"].astype(int)

print("Strongly labeled:", df_strong.shape)
print("Weakly labeled:", df_weak.shape)

df_strong.head(), df_weak.head()

Strongly labeled: (100, 2053)
Weakly labeled: (1406, 3)


(                                            filepath  has_label   label  \
 0  ..\data\mri_dataset_brain_cancer_oc\avec_label...       True  cancer   
 1  ..\data\mri_dataset_brain_cancer_oc\avec_label...       True  cancer   
 2  ..\data\mri_dataset_brain_cancer_oc\avec_label...       True  cancer   
 3  ..\data\mri_dataset_brain_cancer_oc\avec_label...       True  cancer   
 4  ..\data\mri_dataset_brain_cancer_oc\avec_label...       True  cancer   
 
         f_0       f_1       f_2       f_3       f_4       f_5       f_6  ...  \
 0  0.011641  0.014855  0.309468  0.112984  0.092031  0.000000  0.443308  ...   
 1  0.047576  0.045124  0.004747  0.000000  0.000000  0.000000  0.003254  ...   
 2  0.023101  0.067208  0.135242  0.000000  0.000000  0.000375  0.203493  ...   
 3  0.003039  0.192276  0.042327  0.000000  0.004168  0.000000  1.024806  ...   
 4  0.151541  0.046058  0.109404  0.000000  0.026315  0.008330  0.357026  ...   
 
      f_2040    f_2041    f_2042    f_2043    f_2044  

## Split train / test

In [9]:
train_strong, test_strong = train_test_split(
    df_strong,
    test_size=0.2,
    random_state=42,
    stratify=df_strong["target"]
)

print("Train fort:", train_strong.shape)
print("Test fort:", test_strong.shape)
train_strong["target"].value_counts(), test_strong["target"].value_counts()

Train fort: (80, 2053)
Test fort: (20, 2053)


(target
 0    40
 1    40
 Name: count, dtype: int64,
 target
 1    10
 0    10
 Name: count, dtype: int64)

## Dataset & DataLoader (images, pas features)

In [12]:
from pathlib import Path
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Transforms d'entraînement / test (ImageNet)
image_transforms = {
    "train": transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Grayscale(num_output_channels=3),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]),
    "test": transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Grayscale(num_output_channels=3),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]),
}

class MRICNNDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df.reset_index(drop=True)
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = row["filepath"]
        img = Image.open(img_path).convert("RGB")

        if self.transform is not None:
            img = self.transform(img)

        label = int(row["target"])
        return img, label

### Dataloaders

In [17]:
batch_size = 16

train_strong_ds = MRICNNDataset(train_strong, transform=image_transforms["train"])
test_strong_ds  = MRICNNDataset(test_strong,  transform=image_transforms["test"])
weak_ds         = MRICNNDataset(df_weak,      transform=image_transforms["train"])

train_strong_loader = DataLoader(train_strong_ds, batch_size=batch_size, shuffle=True, num_workers=0)
test_loader         = DataLoader(test_strong_ds,  batch_size=batch_size, shuffle=False, num_workers=0)
weak_loader         = DataLoader(weak_ds,         batch_size=batch_size, shuffle=True, num_workers=0)

len(train_strong_loader), len(test_loader), len(weak_loader)

(5, 2, 88)

## Modèle CNN : ResNet adapté à 2 classes

In [20]:
import torch.nn as nn
from torchvision.models import resnet50, ResNet50_Weights

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

def create_model(freeze_backbone=True):
    weights = ResNet50_Weights.IMAGENET1K_V2
    model = resnet50(weights=weights)

    # Remplacer la couche finale pour 2 classes
    in_features = model.fc.in_features
    model.fc = nn.Linear(in_features, 2)

    if freeze_backbone:
        for name, param in model.named_parameters():
            if not name.startswith("fc."):
                param.requires_grad = False

    return model.to(device)


Device: cpu


## Fonctions d'entrainement & d'évaluation

In [23]:
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import numpy as np

def train_one_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    running_loss = 0.0

    for imgs, labels in dataloader:
        imgs = imgs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * imgs.size(0)

    return running_loss / len(dataloader.dataset)

def evaluate(model, dataloader, device):
    model.eval()
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for imgs, labels in dataloader:
            imgs = imgs.to(device)
            outputs = model(imgs)
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            all_preds.append(preds)
            all_labels.append(labels.numpy())

    all_preds = np.concatenate(all_preds)
    all_labels = np.concatenate(all_labels)

    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    cm = confusion_matrix(all_labels, all_preds)
    return acc, f1, cm

##  Baseline : entrainement supervisé uniquement sur formtement labellisé

In [28]:
import torch.optim as optim

num_epochs = 8

baseline_model = create_model(freeze_backbone=True)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(baseline_model.parameters(), lr=1e-3)

for epoch in range(num_epochs):
    train_loss = train_one_epoch(baseline_model, train_strong_loader, optimizer, criterion, device)
    acc, f1, cm = evaluate(baseline_model, test_loader, device)
    print(f"[Baseline] Epoch {epoch+1}/{num_epochs} - loss={train_loss:.4f} - acc={acc:.3f} - f1={f1:.3f}")

[Baseline] Epoch 1/8 - loss=0.6454 - acc=0.550 - f1=0.182
[Baseline] Epoch 2/8 - loss=0.5250 - acc=0.700 - f1=0.571
[Baseline] Epoch 3/8 - loss=0.4456 - acc=0.800 - f1=0.750
[Baseline] Epoch 4/8 - loss=0.3645 - acc=0.850 - f1=0.824
[Baseline] Epoch 5/8 - loss=0.3319 - acc=0.850 - f1=0.824
[Baseline] Epoch 6/8 - loss=0.2754 - acc=0.900 - f1=0.889
[Baseline] Epoch 7/8 - loss=0.3115 - acc=0.950 - f1=0.952
[Baseline] Epoch 8/8 - loss=0.2495 - acc=0.950 - f1=0.952


In [30]:
baseline_acc, baseline_f1, baseline_cm = evaluate(baseline_model, test_loader, device)
baseline_acc, baseline_f1, baseline_cm

(0.95,
 0.9523809523809523,
 array([[ 9,  1],
        [ 0, 10]], dtype=int64))

## Semi-supervisé : faible puis fort

### Phase 1 : entrainement du jeu faible

In [34]:
semi_model = create_model(freeze_backbone=True)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(semi_model.parameters(), lr=1e-3)

num_epochs_weak = 5

for epoch in range(num_epochs_weak):
    train_loss = train_one_epoch(semi_model, weak_loader, optimizer, criterion, device)
    acc, f1, cm = evaluate(semi_model, test_loader, device)
    print(f"[Semi phase 1 - weak] Epoch {epoch+1}/{num_epochs_weak} - loss={train_loss:.4f} - acc={acc:.3f} - f1={f1:.3f}")

[Semi phase 1 - weak] Epoch 1/5 - loss=0.4888 - acc=0.600 - f1=0.333
[Semi phase 1 - weak] Epoch 2/5 - loss=0.3619 - acc=0.750 - f1=0.667
[Semi phase 1 - weak] Epoch 3/5 - loss=0.3409 - acc=0.750 - f1=0.667
[Semi phase 1 - weak] Epoch 4/5 - loss=0.3330 - acc=0.750 - f1=0.667
[Semi phase 1 - weak] Epoch 5/5 - loss=0.3209 - acc=0.800 - f1=0.750


### Phase 2 : entrainement du jeu fort

In [38]:
optimizer = optim.Adam(semi_model.parameters(), lr=5e-4)
num_epochs_strong = 5

for epoch in range(num_epochs_strong):
    train_loss = train_one_epoch(semi_model, train_strong_loader, optimizer, criterion, device)
    acc, f1, cm = evaluate(semi_model, test_loader, device)
    print(f"[Semi phase 2 - strong] Epoch {epoch+1}/{num_epochs_strong} - loss={train_loss:.4f} - acc={acc:.3f} - f1={f1:.3f}")

[Semi phase 2 - strong] Epoch 1/5 - loss=0.3714 - acc=0.900 - f1=0.889
[Semi phase 2 - strong] Epoch 2/5 - loss=0.3817 - acc=0.900 - f1=0.900
[Semi phase 2 - strong] Epoch 3/5 - loss=0.3296 - acc=0.850 - f1=0.857
[Semi phase 2 - strong] Epoch 4/5 - loss=0.3219 - acc=0.850 - f1=0.857
[Semi phase 2 - strong] Epoch 5/5 - loss=0.3334 - acc=0.850 - f1=0.857


### Comparaison

In [40]:
semi_acc, semi_f1, semi_cm = evaluate(semi_model, test_loader, device)
semi_acc, semi_f1, semi_cm

(0.85,
 0.8571428571428571,
 array([[8, 2],
        [1, 9]], dtype=int64))

| Modèle                  | Données d'entraînement                               | Accuracy (test) | F1-score (test) |
|-------------------------|------------------------------------------------------|-----------------|-----------------|
| CNN supervisé (baseline) | Train fortement labellisé uniquement                | X.XXX           | Y.YYY           |
| CNN semi-supervisé      | Faiblement labellisé puis finetune fort labellisé   | A.AAA           | B.BBB           |
