## Diffusion using
## Images + CLIP extracted feature vectors

In [None]:
pip install torch torchvision

In [None]:
pip install git+https://github.com/openai/CLIP.git

In [None]:
pip install diffusers

In [None]:
import torch
from diffusers import UnCLIPScheduler, DDPMScheduler, StableUnCLIPPipeline
from diffusers.models import PriorTransformer
from transformers import CLIPTokenizer, CLIPTextModelWithProjection
from diffusers import StableUnCLIPImg2ImgPipeline
from diffusers.utils import load_image
from PIL import Image
from diffusers import StableDiffusionImageVariationPipeline
from torchvision import transforms

## Experiment with UnClip Text to Image

In [None]:
# Checkpoint identifiers: the Karlo repo provides the prior and its scheduler
# config, the OpenAI CLIP repo provides the tokenizer/text encoder handed to
# the prior, and the Stability repo provides the Stable unCLIP pipeline.
prior_model_id = "kakaobrain/karlo-v1-alpha"
prior_text_model_id = "openai/clip-vit-large-patch14"
stable_unclip_model_id = "stabilityai/stable-diffusion-2-1-unclip-small"

# All weights below are loaded in half precision.
data_type = torch.float16

prior = PriorTransformer.from_pretrained(
    prior_model_id,
    subfolder="prior",
    torch_dtype=data_type,
)
prior_tokenizer = CLIPTokenizer.from_pretrained(prior_text_model_id)
prior_text_model = CLIPTextModelWithProjection.from_pretrained(
    prior_text_model_id,
    torch_dtype=data_type,
)

# Read the scheduler config stored with the prior checkpoint, then rebuild it
# as a DDPMScheduler from that same config.
prior_scheduler = DDPMScheduler.from_config(
    UnCLIPScheduler.from_pretrained(prior_model_id, subfolder="prior_scheduler").config
)

# Assemble the text-to-image pipeline around the externally loaded prior
# components and move it to the GPU.
pipe = StableUnCLIPPipeline.from_pretrained(
    stable_unclip_model_id,
    torch_dtype=data_type,
    prior_tokenizer=prior_tokenizer,
    prior_text_encoder=prior_text_model,
    prior=prior,
    prior_scheduler=prior_scheduler,
).to("cuda")

# Generate a single image from the text prompt; the bare expression on the
# last line makes the notebook display it.
wave_prompt = "cute cat"
image = pipe(prompt=wave_prompt).images[0]
image

## unCLIP image-to-image variant
Ideally, in the SSL workflow we don't want to inject additional 'label'-type information, so it makes more sense to do image-to-image generation only.

In [None]:
# Load the Stable unCLIP image-to-image pipeline in half precision.
# NOTE: the keyword is `variant`, not `variation`. The misspelled keyword is
# not a `from_pretrained` argument, so the fp16 weight files were never
# requested; full-precision weights were fetched and cast instead.
pipe = StableUnCLIPImg2ImgPipeline.from_pretrained(
    "stabilityai/stable-diffusion-2-1-unclip",
    torch_dtype=torch.float16,
    variant="fp16",
)
pipe = pipe.to("cuda")

# Source image whose variations we want to generate.
init_image = Image.open("/content/n01440764_18.JPEG")

# Run the pipeline on the image alone (no text prompt) and keep the first
# generated variation.
images = pipe(init_image).images
images[0].save("variation_image.png")

## Stable Diffusion Image Variation model


In [None]:
# Load the image-variation checkpoint at revision v2.0 and move it straight to
# the first CUDA device.
device = "cuda:0"
sd_pipe = StableDiffusionImageVariationPipeline.from_pretrained(
    "lambdalabs/sd-image-variations-diffusers",
    revision="v2.0",
).to(device)

# Manual preprocessing: tensor conversion, bicubic resize to 224x224 without
# antialiasing, then per-channel normalisation.
# NOTE(review): the mean/std values are presumably the CLIP image statistics
# this checkpoint expects -- confirm against the model card.
tform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Resize(
            size=(224, 224),
            interpolation=transforms.InterpolationMode.BICUBIC,
            antialias=False,
        ),
        transforms.Normalize(
            mean=[0.48145466, 0.4578275, 0.40821073],
            std=[0.26862954, 0.26130258, 0.27577711],
        ),
    ]
)

# Preprocess the source image and add a leading batch dimension.
im = Image.open("/content/n01440764_18.JPEG")
inp = tform(im).unsqueeze(0).to(device)

# Generate a variation and write the first result to disk.
out = sd_pipe(inp, guidance_scale=3)
out["images"][0].save("result.jpg")

## Loading ImageNet 100

