<a href="https://colab.research.google.com/github/emmaebrl/LASCAR/blob/main/Easymodel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm.notebook import tqdm
import random
import os
import rasterio
from pathlib import Path
import json

  data = fetch_version_info()


**Configuration**

In [2]:
# Dataset locations, relative to the notebook's working directory.
# They are assembled with os.path.join so the same cell works on Windows and on
# Linux/Colab: a hard-coded backslash is not a path separator on POSIX systems.
DATA_DIR = 'data'
TRAIN_IMG_DIR = os.path.join(DATA_DIR, 'train', 'images')
TRAIN_MASK_DIR = os.path.join(DATA_DIR, 'train', 'masks')
PROPORTION_CSV = os.path.join(DATA_DIR, 'train_labels_GY1QjFw.csv')
TEST_IMG_DIR = os.path.join(DATA_DIR, 'test', 'images')
TEST_CSV_PATH = os.path.join(DATA_DIR, 'test_images_kkwOpBC.csv')

In [3]:
def seed_everything(seed=42):
    """Seed every random number generator used by the notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (``torch.manual_seed`` plus ``torch.cuda.manual_seed_all``), then sets the
    cuDNN flags for reproducibility.

    Args:
        seed: Integer seed passed to every generator (default 42).
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,  # relevant when running on the GPU
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Deterministic cuDNN kernels, no auto-tuning: reproducibility over speed.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


seed_everything(42)

In [4]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(DEVICE)

# Mini-batch size for the data loaders, and the side length (in pixels)
# that images are resized to.
BATCH_SIZE, IMG_SIZE = 8, 256

cpu


In [5]:
# ✅ Chargement des données
def load_tif(path):
    """Read a raster file with rasterio and return it as a channels-last array.

    Args:
        path: Location of the file. Forward slashes and backslashes are both
            accepted as separators, whatever the operating system.

    Returns:
        numpy.ndarray: The array returned by ``src.read()`` with its axes
        reordered by ``(1, 2, 0)``, i.e. the first axis moved to the end.

    Raises:
        FileNotFoundError: If no file exists at the normalised path.
    """
    path = os.fspath(path)
    if isinstance(path, str):
        # os.path.normpath leaves backslashes untouched on POSIX, so Windows-style
        # paths are translated explicitly before normalising.
        path = path.replace('\\', os.sep)
    path = os.path.normpath(path)
    if not os.path.exists(path):
        raise FileNotFoundError(f"Fichier introuvable : {path}")
    with rasterio.open(path) as src:
        img = src.read()
    return np.transpose(img, (1, 2, 0))

In [6]:
# Load the label CSV.
df = pd.read_csv(PROPORTION_CSV)

# Each sample_id maps to the file <TRAIN_IMG_DIR>/<sample_id>.tif
image_paths = [
    os.path.join(TRAIN_IMG_DIR, f"{sample_id}.tif")
    for sample_id in df['sample_id']
]

# CSV columns used as targets; their order is the order of the model outputs.
selected_classes = [
    'no_data',
    'clouds',
    'cultivated',
    'herbaceous',
    'broadleaf',
    'coniferous',
    'artificial',
    'water',
    'natural',
    'snow',
]

NUM_CLASSES = len(selected_classes)
targets = df[selected_classes].values

In [7]:
# ================== TRANSFORMS ==================
# Deterministic preprocessing (no random augmentation): resize to
# IMG_SIZE x IMG_SIZE, normalise, then convert to a tensor.
# NOTE(review): A.Normalize() runs with its default statistics while the models
# in this notebook take 4-channel input; confirm the defaults suit these bands.
train_transform = A.Compose(
    transforms=[
        A.Resize(height=IMG_SIZE, width=IMG_SIZE),
        A.Normalize(),
        ToTensorV2(),
    ]
)

**Méthode 1 : Régression, c'est-à-dire que l'on prédit directement les proportions**

In [8]:
# ================== DATASET 1 - Regression ==================
class ProportionDataset(Dataset):
    """Dataset pairing each image with its vector of target proportions.

    Args:
        image_paths: Sequence of image file paths, read with ``load_tif``.
        targets: Sequence of per-image target vectors, aligned with
            ``image_paths``.
        transform: Optional callable invoked as ``transform(image=img)`` whose
            result exposes the processed image under the ``'image'`` key.
    """

    def __init__(self, image_paths, targets, transform=None):
        self.image_paths, self.targets, self.transform = image_paths, targets, transform

    def __len__(self):
        """Return the number of images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``; target is a float32 tensor."""
        image = load_tif(self.image_paths[idx])
        if self.transform:
            image = self.transform(image=image)['image']
        return image, torch.tensor(self.targets[idx], dtype=torch.float32)

# ================== MODEL 1 - Proportion Regression ==================
class SimpleCNN(nn.Module):
    """Small CNN that predicts a distribution over ``num_classes`` per image.

    Three 3x3 convolution stages followed by global average pooling, dropout
    and a linear head. The forward pass returns log-probabilities
    (``log_softmax``), the input format expected by ``nn.KLDivLoss``.

    Args:
        num_classes: Size of the output distribution.
        in_channels: Number of channels of the input images. Defaults to 4,
            the value that was previously hard-coded, so existing calls and
            saved checkpoints keep working.
    """

    def __init__(self, num_classes, in_channels=4):
        super().__init__()
        # The layer order is unchanged so state_dict keys
        # (encoder.0, encoder.3, encoder.6, fc) stay the same.
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(), nn.AdaptiveAvgPool2d(1),
            nn.Dropout(0.3)
        )
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return log-probabilities of shape ``(batch, num_classes)``."""
        x = self.encoder(x)
        x = x.flatten(1)  # (batch, 128, 1, 1) -> (batch, 128)
        return torch.log_softmax(self.fc(x), dim=1)
    
