In [None]:
# Clone Spyne - Car Segmentation Training

Entraînement du modèle U-Net sur le dataset Carvana.

IMPORTANT : Active le GPU avant de commencer !
Menu > Runtime > Change runtime type > A100 GPU

In [None]:
# 1. Verifier le GPU
!nvidia-smi

import torch
print(f"\nPyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# 2. Installation des dependances
!pip install -q segmentation-models-pytorch albumentations kaggle

In [None]:
# 3. Kaggle API configuration
import os
import json
import getpass

# Never hardcode the API token in the notebook: read it from the environment
# (KAGGLE_USERNAME / KAGGLE_KEY) and fall back to an interactive prompt.
# NOTE(review): a real token used to be committed in this cell. It must be
# revoked and regenerated from the Kaggle account settings.
kaggle_username = os.environ.get('KAGGLE_USERNAME') or input("Kaggle username: ").strip()
kaggle_key = os.environ.get('KAGGLE_KEY') or getpass.getpass("Kaggle API key: ").strip()
if not kaggle_username or not kaggle_key:
    raise ValueError("Identifiants Kaggle manquants (KAGGLE_USERNAME / KAGGLE_KEY).")

kaggle_creds = {"username": kaggle_username, "key": kaggle_key}

# Write the config file read by the Kaggle CLI.
config_dir = '/root/.config/kaggle'
config_path = os.path.join(config_dir, 'kaggle.json')
os.makedirs(config_dir, exist_ok=True)

# Create the file with owner-only permissions from the start, so the key is
# never readable by other users, not even briefly.
config_fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(config_fd, 'w') as f:
    json.dump(kaggle_creds, f)
# The mode passed to os.open is ignored when the file already existed.
os.chmod(config_path, 0o600)

print("Kaggle API configuree!")
print(f"Username: {kaggle_creds['username']}")

In [None]:
# 4. Telecharger le dataset Carvana (environ 15 Go)
# IMPORTANT: Accepte d'abord les regles sur https://www.kaggle.com/c/carvana-image-masking-challenge/rules

!kaggle competitions download -c carvana-image-masking-challenge -f train.zip
!kaggle competitions download -c carvana-image-masking-challenge -f train_masks.zip

print("\nTelechargement termine !")

In [None]:
# 5. Extract the archives
import zipfile
import os
import shutil

os.makedirs('data/train', exist_ok=True)
os.makedirs('data/train_masks', exist_ok=True)

print("Extraction des images...")
with zipfile.ZipFile('train.zip', 'r') as z:
    z.extractall('data/')

print("Extraction des masques...")
with zipfile.ZipFile('train_masks.zip', 'r') as z:
    z.extractall('data/')

# Show what ended up under data/ (sorted so the listing is stable across runs).
print("\nContenu de data/:")
for item in sorted(os.listdir('data/')):
    path = os.path.join('data/', item)
    if os.path.isdir(path):
        count = len(os.listdir(path))
        print(f"  {item}/ ({count} fichiers)")
    else:
        print(f"  {item}")


def _count_files(directory, extensions):
    """Return how many entries of `directory` end with one of `extensions`."""
    return sum(1 for name in os.listdir(directory) if name.endswith(extensions))


# Count the images.
# data/train and data/train_masks always exist (created above), so the layout
# is decided by whether they actually contain files. If not, fall back to a
# flat extraction directly under data/.
IMAGE_EXTENSIONS = ('.jpg',)
MASK_EXTENSIONS = ('.gif', '.png')
train_path = 'data/train' if _count_files('data/train', IMAGE_EXTENSIONS) > 0 else 'data/'
mask_path = 'data/train_masks' if _count_files('data/train_masks', MASK_EXTENSIONS) > 0 else 'data/'

n_images = _count_files(train_path, IMAGE_EXTENSIONS)
n_masks = _count_files(mask_path, MASK_EXTENSIONS)
print(f"\n{n_images} images, {n_masks} masques")

In [None]:
# 6. Dataset & DataLoader
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
from PIL import Image
from pathlib import Path
import os

# Resolve the image and mask folders: use the nested layout when data/train
# exists and is non-empty, otherwise assume everything sits directly in data/.
_nested_layout = os.path.exists('data/train') and len(os.listdir('data/train')) > 0
IMAGES_DIR, MASKS_DIR = ('data/train', 'data/train_masks') if _nested_layout else ('data', 'data')

print(f"Images: {IMAGES_DIR}")
print(f"Masks: {MASKS_DIR}")

class CarvanaDataset(Dataset):
    """Image/mask pairs read from two folders.

    Every ``*.jpg`` file in ``images_dir`` is one sample. Its mask is looked up
    in ``masks_dir`` as ``<image stem>_mask.gif`` and, failing that,
    ``<image stem>_mask.png``.

    Args:
        images_dir: Folder containing the ``.jpg`` images.
        masks_dir: Folder containing the masks.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
            When ``None``, the raw arrays are converted to tensors as-is.
    """

    # Mask extensions, tried in this order for every image.
    MASK_EXTENSIONS = ('.gif', '.png')

    def __init__(self, images_dir, masks_dir, transform=None):
        self.images_dir = Path(images_dir)
        self.masks_dir = Path(masks_dir)
        self.transform = transform

        # Sorted so that a given index always maps to the same file.
        self.images = sorted(self.images_dir.glob('*.jpg'))
        print(f"{len(self.images)} images trouvees dans {images_dir}")

        if len(self.images) == 0:
            print("ERREUR: Aucune image trouvee!")
            print(f"Contenu du dossier: {list(self.images_dir.iterdir())[:10]}")

    def __len__(self):
        """Return the number of images found in ``images_dir``."""
        return len(self.images)

    def _find_mask(self, img_path):
        """Return the mask path for ``img_path``.

        Raises:
            FileNotFoundError: If no mask exists with any supported extension.
        """
        mask_name = img_path.stem + '_mask'
        for extension in self.MASK_EXTENSIONS:
            candidate = self.masks_dir / (mask_name + extension)
            if candidate.exists():
                return candidate
        raise FileNotFoundError(
            f"Masque introuvable pour {img_path.name} dans {self.masks_dir}"
        )

    def __getitem__(self, idx):
        """Return ``(image, mask)`` for sample ``idx``.

        The mask is returned as a float tensor with a leading channel axis,
        with its 8-bit values divided by 255.

        Raises:
            FileNotFoundError: If the mask of the image cannot be found.
        """
        img_path = self.images[idx]
        mask_path = self._find_mask(img_path)

        # Context managers close the underlying files deterministically.
        with Image.open(img_path) as img_file:
            image = np.array(img_file.convert('RGB'))
        with Image.open(mask_path) as mask_file:
            mask = np.array(mask_file.convert('L'))

        if self.transform is not None:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        # Without a tensor-producing transform the data is still NumPy. Convert
        # it here so the return statement works for every configuration.
        if isinstance(image, np.ndarray):
            image = torch.from_numpy(image).permute(2, 0, 1).contiguous()  # HWC -> CHW
        if isinstance(mask, np.ndarray):
            mask = torch.from_numpy(mask)

        return image, mask.unsqueeze(0).float() / 255.0

# Augmentation pipelines.
# Both pipelines resize to 512x512, normalise with the same mean/std and end
# with the tensor conversion. Only the training pipeline adds random flips and
# brightness/contrast jitter in between.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

_train_steps = [
    A.Resize(512, 512),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.3),
    A.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ToTensorV2(),
]
train_transform = A.Compose(_train_steps)

