In [1]:
import os
# On s'assure que le dossier configs existe
os.makedirs("configs", exist_ok=True)

In [2]:
%%writefile configs/diffractive.yaml
experiment_name: "Gaussian_Diffraction_Linear"
device: "cuda"

physics:
  wavelength: 0.0008
  k: 7853.98
  r_max: 20.0
  z_max: 2000.0
  bounds:
    A: [0.1, 1.0]
    w0: [0.5, 2.0]
    f: [500.0, 10000.0]

model:
  branch_dim: 3
  trunk_dim: 2
  latent_dim: 128
  fourier_scales: [1.0, 2.0, 5.0, 10.0, 20.0, 40.0, 80.0]
  epsilon_curriculum: 0.01

training:
  batch_size_ic: 1024
  batch_size_pde: 512
  
  weights:
    ic_loss: 10.0
    pde_loss_divisor: 10000.0
    
  ic_phase:
    iterations: 5000
    learning_rate: 1.0e-3
    threshold: 0.01

  zones:
    zone_1:
      limit: 300.0
      step_size: 20.0
      iterations: 3000
      max_retries: 2
      use_lbfgs: False
      target_error: 0.03

    zone_2:
      limit: 1500.0
      step_size: 5.0
      iterations: 5000
      max_retries: 3
      use_lbfgs: True
      target_error: 0.03

    zone_3:
      step_size: 10.0
      iterations: 4000
      max_retries: 3
      use_lbfgs: False
      target_error: 0.03

Writing configs/diffractive.yaml


In [3]:
%%writefile src/models/Diffractive_PI_DeepOnet.py

import torch
import torch.nn as nn
import numpy as np

class MultiScaleFourierFeatureEncoding(nn.Module):
    def __init__(self, input_dim, num_features, scales):
        super().__init__()
        self.num_features = num_features
        self.scales = scales
        self.out_dim = 2 * num_features * len(scales)
        
        self.B_list = nn.ParameterList([
            nn.Parameter(torch.randn(input_dim, num_features) * scale, requires_grad=False)
            for scale in scales
        ])

    def forward(self, x):
        features = []
        for B in self.B_list:
            proj = (2.0 * np.pi * x) @ B
            features.append(torch.cos(proj))
            features.append(torch.sin(proj))
        return torch.cat(features, dim=-1)

