Ce notebook a été entièrement réalisé par Mélnaie Gomis.

# ViT

In [1]:
import numpy as np
import glob
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim


os.chdir("../Données")

# Chargement des données

In [2]:
fichiers_npz = glob.glob('*.npz')
cosmos_files = [f for f in fichiers_npz if "COSMOS" in f]

# Listes séparées pour train et test
train_cubes_list = []
train_infos_list = []
train_flag_list = []

test_cubes_list = []
test_infos_list = []
test_flag_list = []

print(f"Chargement de {len(cosmos_files)} fichiers COSMOS.")

for fichier in cosmos_files:
    try:
        data = np.load(fichier, allow_pickle=True)
        
        # 'info' est un array structuré. 'dtype.names' donne les noms des colonnes.
        info_fields = data['info'].dtype.names
        
        # On vérifie si la *colonne* 'ZSPEC' existe dans ce fichier
        if 'ZSPEC' in info_fields:
            # Ce fichier contient des données de TEST (il a la colonne ZSPEC)
            test_cubes_list.append(data['cube'])
            test_infos_list.append(data['info'])
            test_flag_list.append(data['flag'])
        else:
            # Ce fichier contient des données de TRAIN (pas de colonne ZSPEC)
            train_cubes_list.append(data['cube'])
            train_infos_list.append(data['info'])
            train_flag_list.append(data['flag'])
            
            
    except Exception as e:
        print(f":x: Erreur en chargeant {fichier}: {e}")

print("Chargement terminé.")

Chargement de 4 fichiers COSMOS.
Chargement terminé.


In [3]:
# Création des jeux train/test

# Jeu d’entraînement (basé sur les fichiers SANS ZSPEC)
# On vérifie qu'on a bien trouvé des fichiers de train
if train_infos_list:
    X_train = np.concatenate(train_cubes_list, axis=0)
    infos_train = np.concatenate(train_infos_list, axis=0)
    flags_train = np.concatenate(train_flag_list, axis=0)
    y_train = infos_train['ZPHOT'] # Obtenir ZPHOT depuis les infos de train
    print(f"Objets d'entraînement (ZPHOT) : {len(y_train)}")
    print("X_train :", X_train.shape)
    print("y_train :", y_train.shape)
else:
    print("Aucun fichier d'entraînement (sans ZSPEC) trouvé.")

print("\n---\n")

# Jeu de test (basé sur les fichiers AVEC ZSPEC)
# On vérifie qu'on a bien trouvé des fichiers de test
if test_infos_list:
    X_test = np.concatenate(test_cubes_list, axis=0)
    infos_test = np.concatenate(test_infos_list, axis=0)
    flags_test = np.concatenate(test_flag_list, axis=0)
    y_test = infos_test['ZSPEC'] # Obtenir ZSPEC depuis les infos de test

    # Si on ne garde que les ZSPEC valides
    # mask_zspec_valid = ~np.isnan(y_test) ou ???
    # X_test = X_test[mask_zspec_valid]
    # y_test = y_test[mask_zspec_valid]

    print(f"Objets de test (ZSPEC) : {len(y_test)}")
    print("X_test  :", X_test.shape)
    print("y_test  :", y_test.shape)
else:
    print("Aucun fichier de test (avec ZSPEC) trouvé.")

Objets d'entraînement (ZPHOT) : 12497
X_train : (12497, 64, 64, 9)
y_train : (12497,)

---

Objets de test (ZSPEC) : 27
X_test  : (27, 64, 64, 9)
y_test  : (27,)


# Encodeur d’images (Image Encoder)

On veut des patchs précis mais pas trop lourd. 

- Patch embedding 
    - On utilisera des patchs de 16x16 feront l'affaire.
    - La projection se fera dans un embedding dimension de 512.
- Transformers 
    - On prendre 7 couches et 8 têtes (car 512 / 64 = 8).



In [4]:
X_train_tensor = torch.tensor(np.ascontiguousarray(X_train), dtype=torch.float32)
y_train_tensor = torch.tensor(np.ascontiguousarray(y_train), dtype=torch.float32).unsqueeze(1)

X_test_tensor = torch.tensor(np.ascontiguousarray(X_test), dtype=torch.float32)
y_test_tensor = torch.tensor(np.ascontiguousarray(y_test), dtype=torch.float32).unsqueeze(1)


