In [3]:
# Import necessary libraries
import os
import torch
import logging
from PIL import Image
from pathlib import Path
from torchvision import transforms
from torch.utils.data import Dataset
from diffusers import (
    StableDiffusionXLPipeline,
    DPMSolverMultistepScheduler,
    DDPMScheduler,
)
from diffusers.optimization import get_scheduler
from diffusers.training_utils import EMAModel
from transformers import CLIPTokenizer
from huggingface_hub import notebook_login
from tqdm.auto import tqdm

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
class Config:
    # Model settings
    model_id = "stabilityai/stable-diffusion-xl-base-1.0"
    instance_prompt = "photo of Eesha"  # Replace 'xyz' with your identifier
    class_prompt = "photo of a person"
    
    # Paths
    output_dir = "dreambooth-model"
    instance_data_dir = "training-images"
    
    # Training settings
    num_training_steps = 1000
    learning_rate = 1e-6
    train_batch_size = 1
    gradient_accumulation_steps = 1
    image_size = 512
    mixed_precision = "fp16"  # or "no" for full precision
    
    # Device configuration
    device = "cuda" if torch.cuda.is_available() else "cpu"
    
config = Config()

# Create output directory
os.makedirs(config.output_dir, exist_ok=True)

In [5]:
class PersonalDataset(Dataset):
    def __init__(self, instance_data_dir, instance_prompt, tokenizer, size=512):
        self.instance_data_dir = Path(instance_data_dir)
        self.instance_prompt = instance_prompt
        self.tokenizer = tokenizer
        self.size = size
        
        self.image_paths = [f for f in self.instance_data_dir.iterdir() 
                           if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.webp']]
        
        if len(self.image_paths) == 0:
            raise ValueError(f"No images found in {instance_data_dir}")
        
        logger.info(f"Found {len(self.image_paths)} images in {instance_data_dir}")
        
        self.transform = transforms.Compose([
            transforms.Resize(size, interpolation=transforms.InterpolationMode.BILINEAR),
            transforms.CenterCrop(size),
            transforms.ToTensor(),
            transforms.Normalize([0.5], [0.5]),
        ])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        try:
            image = Image.open(image_path).convert('RGB')
            image = self.transform(image)
            
            example = {
                "input_ids": self.tokenizer(
                    self.instance_prompt,
                    padding="max_length",
                    truncation=True,
                    max_length=self.tokenizer.model_max_length,
                    return_tensors="pt",
                ).input_ids[0],
                "images": image,
            }
            return example
        except Exception as e:
            logger.error(f"Error loading image {image_path}: {e}")
            raise

In [6]:
def setup_model():
    # Login to Hugging Face
    notebook_login()
    
    # Load the pipeline
    pipeline = StableDiffusionXLPipeline.from_pretrained(
        config.model_id,
        torch_dtype=torch.float16 if config.mixed_precision == "fp16" else torch.float32,
        use_safetensors=True,
        variant="fp16"
    )
    
    # Enable memory efficient attention
    pipeline.enable_xformers_memory_efficient_attention()
    
    # Move to device
    pipeline = pipeline.to(config.device)
    
    return pipeline

def prepare_dataset(pipeline):
    dataset = PersonalDataset(
        instance_data_dir=config.instance_data_dir,
        instance_prompt=config.instance_prompt,
        tokenizer=pipeline.tokenizer,
        size=config.image_size
    )
    return dataset

In [7]:
def training_function(pipeline, dataset):
    # Prepare optimizer
    optimizer = torch.optim.AdamW(
        pipeline.unet.parameters(),
        lr=config.learning_rate,
    )
    
    # Prepare scheduler
    lr_scheduler = get_scheduler(
        "constant",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=config.num_training_steps,
    )
    
    # Progress bar
    progress_bar = tqdm(range(config.num_training_steps))
    progress_bar.set_description("Steps")
    global_step = 0
    
    # Training loop
    for step in range(config.num_training_steps):
        pipeline.train()
        
        # Get training sample
        batch = dataset[step % len(dataset)]
        
        # Forward pass
        loss = pipeline(
            batch["input_ids"].unsqueeze(0).to(config.device),
            batch["images"].unsqueeze(0).to(config.device),
            return_dict=True
        ).loss
        
        # Backward pass
        loss.backward()
        
        if (step + 1) % config.gradient_accumulation_steps == 0:
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()
            
            # Update progress bar
            progress_bar.update(1)
            global_step += 1
            
            # Log progress
            if global_step % 10 == 0:
                logger.info(f"Step {global_step}: loss = {loss.detach().item():.4f}")
            
            # Save checkpoint
            if global_step % 100 == 0:
                pipeline.save_pretrained(os.path.join(config.output_dir, f"checkpoint-{global_step}"))
    
    # Save final model
    pipeline.save_pretrained(config.output_dir)
    return pipeline

In [8]:
def generate_images(pipeline, prompt, num_images=1):
    """Generate images using the fine-tuned model"""
    images = pipeline(
        prompt,
        num_inference_steps=50,
        guidance_scale=7.5,
        num_images_per_prompt=num_images
    ).images
    
    # Save images
    os.makedirs("generated_images", exist_ok=True)
    for i, image in enumerate(images):
        image.save(f"generated_images/generated_{i}.png")
    
    return images

In [10]:
def main():
    # Setup model
    pipeline = setup_model()
    
    # Prepare dataset
    dataset = prepare_dataset(pipeline)
    
    # Train model
    logger.info("Starting training...")
    pipeline = training_function(pipeline, dataset)
    
    # Generate test image
    logger.info("Generating test image...")
    test_prompt = f"professional photo of {config.instance_prompt}, high quality, detailed face"
    generate_images(pipeline, test_prompt)
    
    logger.info("Training complete!")

if __name__ == "__main__":
    main()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

Fetching 19 files: 100%|██████████| 19/19 [03:08<00:00,  9.94s/it]
Loading pipeline components...: 100%|██████████| 7/7 [00:02<00:00,  3.04it/s]


ModuleNotFoundError: Refer to https://github.com/facebookresearch/xformers for more information on how to install xformers