class PI_DeepONet_Robust(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        # Extraction depuis la config
        self.k_phys = cfg.physics['k']
        b = cfg.physics['bounds']
        scales = cfg.model['fourier_scales']
        latent_dim = cfg.model['latent_dim']
        
        # Buffers Normalisation
        self.register_buffer('A_min', torch.tensor(b['A'][0]))
        self.register_buffer('A_max', torch.tensor(b['A'][1]))
        self.register_buffer('w0_min', torch.tensor(b['w0'][0]))
        self.register_buffer('w0_max', torch.tensor(b['w0'][1]))
        self.register_buffer('f_min', torch.tensor(b['f'][0]))
        self.register_buffer('f_max', torch.tensor(b['f'][1]))
        self.register_buffer('r_max', torch.tensor(cfg.physics['r_max']))
        self.register_buffer('z_max', torch.tensor(cfg.physics['z_max']))
        
        self.register_buffer('epsilon_curr', torch.tensor(cfg.model['epsilon_curriculum']))

        # Branch Net
        self.branch_net = nn.Sequential(
            nn.Linear(cfg.model['branch_dim'], 200), nn.SiLU(),
            nn.Linear(200, 200), nn.SiLU(),
            nn.Linear(200, 200), nn.SiLU(),
            nn.Linear(200, 200), nn.SiLU(),
            nn.Linear(200, 2 * latent_dim)
        )
        
        # Trunk Net
        self.trunk_encoding = MultiScaleFourierFeatureEncoding(cfg.model['trunk_dim'], 64, scales)
        self.trunk_net = nn.Sequential(
            nn.Linear(self.trunk_encoding.out_dim, 200), nn.SiLU(),
            nn.Linear(200, 200), nn.SiLU(),
            nn.Linear(200, 200), nn.SiLU(),
            nn.Linear(200, 200), nn.SiLU(),
            nn.Linear(200, latent_dim)
        )
        self.latent_dim = latent_dim

    def normalize_linear(self, x, x_min, x_max):
        return 2.0 * (x - x_min) / (x_max - x_min + 1e-9) - 1.0

    def normalize_log(self, x, x_min, x_max):
        return self.normalize_linear(torch.log10(torch.abs(x)+1e-9), 
                                     torch.log10(torch.abs(x_min)+1e-9), 
                                     torch.log10(torch.abs(x_max)+1e-9))

    def forward(self, params, coords):
        f_raw = params[:, 2:3]
        r_raw = coords[:, 0:1]
        z_raw = coords[:, 1:2]
        
        params_norm = torch.cat([
            self.normalize_linear(params[:,0:1], self.A_min, self.A_max),
            self.normalize_log(params[:,1:2], self.w0_min, self.w0_max),
            self.normalize_log(params[:,2:3], self.f_min, self.f_max)
        ], dim=1)
        
        coords_norm = torch.cat([
            self.normalize_linear(coords[:,0:1], 0.0, self.r_max),
            self.normalize_linear(coords[:,1:2], 0.0, self.z_max)
        ], dim=1)
        
        B = self.branch_net(params_norm)
        T = self.trunk_net(self.trunk_encoding(coords_norm))
        B_re, B_im = torch.split(B, self.latent_dim, dim=1)
        
        env_re = torch.sum(B_re * T, dim=1, keepdim=True)
        env_im = torch.sum(B_im * T, dim=1, keepdim=True)
        
        phase_lens = - (self.k_phys * r_raw**2) / (2.0 * f_raw)
        cos_p = torch.cos(phase_lens)
        sin_p = torch.sin(phase_lens)
        
        geom_scale = 1.0 / torch.sqrt( (1.0 - z_raw/f_raw)**2 + self.epsilon_curr )
        
        out_re = geom_scale * (env_re * cos_p - env_im * sin_p)
        out_im = geom_scale * (env_re * sin_p + env_im * cos_p)
        
        return out_re, out_im

    def set_epsilon(self, new_eps):
        self.epsilon_curr.fill_(new_eps)

Writing src/models/Diffractive_PI_DeepOnet.py


In [4]:
%%writefile src/data/generators.py
import torch
import numpy as np

def get_ic_batch_sobolev(n_samples, cfg, device):
    # cfg est un objet, on acc√®de aux dicts via attributs
    b = cfg.physics['bounds']
    
    A = torch.rand(n_samples, 1, device=device) * (b['A'][1] - b['A'][0]) + b['A'][0]
    w0_min, w0_max = b['w0']
    w0 = 10 ** (torch.rand(n_samples, 1, device=device) * np.log10(w0_max/w0_min) + np.log10(w0_min))
    f = torch.rand(n_samples, 1, device=device) * (b['f'][1] - b['f'][0]) + b['f'][0]
    branch = torch.cat([A, w0, f], dim=1)
    
    # Sniper sampling (80% in beam, 20% outside)
    n_in = int(0.8 * n_samples)
    n_out = n_samples - n_in
    r_in = torch.rand(n_in, 1, device=device) * 2.5 * w0[:n_in]
    r_out = torch.rand(n_out, 1, device=device) * cfg.physics['r_max']
    r = torch.cat([r_in, r_out], dim=0)
    z = torch.zeros_like(r)
    coords = torch.cat([r, z], dim=1).requires_grad_(True)
    
    # Target Values & Gradients
    k = cfg.physics['k']
    arg_gauss = -(r**2)/(w0**2)
    amp = torch.sqrt(A) * torch.exp(arg_gauss)
    phase = -(k * r**2)/(2*f)
    
    cos_p = torch.cos(phase); sin_p = torch.sin(phase)
    t_re = amp * cos_p; t_im = amp * sin_p
    
    # Sobolev Targets
    dA_dr = amp * (-2*r / w0**2)
    dP_dr = -(k * r) / f
    dt_re = dA_dr * cos_p - amp * sin_p * dP_dr
    dt_im = dA_dr * sin_p + amp * cos_p * dP_dr
    
    return branch, coords, t_re, t_im, dt_re, dt_im

def get_pde_batch_z_limited(n_samples, cfg, device, z_limit):
    b = cfg.physics['bounds']
    
    A = torch.rand(n_samples, 1, device=device) * (b['A'][1] - b['A'][0]) + b['A'][0]
    w0 = torch.rand(n_samples, 1, device=device) * (b['w0'][1] - b['w0'][0]) + b['w0'][0]
    f = torch.rand(n_samples, 1, device=device) * (b['f'][1] - b['f'][0]) + b['f'][0]
    branch = torch.cat([A, w0, f], dim=1)
    
    r = torch.abs(torch.randn(n_samples, 1, device=device)) * (cfg.physics['r_max'] / 2.0)
    r = torch.clamp(r, 0, cfg.physics['r_max'])
    z = torch.rand(n_samples, 1, device=device) * z_limit
    coords = torch.cat([r, z], dim=1)
    
    return branch, coords

Writing src/data/generators.py


In [5]:
%%writefile src/physics/diffractive.py
import torch

def pde_residual_corrected(model, branch, coords, cfg):
    coords.requires_grad_(True)
    E_re, E_im = model(branch, coords)
    
    # 1. Gradients Premiers
    grads_re = torch.autograd.grad(E_re, coords, torch.ones_like(E_re), create_graph=True)[0]
    grads_im = torch.autograd.grad(E_im, coords, torch.ones_like(E_im), create_graph=True)[0]
    
    u_z = grads_re[:, 1:2]
    v_z = grads_im[:, 1:2]
    u_r = grads_re[:, 0:1]
    v_r = grads_im[:, 0:1]
    
    # 2. Gradients Seconds (Laplacien Radial)
    u_rr = torch.autograd.grad(u_r, coords, torch.ones_like(u_r), create_graph=True)[0][:, 0:1]
    v_rr = torch.autograd.grad(v_r, coords, torch.ones_like(v_r), create_graph=True)[0][:, 0:1]
    
    r = coords[:, 0:1]
    # Ajout epsilon pour √©viter div/0 √† r=0
    lap_u = u_rr + (1.0/(r+1e-6)) * u_r
    lap_v = v_rr + (1.0/(r+1e-6)) * v_r
    
    # 3. Schr√∂dinger Paraxiale
    k_scale = 2.0 * cfg.physics['k']
    
    res_re = -k_scale * v_z + lap_u
    res_im =  k_scale * u_z + lap_v
    
    return res_re, res_im

Writing src/physics/diffractive.py


In [6]:
%%writefile scripts/train.py
import sys
import os
import argparse
import yaml
import torch
import torch.optim as optim

# Ajout du dossier racine au path pour les imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Import avec le nom exact de ton fichier
from src.models.Diffractive_PI_DeepOnet import PI_DeepONet_Robust
from src.data.generators import get_ic_batch_sobolev, get_pde_batch_z_limited
from src.physics.diffractive import pde_residual_corrected
from src.utils.metrics import evaluate_robust_metrics_smart

# --- Helper pour supporter la syntaxe cfg.physics['k'] ---
class Config:
    def __init__(self, dictionary):
        for k, v in dictionary.items():
            setattr(self, k, v)

def main(config_path):
    # 1. Chargement Config
    print(f"üìñ Chargement de {config_path}...")
    with open(config_path, 'r') as f:
        yaml_data = yaml.safe_load(f)
    
    # Transformation en objet compatible avec ton code (cfg.section['key'])
    cfg = Config(yaml_data)
    
    # Device
    device = torch.device(cfg.device if torch.cuda.is_available() else "cpu")
    print(f"üñ•Ô∏è Device : {device}")

    # 2. Mod√®le
    print("üèóÔ∏è Cr√©ation du mod√®le...")
    model = PI_DeepONet_Robust(cfg).to(device)
    
    # 3. Entra√Ænement IC
    print("\nüî• D√âMARRAGE IC (Condition Initiale)...")
    ic_iters = int(cfg.training['ic_phase']['iterations'])
    ic_lr = float(cfg.training['ic_phase']['learning_rate'])
    
    optimizer_ic = optim.Adam(model.parameters(), lr=ic_lr)
    scheduler_ic = optim.lr_scheduler.ReduceLROnPlateau(optimizer_ic, factor=0.5, patience=500, verbose=True)
    
    model.train()
    for it in range(ic_iters):
        optimizer_ic.zero_grad()
        branch, coords, t_re, t_im, dt_re, dt_im = get_ic_batch_sobolev(cfg.training['batch_size_ic'], cfg, device)
        
        p_re, p_im = model(branch, coords)
        loss_val = torch.mean((p_re - t_re)**2) + torch.mean((p_im - t_im)**2)
        
        # Sobolev (Gradients)
        grad_re = torch.autograd.grad(p_re, coords, torch.ones_like(p_re), create_graph=True)[0][:, 0:1]
        grad_im = torch.autograd.grad(p_im, coords, torch.ones_like(p_im), create_graph=True)[0][:, 0:1]
        loss_grad = torch.mean((grad_re - dt_re)**2) + torch.mean((grad_im - dt_im)**2)
        
        loss = loss_val + 0.1 * loss_grad
        loss.backward()
        optimizer_ic.step()
        scheduler_ic.step(loss)
        
        if it % 1000 == 0:
            print(f"IC It {it} | Loss: {loss.item():.2e}")

    # Audit IC
    ic_err, _ = evaluate_robust_metrics_smart(model, cfg, n_samples=1000, z_eval=0.0)
    print(f"‚úÖ IC Valid√©e avec erreur L2: {ic_err*100:.2f}%")

    # 4. Curriculum Loop (Les Zones)
    z_current = 0.0
    z_max = cfg.physics['z_max']
    
    print("\nüöÄ D√âMARRAGE CURRICULUM ZONES...")
    
    while z_current < z_max:
        # D√©termination de la zone active via la config
        if z_current < cfg.training['zones']['zone_1']['limit']:
            zone_cfg = cfg.training['zones']['zone_1']
            name = "ZONE 1 (Chauffe)"
        elif z_current < cfg.training['zones']['zone_2']['limit']:
            zone_cfg = cfg.training['zones']['zone_2']
            name = "ZONE 2 (Critique)"
        else:
            zone_cfg = cfg.training['zones']['zone_3']
            name = "ZONE 3 (Sortie)"
            
        z_next = min(z_current + zone_cfg['step_size'], z_max)
        print(f"\nüåç {name} : {z_current:.1f} -> {z_next:.1f} mm")
        
        # --- Appel de la logique train_step_zone ---
        success = False
        
        # Phase Adam
        optimizer = optim.Adam(model.parameters(), lr=5e-4)
        model.train()
        for it in range(int(zone_cfg['iterations'])):
            optimizer.zero_grad()
            # Batchs
            br_ic, co_ic, t_re, t_im, _, _ = get_ic_batch_sobolev(128, cfg, device)
            br_pde, co_pde = get_pde_batch_z_limited(cfg.training['batch_size_pde'], cfg, device, z_next)
            
            # Loss
            p_re, p_im = model(br_ic, co_ic)
            l_ic = torch.mean((p_re - t_re)**2) + torch.mean((p_im - t_im)**2)
            r_re, r_im = pde_residual_corrected(model, br_pde, co_pde, cfg)
            l_pde = torch.mean(r_re**2) + torch.mean(r_im**2)
            
            loss = cfg.training['weights']['ic_loss'] * l_ic + (l_pde / cfg.training['weights']['pde_loss_divisor'])
            loss.backward()
            optimizer.step()
        
        # Check
        err, _ = evaluate_robust_metrics_smart(model, cfg, n_samples=500, z_eval=z_next)
        if err < zone_cfg['target_error']:
            success = True
        else:
            # Retries logic...
            print(f"‚ö†Ô∏è Warning: Erreur {err*100:.2f}% > {zone_cfg['target_error']*100}%.")
            if zone_cfg['use_lbfgs']:
                print("   üöÄ L-BFGS activ√©...")
                lbfgs = optim.LBFGS(model.parameters(), lr=1.0, max_iter=20, line_search_fn="strong_wolfe")
                def closure():
                    lbfgs.zero_grad()
                    br_ic, co_ic, t_re, t_im, _, _ = get_ic_batch_sobolev(256, cfg, device)
                    br_pde, co_pde = get_pde_batch_z_limited(cfg.training['batch_size_pde'], cfg, device, z_next)
                    p_re, p_im = model(br_ic, co_ic); l_ic = torch.mean((p_re - t_re)**2) + torch.mean((p_im - t_im)**2)
                    r_re, r_im = pde_residual_corrected(model, br_pde, co_pde, cfg)
                    l_pde = torch.mean(r_re**2) + torch.mean(r_im**2)
                    ls = cfg.training['weights']['ic_loss'] * l_ic + (l_pde / cfg.training['weights']['pde_loss_divisor'])
                    ls.backward()
                    return ls
                lbfgs.step(closure)
                
            # Re-check final
            err, _ = evaluate_robust_metrics_smart(model, cfg, n_samples=500, z_eval=z_next)
            if err < 0.08: # Tol√©rance ultime
                success = True
            
        if success:
            print(f"‚úÖ √âtape valid√©e. ({err*100:.2f}%)")
            z_current = z_next
            # Sauvegarde Checkpoint interm√©diaire
            os.makedirs("outputs/checkpoints", exist_ok=True)
            if int(z_current) % 100 == 0:
                torch.save(model.state_dict(), f"outputs/checkpoints/ckpt_z{int(z_current)}.pth")
        else:
            print("‚ùå √âchec critique. Arr√™t.")
            break

    print("üíæ Sauvegarde finale...")
    torch.save(model.state_dict(), "outputs/diffractive_final.pth")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=str, default="configs/diffractive.yaml")
    args = parser.parse_args()
    main(args.config)