### MODIF DEBUT - Ajout de la validation loss pour le modèle 1 ###
def validate_model(model, val_loader, criterion):
    """Compute the mean per-batch loss of ``model`` over ``val_loader``.

    The model is switched to eval mode and gradients are disabled. The caller
    is responsible for switching back to train mode afterwards.

    Args:
        model: Network to evaluate.
        val_loader: Iterable of ``(inputs, targets)`` batches supporting ``len()``.
        criterion: Loss callable invoked as ``criterion(predictions, targets)``.

    Returns:
        float: Sum of the batch losses divided by the number of batches.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            batch_losses.append(criterion(model(inputs), labels).item())
    return sum(batch_losses) / len(val_loader)
### MODIF FIN ###


# ================== TRAINING LOOP ==================
### MODIF DEBUT - Ajout du calcul de val_loss dans train_model ###
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=5):
    """Train ``model`` and report train/validation loss after every epoch.

    Args:
        model: Network to optimise; it is moved to ``DEVICE`` first.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Validation batches, evaluated with ``validate_model``.
        criterion: Loss callable invoked as ``criterion(predictions, targets)``.
        optimizer: Optimiser already bound to ``model``'s parameters.
        epochs: Number of passes over ``train_loader`` (default 5).
    """
    model.to(DEVICE)
    for epoch_idx in range(epochs):
        epoch_no = epoch_idx + 1
        model.train()  # validate_model leaves the model in eval mode
        running_loss = 0
        progress = tqdm(train_loader, desc=f"Epoch {epoch_no}/{epochs}")
        for inputs, labels in progress:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            batch_loss = criterion(model(inputs), labels)
            batch_loss.backward()
            optimizer.step()
            step_value = batch_loss.item()
            running_loss += step_value
            progress.set_postfix(loss=step_value)

        val_loss = validate_model(model, val_loader, criterion)
        print(f"Epoch {epoch_no}: Train Loss = {running_loss / len(train_loader):.4f} | Val Loss = {val_loss:.4f}")
### MODIF FIN ###

In [9]:
# Train/validation split, stratified on each sample's dominant class
# (argmax of its proportion vector).
train_imgs, val_imgs, train_y, val_y = train_test_split(
    image_paths,
    targets,
    test_size=0.2,
    random_state=42,
    stratify=targets.argmax(axis=1),
)

train_ids = [Path(p).stem for p in train_imgs]
val_ids = [Path(p).stem for p in val_imgs]

# Persist the split as JSON. The directory is created first because
# open(..., "w") does not create missing parent directories, so the cell
# would otherwise fail on a fresh checkout.
os.makedirs("splits", exist_ok=True)
with open("splits/train_val_ids.json", "w") as f:
    json.dump({"train": train_ids, "val": val_ids}, f)

train_ds = ProportionDataset(train_imgs, train_y, transform=train_transform)
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)

# train_transform contains no random augmentation, so it is reused for validation.
val_ds = ProportionDataset(val_imgs, val_y, transform=train_transform)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False)

# Model, optimiser and loss. SimpleCNN outputs log-probabilities, which is the
# input format nn.KLDivLoss expects.
model1 = SimpleCNN(NUM_CLASSES)
optimizer = torch.optim.Adam(model1.parameters(), lr=1e-3)
criterion = nn.KLDivLoss(reduction='batchmean')

print("Training Model 1 (Regression)...")
train_model(model1, train_loader, val_loader, criterion, optimizer, epochs=8)

Training Model 1 (Regression)...


Epoch 1/8:   0%|          | 0/1849 [00:00<?, ?it/s]

  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


Epoch 1: Train Loss = 0.2079 | Val Loss = 0.1159


Epoch 2/8:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 2: Train Loss = 0.1239 | Val Loss = 0.0928


Epoch 3/8:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 3: Train Loss = 0.1009 | Val Loss = 0.0773


Epoch 4/8:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 4: Train Loss = 0.0902 | Val Loss = 0.0700


Epoch 5/8:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 5: Train Loss = 0.0854 | Val Loss = 0.0663


Epoch 6/8:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 6: Train Loss = 0.0811 | Val Loss = 0.0638


Epoch 7/8:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 7: Train Loss = 0.0763 | Val Loss = 0.0688


Epoch 8/8:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 8: Train Loss = 0.0747 | Val Loss = 0.0623


In [10]:
# Save the trained weights. torch.save does not create missing parent
# directories, so make sure 'models/' exists first.
os.makedirs('models', exist_ok=True)
torch.save(model1.state_dict(), 'models/model_proportion_complexified.pth')

In [11]:
# ================== EVALUATION - KL Divergence ==================
def kl_divergence(y_true, y_pred, eps=1e-8):
    """Kullback-Leibler divergence KL(y_true || y_pred).

    Both inputs are clipped to ``[eps, 1]`` before the logarithm so that zero
    entries cannot produce ``log(0)`` or a division by zero.

    Args:
        y_true: Reference distribution (array-like).
        y_pred: Predicted distribution, same shape as ``y_true``.
        eps: Lower clipping bound (default 1e-8).

    Returns:
        Sum over all elements of ``p * log(p / q)`` for the clipped inputs.
    """
    p = np.clip(y_true, eps, 1)
    q = np.clip(y_pred, eps, 1)
    log_ratio = np.log(p / q)
    return np.sum(p * log_ratio)

# Validation Dataset & Loader
# Rebuilt with batch_size=1 so that every batch holds exactly one sample and
# the KL divergence can be computed image by image.
val_ds = ProportionDataset(val_imgs, val_y, transform=train_transform)
val_loader = DataLoader(val_ds, batch_size=1)

model1.eval()
kl_scores = []
with torch.no_grad():
    for img, true_prop in val_loader:
        img = img.to(DEVICE)
        # [0] selects the single sample of the batch
        log_preds = model1(img).cpu().numpy()[0]
        # model1 returns log-probabilities; exponentiate to get proportions
        probs = np.exp(log_preds)
        true_prop = true_prop.numpy()[0]
        kl = kl_divergence(true_prop, probs)
        kl_scores.append(kl)

# Mean of the per-image KL divergences
avg_kl = np.mean(kl_scores)
print(f"\n🔍 KL Divergence on validation set: {avg_kl:.6f}")



🔍 KL Divergence on validation set: 0.062228


In [12]:
class TestDataset(Dataset):
    """Unlabelled dataset yielding ``(image, sample_id)`` pairs.

    Args:
        image_paths: Sequence of image file paths, read with ``load_tif``.
        transform: Optional callable invoked as ``transform(image=img)`` whose
            result exposes the processed image under the ``'image'`` key.
    """

    def __init__(self, image_paths, transform=None):
        self.image_paths, self.transform = image_paths, transform

    def __len__(self):
        """Return the number of images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the image at ``idx`` and its id (file name without extension)."""
        path = self.image_paths[idx]
        image = load_tif(path)
        file_name = os.path.basename(path)
        sample_id, _extension = os.path.splitext(file_name)
        if self.transform:
            image = self.transform(image=image)['image']
        return image, sample_id


In [13]:
def predict_proportions(model, test_loader):
    """Predict class proportions for every sample of ``test_loader``.

    Works for any batch size: every row of a batch and every sample id is
    collected, not only the first one.

    Args:
        model: Network returning log-probabilities of shape ``(batch, classes)``.
            It is moved to ``DEVICE`` because the inputs are sent there.
        test_loader: Iterable of ``(images, sample_ids)`` batches.

    Returns:
        tuple: ``(ids, preds)`` where ``ids`` is a list with one id per sample
        and ``preds`` is an array with one row of probabilities per sample.
    """
    model.to(DEVICE)
    model.eval()
    all_preds = []
    all_ids = []

    with torch.no_grad():
        for img, sample_id in tqdm(test_loader, desc="Predicting on test set"):
            log_probs = model(img.to(DEVICE))  # log_softmax output
            probs = torch.exp(log_probs).cpu().numpy()  # back to probabilities, (batch, classes)
            all_preds.extend(probs)  # one row per sample
            all_ids.extend(sample_id)

    return all_ids, np.array(all_preds)

In [None]:
# === Load the test CSV and build the loader ===
test_df = pd.read_csv(TEST_CSV_PATH)
test_image_paths = [os.path.join(TEST_IMG_DIR, f"{s}.tif") for s in test_df['sample_id']]
test_ds = TestDataset(test_image_paths, transform=train_transform)
test_loader = DataLoader(test_ds, batch_size=1, shuffle=False)


# === Restore the trained regression model ===
model1 = SimpleCNN(NUM_CLASSES)
# map_location lets a checkpoint written on one device type load on another
# (e.g. saved on a GPU machine, loaded on a CPU-only one).
model1.load_state_dict(torch.load('models/model_proportion_complexified.pth', map_location=DEVICE))
# The freshly built model lives on the CPU; predict_proportions sends inputs to DEVICE.
model1.to(DEVICE)
# === Predict ===
sample_ids, preds = predict_proportions(model1, test_loader)


Predicting on test set:   0%|          | 0/5043 [00:00<?, ?it/s]

  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


In [None]:
# Rescale every row so it sums to exactly 1, in case of small numerical drift.
preds = np.divide(preds, preds.sum(axis=1, keepdims=True))

# Build the submission table (sample_id first, then one column per class)
# and export it.
df_out = pd.DataFrame(data=preds, columns=selected_classes)
df_out.insert(loc=0, column='sample_id', value=sample_ids)
df_out.to_csv("submission_regression_complexified.csv", index=False)

print("✅ Fichier 'submission_regression_complexified.csv' généré.")


✅ Fichier 'submission_regression_complexified.csv' généré.


**Modèle 2 : Classification des pixels (on prédit les masques)**

In [None]:
# ================== DATASET 2 - Segmentation ==================
class SegmentationDataset(Dataset):
    """Dataset yielding ``(image, mask)`` pairs for pixel-wise classification.

    Args:
        image_paths: Sequence of image file paths, read with ``load_tif``.
        mask_paths: Sequence of mask file paths aligned with ``image_paths``.
        transform: Optional callable invoked as
            ``transform(image=img, mask=mask)`` whose result exposes the
            processed arrays under the ``'image'`` and ``'mask'`` keys.
    """

    def __init__(self, image_paths, mask_paths, transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the image at ``idx`` and its mask as an int64 tensor."""
        img = load_tif(self.image_paths[idx])
        mask = load_tif(self.mask_paths[idx])[:, :, 0]  # keep the first band only

        if self.transform:
            augmented = self.transform(image=img, mask=mask)
            img = augmented['image']
            mask = augmented['mask']

        # Without a tensor-converting transform the mask is still a NumPy array,
        # which has no .long(); convert it explicitly.
        if not torch.is_tensor(mask):
            mask = torch.from_numpy(np.asarray(mask).astype(np.int64))
        return img, mask.long()

