In [1]:
import os
import csv
from PIL import Image

In [2]:
dataset_dir = "dataset"
images_dir = os.path.join(dataset_dir, "images")
os.makedirs(images_dir, exist_ok=True)

In [3]:
img_names = ["img1.jpg", "img2.jpg", "medical_scan_01.jpg"]
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]  # Red, Green, Blue

for name, color in zip(img_names, colors):
    img = Image.new("RGB", (512, 512), color)
    img.save(os.path.join(images_dir, name))

In [4]:
captions = [
    ("img1.jpg", "A red flower in a green field"),
    ("img2.jpg", "A futuristic medical scanner in a lab"),
    ("medical_scan_01.jpg", "High-resolution MRI scan showing brain activity"),
]
csv_path = os.path.join(dataset_dir, "captions.csv")
with open(csv_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["file_name", "text"])
    writer.writerows(captions)

print("Dataset prepared!")

Dataset prepared!


In [5]:
import torch
import numpy as np
import pandas as pd

In [6]:
from torch.utils.data import Dataset, DataLoader
from transformers import CLIPTokenizer, CLIPTextModel
from diffusers import AutoencoderKL, UNet2DConditionModel, StableDiffusionPipeline, DDIMScheduler

In [7]:
from accelerate import Accelerator
from tqdm import tqdm

In [8]:
class CustomImageDataset(Dataset):
    def __init__(self, images_dir, captions_file, tokenizer, size=512):
        self.images_dir = images_dir
        self.data = pd.read_csv(captions_file)
        self.tokenizer = tokenizer
        self.size = size

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        image_path = os.path.join(self.images_dir, row['file_name'])
        image = Image.open(image_path).convert("RGB").resize((self.size, self.size))
        image = torch.tensor(np.array(image)).permute(2, 0, 1).float() / 255.0

        inputs = self.tokenizer(
            row['text'],
            truncation=True,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            return_tensors="pt"
        )

        return {
            "pixel_values": image,
            "input_ids": inputs.input_ids.squeeze(0)
        }

In [9]:
PRETRAINED_MODEL_NAME = "CompVis/stable-diffusion-v1-4"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
SEED = 42
torch.manual_seed(SEED)

<torch._C.Generator at 0x2929d81fb30>

In [10]:
tokenizer = CLIPTokenizer.from_pretrained(PRETRAINED_MODEL_NAME, subfolder="tokenizer")
text_encoder = CLIPTextModel.from_pretrained(PRETRAINED_MODEL_NAME, subfolder="text_encoder").to(DEVICE)
vae = AutoencoderKL.from_pretrained(PRETRAINED_MODEL_NAME, subfolder="vae").to(DEVICE)
unet = UNet2DConditionModel.from_pretrained(PRETRAINED_MODEL_NAME, subfolder="unet").to(DEVICE)

for param in vae.parameters():
    param.requires_grad = False
for param in text_encoder.parameters():
    param.requires_grad = False

dataset = CustomImageDataset(
    images_dir=images_dir,
    captions_file=csv_path,
    tokenizer=tokenizer
)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)


In [11]:
# Optimizer and training setup
optimizer = torch.optim.AdamW(unet.parameters(), lr=5e-6)
accelerator = Accelerator()
unet, optimizer, dataloader = accelerator.prepare(unet, optimizer, dataloader)

noise_scheduler = DDIMScheduler(
    beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear", num_train_timesteps=1000
)

In [12]:
EPOCHS = 2
LEARNING_RATE = 5e-6
MAX_TRAIN_STEPS = 100
optimizer = torch.optim.AdamW(unet.parameters(), lr=LEARNING_RATE)
accelerator = Accelerator()
unet, optimizer, dataloader = accelerator.prepare(unet, optimizer, dataloader)

noise_scheduler = DDIMScheduler(
    beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear", num_train_timesteps=1000
)

In [13]:
print("Starting training...")
step = 0
MAX_TRAIN_STEPS = 10

for epoch in range(1):
    for batch in tqdm(dataloader, desc=f"Epoch {epoch+1}"):
        pixel_values = batch["pixel_values"].to(DEVICE)
        input_ids = batch["input_ids"].to(DEVICE)

        with torch.no_grad():
            encoder_hidden_states = text_encoder(input_ids)[0]
            latents = vae.encode(pixel_values).latent_dist.sample() * 0.18215

        noise = torch.randn_like(latents)
        timesteps = torch.randint(0, 1000, (latents.shape[0],), device=DEVICE).long()
        noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)

        noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample
        loss = torch.nn.functional.mse_loss(noise_pred, noise)

        optimizer.zero_grad()
        accelerator.backward(loss)
        optimizer.step()

        if step % 5 == 0:
            print(f"Step {step} | Loss: {loss.item():.4f}")

        step += 1
        torch.cuda.empty_cache()

        if step >= MAX_TRAIN_STEPS:
            break

    if step >= MAX_TRAIN_STEPS:
        print("\n Training complete.")
        break


Starting training...


Epoch 1:  50%|█████████████████████████████████████                                     | 1/2 [04:09<04:09, 249.63s/it]

Step 0 | Loss: 0.0043


Epoch 1: 100%|██████████████████████████████████████████████████████████████████████████| 2/2 [07:37<00:00, 228.65s/it]


