In [2]:
!pip install -q -U google-genai


In [3]:
import socket
import requests.packages.urllib3.util.connection as urllib3_cn

# Garder une référence à la fonction originale
_original_getaddrinfo = socket.getaddrinfo

def patched_getaddrinfo(*args, **kwargs):
    """Force la résolution d'adresses en IPv4."""
    responses = _original_getaddrinfo(*args, **kwargs)
    # Filtrer pour ne garder que les adresses IPv4 (socket.AF_INET)
    return [res for res in responses if res[0] == socket.AF_INET]

# Appliquer le patch globalement pour la résolution DNS via socket
socket.getaddrinfo = patched_getaddrinfo

# Appliquer aussi le patch pour urllib3 (utilisé par certaines bibliothèques http)
# Cela peut aider si d'autres appels HTTP indirects sont faits, bien que
# le problème principal ici soit gRPC.
urllib3_cn.allowed_gai_family = lambda: socket.AF_INET

print("Patch appliqué pour forcer l'utilisation d'IPv4.")

Patch appliqué pour forcer l'utilisation d'IPv4.


In [4]:
import pathlib
import textwrap

import google.generativeai as genai

from IPython.display import display
from IPython.display import Markdown

def to_markdown(text):
  text = text.replace('•', '  *')
  return Markdown(textwrap.indent(text, '> ', predicate=lambda _: True))

In [5]:
import os
from dotenv import load_dotenv

# Charger les variables depuis le fichier .env dans l'environnement du processus Python actuel
load_dotenv()

# Maintenant, essayez de lire la variable d'environnement
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')

if not GOOGLE_API_KEY:
    raise ValueError("La variable d'environnement GOOGLE_API_KEY n'a pas pu être chargée (vérifiez le fichier .env).")
else:
    print("Clé API chargée avec succès via le fichier .env.")
    print(f"Clé API : {GOOGLE_API_KEY[:4]}...")
    # Configurez genai ici, maintenant que la clé est chargée
    try:
        import google.generativeai as genai
        genai.configure(api_key=GOOGLE_API_KEY)
        print("Configuration de GenAI réussie.")
    except ImportError:
        print("Erreur: Le module google.generativeai n'est pas installé ou importé.")
    except Exception as e:
        print(f"Erreur lors de la configuration de genai : {e}")

ValueError: La variable d'environnement GOOGLE_API_KEY n'a pas pu être chargée (vérifiez le fichier .env).

In [None]:
for m in genai.list_models():
  if 'generateContent' in m.supported_generation_methods:
    print(m.name)

models/gemini-1.0-pro-vision-latest
models/gemini-pro-vision
models/gemini-1.5-pro-latest
models/gemini-1.5-pro-001
models/gemini-1.5-pro-002
models/gemini-1.5-pro
models/gemini-1.5-flash-latest
models/gemini-1.5-flash-001
models/gemini-1.5-flash-001-tuning
models/gemini-1.5-flash
models/gemini-1.5-flash-002
models/gemini-1.5-flash-8b
models/gemini-1.5-flash-8b-001
models/gemini-1.5-flash-8b-latest
models/gemini-1.5-flash-8b-exp-0827
models/gemini-1.5-flash-8b-exp-0924
models/gemini-2.5-pro-exp-03-25
models/gemini-2.0-flash-exp
models/gemini-2.0-flash
models/gemini-2.0-flash-001
models/gemini-2.0-flash-exp-image-generation
models/gemini-2.0-flash-lite-001
models/gemini-2.0-flash-lite
models/gemini-2.0-flash-lite-preview-02-05
models/gemini-2.0-flash-lite-preview
models/gemini-2.0-pro-exp
models/gemini-2.0-pro-exp-02-05
models/gemini-exp-1206
models/gemini-2.0-flash-thinking-exp-01-21
models/gemini-2.0-flash-thinking-exp
models/gemini-2.0-flash-thinking-exp-1219
models/learnlm-1.5-pro-e

In [None]:
model = genai.GenerativeModel('models/gemini-2.5-pro-exp-03-25')

In [None]:

response = model.generate_content("What is the meaning of life?")

In [None]:
to_markdown(response.text)

> That's arguably *the* fundamental question humans have pondered across cultures and millennia! There's **no single, universally agreed-upon answer**, and the meaning of life is often considered subjective and deeply personal.
> 
> However, we can explore the different ways people approach this question:
> 
> **1. Philosophical Perspectives:**
> 
> *   **Nihilism:** Argues that life has no inherent, objective meaning, purpose, or intrinsic value.
> *   **Existentialism:** Agrees there's no pre-ordained meaning, but emphasizes that individuals are free and responsible for *creating* their own meaning through their choices and actions. (Think Sartre, Camus).
> *   **Hedonism:** Suggests the meaning of life is to maximize pleasure and minimize pain.
> *   **Stoicism:** Focuses on living virtuously, in accordance with reason and nature, accepting what we cannot control. Meaning comes from inner resilience and ethical action.
> *   **Humanism:** Emphasizes human potential, reason, ethics, and flourishing in this life, often focusing on contributing to the greater good of humanity without recourse to the supernatural.
> 
> **2. Religious & Spiritual Perspectives:**
> 
> *   **Theistic Religions (e.g., Christianity, Islam, Judaism):** Often propose that meaning comes from serving a higher power (God), fulfilling divine commandments, achieving salvation or enlightenment, and living in relationship with the creator and fellow beings.
> *   **Eastern Religions (e.g., Buddhism, Hinduism):** May focus on achieving enlightenment, breaking the cycle of rebirth (samsara), understanding one's true nature (Atman/Buddha-nature), living in harmony, and reducing suffering through compassion and detachment.
> *   **Spirituality (Broader sense):** Finding meaning through connection to something larger than oneself – nature, the universe, a collective consciousness, or an inner sense of purpose.
> 
> **3. Scientific & Biological Perspectives:**
> 
> *   From a purely biological standpoint, the "purpose" of life is survival and reproduction – passing on genetic material.
> *   Some scientists and thinkers find meaning in the universe's capacity to develop complexity and consciousness – seeing humans as a way the universe experiences itself.
> *   However, science primarily describes *how* life works, not necessarily its subjective *meaning* or ultimate *why*.
> 
> **4. Psychological & Personal Perspectives:**
> 
> *   **Finding Purpose:** Many find meaning through having clear goals, pursuing passions, or dedicating themselves to a cause larger than themselves.
> *   **Connection & Relationships:** Love, family, friendship, and community provide profound meaning for many people.
> *   **Contribution & Service:** Making a positive impact on others or the world.
> *   **Growth & Learning:** Continuously developing oneself, gaining knowledge and wisdom.
> *   **Experience & Appreciation:** Finding meaning in the simple act of experiencing life, beauty, joy, and even overcoming challenges.
> *   **Legacy:** Creating something lasting or influencing future generations.
> *   **Logotherapy (Viktor Frankl):** Argues that the primary human drive is not pleasure, but the discovery and pursuit of what we personally find meaningful, which can be found through work, love, and courage in suffering.
> 
> **In Conclusion:**
> 
> Instead of a single answer, the "meaning of life" might be:
> 
> *   **Something you discover:** Through introspection, experience, or faith.
> *   **Something you create:** Through your choices, actions, and commitments.
> *   **A combination of both.**
> *   **Found in the journey itself:** The process of living, learning, connecting, and seeking might be where the meaning lies, rather than a final destination or definition.
> 
> Ultimately, the question "What is the meaning of *my* life?" is one that each individual must grapple with and answer for themselves. What gives *your* life meaning and purpose?