Writing scripts/train.py


In [7]:
%%writefile src/utils/solver.py
import numpy as np
from scipy import sparse
from scipy.sparse.linalg import splu
from tqdm import tqdm

class CrankNicolsonSolver:
    def __init__(self, cfg, nr=1000, nz=1000):
        """
        Solveur Num√©rique Classique pour dE/dz = (i/2k) * Laplacian(E)
        """
        # Adaptation pour lire la config YAML structur√©e
        self.k = cfg.physics['k']
        self.r_max = cfg.physics['r_max']
        self.z_max = cfg.physics['z_max']
        
        self.Nr = nr
        self.Nz = nz
        
        # 1. Maillage
        self.r = np.linspace(0, self.r_max, nr)
        self.dr = self.r[1] - self.r[0]
        
        self.z = np.linspace(0, self.z_max, nz)
        self.dz = self.z[1] - self.z[0]
        
        # 2. Pr√©paration des Matrices
        self._setup_matrices()
        
    def _setup_matrices(self):
        # Facteur alpha pour Crank-Nicolson : alpha = i * dz / (4 * k * dr^2)
        alpha = (1j * self.dz) / (4.0 * self.k * self.dr**2)
        
        # --- Diagonales ---
        j = np.arange(1, self.Nr - 1)
        lower_diag = alpha * (1.0 - 0.5 / j)
        main_diag  = alpha * (-2.0) * np.ones(self.Nr)
        upper_diag = alpha * (1.0 + 0.5 / j)
        
        # --- Assemblage M ---
        data = np.zeros((3, self.Nr), dtype=complex)
        data[2, 1:-1] = upper_diag 
        data[1, 1:-1] = main_diag[1:-1]
        data[0, 2:]   = lower_diag
        
        # Conditions aux Limites (BC)
        # r=0 (Neumann)
        data[1, 0] = -4.0 * alpha
        data[2, 1] =  4.0 * alpha
        # r=r_max (Dirichlet)
        data[1, -1] = 0.0 
        
        M = sparse.spdiags(data, [-1, 0, 1], self.Nr, self.Nr)
        
        I = sparse.eye(self.Nr, format='csc')
        self.Mat_Left  = I - M
        self.Mat_Right = I + M
        
        # Factorisation LU
        self.LU = splu(self.Mat_Left.tocsc())

    def solve(self, A_val, w0_val, f_val):
        """
        Retourne z, r, E_field
        """
        E = np.zeros((self.Nz, self.Nr), dtype=complex)
        
        # Condition Initiale
        arg_gauss = -(self.r**2) / (w0_val**2)
        phase = -(self.k * self.r**2) / (2.0 * f_val)
        E[0, :] = np.sqrt(A_val) * np.exp(arg_gauss) * np.exp(1j * phase)
        
        u = E[0, :].copy()
        
        # Propagation (sans print tqdm si on veut √™tre silencieux sur HPC, sinon laisser)
        for n in range(self.Nz - 1):
            b = self.Mat_Right.dot(u)
            u_new = self.LU.solve(b)
            E[n+1, :] = u_new
            u = u_new
            
        return self.z, self.r, E

