In [None]:
!pip install torch torchvision transformers pycocotools pillow tqdm matplotlib py7zr


Collecting py7zr
  Downloading py7zr-0.22.0-py3-none-any.whl.metadata (16 kB)
Collecting texttable (from py7zr)
  Downloading texttable-1.7.0-py2.py3-none-any.whl.metadata (9.8 kB)
Collecting pycryptodomex>=3.16.0 (from py7zr)
  Downloading pycryptodomex-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting pyzstd>=0.15.9 (from py7zr)
  Downloading pyzstd-0.16.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.4 kB)
Collecting pyppmd<1.2.0,>=1.1.0 (from py7zr)
  Downloading pyppmd-1.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.7 kB)
Collecting pybcj<1.1.0,>=1.0.0 (from py7zr)
  Downloading pybcj-1.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.0 kB)
Collecting multivolumefile>=0.2.3 (from py7zr)
  Downloading multivolumefile-0.2.3-py3-none-any.whl.metadata (6.3 kB)
Collecting inflate64<1.1.0,>=1.0.0 (from py7zr)
  Downloading inflate64-1.0.0-cp310-cp310-manylinux_2_17_

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install torch torchvision transformers pycocotools pillow tqdm matplotlib




In [None]:
!pip install torch torchvision transformers diffusers pycocotools wandb matplotlib



In [None]:
!pip install --upgrade torch torchvision torchaudio



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from diffusers import AutoencoderKL
from diffusers import UNet2DConditionModel
from transformers import (
    CLIPModel,
    CLIPProcessor,
    CLIPTokenizer,
    CLIPTextModel,
)

import json
from diffusers import DDPMScheduler
from PIL import Image
import os
import numpy as np
import matplotlib.pyplot as plt
class FineTuneCLIP(nn.Module):
    def __init__(self, clip_model):
        super().__init__()
        self.clip = clip_model

        # freeze the pretrained layers and parameters
        for param in self.clip.parameters():
            param.requires_grad = False

        # Output of CLIP text projection
        self.text_projection_dim = self.clip.text_projection.weight.shape[0]  # Get dimension from weight matrix

        # Fine tuning head
        self.projection_head = nn.Sequential(
            nn.Linear(self.text_projection_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, self.text_projection_dim),  # Project back to same dimension as image features
            nn.LayerNorm(self.text_projection_dim)
        )

        self.temperature = nn.Parameter(torch.tensor(0.07))

    def forward(self, input_ids, attention_mask):
        # text features
        text_outputs = self.clip.text_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        # project the pooled output
        text_embeddings = text_outputs[1]  # Use pooled output
        text_features = self.clip.text_projection(text_embeddings)

        # custom projection head application
        projected_features = self.projection_head(text_features)
        return projected_features

    def contrastive_loss(self, image_features, text_features):
        # Normalize features
        image_features = F.normalize(image_features, p=2, dim=1)
        text_features = F.normalize(text_features, p=2, dim=1)

        logits = torch.matmul(text_features, image_features.t()) / self.temperature

        # Labels for contrastive loss
        labels = torch.arange(logits.size(0), device=logits.device)

        loss_i = F.cross_entropy(logits, labels)
        loss_t = F.cross_entropy(logits.t(), labels)

        return (loss_i + loss_t) / 2

    def get_image_features(self, images):
        vision_outputs = self.clip.vision_model(images)
        pooled_output = vision_outputs[1]
        image_features = self.clip.visual_projection(pooled_output)
        return image_features

class Flickr8kDataset(Dataset):
    def __init__(self, root_dir, captions_file, tokenizer, processor, transform=None, max_length=77):
        self.root_dir = root_dir
        self.tokenizer = tokenizer
        self.processor = processor
        self.max_length = max_length

        with open(captions_file, 'r', encoding='utf-8') as f:
            self.captions = {}
            for line in f.readlines()[1:]:
                parts = line.strip().split(',')
                if len(parts) >= 2:
                    image_name = parts[0]
                    caption = ','.join(parts[1:]).strip('"')
                    if image_name in self.captions:
                        self.captions[image_name].append(caption)
                    else:
                        self.captions[image_name] = [caption]

        self.image_names = list(self.captions.keys())

        self.transform = transform or transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        img_name = self.image_names[idx]

        # Random caption selection
        caption = np.random.choice(self.captions[img_name])

        # Transform image
        img_path = os.path.join(self.root_dir, img_name)
        image = Image.open(img_path).convert('RGB')
        image = self.transform(image)

        # Tokenizing captions
        inputs = self.tokenizer(
            caption,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )

        return {
            'image': image,
            'input_ids': inputs['input_ids'].squeeze(),
            'attention_mask': inputs['attention_mask'].squeeze(),
            'caption': caption
        }