# ================== MODEL 2 - Segmentation ==================
class SimpleSegNet(nn.Module):
    """Plain encoder-decoder network producing per-pixel class logits.

    The encoder has three stages of two (conv 3x3 + batch norm + ReLU) units,
    each followed by a 2x max-pool. The decoder has three stride-2 transposed
    convolutions that restore the resolution. For 256x256 inputs the feature
    maps go 256 -> 128 -> 64 -> 32 and back to 256.

    Args:
        in_channels: Number of channels of the input images.
        num_classes: Number of output channels (one logit map per class).
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()

        def conv_bn_relu(c_in, c_out):
            # 3x3 same-padding convolution + batch norm + ReLU
            return [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
            ]

        def upsample2x(c_in, c_out):
            # transposed convolution that doubles height and width
            return nn.ConvTranspose2d(c_in, c_out, kernel_size=2, stride=2)

        # Encoder: widths 32, 64, 128; each stage halves the resolution.
        encoder_layers = []
        for c_in, c_out in ((in_channels, 32), (32, 64), (64, 128)):
            encoder_layers += conv_bn_relu(c_in, c_out)
            encoder_layers += conv_bn_relu(c_out, c_out)
            encoder_layers.append(nn.MaxPool2d(2))
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder: the last transposed convolution emits the class logits.
        self.decoder = nn.Sequential(
            upsample2x(128, 64), nn.ReLU(), *conv_bn_relu(64, 64),
            upsample2x(64, 32), nn.ReLU(), *conv_bn_relu(32, 32),
            upsample2x(32, num_classes),
        )

    def forward(self, x):
        """Return logits of shape ``(batch, num_classes, H, W)``."""
        return self.decoder(self.encoder(x))


In [None]:
# A mask has the same file stem as its image and lives in TRAIN_MASK_DIR.
train_mask_paths = [os.path.join(TRAIN_MASK_DIR, Path(p).stem + ".tif") for p in train_imgs]
val_mask_paths = [os.path.join(TRAIN_MASK_DIR, Path(p).stem + ".tif") for p in val_imgs]

# Segmentation datasets and loaders (same train/val split as the regression model).
train_ds2 = SegmentationDataset(
    image_paths=train_imgs, mask_paths=train_mask_paths, transform=train_transform
)
train_loader2 = DataLoader(train_ds2, batch_size=BATCH_SIZE, shuffle=True)

# Validation uses batch_size=1: the KL evaluation processes one image at a time.
val_ds2 = SegmentationDataset(
    image_paths=val_imgs, mask_paths=val_mask_paths, transform=train_transform
)
val_loader2 = DataLoader(val_ds2, batch_size=1, shuffle=False)

# Model (moved to DEVICE), optimiser and per-pixel loss.
model2 = SimpleSegNet(in_channels=4, num_classes=NUM_CLASSES).to(DEVICE)
optimizer2 = torch.optim.Adam(params=model2.parameters(), lr=1e-3)
criterion2 = nn.CrossEntropyLoss()

### MODIF DEBUT - Validation loss pour segmentation ###
def validate_segmentation(model, val_loader, criterion):
    """Compute the mean per-batch segmentation loss over ``val_loader``.

    The model is put in eval mode and gradients are disabled.

    Args:
        model: Segmentation network to evaluate.
        val_loader: Iterable of ``(images, masks)`` batches supporting ``len()``.
        criterion: Loss callable invoked as ``criterion(logits, masks)``.

    Returns:
        float: Sum of the batch losses divided by the number of batches.
    """
    model.eval()
    running = 0
    with torch.no_grad():
        for batch_imgs, batch_masks in val_loader:
            batch_imgs = batch_imgs.to(DEVICE)
            batch_masks = batch_masks.to(DEVICE)
            logits = model(batch_imgs)
            running += criterion(logits, batch_masks).item()
    n_batches = len(val_loader)
    return running / n_batches
### MODIF FIN ###


print("Training Model 2 (Segmentation)...")
# The generic train_model loop is reused instead of a copy-pasted one: it runs
# the same optimisation steps, shows the same progress bar and prints the same
# "Epoch N: Train Loss = ... | Val Loss = ..." line, and its validation pass
# (validate_model) performs the same computation as validate_segmentation.
train_model(model2, train_loader2, val_loader2, criterion2, optimizer2, epochs=4)


Training Model 2 (Segmentation)...


Epoch 1/4:   0%|          | 0/1849 [00:00<?, ?it/s]

  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


Epoch 1: Train Loss = 0.7601 | Val Loss = 0.5628


Epoch 2/4:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 2: Train Loss = 0.5843 | Val Loss = 0.5154


Epoch 3/4:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 3: Train Loss = 0.5428 | Val Loss = 0.5135


Epoch 4/4:   0%|          | 0/1849 [00:00<?, ?it/s]

Epoch 4: Train Loss = 0.5222 | Val Loss = 0.5115


In [None]:
# Save the segmentation weights.
# The raw string keeps the path byte-for-byte identical to the previous literal
# while removing the invalid "\m" escape sequence (a SyntaxWarning on recent
# Python versions). The directory is created first because torch.save does not
# create missing parent directories.
# NOTE(review): the backslash is only a path separator on Windows; switch this
# cell and the matching torch.load cell to os.path.join together.
os.makedirs('models', exist_ok=True)
torch.save(model2.state_dict(), r'models\model2_segmentation_final.pth')

  torch.save(model2.state_dict(), 'models\model2_segmentation_final.pth')


In [None]:
# ================== POST-PROCESSING ==================
def mask_to_proportions(mask_pred, num_classes):
    """Convert a label mask into per-class pixel fractions.

    Args:
        mask_pred: Array of integer class labels (any shape).
        num_classes: Number of classes; labels ``0 .. num_classes - 1`` are counted.

    Returns:
        list: ``num_classes`` values, the i-th being the fraction of pixels
        equal to ``i``.
    """
    pixels = mask_pred.flatten()
    total = len(pixels)
    fractions = []
    for class_id in range(num_classes):
        fractions.append((pixels == class_id).sum() / total)
    return fractions

# ================== METRIC - KL Divergence on Val ==================
def kl_divergence(y_true, y_pred, eps=1e-8):
    """Kullback-Leibler divergence KL(y_true || y_pred) with clipping.

    Same definition as the one in the regression evaluation cell; this later
    definition shadows it once the cell runs.

    Args:
        y_true: Reference distribution (array-like).
        y_pred: Predicted distribution, same shape as ``y_true``.
        eps: Lower bound both inputs are clipped to, avoiding ``log(0)`` and
            division by zero (default 1e-8).

    Returns:
        Sum over all elements of ``p * log(p / q)`` for the clipped inputs.
    """
    clipped_true, clipped_pred = (np.clip(dist, eps, 1) for dist in (y_true, y_pred))
    return np.sum(clipped_true * np.log(clipped_true / clipped_pred))

# Evaluate the segmentation model by the KL divergence between the class
# proportions of the predicted mask and those of the ground-truth mask.
# NOTE(review): proportions here come from hard argmax labels, whereas
# predict_on_test averages softmax probabilities, so this score is not computed
# the same way as the submitted predictions.
model2.eval()
kl_scores = []
with torch.no_grad():
    # val_loader2 is built with batch_size=1, so squeeze(0) drops the batch axis
    for img, mask in tqdm(val_loader2, desc="Evaluating KL"):
        img = img.to(DEVICE)
        pred = model2(img)
        # most likely class per pixel (argmax over the class axis)
        pred_mask = torch.argmax(pred.squeeze(0), dim=0).cpu().numpy()
        true_mask = mask.squeeze(0).numpy()

        pred_prop = mask_to_proportions(pred_mask, NUM_CLASSES)
        true_prop = mask_to_proportions(true_mask, NUM_CLASSES)

        kl = kl_divergence(np.array(true_prop), np.array(pred_prop))
        kl_scores.append(kl)

# Mean of the per-image KL divergences
avg_kl_seg = np.mean(kl_scores)
print(f"\n🔍 KL Divergence on segmentation val set: {avg_kl_seg:.6f}")

Evaluating KL:   0%|          | 0/3699 [00:00<?, ?it/s]


🔍 KL Divergence on segmentation val set: 0.157424


In [None]:
### PREDICT ON TEST ###
class TestDataset(Dataset):
    """Unlabelled dataset yielding ``(image, sample_id)`` pairs.

    This re-declares the class defined earlier in the notebook with the same
    behaviour.

    Args:
        image_paths: Sequence of image file paths, read with ``load_tif``.
        transform: Optional callable invoked as ``transform(image=img)`` whose
            result exposes the processed image under the ``'image'`` key.
    """

    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the image at ``idx`` and its id (file name without extension)."""
        image_path = self.image_paths[idx]
        image = load_tif(image_path)
        # strip the directory, then the extension
        sample_id = os.path.splitext(os.path.basename(image_path))[0]
        if not self.transform:
            return image, sample_id
        return self.transform(image=image)['image'], sample_id


