In [None]:
# Mount Google Drive inside the Colab VM. The fine-tuned adapter is saved under
# /content/drive/MyDrive at the end of this notebook, so the mount must succeed first.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
# Install the third-party packages this notebook relies on (-q keeps pip's output short).
# The `kaggle` package provides the command-line tool used to download the dataset.
# NOTE(review): no versions are pinned, so a fresh runtime may pull newer releases
# than the ones this notebook was written against.
!pip install -q diffusers transformers accelerate peft bitsandbytes kaggle

In [None]:
# Kaggle API credentials
# Opens Colab's upload dialog; the uploaded file lands in the current working directory.
# The commands below expect it to be named exactly `kaggle.json`.
from google.colab import files
files.upload() # upload kaggle.json API key
# Copy the key into ~/.kaggle/ and make it readable/writable by the owner only (mode 600).
# Presumably ~/.kaggle/kaggle.json is where the kaggle CLI looks for it - confirm against the Kaggle docs.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the Tom and Jerry dataset (Kaggle slug: balabaskar/tom-and-jerry-image-classification)
# into /content/tom_and_jerry. The next cell expects the result to be the archive
# /content/tom_and_jerry/tom-and-jerry-image-classification.zip.
!kaggle datasets download -d balabaskar/tom-and-jerry-image-classification -p /content/tom_and_jerry

In [None]:
# unzip the dataset
!unzip -q /content/tom_and_jerry/tom-and-jerry-image-classification.zip -d /content/tom_and_jerry_dataset

In [None]:
# Sanity check before data preparation: show what the extracted dataset root contains.
import os

dataset_path = '/content/tom_and_jerry_dataset/tom_and_jerry/tom_and_jerry'
dataset_entries = os.listdir(dataset_path)
print(dataset_entries)

In [None]:
# get all images from the dataset
import glob
import os  # imported here as well so this cell does not depend on an earlier cell's import
from PIL import Image

style_data_root = '/content/tom_and_jerry_dataset/tom_and_jerry/tom_and_jerry/'
# File suffixes treated as images; compared case-insensitively so that
# e.g. "frame.JPG" or "frame.jpeg" is not silently skipped.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

# Collect the image files from every immediate subfolder of style_data_root.
# Folder and file listings are sorted because os.listdir / glob return entries
# in arbitrary order; sorting gives image_paths the same order on every run.
image_paths = []
for folder_name in sorted(os.listdir(style_data_root)):
  folder_path = os.path.join(style_data_root, folder_name)
  if not os.path.isdir(folder_path):
    continue  # only subfolders are scanned; loose files in the root are ignored
  for file_path in sorted(glob.glob(os.path.join(folder_path, '*'))):
    if file_path.lower().endswith(IMAGE_EXTENSIONS):
      image_paths.append(file_path)

# Stop here with a clear message instead of failing later when the
# DataLoader is built on an empty dataset.
if not image_paths:
  raise FileNotFoundError(f"No image files found in the subfolders of {style_data_root}")
print(f"Found  {len(image_paths)} images to use for training.")

In [None]:
# load the base model = Stable Diffusion v1.5
import torch
from diffusers import StableDiffusionPipeline
from peft import LoraConfig, get_peft_model, PeftModel

model_id = "runwayml/stable-diffusion-v1-5"
device = "cuda"

# Load every pipeline component in half precision.
# The load_in_4bit / bnb_4bit_quant_type / bnb_4bit_compute_dtype arguments that
# used to be passed here were removed: they are transformers-style options that
# StableDiffusionPipeline.from_pretrained does not consume (diffusers reports them
# as unexpected keyword arguments and ignores them), so the weights were never
# actually stored in 4 bits.
# NOTE(review): confirm against the installed diffusers version; real quantization
# has to be requested through diffusers' own quantization config.
pipeline = StableDiffusionPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16, # try full 32bits precision later -> or rent a better GPU
)
# Moves all components (UNet, VAE, text encoder, ...) to the GPU; they already
# share the dtype requested above, so no per-component .to(...) calls are needed.
pipeline.to(device)

# The VAE and the text encoder are only used to produce inputs for the UNet and
# are never handed to an optimizer. Freezing them keeps autograd from recording
# graphs and allocating gradients for their weights during training.
pipeline.vae.requires_grad_(False)
pipeline.text_encoder.requires_grad_(False)

print("Base model loaded.")

In [None]:
# LoRA setup for the UNet
from peft import LoraConfig, get_peft_model

# Adapter rank; lora_alpha is tied to it with the common "alpha = 2 * r" rule.
LORA_RANK = 16
# Names of the UNet sub-modules that receive LoRA adapters.
LORA_TARGET_MODULES = ["to_q", "to_k", "to_v", "to_out.0", "proj_in", "proj_out"]

lora_config = LoraConfig(
    r=LORA_RANK,
    lora_alpha=2 * LORA_RANK, # 32
    target_modules=LORA_TARGET_MODULES,
    lora_dropout=0.05,
    bias="none",
)

# Only the UNet is wrapped with adapters; the rest of the pipeline is left as loaded.
unet = pipeline.unet
unet_lora = get_peft_model(unet, lora_config)

# Print the trainable-vs-total parameter counts to confirm the adapters were attached.
unet_lora.print_trainable_parameters()

In [None]:
# training script
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image
from accelerate import Accelerator
from tqdm.auto import tqdm

# Accelerator created with its default settings; it later prepares the model,
# optimizer and dataloader and runs the backward pass.
accelerator = Accelerator()

