#!/usr/bin/env python

**Logo Generation LoRA Training for SD 1.5 - No PEFT, No bitsandbytes**
**=================================================================**

In [1]:
# Importing required Dependencies

import os
import math
import json
import torch
import torch.nn.functional as F
import torch.utils.checkpoint
import numpy as np
import types
from PIL import Image
from pathlib import Path
from tqdm.auto import tqdm
from datetime import datetime
import logging
import random
from torch.utils.data import Dataset, DataLoader
import transformers
from transformers import CLIPTextModel, CLIPTokenizer
from accelerate import Accelerator
from accelerate.utils import set_seed
from diffusers import (
    AutoencoderKL,
    DDPMScheduler,
    StableDiffusionPipeline,
    UNet2DConditionModel,
)
from diffusers.loaders import AttnProcsLayers
from diffusers.models.attention_processor import LoRAAttnProcessor
from diffusers.optimization import get_scheduler

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Configure logging
# Root logging setup: every record goes both to the console and to a log file
# whose name carries the start time of this run (minute resolution).
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(f"lora_training_{datetime.now():%Y%m%d_%H%M}.log"),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)

In [3]:
# Training configuration
class TrainingConfig:
    """Settings for the SD 1.5 logo LoRA training run.

    Every setting defaults to the constant assigned below. Individual values
    can be replaced with keyword arguments, for example
    ``TrainingConfig(max_train_steps=500, dataset_path="data/other")``.
    ``training_data_json`` is derived from ``dataset_path`` unless it is
    overridden explicitly.

    Raises:
        TypeError: If a keyword does not name an existing setting.
    """

    def __init__(self, **overrides):
        # Model settings
        self.pretrained_model_name = "runwayml/stable-diffusion-v1-5"
        self.output_dir = "models/sd15_lora_logos"

        # LoRA settings
        self.lora_rank = 128
        self.lora_alpha = 128  # Often same as rank
        self.lora_dropout = 0.0

        # Training settings
        self.seed = 42
        self.resolution = 512
        self.train_batch_size = 4
        self.mixed_precision = "fp16"  # "no" for full precision
        self.gradient_accumulation_steps = 4
        self.gradient_checkpointing = True

        # Learning rate and scheduler
        self.learning_rate = 1e-4
        self.lr_scheduler = "cosine"
        self.lr_warmup_steps = 100
        self.lr_num_cycles = 1
        self.lr_power = 1.0

        # Training loop
        self.max_train_steps = 10000
        # Epoch budget; train_lora only reads it when max_train_steps is None,
        # in which case it must be set to an integer.
        self.num_train_epochs = None
        self.checkpointing_steps = 1000
        self.validation_steps = 250
        self.validation_prompt = "A logo for a technology company, minimalist style, with blue colors"

        # Dataset settings
        self.dataset_path = "data/processed_modern"
        self.training_data_json = None  # derived from dataset_path below

        # Performance
        self.enable_xformers = True
        self.dataloader_num_workers = 4

        # Reject misspelled settings instead of silently creating new attributes.
        unknown = sorted(set(overrides) - set(vars(self)))
        if unknown:
            raise TypeError(
                f"Unknown TrainingConfig setting(s): {', '.join(unknown)}"
            )
        for name, value in overrides.items():
            setattr(self, name, value)

        # Derive the JSON path last so that a dataset_path override is honoured.
        if "training_data_json" not in overrides:
            self.training_data_json = os.path.join(
                self.dataset_path, "modern_training_data.json"
            )