class LatentDiffusionModel(nn.Module):
    def __init__(self, clip_model, vae, unet, noise_scheduler):
        super().__init__()
        self.clip = clip_model
        self.vae = vae
        self.unet = unet
        self.noise_scheduler = noise_scheduler

    def encode_text(self, input_ids, attention_mask):
        return self.clip.get_text_features(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

    def forward(self, images, text_embeddings, timesteps):
        # Image encoding to latent space
        latent_dist = self.vae.encode(images).latent_dist
        latents = latent_dist.sample() * 0.18215

        noise = torch.randn_like(latents)
        timesteps = timesteps.long()
        noisy_latents = self.noise_scheduler.add_noise(latents, noise, timesteps)

        noise_pred = self.unet(
            noisy_latents,
            timesteps,
            encoder_hidden_states=text_embeddings
        ).sample

        return F.mse_loss(noise_pred, noise)

    @torch.no_grad()
    def generate(self, prompt, device, num_inference_steps=50):
        # Encoding text
        inputs = self.clip.processor(text=prompt, return_tensors="pt").to(device)
        text_embeddings = self.clip.get_text_features(**inputs)

        latents = torch.randn(
            (1, self.unet.config.in_channels,
             self.unet.config.sample_size,
             self.unet.config.sample_size)
        ).to(device)

        # Denoising
        for t in reversed(range(num_inference_steps)):
            timestep = torch.tensor([t]).to(device)

            # Predict noise
            noise_pred = self.unet(
                latents,
                timestep,
                encoder_hidden_states=text_embeddings
            ).sample

            latents = self.noise_scheduler.step(
                noise_pred,
                timestep[0],
                latents
            ).prev_sample

        image = self.vae.decode(latents / 0.18215).sample

        return image

def train_epoch(model, dataloader, optimizer, device, scaler, gradient_accumulation_steps):
    """
    Train for one epoch with fixed gradient accumulation and scaling
    """
    model.train()
    total_loss = 0
    progress_bar = tqdm(dataloader, desc='Training')
    optimizer.zero_grad()

    for i, batch in enumerate(progress_bar):
        try:
            images = batch['image'].to(device)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            with autocast(device_type='cuda'):
                image_features = model.get_image_features(images)

                text_features = model(input_ids, attention_mask)

                loss = model.contrastive_loss(image_features, text_features)
                loss = loss / gradient_accumulation_steps


            scaler.scale(loss).backward()

            # gradient accumulation steps
            if (i + 1) % gradient_accumulation_steps == 0:
                scaler.unscale_(optimizer)

                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)

                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()

            total_loss += loss.item() * gradient_accumulation_steps
            progress_bar.set_postfix({'loss': loss.item() * gradient_accumulation_steps})



        except RuntimeError as e:
            if "unscale_() has already been called" not in str(e):
                print(f"Error in batch {i}: {str(e)}")
            optimizer.zero_grad()
            continue

    avg_loss = total_loss / len(dataloader)
    return avg_loss

