In [1]:
!pip install torch



In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
import os

# Dataset location. The default is the original checkout path; set the
# SUPER_MODEL_DATASET_ROOT environment variable to run from another machine
# without editing this cell.
DATASET_ROOT = os.environ.get(
    'SUPER_MODEL_DATASET_ROOT',
    '/Users/lixiaozao/Desktop/desk/cogs185/FINAL/Super-Model/dataset',
)

# Split configuration
ANIMALS = ('cat', 'dog', 'swan')
TEST_SIZE = 10     # rows held out per animal
TRAIN_SIZE = 30    # rows kept for training per animal
SPLIT_SEED = 42    # fixed seed so the split is reproducible

# Paths to datasets (one captions.csv per animal)
paths = {
    name: f"{DATASET_ROOT}/{name}/selected_images_captions/captions.csv"
    for name in ANIMALS
}

# Load datasets and split
train_data = {}
test_data = {}
for animal, path in paths.items():
    df = pd.read_csv(path)
    train, test = train_test_split(df, test_size=TEST_SIZE, random_state=SPLIT_SEED)
    # Store independent copies: later cells add columns to the test frames,
    # and that should not depend on (or touch) the frame they were cut from.
    train_data[animal] = train.head(TRAIN_SIZE).copy()
    test_data[animal] = test.copy()

# Check the splits
for animal in train_data:
    print(f"Train data size for {animal}: ", len(train_data[animal]))
    print(f"Test data size for {animal}: ", len(test_data[animal]))


Train data size for cat:  30
Test data size for cat:  10
Train data size for dog:  30
Test data size for dog:  10
Train data size for swan:  30
Test data size for swan:  10


In [3]:
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torch
from transformers import CLIPProcessor, CLIPModel

class CustomImageCaptionDataset(Dataset):
    """Dataset of (image, caption) pairs described by a captions table.

    The table is read positionally: the value in column 0 is joined onto
    ``image_dir`` to locate the image file, and the value in column 1 is
    returned as the caption.

    Args:
        dataframe: Table with the image file name in column 0 and the caption
            in column 1.
        image_dir: Directory that contains the image files.
        transform: Optional callable applied to the RGB PIL image before it is
            returned (for example a transform that produces a tensor). When
            ``None`` the PIL image itself is returned, as before.
    """

    def __init__(self, dataframe, image_dir, transform=None):
        self.dataframe = dataframe
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the captions table."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(image, caption)`` for the row at position ``idx``."""
        img_name = os.path.join(self.image_dir, self.dataframe.iloc[idx, 0])
        # convert() hands back a separate, fully loaded image, so the file
        # opened by Image.open can be closed as soon as the conversion is done.
        with Image.open(img_name) as source_image:
            image = source_image.convert("RGB")
        caption = self.dataframe.iloc[idx, 1]
        if self.transform is not None:
            image = self.transform(image)
        return image, caption

# Folder holding the selected images of each animal class
image_dirs = {
    'cat': '/Users/lixiaozao/Desktop/desk/cogs185/FINAL/Super-Model/dataset/cat/selected_images',
    'dog': '/Users/lixiaozao/Desktop/desk/cogs185/FINAL/Super-Model/dataset/dog/selected_images',
    'swan': '/Users/lixiaozao/Desktop/desk/cogs185/FINAL/Super-Model/dataset/swan/selected_images'
}

# One dataset per animal, built from that animal's training rows and image folder
train_datasets = [
    CustomImageCaptionDataset(df, image_dirs[animal])
    for animal, df in train_data.items()
]

# Chain the per-animal datasets and serve them in shuffled batches of four
train_dataset = torch.utils.data.ConcatDataset(train_datasets)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)


  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(


In [None]:
from diffusers import StableDiffusionPipeline, UNet2DConditionModel, DDPMScheduler
from transformers import CLIPTextModel, CLIPTokenizer
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image

# Define hyperparameters
learning_rate = 1e-4
batch_size = 4
num_epochs = 5
image_size = 256

# Use the GPU when one is available; otherwise fall back to the CPU so the
# cell still runs (slowly) instead of failing on the device move.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Define the image directories
# NOTE(review): these paths are relative to the notebook's working directory,
# while the earlier dataset cell uses absolute paths -- confirm they point at
# the same data.
image_dirs = {
    'cat': 'dataset/cat/selected_images',
    'dog': 'dataset/dog/selected_images',
    'swan': 'dataset/swan/selected_images'
}

# Turn an RGB PIL image into a (3, image_size, image_size) float tensor scaled
# to [-1, 1]: resize the short side, crop the centre square, then normalise.
image_transform = transforms.Compose([
    transforms.Resize(image_size),
    transforms.CenterCrop(image_size),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]),
])


def collate_image_caption_batch(samples):
    """Batch ``(PIL image, caption)`` samples for the training loop.

    The DataLoader's default collate function cannot stack PIL images, so the
    images are converted with ``image_transform`` here and stacked into one
    tensor; captions are returned as a plain list of strings.
    """
    pixel_values = torch.stack([image_transform(image) for image, _ in samples])
    captions = [str(caption) for _, caption in samples]
    return pixel_values, captions


# Create a combined dataset for training
train_datasets = [
    CustomImageCaptionDataset(df, image_dirs[animal])
    for animal, df in train_data.items()
]
train_dataset = torch.utils.data.ConcatDataset(train_datasets)
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_image_caption_batch,
)

# Load pre-trained model
# The weights are loaded in float32: the optimizer updates them directly, and
# the image tensors fed to the VAE are float32 as well, so everything shares
# one dtype.
model_id = "runwayml/stable-diffusion-v1-5"
pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float32)
pipe = pipe.to(device)