def predict_on_test(model, test_loader, num_classes):
    """Predict class proportions from a segmentation model.

    For every image the per-pixel softmax probabilities are averaged over all
    pixels, giving one proportion per class. Works for any batch size: every
    sample of a batch and every sample id is collected.

    Args:
        model: Network returning logits of shape ``(batch, num_classes, H, W)``.
            It is moved to ``DEVICE`` because the inputs are sent there.
        test_loader: Iterable of ``(images, sample_ids)`` batches.
        num_classes: Number of classes (size of the logits' channel axis).

    Returns:
        tuple: ``(ids, preds)``, two lists with one entry per sample. Each
        prediction is a 1-D array of ``num_classes`` proportions.
    """
    model.to(DEVICE)
    model.eval()
    all_preds = []
    all_ids = []

    with torch.no_grad():
        for img, sample_id in tqdm(test_loader, desc="Predicting on test set"):
            out = model(img.to(DEVICE))
            soft = torch.softmax(out, dim=1).cpu().numpy()  # [B, C, H, W]
            # Average over pixels per sample; the batch axis is kept so that
            # samples are never mixed together when batch_size > 1.
            proportions = soft.reshape(soft.shape[0], num_classes, -1).mean(axis=2)  # [B, C]
            all_preds.extend(proportions)
            all_ids.extend(sample_id)

    return all_ids, all_preds