Writing src/utils/solver.py


In [8]:
%%writefile scripts/compare.py
import sys
import os
import yaml
import torch
import numpy as np
import matplotlib.pyplot as plt
from types import SimpleNamespace

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.models.Diffractive_PI_DeepOnet import PI_DeepONet_Robust
from src.utils.solver import CrankNicolsonSolver

# Helper Config
class Config:
    def __init__(self, dictionary):
        for k, v in dictionary.items():
            setattr(self, k, v)

def main():
    # 1. Config & Mod√®le
    config_path = "configs/diffractive.yaml"
    with open(config_path, 'r') as f:
        cfg = Config(yaml.safe_load(f))
        
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = PI_DeepONet_Robust(cfg).to(device)
    
    # Chargement des poids
    ckpt_path = "outputs/diffractive_final.pth"
    if os.path.exists(ckpt_path):
        print(f"Chargement de {ckpt_path}...")
        model.load_state_dict(torch.load(ckpt_path, map_location=device))
    else:
        print("‚ö†Ô∏è Pas de checkpoint trouv√©. On utilise un mod√®le non entra√Æn√© (juste pour tester le script).")

    model.eval()

    # 2. Param√®tres du Cas Test
    A_val = 1.0
    w0_val = 1.0
    f_val = 1000.0
    print(f"üß™ Test Case : A={A_val}, w0={w0_val}, f={f_val}")

    # 3. Solveur Crank-Nicolson (V√©rit√© Terrain)
    print("üßÆ Ex√©cution Crank-Nicolson...")
    solver = CrankNicolsonSolver(cfg, nr=500, nz=200) # R√©solution moyenne pour aller vite
    z_cn, r_cn, E_cn = solver.solve(A_val, w0_val, f_val)
    E_cn_mod = np.abs(E_cn)

    # 4. Pr√©diction PINN (Sur la m√™me grille)
    print("üß† Ex√©cution PINN...")
    # On doit cr√©er une grille (r, z) correspondant √† celle du CN
    R_grid, Z_grid = np.meshgrid(r_cn, z_cn)
    
    r_flat = torch.tensor(R_grid.flatten(), dtype=torch.float32).view(-1, 1).to(device)
    z_flat = torch.tensor(Z_grid.flatten(), dtype=torch.float32).view(-1, 1).to(device)
    coords = torch.cat([r_flat, z_flat], dim=1)
    
    # Branch input constant
    n_pts = len(r_flat)
    branch = torch.tensor([[A_val, w0_val, f_val]], device=device).repeat(n_pts, 1)
    
    with torch.no_grad():
        p_re, p_im = model(branch, coords)
        p_mod = torch.sqrt(p_re**2 + p_im**2).cpu().numpy().reshape(Z_grid.shape)

    # 5. Plot Comparatif
    print("üé® G√©n√©ration du Plot...")
    os.makedirs("outputs/plots", exist_ok=True)
    
    plt.figure(figsize=(12, 5))
    
    # Plot CN
    plt.subplot(1, 3, 1)
    plt.imshow(E_cn_mod, extent=[0, cfg.physics['r_max'], cfg.physics['z_max'], 0], aspect='auto', cmap='inferno')
    plt.title("Crank-Nicolson (Truth)")
    plt.xlabel("r (mm)"); plt.ylabel("z (mm)")
    plt.colorbar()
    
    # Plot PINN
    plt.subplot(1, 3, 2)
    plt.imshow(p_mod, extent=[0, cfg.physics['r_max'], cfg.physics['z_max'], 0], aspect='auto', cmap='inferno')
    plt.title("PINN Prediction")
    plt.xlabel("r (mm)")
    plt.colorbar()
    
    # Plot Erreur
    err = np.abs(E_cn_mod - p_mod)
    plt.subplot(1, 3, 3)
    plt.imshow(err, extent=[0, cfg.physics['r_max'], cfg.physics['z_max'], 0], aspect='auto', cmap='seismic')
    plt.title("Diff√©rence Absolue")
    plt.xlabel("r (mm)")
    plt.colorbar()
    
    plt.tight_layout()
    plt.savefig("outputs/plots/comparison_cn_pinn.png")
    print("‚úÖ Sauvegard√© dans outputs/plots/comparison_cn_pinn.png")