def train_clip_model(model, dataloader, num_epochs, device, learning_rate=2e-5,
                    batch_size=32, gradient_accumulation_steps=4, image_size=224,
                    save_dir="checkpoints", tokenizer=None, processor=None):
    """
    Train the CLIP model for a specified number of epochs
    """
    os.makedirs(save_dir, exist_ok=True)

    # initialize with weight decay
    optimizer = optim.AdamW(model.projection_head.parameters(), lr=learning_rate, weight_decay=0.01)

    # rate scheduler
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

    scaler = GradScaler()


    best_loss = float('inf')
    training_history = []

    for epoch in range(num_epochs):
        avg_loss = train_epoch(
                    model=model,
                    dataloader=dataloader,
                    optimizer=optimizer,
                    device=device,
                    scaler=scaler,
                    gradient_accumulation_steps=gradient_accumulation_steps
                )

        print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")

        training_history.append({
            'epoch': epoch + 1,
            'loss': avg_loss,
            'lr': optimizer.param_groups[0]['lr']
        })

        if (epoch + 1) % 5 == 0:
            output_dir = os.path.join(save_dir, f'finetuned_clip_epoch_{epoch+1}')
            os.makedirs(output_dir, exist_ok=True)

            checkpoint = {
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'epoch': epoch,
                'loss': avg_loss,
                'scaler': scaler.state_dict(),
            }
            torch.save(checkpoint, os.path.join(output_dir, 'checkpoint.pt'))

            # Save the fine-tuned model components
            model.clip.save_pretrained(output_dir)
            if tokenizer:
                tokenizer.save_pretrained(output_dir)
            if processor:
                processor.save_pretrained(output_dir)

            torch.save(model.projection_head.state_dict(),
                      os.path.join(output_dir, 'projection_head.pt'))

            config = {
                'epoch': epoch + 1,
                'final_loss': avg_loss,
                'training_args': {
                    'batch_size': batch_size,
                    'gradient_accumulation_steps': gradient_accumulation_steps,
                    'learning_rate': learning_rate,
                    'image_size': image_size,
                }
            }
            with open(os.path.join(output_dir, 'training_config.json'), 'w') as f:
                json.dump(config, f, indent=4)

            print(f"Exported fine-tuned model to {output_dir}")

        scheduler.step()



    final_output_dir = os.path.join(save_dir, 'finetuned_clip_final')
    os.makedirs(final_output_dir, exist_ok=True)

    model.clip.save_pretrained(final_output_dir)
    if tokenizer:
        tokenizer.save_pretrained(final_output_dir)
    if processor:
        processor.save_pretrained(final_output_dir)
    torch.save(model.projection_head.state_dict(),
              os.path.join(final_output_dir, 'projection_head.pt'))

    with open(os.path.join(save_dir, 'training_history.json'), 'w') as f:
        json.dump(training_history, f)


    return training_history
def main():
    BATCH_SIZE = 64
    GRADIENT_ACCUMULATION_STEPS = 4
    LEARNING_RATE = 2e-5
    IMAGE_SIZE = 224
    NUM_EPOCHS = 10

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    print("Loading pre-trained models...")
    clip_model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
    tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-large-patch14")
    processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")

    vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse")
    unet = UNet2DConditionModel.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="unet")

    noise_scheduler = DDPMScheduler.from_config("CompVis/stable-diffusion-v1-4", subfolder="scheduler")

    fine_tuning_model = FineTuneCLIP(clip_model).to(device)

    print("Loading Flickr8k dataset...")
    root_dir = "./Images"
    captions_file = "./captions.txt"
    dataset = Flickr8kDataset(
        root_dir,
        captions_file,
        tokenizer,
        processor
    )
    dataloader = DataLoader(
        dataset,
        batch_size=32,
        shuffle=True,
        num_workers=4
    )

    training_history = train_clip_model(
        model=fine_tuning_model,
        dataloader=dataloader,
        num_epochs=NUM_EPOCHS,
        device=device,
        learning_rate=LEARNING_RATE,
        batch_size=BATCH_SIZE,
        gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
        image_size=IMAGE_SIZE,
        save_dir="clip_finetuned",
        tokenizer=tokenizer,
        processor=processor
    )


    plt.figure(figsize=(10, 5))
    epochs = [h['epoch'] for h in training_history]
    losses = [h['loss'] for h in training_history]
    plt.plot(epochs, losses)
    plt.title('Training Loss Over Time')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.savefig('clip_finetuned/training_history.png')
    plt.close()





if __name__ == "__main__":
    main()

Using device: cuda
Loading pre-trained models...
Loading Flickr8k dataset...
Starting training...
Effective batch size: 256


Training: 100%|██████████| 253/253 [01:02<00:00,  4.05it/s, loss=3.18]


Epoch 1/10, Average Loss: 3.4174