In [6]:
# Cellule 1: Installation
!pip install -q diffusers transformers accelerate ftfy scipy imageio gradio python-dotenv matplotlib

In [8]:
# Cellule 2: Imports
import torch
from diffusers import StableDiffusionPipeline, DDIMScheduler, UNet2DConditionModel
from transformers import CLIPTextModel, CLIPTokenizer
from torch import optim, nn
from tqdm.auto import tqdm
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
import os
from dotenv import load_dotenv
import imageio # Pour les GIFs

# Configuration initiale (GPU, etc.)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
import socket
import requests.packages.urllib3.util.connection as urllib3_cn
import functools # Bonne pratique pour le wrapping

print("Tentative d'application du patch IPv4...")

try:
    # Vérifier si le patch est déjà appliqué pour éviter les doubles patchs
    # (On vérifie si la fonction actuelle a le nom qu'on lui a donné)
    if getattr(socket.getaddrinfo, '__qualname__', '') == 'patched_getaddrinfo':
         print("Le patch IPv4 semble déjà appliqué.")
    else:
        # Garder une référence à la fonction originale
        _original_getaddrinfo = socket.getaddrinfo
        print(f"Fonction getaddrinfo originale : {_original_getaddrinfo}")

        @functools.wraps(_original_getaddrinfo) # Préserve les métadonnées de la fonction originale
        def patched_getaddrinfo(*args, **kwargs):
            """Force la résolution d'adresses en IPv4 en filtrant les résultats."""
            # print(f"Appel de patched_getaddrinfo avec args: {args}, kwargs: {kwargs}") # Pour débogage
            try:
                # ----- APPELER L'ORIGINAL SAUVEGARDÉ ICI -----
                responses = _original_getaddrinfo(*args, **kwargs)
                # ---------------------------------------------

                # Filtrer pour ne garder que les adresses IPv4 (socket.AF_INET)
                ipv4_responses = [res for res in responses if res[0] == socket.AF_INET]

                # print(f"Réponses originales: {len(responses)}, Réponses IPv4: {len(ipv4_responses)}") # Pour débogage

                # Que faire si aucune adresse IPv4 n'est trouvée ?
                # Retourner une liste vide pourrait casser certaines choses.
                # Pour l'instant, on retourne seulement les IPv4 si elles existent.
                # if not ipv4_responses:
                #     print("Avertissement: Aucune adresse IPv4 trouvée après filtrage.")
                #     # Option: retourner les réponses originales pour ne pas tout casser ?
                #     # return responses

                return ipv4_responses
            except Exception as e:
                print(f"Erreur à l'intérieur de patched_getaddrinfo : {e}")
                # En cas d'erreur dans le patch, peut-être revenir à l'original ?
                # return _original_getaddrinfo(*args, **kwargs)
                raise # Ou simplement relancer l'erreur

        # Appliquer le patch globalement
        socket.getaddrinfo = patched_getaddrinfo
        print(f"Fonction getaddrinfo patchée installée : {socket.getaddrinfo}")

        # Appliquer aussi le patch pour urllib3 (utilisé par certaines bibliothèques http)
        # Vérifier si déjà patché pour éviter les erreurs
        if not hasattr(urllib3_cn, '_original_allowed_gai_family'):
             urllib3_cn._original_allowed_gai_family = getattr(urllib3_cn, 'allowed_gai_family', None) # Sauvegarde si existe
             urllib3_cn.allowed_gai_family = lambda: socket.AF_INET
             print("Patch urllib3 allowed_gai_family appliqué.")
        else:
             print("urllib3 allowed_gai_family semble déjà patché.")

        print("Patch IPv4 appliqué avec succès.")

except Exception as e:
    print(f"Échec de l'application du patch IPv4 : {e}")
    # Optionnel : essayer de restaurer l'original si le patch a échoué à mi-chemin
    if '_original_getaddrinfo' in locals() and hasattr(socket, 'getaddrinfo') and getattr(socket.getaddrinfo, '__qualname__', '') == 'patched_getaddrinfo':
         socket.getaddrinfo = _original_getaddrinfo
         print("Tentative de restauration de getaddrinfo original.")
    if '_original_allowed_gai_family' in getattr(urllib3_cn, '__dict__', {}):
         urllib3_cn.allowed_gai_family = urllib3_cn._original_allowed_gai_family
         print("Tentative de restauration de allowed_gai_family original.")

Using device: cuda
Tentative d'application du patch IPv4...
Le patch IPv4 semble déjà appliqué.


In [12]:
# AU LIEU DE: model_id = "runwayml/stable-diffusion-v1-5"

# ESSAYEZ CECI:
model_id = "stabilityai/stable-diffusion-3.5-medium" # Maintenu par Stability AI (plus fiable)
# OU potentiellement juste:
# model_id = "stable-diffusion-v1-5" # Parfois Hugging Face a des alias directs

# --- Le reste de votre code de chargement ---
try:
    print(f"Tentative de chargement avec ID: {model_id}")
    tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer")
    text_encoder = CLIPTextModel.from_pretrained(model_id, subfolder="text_encoder").to(device)
    # Assurez-vous d'avoir importé AutoencoderKL
    from diffusers import AutoencoderKL
    vae = AutoencoderKL.from_pretrained(model_id, subfolder="vae").to(device)
    unet = UNet2DConditionModel.from_pretrained(model_id, subfolder="unet").to(device)
    scheduler = DDIMScheduler.from_pretrained(model_id, subfolder="scheduler")
    print(f"Modèle {model_id} chargé avec succès.")

except OSError as e:
    print(f"ERREUR lors du chargement de {model_id}: {e}")
    print("Vérifiez la connectivité réseau, le nom du modèle et les problèmes de cache.")
    # Ici, vous pourriez essayer un autre ID ou passer au chargement local si nécessaire.
except Exception as e:
     print(f"Une erreur inattendue est survenue : {e}")

Tentative de chargement avec ID: stabilityai/stable-diffusion-3.5-medium
ERREUR lors du chargement de stabilityai/stable-diffusion-3.5-medium: Can't load tokenizer for 'stabilityai/stable-diffusion-3.5-medium'. If you were trying to load it from 'https://huggingface.co/models', make sure you don't have a local directory with the same name. Otherwise, make sure 'stabilityai/stable-diffusion-3.5-medium' is the correct path to a directory containing all relevant files for a CLIPTokenizer tokenizer.
Vérifiez la connectivité réseau, le nom du modèle et les problèmes de cache.