In [4]:
class LogoDataset(Dataset):
    """Image/caption dataset described by a JSON list of records.

    Each record must provide ``file_name`` and ``prompt``. Images are looked up
    next to the JSON file: names containing ``_aug`` under ``augmented_images/``,
    all others under ``images/``.

    Args:
        json_path: Path to the JSON file holding the list of records.
        tokenizer: Callable tokenizer exposing ``model_max_length``; it is
            called with ``return_tensors="pt"``.
        size: Edge length of the square produced by the center crop.
        center_crop: Crop the largest centered square and resize it to
            ``size``. When False the image keeps its native size.
        random_flip: Mirror the image horizontally with probability 0.5.
    """

    def __init__(
        self,
        json_path,
        tokenizer,
        size=512,
        center_crop=True,
        random_flip=True,
    ):
        self.tokenizer = tokenizer
        self.size = size
        self.center_crop = center_crop
        self.random_flip = random_flip

        logger.info(f"Loading dataset from {json_path}")
        # Explicit encoding so that captions do not depend on the platform default.
        with open(json_path, "r", encoding="utf-8") as f:
            self.data = json.load(f)

        # Image folders are resolved relative to the directory of the JSON file.
        self.dataset_dir = os.path.dirname(json_path)

        logger.info(f"Loaded {len(self.data)} training samples")

    def __len__(self):
        """Return the number of records loaded from the JSON file."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load, transform and tokenize the record at ``idx``.

        Returns:
            A dict with ``pixel_values`` (float32 tensor, channels first,
            scaled to [-1, 1]), ``input_ids`` (token ids of the caption),
            ``image_path`` and ``caption``.

        Raises:
            FileNotFoundError: If the resolved image path does not exist.
        """
        item = self.data[idx]
        filename = item["file_name"]

        # The folder is chosen from the filename pattern, not from the
        # record's "augmented" flag.
        subfolder = "augmented_images" if "_aug" in filename else "images"
        image_path = os.path.join(self.dataset_dir, subfolder, filename)

        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")

        caption = item["prompt"]

        # convert() loads the pixel data and returns a new image, so the file
        # handle can be closed right away instead of leaking per sample.
        with Image.open(image_path) as source:
            image = source.convert("RGB")

        if self.center_crop:
            image = self.center_crop_image(image, self.size)

        if self.random_flip and random.random() > 0.5:
            image = image.transpose(Image.FLIP_LEFT_RIGHT)

        # uint8 HWC in [0, 255] -> float32 CHW in [-1, 1]
        image = np.array(image).astype(np.uint8)
        image = (image / 127.5 - 1.0).astype(np.float32)
        image = torch.from_numpy(image).permute(2, 0, 1)

        inputs = self.tokenizer(
            caption,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt",
        )

        return {
            "pixel_values": image,
            "input_ids": inputs.input_ids[0],
            "image_path": image_path,
            "caption": caption,
        }

    def center_crop_image(self, image, size):
        """Crop the largest centered square from ``image`` and resize it to ``size`` x ``size``."""
        width, height = image.size
        new_size = min(width, height)
        left = (width - new_size) // 2
        top = (height - new_size) // 2
        right = left + new_size
        bottom = top + new_size
        image = image.crop((left, top, right, bottom))
        return image.resize((size, size), Image.BICUBIC)