# Extract components of the pipeline
unet = pipe.unet
vae = pipe.vae
text_encoder = pipe.text_encoder
tokenizer = pipe.tokenizer
# Separate DDPM scheduler for adding training noise; pipe.scheduler is left
# untouched for image generation in later cells.
scheduler = DDPMScheduler.from_config(pipe.scheduler.config)

# Only the UNet is optimised; the VAE and text encoder stay frozen.
vae.requires_grad_(False)
text_encoder.requires_grad_(False)
vae.eval()
text_encoder.eval()
unet.train()

# Define optimizer
optimizer = optim.Adam(unet.parameters(), lr=learning_rate)
loss_fn = nn.MSELoss()

# Training loop
for epoch in range(num_epochs):
    running_loss = 0.0
    num_batches = 0

    for pixel_values, captions in train_loader:
        pixel_values = pixel_values.to(device)

        # Preprocess captions (fixed-length token ids)
        inputs = tokenizer(
            captions,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=tokenizer.model_max_length,
        )
        input_ids = inputs.input_ids.to(device)

        # The frozen encoders need no autograd graph.
        with torch.no_grad():
            # Encode the captions
            encoder_hidden_states = text_encoder(input_ids)[0]
            # Encode the images into latents; vae.encode returns an output
            # object whose latent distribution has to be sampled and scaled.
            latents = vae.encode(pixel_values).latent_dist.sample()
            latents = latents * vae.config.scaling_factor

        # Add noise to the latents at a random timestep per sample
        noise = torch.randn_like(latents)
        timesteps = torch.randint(
            0,
            scheduler.config.num_train_timesteps,
            (latents.shape[0],),
            device=device,
        )
        noisy_latents = scheduler.add_noise(latents, noise, timesteps)

        # Get model prediction
        model_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample

        # Calculate loss: the UNet is trained to predict the added noise
        loss = loss_fn(model_pred, noise)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    # Report the mean loss over the epoch rather than the last batch only.
    epoch_loss = running_loss / max(num_batches, 1)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")

# Leave the UNet in inference mode for the generation cells that follow.
unet.eval()

print("Training completed.")


In [None]:
# # Placeholder for training loop
# for epoch in range(num_epochs):
#     for images, captions in train_loader:
#         # Preprocess images and captions for the model
#         # Forward pass through the model
#         # Calculate loss
#         # Backward pass and optimization
#         pass

# # For now, we use the pre-trained model for generation
# pipe = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
# pipe = pipe.to("cuda")


In [None]:
from diffusers import StableDiffusionPipeline, UNet2DConditionModel, DDPMScheduler
from transformers import CLIPTextModel, CLIPTokenizer
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image

# The code that used to be here was a verbatim copy of the training cell above
# (same hyperparameters, dataset construction, model loading and loop).
# Running it reloaded the pretrained pipeline, threw away the weights that the
# previous cell had just fine-tuned, and repeated the entire run.
# It has been removed; `pipe`, `unet`, `vae`, `text_encoder`, `tokenizer`,
# `scheduler`, `optimizer` and `train_loader` are the objects left in the
# namespace by that earlier training cell.


In [None]:
def generate_image(prompt, model):
    """Call ``model`` with ``prompt`` and return the first generated image.

    Args:
        prompt: Text passed to ``model`` as its only argument.
        model: Callable (here the Stable Diffusion pipeline) whose result
            exposes an ``images`` sequence.

    Returns:
        The first element of the result's ``images``.
    """
    result = model(prompt)
    return result.images[0]

import os

# Render one image per test caption and keep it in the same row as the caption
for animal, df in test_data.items():
    captions = df['caption']
    df['generated_image'] = captions.apply(lambda text: generate_image(text, pipe))

# Write every generated image to disk as <animal>_generated_<row label>.png
output_dir = 'Super-Model/dataset/gen_imag'
os.makedirs(output_dir, exist_ok=True)
for animal, df in test_data.items():
    for i, row in df.iterrows():
        file_name = f"{animal}_generated_{i}.png"
        image_path = os.path.join(output_dir, file_name)
        row['generated_image'].save(image_path)


In [None]:
# The CLIP model and its processor are loaded from the same pretrained checkpoint.
clip_checkpoint = "openai/clip-vit-base-patch32"
clip_model = CLIPModel.from_pretrained(clip_checkpoint)
clip_processor = CLIPProcessor.from_pretrained(clip_checkpoint)

def calculate_clip_score(image, text, model, processor):
    """Score how well ``text`` matches ``image`` with a CLIP model.

    Args:
        image: Image passed to ``processor`` as ``images``.
        text: Caption to compare against the image.
        model: CLIP model; called with the processor's output as keyword
            arguments.
        processor: CLIP processor that prepares the text and image tensors.

    Returns:
        float: The model's ``logits_per_image`` value for this single
        (image, text) pair.
    """
    # truncation=True keeps over-long captions within the tokenizer's maximum
    # length instead of handing the model a sequence it cannot embed.
    inputs = processor(
        text=[text],
        images=image,
        return_tensors="pt",
        padding=True,
        truncation=True,
    )
    # Scoring is inference only, so no autograd graph is needed.
    with torch.no_grad():
        outputs = model(**inputs)
    return outputs.logits_per_image.item()

# Score each generated test image against the caption it was generated from
for animal, df in test_data.items():
    df['clip_score'] = [
        calculate_clip_score(image, caption, clip_model, clip_processor)
        for image, caption in zip(df['generated_image'], df['caption'])
    ]

# Show the caption / score table for every animal
for animal, df in test_data.items():
    print(f"CLIP scores for {animal}:")
    print(df[['caption', 'clip_score']])
