# Necessary Imports and Installations

In [1]:
!pip install diffusers

Collecting diffusers
  Downloading diffusers-0.30.3-py3-none-any.whl.metadata (18 kB)
Downloading diffusers-0.30.3-py3-none-any.whl (2.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m33.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: diffusers
Successfully installed diffusers-0.30.3


In [2]:
import torch
import torch.nn as nn
import numpy as np
from diffusers import StableDiffusionPipeline
from scipy.stats import ttest_rel
from torchvision import transforms
from PIL import Image


The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


0it [00:00, ?it/s]

# Image Preprocessing

In [3]:
def read_image(image_path):
    """Load an image from disk and return it as an RGB PIL image.

    Args:
        image_path: Path (or file-like object) accepted by ``PIL.Image.open``.

    Returns:
        A new image in "RGB" mode. It is produced by ``convert`` and is
        independent of the file handle, which is closed before returning.
    """
    # Image.open is lazy and may keep the underlying file open (notably for
    # multi-frame formats); the context manager guarantees it is released.
    with Image.open(image_path) as source:
        return source.convert("RGB")

def preprocess(image, device, dtype=torch.float16):
    """Turn an image into a normalised, batched tensor on ``device``.

    The image is resized to 512x512, converted to a tensor, normalised with
    mean 0.5 / std 0.5, given a leading batch dimension and moved to the
    requested device and dtype.

    Args:
        image: Input accepted by the torchvision transforms used below
            (``read_image`` supplies an RGB PIL image).
        device: Target device for the returned tensor.
        dtype: Floating-point type of the returned tensor. Defaults to
            ``torch.float16`` (the previous hard-coded value); pass
            ``torch.float32`` when the model runs in full precision, e.g. on CPU.

    Returns:
        The processed image tensor with a batch dimension of size 1.
    """
    transform_pipeline = transforms.Compose([
        transforms.Resize((512, 512)),
        transforms.ToTensor(),
        # Single-element mean/std broadcast across all channels.
        transforms.Normalize([0.5], [0.5])
    ])

    batched = transform_pipeline(image).unsqueeze(0)
    return batched.to(device, dtype=dtype)

def convert2latent(processed_image, pipeline, device):
    """Encode a preprocessed image into scaled VAE latents.

    Args:
        processed_image: Batched image tensor to feed to ``pipeline.vae.encode``.
        pipeline: Object exposing ``vae.encode`` and ``vae.config.scaling_factor``.
        device: Unused; kept so existing callers keep working.

    Returns:
        The mean of the encoder's latent distribution multiplied by the VAE
        scaling factor. The result carries no autograd history.
    """
    scaling = pipeline.vae.config.scaling_factor
    # The encoder call itself must sit inside no_grad: previously only the
    # multiplication was guarded, so the whole VAE forward pass still recorded
    # an autograd graph and held its activations in memory.
    with torch.no_grad():
        encoded_image = pipeline.vae.encode(processed_image).latent_dist.mean
        latents = encoded_image * scaling
    return latents

# Forward Diffusion

In [4]:
def forward(latents, timestep, scheduler, noise=None):
    """Apply the scheduler's forward (noising) process at a single timestep.

    Args:
        latents: Clean latent tensor to be noised.
        timestep: Scalar timestep index handed to ``scheduler.add_noise``.
        scheduler: Object exposing ``add_noise(latents, noise, timesteps)``.
        noise: Optional tensor to mix in. When omitted, standard normal noise
            with the shape/dtype/device of ``latents`` is drawn. Supplying it
            lets callers reuse identical noise across several evaluations.

    Returns:
        Whatever ``scheduler.add_noise`` returns for the given inputs.
    """
    if noise is None:
        noise = torch.randn_like(latents)

    # Explicit long dtype keeps this consistent with ``reverse`` and makes the
    # tensor a valid index even if ``timestep`` arrives as a float.
    timesteps = torch.tensor([timestep], dtype=torch.long, device=latents.device)

    return scheduler.add_noise(latents, noise, timesteps)

# Reverse Diffusion (single UNet prediction step)

In [5]:
def reverse(noised, timestep, encoder_hidden_states, pipeline):
    """Run one text-conditioned UNet pass on noised latents.

    Args:
        noised: Noised latent tensor.
        timestep: Scalar timestep index at which ``noised`` was produced.
        encoder_hidden_states: Text embeddings used as conditioning.
        pipeline: Object exposing a callable ``unet`` whose result has ``.sample``.

    Returns:
        The ``.sample`` tensor of the UNet output, without autograd history.

    NOTE(review): callers treat this as "denoised" latents. For models trained
    with epsilon prediction the UNet output is presumably the predicted noise
    rather than a clean latent; confirm against the scheduler's prediction type.
    """
    current = torch.tensor([timestep], dtype=torch.long, device=noised.device)

    # Inference only: without no_grad the UNet forward pass records a full
    # autograd graph and keeps every activation alive.
    with torch.no_grad():
        prediction = pipeline.unet(
            noised,
            current,
            encoder_hidden_states=encoder_hidden_states
        ).sample

    return prediction

In [15]:
def similarity(original, denoised):
    """Average cosine similarity of two tensors along their last axis.

    Both inputs are L2-normalised over the final dimension, multiplied
    element-wise and summed over that dimension, giving one cosine value per
    trailing vector; the mean of those values is returned as a Python float.
    """
    unit_original, unit_denoised = (
        nn.functional.normalize(tensor, dim=-1) for tensor in (original, denoised)
    )
    per_vector = (unit_original * unit_denoised).sum(dim=-1)
    return per_vector.mean().item()

In [31]:
def classify(image_path, categories, pipeline=None, num_timesteps=25, seed=None):
    """Zero-shot classify an image by scoring text prompts with Stable Diffusion.

    For every category the prompt ``"a photo of a <category>"`` is embedded, the
    image latents are noised at evenly spaced timesteps, the UNet is run with
    the prompt as conditioning, and ``similarity`` between the clean latents and
    the UNet output is accumulated. The category with the LOWEST total is
    returned.

    NOTE(review): the UNet output is treated as "denoised" latents here; for an
    epsilon-prediction model it is presumably predicted noise, which would make
    "lowest similarity wins" a heuristic. Confirm the intended scoring rule.

    Args:
        image_path: Path of the image to classify.
        categories: Non-empty iterable of class names.
        pipeline: Optional already-loaded pipeline to reuse. When omitted,
            "runwayml/stable-diffusion-v1-5" is loaded (expensive, so pass a
            pipeline when classifying several images).
        num_timesteps: Number of evenly spaced timesteps to evaluate.
        seed: Optional integer passed to ``torch.manual_seed`` so the drawn
            noise, and therefore the scores, are reproducible.

    Returns:
        The entry of ``categories`` with the smallest accumulated score.

    Raises:
        ValueError: If ``categories`` is empty.
    """
    categories = list(categories)
    if not categories:
        raise ValueError("categories must contain at least one label")

    if pipeline is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        # Half precision is only dependable on GPU; fall back to float32 on CPU.
        dtype = torch.float16 if device == "cuda" else torch.float32
        pipeline = StableDiffusionPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5",
            torch_dtype=dtype
        ).to(device)
    else:
        # Follow whatever device/precision the supplied pipeline already uses.
        reference = next(pipeline.unet.parameters())
        device, dtype = reference.device, reference.dtype

    if seed is not None:
        torch.manual_seed(seed)

    noise_scheduler = pipeline.scheduler
    image = read_image(image_path)
    # Cast after preprocessing so the input always matches the model precision.
    preprocessed = preprocess(image, device).to(dtype=dtype)

    total_timesteps = noise_scheduler.config.num_train_timesteps
    selected_timesteps = torch.linspace(
        0, total_timesteps - 1, steps=num_timesteps, dtype=torch.long
    ).tolist()

    # Uniform weighting of timesteps; kept as a list so it is easy to tune.
    weights = [1.0 for _ in selected_timesteps]

    scores = []

    # Pure inference: no autograd bookkeeping for VAE, text encoder or UNet.
    with torch.no_grad():
        latents = convert2latent(preprocessed, pipeline, device)

        # Noise each timestep exactly once and reuse it for every category, so
        # score differences come from the prompt and not from different noise.
        noisy_per_timestep = [
            forward(latents, timestep, noise_scheduler)
            for timestep in selected_timesteps
        ]

        for category in categories:
            text_prompt = f"a photo of a {category}"

            text_inputs = pipeline.tokenizer(
                text_prompt,
                padding="max_length",
                max_length=pipeline.tokenizer.model_max_length,
                truncation=True,
                return_tensors="pt"
            ).to(device)

            text_embeddings = pipeline.text_encoder(**text_inputs).last_hidden_state

            total_score = 0.0
            for timestep, weight, noisy_latents in zip(
                selected_timesteps, weights, noisy_per_timestep
            ):
                denoised_latents = reverse(noisy_latents, timestep, text_embeddings, pipeline)
                total_score += similarity(latents, denoised_latents) * weight

            scores.append(total_score)

    print("Scores: ", scores)
    predicted = categories[scores.index(min(scores))]

    return predicted