In [5]:
def create_lora_layers(unet, rank=4, alpha=4):
    """Wrap the self-attention projections of ``unet`` with trainable LoRA adapters.

    Every ``torch.nn.Linear`` whose qualified name contains ``attn1`` and one
    of ``to_q``, ``to_k``, ``to_v`` or ``to_out.0`` is replaced in place by a
    wrapper that adds ``lora_up(lora_down(x)) * (alpha / rank)`` to the output
    of the frozen original layer.

    Args:
        unet: Module whose matching linear layers are replaced (mutated in place).
        rank: Inner dimension of the low-rank pair.
        alpha: Scaling numerator; the adapter output is multiplied by ``alpha / rank``.

    Returns:
        A list with the ``lora_down`` and ``lora_up`` weight parameters of
        every wrapper, in replacement order.
    """
    # Collect targets first: modules must not be swapped while named_modules()
    # is still being iterated.
    lora_modules = {}
    for name, module in unet.named_modules():
        # Focus only on self-attention (attn1) modules
        if isinstance(module, torch.nn.Linear) and "attn1" in name and any(
            x in name for x in ["to_q", "to_k", "to_v", "to_out.0"]
        ):
            lora_modules[name] = module

    logger.info(f"Found {len(lora_modules)} LoRA-compatible modules")

    lora_parameters = []

    class LoRAWrapper(torch.nn.Module):
        """Frozen linear layer plus a trainable low-rank residual."""

        def __init__(self, base_layer, rank, alpha):
            super().__init__()
            self.base_layer = base_layer
            self.in_features = base_layer.in_features
            self.out_features = base_layer.out_features

            self.lora_down = torch.nn.Linear(self.in_features, rank, bias=False)
            self.lora_up = torch.nn.Linear(rank, self.out_features, bias=False)

            # lora_up starts at zero, so the wrapper initially reproduces the
            # base layer exactly.
            torch.nn.init.normal_(self.lora_down.weight, std=1/rank)
            torch.nn.init.zeros_(self.lora_up.weight)

            self.scale = alpha / rank

            # Only the adapter is trained.
            for param in self.base_layer.parameters():
                param.requires_grad = False

        def forward(self, x, *args, **kwargs):
            # Extra arguments are handed to the base layer untouched; some
            # diffusers releases call attention projections with an additional
            # `scale` argument (NOTE(review): confirm for the pinned version).
            base_output = self.base_layer(x, *args, **kwargs)

            # The low-rank path must stay linear (no activation between down
            # and up): only then is it a rank-`rank` update of the weight
            # matrix that can be merged or loaded as ordinary LoRA weights.
            lora_output = self.lora_up(self.lora_down(x)) * self.scale

            return base_output + lora_output

    count = 0
    for name, module in lora_modules.items():
        lora_wrapper = LoRAWrapper(module, rank, alpha)

        # Keep the adapter on the same device and dtype as the wrapped layer.
        lora_wrapper = lora_wrapper.to(module.weight.device, module.weight.dtype)

        lora_wrapper.lora_down.weight.requires_grad_(True)
        lora_wrapper.lora_up.weight.requires_grad_(True)

        lora_parameters.append(lora_wrapper.lora_down.weight)
        lora_parameters.append(lora_wrapper.lora_up.weight)

        # Split "a.b.c" into parent "a.b" and child "c"; an empty parent name
        # resolves to the root module itself.
        parent_name, _, child_name = name.rpartition(".")
        parent = unet.get_submodule(parent_name)

        setattr(parent, child_name, lora_wrapper)
        count += 1

    logger.info(f"Replaced {count} modules with LoRA wrappers")
    logger.info(f"Number of trainable parameters: {sum(p.numel() for p in lora_parameters)}")

    return lora_parameters