if __name__ == "__main__":
    main()

Writing scripts/compare.py


In [9]:
%%writefile scripts/test_local.py
import sys
import os
import torch
import yaml
import time
from types import SimpleNamespace

# 1. AJOUT DU PATH RACINE
# Permet de trouver le dossier 'src' m√™me si on lance le script depuis n'importe o√π
current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.dirname(current_dir)
sys.path.append(root_dir)

print(f"üìÇ Racine du projet d√©tect√©e : {root_dir}")

# 2. IMPORTS DU PROJET
print("üîÑ Test des imports...")
try:
    from src.models.Diffractive_PI_DeepOnet import PI_DeepONet_Robust
    from src.data.generators import get_ic_batch_sobolev, get_pde_batch_z_limited
    from src.physics.diffractive import pde_residual_corrected
    from src.utils.solver import CrankNicolsonSolver
    from src.utils.metrics import evaluate_robust_metrics_smart
    print("‚úÖ Imports r√©ussis.")
except ImportError as e:
    print(f"‚ùå ERREUR D'IMPORT : {e}")
    print("V√©rifiez que vous avez bien ex√©cut√© les cellules pr√©c√©dentes pour cr√©er les fichiers !")
    sys.exit(1)

# Helper pour la config
class Config:
    def __init__(self, dictionary):
        for k, v in dictionary.items():
            if isinstance(v, dict):
                setattr(self, k, Config(v))
            else:
                setattr(self, k, v)
    
    # Permet d'acc√©der comme un dictionnaire aussi (pour compatibilit√©)
    def __getitem__(self, item):
        return getattr(self, item)