In [14]:
# Save model
OUTPUT_DIR = "sd-custom-output"
if accelerator.is_main_process:
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    unet.save_pretrained(os.path.join(OUTPUT_DIR, "unet"))
    vae.save_pretrained(os.path.join(OUTPUT_DIR, "vae"))
    text_encoder.save_pretrained(os.path.join(OUTPUT_DIR, "text_encoder"))
    tokenizer.save_pretrained(os.path.join(OUTPUT_DIR, "tokenizer"))
    print(f"\n Model saved to: {OUTPUT_DIR}")


 Model saved to: sd-custom-output


In [None]:
import os
import torch
from diffusers import StableDiffusionPipeline, UNet2DConditionModel, AutoencoderKL
from transformers import CLIPTextModel, CLIPTokenizer

OUTPUT_DIR = "sd-custom-output"
DEVICE = "cpu"

# Load components with reduced memory
pipe = StableDiffusionPipeline.from_pretrained(
    pretrained_model_name_or_path=OUTPUT_DIR,
    unet=UNet2DConditionModel.from_pretrained(os.path.join(OUTPUT_DIR, "unet")),
    vae=AutoencoderKL.from_pretrained(os.path.join(OUTPUT_DIR, "vae")),
    text_encoder=CLIPTextModel.from_pretrained(os.path.join(OUTPUT_DIR, "text_encoder")),
    tokenizer=CLIPTokenizer.from_pretrained(os.path.join(OUTPUT_DIR, "tokenizer")),
    torch_dtype=torch.float32,
    safety_checker=None,  # Disable safety checker to avoid extra load
    low_cpu_mem_usage=True,
)

# Use attention slicing to reduce memory spikes
pipe.enable_attention_slicing()
pipe.enable_vae_tiling()
pipe.enable_model_cpu_offload()  # Gradually moves parts to CPU when not used

# Move to GPU only if available and safe
pipe.to(DEVICE)

# Generate image
prompt = "A futuristic medical scanner in a lab"
with torch.autocast("cuda") if torch.cuda.is_available() else torch.no_grad():
    image = pipe(prompt, guidance_scale=7.5).images[0]

image.save("result_custom.png")
print("\n Image saved as result_custom.png")

In [None]:
#the above code is not working because its too large 

In [None]:
#Use torch_dtype=torch.float16 and device_map="auto"
#To reduce RAM/GPU usage:
from diffusers import StableDiffusionPipeline

pipe = StableDiffusionPipeline.from_pretrained(
    "path_or_output_folder",
    torch_dtype=torch.float16,
    revision="fp16",
    safety_checker=None,
    device_map="auto"
)


In [None]:
print("🚀 Starting lightweight training...")
step = 0

for epoch in range(EPOCHS):
    for batch in tqdm(dataloader, desc=f"Epoch {epoch+1}"):
        pixel_values = batch["pixel_values"].to(DEVICE)
        input_ids = batch["input_ids"].to(DEVICE)

        with torch.no_grad():
            encoder_hidden_states = text_encoder(input_ids)[0]

        with torch.no_grad():
            latents = vae.encode(pixel_values).latent_dist.sample()
        latents = 0.18215 * latents

        noise = torch.randn_like(latents)
        timesteps = torch.randint(0, 1000, (latents.shape[0],), device=DEVICE).long()
        noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)

        noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample

        loss = torch.nn.functional.mse_loss(noise_pred, noise)

        optimizer.zero_grad()
        accelerator.backward(loss)
        optimizer.step()

        if step % 50 == 0:
            print(f"🧮 Step {step} | Loss: {loss.item():.4f}")

        step += 1
        if step >= MAX_TRAIN_STEPS:
            break

    if step >= MAX_TRAIN_STEPS:
        print("✅ Training stopped: Max steps reached.")
        break

In [None]:
OUTPUT_DIR = "sd-custom-output"
if accelerator.is_main_process:
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    unet.save_pretrained(os.path.join(OUTPUT_DIR, "unet"))
    vae.save_pretrained(os.path.join(OUTPUT_DIR, "vae"))
    text_encoder.save_pretrained(os.path.join(OUTPUT_DIR, "text_encoder"))
    tokenizer.save_pretrained(os.path.join(OUTPUT_DIR, "tokenizer"))
    print(f"Model saved to: {OUTPUT_DIR}")

In [None]:
torch_dtype=torch.float16
pipe = StableDiffusionPipeline.from_pretrained(
    OUTPUT_DIR,
    unet=UNet2DConditionModel.from_pretrained(os.path.join(OUTPUT_DIR, "unet")),
    vae=AutoencoderKL.from_pretrained(os.path.join(OUTPUT_DIR, "vae")),
    text_encoder=CLIPTextModel.from_pretrained(os.path.join(OUTPUT_DIR, "text_encoder")),
    tokenizer=CLIPTokenizer.from_pretrained(os.path.join(OUTPUT_DIR, "tokenizer")),
    torch_dtype=torch.float32,
    low_cpu_mem_usage=True,
).to(DEVICE)
pipe.to(torch.device("cuda"))
pipe.half()
from accelerate import init_empty_weights
pipe = StableDiffusionPipeline.from_pretrained(
    OUTPUT_DIR,
    torch_dtype=torch.float16,
    device_map="auto"  # balances between CPU and GPU
)

In [None]:
pipe = StableDiffusionPipeline.from_pretrained(
    OUTPUT_DIR,
    torch_dtype=torch.float32,
    safety_checker=None,
).to("cpu")

pipe.enable_attention_slicing()  # Helps reduce memory usage

prompt = "A futuristic medical scanner in a lab"
image = pipe(prompt, guidance_scale=7.5).images[0]
image.save("result_custom.png")
print(" Image saved")