In [2]:
!pip install diffusers transformers torch accelerate

Collecting accelerate
  Using cached accelerate-1.4.0-py3-none-any.whl.metadata (19 kB)
Using cached accelerate-1.4.0-py3-none-any.whl (342 kB)
Installing collected packages: accelerate
Successfully installed accelerate-1.4.0

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
import json
import os

import numpy as np
import torch
from diffusers import AutoencoderKL, DDPMScheduler, StableDiffusionPipeline, UNet2DConditionModel
from PIL import Image
from torch.optim import AdamW
from transformers import CLIPTextModel, CLIPTokenizer

In [4]:

# Configuration
# Every tunable value for the run lives in this cell.

# --- What is being learned ---
# Token string that gets registered with the tokenizer.
placeholder_token = "icon"
# Not referenced by any other cell shown in this notebook.
learnable_property = "object"

# --- Optimisation schedule ---
learning_rate = 5e-4
num_train_epochs = 1000
# A checkpoint is written whenever the step counter is a multiple of this value.
checkpoint_steps = 100

# --- Where checkpoints and the final model are written ---
output_dir = "textual_inversion_model"

In [5]:
# Create output directory
# exist_ok=True lets this cell be re-run when the directory is already present.
os.makedirs(output_dir, exist_ok=True)

# Load metadata
# The JSON file is parsed into `metadata`, which later cells iterate over.
metadata_path = "/Users/xs518-shigoy/Documents/AI Agent/.TIFewImage/images/metadata.json"
with open(metadata_path, "r") as metadata_file:
    metadata = json.load(metadata_file)


In [6]:
# Display the parsed records to confirm their schema: in the output captured below,
# each record is a dict with a 'text' key and an 'image' key.
metadata

[{'text': 'An orange light bulb icon with blue rays emanating from it.',
  'image': '08-05-2024 Icons for AI Library-01.png'},
 {'text': 'A web browser window with three interlocking gears in blue orange and white.',
  'image': '08-05-2024 Icons for AI Library-02.png'},
 {'text': 'Three circular icons with a blue gear green chat bubble and orange wrench',
  'image': '08-05-2024 Icons for AI Library-03.png'}]

In [8]:
!pip install accelerate


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [9]:
# Load models
# All components are pulled from the same repository; each lives in its own subfolder.
model_id = "runwayml/stable-diffusion-v1-5"

# Text side: tokenizer and the CLIP text encoder.
tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer")
text_encoder = CLIPTextModel.from_pretrained(model_id, subfolder="text_encoder")

# Diffusion side: noise scheduler and the conditional UNet.
noise_scheduler = DDPMScheduler.from_pretrained(model_id, subfolder="scheduler")
unet = UNet2DConditionModel.from_pretrained(model_id, subfolder="unet")

Cannot initialize model with low cpu memory usage because `accelerate` was not found in the environment. Defaulting to `low_cpu_mem_usage=False`. It is strongly recommended to install `accelerate` for faster and less memory-intense model loading. You can do so with: 
```
pip install accelerate
```
.
Error while downloading from https://cdn-lfs.hf.co/repos/66/6f/666f465fa70158515404e8de2c6bc6fe2f90c46f9296293aa14daededeb32c52/19da7aaa4b880e59d56843f1fcb4dd9b599c28a1d9d9af7c1143057c8ffae9f1?response-content-disposition=inline%3B+filename*%3DUTF-8%27%27diffusion_pytorch_model.safetensors%3B+filename%3D%22diffusion_pytorch_model.safetensors%22%3B&Expires=1740120096&Policy=eyJTdGF0ZW1lbnQiOlt7IkNvbmRpdGlvbiI6eyJEYXRlTGVzc1RoYW4iOnsiQVdTOkVwb2NoVGltZSI6MTc0MDEyMDA5Nn19LCJSZXNvdXJjZSI6Imh0dHBzOi8vY2RuLWxmcy5oZi5jby9yZXBvcy82Ni82Zi82NjZmNDY1ZmE3MDE1ODUxNTQwNGU4ZGUyYzZiYzZmZTJmOTBjNDZmOTI5NjI5M2FhMTRkYWVkZWRlYjMyYzUyLzE5ZGE3YWFhNGI4ODBlNTlkNTY4NDNmMWZjYjRkZDliNTk5YzI4YTFkOWQ5YWY3YzExNDMwNTdjOGZ

In [None]:
# Add placeholder token
# Register the placeholder with the tokenizer and stop if nothing was added, because
# that means the token string is already in the vocabulary.
num_added_tokens = tokenizer.add_tokens(placeholder_token)
token_was_added = num_added_tokens != 0
if not token_was_added:
    raise ValueError(f"Token {placeholder_token} already exists")

# Resize the text encoder's embedding table to the tokenizer's new length.
text_encoder.resize_token_embeddings(len(tokenizer))


In [None]:
# Get token ID
# Look up the integer id the tokenizer assigned to the placeholder string.
placeholder_token_id = tokenizer.convert_tokens_to_ids(placeholder_token)

# Freeze all parameters except the token embeddings
# The transformer stack, the final layer norm and the position embeddings are frozen;
# the token embedding table is left untouched here.
text_model = text_encoder.text_model
text_model.encoder.requires_grad_(False)
text_model.final_layer_norm.requires_grad_(False)
text_model.embeddings.position_embedding.requires_grad_(False)


In [None]:
# Initialize new token with random embeddings
# `.data` gives direct access to the embedding table's storage, so the assignment
# below overwrites the placeholder's row in place.
token_embeds = text_encoder.get_input_embeddings().weight.data
placeholder_row = token_embeds[placeholder_token_id]
# Draw a standard-normal vector with the same shape and dtype as the existing row.
token_embeds[placeholder_token_id] = torch.randn_like(placeholder_row)