# Preprocessing applied to every training image, in this order.
IMAGE_SIZE = 512
preprocessing_steps = [
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),  # force a fixed 512x512 size (aspect ratio is not kept)
    transforms.RandomHorizontalFlip(),            # random left/right mirroring as augmentation
    transforms.ToTensor(),                        # PIL image -> tensor
    transforms.Normalize([0.5], [0.5]),           # (x - 0.5) / 0.5
]
transform = transforms.Compose(preprocessing_steps)

# Dataset wrapper around the collected 'Tom and Jerry' image files
class TomAndJerryDataset(torch.utils.data.Dataset):
  """Image-only dataset: every item is a single image read from disk.

  Args:
    image_paths: indexable collection of image file paths; item ``idx`` is
      loaded from ``image_paths[idx]``.
    transform: optional callable applied to the RGB image. When it is falsy
      (e.g. ``None``) the RGB PIL image itself is returned.
  """

  def __init__(self, image_paths, transform=None):
    self.transform = transform
    self.image_paths = image_paths

  def __len__(self):
    """Return how many image paths the dataset holds."""
    return len(self.image_paths)

  def __getitem__(self, idx):
    """Load image ``idx``, convert it to RGB and apply the transform if one is set."""
    rgb_image = Image.open(self.image_paths[idx]).convert("RGB")
    return self.transform(rgb_image) if self.transform else rgb_image

In [None]:
# training parameters
num_epochs = 6
learning_rate = 1e-4
BATCH_SIZE = 4

# prepare custom 'Tom and Jerry' dataset and dataloader
dataset = TomAndJerryDataset(image_paths, transform)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

# setup optimizer for LoRA params only:
# unet_lora.parameters() also yields the frozen base-UNet weights, so keep just
# the parameters that still require gradients. The optimizer then tracks only
# the adapter weights (and raises immediately if nothing is trainable).
trainable_params = [param for param in unet_lora.parameters() if param.requires_grad]
optimizer = torch.optim.AdamW(trainable_params, lr=learning_rate)

# prepare everything with accelerator
unet_lora, optimizer, dataloader = accelerator.prepare(unet_lora, optimizer, dataloader)

In [None]:
import gc
import torch

# Release as much memory as possible before the training loop starts.
gc.collect()               # collect unreachable Python objects first
torch.cuda.empty_cache()   # then ask PyTorch to release its unused cached GPU memory
torch.cuda.ipc_collect()   # and clean up CUDA IPC memory

# Show the allocator report so the remaining GPU headroom is visible.
memory_report = torch.cuda.memory_summary()
print(memory_report)

In [None]:
# training loop
# Trains only the LoRA adapters of the UNet with the standard noise-prediction
# objective: encode images to latents, add noise at a random timestep, let the
# UNet predict that noise, and minimise the MSE between prediction and noise.
unet_lora.train()

# Text conditioning: no captions are available, so every sample is trained with
# the embedding of the empty prompt. The prompt is run through the tokenizer so
# the text encoder sees the start/end/padding tokens it expects (an all-zeros id
# tensor is not an empty prompt, it is token id 0 repeated). The embedding is the
# same for every step, so it is computed once here, without autograd.
with torch.no_grad():
  empty_prompt_ids = pipeline.tokenizer(
      "",
      padding="max_length",
      max_length=pipeline.tokenizer.model_max_length,
      truncation=True,
      return_tensors="pt",
  ).input_ids.to(device)
  empty_prompt_embeds = pipeline.text_encoder(empty_prompt_ids)[0]  # one row, repeated per batch below

for epoch in range(num_epochs):
  total_loss = 0.0
  progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")

  for images in progress_bar:
    optimizer.zero_grad()

    # generate latent representations
    # The VAE is not trained: no_grad avoids storing its activations and
    # computing gradients for its weights.
    with torch.no_grad():
      latents = pipeline.vae.encode(images.to(device, dtype=pipeline.vae.dtype)).latent_dist.sample()
      # latent scale read from the VAE config instead of a hard-coded constant
      latents = latents * pipeline.vae.config.scaling_factor

    # sample random noise and one random timestep per image
    batch_size = latents.shape[0]  # the last batch of an epoch may be smaller than BATCH_SIZE
    noise = torch.randn_like(latents)
    timesteps = torch.randint(0, pipeline.scheduler.config.num_train_timesteps, (batch_size,), device=device)

    noisy_latents = pipeline.scheduler.add_noise(latents, noise, timesteps)

    # predict noise with UNet-LoRA
    encoder_hidden_states = empty_prompt_embeds.repeat(batch_size, 1, 1)
    noise_pred = unet_lora(noisy_latents, timesteps, encoder_hidden_states=encoder_hidden_states).sample

    # compute loss - MSE between predicted and true noise
    # (reduced in float32: the half-precision tensors are upcast first)
    loss = torch.nn.functional.mse_loss(noise_pred.float(), noise.float())
    accelerator.backward(loss)

    optimizer.step()

    batch_loss = loss.item()
    total_loss += batch_loss
    progress_bar.set_postfix(loss=batch_loss)

  # max(..., 1) keeps the report from dividing by zero if the dataloader is empty
  avg_loss = total_loss / max(len(dataloader), 1)
  print(f"Epoch {epoch+1}/{num_epochs} - Average Loss: {avg_loss:.4f}")

# save fine-tuned model
unet_lora.save_pretrained("/content/drive/MyDrive/MLX/fine_tuned_tom_and_jerry")