In [65]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# experiment start

In [2]:
from pathlib import Path
from diffusers import UNet2DConditionModel, AutoencoderKL, DDPMScheduler, StableDiffusionPipeline, DiffusionPipeline
from transformers import CLIPTextModel, CLIPTokenizer
# from huggingface_hub import login
from peft import LoraConfig
import torch
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.nn.functional as F
import math
from tqdm.auto import tqdm
# import matplotlib.pyplot as plt
# from peft.utils import get_peft_model_state_dict
# from diffusers.utils import convert_state_dict_to_diffusers
# from datasets import load_dataset
# from functools import partial
from PIL import Image
# from kaggle_secrets import UserSecretsClient
from torch.utils.data import Dataset
import pandas as pd
# from pydantic import BaseModel
from diffusers.training_utils import compute_snr
import cv2

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def get_models(model_name, dtype=torch.float16):
    tokenizer = CLIPTokenizer.from_pretrained(model_name, subfolder="tokenizer")
    text_encoder = CLIPTextModel.from_pretrained(model_name, subfolder="text_encoder").to(dtype=dtype)
    vae = AutoencoderKL.from_pretrained(model_name, subfolder="vae").to(dtype=dtype)
    scheduler = DDPMScheduler.from_pretrained(model_name, subfolder="scheduler")
    unet = UNet2DConditionModel.from_pretrained(model_name, subfolder="unet").to(dtype=dtype)
    return tokenizer, text_encoder, vae, scheduler, unet

In [4]:
def setup_models_for_training(model_name, rank: int=128):
    tokenizer, text_encoder, vae, scheduler, unet = get_models(model_name)

    # freeze all weights
    for m in (unet, text_encoder, vae):
        for p in m.parameters():
            p.requires_grad = False

    # config LoRA
    unet_lora_config = LoraConfig(
        r=rank,
        lora_alpha=rank,
        init_lora_weights="gaussian",
        target_modules=["to_k", "to_q", "to_v", "to_out.0"],
    )

    unet.add_adapter(unet_lora_config)

    # set trainaible weights to float32
    for p in unet.parameters():
        if p.requires_grad:
            p.data = p.to(dtype=torch.float16)

    return tokenizer, text_encoder, vae, scheduler, unet

def get_lora_params(unet):
    return [p for p in filter(lambda p: p.requires_grad, [p for p in unet.parameters()])]

In [5]:
from dataclasses import dataclass
@dataclass
class TrainingConfig():
    train_steps: int = 30
    lr: float = 1e-5
    batch_size: int = 4
    accumulation_steps: int = 2
    rank: int = 128
    max_grad_norm: float = 1.0
    pretrained_name: str = "stabilityai/stable-diffusion-xl-base-1.0"
    snr_gamma: float = -1
    seed: int = -1
    CSV_PATH = '/mnt/Enterprise2/aavash/cpt/image_gen_hackathon/images/Carpets/desc.csv'
    BASE_IMAGE_DIR = '/mnt/Enterprise2/aavash/cpt/image_gen_hackathon/images/Carpets/'





In [6]:
import os
class CarpetWallpaperDataset(Dataset):
    def __init__(self, csv_path, base_image_dir,tokenizer):
        self.dataframe = pd.read_csv(csv_path)
        self.base_image_dir = base_image_dir
        self.tokenizer = tokenizer
        self.train_tranforms = transforms.Compose(
              [
                #   transforms.RandomHorizontalFlip(),
                  transforms.ToTensor(),
                  transforms.Resize((224,224)),
                  transforms.Normalize([0.5], [0.5]),
              ]
        )

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        # Construct full image path
        relative_path = self.dataframe.iloc[idx]['image']
        full_image_path = os.path.join(self.base_image_dir, relative_path)

        # Load image
        image = Image.open(full_image_path).convert('RGB')

        image_tensor=self.train_tranforms(image)


        input_ids = self.tokenizer(
            self.dataframe.iloc[idx]['prompt'],
            max_length=self.tokenizer.model_max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )["input_ids"][0]
        # Get corresponding text prompt
        # text_prompt = self.dataframe.iloc[idx]['prompt']

        return {"pixel_values": image_tensor,
                 "input_ids": input_ids}

