In [7]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torchvision.transforms import functional as F
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
from skimage.metrics import peak_signal_noise_ratio as psnr
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm import tqdm
from PIL import Image
from skimage.util import random_noise

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [8]:
class VideoDataset(Dataset):
    def __init__(self, video_dir, sequence_length=5, transform=None):
        self.video_dir = video_dir
        self.sequence_length = sequence_length
        self.transform = transform
        self.frames = self._load_frames_from_directory(video_dir)

    def _load_frames_from_directory(self, video_dir):
        # Charger les frames depuis un répertoire
        files = sorted([f for f in os.listdir(video_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))])
        frames = []
        for file in files:
            frame_path = os.path.join(video_dir, file)
            frame = cv2.imread(frame_path)
            if frame is not None:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(frame)
        return frames

    def add_gaussian_noise(self, frame, mean=0, std=0.1):
        noise = np.random.normal(mean, std, frame.shape)
        noisy_frame = frame + noise
        return np.clip(noisy_frame, 0, 255).astype(np.uint8)

    def add_salt_pepper_noise(self, frame, amount=0.02):
        noisy_frame = random_noise(frame / 255.0, mode='s&p', amount=amount)
        return (noisy_frame * 255).astype(np.uint8)

    def __len__(self):
        return max(0, len(self.frames) - self.sequence_length + 1)

    def __getitem__(self, idx):
        frames = self.frames[idx:idx + self.sequence_length]
        noisy_frames = [self.add_gaussian_noise(f) for f in frames]
        noisy_frames = [self.add_salt_pepper_noise(f) for f in noisy_frames]
        if self.transform:
            frames = torch.stack([self.transform(Image.fromarray(f)) for f in frames])
            noisy_frames = torch.stack([self.transform(Image.fromarray(f)) for f in noisy_frames])
        return noisy_frames, frames