Training: 100%|██████████| 253/253 [01:01<00:00,  4.09it/s, loss=3.06]


Epoch 2/10, Average Loss: 3.2873


Training: 100%|██████████| 253/253 [01:02<00:00,  4.08it/s, loss=3.03]


Epoch 3/10, Average Loss: 3.2545


Training: 100%|██████████| 253/253 [01:02<00:00,  4.08it/s, loss=nan]


Epoch 4/10, Average Loss: nan


Training: 100%|██████████| 253/253 [01:01<00:00,  4.08it/s, loss=nan]


Epoch 5/10, Average Loss: nan
Exported fine-tuned model to clip_finetuned/finetuned_clip_epoch_5


Training: 100%|██████████| 253/253 [01:02<00:00,  4.06it/s, loss=nan]


Epoch 6/10, Average Loss: nan


Training: 100%|██████████| 253/253 [01:01<00:00,  4.10it/s, loss=nan]


Epoch 7/10, Average Loss: nan


Training: 100%|██████████| 253/253 [01:01<00:00,  4.09it/s, loss=nan]


Epoch 8/10, Average Loss: nan


Training: 100%|██████████| 253/253 [01:01<00:00,  4.08it/s, loss=nan]


Epoch 9/10, Average Loss: nan


Training: 100%|██████████| 253/253 [01:01<00:00,  4.09it/s, loss=nan]


Epoch 10/10, Average Loss: nan
Exported fine-tuned model to clip_finetuned/finetuned_clip_epoch_10


In [None]:
!cp -r /content/clip_finetuned/finetuned_clip_epoch_10 /content/drive/MyDrive/ram/flickr

In [None]:
import torch
from diffusers import StableDiffusionPipeline, AutoencoderKL, UNet2DConditionModel
from transformers import CLIPTextModel, CLIPTokenizer
import os
from PIL import Image

def load_finetuned_models(model_path, device, half_precision=True):
    """
    Load fine-tuned CLIP model and other components
    """
    dtype = torch.float16 if half_precision and torch.cuda.is_available() else torch.float32

    text_encoder = CLIPTextModel.from_pretrained(model_path).to(device).to(dtype)
    tokenizer = CLIPTokenizer.from_pretrained(model_path)


    if os.path.exists(os.path.join(model_path, 'projection_head.pt')):
        projection_head_state = torch.load(os.path.join(model_path, 'projection_head.pt'))
        print("Loaded custom projection head")

    return text_encoder, tokenizer

def setup_pipeline(text_encoder, tokenizer, device, half_precision=True):
    """
    Set up the Stable Diffusion pipeline
    """

    dtype = torch.float16 if half_precision and torch.cuda.is_available() else torch.float32

    pipeline = StableDiffusionPipeline.from_pretrained(
        "CompVis/stable-diffusion-v1-4",
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        torch_dtype=dtype,
        safety_checker=None
    )

    # Ensure all models are using the same dtype
    pipeline.unet = pipeline.unet.to(device).to(dtype)
    pipeline.vae = pipeline.vae.to(device).to(dtype)
    pipeline.text_encoder = pipeline.text_encoder.to(device).to(dtype)

    if torch.cuda.is_available():
        pipeline.enable_attention_slicing()

    return pipeline

def generate_images(pipeline, prompt, num_images=1, output_dir="generated_images",
                   guidance_scale=7.5, num_inference_steps=50, seed=None):
    """
    Generate images using the pipeline
    """
    os.makedirs(output_dir, exist_ok=True)

    if seed is not None:
        generator = torch.Generator(device=pipeline.device).manual_seed(seed)
    else:
        generator = None

    for i in range(num_images):
        try:
            image = pipeline(
                prompt,
                num_inference_steps=num_inference_steps,
                guidance_scale=guidance_scale,
                generator=generator
            ).images[0]

            safe_prompt = "".join(x for x in prompt[:50] if x.isalnum() or x.isspace())
            filename = f"{safe_prompt}_{i+1}.png"
            image.save(os.path.join(output_dir, filename))
            print(f"Saved image {i+1} as {filename}")
        except Exception as e:
            print(f"Error generating image {i+1}: {str(e)}")
            continue