In [17]:
def train(
    tokenizer: CLIPTokenizer,
    text_encoder: CLIPTextModel,
    vae: AutoencoderKL,
    scheduler: DDPMScheduler,
    unet: UNet2DConditionModel,
    config: TrainingConfig,
    device = None
):
    if device is None:
        device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")

    lora_params = get_lora_params(unet)

    text_encoder.to(device).eval()
    vae.to(device).eval()
    unet.to(device).train()

    # data set
    train_dataset = CarpetWallpaperDataset(config.CSV_PATH,config.BASE_IMAGE_DIR, tokenizer)
    train_dataloader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)

    # optimizer
    steps_per_epoch = math.ceil(len(train_dataloader) / config.accumulation_steps)
    epochs = math.ceil(config.train_steps / steps_per_epoch)

    lr = config.lr * config.accumulation_steps * config.batch_size
    optimizer = torch.optim.AdamW(lora_params, lr=lr)

    scaler = torch.cuda.amp.GradScaler()

    # progress bar setup
    global_step = 0
    progress_bar = tqdm(
        range(config.train_steps),
        desc="Steps"
    )

    print(f"configs: {config}")
    print(f"epochs: {epochs}")
    print(f"steps per epoch: {steps_per_epoch}")
    print(f"total steps: {config.train_steps}")
    print(f"accumulation steps: {config.accumulation_steps}")
    print(f"total batch size: {config.batch_size * config.accumulation_steps}")
    print(f"lr: {lr}")

    losses = []
    for _ in range(epochs):
        for step, batch in enumerate(train_dataloader):
            bs = batch["input_ids"].shape[0]

            with torch.autocast(device_type="cuda:1", dtype=torch.float16):
                with torch.no_grad():
                    encoder_output = text_encoder(batch["input_ids"].to(device), return_dict=True)
                    encoder_hidden_states = encoder_output.last_hidden_state
                    # Get text embeddings for additional conditioning
                    text_embeds = encoder_output.pooler_output

                timesteps = torch.randint(0, scheduler.config.num_train_timesteps, (bs,)).long().to(device)

                with torch.no_grad():
                    batch["pixel_values"] = batch["pixel_values"].type(torch.float16)
                    latents = vae.encode(batch["pixel_values"].to(device)).latent_dist.sample()
                    latents = latents * vae.config.scaling_factor

                noise = torch.randn_like(latents)
                noisy_latents = scheduler.add_noise(latents, noise, timesteps)
                original_size = (224, 224)
                target_size = (224, 224)
                crops_coords_top_left = (0, 0)
                time_ids = torch.tensor(
                    [
                        original_size + target_size + crops_coords_top_left
                        for _ in range(bs)
                    ],
                    device=device,
                    dtype=torch.long,
                )

                # Pass both text embeddings and time IDs as additional conditioning
                added_cond_kwargs = {
                    "text_embeds": text_embeds,
                    "time_ids": time_ids
                }

                noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states, return_dict=False, added_cond_kwargs=added_cond_kwargs)[0]

                if config.snr_gamma > 0:
                    # should converge faster with snr_gamma, however works well with unweighted mse
                    # https://arxiv.org/abs/2303.09556
                    # https://github.com/huggingface/diffusers/blob/main/examples/text_to_image/train_text_to_image_lora.py
                    snr = compute_snr(scheduler, timesteps)
                    mse_loss_weights = torch.stack([snr, config.snr_gamma * torch.ones_like(timesteps)], dim=1).min(
                        dim=1
                    )[0]
                    mse_loss_weights = mse_loss_weights / snr
                    loss = F.mse_loss(noise_pred, noise, reduction="none")
                    loss = loss.mean(dim=list(range(1, len(loss.shape)))) * mse_loss_weights
                    loss = loss.mean()
                else:
                    loss = F.mse_loss(noise_pred, noise, reduction="mean")

            global_step+=1
            scaler.scale(loss).backward()

            if global_step % config.accumulation_steps == 0:
                if config.max_grad_norm > 0:
                    scaler.unscale_(optimizer)
                    torch.nn.utils.clip_grad_norm_(lora_params, config.max_grad_norm)
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
                progress_bar.update(1)

            losses.append(loss.item())

            progress_bar.set_postfix({"loss": losses[-1]})
            if global_step / config.accumulation_steps >= config.train_steps:
                break

    return {
        "losses": losses
    }