In [6]:
def save_lora(unet, save_path, rank=128, alpha=128,
              base_model_name="runwayml/stable-diffusion-v1-5"):
    """Write the LoRA adapter weights of ``unet`` and a small config file.

    Every sub-module that has both ``lora_up`` and ``lora_down`` contributes
    ``<name>.lora_down.weight`` and ``<name>.lora_up.weight``, plus
    ``<name>.scale`` when the module has a ``scale`` attribute.

    Args:
        unet: Module tree to scan for LoRA wrappers.
        save_path: Target directory; created if missing.
        rank: Rank recorded in ``config.json``.
        alpha: Alpha recorded in ``config.json`` as ``network_alpha``.
        base_model_name: Base model id recorded in ``config.json``.

    Returns:
        Path of the weights file that was actually written:
        ``pytorch_lora_weights.safetensors``, or ``pytorch_lora_weights.bin``
        when safetensors is not installed.
    """
    os.makedirs(save_path, exist_ok=True)

    lora_state_dict = {}
    for name, module in unet.named_modules():
        if not (hasattr(module, 'lora_up') and hasattr(module, 'lora_down')):
            continue

        lora_state_dict[f"{name}.lora_down.weight"] = module.lora_down.weight.detach().clone()
        lora_state_dict[f"{name}.lora_up.weight"] = module.lora_up.weight.detach().clone()

        if hasattr(module, 'scale'):
            # The state dict may only hold tensors, so the float is wrapped.
            lora_state_dict[f"{name}.scale"] = torch.tensor(module.scale, dtype=torch.float32)

    lora_path = os.path.join(save_path, "pytorch_lora_weights.safetensors")

    try:
        from safetensors.torch import save_file
    except ImportError:
        # Build the fallback name explicitly (a blanket str.replace would also
        # rewrite directory names) and return the file that really exists.
        lora_path = os.path.join(save_path, "pytorch_lora_weights.bin")
        torch.save(lora_state_dict, lora_path)
        logger.warning("safetensors not available, saved weights using torch.save instead")
    else:
        save_file(lora_state_dict, lora_path)

    config = {
        "model_type": "stable-diffusion",
        "base_model_name": base_model_name,
        "rank": rank,
        "network_alpha": alpha
    }

    with open(os.path.join(save_path, "config.json"), "w") as f:
        json.dump(config, f, indent=2)

    logger.info(f"Saved LoRA weights to {save_path}")
    return lora_path