def main():
    print("\nüß™ D√âMARRAGE DU DRY RUN (TEST LOCAL)...")
    
    # A. CHARGEMENT CONFIG
    config_path = os.path.join(root_dir, "configs", "diffractive.yaml")
    if not os.path.exists(config_path):
        print(f"‚ùå Config introuvable : {config_path}")
        return

    with open(config_path, 'r') as f:
        cfg_dict = yaml.safe_load(f)
    cfg = Config(cfg_dict)
    print("‚úÖ Configuration charg√©e.")

    # B. GPU CHECK
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"üñ•Ô∏è  Device utilis√© : {device}")

    # C. MOD√àLE
    try:
        model = PI_DeepONet_Robust(cfg).to(device)
        print("‚úÖ Mod√®le instanci√© avec succ√®s.")
        # Petit test forward rapide avec des donn√©es bidon
        dummy_branch = torch.randn(2, 3).to(device) # A, w0, f
        dummy_coords = torch.randn(2, 2).to(device) # r, z
        _ = model(dummy_branch, dummy_coords)
        print("   -> Forward pass OK.")
    except Exception as e:
        print(f"‚ùå Erreur Mod√®le : {e}")
        return

    # D. DATA GENERATION
    try:
        # On teste avec un tout petit batch (batch_size=4)
        print("üè≠ Test G√©n√©ration Donn√©es...")
        br, co, tr, ti, dtr, dti = get_ic_batch_sobolev(4, cfg, device)
        br_pde, co_pde = get_pde_batch_z_limited(4, cfg, device, z_limit=10.0)
        print("‚úÖ Batches g√©n√©r√©s.")
    except Exception as e:
        print(f"‚ùå Erreur Data : {e}")
        return

    # E. PHYSIQUE (PDE RESIDUAL)
    try:
        print("üßÆ Test Calcul Physique (Autograd)...")
        r_re, r_im = pde_residual_corrected(model, br_pde, co_pde, cfg)
        loss = torch.mean(r_re**2) + torch.mean(r_im**2)
        loss.backward() # V√©rifie que le graphe de calcul n'est pas bris√©
        print(f"‚úÖ Physique OK (Loss dummy: {loss.item():.2e})")
    except Exception as e:
        print(f"‚ùå Erreur Physique : {e}")
        return

    # F. SOLVEUR CRANK-NICOLSON
    try:
        print("üìê Test Solveur Crank-Nicolson...")
        solver = CrankNicolsonSolver(cfg, nr=50, nz=10) # Tr√®s basse r√©solution
        _, _, _ = solver.solve(A_val=1.0, w0_val=1.0, f_val=1000.0)
        print("‚úÖ Solveur OK.")
    except Exception as e:
        print(f"‚ùå Erreur Solveur : {e}")
        return

    print("\nüéâ SUCC√àS TOTAL ! LE CODE EST ROBUSTE ET PR√äT POUR LE HPC.")
    print("üëâ Vous pouvez lancer : python scripts/train.py")

if __name__ == "__main__":
    main()

Writing scripts/test_local.py


In [2]:
import sys
# Affiche quel python tourne vraiment pour √™tre s√ªr
print(f"üêç Python actuel : {sys.executable}")

# Force l'installation DANS ce noyau pr√©cis
%pip install torch torchvision torchaudio

üêç Python actuel : C:\Users\emmag\anaconda3\python.exe
Collecting torch
  Downloading torch-2.10.0-cp313-cp313-win_amd64.whl.metadata (31 kB)
Collecting torchvision
  Downloading torchvision-0.25.0-cp313-cp313-win_amd64.whl.metadata (5.4 kB)
Collecting torchaudio
  Downloading torchaudio-2.10.0-cp313-cp313-win_amd64.whl.metadata (6.9 kB)
Downloading torch-2.10.0-cp313-cp313-win_amd64.whl (113.8 MB)
   ---------------------------------------- 0.0/113.8 MB ? eta -:--:--
   - -------------------------------------- 3.7/113.8 MB 24.8 MB/s eta 0:00:05
   --- ------------------------------------ 9.2/113.8 MB 26.1 MB/s eta 0:00:05
   ----- ---------------------------------- 16.0/113.8 MB 28.6 MB/s eta 0:00:04
   -------- ------------------------------- 22.8/113.8 MB 29.5 MB/s eta 0:00:04
   ---------- ----------------------------- 29.4/113.8 MB 30.2 MB/s eta 0:00:03
   ------------ --------------------------- 35.7/113.8 MB 30.2 MB/s eta 0:00:03
   --------------- ------------------------ 43.

In [4]:
%%writefile scripts/test_local.py
import os
import sys

# --- FIX CRITIQUE WINDOWS/INTEL ---
# DOIT √äTRE PLAC√â AVANT L'IMPORT DE TORCH
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

import torch
import yaml
import time
from types import SimpleNamespace

# 1. AJOUT DU PATH RACINE
current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.dirname(current_dir)
sys.path.append(root_dir)

print(f"üìÇ Racine du projet d√©tect√©e : {root_dir}")

# 2. IMPORTS DU PROJET
print("üîÑ Test des imports...")
try:
    from src.models.Diffractive_PI_DeepOnet import PI_DeepONet_Robust
    from src.data.generators import get_ic_batch_sobolev, get_pde_batch_z_limited
    from src.physics.diffractive import pde_residual_corrected
    from src.utils.solver import CrankNicolsonSolver
    from src.utils.metrics import evaluate_robust_metrics_smart
    print("‚úÖ Imports r√©ussis.")
except ImportError as e:
    print(f"‚ùå ERREUR D'IMPORT : {e}")
    sys.exit(1)

# Helper pour la config
class Config:
    def __init__(self, dictionary):
        for k, v in dictionary.items():
            if isinstance(v, dict):
                setattr(self, k, Config(v))
            else:
                setattr(self, k, v)
    def __getitem__(self, item):
        return getattr(self, item)