In [None]:
# Prepare dataset
# Build a list of (PIL image, caption) pairs from the metadata records.
#
# The records displayed earlier in this notebook use the keys 'image' (file name) and
# 'text' (caption); the previous 'file_name' / 'caption' lookups raised KeyError.
#
# The file names in the metadata carry no directory component.
# NOTE(review): assumes the images sit in the same directory as metadata.json - confirm.
image_dir = "/Users/xs518-shigoy/Documents/AI Agent/.TIFewImage/images"

dataset = []
for item in metadata:
    image_path = os.path.join(image_dir, item["image"])
    # The context manager closes the underlying file; convert() and resize() return
    # new in-memory images, so `image` stays usable after the file is closed.
    with Image.open(image_path) as source_image:
        image = source_image.convert("RGB").resize((512, 512))
    caption = item["text"]
    dataset.append((image, caption))

In [None]:
# Training setup
# Counts optimizer steps across all epochs; the training loop uses it to decide when
# to write a checkpoint.
global_step = 0

# The optimizer is handed only the parameters of the text encoder's input embedding layer.
embedding_parameters = text_encoder.get_input_embeddings().parameters()
optimizer = AdamW(embedding_parameters, lr=learning_rate)

In [None]:
# Training loop
#
# For every (image, caption) pair: encode the image into VAE latents, add noise at a
# random timestep, have the UNet predict that noise conditioned on the caption's text
# embeddings, and take one optimizer step on the MSE between predicted and true noise.
#
# Changes from the earlier version of this cell:
#   * latents now come from the training image (previously pure random noise, so the
#     image was never used);
#   * the UNet is frozen, so it no longer accumulates gradients that nothing clears;
#   * after each optimizer step every embedding row except the placeholder's is restored,
#     so only the new token is actually learned;
#   * an empty dataset fails fast instead of hitting an undefined `loss` at the epoch print.
if not dataset:
    raise ValueError("dataset is empty; nothing to train on")

# The VAE maps RGB images to the latent space the UNet operates in. It is loaded in this
# cell so the loop does not depend on changes to other cells.
vae = AutoencoderKL.from_pretrained(model_id, subfolder="vae")
vae.requires_grad_(False)

# Gradients still flow *through* the frozen UNet to the text embeddings.
unet.requires_grad_(False)

# Nothing in the loop switches the mode back, so setting it once is sufficient.
text_encoder.train()

# Snapshot of the embedding table used to undo updates to pre-existing tokens.
embedding_layer = text_encoder.get_input_embeddings()
original_embeddings = embedding_layer.weight.data.clone()
rows_to_restore = torch.ones(original_embeddings.shape[0], dtype=torch.bool)
rows_to_restore[placeholder_token_id] = False

for epoch in range(num_train_epochs):
    for image, caption in dataset:
        # Convert image to latents: scale pixels from [0, 255] to [-1, 1], reorder
        # HWC -> CHW and add a batch dimension before encoding.
        pixel_values = torch.from_numpy(np.asarray(image, dtype=np.float32) / 127.5 - 1.0)
        pixel_values = pixel_values.permute(2, 0, 1).unsqueeze(0)
        with torch.no_grad():
            latents = vae.encode(pixel_values).latent_dist.sample()
            latents = latents * vae.config.scaling_factor

        # Tokenize text
        input_ids = tokenizer(
            caption,
            padding="max_length",
            truncation=True,
            max_length=tokenizer.model_max_length,
            return_tensors="pt",
        ).input_ids

        # Forward pass through the text encoder; element 0 is the per-token hidden states.
        encoder_hidden_states = text_encoder(input_ids)[0]

        # Sample noise and a random timestep, then noise the latents accordingly.
        noise = torch.randn_like(latents)
        timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (1,))
        noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)

        # Predict noise
        noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample

        # Calculate loss
        loss = torch.nn.functional.mse_loss(noise_pred, noise)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The optimizer holds the whole embedding table; put back every row except the
        # placeholder's so existing tokens keep their pretrained embeddings.
        with torch.no_grad():
            embedding_layer.weight[rows_to_restore] = original_embeddings[rows_to_restore]

        global_step += 1

        # Save checkpoint
        if global_step % checkpoint_steps == 0:
            checkpoint_path = os.path.join(output_dir, f"checkpoint-{global_step}")
            torch.save({
                "epoch": epoch,
                "global_step": global_step,
                "state_dict": text_encoder.state_dict(),
                "optimizer": optimizer.state_dict(),
            }, checkpoint_path)
            print(f"Saved checkpoint to {checkpoint_path}")

    # Reports the loss of the last sample processed in this epoch.
    print(f"Epoch {epoch+1}/{num_train_epochs}, Loss: {loss.item()}")

In [None]:
# Save final embeddings
# The text encoder and the tokenizer are both written into `output_dir`, in that order.
for artifact in (text_encoder, tokenizer):
    artifact.save_pretrained(output_dir)
print("Training complete!")

In [None]:
# Restore the text encoder and optimizer from the step-100 checkpoint.
# The path is derived from `output_dir` so it follows the configured output location
# instead of repeating the directory name.
resume_path = os.path.join(output_dir, "checkpoint-100")
# weights_only=True limits unpickling to tensors and plain containers, which is all the
# training loop stores in a checkpoint, and avoids executing arbitrary pickled code.
checkpoint = torch.load(resume_path, weights_only=True)
text_encoder.load_state_dict(checkpoint["state_dict"])
optimizer.load_state_dict(checkpoint["optimizer"])