In [7]:
def train_lora():
    """Train LoRA adapters on the logo dataset and save them.

    Builds a ``TrainingConfig``, loads the tokenizer, noise scheduler, text
    encoder, VAE and UNet it names, wraps the UNet's self-attention
    projections via ``create_lora_layers`` and optimises only those adapter
    weights with AdamW on a noise-prediction MSE loss. Adapter weights are
    written with ``save_lora`` every ``config.checkpointing_steps`` optimizer
    steps and once more under ``<output_dir>/final``.

    Returns:
        The configured output directory (``config.output_dir``).

    Raises:
        ValueError: If the dataloader yields no batches, or if neither
            ``max_train_steps`` nor ``num_train_epochs`` is configured.
    """
    config = TrainingConfig()

    accelerator = Accelerator(
        gradient_accumulation_steps=config.gradient_accumulation_steps,
        mixed_precision=config.mixed_precision,
    )

    # basicConfig does nothing when the root logger already has handlers, so
    # this only takes effect if logging was not configured earlier.
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        level=logging.INFO,
    )

    if accelerator.is_main_process and config.output_dir is not None:
        os.makedirs(config.output_dir, exist_ok=True)

    # Set seed for reproducibility
    set_seed(config.seed)

    tokenizer = CLIPTokenizer.from_pretrained(
        config.pretrained_model_name,
        subfolder="tokenizer"
    )

    noise_scheduler = DDPMScheduler.from_pretrained(
        config.pretrained_model_name,
        subfolder="scheduler"
    )

    text_encoder = CLIPTextModel.from_pretrained(
        config.pretrained_model_name,
        subfolder="text_encoder",
    )

    vae = AutoencoderKL.from_pretrained(
        config.pretrained_model_name,
        subfolder="vae"
    )

    unet = UNet2DConditionModel.from_pretrained(
        config.pretrained_model_name,
        subfolder="unet"
    )

    # Freeze everything; only the LoRA parameters created below are trained.
    vae.requires_grad_(False)
    text_encoder.requires_grad_(False)
    unet.requires_grad_(False)

    if config.gradient_checkpointing:
        unet.enable_gradient_checkpointing()

    lora_parameters = create_lora_layers(
        unet,
        rank=config.lora_rank,
        alpha=config.lora_alpha
    )

    optimizer = torch.optim.AdamW(
        lora_parameters,
        lr=config.learning_rate,
        betas=(0.9, 0.999),
        weight_decay=1e-2,
        eps=1e-8,
    )

    train_dataset = LogoDataset(
        json_path=config.training_data_json,
        tokenizer=tokenizer,
        size=config.resolution,
        center_crop=True,
        random_flip=True,
    )

    train_dataloader = DataLoader(
        train_dataset,
        batch_size=config.train_batch_size,
        shuffle=True,
        num_workers=config.dataloader_num_workers,
        pin_memory=True,
    )

    if len(train_dataloader) == 0:
        raise ValueError(
            f"No training batches available from {config.training_data_json}"
        )

    # Resolve the step budget before the scheduler is built, because the
    # scheduler needs the total number of steps.
    if config.max_train_steps is None:
        configured_epochs = getattr(config, "num_train_epochs", None)
        if configured_epochs is None:
            raise ValueError(
                "Set either max_train_steps or num_train_epochs in TrainingConfig"
            )
        # The dataloader is not sharded yet, so estimate the per-process length.
        batches_per_process = math.ceil(len(train_dataloader) / accelerator.num_processes)
        config.max_train_steps = configured_epochs * math.ceil(
            batches_per_process / config.gradient_accumulation_steps
        )
    max_train_steps = config.max_train_steps

    # The prepared scheduler advances once per process for every optimizer
    # step, so the step horizon is scaled by num_processes exactly like the
    # warmup length.
    lr_scheduler = get_scheduler(
        config.lr_scheduler,
        optimizer=optimizer,
        num_warmup_steps=config.lr_warmup_steps * accelerator.num_processes,
        num_training_steps=max_train_steps * accelerator.num_processes,
        num_cycles=config.lr_num_cycles,
        power=config.lr_power,
    )

    unet, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
        unet, optimizer, train_dataloader, lr_scheduler
    )

    # Frozen models are not prepared; they only need to live on the device.
    vae.to(accelerator.device)
    text_encoder.to(accelerator.device)

    if config.enable_xformers:
        try:
            import xformers
            # Call through the unwrapped model: a distributed wrapper returned
            # by prepare() does not expose the UNet's own methods.
            accelerator.unwrap_model(unet).enable_xformers_memory_efficient_attention()
            logger.info("Using xformers for memory efficient attention")
        except ImportError:
            logger.warning("xformers not available")

    # Epoch count is informational; the loop below is bounded by steps.
    num_update_steps_per_epoch = math.ceil(
        len(train_dataloader) / config.gradient_accumulation_steps
    )
    num_train_epochs = math.ceil(max_train_steps / num_update_steps_per_epoch)

    total_batch_size = config.train_batch_size * accelerator.num_processes * config.gradient_accumulation_steps

    logger.info("***** Running LoRA training *****")
    logger.info(f"  Num examples = {len(train_dataset)}")
    logger.info(f"  Num epochs (approx.) = {num_train_epochs}")
    logger.info(f"  Instantaneous batch size per device = {config.train_batch_size}")
    logger.info(f"  Total train batch size = {total_batch_size}")
    logger.info(f"  Gradient Accumulation steps = {config.gradient_accumulation_steps}")
    logger.info(f"  Total optimization steps = {config.max_train_steps}")
    logger.info(f"  Output directory = {config.output_dir}")
    logger.info(f"  Mixed precision = {config.mixed_precision}")

    progress_bar = tqdm(range(max_train_steps), disable=not accelerator.is_local_main_process)
    global_step = 0

    # Run epochs until the step budget is used up; the dataloader is known to
    # be non-empty, so every pass performs at least one optimizer step.
    while global_step < max_train_steps:
        unet.train()

        for batch in train_dataloader:
            with accelerator.accumulate(unet):
                # Encode images into the scaled latent space of the VAE.
                with torch.no_grad():
                    latents = vae.encode(batch["pixel_values"].to(vae.dtype)).latent_dist.sample() * 0.18215

                noise = torch.randn_like(latents)
                bsz = latents.shape[0]

                # One random timestep per image
                timesteps = torch.randint(
                    0, noise_scheduler.config.num_train_timesteps, (bsz,), device=latents.device
                ).long()

                noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)

                with torch.no_grad():
                    encoder_hidden_states = text_encoder(batch["input_ids"])[0]

                model_pred = unet(
                    noisy_latents, timesteps, encoder_hidden_states=encoder_hidden_states
                ).sample

                # Compute the loss in float32 so that a half-precision model
                # output does not degrade the reduction.
                loss = F.mse_loss(
                    model_pred.float(), noise.float(), reduction="none"
                ).mean([1, 2, 3]).mean()

                accelerator.backward(loss)

                if accelerator.sync_gradients:
                    accelerator.clip_grad_norm_(lora_parameters, 1.0)

                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()

            # Count an optimizer step exactly once, after the accumulation
            # context has applied it.
            if accelerator.sync_gradients:
                progress_bar.update(1)
                global_step += 1

                if global_step % 50 == 0 and accelerator.is_main_process:
                    logger.info(f"Step {global_step}: loss = {loss.item():.4f}, lr = {lr_scheduler.get_last_lr()[0]:.8f}")

                if global_step % config.checkpointing_steps == 0:
                    # The barrier must be reached by every process; calling it
                    # only on the main process would hang multi-process runs.
                    accelerator.wait_for_everyone()
                    if accelerator.is_main_process:
                        save_path = os.path.join(config.output_dir, f"checkpoint-{global_step}")
                        unwrapped_unet = accelerator.unwrap_model(unet)
                        save_lora(
                            unwrapped_unet,
                            save_path,
                            config.lora_rank,
                            config.lora_alpha,
                        )
                        logger.info(f"Saved checkpoint at step {global_step} to {save_path}")

            if global_step >= max_train_steps:
                break

    # Final save: synchronise all processes first, then write from one.
    accelerator.wait_for_everyone()
    if accelerator.is_main_process:
        save_path = os.path.join(config.output_dir, "final")
        unwrapped_unet = accelerator.unwrap_model(unet)
        save_lora(unwrapped_unet, save_path, config.lora_rank, config.lora_alpha)

        logger.info(f"Training completed. Final model saved to {save_path}")

    return config.output_dir

