# **Fine-Tuning Stable Diffusion Model with custom dataset for Text to Image Generation**

In [None]:
pip install datasets pandas pillow requests

In [None]:
!pip install \
    accelerate==1.0.1 \
    diffusers==0.30.3 \
    numpy==2.1.2 \
    pillow==11.0.0 \
    torch==2.4.1 \
    torchvision==0.19.1 \
    tqdm==4.66.5 \
    transformers==4.45.2

In [None]:
pip install ipywidgets

In [None]:
!pip uninstall -y tqdm
!pip install tqdm

In [None]:
pip install peft

In [None]:
import torch

# Ask PyTorch's CUDA caching allocator to release memory it holds but is not
# currently using, so a re-run of the notebook starts with as much free GPU
# memory as possible.
torch.cuda.empty_cache()

In [None]:
import os

# All notebook inputs and outputs live under this folder (trailing slash is
# part of the value because the paths below are built by plain concatenation).
root_dir = "Lab2/"


# Downloaded images and the CSV that maps each image file to its caption.
DATASET_DIR = f"{root_dir}dataset/"
CAPTIONS_FILE = f"{DATASET_DIR}image_captions.csv"

# Presumably where trained LoRA weights are written - not used in the visible cells.
FINETUNED_MODEL_DIR = f"{root_dir}fine_tuned_models/"

# Presumably output folders for generated images - not used in the visible cells.
RESULTS_DIR = f"{root_dir}results/"
RESIZED_RESULTS_DIR = f"{root_dir}resized_images/"

### Download LAION Aesthetic dataset with images and captions

In [None]:
# 🔐 The access token is never stored in the notebook; it is read from the
# HUGGINGFACE_TOKEN environment variable and is None when that variable is unset.
import os
HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")

In [None]:
from tqdm import tqdm
from datasets import load_dataset

# Load the first 15,000 rows of the LAION aesthetic split. The rows are read
# later through their "URL" and "TEXT" fields.
# The token read from the environment is forwarded explicitly; when it is None
# this is the same as not passing it at all.
dataset = load_dataset(
    "laion/laion2B-en-aesthetic",
    split="train[:15000]",
    download_config=None,
    token=HF_TOKEN,
)

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
from PIL import Image
from torchvision import transforms
from tqdm import tqdm

import pandas as pd
import requests

# Download budget and worker count for the thread pool below.
max_images = 15000
max_workers = 8

# Rows collected for the captions CSV; reset here so re-running the cell
# starts from an empty list.
csv_data = []

# Iterator over the dataset rows loaded in the previous cell.
stream = iter(dataset)