In [None]:
# Cellule 4: Préparation de l'entrée
input_image_path = "path/to/your/image.jpg" # Mettez le chemin de votre image
input_image_pil = Image.open(input_image_path).convert("RGB").resize((512, 512))

target_prompt = "A photo of a cat wearing a party hat" # Votre texte cible

# Fonction pour prétraiter l'image pour le VAE
def preprocess(image):
    w, h = image.size
    w, h = map(lambda x: x - x % 32, (w, h)) # ensure resolution is multiple of 32
    image = image.resize((w, h), resample=Image.LANCZOS)
    image = np.array(image).astype(np.float32) / 255.0
    image = image[None].transpose(0, 3, 1, 2)
    image = torch.from_numpy(image)
    return 2.0 * image - 1.0

input_image_tensor = preprocess(input_image_pil).to(device)
# Obtenir la latente initiale de l'image d'entrée (utilisée pour la perte de reconstruction)
with torch.no_grad():
    input_latents = vae.encode(input_image_tensor).latent_dist.sample() * vae.config.scaling_factor # scaling_factor ~0.18215

plt.imshow(input_image_pil)
plt.title("Input Image")
plt.show()

In [None]:
# Cellule 5: Étape A - Optimisation de l'Embedding

# Obtenir e_tgt (non entraînable)
text_input_tgt = tokenizer(target_prompt, padding="max_length", max_length=tokenizer.model_max_length, truncation=True, return_tensors="pt")
with torch.no_grad():
    e_tgt = text_encoder(text_input_tgt.input_ids.to(device))[0]

# Initialiser e_opt comme clone de e_tgt, mais entraînable
e_opt = e_tgt.clone().detach().requires_grad_(True)

# Configuration de l'optimisation
optimizer = optim.Adam([e_opt], lr=1e-3) # Ajuster lr
num_optimization_steps = 100 # Comme dans Imagic (~100), ajuster si besoin
unet.eval() # Garder UNet gelé pendant cette étape
text_encoder.eval() # Garder text_encoder gelé
vae.eval() # Garder VAE gelé

print("Début de l'optimisation de l'embedding...")
pbar = tqdm(range(num_optimization_steps))
for step in pbar:
    optimizer.zero_grad()

    # Échantillonner un timestep aléatoire
    t = torch.randint(0, scheduler.config.num_train_timesteps, (1,), device=device).long()

    # Bruiter l'image latente d'entrée
    noise = torch.randn_like(input_latents)
    latents_noisy = scheduler.add_noise(input_latents, noise, t)

    # Prédire le bruit avec e_opt
    noise_pred = unet(latents_noisy, t, encoder_hidden_states=e_opt).sample

    # Calculer la perte de reconstruction
    loss = nn.functional.mse_loss(noise_pred, noise, reduction="mean")

    loss.backward()
    optimizer.step()

    pbar.set_description(f"Loss: {loss.item():.4f}")

print("Optimisation de l'embedding terminée.")
e_opt_optimized = e_opt.clone().detach() # Sauvegarder l'embedding optimisé

In [None]:
# Cellule 6: Étape B - Fine-tuning UNet (Simplifié - Envisager LoRA en pratique)

# Rendre UNet entraînable, geler le reste
unet.train()
text_encoder.eval()
vae.eval()
# e_opt_optimized n'a pas besoin de grad ici

optimizer_unet = optim.Adam(unet.parameters(), lr=5e-6) # Très petit lr pour fine-tuning
num_finetune_steps = 200 # Ajuster (~1500 dans Imagic, mais long/coûteux)

print("Début du fine-tuning de UNet...")
pbar = tqdm(range(num_finetune_steps))
for step in pbar:
    optimizer_unet.zero_grad()

    t = torch.randint(0, scheduler.config.num_train_timesteps, (1,), device=device).long()
    noise = torch.randn_like(input_latents)
    latents_noisy = scheduler.add_noise(input_latents, noise, t)

    # Prédire le bruit avec le UNet actuel et e_opt_optimized
    noise_pred = unet(latents_noisy, t, encoder_hidden_states=e_opt_optimized).sample

    loss = nn.functional.mse_loss(noise_pred, noise, reduction="mean")

    loss.backward()
    optimizer_unet.step()

    pbar.set_description(f"UNet Loss: {loss.item():.4f}")

print("Fine-tuning UNet terminé.")
# Sauvegarder les poids fine-tunés si nécessaire (ou les poids LoRA)
# unet.save_pretrained("./unet_finetuned")
unet.eval() # Remettre en mode évaluation

In [None]:
# Cellule 7: Étape C - Interpolation et Génération

eta = 0.7 # Facteur d'interpolation (entre 0 et 1) - à ajuster
guidance_scale = 7.5 # Force de la guidance (CFG)
num_inference_steps = 50 # Nombre d'étapes de sampling

# Calculer l'embedding interpolé
e_bar = eta * e_tgt + (1 - eta) * e_opt_optimized

# Préparer pour CFG (Classifier-Free Guidance)
uncond_input = tokenizer([""] * 1, padding="max_length", max_length=tokenizer.model_max_length, return_tensors="pt")
with torch.no_grad():
    uncond_embeddings = text_encoder(uncond_input.input_ids.to(device))[0]

text_embeddings = torch.cat([uncond_embeddings, e_bar])