def setup_environment():
    """Report the CUDA situation and set the float32 matmul precision.

    Logs a warning when CUDA is unavailable; otherwise logs the device count
    and the name of device 0. Always returns True.
    """
    cuda_ready = torch.cuda.is_available()

    if cuda_ready:
        logger.info(f"Found {torch.cuda.device_count()} CUDA devices")
        logger.info(f"Using GPU: {torch.cuda.get_device_name(0)}")
    else:
        logger.warning("CUDA not available, training will be slow")

    # Applied regardless of whether a GPU was found.
    torch.set_float32_matmul_precision('high')

    return True

if __name__ == "__main__":
    # Setup: logs the CUDA status and sets the float32 matmul precision
    setup_environment()
    
    # Start training; train_lora() returns the configured output directory
    logger.info("Starting LoRA training for SD 1.5")
    output_dir = train_lora()
    
    logger.info(f"Training completed. Model saved to {output_dir}")

2025-05-02 15:41:50,620 - INFO - Found 2 CUDA devices
2025-05-02 15:41:50,632 - INFO - Using GPU: NVIDIA L4
2025-05-02 15:41:50,634 - INFO - Starting LoRA training for SD 1.5
2025-05-02 15:41:51,865 - INFO - Found 64 LoRA-compatible modules
2025-05-02 15:41:52,047 - INFO - Replaced 64 modules with LoRA wrappers
2025-05-02 15:41:52,049 - INFO - Number of trainable parameters: 12779520
2025-05-02 15:41:52,050 - INFO - Loading dataset from data/processed_modern/modern_training_data.json
2025-05-02 15:41:52,058 - INFO - Loaded 3212 training samples
2025-05-02 15:41:53,529 - INFO - Using xformers for memory efficient attention
2025-05-02 15:41:53,530 - INFO - ***** Running LoRA training *****
2025-05-02 15:41:53,531 - INFO -   Num examples = 3212
2025-05-02 15:41:53,531 - INFO -   Instantaneous batch size per device = 4
2025-05-02 15:41:53,532 - INFO -   Total train batch size = 16
2025-05-02 15:41:53,533 - INFO -   Gradient Accumulation steps = 4
2025-05-02 15:41:53,533 - INFO -   Total op