def main():
    print("\nüß™ D√âMARRAGE DU DRY RUN (TEST LOCAL)...")
    
    # A. CHARGEMENT CONFIG
    config_path = os.path.join(root_dir, "configs", "diffractive.yaml")
    if not os.path.exists(config_path):
        print(f"‚ùå Config introuvable : {config_path}")
        return

    with open(config_path, 'r') as f:
        cfg_dict = yaml.safe_load(f)
    cfg = Config(cfg_dict)
    print("‚úÖ Configuration charg√©e.")

    # B. GPU CHECK
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"üñ•Ô∏è  Device utilis√© : {device}")

    # C. MOD√àLE
    try:
        model = PI_DeepONet_Robust(cfg).to(device)
        print("‚úÖ Mod√®le instanci√© avec succ√®s.")
        # Petit test forward
        dummy_branch = torch.randn(2, 3).to(device)
        dummy_coords = torch.randn(2, 2).to(device)
        _ = model(dummy_branch, dummy_coords)
        print("   -> Forward pass OK.")
    except Exception as e:
        print(f"‚ùå Erreur Mod√®le : {e}")
        return

    # D. DATA GENERATION
    try:
        print("üè≠ Test G√©n√©ration Donn√©es...")
        br, co, tr, ti, dtr, dti = get_ic_batch_sobolev(4, cfg, device)
        br_pde, co_pde = get_pde_batch_z_limited(4, cfg, device, z_limit=10.0)
        print("‚úÖ Batches g√©n√©r√©s.")
    except Exception as e:
        print(f"‚ùå Erreur Data : {e}")
        return

    # E. PHYSIQUE
    try:
        print("üßÆ Test Calcul Physique...")
        r_re, r_im = pde_residual_corrected(model, br_pde, co_pde, cfg)
        loss = torch.mean(r_re**2) + torch.mean(r_im**2)
        loss.backward()
        print(f"‚úÖ Physique OK (Loss dummy: {loss.item():.2e})")
    except Exception as e:
        print(f"‚ùå Erreur Physique : {e}")
        return

    # F. SOLVEUR
    try:
        print("üìê Test Solveur Crank-Nicolson...")
        solver = CrankNicolsonSolver(cfg, nr=50, nz=10)
        _, _, _ = solver.solve(A_val=1.0, w0_val=1.0, f_val=1000.0)
        print("‚úÖ Solveur OK.")
    except Exception as e:
        print(f"‚ùå Erreur Solveur : {e}")
        return

    print("\nüéâ SUCC√àS TOTAL ! LE CODE EST PR√äT.")

if __name__ == "__main__":
    main()

Overwriting scripts/test_local.py


In [5]:
!python scripts/test_local.py