In [9]:
class ContextGuidedBlock(nn.Module):
    def __init__(self, nIn, nOut, dilation_rate=2):
        super(ContextGuidedBlock, self).__init__()
        self.conv1x1 = nn.Conv2d(nIn, nOut // 2, kernel_size=1)
        self.conv3x3 = nn.Conv2d(nOut // 2, nOut, kernel_size=3, padding=dilation_rate, dilation=dilation_rate)
        self.bn = nn.BatchNorm2d(nOut)
        self.relu = nn.ReLU()
    
    def forward(self, x):
        x = self.conv1x1(x)
        x = self.conv3x3(x)
        x = self.bn(x)
        return self.relu(x)

class ContextModule(nn.Module):
    def __init__(self, nIn, nOut):
        super(ContextModule, self).__init__()
        self.conv_dil1 = nn.Conv2d(nIn, nOut, kernel_size=3, padding=1, dilation=1)
        self.conv_dil2 = nn.Conv2d(nIn, nOut, kernel_size=3, padding=2, dilation=2)
        self.conv_dil3 = nn.Conv2d(nIn, nOut, kernel_size=3, padding=3, dilation=3)
        self.bn = nn.BatchNorm2d(nOut)
        self.relu = nn.ReLU()
    
    def forward(self, x):
        x1 = self.conv_dil1(x)
        x2 = self.conv_dil2(x)
        x3 = self.conv_dil3(x)
        x = x1 + x2 + x3
        x = self.bn(x)
        return self.relu(x)

class CGNet(nn.Module):
    def __init__(self):
        super(CGNet, self).__init__()
        self.block1 = ContextGuidedBlock(3, 32)
        self.block2 = ContextGuidedBlock(32, 64)
        self.res_block1 = self._make_res_block(64, 64)
        self.context_module = ContextModule(64, 64)
        self.res_block2 = self._make_res_block(64, 64)
        self.conv_final = nn.Conv2d(64, 3, kernel_size=1)
    
    def _make_res_block(self, nIn, nOut):
        return nn.Sequential(
            nn.Conv2d(nIn, nOut, kernel_size=3, padding=1),
            nn.BatchNorm2d(nOut),
            nn.ReLU(),
            nn.Conv2d(nOut, nOut, kernel_size=3, padding=1),
            nn.BatchNorm2d(nOut),
            nn.ReLU()
        )

    def forward(self, x):
        x1 = self.block1(x)
        x2 = self.block2(x1)
        x2 = self.res_block1(x2)
        x_context = self.context_module(x2)
        x3 = self.res_block2(x_context)
        output = self.conv_final(x3)
        return output


In [None]:
# Initialisation du modèle CGNet pour la vidéo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CGNet().to(device)

criterion = nn.MSELoss()  # Fonction de perte
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # Optimiseur

# Charger les données d'entraînement
sequence_length = 5
transform = transforms.Compose([transforms.ToTensor()])
video_dir = r"DAVIS/DAVIS/JPEGImages/Full-Resolution/gold-fish"
dataset = VideoDataset(video_dir, sequence_length, transform)
loader = DataLoader(dataset, batch_size=1, shuffle=True)

print(f"Dataset chargé avec {len(dataset)} séquences vidéo.")

# Entraînement
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for noisy_frames, clean_frames in tqdm(loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        # noisy_frames: [batch_size, sequence_length, channels, height, width]
        # clean_frames: [batch_size, sequence_length, channels, height, width]
        
        noisy_frames = noisy_frames.squeeze(0)  # Supprimer batch_size (1) temporairement
        clean_frames = clean_frames.squeeze(0)
        
        batch_loss = 0.0
        for t in range(sequence_length):
            noisy_frame = noisy_frames[t].unsqueeze(0).to(device)  # Ajouter batch_size (1)
            clean_frame = clean_frames[t].unsqueeze(0).to(device)
            
            optimizer.zero_grad()
            outputs = model(noisy_frame)  # Passer une frame à la fois
            loss = criterion(outputs, clean_frame)
            loss.backward()
            optimizer.step()
            
            batch_loss += loss.item()
        
        running_loss += batch_loss / sequence_length  # Moyenne des pertes pour la séquence
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / len(loader):.4f}")

# Sauvegarder le modèle entraîné
model_path = "cgnet_video.pth"
torch.save(model.state_dict(), model_path)
print(f"Modèle entraîné et sauvegardé sous : {model_path}.")


Dataset chargé avec 74 séquences vidéo.


Epoch 1/10:   0%|          | 0/74 [00:00<?, ?it/s]

In [49]:
def denoise_video(video_path, output_path, model, device, transform):
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Le fichier vidéo {video_path} est introuvable.")
    
    cap = cv2.VideoCapture(video_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Configuration pour la vidéo de sortie
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec MP4
    out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

    print("Débruitage de la vidéo...")
    frame_count = 0  # Compteur de frames traitées
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Prétraitement de la frame
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_pil = Image.fromarray(frame_rgb)
        input_tensor = transform(frame_pil).unsqueeze(0).to(device)
        
        # Débruitage avec le modèle
        with torch.no_grad():
            output_tensor = model(input_tensor).squeeze(0).cpu()
        
        # Post-traitement
        output_frame = (output_tensor.permute(1, 2, 0).numpy() * 255).astype(np.uint8)
        output_frame_bgr = cv2.cvtColor(output_frame, cv2.COLOR_RGB2BGR)
        
        # Écriture de la frame dans la vidéo de sortie
        out.write(output_frame_bgr)
        frame_count += 1
    
    cap.release()
    out.release()

    print(f"Vidéo débruitée sauvegardée dans : {output_path}")
    print(f"Nombre de frames traitées : {frame_count}")


In [None]:
# Débruiter une vidéo
input_video = "video_noised/gold-fish_noised.mp4"
output_video = "video_denoised/gold-fish_denoised.mp4"
denoise_video(input_video, output_video, model, device, transform)


Débruitage de la vidéo...
Vidéo débruitée sauvegardée dans : video_denoised/surf_denoised.mp4
Nombre de frames traitées : 255


In [None]:
class NIMAModel(nn.Module):
    def __init__(self):
        super(NIMAModel, self).__init__()
        self.base_model = models.mobilenet_v2(weights='IMAGENET1K_V2')
        self.base_model.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.base_model.last_channel, 10)
        )
    
    def forward(self, x):
        return self.base_model(x)

nima_model = NIMAModel().to(device)
nima_model.eval()

def calculate_nima_score(image_tensor):
    with torch.no_grad():
        scores = nima_model(image_tensor.unsqueeze(0).to(device))
        scores = torch.softmax(scores, dim=1)
        return torch.sum(scores * torch.arange(1, 11).to(device)).item()


In [None]:
def show_sample_with_metrics(model, dataset, index):
    """
    Afficher les images bruitées, originales, et débruitées avec les scores PSNR et NIMA.
    """
    model.eval()
    noisy_img, clean_img = dataset[index]

    noisy_img = noisy_img.unsqueeze(0).to(device)
    clean_img = clean_img.unsqueeze(0).to(device)
    
    with torch.no_grad():
        output, _ = model(noisy_img)
        output = output.cpu().squeeze()
    
    psnr_noisy = psnr(clean_img.squeeze().permute(1, 2, 0).cpu().numpy(), noisy_img.squeeze().permute(1, 2, 0).cpu().numpy())
    psnr_denoised = psnr(clean_img.squeeze().permute(1, 2, 0).cpu().numpy(), output.permute(1, 2, 0).numpy())
    
    nima_noisy = calculate_nima_score(noisy_img.squeeze())
    nima_clean = calculate_nima_score(clean_img.squeeze())
    nima_denoised = calculate_nima_score(output)
    
    print(f"PSNR Noisy: {psnr_noisy:.2f}, PSNR Denoised: {psnr_denoised:.2f}")
    print(f"NIMA Noisy: {nima_noisy:.2f}, NIMA Clean: {nima_clean:.2f}, NIMA Denoised: {nima_denoised:.2f}")
    
    # Préparer les images pour affichage
    noisy_img = noisy_img.squeeze().permute(1, 2, 0).cpu().numpy()
    clean_img = clean_img.squeeze().permute(1, 2, 0).cpu().numpy()
    output = output.permute(1, 2, 0).cpu().numpy()
    
    # S'assurer que les valeurs des images sont entre 0 et 1
    noisy_img = np.clip(noisy_img, 0, 1)
    clean_img = np.clip(clean_img, 0, 1)
    output = np.clip(output, 0, 1)
    
    # Afficher les images
    plt.figure(figsize=(18, 6))
    
    plt.subplot(1, 3, 1)
    plt.title(f"Noisy Image\nNIMA: {nima_noisy:.2f}")
    plt.imshow(noisy_img)
    plt.axis('off')
    
    plt.subplot(1, 3, 2)
    plt.title(f"Clean Image\nNIMA: {nima_clean:.2f}")
    plt.imshow(clean_img)
    plt.axis('off')
    
    plt.subplot(1, 3, 3)
    plt.title(f"Denoised Image\nPSNR: {psnr_denoised:.2f}, NIMA: {nima_denoised:.2f}")
    plt.imshow(output)
    plt.axis('off')
    
    plt.show()

show_sample_with_metrics(model, train_dataset, 0)
show_sample_with_metrics(model, train_dataset, 0)
show_sample_with_metrics(model, train_dataset, 1)
show_sample_with_metrics(model, train_dataset, 2)
show_sample_with_metrics(model, train_dataset, 3)
show_sample_with_metrics(model, train_dataset, 4)
show_sample_with_metrics(model, train_dataset, 5)
show_sample_with_metrics(model, train_dataset, 6)
show_sample_with_metrics(model, train_dataset, 7)
show_sample_with_metrics(model, train_dataset, 8)
show_sample_with_metrics(model, train_dataset, 9)
show_sample_with_metrics(model, train_dataset, 10)


In [None]:
import sys
import os

# Forcer l'installation de nbformat dans l'environnement actuel du notebook
!{sys.executable} -m pip install --upgrade nbformat

import nbformat
import zipfile

def clear_notebook_output(notebook_path, output_path=None):
    """
    Nettoyer toutes les sorties d'un notebook Jupyter pour le rendre plus léger.
    """
    with open(notebook_path, 'r', encoding='utf-8') as f:
        notebook = nbformat.read(f, as_version=4)


    for cell in notebook.cells:
        if 'outputs' in cell:
            cell['outputs'] = []
        if 'execution_count' in cell:
            cell['execution_count'] = None


    output_path = output_path or notebook_path
    with open(output_path, 'w', encoding='utf-8') as f:
        nbformat.write(notebook, f)
    print(f"Nettoyage terminé ! Notebook allégé sauvegardé sous : {output_path}")

def zip_notebook(notebook_path):
    """
    Compresser le notebook nettoyé pour réduire sa taille.
    """
    zip_path = notebook_path.replace(".ipynb", ".zip")
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(notebook_path)
    print(f"Notebook compressé et sauvegardé sous : {zip_path}")

def delete_large_files():
    """
    Supprimer tous les fichiers volumineux tels que .pth, .h5 et les checkpoints.
    """
    for root, dirs, files in os.walk("."):
        for file in files:
            if file.endswith(".pth") or file.endswith(".h5"):
                file_path = os.path.join(root, file)
                os.remove(file_path)
                print(f"Fichier supprimé : {file_path}")


    if os.path.exists('.ipynb_checkpoints'):
        os.system("rm -rf .ipynb_checkpoints")
        print("Checkpoints supprimés.")

    print("Nettoyage des fichiers volumineux terminé !")

notebook_name = "CGNet_BSD500.ipynb"


clear_notebook_output(notebook_name, "notebook_allégé.ipynb")
zip_notebook("notebook_allégé.ipynb")
delete_large_files()

print("\n📦 Opérations terminées ! Le notebook a été nettoyé, compressé et les fichiers inutiles supprimés.")


In [None]:
# Exemple d'utilisation
show_sample_with_metrics(model, train_dataset, 0)
show_sample_with_metrics(model, train_dataset, 1)
show_sample_with_metrics(model, train_dataset, 2)
show_sample_with_metrics(model, train_dataset, 3)
show_sample_with_metrics(model, train_dataset, 4)
show_sample_with_metrics(model, train_dataset, 5)
show_sample_with_metrics(model, train_dataset, 6)
show_sample_with_metrics(model, train_dataset, 7)
show_sample_with_metrics(model, train_dataset, 8)
show_sample_with_metrics(model, train_dataset, 9)
show_sample_with_metrics(model, train_dataset, 10)