In [5]:
batch_size = 32

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [6]:
# --- Patch Embedding ---
class PatchEmbedding(nn.Module):
    def __init__(self, img_size=64, patch_size=16, in_channels=9, embed_dim=512):
        super().__init__()
        self.patch_size = patch_size
        self.n_patches = (img_size // patch_size) ** 2
        self.proj = nn.Linear(patch_size * patch_size * in_channels, embed_dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_embed = nn.Parameter(torch.randn(1, self.n_patches + 1, embed_dim))

    def forward(self, x):
        B, H, W, C = x.shape
        patches = x.unfold(1, self.patch_size, self.patch_size)\
                   .unfold(2, self.patch_size, self.patch_size)
        patches = patches.contiguous().view(B, -1, self.patch_size*self.patch_size*C)
        patches = self.proj(patches)
        cls_tokens = self.cls_token.expand(B, -1, -1)
        x = torch.cat((cls_tokens, patches), dim=1)
        x = x + self.pos_embed
        return x

# --- Transformer Block ---
class TransformerBlock(nn.Module):
    def __init__(self, embed_dim=512, num_heads=8, mlp_dim=2048, dropout=0.1):
        super().__init__()
        self.ln1 = nn.LayerNorm(embed_dim)
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)
        self.ln2 = nn.LayerNorm(embed_dim)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, mlp_dim),
            nn.GELU(),
            nn.Linear(mlp_dim, embed_dim)
        )

    def forward(self, x):
        x_ = self.ln1(x)
        x_attn, _ = self.attn(x_, x_, x_)
        x = x + x_attn
        x = x + self.mlp(self.ln2(x))
        return x

# --- ViT Image Encoder ---
class ViTImageEncoder(nn.Module):
    def __init__(self, img_size=64, patch_size=16, in_channels=9, embed_dim=512,
                 depth=7, num_heads=8, mlp_dim=2048):
        super().__init__()
        self.patch_embed = PatchEmbedding(img_size, patch_size, in_channels, embed_dim)
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(embed_dim, num_heads, mlp_dim) for _ in range(depth)
        ])
        self.ln = nn.LayerNorm(embed_dim)

    def forward(self, x):
        x = self.patch_embed(x)
        x = x.permute(1, 0, 2)  # seq_len, batch, embed_dim
        for blk in self.transformer_blocks:
            x = blk(x)
        x = x.permute(1, 0, 2)  # batch, seq_len, embed_dim
        x = self.ln(x)
        cls_embedding = x[:, 0, :]
        return cls_embedding


In [7]:
class ParamMLP(nn.Module):
    def __init__(self, input_dim=1, embed_dim=512):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, embed_dim)
        )
        
    def forward(self, x):
        return self.fc(x)


In [8]:
class ViTRegressor(nn.Module):
    def __init__(self, img_size=64, patch_size=16, in_channels=9,
                 embed_dim=512, depth=7, num_heads=8, mlp_dim=2048):
        super().__init__()
        from torch.nn import TransformerEncoder, TransformerEncoderLayer

        # ViT encodeur
        self.vit = ViTImageEncoder(img_size, patch_size, in_channels,
                                   embed_dim, depth, num_heads, mlp_dim)
        
        # couche finale pour prédire ZPHOT
        self.regressor = nn.Linear(embed_dim, 1)
        
    def forward(self, x):
        embedding = self.vit(x)  # B, 512
        zphot_pred = self.regressor(embedding)  # B,1
        return zphot_pred


In [9]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ViTRegressor().to(device)

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)


In [10]:
num_epochs = 10  # tu peux augmenter si nécessaire

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        pred = model(xb)
        loss = criterion(pred, yb)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * xb.size(0)
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}")
    
    # évaluation rapide sur test
    model.eval()
    with torch.no_grad():
        test_loss = 0.0
        for xb, yb in test_loader:
            xb, yb = xb.to(device), yb.to(device)
            pred = model(xb)
            test_loss += criterion(pred, yb).item() * xb.size(0)
        test_loss /= len(test_loader.dataset)
    print(f"Test Loss: {test_loss:.4f}\n")


Epoch 1/10, Train Loss: 0.7680
Test Loss: 2.6748

Epoch 2/10, Train Loss: 0.6124
Test Loss: 2.8939

Epoch 3/10, Train Loss: 0.5755
Test Loss: 2.4913

Epoch 4/10, Train Loss: 0.5396
Test Loss: 2.3085

Epoch 5/10, Train Loss: 0.5198
Test Loss: 2.3977

Epoch 6/10, Train Loss: 0.4828
Test Loss: 2.5987

Epoch 7/10, Train Loss: 0.4698
Test Loss: 2.3666

Epoch 8/10, Train Loss: 0.4351
Test Loss: 2.7990

Epoch 9/10, Train Loss: 0.4120
Test Loss: 2.7725

Epoch 10/10, Train Loss: 0.3917
Test Loss: 2.9114