_val_steps = [
    A.Resize(512, 512),
    A.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ToTensorV2(),
]
val_transform = A.Compose(_val_steps)

# Build the datasets.
# Two views over the same files: `full_dataset` applies the training
# augmentations, `val_source` applies only the deterministic validation
# transform. Splitting a single augmented dataset would make the validation
# metrics depend on random flips and brightness changes.
full_dataset = CarvanaDataset(IMAGES_DIR, MASKS_DIR, transform=train_transform)

if len(full_dataset) == 0:
    raise ValueError("Dataset vide! Verifie que l'extraction a fonctionne.")

val_source = CarvanaDataset(IMAGES_DIR, MASKS_DIR, transform=val_transform)

# Train/val split (90/10)
train_size = int(0.9 * len(full_dataset))
val_size = len(full_dataset) - train_size
if train_size == 0:
    raise ValueError("Dataset trop petit pour un split train/val (au moins 2 images requises).")

# A seeded permutation makes the split reproducible across kernel restarts, and
# the same index lists are used for both views so the subsets never overlap.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
shuffled_indices = torch.randperm(len(full_dataset), generator=split_generator).tolist()
train_dataset = torch.utils.data.Subset(full_dataset, shuffled_indices[:train_size])
val_dataset = torch.utils.data.Subset(val_source, shuffled_indices[train_size:])

# DataLoaders
BATCH_SIZE = 16
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2, pin_memory=True)

print(f"\nTrain: {len(train_dataset)} | Val: {len(val_dataset)}")