In [18]:
try:
    # in case of rerun, to make sure we free up GPU before calling train
    del models, pipe
    import gc; gc.collect()
    torch.cuda.empty_cache()
except:
    pass

config = TrainingConfig()
config.lr = 1e-5
config.rank = 62
config.train_steps = 30
config.snr_gamma = 5.0
config.seed = 42

torch.manual_seed(config.seed)

models = setup_models_for_training(config.pretrained_name, rank=config.rank)

outputs = train(
    *models,
    config,
)

Steps:   0%|          | 0/30 [05:12<?, ?it/s]
  scaler = torch.cuda.amp.GradScaler()
Steps:   0%|          | 0/30 [00:00<?, ?it/s]

configs: TrainingConfig(train_steps=30, lr=1e-05, batch_size=4, accumulation_steps=2, rank=62, max_grad_norm=1.0, pretrained_name='stabilityai/stable-diffusion-xl-base-1.0', snr_gamma=5.0, seed=42)
epochs: 6
steps per epoch: 5
total steps: 30
accumulation steps: 2
total batch size: 8
lr: 8e-05


RuntimeError: mat1 and mat2 shapes cannot be multiplied (4x2304 and 2816x1280)

# experiment end

In [None]:
!nvidia-smi

In [53]:
device = torch.device("cuda" if cuda.is_available() else "cpu")

In [54]:
import torch.nn.functional as F

In [55]:
class CustomImageVariationLoss(torch.nn.Module):
    def __init__(self, clip_model_name='openai/clip-vit-base-patch32'):
        super().__init__()
        self.clip_model = CLIPModel.from_pretrained(clip_model_name)
        self.clip_processor = CLIPProcessor.from_pretrained(clip_model_name)

    def extract_edges(self, image):
        if isinstance(image, torch.tensor):
            image = image.permute(1, 2, 0).numpy()
        gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray_image, threshold1=100, threshold2=200)
        return torch.from_numpy(edges).float()/255.0

    def text_image_alignment_loss(self, generated_image, text_prompt):
        # Compute CLIP text-image similarity
        inputs = self.clip_processor(text=text_prompt, images=generated_image, return_tensors="pt", padding=True)
        outputs = self.clip_model(**inputs)
        return -outputs.logits_per_image.mean()

    def structural_preservation_loss(self, original_image, generated_image):
        # Compute structural similarity using edge detection and feature matching
        original_edges = self.extract_edges(original_image)
        generated_edges = self.extract_edges(generated_image)

        # Structural preservation metric
        structural_loss = F.mse_loss(original_edges, generated_edges)
        return structural_loss

    def forward(self, original_image, generated_image, text_prompt):
        text_alignment = self.text_image_alignment_loss(generated_image, text_prompt)
        structural_preserve = self.structural_preservation_loss(original_image, generated_image)

        # Weighted combination of losses
        total_loss = 0.6 * text_alignment + 0.4 * structural_preserve
        return total_loss

In [56]:
def setup_lora_model(base_model_path):
    # Configure LoRA parameters
    lora_config = LoraConfig(
        r=16,  # Rank of low-rank adaptation
        lora_alpha=32,  # Scaling factor
        target_modules=["to_q", "to_v"],
        lora_dropout=0.1,
        bias="none"
    )

    # Load base Stable Diffusion model
    model = StableDiffusionPipeline.from_pretrained(base_model_path)

    unet = model.unet

    lora_model = get_peft_model(unet, lora_config)

    model.unet = lora_model

    return model

In [None]:
from torchvision import transforms

In [57]:
class CarpetWallpaperDataset(Dataset):
    def __init__(self, csv_path, base_image_dir):
        self.dataframe = pd.read_csv(csv_path)
        self.base_image_dir = base_image_dir
        self.transform= transforms.Compose(
            [transforms.Resize((224,224)),
            transforms.ToTensor()]
        )

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        # Construct full image path
        relative_path = self.dataframe.iloc[idx]['image']
        full_image_path = os.path.join(self.base_image_dir, relative_path)

        # Load image
        image = Image.open(full_image_path).convert('RGB')

        image_tensor=self.transform(image)


        # Get corresponding text prompt
        text_prompt = self.dataframe.iloc[idx]['prompt']

        return {
            'image': image_tensor,
            'text_prompt': text_prompt
        }