# Initialiser la latente de départ (bruit pur)
latents = torch.randn((1, unet.config.in_channels, 512 // 8, 512 // 8), device=device) # Taille pour SD 1.5
latents = latents * scheduler.init_noise_sigma

# Boucle de sampling DDIM
scheduler.set_timesteps(num_inference_steps)

print("Début de la génération de l'image éditée...")
for t in tqdm(scheduler.timesteps):
    # Expansion des latentes pour CFG
    latent_model_input = torch.cat([latents] * 2)
    latent_model_input = scheduler.scale_model_input(latent_model_input, t)

    # Prédire le bruit
    with torch.no_grad():
        noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

    # CFG
    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

    # Calculer l'étape précédente de la latente
    latents = scheduler.step(noise_pred, t, latents).prev_sample

print("Génération terminée.")

# Décoder l'image finale
latents = 1 / vae.config.scaling_factor * latents
with torch.no_grad():
    image = vae.decode(latents).sample
image = (image / 2 + 0.5).clamp(0, 1)
image = image.cpu().permute(0, 2, 3, 1).numpy()[0]
edited_image_pil = Image.fromarray((image * 255).astype(np.uint8))

# Afficher l'image éditée
plt.imshow(edited_image_pil)
plt.title(f"Edited Image (eta={eta})")
plt.show()

In [None]:
# Cellule 8 (Concept): Contrôle par Attention Croisée

# --- Cette partie est complexe et nécessite une compréhension profonde ---
# --- de Prompt-to-Prompt et de l'architecture de CrossAttention de diffusers ---

# 1. Créer des classes AttentionProcessor personnalisées
class SaveAttentionProcessor:
    def __init__(self):
        self.attention_maps = {} # Dictionnaire pour stocker les cartes

    def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
        # ... logique pour calculer l'attention normalement ...
        # SAUVEGARDER la carte d'attention calculée (attn.attention_probs)
        # dans self.attention_maps avec une clé unique (ex: nom de couche, timestep)
        # ... retourner la sortie de l'attention ...
        return # output

class InjectAttentionProcessor:
    def __init__(self, saved_maps_source):
        self.saved_maps = saved_maps_source # Référence aux cartes sauvegardées

    def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
        # ... début du calcul de l'attention avec le prompt cible ...
        # RÉCUPÉRER la carte d'attention source correspondante depuis self.saved_maps
        # MODIFIER la carte d'attention cible en utilisant la carte source
        # (ex: remplacement partiel basé sur les tokens, etc.)
        # ... finir le calcul de l'attention avec la carte modifiée ...
        # ... retourner la sortie de l'attention ...
        return # output

# 2. Modifier la boucle de sampling (Étape C)

# -- Au début de la boucle de sampling --
# att_saver = SaveAttentionProcessor()
# unet.set_attn_processor(att_saver) # Attacher le saver

# latents_source = latents.clone() # Bruit initial pour la passe source

# -- Boucle sur les timesteps t --

    # --- Passe Source (pour sauvegarder l'attention) ---
    # latent_model_input_source = scheduler.scale_model_input(latents_source, t)
    # with torch.no_grad():
    #     # Utiliser e_opt_optimized pour la passe source
    #     _ = unet(latent_model_input_source, t, encoder_hidden_states=e_opt_optimized).sample
    # # Ici, att_saver.attention_maps devrait être rempli pour ce timestep

    # --- Passe Cible (avec injection d'attention) ---
    # att_injector = InjectAttentionProcessor(att_saver.attention_maps)
    # unet.set_attn_processor(att_injector) # Attacher l'injector

    # latent_model_input_target = torch.cat([latents] * 2) # Pour CFG
    # latent_model_input_target = scheduler.scale_model_input(latent_model_input_target, t)

    # with torch.no_grad():
    #     # Utiliser e_tgt (ou e_bar) pour la passe cible
    #     noise_pred = unet(latent_model_input_target, t, encoder_hidden_states=text_embeddings).sample # text_embeddings utilise e_tgt/e_bar

    # --- Fin de la passe cible ---
    # unet.set_attn_processor(None) # Détacher les processors (ou remettre les originaux)

    # ... reste de la logique CFG et scheduler.step(noise_pred, t, latents) ...

# --- Fin de la boucle ---

# 3. Décoder l'image finale comme avant
# ...

In [15]:
# Cellule 1: Imports et Configuration Initiale
import torch
from diffusers import AutoencoderKL, UNet2DConditionModel, DDIMScheduler
from transformers import CLIPTokenizer, CLIPTextModel
from torch import optim, nn
from tqdm.auto import tqdm
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
import os

# Configuration initiale (GPU, etc.)
device = "cuda" if torch.cuda.is_available() else "cpu"
if not torch.cuda.is_available():
    print("ATTENTION: CUDA non disponible, utilisation du CPU (très lent).")
print(f"Using device: {device}")

# --- Optionnel: Patch IPv4 (si toujours nécessaire pour le téléchargement) ---
# import socket
# import requests.packages.urllib3.util.connection as urllib3_cn
# import functools
# print("Tentative d'application du patch IPv4...")
# try:
#     if getattr(socket.getaddrinfo, '__qualname__', '') == 'patched_getaddrinfo':
#          print("Le patch IPv4 semble déjà appliqué.")
#     else:
#         _original_getaddrinfo = socket.getaddrinfo
#         @functools.wraps(_original_getaddrinfo)
#         def patched_getaddrinfo(*args, **kwargs):
#             responses = _original_getaddrinfo(*args, **kwargs)
#             ipv4_responses = [res for res in responses if res[0] == socket.AF_INET]
#             return ipv4_responses if ipv4_responses else responses # Retourne original si pas d'IPv4
#         socket.getaddrinfo = patched_getaddrinfo
#         if not hasattr(urllib3_cn, '_original_allowed_gai_family'):
#              urllib3_cn._original_allowed_gai_family = getattr(urllib3_cn, 'allowed_gai_family', None)
#              urllib3_cn.allowed_gai_family = lambda: socket.AF_INET
#         print("Patch IPv4 appliqué.")
# except Exception as e:
#     print(f"Échec de l'application du patch IPv4 : {e}")
# ---------------------------------------------------------------------

# --- Cellule 2: Chargement des Composants Individuels ---
# Utiliser SD 1.5 comme base standard pour Imagic/Prompt-to-Prompt
model_id = "CompVis/stable-diffusion-v1-4"
# OU chargez depuis un dossier local si vous avez téléchargé manuellement:
# local_model_directory = "chemin/vers/votre/dossier/stable-diffusion-v1-5"
# if not os.path.exists(local_model_directory):
#      raise FileNotFoundError(f"Dossier local non trouvé: {local_model_directory}")
# model_id = local_model_directory # Utiliser le chemin local

try:
    print(f"Chargement des composants de : {model_id}")
    tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer", force_download=True)
    text_encoder = CLIPTextModel.from_pretrained(model_id, subfolder="text_encoder").to(device)
    vae = AutoencoderKL.from_pretrained(model_id, subfolder="vae").to(device)
    unet = UNet2DConditionModel.from_pretrained(model_id, subfolder="unet").to(device)
    scheduler = DDIMScheduler.from_pretrained(model_id, subfolder="scheduler") # Utiliser DDIM comme Imagic
    print("Composants chargés avec succès.")

    # Mettre les modèles en mode évaluation par défaut
    vae.eval()
    text_encoder.eval()
    unet.eval()

except Exception as e:
    print(f"ERREUR lors du chargement des composants: {e}")
    print("Vérifiez la connectivité, le chemin, le cache, ou téléchargez manuellement.")
    # Arrêter ici si le chargement échoue
    raise e

# --- Cellule 3: Préparation Image et Texte ---
# --- REMPLACEZ PAR VOTRE IMAGE ---
input_image_path = "path/to/your/input_image.jpg"
if not os.path.exists(input_image_path):
    # Mettre une image par défaut ou lever une erreur
    print(f"ATTENTION: Image d'entrée non trouvée à '{input_image_path}'. Utilisation d'une image placeholder (noire).")
    input_image_pil = Image.new('RGB', (512, 512), color = 'black')
    # raise FileNotFoundError(f"Image d'entrée non trouvée: {input_image_path}")
else:
    input_image_pil = Image.open(input_image_path).convert("RGB")

# Redimensionner l'image d'entrée
img_size = 512
input_image_pil = input_image_pil.resize((img_size, img_size), resample=Image.LANCZOS)

# --- REMPLACEZ PAR VOTRE PROMPT ---
target_prompt = "A photo of the input object wearing a party hat" # Adaptez à votre image !

# Fonction de prétraitement pour VAE
def preprocess(image):
    w, h = image.size
    # S'assurer que la taille est multiple de 8 (pour VAE)
    w, h = map(lambda x: x - x % 8, (w, h))
    image = image.resize((w, h), resample=Image.LANCZOS)
    image = np.array(image).astype(np.float32) / 255.0
    image = image[None].transpose(0, 3, 1, 2) # Ajouter batch dim, passer en CHW
    image = torch.from_numpy(image)
    return 2.0 * image - 1.0 # Normaliser entre -1 et 1

# Prétraiter et encoder l'image d'entrée en latente
input_image_tensor = preprocess(input_image_pil).to(device=device, dtype=vae.dtype) # Utiliser le dtype du VAE
with torch.no_grad():
    # Calculer la latente initiale (z0)
    latent_dist = vae.encode(input_image_tensor).latent_dist
    input_latents = latent_dist.sample() * vae.config.scaling_factor # Ne pas prendre la moyenne, échantillonner
    # Vous pouvez aussi garder mean et std si nécessaire pour des pertes VAE plus tard
    # input_latents_mean = latent_dist.mean * vae.config.scaling_factor
    # input_latents_logvar = latent_dist.logvar

print("Image d'entrée préparée et encodée en latente.")
plt.imshow(input_image_pil)
plt.title("Input Image")
plt.show()

# Encoder le prompt cible (e_tgt)
text_input_tgt = tokenizer(target_prompt, padding="max_length", max_length=tokenizer.model_max_length, truncation=True, return_tensors="pt")
with torch.no_grad():
    e_tgt = text_encoder(text_input_tgt.input_ids.to(device))[0].to(dtype=text_encoder.dtype) # Utiliser le dtype de l'encodeur

print("Prompt cible encodé (e_tgt).")


# --- Cellule 4: Étape A - Optimisation de l'Embedding Texte (e_opt) ---
print("\n--- Début Étape A: Optimisation Embedding (e_opt) ---")

# Initialiser e_opt comme clone de e_tgt, mais entraînable
e_opt = e_tgt.clone().detach().requires_grad_(True)

# Optimiseur pour e_opt SEULEMENT
optimizer_embedding = optim.Adam([e_opt], lr=1e-3) # Ajuster lr si besoin

# Paramètres d'optimisation
num_optimization_steps = 100 # ~100 dans Imagic
embedding_optimization_loss = 0 # Pour log

# S'assurer que seul e_opt est entraînable
vae.requires_grad_(False)
text_encoder.requires_grad_(False)
unet.requires_grad_(False)

pbar = tqdm(range(num_optimization_steps))
for step in pbar:
    optimizer_embedding.zero_grad()

    # Échantillonner un timestep aléatoire
    t = torch.randint(0, scheduler.config.num_train_timesteps, (1,), device=device).long()

    # Bruiter l'image latente d'entrée (input_latents)
    noise = torch.randn_like(input_latents, dtype=e_opt.dtype) # Utiliser le bon dtype
    latents_noisy = scheduler.add_noise(input_latents.to(dtype=e_opt.dtype), noise, t)

    # Prédire le bruit en utilisant e_opt comme condition
    noise_pred = unet(latents_noisy, t, encoder_hidden_states=e_opt).sample

    # Calculer la perte: différence entre bruit prédit et bruit ajouté
    loss = nn.functional.mse_loss(noise_pred.float(), noise.float(), reduction="mean") # Calculer en float32 pour stabilité

    loss.backward()
    optimizer_embedding.step()

    embedding_optimization_loss = loss.item()
    pbar.set_description(f"Step A Loss: {embedding_optimization_loss:.4f}")

print(f"--- Fin Étape A ---")
# Sauvegarder l'embedding optimisé (sans gradient)
e_opt_optimized = e_opt.clone().detach()


# --- Cellule 5: Étape B - Fine-tuning UNet ---
print("\n--- Début Étape B: Fine-tuning UNet ---")
# ATTENTION: Cette étape est coûteuse en calcul et mémoire.
# LoRA est une alternative plus efficace en pratique.
# Ici, nous faisons un fine-tuning complet simplifié pour l'exemple.

# Rendre UNet entraînable, geler le reste
unet.requires_grad_(True)
vae.requires_grad_(False)
text_encoder.requires_grad_(False)
# e_opt_optimized n'a pas besoin de gradient

# Optimiseur pour UNet SEULEMENT
optimizer_unet = optim.Adam(unet.parameters(), lr=5e-7) # Très petit lr pour fine-tuning

# Paramètres de fine-tuning
num_finetune_steps = 100 # Réduire pour test, Imagic utilise ~1500 (très long!)
finetune_loss = 0

unet.train() # Mettre UNet en mode entraînement

pbar = tqdm(range(num_finetune_steps))
for step in pbar:
    optimizer_unet.zero_grad()

    t = torch.randint(0, scheduler.config.num_train_timesteps, (1,), device=device).long()
    noise = torch.randn_like(input_latents, dtype=unet.dtype)
    latents_noisy = scheduler.add_noise(input_latents.to(dtype=unet.dtype), noise, t)

    # Prédire le bruit avec le UNet actuel et e_opt_optimized (gelé)
    noise_pred = unet(latents_noisy, t, encoder_hidden_states=e_opt_optimized.to(dtype=unet.dtype)).sample

    loss = nn.functional.mse_loss(noise_pred.float(), noise.float(), reduction="mean")

    loss.backward()
    optimizer_unet.step()

    finetune_loss = loss.item()
    pbar.set_description(f"Step B Loss: {finetune_loss:.4f}")

print("--- Fin Étape B ---")
unet.eval() # Remettre en mode évaluation

# Optionnel: Sauvegarder le UNet fine-tuné
# unet.save_pretrained("./unet_finetuned_imagic")


# --- Cellule 6: Étape C - Interpolation et Génération ---
print("\n--- Début Étape C: Interpolation et Génération ---")

eta = 0.7 # Facteur d'interpolation (entre 0 et 1) - à ajuster !
guidance_scale = 7.5 # Force de la guidance (CFG)
num_inference_steps = 50 # Nombre d'étapes de sampling DDIM

# Calculer l'embedding interpolé
# S'assurer que les dtypes correspondent avant l'interpolation
e_bar = eta * e_tgt.to(dtype=e_opt_optimized.dtype) + (1 - eta) * e_opt_optimized

# Préparer pour CFG (Classifier-Free Guidance)
# Utiliser un prompt vide pour l'inconditionnel
uncond_input = tokenizer([""] * 1, padding="max_length", max_length=tokenizer.model_max_length, return_tensors="pt")
with torch.no_grad():
    uncond_embeddings = text_encoder(uncond_input.input_ids.to(device))[0].to(dtype=unet.dtype)

# Concaténer inconditionnel et conditionnel (e_bar)
text_embeddings_final = torch.cat([uncond_embeddings, e_bar])

# Initialiser la latente de départ (bruit pur) pour l'inférence
latents_inf = torch.randn(
    (1, unet.config.in_channels, img_size // 8, img_size // 8), # Taille latente pour SD 1.x (512/8=64)
    device=device,
    dtype=unet.dtype # Utiliser le dtype de l'UNet
)

# Mettre à l'échelle le bruit initial
latents_inf = latents_inf * scheduler.init_noise_sigma

# Boucle de sampling DDIM
scheduler.set_timesteps(num_inference_steps)
print("Génération de l'image éditée...")
pbar = tqdm(scheduler.timesteps)
for t in pbar:
    # Expansion des latentes pour CFG (batch size de 2)
    latent_model_input = torch.cat([latents_inf] * 2)
    latent_model_input = scheduler.scale_model_input(latent_model_input, t) # Peut nécessiter un ajustement selon le scheduler

    # Prédire le bruit avec UNet fine-tuné
    with torch.no_grad():
        noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings_final).sample

    # Appliquer CFG
    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

    # Calculer l'étape précédente de la latente avec le scheduler
    latents_inf = scheduler.step(noise_pred, t, latents_inf).prev_sample

print("Génération terminée.")

# Décoder l'image finale depuis la latente
# Mettre à l'échelle inverse la latente
latents_inf_scaled = 1 / vae.config.scaling_factor * latents_inf
with torch.no_grad():
    # Utiliser le VAE pour décoder
    image = vae.decode(latents_inf_scaled.to(vae.dtype)).sample # Assurer le bon dtype pour VAE

# Post-traitement de l'image
image = (image / 2 + 0.5).clamp(0, 1) # Remettre entre 0 et 1
image = image.cpu().permute(0, 2, 3, 1).float().numpy()[0] # Passer en HWC pour affichage
edited_image_pil = Image.fromarray((image * 255).astype(np.uint8))

# Afficher l'image éditée
plt.figure(figsize=(8, 8))
plt.imshow(edited_image_pil)
plt.title(f"Edited Image (Imagic - eta={eta})")
plt.axis('off')
plt.show()

# --- Cellule 7: (PLACEHOLDER) Phase 2 - Contrôle par Attention Croisée ---
print("\n--- Phase 2: Contrôle par Attention Croisée (Implémentation future) ---")
# C'est ici que vous implémenteriez la logique complexe de manipulation
# des cartes d'attention pendant la boucle de génération (Cellule 6),
# en vous basant sur Prompt-to-Prompt ou une technique similaire.
# Cela nécessitera de modifier la boucle de sampling pour sauvegarder
# et injecter/modifier les attentions.

# Exemple conceptuel de génération avec attention (pseudo-code):
# prompt_source = "" # Ou un prompt décrivant l'image originale
# prompt_target = target_prompt
# generate_with_cross_attn_control(prompt_source, prompt_target, unet_finetuned, ...)

print("Implémentation du contrôle d'attention non réalisée dans cet exemple de base.")


# --- Cellule 8: (PLACEHOLDER) Phase 3 - Améliorations (GIF, Multi-Input) ---
print("\n--- Phase 3: Améliorations (Implémentation future) ---")
# Pour un GIF, vous mettriez la Cellule 6 dans une boucle sur eta
# et collecteriez les images PIL dans une liste, puis utiliseriez imageio.mimsave.

# Pour le multi-input, la stratégie reste à définir (très avancé).

Using device: cuda
Chargement des composants de : CompVis/stable-diffusion-v1-4
ERREUR lors du chargement des composants: Force download failed due to the above error.
Vérifiez la connectivité, le chemin, le cache, ou téléchargez manuellement.


ValueError: Force download failed due to the above error.

In [16]:
# Cellule 1: Imports et Configuration Initiale
import torch
import numpy as np
import matplotlib.pyplot as plt
import os
from diffusers import AutoencoderKL, UNet2DConditionModel, DDIMScheduler
from transformers import CLIPTokenizer, CLIPTextModel
from torch import optim, nn
from tqdm.auto import tqdm
from PIL import Image
from pathlib import Path
import torchvision.transforms as T

# Configuration initiale (GPU, etc.)
device = "cuda" if torch.cuda.is_available() else "cpu"
if not torch.cuda.is_available():
    print("ATTENTION: GPU non disponible, l'exécution sera très lente!")
print(f"Using device: {device}")

# --- Optionnel: Patch IPv4 (si toujours nécessaire pour le téléchargement) ---
# import socket
# import requests.packages.urllib3.util.connection as urllib3_cn
# import functools
# print("Tentative d'application du patch IPv4...")
# try:
#     if getattr(socket.getaddrinfo, '__qualname__', '') == 'patched_getaddrinfo':
#          print("Le patch IPv4 semble déjà appliqué.")
#     else:
#         _original_getaddrinfo = socket.getaddrinfo
#         @functools.wraps(_original_getaddrinfo)
#         def patched_getaddrinfo(*args, **kwargs):
#             responses = _original_getaddrinfo(*args, **kwargs)
#             ipv4_responses = [res for res in responses if res[0] == socket.AF_INET]
#             return ipv4_responses if ipv4_responses else responses # Retourne original si pas d'IPv4
#         socket.getaddrinfo = patched_getaddrinfo
#         if not hasattr(urllib3_cn, '_original_allowed_gai_family'):
#              urllib3_cn._original_allowed_gai_family = getattr(urllib3_cn, 'allowed_gai_family', None)
#              urllib3_cn.allowed_gai_family = lambda: socket.AF_INET
#         print("Patch IPv4 appliqué.")
# except Exception as e:
#     print(f"Échec de l'application du patch IPv4 : {e}")
# ---------------------------------------------------------------------

# --- Cellule 2: Chargement des Composants Individuels ---
# Utiliser SD 1.4 comme base pour Imagic
model_id = "CompVis/stable-diffusion-v1-4"
# OU chargez depuis un dossier local si vous avez téléchargé manuellement:
# local_model_directory = "chemin/vers/votre/dossier/stable-diffusion-v1-4"
# if not os.path.exists(local_model_directory):
#      raise FileNotFoundError(f"Dossier local non trouvé: {local_model_directory}")
# model_id = local_model_directory # Utiliser le chemin local

try:
    print(f"Chargement des composants de : {model_id}")
    tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer")
    text_encoder = CLIPTextModel.from_pretrained(model_id, subfolder="text_encoder").to(device)
    vae = AutoencoderKL.from_pretrained(model_id, subfolder="vae").to(device)
    unet = UNet2DConditionModel.from_pretrained(model_id, subfolder="unet").to(device)
    scheduler = DDIMScheduler.from_pretrained(model_id, subfolder="scheduler")
    
    # Mettre les modèles en mode évaluation
    text_encoder.eval()
    vae.eval()
    unet.eval()
    
    print("Tous les composants chargés avec succès!")
except Exception as e:
    print(f"Erreur lors du chargement des composants : {e}")
    print("Vérifiez la connectivité, le chemin, le cache, ou téléchargez manuellement.")
    # Arrêter ici si le chargement échoue
    raise e

# --- Cellule 3: Préparation Image et Texte ---
# --- REMPLACEZ PAR VOTRE IMAGE ---
input_image_path = "path/to/your/input_image.jpg"  # À remplacer
if not os.path.exists(input_image_path):
    print(f"Image introuvable: {input_image_path}")
    # Option: utiliser une image de démonstration si disponible
    input_image_path = "/home/antoine/genai_project/assets/demo_image.jpg"  # À ajuster
    if not os.path.exists(input_image_path):
        raise FileNotFoundError("Aucune image trouvée. Spécifiez un chemin valide.")
else:
    print(f"Image trouvée: {input_image_path}")

# Charger l'image d'entrée
input_image_pil = Image.open(input_image_path).convert("RGB")

# Redimensionner l'image d'entrée
img_size = 512
input_image_pil = input_image_pil.resize((img_size, img_size), resample=Image.LANCZOS)

# --- REMPLACEZ PAR VOTRE PROMPT ---
target_prompt = "A photo of the input object wearing a party hat"  # Adaptez à votre image !

# Fonction de prétraitement pour VAE
def preprocess(image):
    transform = T.Compose([
        T.ToTensor(),
        T.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
    return transform(image).unsqueeze(0)

# Prétraiter et encoder l'image d'entrée en latente
input_image_tensor = preprocess(input_image_pil).to(device=device, dtype=vae.dtype)  # Utiliser le dtype du VAE
with torch.no_grad():
    # Encoder l'image en latente avec le VAE
    latent_dist = vae.encode(input_image_tensor)
    latent_input = latent_dist.latent_dist.sample() * 0.18215  # Facteur d'échelle du VAE

print("Image d'entrée préparée et encodée en latente.")
plt.imshow(input_image_pil)
plt.title("Input Image")
plt.show()

# Encoder le prompt cible (e_tgt)
text_input_tgt = tokenizer(target_prompt, padding="max_length", max_length=tokenizer.model_max_length, 
                           truncation=True, return_tensors="pt")
with torch.no_grad():
    e_tgt = text_encoder(text_input_tgt.input_ids.to(device))[0]  # Embeddings du texte cible

print("Prompt cible encodé (e_tgt).")


# --- Cellule 4: Étape A - Optimisation de l'Embedding Texte (e_opt) ---
print("\n--- Début Étape A: Optimisation Embedding (e_opt) ---")

# Initialiser e_opt comme clone de e_tgt, mais entraînable
e_opt = e_tgt.clone().detach().requires_grad_(True)

# Optimiseur pour e_opt SEULEMENT
optimizer_embedding = optim.Adam([e_opt], lr=1e-3)  # Ajuster lr si besoin

# Paramètres d'optimisation
num_optimization_steps = 100  # ~100 dans Imagic
embedding_optimization_loss = 0  # Pour log

# S'assurer que seul e_opt est entraînable
vae.requires_grad_(False)
text_encoder.requires_grad_(False)
unet.requires_grad_(False)

pbar = tqdm(range(num_optimization_steps))
for step in pbar:
    # Réinitialiser les gradients
    optimizer_embedding.zero_grad()
    
    # Échantillonner un timestep t aléatoire
    t = torch.randint(0, scheduler.config.num_train_timesteps, (1,), device=device).long()
    
    # Ajouter du bruit à la latente originale selon t
    noise = torch.randn_like(latent_input)
    noisy_latent = scheduler.add_noise(latent_input, noise, t)
    
    # Prédiction de bruit par l'UNet avec l'embedding optimisable
    noise_pred = unet(noisy_latent, t, encoder_hidden_states=e_opt).sample
    
    # Calculer la perte MSE entre le bruit prédit et le bruit ajouté
    loss = nn.MSELoss()(noise_pred, noise)
    
    # Rétropropagation et optimisation
    loss.backward()
    optimizer_embedding.step()
    
    # Mise à jour de la barre de progression
    embedding_optimization_loss = loss.item()
    pbar.set_description(f"Optimisation embedding - Loss: {embedding_optimization_loss:.4f}")

print(f"--- Fin Étape A ---")
# Sauvegarder l'embedding optimisé (sans gradient)
e_opt_optimized = e_opt.clone().detach()


# --- Cellule 5: Étape B - Fine-tuning UNet ---
print("\n--- Début Étape B: Fine-tuning UNet ---")
# ATTENTION: Cette étape est coûteuse en calcul et mémoire.
# LoRA est une alternative plus efficace en pratique.
# Ici, nous faisons un fine-tuning complet simplifié pour l'exemple.

# Rendre UNet entraînable, geler le reste
unet.requires_grad_(True)
vae.requires_grad_(False)
text_encoder.requires_grad_(False)
# e_opt_optimized n'a pas besoin de gradient

# Optimiseur pour UNet SEULEMENT
optimizer_unet = optim.Adam(unet.parameters(), lr=5e-7)  # Très petit lr pour fine-tuning

# Paramètres de fine-tuning
num_finetune_steps = 100  # Réduire pour test, Imagic utilise ~1500 (très long!)
finetune_loss = 0

unet.train()  # Mettre UNet en mode entraînement

pbar = tqdm(range(num_finetune_steps))
for step in pbar:
    # Réinitialiser les gradients
    optimizer_unet.zero_grad()
    
    # Échantillonner un timestep t aléatoire
    t = torch.randint(0, scheduler.config.num_train_timesteps, (1,), device=device).long()
    
    # Ajouter du bruit à la latente originale selon t
    noise = torch.randn_like(latent_input)
    noisy_latent = scheduler.add_noise(latent_input, noise, t)
    
    # Prédiction de bruit par l'UNet avec l'embedding optimisé fixe
    noise_pred = unet(noisy_latent, t, encoder_hidden_states=e_opt_optimized).sample
    
    # Calculer la perte MSE entre le bruit prédit et le bruit ajouté
    loss = nn.MSELoss()(noise_pred, noise)
    
    # Rétropropagation et optimisation
    loss.backward()
    optimizer_unet.step()
    
    # Mise à jour de la barre de progression
    finetune_loss = loss.item()
    pbar.set_description(f"Fine-tuning UNet - Loss: {finetune_loss:.4f}")

print("--- Fin Étape B ---")
unet.eval()  # Remettre en mode évaluation

# Optionnel: Sauvegarder le UNet fine-tuné
# unet.save_pretrained("./unet_finetuned_imagic")


# --- Cellule 6: Étape C - Interpolation et Génération ---
print("\n--- Début Étape C: Interpolation et Génération ---")

eta = 0.7  # Facteur d'interpolation (entre 0 et 1) - à ajuster !
guidance_scale = 7.5  # Force de la guidance (CFG)
num_inference_steps = 50  # Nombre d'étapes de sampling DDIM

# Calculer l'embedding interpolé
# S'assurer que les dtypes correspondent avant l'interpolation
e_bar = eta * e_tgt.to(dtype=e_opt_optimized.dtype) + (1 - eta) * e_opt_optimized

# Préparer pour CFG (Classifier-Free Guidance)
# Utiliser un prompt vide pour l'inconditionnel
uncond_input = tokenizer([""], padding="max_length", max_length=tokenizer.model_max_length, return_tensors="pt")
with torch.no_grad():
    uncond_embeddings = text_encoder(uncond_input.input_ids.to(device))[0]

# Concaténer inconditionnel et conditionnel (e_bar)
text_embeddings_final = torch.cat([uncond_embeddings, e_bar])

# Initialiser la latente de départ (bruit pur) pour l'inférence
latents_inf = torch.randn(
    (1, unet.config.in_channels, img_size // 8, img_size // 8),
    device=device,
    dtype=unet.dtype)

# Mettre à l'échelle le bruit initial
latents_inf = latents_inf * scheduler.init_noise_sigma

# Boucle de sampling DDIM
scheduler.set_timesteps(num_inference_steps)
print("Génération de l'image éditée...")
pbar = tqdm(scheduler.timesteps)
for t in pbar:
    # Dupliquer les latentes pour la guidance (inconditionnel et conditionnel)
    latent_model_input = torch.cat([latents_inf] * 2)
    
    # Définir le timestep actuel
    latent_model_input = scheduler.scale_model_input(latent_model_input, t)
    
    # Prédire le bruit résiduel
    with torch.no_grad():
        noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings_final).sample
    
    # Séparer les prédictions conditionnelles et inconditionnelles
    noise_pred_uncond, noise_pred_cond = noise_pred.chunk(2)
    
    # Appliquer la guidance: pred_noise = uncond + guidance_scale * (cond - uncond)
    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_cond - noise_pred_uncond)
    
    # Étape de débruitage DDIM
    latents_inf = scheduler.step(noise_pred, t, latents_inf).prev_sample

print("Génération terminée.")

# Décoder l'image finale depuis la latente
with torch.no_grad():
    latents_inf = 1 / 0.18215 * latents_inf  # Mise à l'échelle inverse du VAE
    image = vae.decode(latents_inf).sample

# Conversion en image PIL
image = (image / 2 + 0.5).clamp(0, 1)
image = image.detach().cpu().permute(0, 2, 3, 1).numpy()
image = (image * 255).round().astype("uint8")[0]
final_image = Image.fromarray(image)

# Afficher l'image originale et l'image éditée
fig, ax = plt.subplots(1, 2, figsize=(12, 6))
ax[0].imshow(input_image_pil)
ax[0].set_title("Image originale")
ax[0].axis("off")
ax[1].imshow(final_image)
ax[1].set_title(f"Image éditée: {target_prompt}")
ax[1].axis("off")
plt.tight_layout()
plt.show()

# Sauvegarder l'image générée
os.makedirs("./output", exist_ok=True)
final_image.save("./output/edited_image.png")
print("Image éditée sauvegardée dans ./output/edited_image.png")


# --- Cellule 7: Visualisation de l'attention croisée ---
print("\n--- Visualisation de l'attention croisée ---")

# Fonction pour extraire les cartes d'attention croisée de l'UNet
def get_cross_attention(unet, latents, t, text_embeddings):
    # Activation hooks pour capturer l'attention
    attention_maps = []
    
    def attention_hook(module, input, output):
        attention_map = output[1]  # Forme: [batch, heads, seq_len, dim]
        # Moyenne sur les têtes d'attention
        attention_map = attention_map.mean(1)  # [batch, seq_len, dim]
        attention_maps.append(attention_map)
    
    # Enregistrer les hooks sur les modules d'attention croisée
    hooks = []
    for name, module in unet.named_modules():
        if "attn2" in name:  # attn2 est l'attention croisée texte-image
            hooks.append(module.register_forward_hook(attention_hook))
    
    # Passer les latentes à travers l'UNet
    with torch.no_grad():
        _ = unet(latents, t, encoder_hidden_states=text_embeddings)
    
    # Retirer les hooks
    for hook in hooks:
        hook.remove()
        
    return attention_maps

# Générer des cartes d'attention pour une timestep spécifique
t_idx = len(scheduler.timesteps) // 2  # Timestep médiane
t_attention = scheduler.timesteps[t_idx]

# Préparer l'entrée du modèle
latent_input_viz = scheduler.scale_model_input(latents_inf, t_attention)

# Récupérer les cartes d'attention
attention_maps = get_cross_attention(unet, latent_input_viz, t_attention, e_bar.unsqueeze(0))

# Visualiser les cartes d'attention pour les tokens importants
# Trouver l'index des tokens principaux dans le prompt
tokens = tokenizer.tokenize(target_prompt)
print(f"Tokens du prompt: {tokens}")

# Sélectionner quelques tokens intéressants pour la visualisation
selected_token_indices = [1, 2, 3]  # À ajuster selon les tokens significatifs
token_attention_weights = {}

# Prendre la dernière carte d'attention (généralement la plus significative)
last_attention_map = attention_maps[-1][0]  # [seq_len, dim]

# Redimensionner pour correspondre à la taille de l'image
resolution = img_size // 8
for idx in selected_token_indices:
    if idx < last_attention_map.shape[0]:
        token = tokens[idx-1] if idx > 0 and idx-1 < len(tokens) else f"token_{idx}"
        attention_weights = last_attention_map[idx].reshape(resolution, resolution)
        attention_weights = attention_weights.cpu().numpy()
        token_attention_weights[token] = attention_weights

# Visualiser les cartes d'attention pour les tokens sélectionnés
n_tokens = len(token_attention_weights)
if n_tokens > 0:
    fig, axes = plt.subplots(1, n_tokens + 1, figsize=(15, 5))
    
    # Image originale
    axes[0].imshow(input_image_pil)
    axes[0].set_title("Image originale")
    axes[0].axis("off")
    
    # Cartes d'attention
    for i, (token, weights) in enumerate(token_attention_weights.items(), 1):
        axes[i].imshow(weights, cmap='inferno')
        axes[i].set_title(f"Attention: {token}")
        axes[i].axis("off")
    
    plt.tight_layout()
    plt.show()
else:
    print("Aucune carte d'attention disponible pour visualisation.")

Using device: cuda
Chargement des composants de : CompVis/stable-diffusion-v1-4
Erreur lors du chargement des composants : Can't load tokenizer for 'CompVis/stable-diffusion-v1-4'. If you were trying to load it from 'https://huggingface.co/models', make sure you don't have a local directory with the same name. Otherwise, make sure 'CompVis/stable-diffusion-v1-4' is the correct path to a directory containing all relevant files for a CLIPTokenizer tokenizer.
Vérifiez la connectivité, le chemin, le cache, ou téléchargez manuellement.


OSError: Can't load tokenizer for 'CompVis/stable-diffusion-v1-4'. If you were trying to load it from 'https://huggingface.co/models', make sure you don't have a local directory with the same name. Otherwise, make sure 'CompVis/stable-diffusion-v1-4' is the correct path to a directory containing all relevant files for a CLIPTokenizer tokenizer.