In [None]:
def debug_dataset_paths(
    json_path="data/processed_modern/modern_training_data.json",
    problem_file="modern_490.png",
    sample_size=5,
):
    """Print a report on whether the dataset's image files can be found.

    The report covers the number of JSON records, the existence and file
    counts of the ``images`` and ``augmented_images`` folders next to the JSON
    file, the resolved paths of the first and last ``sample_size`` records,
    and a lookup of ``problem_file`` inside ``augmented_images``.

    Records are resolved from their ``augmented`` flag here. NOTE: LogoDataset
    in this file picks the folder from the ``_aug`` filename marker instead,
    so a record whose flag and filename disagree shows up here as missing.

    Args:
        json_path: Path to the JSON list of records.
        problem_file: File name to look up in ``augmented_images``.
        sample_size: How many records to check at each end of the list.
    """
    import json
    import os

    with open(json_path, 'r', encoding="utf-8") as f:
        data = json.load(f)

    print(f"Total entries in JSON: {len(data)}")

    # Folders live next to the JSON file; "." covers a bare file name.
    base_dir = os.path.dirname(json_path) or "."
    images_dir = os.path.join(base_dir, "images")
    augmented_dir = os.path.join(base_dir, "augmented_images")

    print(f"Base directory exists: {os.path.exists(base_dir)}")
    print(f"Images directory exists: {os.path.exists(images_dir)}")
    print(f"Augmented images directory exists: {os.path.exists(augmented_dir)}")

    if os.path.exists(images_dir):
        print(f"Files in images directory: {len(os.listdir(images_dir))}")

    augmented_files = []
    have_augmented_dir = os.path.exists(augmented_dir)
    if have_augmented_dir:
        augmented_files = os.listdir(augmented_dir)
        print(f"Files in augmented_images directory: {len(augmented_files)}")

    def report(index, item):
        # One line per record: its flag, the resolved path and whether it exists.
        filename = item["file_name"]
        is_augmented = item.get("augmented", False)
        subfolder = "augmented_images" if is_augmented else "images"
        path = os.path.join(base_dir, subfolder, filename)
        exists = os.path.exists(path)
        print(f"Entry {index}: {filename} (augmented: {is_augmented}) - Path exists: {exists} - {path}")

    print(f"\nChecking first {sample_size} entries:")
    for i, item in enumerate(data[:sample_size]):
        report(i, item)

    # Clamp the start so that lists shorter than sample_size still report
    # their real (non-negative) indices.
    print(f"\nChecking last {sample_size} entries:")
    tail_start = max(len(data) - sample_size, 0)
    for i, item in enumerate(data[tail_start:], start=tail_start):
        report(i, item)

    direct_path = os.path.join(augmented_dir, problem_file)
    print(f"\nCheck specific problem file: {direct_path} - Exists: {os.path.exists(direct_path)}")

    if have_augmented_dir:
        # Match on the digits of the problem file's name ("490" for the
        # default), falling back to the whole stem when it has no digits.
        stem = os.path.splitext(problem_file)[0]
        marker = "".join(ch for ch in stem if ch.isdigit()) or stem
        similar_files = [f for f in augmented_files if marker in f]
        print(f"Similar files: {similar_files}")

# Run the path diagnostics as soon as this cell executes; the report is printed to stdout.
debug_dataset_paths()