In [None]:
# 7. U-Net model with a pretrained ResNet34 encoder
import segmentation_models_pytorch as smp

# Pick the device first so the model can be moved as soon as it is built.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

unet_config = dict(
    encoder_name="resnet34",
    encoder_weights="imagenet",
    in_channels=3,
    classes=1,
)
model = smp.Unet(**unet_config).to(device)

# Count only the parameters that will receive gradients.
params = 0
for p in model.parameters():
    if p.requires_grad:
        params += p.numel()

print(f"Modele cree: {params:,} parametres")
print(f"Device: {device}")

In [None]:
# 8. Loss & Metrics
class DiceLoss(nn.Module):
    """Soft Dice loss computed on sigmoid probabilities.

    All elements of the batch are flattened together, so one Dice value is
    computed for the whole batch rather than one per sample.

    Args:
        smooth: Constant added to numerator and denominator. It avoids a
            division by zero and makes the loss 0 when both prediction and
            target are empty.
    """

    def __init__(self, smooth=1e-6):
        super().__init__()
        self.smooth = smooth

    def forward(self, pred, target):
        """Return ``1 - dice`` for raw logits ``pred`` against ``target``."""
        # reshape (not view) so non-contiguous inputs are accepted as well.
        probs = torch.sigmoid(pred).reshape(-1)
        target = target.reshape(-1)
        intersection = (probs * target).sum()
        dice = (2. * intersection + self.smooth) / (probs.sum() + target.sum() + self.smooth)
        return 1 - dice

class CombinedLoss(nn.Module):
    """Unweighted sum of binary cross-entropy on logits and Dice loss."""

    def __init__(self):
        super().__init__()
        self.bce = nn.BCEWithLogitsLoss()
        self.dice = DiceLoss()

    def forward(self, pred, target):
        """Return ``bce(pred, target) + dice(pred, target)``."""
        bce_term = self.bce(pred, target)
        dice_term = self.dice(pred, target)
        return bce_term + dice_term

def dice_score(pred, target, threshold=0.5, smooth=1e-6):
    """Return the Dice coefficient of thresholded predictions against ``target``.

    Args:
        pred: Raw logits. A sigmoid is applied before thresholding.
        target: Ground-truth mask with the same number of elements as ``pred``.
        threshold: Probability above which a pixel counts as foreground.
        smooth: Constant added to numerator and denominator, mirroring
            ``DiceLoss``. It makes the score 1 (instead of 0) when prediction
            and target are both empty.

    Returns:
        A scalar tensor computed over all elements of the batch at once.
    """
    # reshape (not view) so non-contiguous inputs are accepted as well.
    pred = (torch.sigmoid(pred) > threshold).float().reshape(-1)
    target = target.reshape(-1)
    intersection = (pred * target).sum()
    return (2. * intersection + smooth) / (pred.sum() + target.sum() + smooth)

print("Loss functions definies")

In [None]:
# 9. TRAINING
# Trains `model` for EPOCHS epochs with Adam on the combined BCE + Dice loss,
# evaluates on `val_loader` after every epoch, and writes a checkpoint to
# 'car_segmentation_best.pth' each time the validation Dice improves.
from tqdm.notebook import tqdm
import torch.optim as optim

EPOCHS = 30
LR = 1e-4

criterion = CombinedLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)
# mode='max' because the monitored value (validation Dice) should increase;
# the learning rate is multiplied by `factor` once it has stopped improving
# for longer than `patience` epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=3, factor=0.5)

# Best validation Dice seen so far, and per-epoch curves used for plotting.
best_dice = 0
history = {'train_loss': [], 'val_loss': [], 'val_dice': []}

print("Debut de l'entrainement")
print("=" * 50)

for epoch in range(EPOCHS):
    # TRAIN
    model.train()
    train_loss = 0
    
    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}")
    for images, masks in pbar:
        images = images.to(device)
        masks = masks.to(device)
        
        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        pbar.set_postfix({'loss': f'{loss.item():.4f}'})
    
    # Mean of the per-batch losses.
    train_loss /= len(train_loader)
    
    # VALIDATION
    model.eval()
    val_loss = 0
    val_dice = 0
    
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for images, masks in val_loader:
            images = images.to(device)
            masks = masks.to(device)
            outputs = model(images)
            val_loss += criterion(outputs, masks).item()
            val_dice += dice_score(outputs, masks).item()
    
    # Both metrics are averaged over batches, not over individual images.
    val_loss /= len(val_loader)
    val_dice /= len(val_loader)
    scheduler.step(val_dice)
    
    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['val_dice'].append(val_dice)
    
    print(f"   Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val Dice: {val_dice:.4f}")
    
    # Checkpoint only on a strict improvement of the validation Dice.
    # NOTE(review): if no epoch ever scores above 0, no checkpoint file is
    # written and later cells that load it will fail.
    if val_dice > best_dice:
        best_dice = val_dice
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'dice_score': val_dice,
        }, 'car_segmentation_best.pth')
        print(f"   Nouveau meilleur modele sauvegarde! (Dice: {val_dice:.4f})")