# Test it Out!

In [28]:
# Demo 1: score the image at `image_path` against six animal labels.
image_path = "/kaggle/input/cat-image/cat1.jpg"  
classes = ["dog", "horse", "cat", "elephant", "zebra", "leopard"]

# classify() prints the per-class scores and returns the label with the lowest one.
result = classify(image_path, classes)
print(f"Predicted class: {result}")

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

Scores:  [4.23095703125, 4.239837646484375, 4.059783935546875, 4.257659912109375, 4.17352294921875, 4.104644775390625]
Predicted class: cat


In [29]:
# Demo 2: score the image at `image_path` against three vehicle labels.
image_path = "/kaggle/input/classify-diffusion/car1.jpeg"  
classes = ["car", "truck", "bus"]

# classify() prints the per-class scores and returns the label with the lowest one.
result = classify(image_path, classes)
print(f"Predicted class: {result}")

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

Scores:  [5.257720947265625, 5.314910888671875, 5.29034423828125]
Predicted class: car


In [30]:
# Demo 3: score the image at `image_path` against four broad labels.
image_path = "/kaggle/input/classify-diffusion/bird1.jpeg"  
classes = ["bird", "human", "fish", "insect"]

# classify() prints the per-class scores and returns the label with the lowest one.
result = classify(image_path, classes)
print(f"Predicted class: {result}")

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

Scores:  [2.466888427734375, 2.476470947265625, 2.480499267578125, 2.487457275390625]
Predicted class: bird