In [None]:
# === Prepare the test set ===
test_df = pd.read_csv(TEST_CSV_PATH)  # file containing the sample_id values
test_image_paths = [os.path.join(TEST_IMG_DIR, f"{str(s)}.tif") for s in test_df['sample_id']]
test_ds = TestDataset(test_image_paths, transform=train_transform)
test_loader = DataLoader(test_ds, batch_size=1, shuffle=False)

# === Load the trained model ===
# Marked as optional by the original author. The raw string keeps the path
# byte-for-byte identical to the one used when saving while removing the
# invalid "\m" escape sequence; map_location lets a checkpoint written on one
# device type load on another.
model2.load_state_dict(torch.load(r"models\model2_segmentation_final.pth", map_location=DEVICE))
model2.eval()

# === Predict ===
sample_ids, preds = predict_on_test(model2, test_loader, num_classes=NUM_CLASSES)


  model2.load_state_dict(torch.load("models\model2_segmentation_final.pth"))  # facultatif si sauvegardé


Predicting on test set:   0%|          | 0/5043 [00:00<?, ?it/s]

  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


In [None]:
# Stack the per-sample proportion vectors into one array and rescale every row
# so it sums to exactly 1 (a safeguard; all class columns are kept).
preds = np.array(preds)
preds = np.divide(preds, preds.sum(axis=1, keepdims=True))

# Build the submission table (sample_id first, then one column per class) and save it.
df_sub = pd.DataFrame(data=preds, columns=selected_classes)
df_sub.insert(loc=0, column='sample_id', value=sample_ids)
df_sub.to_csv("submission_segmentation.csv", index=False)
print("✅ Sauvegarde dans submission_segmentation.csv terminée.")


✅ Sauvegarde dans submission_segmentation.csv terminée.
