In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
import sys
import os
# Chemin vers votre dossier de travail
work_dir = '/content/drive/My Drive/work/notebooks'
# Ajouter le dossier au path pour permettre les imports
sys.path.append(work_dir)
# Vérifiez que le chemin est bien ajouté
print("Chemins de recherche actuels :", sys.path)


Chemins de recherche actuels : ['/content', '/env/python', '/usr/lib/python310.zip', '/usr/lib/python3.10', '/usr/lib/python3.10/lib-dynload', '', '/usr/local/lib/python3.10/dist-packages', '/usr/lib/python3/dist-packages', '/usr/local/lib/python3.10/dist-packages/IPython/extensions', '/usr/local/lib/python3.10/dist-packages/setuptools/_vendor', '/root/.ipython', '/content/drive/My Drive/work/notebooks']


In [3]:
import numpy as np
import matplotlib.pyplot as plt
from models import NoiseConditionalScoreNetwork
import torch
import os
from load_data import load_dataset
import gc
from tqdm import tqdm
from torchvision.utils import save_image, make_grid
from PIL import Image
from my_utils import distribution2score
import numpy as np
from typing import Tuple
import torchvision

In [4]:
def anneal_Langevin_dynamics_inpainting(
    noisy_image,  # renamed x_mod
    clean_image,  # renamed image
    score_model,  # renamed scorenet
    noise_levels,  # renamed sigmas
    resolution,  # renamed img_size
    channels,  # renamed n_channels
    fill_direction="left",  # renamed direction
    steps_per_level=100,  # renamed n_steps_each
    learning_rate=0.000008,  # renamed step_lr
):
    def apply_mask_and_extract_region(img, direction, size, value=1.0):
        """Create mask and extract region based on the specified direction."""
        region_mask = torch.zeros_like(img)
        if direction == "left":
            region_mask[:, :, :, : size // 2] = value
            return img[:, :, :, : size // 2], region_mask
        elif direction == "right":
            region_mask[:, :, :, size // 2 :] = value
            return img[:, :, :, size // 2 :], region_mask
        elif direction == "top":
            region_mask[:, :, : size // 2, :] = value
            return img[:, :, : size // 2, :], region_mask
        elif direction == "bottom":
            region_mask[:, :, size // 2 :, :] = value
            return img[:, :, size // 2 :, :], region_mask
        raise ValueError("Invalid direction specified")

    generated_images = []
    noisy_image = noisy_image.view(-1, channels, resolution, resolution)
    expanded_clean_image = clean_image.unsqueeze(1).expand(-1, noisy_image.shape[1], -1, -1, -1)
    expanded_clean_image = expanded_clean_image.contiguous().view(-1, channels, resolution, resolution)

    # Apply mask and extract half image
    target_region, region_mask = apply_mask_and_extract_region(expanded_clean_image, fill_direction, resolution)
    occluded_image = clean_image * region_mask

    with torch.no_grad():
        for idx, noise_level in tqdm(enumerate(noise_levels), total=len(noise_levels), desc="Langevin dynamics sampling"):
            labels = torch.full((noisy_image.shape[0],), idx, dtype=torch.long, device=noisy_image.device)
            step_size = learning_rate * (noise_level / noise_levels[-1]) ** 2

            # Add noise to the target region
            corrupted_region = target_region + torch.randn_like(target_region) * noise_level
            noisy_image[region_mask > 0] = corrupted_region[region_mask > 0]  # Update only masked region

            for _ in range(steps_per_level):
                generated_images.append(torch.clamp(noisy_image, 0.0, 1.0).to("cpu"))
                random_noise = torch.randn_like(noisy_image) * np.sqrt(step_size * 2)
                gradient = score_model(noisy_image, labels)
                noisy_image += step_size * gradient + random_noise
                noisy_image[region_mask > 0] = corrupted_region[region_mask > 0]  # Reapply the corrupted region

    return generated_images, occluded_image


In [5]:
import numpy as np
import torchvision
from typing import Tuple

def load_CIFAR10() -> Tuple[np.ndarray, np.ndarray]:
    """Load CIFAR-10 dataset."""
    cifar10_url = "http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
    torchvision.datasets.CIFAR10.url = cifar10_url

    train_dataset = torchvision.datasets.CIFAR10(root="./", train=True, download=True)
    test_dataset = torchvision.datasets.CIFAR10(root="./", train=False, download=True)

    return train_dataset.data, test_dataset.data

def load_MNIST() -> Tuple[np.ndarray, np.ndarray]:
    """Load MNIST dataset."""
    train_dataset = torchvision.datasets.MNIST(root="./", train=True, download=True)
    test_dataset = torchvision.datasets.MNIST(root="./", train=False, download=True)

    # Convert data to numpy and add a channel dimension
    train_data = np.expand_dims(train_dataset.data.numpy(), axis=-1)
    test_data = np.expand_dims(test_dataset.data.numpy(), axis=-1)

    return train_data, test_data

def load_CELEBA() -> Tuple[np.ndarray, np.ndarray]:
    """Load CelebA dataset."""
    celeba_url = "https://s3-us-west-1.amazonaws.com/udacity-dlnfd/datasets/celeba.zip"
    torchvision.datasets.CelebA.url = celeba_url

    train_dataset = torchvision.datasets.CelebA(root="./", split="train", download=True)
    test_dataset = torchvision.datasets.CelebA(root="./", split="test", download=True)

    # Extract data (ensure compatibility with numpy arrays)
    train_data = np.array(train_dataset.data)
    test_data = np.array(test_dataset.data)

    return train_data, test_data

def _load_dataset(name: str) -> Tuple[np.ndarray, np.ndarray]:
    """Load a specific dataset based on its name."""
    loaders = {
        "mnist": load_MNIST,
        "cifar10": load_CIFAR10,
        "celeba": load_CELEBA,
    }

    if name.lower() not in loaders:
        raise ValueError("The argument 'name' must be one of 'mnist', 'cifar10', or 'celeba'.")

    return loaders[name.lower()]()

def load_dataset(
    name: str, flatten: bool = False, binarize: bool = False
) -> Tuple[np.ndarray, np.ndarray]:
    """Load and preprocess dataset."""
    train_data, test_data = _load_dataset(name)

    # Ensure data is float32 for further processing
    train_data = train_data.astype("float32")
    test_data = test_data.astype("float32")

    # Binarize the data (if enabled)
    if binarize:
        train_data = (train_data > 128).astype("float32")
        test_data = (test_data > 128).astype("float32")
    else:
        train_data /= 255.0
        test_data /= 255.0

    # Adjust data format to (N, C, H, W) for channels-first format
    train_data = np.transpose(train_data, (0, 3, 1, 2))
    test_data = np.transpose(test_data, (0, 3, 1, 2))

    # Flatten the data (if enabled)
    if flatten:
        train_data = train_data.reshape(train_data.shape[0], -1)
        test_data = test_data.reshape(test_data.shape[0], -1)

    return train_data, test_data



In [6]:
import torch
from torchvision.utils import make_grid, save_image
from PIL import Image
from tqdm import tqdm

def inpaint_ncsn(path, sigmas, use_cuda, n_samples, n_steps, dataset, direction):
    """Perform image inpainting using a Noise Conditional Score Network (NCSN)."""

    # Step 1: Define the model configuration based on dataset
    dataset_configs = {
        "mnist": {"n_channels": 1, "image_size": 28, "num_classes": 10, "ngf": None},
        "cifar10": {"n_channels": 3, "image_size": 32, "num_classes": 10, "ngf": 128},
        "celeba": {"n_channels": 3, "image_size": 32, "num_classes": 10, "ngf": 128},
    }

    if dataset not in dataset_configs:
        raise ValueError("Invalid dataset name. Choose from 'mnist', 'cifar10', or 'celeba'.")

    config = dataset_configs[dataset]
    refine_net = NoiseConditionalScoreNetwork(
        n_channels=config["n_channels"],
        image_size=config["image_size"],
        num_classes=config["num_classes"],
        ngf=config["ngf"] if config["ngf"] else 64,  # Default ngf=64 for MNIST
    )

    # Step 2: Load the pretrained model
    checkpoint = torch.load(path)
    if isinstance(checkpoint, tuple):  # Handle optimizer state if present
        refine_net.load_state_dict(checkpoint[0])
    else:
        refine_net.load_state_dict(checkpoint)

    if use_cuda:
        refine_net.cuda()
    refine_net.eval()

    # Step 3: Load test data
    train_data, test_data = load_dataset(dataset, flatten=False, binarize=False)

    data_loader = torch.utils.data.DataLoader(
        test_data, batch_size=n_samples, shuffle=False, num_workers=0, drop_last=True
    )
    test_samples = next(iter(data_loader))

    # Step 4: Initialize random samples for inpainting
    noise_samples = torch.randn(
        n_samples,
        refine_net.n_channels,
        refine_net.image_size,
        refine_net.image_size,
        device="cuda" if use_cuda else "cpu",
    )

    # Save original test samples
    save_image(
        test_samples,
        f"original_{dataset}_{direction}_{n_steps}_{n_samples}.png",
        nrow=5,
    )

    # Step 5: Perform annealed Langevin dynamics inpainting
    generated_images, occluded_image = anneal_Langevin_dynamics_inpainting(
        noise_samples,
        test_samples,
        refine_net,
        sigmas,
        n_steps_each=n_steps,
        step_lr=0.00002,
        img_size=refine_net.image_size,
        n_channels=refine_net.n_channels,
        direction=direction,
    )

    # Step 6: Create image grids
    occluded_grid = make_grid(occluded_image, nrow=1, normalize=True, scale_each=True)
    test_grid = make_grid(test_samples, nrow=1, normalize=True, scale_each=True)

    images_to_save = []
    for i, generated_sample in tqdm(enumerate(generated_images), desc="Saving grids"):
        # Reshape sample for grid creation
        generated_sample = generated_sample.view(
            n_samples,
            refine_net.n_channels,
            refine_net.image_size,
            refine_net.image_size,
        )

        # Combine occluded, generated, and test grids
        generated_grid = make_grid(
            generated_sample, nrow=n_samples, normalize=True, scale_each=True
        )
        combined_grid = torch.cat([occluded_grid, generated_grid, test_grid], dim=2)

        # Save every 10th iteration as an image
        if i % 10 == 0:
            img = Image.fromarray(
                combined_grid.mul(255)
                .add(0.5)
                .clamp(0, 255)
                .permute(1, 2, 0)
                .to("cpu", torch.uint8)
                .numpy()
            )
            images_to_save.append(img)

    return images_to_save


In [7]:
# def inpaint_ncsn(path, sigmas, use_cuda, n_samples, n_steps, dataset, direction):
path = "/content/drive/My Drive/work/notebooks/pretrained_models/cifar10.pth"
lambda_min = 0.01
lambda_max = 1
n_lambdas = 10
sigmas = torch.tensor(
        np.exp(
            np.linspace(
                np.log(lambda_max), np.log(lambda_min), n_lambdas
            )
        ),
        dtype=torch.float32,
    )
n_samples = 10
n_steps = 100
dataset = "cifar10" #"celebra" # "cifar10" # "mnist"
directions = ["left", "right", "top", "bottom"]
imgs_per_direction = []
for direction in directions:
    images_to_save = inpaint_ncsn(path, sigmas, True, n_samples, n_steps, dataset, direction)
    imgs_per_direction.append(images_to_save)

  checkpoint = torch.load(path)


RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [None]:

for i in range(len(directions)):
    images_to_save = imgs_per_direction[i]


    plt.imshow(imgs_per_direction[i][-1])
    plt.axis('off')
    # Enregistrer et afficher
    filename = '/content/drive/My Drive/work/notebooks/pretrained_models/output_image_{}_{}.png'.format(i, directions[i])
    #print(f"Saving image to: {filename}")
    plt.savefig(filename, bbox_inches='tight')
    plt.show()