def main():
    model_path = "/content/clip_finetuned/finetuned_clip_epoch_10"
    prompt = "a beautiful sunset over mountains"
    num_images = 2
    guidance_scale = 7.0
    num_steps = 50
    output_dir = "generated_images"
    seed = 42

    half_precision = True

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if device.type == 'cpu':
        half_precision = False
    print(f"Using device: {device}, Precision: {'half' if half_precision else 'full'}")


    print("Loading fine-tuned models...")
    text_encoder, tokenizer = load_finetuned_models(model_path, device, half_precision)


    print("Setting up pipeline...")
    pipeline = setup_pipeline(text_encoder, tokenizer, device, half_precision)


    print(f"Generating {num_images} images for prompt: {prompt}")
    generate_images(
        pipeline=pipeline,
        prompt=prompt,
        num_images=num_images,
        output_dir=output_dir,
        guidance_scale=guidance_scale,
        num_inference_steps=num_steps,
        seed=seed
    )

    print("Image generation complete!")

if __name__ == "__main__":
    main()

Using device: cuda, Precision: half
Loading fine-tuned models...


  projection_head_state = torch.load(os.path.join(model_path, 'projection_head.pt'))


Loaded custom projection head
Setting up pipeline...


Loading pipeline components...:   0%|          | 0/6 [00:00<?, ?it/s]

You have disabled the safety checker for <class 'diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline'> by passing `safety_checker=None`. Ensure that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered results in services or applications open to the public. Both the diffusers team and Hugging Face strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling it only for use-cases that involve analyzing network behavior or auditing its results. For more information, please have a look at https://github.com/huggingface/diffusers/pull/254 .


Generating 4 images for prompt: a beautiful sunset over mountains


  0%|          | 0/50 [00:00<?, ?it/s]

Saved image 1 as a beautiful sunset over mountains_1.png


  0%|          | 0/50 [00:00<?, ?it/s]

Saved image 2 as a beautiful sunset over mountains_2.png


  0%|          | 0/50 [00:00<?, ?it/s]

Saved image 3 as a beautiful sunset over mountains_3.png


  0%|          | 0/50 [00:00<?, ?it/s]

Saved image 4 as a beautiful sunset over mountains_4.png
Image generation complete!


#Before Fine tuning

In [14]:
import torch
from diffusers import AutoencoderKL, UNet2DConditionModel, DDPMScheduler
from transformers import CLIPTextModel, CLIPTokenizer
import os
from PIL import Image
from torch import autocast
from torchvision import transforms
import numpy as np
from tqdm import tqdm