In [58]:
def prepare_dataloader(csv_path, base_image_dir, batch_size=4):
    dataset = CarpetWallpaperDataset(csv_path, base_image_dir)
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        pin_memory=True
    )

    return dataloader

In [59]:
def train_image_variation_model(
    csv_path,
    base_image_dir,
    custom_loss_fn,
    num_epochs=5,
    learning_rate=1e-4
):

    # Setup model and optimizer
    lora_model = setup_lora_model("stabilityai/stable-diffusion-xl-base-1.0").to(device)
    clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    scaler = torch.cuda.amp.GradScaler()
    optimizer = torch.optim.AdamW(lora_model.unet.parameters(), lr=learning_rate)

    # Prepare DataLoader
    dataloader = prepare_dataloader(csv_path, base_image_dir)

    # Training loop
    for epoch in range(num_epochs):
        for batch in dataloader:
            images = batch['image'].to(device)  # Load images to device
            prompts = batch['text_prompt']  # List of text prompts

            # Preprocess text prompts and get embeddings
            text_inputs = clip_processor(text=prompts, return_tensors="pt", padding=True).to(device)
            text_embeds = lora_model.text_encoder(**text_inputs).last_hidden_state

            print(len(text_inputs))
            print(len(text_embeds))

            optimizer.zero_grad()  # Clear gradients before backpropagation

            # Image-to-image generation using mixed precision training
            with torch.cuda.amp.autocast():
                generated_images = lora_model(
                    prompt=prompts[0],
                    image=images[0],
                    strength=0.75,  # Controls image variation intensity
                    guidance_scale=7.5,
                    added_cond_kwargs={"text_embeds": text_embeds}  # Pass text embeddings
                ).images

                # Compute custom loss
                total_loss = 0
                for orig_img, gen_img, prompt in zip(images, generated_images, prompts):
                    batch_loss = custom_loss_fn(
                        original_image=orig_img,
                        generated_image=gen_img,
                        text_prompt=prompt,

                    )
                    total_loss += batch_loss

            # Backpropagate and update model
            scaler.scale(total_loss).backward()
            scaler.step(optimizer)
            scaler.update()

        print(f"Epoch {epoch + 1}/{num_epochs} completed")

    return lora_model


# experiment start

In [61]:
lora_model = setup_lora_model("stabilityai/stable-diffusion-xl-base-1.0").to(device)


Loading pipeline components...:   0%|          | 0/5 [00:00<?, ?it/s]

You have disabled the safety checker for <class 'diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline'> by passing `safety_checker=None`. Ensure that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered results in services or applications open to the public. Both the diffusers team and Hugging Face strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling it only for use-cases that involve analyzing network behavior or auditing its results. For more information, please have a look at https://github.com/huggingface/diffusers/pull/254 .


In [62]:
lora_model

StableDiffusionPipeline {
  "_class_name": "StableDiffusionPipeline",
  "_diffusers_version": "0.32.2",
  "_name_or_path": "stabilityai/stable-diffusion-xl-base-1.0",
  "feature_extractor": [
    null,
    null
  ],
  "image_encoder": [
    null,
    null
  ],
  "requires_safety_checker": true,
  "safety_checker": [
    null,
    null
  ],
  "scheduler": [
    "diffusers",
    "EulerDiscreteScheduler"
  ],
  "text_encoder": [
    "transformers",
    "CLIPTextModel"
  ],
  "tokenizer": [
    "transformers",
    "CLIPTokenizer"
  ],
  "unet": [
    "diffusers",
    "UNet2DConditionModel"
  ],
  "vae": [
    "diffusers",
    "AutoencoderKL"
  ]
}