print("\n" + "=" * 50)
print(f"Entrainement termine! Meilleur Dice: {best_dice:.4f}")

In [None]:
# 10. Learning curves
import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Left panel: training and validation loss. Right panel: validation Dice.
for curve_key, curve_label, curve_color in (
    ('train_loss', 'Train Loss', '#10b981'),
    ('val_loss', 'Val Loss', '#ef4444'),
):
    ax1.plot(history[curve_key], label=curve_label, color=curve_color)
ax2.plot(history['val_dice'], label='Val Dice', color='#3b82f6')

# Both panels share the same decoration apart from their labels.
for panel, panel_ylabel, panel_title in (
    (ax1, 'Loss', 'Loss par Epoch'),
    (ax2, 'Dice Score', 'Dice Score par Epoch'),
):
    panel.set_xlabel('Epoch')
    panel.set_ylabel(panel_ylabel)
    panel.set_title(panel_title)
    panel.legend()
    panel.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()


In [None]:
# 11. Qualitative check on a few held-out images
# map_location keeps the load working when the current runtime has no GPU.
checkpoint = torch.load('car_segmentation_best.pth', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

# Take the preview images from the validation subset rather than from the
# first files of the training folder, so the figure shows images the model was
# not trained on. Folders come from the dataset object instead of hardcoded paths.
N_PREVIEW = 6
preview_source = val_dataset.dataset
test_images = [preview_source.images[idx] for idx in list(val_dataset.indices)[:N_PREVIEW]]

fig, axes = plt.subplots(3, N_PREVIEW, figsize=(18, 9))
# Hide every axis up front so unused columns stay blank when fewer than
# N_PREVIEW validation images are available.
for ax in axes.ravel():
    ax.axis('off')

for i, img_path in enumerate(test_images):
    img = Image.open(img_path).convert('RGB')
    img_np = np.array(img)

    augmented = val_transform(image=img_np)
    input_tensor = augmented['image'].unsqueeze(0).to(device)

    with torch.no_grad():
        output = model(input_tensor)
        pred_mask = torch.sigmoid(output).squeeze().cpu().numpy()

    # Same lookup rule as the dataset: .gif first, then .png.
    mask_file = preview_source.masks_dir / (img_path.stem + '_mask.gif')
    if not mask_file.exists():
        mask_file = preview_source.masks_dir / (img_path.stem + '_mask.png')
    real_mask = np.array(Image.open(mask_file).convert('L').resize((512, 512)))

    axes[0, i].imshow(img.resize((512, 512)))
    axes[0, i].set_title('Image')

    axes[1, i].imshow(pred_mask, cmap='gray')
    axes[1, i].set_title('Prediction')

    axes[2, i].imshow(real_mask, cmap='gray')
    axes[2, i].set_title('Ground Truth')

plt.tight_layout()
plt.show()


In [None]:
# 12. Export and download the model
# Export the weights stored in the best checkpoint, not whatever `model`
# currently holds: unless the preview cell was run, `model` still carries the
# last-epoch weights, which would not match the recorded best Dice score.
# Loading onto the CPU also makes the exported file loadable without a GPU.
best_checkpoint = torch.load('car_segmentation_best.pth', map_location='cpu')

torch.save({
    'model_state_dict': best_checkpoint['model_state_dict'],
    'dice_score': best_checkpoint['dice_score'],
    'encoder': 'resnet34',
    'image_size': 512,
}, 'car_segmentation_final.pth')

print(f"Modele sauvegarde: car_segmentation_final.pth")
print(f"Dice Score: {best_checkpoint['dice_score']:.4f}")

# Download (only possible inside Google Colab).
try:
    from google.colab import files
except ImportError:
    print("\nHors Colab: le fichier car_segmentation_final.pth est dans le dossier courant.")
else:
    files.download('car_segmentation_final.pth')
    print("\nModele telecharge!")

print("\nPour l'utiliser dans Clone Spyne:")
print("1. Place le fichier dans apps/api/python/")
print("2. python segment_with_trained.py car_segmentation_final.pth image.jpg")