class ImageGenerator:
    def __init__(self, clip_model_path, device="cuda", half_precision=True):
        self.device = device
        self.dtype = torch.float16 if half_precision and torch.cuda.is_available() else torch.float32


        print("Loading models...")
        self.text_encoder = CLIPTextModel.from_pretrained(clip_model_path).to(device).to(self.dtype)
        self.tokenizer = CLIPTokenizer.from_pretrained(clip_model_path)

        self.vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse").to(device).to(self.dtype)
        self.unet = UNet2DConditionModel.from_pretrained(
            "CompVis/stable-diffusion-v1-4",
            subfolder="unet"
        ).to(device).to(self.dtype)

        self.scheduler = DDPMScheduler.from_config(
            "CompVis/stable-diffusion-v1-4",
            subfolder="scheduler"
        )


        self.vae.eval()
        self.unet.eval()
        self.text_encoder.eval()

    @torch.no_grad()
    def encode_prompt(self, prompt):
        text_inputs = self.tokenizer(
            prompt,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt"
        )

        text_input_ids = text_inputs.input_ids.to(self.device)

        prompt_embeds = self.text_encoder(text_input_ids)[0]
        return prompt_embeds

    @torch.no_grad()
    def generate_latents(
        self,
        prompt_embeds,
        height=512,
        width=512,
        num_inference_steps=50,
        guidance_scale=7.5,
        generator=None
    ):

        self.scheduler.set_timesteps(num_inference_steps)

        latents_shape = (1, self.unet.config.in_channels, height // 8, width // 8)
        latents = torch.randn(
            latents_shape,
            generator=generator,
            dtype=self.dtype,
            device=self.device
        )

        uncond_input = self.tokenizer(
            [""],
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt"
        )
        uncond_embeddings = self.text_encoder(uncond_input.input_ids.to(self.device))[0]


        text_embeddings = torch.cat([uncond_embeddings, prompt_embeds])


        for t in tqdm(self.scheduler.timesteps):

            latent_model_input = torch.cat([latents] * 2)


            latent_model_input = self.scheduler.scale_model_input(latent_model_input, timestep=t)


            with autocast(device_type='cuda', dtype=self.dtype):
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=text_embeddings
                ).sample

            noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
            noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            latents = self.scheduler.step(noise_pred, t, latents).prev_sample

        return latents

    @torch.no_grad()
    def decode_latents(self, latents):
        latents = 1 / 0.18215 * latents
        image = self.vae.decode(latents).sample
        image = (image / 2 + 0.5).clamp(0, 1)
        return image

    def generate(
        self,
        prompt,
        num_images=1,
        output_dir="generated_images",
        height=512,
        width=512,
        guidance_scale=7.5,
        num_inference_steps=50,
        seed=None
    ):
        os.makedirs(output_dir, exist_ok=True)


        prompt_embeds = self.encode_prompt(prompt)


        generator = None
        if seed is not None:
            generator = torch.Generator(device=self.device).manual_seed(seed)

        for i in range(num_images):
            try:

                latents = self.generate_latents(
                    prompt_embeds,
                    height=height,
                    width=width,
                    guidance_scale=guidance_scale,
                    num_inference_steps=num_inference_steps,
                    generator=generator
                )


                image = self.decode_latents(latents)

                image = image.cpu().permute(0, 2, 3, 1).numpy()[0]
                image = Image.fromarray((image * 255).astype(np.uint8))

                safe_prompt = "".join(x for x in prompt[:50] if x.isalnum() or x.isspace())
                filename = f"{safe_prompt}_{i+1}.png"
                image.save(os.path.join(output_dir, filename))
                print(f"Saved image {i+1} as {filename}")

            except Exception as e:
                print(f"Error generating image {i+1}: {str(e)}")
                continue

def main():

    model_path = "openai/clip-vit-large-patch14"
    prompt = "A teacher is teaching the class"
    num_images = 1
    guidance_scale = 10.0
    num_steps = 50
    output_dir = "generated_images"
    seed = 42

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    generator = ImageGenerator(
        clip_model_path=model_path,
        device=device,
        half_precision=True
    )

    print(f"Generating {num_images} images for prompt: {prompt}")
    generator.generate(
        prompt=prompt,
        num_images=num_images,
        output_dir=output_dir,
        guidance_scale=guidance_scale,
        num_inference_steps=num_steps,
        seed=seed
    )

    print("Image generation complete!")

if __name__ == "__main__":
    main()

Loading models...
Generating 1 images for prompt: A teacher is teaching the class


100%|██████████| 50/50 [00:07<00:00,  6.30it/s]


Saved image 1 as A teacher is teaching the class_1.png
Image generation complete!


# Fine Tuned


In [15]:
import torch
from diffusers import AutoencoderKL, UNet2DConditionModel, DDPMScheduler
from transformers import CLIPTextModel, CLIPTokenizer
import os
from PIL import Image
from torch import autocast
from torchvision import transforms
import numpy as np
from tqdm import tqdm