In [63]:
def train_image_variation_model(
    csv_path,
    base_image_dir,
    custom_loss_fn,
    num_epochs=5,
    learning_rate=1e-4
):
    device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")

    # Setup model and optimizer
    lora_model = setup_lora_model("stabilityai/stable-diffusion-xl-base-1.0").to(device)
    clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    # Ensure lora_model.text_encoder exists
    if not hasattr(lora_model, "text_encoder"):
        raise AttributeError("The LoRA model does not have a `text_encoder` attribute.")

    scaler = torch.cuda.amp.GradScaler()
    optimizer = torch.optim.AdamW(lora_model.unet.parameters(), lr=learning_rate)

    # Prepare DataLoader
    dataloader = prepare_dataloader(csv_path, base_image_dir)

    # Training loop
    for epoch in range(num_epochs):
        for batch in dataloader:
            images = batch['image'].to(device)  # Load images to device
            prompts = batch['text_prompt']  # List of text prompts

            # Debug: Check prompts
            if not prompts:
                raise ValueError("Prompts list is empty.")
            # print("Prompts:", prompts)

            # Preprocess text prompts and get embeddings
            text_inputs = clip_processor(text=prompts, return_tensors="pt", padding=True).to(device)
            # print("Text Inputs:", text_inputs)

            try:
                text_embeds = lora_model.text_encoder(**text_inputs).last_hidden_state
                if text_embeds is None:
                    raise ValueError("Text embeddings could not be generated.")
                print("Text Embeds Shape:", text_embeds.shape)
            except Exception as e:
                print(f"Error generating text embeddings: {e}")
                continue  # Skip this batch

            optimizer.zero_grad()  # Clear gradients before backpropagation

            # Image-to-image generation using mixed precision training
            with torch.cuda.amp.autocast():
                try:
                    generated_images = lora_model(
                        prompt=prompts,
                        # image=images,
                        strength=0.75,  # Controls image variation intensity
                        guidance_scale=7.5,
                        added_cond_kwargs={"text_embeds": text_embeds}  # Pass text embeddings
                    ).images
                except Exception as e:
                    print(f"Error during image generation: {e}")
                    continue  # Skip this batch

            # Compute custom loss
            total_loss = 0
            for orig_img, gen_img, prompt in zip(images, generated_images, prompts):
                batch_loss = custom_loss_fn(
                    original_image=orig_img,
                    generated_image=gen_img,
                    text_prompt=prompt,
                )
                total_loss += batch_loss

            # Backpropagate and update model
            scaler.scale(total_loss).backward()
            scaler.step(optimizer)
            scaler.update()

        print(f"Epoch {epoch + 1}/{num_epochs} completed")

    return lora_model


# experiment end

In [64]:
# Usage example
if __name__ == "__main__":
    # Paths to configure
    CSV_PATH = '/content/drive/MyDrive/Carpets/desc.csv'
    BASE_IMAGE_DIR = '/content/drive/MyDrive/Carpets'

    # Initialize custom loss function
    custom_loss_fn = CustomImageVariationLoss()

    # Train the model
    trained_model = train_image_variation_model(
        csv_path=CSV_PATH,
        base_image_dir=BASE_IMAGE_DIR,
        custom_loss_fn=custom_loss_fn
    )

Loading pipeline components...:   0%|          | 0/5 [00:00<?, ?it/s]

You have disabled the safety checker for <class 'diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline'> by passing `safety_checker=None`. Ensure that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered results in services or applications open to the public. Both the diffusers team and Hugging Face strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling it only for use-cases that involve analyzing network behavior or auditing its results. For more information, please have a look at https://github.com/huggingface/diffusers/pull/254 .
  scaler = torch.cuda.amp.GradScaler()


Text Embeds Shape: torch.Size([4, 69, 768])


  with torch.cuda.amp.autocast():


  0%|          | 0/50 [00:00<?, ?it/s]

Error during image generation: argument of type 'NoneType' is not iterable
Text Embeds Shape: torch.Size([4, 66, 768])


  0%|          | 0/50 [00:00<?, ?it/s]

Error during image generation: argument of type 'NoneType' is not iterable
Text Embeds Shape: torch.Size([4, 67, 768])


  0%|          | 0/50 [00:00<?, ?it/s]

Error during image generation: argument of type 'NoneType' is not iterable
Text Embeds Shape: torch.Size([4, 77, 768])


KeyboardInterrupt: 