In [None]:
import os
import random
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class ImageNet100SubsampleDataset(Dataset):
    """Dataset yielding ``k`` randomly drawn images per class folder.

    ``root_dir`` must contain one sub-directory per class; the sub-directory
    name is returned as the class id. Images may live at any depth below the
    class directory.

    Every index maps to a class (``idx // k``) and the image for that index
    is drawn with ``random.choice`` on each access, i.e. sampling is with
    replacement and the same index can return different images.

    Args:
        root_dir: Directory holding the per-class sub-directories.
        k: Number of dataset entries (random draws) per class.
        transform: Optional callable applied to the RGB ``PIL.Image`` before
            it is returned.
    """

    # Lower-case file extensions that are treated as images while scanning.
    IMAGE_EXTENSIONS = (
        ".jpg", ".jpeg", ".png", ".ppm", ".bmp", ".pgm", ".tif", ".tiff", ".webp",
    )

    def __init__(self, root_dir, k=5, transform=None):
        self.root_dir = root_dir
        # Number of random draws exposed per class.
        # TODO: decide how k larger than the number of images in a class
        # should be handled (currently draws simply repeat).
        self.k = k
        self.transform = transform
        # List of (image_paths, class_id) tuples, one per non-empty class.
        self.samples = []

        # Sorted so the index -> class mapping is reproducible across runs
        # (os.listdir returns entries in arbitrary order).
        for class_id in sorted(os.listdir(root_dir)):
            class_path = os.path.join(root_dir, class_id)
            # Stray files next to the class folders (label lists, .DS_Store,
            # ...) are not classes; listing them would raise.
            if not os.path.isdir(class_path):
                continue
            images = self._get_images(class_path)
            if images:
                self.samples.append((images, class_id))

    def _get_images(self, folder_path):
        """Recursively collect image file paths below ``folder_path``.

        Only files whose extension is in ``IMAGE_EXTENSIONS`` (compared
        case-insensitively) are returned, in sorted traversal order.
        """
        images = []
        for entry in sorted(os.listdir(folder_path)):
            entry_path = os.path.join(folder_path, entry)
            if os.path.isdir(entry_path):
                images.extend(self._get_images(entry_path))
            elif entry.lower().endswith(self.IMAGE_EXTENSIONS):
                images.append(entry_path)
        return images

    def __len__(self):
        """Return ``k`` entries for every non-empty class."""
        return len(self.samples) * self.k

    def __getitem__(self, idx):
        """Return ``(image, class_id)`` for a random image of class ``idx // k``.

        Raises:
            IndexError: If ``idx`` maps to a class position outside
                ``self.samples``.
        """
        images, class_id = self.samples[idx // self.k]
        base_image_path = random.choice(images)

        # Image.open is lazy and keeps the file open; convert() forces the
        # load, and the context manager then releases the file handle.
        with Image.open(base_image_path) as source:
            base_image = source.convert("RGB")

        if self.transform is not None:
            base_image = self.transform(base_image)

        return base_image, class_id

Optionally resize the images (TODO: check whether this step is still needed).

In [None]:
from torchvision.transforms import Compose, Resize, ToTensor

# Preprocessing for every dataset image: resize to 224x224, then convert the
# PIL image to a tensor.
transformations = Compose([Resize(size=(224, 224)), ToTensor()])

In [None]:
# Five random draws per class, preprocessed with `transformations`, served in
# shuffled batches of 32.
dataset = ImageNet100SubsampleDataset(
    root_dir="/content/dataset",
    k=5,
    transform=transformations,
)
data_loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True)


In [None]:
from diffusers import StableUnCLIPImg2ImgPipeline
from diffusers.utils import load_image
import torch
from PIL import Image
from torchvision.transforms.functional import to_pil_image

In [None]:
# Load the Stable unCLIP image-to-image pipeline in half precision and move it
# to the GPU.
# NOTE: the keyword is `variant`, not `variation`. The misspelled keyword is
# not a `from_pretrained` argument, so the fp16 weight files were never
# requested.
pipe = StableUnCLIPImg2ImgPipeline.from_pretrained(
    "stabilityai/stable-diffusion-2-1-unclip",
    torch_dtype=torch.float16,
    variant="fp16",
)
pipe = pipe.to("cuda")

In [None]:
# Generate one variation per dataset image and save it to the working
# directory as "variation_<class_id><index>.png".
#
# The index runs across the whole loop. It used to restart at 0 for every
# batch, so a class/index pair from a later batch silently overwrote the file
# written by an earlier batch.
no = 0
for images, class_ids in data_loader:
    for image, class_id in zip(images, class_ids):
        # The batch holds tensors; the pipeline is fed PIL images here, so
        # convert each one back before generating.
        variation = pipe(to_pil_image(image)).images[0]
        variation.save(f"variation_{class_id}{no}.png")
        no += 1

## Experimenting with Latent Diffusion Models