class ImageGenerator:
    def __init__(self, clip_model_path, device="cuda", half_precision=True):
        self.device = device
        self.dtype = torch.float16 if half_precision and torch.cuda.is_available() else torch.float32

        print("Loading models...")
        self.text_encoder = CLIPTextModel.from_pretrained(clip_model_path).to(device).to(self.dtype)
        self.tokenizer = CLIPTokenizer.from_pretrained(clip_model_path)

        self.vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse").to(device).to(self.dtype)
        self.unet = UNet2DConditionModel.from_pretrained(
            "CompVis/stable-diffusion-v1-4",
            subfolder="unet"
        ).to(device).to(self.dtype)

        self.scheduler = DDPMScheduler.from_config(
            "CompVis/stable-diffusion-v1-4",
            subfolder="scheduler"
        )


        self.vae.eval()
        self.unet.eval()
        self.text_encoder.eval()

    @torch.no_grad()
    def encode_prompt(self, prompt):
        text_inputs = self.tokenizer(
            prompt,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt"
        )

        text_input_ids = text_inputs.input_ids.to(self.device)

        prompt_embeds = self.text_encoder(text_input_ids)[0]
        return prompt_embeds

    @torch.no_grad()
    def generate_latents(
        self,
        prompt_embeds,
        height=512,
        width=512,
        num_inference_steps=50,
        guidance_scale=7.5,
        generator=None
    ):

        self.scheduler.set_timesteps(num_inference_steps)

        latents_shape = (1, self.unet.config.in_channels, height // 8, width // 8)
        latents = torch.randn(
            latents_shape,
            generator=generator,
            dtype=self.dtype,
            device=self.device
        )


        uncond_input = self.tokenizer(
            [""],
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt"
        )
        uncond_embeddings = self.text_encoder(uncond_input.input_ids.to(self.device))[0]


        text_embeddings = torch.cat([uncond_embeddings, prompt_embeds])

        for t in tqdm(self.scheduler.timesteps):

            latent_model_input = torch.cat([latents] * 2)


            latent_model_input = self.scheduler.scale_model_input(latent_model_input, timestep=t)


            with autocast(device_type='cuda', dtype=self.dtype):
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=text_embeddings
                ).sample

            noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
            noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            latents = self.scheduler.step(noise_pred, t, latents).prev_sample

        return latents

    @torch.no_grad()
    def decode_latents(self, latents):
        latents = 1 / 0.18215 * latents
        image = self.vae.decode(latents).sample
        image = (image / 2 + 0.5).clamp(0, 1)
        return image

    def generate(
        self,
        prompt,
        num_images=1,
        output_dir="generated_images",
        height=512,
        width=512,
        guidance_scale=7.5,
        num_inference_steps=50,
        seed=None
    ):
        os.makedirs(output_dir, exist_ok=True)

        # Encode prompt
        prompt_embeds = self.encode_prompt(prompt)

        # Set generator for reproducibility
        generator = None
        if seed is not None:
            generator = torch.Generator(device=self.device).manual_seed(seed)

        for i in range(num_images):
            try:
                # Generate latents
                latents = self.generate_latents(
                    prompt_embeds,
                    height=height,
                    width=width,
                    guidance_scale=guidance_scale,
                    num_inference_steps=num_inference_steps,
                    generator=generator
                )

                # Decode latents to image
                image = self.decode_latents(latents)

                # Convert to PIL Image
                image = image.cpu().permute(0, 2, 3, 1).numpy()[0]
                image = Image.fromarray((image * 255).astype(np.uint8))

                # Save image
                safe_prompt = "".join(x for x in prompt[:50] if x.isalnum() or x.isspace())
                filename = f"{safe_prompt}_{i+1}_FineTuned.png"
                image.save(os.path.join(output_dir, filename))
                print(f"Saved image {i+1} as {filename}")

            except Exception as e:
                print(f"Error generating image {i+1}: {str(e)}")
                continue

def main():
    # Configuration
    model_path = "/content/drive/MyDrive/finetuned_clip_checkpoint"  # Path to your fine-tuned model
    prompt = "A teacher is teaching the class"
    num_images = 1
    guidance_scale = 10.0
    num_steps = 50
    output_dir = "generated_images"
    seed = 42

    # Initialize generator
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    generator = ImageGenerator(
        clip_model_path=model_path,
        device=device,
        half_precision=True
    )

    # Generate images
    print(f"Generating {num_images} images for prompt: {prompt}")
    generator.generate(
        prompt=prompt,
        num_images=num_images,
        output_dir=output_dir,
        guidance_scale=guidance_scale,
        num_inference_steps=num_steps,
        seed=seed
    )

    print("Image generation complete!")

if __name__ == "__main__":
    main()

Loading models...
Generating 1 images for prompt: A teacher is teaching the class


100%|██████████| 50/50 [00:07<00:00,  6.35it/s]


Saved image 1 as A teacher is teaching the class_1_FineTuned.png
Image generation complete!