# Preprocessing applied to every downloaded image: resize to 512x512, centre
# crop (a no-op after the fixed-size resize), convert to a tensor and map the
# values from [0, 1] to [-1, 1].
transform = transforms.Compose([
    transforms.Resize(size=(512, 512)),
    transforms.CenterCrop(size=512),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

def process_item(idx, item):
    """Download one dataset row's image, preprocess it and save it as a PNG.

    Args:
        idx: Position of the row in the stream; used to build the output file
            name ``image_{idx:06}.png``.
        item: Dataset row exposing the image address under ``"URL"`` and the
            caption under ``"TEXT"``.

    Returns:
        ``{"image_name": ..., "caption": ...}`` on success, or ``None`` when
        the row is unusable or any step fails. This function is best-effort
        by design: failures are skipped, never raised.
    """
    try:
        url = item["URL"]
        caption = item["TEXT"]

        # Reject unusable rows before touching the network. A missing or blank
        # caption would otherwise be written to the CSV as an empty cell, which
        # pandas reads back as NaN instead of a string.
        if not isinstance(url, str) or not url.strip():
            return None
        if not isinstance(caption, str) or not caption.strip():
            return None

        image_name = f"image_{idx:06}.png"
        save_path = os.path.join(DATASET_DIR, image_name)

        # Download
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        with Image.open(BytesIO(response.content)) as img:
            rgb = img.convert("RGB")
            tensor = transform(rgb)
            # Undo the [-1, 1] normalisation and convert back to PIL for saving
            img_out = transforms.ToPILImage()(tensor * 0.5 + 0.5)
            img_out.save(save_path)

        return {"image_name": image_name, "caption": caption}
    except Exception:
        # Deliberately broad: dead links, timeouts, non-image payloads and
        # malformed rows are all expected in a web-scraped dataset.
        return None  # Skip failed downloads

# Create the output directory up front. Without it every image save fails
# inside process_item (where the error is swallowed) and the CSV write below
# fails too, after all the download time has already been spent.
os.makedirs(DATASET_DIR, exist_ok=True)

# Use thread pool for bounded parallelism
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # zip() stops at whichever is exhausted first: the max_images budget or
    # the dataset stream.
    futures = [
        executor.submit(process_item, idx, item)
        for idx, item in zip(range(max_images), stream)
    ]

    for future in tqdm(as_completed(futures), total=len(futures)):
        result = future.result()
        if result:
            csv_data.append(result)

# Write CSV
# Results arrive in completion order, which differs from run to run, so the
# rows are sorted by file name for a reproducible file. Naming the columns
# explicitly keeps the header even when nothing was downloaded.
df = (
    pd.DataFrame(csv_data, columns=["image_name", "caption"])
    .sort_values("image_name")
    .reset_index(drop=True)
)
df.to_csv(CAPTIONS_FILE, index=False)

print(f"\n✅ Downloaded and processed {len(csv_data)} images.")

### Create a Dataset class to return image and caption pair loaded from LAION dataset

In [None]:
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms

import pandas as pd

class LAIONDataset(Dataset):
    """Image/caption pairs read from a captions CSV and an image folder.

    The CSV must have an ``image_name`` column (file name inside
    ``image_dir``) and a ``caption`` column.

    Args:
        csv_path: Path to the captions CSV.
        image_dir: Folder containing the image files named in the CSV.
        image_size: Side length of the square images returned.
    """

    def __init__(self, csv_path, image_dir, image_size=512):
        captions = pd.read_csv(csv_path)
        # An empty CSV cell is read back as NaN (a float). Such rows cannot
        # yield a usable (image, caption) pair, so they are dropped here
        # rather than surfacing as a float caption in the middle of training.
        self.df = captions.dropna(subset=["image_name", "caption"]).reset_index(drop=True)
        self.image_dir = image_dir
        # Resize to a square, centre crop, convert to a tensor and map the
        # values from [0, 1] to [-1, 1].
        self.transform = transforms.Compose([
            transforms.Resize((image_size, image_size)),
            transforms.CenterCrop(image_size),
            transforms.ToTensor(),
            transforms.Normalize([0.5], [0.5])
        ])

    def __len__(self):
        """Return the number of usable rows."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image_tensor, caption)`` for the row at position ``idx``."""
        row = self.df.iloc[idx]
        image_path = os.path.join(self.image_dir, row["image_name"])
        # str() guards against captions that pandas parsed as numbers.
        caption = str(row["caption"])
        # The context manager closes the file as soon as the pixels are
        # decoded; convert() returns an independent in-memory image.
        with Image.open(image_path) as raw:
            image = raw.convert("RGB")
        image = self.transform(image)
        return image, caption

# Training dataset backed by the downloaded images and their captions CSV.
# NOTE(review): this rebinds `dataset`, the name the download cell iterates
# over; re-running that cell after this one would read from this object.
dataset = LAIONDataset(CAPTIONS_FILE, DATASET_DIR)

### Define LoRA layer and helper functions to add LoRA layer to the base Stable Diffusion model

In [None]:
import math
import torch
import torch.nn as nn

class LoRALinear(nn.Module):
    """An ``nn.Linear`` plus a trainable low-rank (LoRA) correction.

    The forward pass returns ``linear(x) + lora(x)``. The wrapped layer's own
    parameters are frozen so that only the adapter is trained; this keeps the
    trained state fully described by the adapter tensors, which are the only
    ones the save helper in this notebook writes out.

    Args:
        linear: The layer to wrap; it is kept (not copied) as ``self.linear``.
        rank: Rank of the low-rank update.
        alpha: Scaling numerator; the update is scaled by ``alpha / rank``.
    """

    def __init__(self, linear, rank=4, alpha=1):
        super().__init__()
        self.linear = linear
        # Freeze the pretrained weight and bias: LoRA trains the adapter only.
        for param in self.linear.parameters():
            param.requires_grad_(False)
        self.lora = LoRALayer(linear.in_features, linear.out_features, rank, alpha)

    def forward(self, x):
        """Return the base layer's output plus the LoRA update for ``x``."""
        return self.linear(x) + self.lora(x)

class LoRALayer(nn.Module):
    """Low-rank update ``x -> (x @ A^T @ B^T) * (alpha / rank)``.

    ``lora_A`` has shape ``(rank, in_features)`` and is Kaiming-uniform
    initialised; ``lora_B`` has shape ``(out_features, rank)`` and starts at
    zero, so a fresh layer contributes nothing to the output.
    """

    def __init__(self, in_features, out_features, rank=4, alpha=1, dtype=torch.float32):
        super().__init__()
        down_proj = torch.zeros((rank, in_features), dtype=dtype)
        up_proj = torch.zeros((out_features, rank), dtype=dtype)
        self.lora_A = nn.Parameter(down_proj)
        self.lora_B = nn.Parameter(up_proj)
        self.scale = alpha / rank
        # A gets a random start, B stays at zero.
        nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B)

    def forward(self, x):
        """Project ``x`` down to ``rank`` dims, back up, and apply the scale."""
        reduced = x @ self.lora_A.T
        restored = reduced @ self.lora_B.T
        return restored * self.scale

def add_lora_to_linear(linear, rank=4, alpha=1):
    """Return a callable computing ``linear(x)`` plus a LoRA update.

    The result is a ``LoRALinear`` module rather than a bare closure, so the
    adapter parameters are registered and therefore visible to optimizers,
    ``state_dict()`` and ``.to()``. The adapter is moved to the device and
    dtype of ``linear.weight`` so the two outputs can be added.

    Args:
        linear: The ``nn.Linear`` to augment.
        rank: Rank of the low-rank update.
        alpha: Scaling numerator; the update is scaled by ``alpha / rank``.

    Returns:
        A module that can be called as ``f(x)``, exactly like the original
        layer.
    """
    wrapped = LoRALinear(linear, rank, alpha)
    return wrapped.to(linear.weight.device, linear.weight.dtype)

def apply_lora_to_model(model, rank=4, alpha=1):
    """Replace every ``nn.Linear`` inside ``model`` with a ``LoRALinear`` wrapper.

    The model is modified in place and also returned. Calling the function
    again on an already-converted model is a no-op, which makes re-running
    the notebook cell safe.

    Args:
        model: Module whose linear sub-layers should receive LoRA adapters.
        rank: Rank of each low-rank update.
        alpha: Scaling numerator; each update is scaled by ``alpha / rank``.

    Returns:
        The same ``model`` object.
    """
    # Snapshot the targets first: the loop below rewires the module tree, and
    # that must not happen while named_modules() is still walking it.
    linear_layers = [
        (name, module)
        for name, module in model.named_modules()
        if isinstance(module, nn.Linear)
    ]

    for name, module in linear_layers:
        parent_name, _, child_name = name.rpartition('.')
        if not child_name:
            # The model itself is a bare nn.Linear; there is no parent
            # attribute to rebind, so it cannot be wrapped in place.
            continue
        parent = model.get_submodule(parent_name)
        if isinstance(parent, LoRALinear):
            # This is the base layer of an existing wrapper; wrapping it again
            # would stack a second adapter on top of the first.
            continue
        lora_layer = LoRALinear(module, rank, alpha)
        lora_layer = lora_layer.to(module.weight.device, module.weight.dtype)
        setattr(parent, child_name, lora_layer)
    return model

### Utility functions to save/load weights of LoRA layer trained with base Stable Diffusion model

In [None]:
import torch

def save_lora_weights(model, path):
    """Write the ``lora_A`` / ``lora_B`` tensors of every ``LoRALayer`` to ``path``.

    The file holds a flat dict keyed ``"<module name>.lora_A"`` and
    ``"<module name>.lora_B"``. Tensors are stored detached and on the CPU so
    the checkpoint carries no autograd state and can be opened on a machine
    without the GPU it was trained on.

    Args:
        model: Module containing ``LoRALayer`` sub-modules.
        path: Destination file; missing parent directories are created.
    """
    import os

    lora_state_dict = {}
    for name, module in model.named_modules():
        if isinstance(module, LoRALayer):
            lora_state_dict[f"{name}.lora_A"] = module.lora_A.detach().cpu()
            lora_state_dict[f"{name}.lora_B"] = module.lora_B.detach().cpu()

    # torch.save does not create directories; only attempt it for real paths
    # (torch.save also accepts file-like objects).
    if isinstance(path, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(path))
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    torch.save(lora_state_dict, path)
    print(f"LoRA weights saved to {path}")

def load_lora_weights(model, path):
    """Copy saved LoRA tensors from ``path`` into the ``LoRALayer``s of ``model``.

    Expects the flat dict layout keyed ``"<module name>.lora_A"`` and
    ``"<module name>.lora_B"``. Values are copied into the existing
    parameters, so each parameter keeps its current device and dtype.

    Args:
        model: Module containing ``LoRALayer`` sub-modules to fill.
        path: Checkpoint file to read.

    Raises:
        KeyError: If the checkpoint lacks an entry for one of the model's
            LoRA parameters.
    """
    # map_location="cpu" lets a checkpoint written on a GPU open on any
    # machine; weights_only=True restricts unpickling to tensor data instead
    # of arbitrary Python objects.
    lora_state_dict = torch.load(path, map_location="cpu", weights_only=True)

    with torch.no_grad():
        for name, module in model.named_modules():
            if isinstance(module, LoRALayer):
                for suffix in ("lora_A", "lora_B"):
                    key = f"{name}.{suffix}"
                    if key not in lora_state_dict:
                        raise KeyError(f"LoRA checkpoint {path} has no entry for '{key}'")
                    # In-place copy: handles the device/dtype conversion and
                    # leaves the Parameter object itself untouched.
                    getattr(module, suffix).copy_(lora_state_dict[key])
    print(f"LoRA weights loaded from {path}")

### Define the train function for fine-tuning the pretrained Stable Diffusion model on the custom dataset, saving the LoRA layer weights with the best loss across the training epochs

In [None]:
# 🔐 The access token is never stored in the notebook; it is read from the
# HUGGINGFACE_TOKEN environment variable and is None when that variable is unset.
# NOTE(review): the heading above announces the train function, but this cell
# only reads the token. `train_loop`, which the training cell calls, is not
# defined in the visible cells - confirm its definition is restored.
import os
HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")

### Load the pretrained base Stable Diffusion model from the hub, inject LoRA layers into the model pipeline, and define the text-caption encoder, the Variational Autoencoder, and the training hyperparameters

In [None]:
# 🔐 The access token is never stored in the notebook; it is read from the
# HUGGINGFACE_TOKEN environment variable and is None when that variable is unset.
# NOTE(review): the heading above announces model loading and LoRA injection,
# but this cell only reads the token. `unet`, `text_encoder`, `vae`,
# `noise_scheduler`, `dataloader` and `device`, which the training cell uses,
# are not defined in the visible cells - confirm their setup is restored.
import os
HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")

## Train the model pipeline with custom dataset

In [None]:
# Training
# Run the fine-tuning loop for a fixed number of epochs with AdamW over the
# UNet's parameters. The model components, dataloader, device and train_loop
# are expected to come from earlier cells.
num_epochs = 20
optimizer = torch.optim.AdamW(params=unet.parameters(), lr=1e-6)
train_loop(dataloader, unet, text_encoder, vae, noise_scheduler, optimizer, device, num_epochs)
#pipe.unet = unet