Traceback (most recent call last):
  File [35m"C:\Users\emmag\Th√®se\Projet_DeepOnet_NLSE\scripts\test_local.py"[0m, line [35m18[0m, in [35m<module>[0m
    [31mprint[0m[1;31m(f"\U0001f4c2 Racine du projet d√©tect√©e : {root_dir}")
    [0m[31m~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"C:\Users\emmag\anaconda3\Lib\encodings\cp1252.py"[0m, line [35m19[0m, in [35mencode[0m
    return [31mcodecs.charmap_encode[0m[1;31m(input,self.errors,encoding_table)[0m[0]
           [31m~~~~~~~~~~~~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
[1;35mUnicodeEncodeError[0m: [35m'charmap' codec can't encode character '\U0001f4c2' in position 0: character maps to <undefined>[0m


In [6]:
%%writefile scripts/test_local.py
import os
import sys

# --- FIX CRITIQUE WINDOWS/INTEL ---
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

import torch
import yaml
import time
from types import SimpleNamespace

# 1. AJOUT DU PATH RACINE
current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.dirname(current_dir)
sys.path.append(root_dir)

print(f"[INFO] Racine du projet detectee : {root_dir}")

# 2. IMPORTS DU PROJET
print("[INFO] Test des imports...")
try:
    from src.models.Diffractive_PI_DeepOnet import PI_DeepONet_Robust
    from src.data.generators import get_ic_batch_sobolev, get_pde_batch_z_limited
    from src.physics.diffractive import pde_residual_corrected
    from src.utils.solver import CrankNicolsonSolver
    from src.utils.metrics import evaluate_robust_metrics_smart
    print("[OK] Imports reussis.")
except ImportError as e:
    print(f"[ERREUR] Import : {e}")
    sys.exit(1)

# Helper pour la config
class Config:
    def __init__(self, dictionary):
        for k, v in dictionary.items():
            if isinstance(v, dict):
                setattr(self, k, Config(v))
            else:
                setattr(self, k, v)
    def __getitem__(self, item):
        return getattr(self, item)

def main():
    print("\n[START] DEMARRAGE DU DRY RUN (TEST LOCAL)...")
    
    # A. CHARGEMENT CONFIG
    config_path = os.path.join(root_dir, "configs", "diffractive.yaml")
    if not os.path.exists(config_path):
        print(f"[ERREUR] Config introuvable : {config_path}")
        return

    with open(config_path, 'r') as f:
        cfg_dict = yaml.safe_load(f)
    cfg = Config(cfg_dict)
    print("[OK] Configuration chargee.")

    # B. GPU CHECK
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[INFO] Device utilise : {device}")

    # C. MOD√àLE
    try:
        model = PI_DeepONet_Robust(cfg).to(device)
        print("[OK] Modele instancie avec succes.")
        # Petit test forward
        dummy_branch = torch.randn(2, 3).to(device)
        dummy_coords = torch.randn(2, 2).to(device)
        _ = model(dummy_branch, dummy_coords)
        print("   -> Forward pass OK.")
    except Exception as e:
        print(f"[ERREUR] Modele : {e}")
        return

    # D. DATA GENERATION
    try:
        print("[INFO] Test Generation Donnees...")
        br, co, tr, ti, dtr, dti = get_ic_batch_sobolev(4, cfg, device)
        br_pde, co_pde = get_pde_batch_z_limited(4, cfg, device, z_limit=10.0)
        print("[OK] Batches generes.")
    except Exception as e:
        print(f"[ERREUR] Data : {e}")
        return

    # E. PHYSIQUE
    try:
        print("[INFO] Test Calcul Physique...")
        r_re, r_im = pde_residual_corrected(model, br_pde, co_pde, cfg)
        loss = torch.mean(r_re**2) + torch.mean(r_im**2)
        loss.backward()
        print(f"[OK] Physique OK (Loss dummy: {loss.item():.2e})")
    except Exception as e:
        print(f"[ERREUR] Physique : {e}")
        return

    # F. SOLVEUR
    try:
        print("[INFO] Test Solveur Crank-Nicolson...")
        solver = CrankNicolsonSolver(cfg, nr=50, nz=10)
        _, _, _ = solver.solve(A_val=1.0, w0_val=1.0, f_val=1000.0)
        print("[OK] Solveur OK.")
    except Exception as e:
        print(f"[ERREUR] Solveur : {e}")
        return

    print("\n[SUCCES] TOUT EST OK ! LE CODE EST PRET.")

if __name__ == "__main__":
    main()

Overwriting scripts/test_local.py


In [8]:
%%writefile src/utils/metrics.py
import torch
import numpy as np

def evaluate_robust_metrics_smart(model, cfg, n_samples=500, z_eval=0.0, chunk_size=50):
    model.eval()
    device = next(model.parameters()).device
    
    total_l2 = 0.0
    total_peak_err = 0.0
    total_samples = 0
    n_chunks = int(np.ceil(n_samples / chunk_size))
    
    with torch.no_grad():
        for i in range(n_chunks):
            current_n = min(chunk_size, n_samples - i*chunk_size)
            if current_n == 0: break
            
            # Acc√®s via dictionnaire (adaptation pour le YAML)
            b = cfg.physics['bounds']
            
            A = torch.rand(current_n, 1, device=device) * (b['A'][1] - b['A'][0]) + b['A'][0]
            w0 = 10 ** (torch.rand(current_n, 1, device=device) * np.log10(b['w0'][1]/b['w0'][0]) + np.log10(b['w0'][0]))
            f = torch.rand(current_n, 1, device=device) * (b['f'][1] - b['f'][0]) + b['f'][0]
            
            # --- VRAIE PHYSIQUE ---
            z_R = (cfg.physics['k'] * w0**2) / 2.0
            factor_geom = (1.0 - z_eval / f)
            factor_diff = (z_eval / z_R)
            w_z = w0 * torch.sqrt( factor_geom**2 + factor_diff**2 )
            
            # Scan Spatial
            n_pts = 100
            r_scan = torch.linspace(0, 1, n_pts, device=device).view(1, -1)
            r_phys = r_scan * 3.0 * w_z 
            
            r_flat = r_phys.view(-1, 1)
            z_flat = torch.ones_like(r_flat) * z_eval
            coords = torch.cat([r_flat, z_flat], dim=1)
            branch = torch.cat([A, w0, f], dim=1).repeat_interleave(n_pts, dim=0)
            
            p_re, p_im = model(branch, coords)
            p_mod = torch.sqrt(p_re**2 + p_im**2).view(current_n, n_pts)
            
            w0_ext = branch[:, 1:2].view(current_n, n_pts)
            wz_ext = w_z.repeat_interleave(n_pts, dim=0).view(current_n, n_pts)
            A_ext = branch[:, 0:1].view(current_n, n_pts)
            
            amp_axis = torch.sqrt(A_ext) * (w0_ext / wz_ext)
            arg = -(r_phys**2)/(wz_ext**2)
            t_mod = amp_axis * torch.exp(arg)
            
            # Peak Error
            p_max, _ = torch.max(p_mod, dim=1)
            t_max, _ = torch.max(t_mod, dim=1)
            batch_peak_err = torch.sum(torch.abs(p_max - t_max) / (t_max + 1e-9))
            
            # L2 Relative
            num = torch.sum((p_mod - t_mod)**2, dim=1)
            den = torch.sum(t_mod**2, dim=1)
            batch_l2 = torch.sum(torch.sqrt(num / (den + 1e-9)))
            
            total_l2 += batch_l2.item()
            total_peak_err += batch_peak_err.item()
            total_samples += current_n
            
    avg_l2 = total_l2 / total_samples
    avg_peak = total_peak_err / total_samples
    
    return avg_l2, avg_peak

Writing src/utils/metrics.py


In [9]:
!python scripts/test_local.py

[INFO] Racine du projet detectee : C:\Users\emmag\Th√®se\Projet_DeepOnet_NLSE
[INFO] Test des imports...
[OK] Imports reussis.

[START] DEMARRAGE DU DRY RUN (TEST LOCAL)...
[OK] Configuration chargee.
[INFO] Device utilise : cpu
[OK] Modele instancie avec succes.
   -> Forward pass OK.
[INFO] Test Generation Donnees...
[OK] Batches generes.
[INFO] Test Calcul Physique...
[OK] Physique OK (Loss dummy: 8.62e+01)
[INFO] Test Solveur Crank-Nicolson...
[OK] Solveur OK.

[SUCCES] TOUT EST OK ! LE CODE EST PRET.


In [10]:
%%writefile .gitignore
# Python
__pycache__/
*.py[cod]
*$py.class

# Jupyter
.ipynb_checkpoints/

# Environnements virtuels
venv/
env/
.env

# Fichiers Syst√®mes
.DS_Store
Thumbs.db

# Dossiers de sortie (On ne veut pas versionner les logs ou les checkpoints lourds)
outputs/logs/
outputs/checkpoints/
outputs/plots/
# On garde le dossier outputs mais pas son contenu, sauf un fichier vide pour git
outputs/*
!outputs/.gitkeep

# IDE
.vscode/
.idea/

# Fichiers zip de transfert
*.zip

Writing .gitignore
