# От Атлантики до Авиньона: диалог эпох

## Обучение Stable Diffusion на стилях Уинслоу Гомера и Пабло Пикассо

Этот notebook содержит код для обучения двух моделей Stable Diffusion:
1. **Модель Гомера** - для генерации изображений в стиле американского реализма XIX века
2. **Модель Пикассо** - для генерации изображений в кубистическом стиле

### Структура:
1. Установка зависимостей
2. Подготовка данных
3. Обучение модели для стиля Гомера
4. Обучение модели для стиля Пикассо
5. Генерация тестовых изображений
6. Создание финальной серии изображений


## 1. Установка зависимостей


In [None]:
# Установка необходимых библиотек
%pip install -q diffusers transformers accelerate datasets torch torchvision pillow


## 2. Подготовка данных

Загружаем и анализируем датасет с картинами Гомера и Пикассо


In [None]:
import pandas as pd
import os
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np

# Загружаем описание датасета
description_df = pd.read_csv("NGA_Dataset/description.csv")

print(f"Всего изображений: {len(description_df)}")
print("\nРаспределение по художникам:")
print(description_df['artist'].value_counts())

# Показываем несколько примеров
print("\nПримеры из датасета:")
print(description_df[['filename', 'artist', 'title', 'date']].head(10))


In [None]:
# Разделяем данные по художникам
homer_df = description_df[description_df['artist'].str.contains('Homer', case=False, na=False)]
picasso_df = description_df[description_df['artist'].str.contains('Picasso', case=False, na=False)]

print(f"Изображений Гомера: {len(homer_df)}")
print(f"Изображений Пикассо: {len(picasso_df)}")

# Создаём директории для раздельных датасетов
os.makedirs("dataset_homer", exist_ok=True)
os.makedirs("dataset_picasso", exist_ok=True)

# Копируем изображения Гомера
for filename in homer_df['filename']:
    src = os.path.join("NGA_Dataset", filename)
    dst = os.path.join("dataset_homer", filename)
    if os.path.exists(src) and not os.path.exists(dst):
        import shutil
        shutil.copy(src, dst)

# Копируем изображения Пикассо
for filename in picasso_df['filename']:
    src = os.path.join("NGA_Dataset", filename)
    dst = os.path.join("dataset_picasso", filename)
    if os.path.exists(src) and not os.path.exists(dst):
        import shutil
        shutil.copy(src, dst)

print(f"\nДатасеты готовы!")
print(f"dataset_homer/: {len(os.listdir('dataset_homer'))} файлов")
print(f"dataset_picasso/: {len(os.listdir('dataset_picasso'))} файлов")


In [None]:
# Визуализация примеров из каждого датасета
fig, axes = plt.subplots(2, 4, figsize=(16, 8))

# Примеры Гомера
homer_files = os.listdir('dataset_homer')[:4]
for idx, filename in enumerate(homer_files):
    img = Image.open(os.path.join('dataset_homer', filename))
    axes[0, idx].imshow(img)
    axes[0, idx].set_title(f'Homer {idx+1}', fontsize=10)
    axes[0, idx].axis('off')

# Примеры Пикассо
picasso_files = os.listdir('dataset_picasso')[:4]
for idx, filename in enumerate(picasso_files):
    img = Image.open(os.path.join('dataset_picasso', filename))
    axes[1, idx].imshow(img)
    axes[1, idx].set_title(f'Picasso {idx+1}', fontsize=10)
    axes[1, idx].axis('off')

plt.tight_layout()
plt.savefig('assets/dataset_examples.png', dpi=150, bbox_inches='tight')
plt.show()


## 3. Подготовка модели и датасета для обучения

Создаём класс датасета и загружаем базовую модель Stable Diffusion


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms
from diffusers import StableDiffusionPipeline, UNet2DConditionModel, DDPMScheduler
from transformers import CLIPTextModel, CLIPTokenizer
from diffusers.optimization import get_scheduler
from tqdm.auto import tqdm

# Класс датасета для изображений
class ArtistDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.image_paths = [os.path.join(image_dir, fname) 
                           for fname in os.listdir(image_dir) 
                           if fname.endswith(('.jpg', '.png', '.jpeg'))]
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return {"pixel_values": image}

print("Класс датасета создан!")


In [None]:
# Функция для обучения модели
def train_model(dataset_dir, output_dir, artist_name, 
                resolution=512, batch_size=1, learning_rate=1e-5, 
                num_epochs=5, max_train_steps=500):
    """
    Обучает Stable Diffusion на датасете художника
    
    Args:
        dataset_dir: директория с изображениями
        output_dir: директория для сохранения модели
        artist_name: имя художника (для логов)
        resolution: разрешение изображений для обучения
        batch_size: размер батча
        learning_rate: скорость обучения
        num_epochs: количество эпох
        max_train_steps: максимальное количество шагов обучения
    """
    
    print(f"="*60)
    print(f"Начало обучения модели для стиля {artist_name}")
    print(f"="*60)
    
    # Параметры
    model_id = "runwayml/stable-diffusion-v1-5"
    accumulation_steps = 4  # Градиентное аккумулирование
    
    # Преобразования для изображений
    transform = transforms.Compose([
        transforms.Resize((resolution, resolution)),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5]),
    ])
    
    # Загрузка данных
    dataset = ArtistDataset(dataset_dir, transform=transform)
    print(f"Размер датасета: {len(dataset)} изображений")
    
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        pin_memory=True,
        num_workers=2
    )
    
    # Загрузка модели
    print("Загрузка модели Stable Diffusion...")
    tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer")
    text_encoder = CLIPTextModel.from_pretrained(model_id, subfolder="text_encoder")
    unet = UNet2DConditionModel.from_pretrained(model_id, subfolder="unet")
    vae = StableDiffusionPipeline.from_pretrained(model_id).vae
    noise_scheduler = DDPMScheduler.from_pretrained(model_id, subfolder="scheduler")
    
    # Перемещение модели на устройство
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Используется устройство: {device}")
    
    unet.to(device)
    vae.to(device)
    text_encoder.to(device)
    
    # Оптимизатор
    optimizer = torch.optim.AdamW(unet.parameters(), lr=learning_rate)
    
    # Планировщик обучения
    lr_scheduler = get_scheduler(
        "linear",
        optimizer=optimizer,
        num_warmup_steps=50,
        num_training_steps=max_train_steps,
    )
    
    # Обучение
    progress_bar = tqdm(range(max_train_steps))
    unet.train()
    global_step = 0
    
    for epoch in range(num_epochs):
        print(f"\nЭпоха {epoch + 1}/{num_epochs}")
        
        for i, batch in enumerate(dataloader):
            if global_step >= max_train_steps:
                break
                
            # Перемещение данных на устройство
            pixel_values = batch["pixel_values"].to(device)
            latents = vae.encode(pixel_values).latent_dist.sample()
            latents = latents * 0.18215
            
            # Генерация случайного шума
            noise = torch.randn_like(latents)
            bsz = latents.shape[0]
            
            # Выбор случайных временных шагов
            timesteps = torch.randint(0, 1000, (bsz,), device=device).long()
            
            # Добавление шума к латентам
            noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
            
            # Создание encoder_hidden_states
            dummy_prompt = [""] * bsz
            input_ids = tokenizer(
                dummy_prompt,
                padding="max_length",
                truncation=True,
                max_length=tokenizer.model_max_length,
                return_tensors="pt",
            ).input_ids.to(device)
            encoder_hidden_states = text_encoder(input_ids)[0]
            
            # Прогнозирование шума
            noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample
            
            # Вычисление потерь
            loss = torch.nn.functional.mse_loss(noise_pred, noise)
            loss.backward()
            
            # Градиентное аккумулирование
            if (i + 1) % accumulation_steps == 0:
                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()
                global_step += 1
            
            # Очистка памяти
            del pixel_values, latents, noise, noisy_latents, noise_pred, encoder_hidden_states
            if device == "cuda":
                torch.cuda.empty_cache()
            
            # Обновление прогресс-бара
            progress_bar.update(1)
            logs = {"loss": loss.detach().item(), "lr": lr_scheduler.get_last_lr()[0]}
            progress_bar.set_postfix(**logs)
        
        # Сохранение модели после каждой эпохи
        epoch_output_dir = f"{output_dir}-epoch-{epoch}"
        unet.save_pretrained(epoch_output_dir)
        print(f"Модель сохранена в {epoch_output_dir}")
    
    # Финальное сохранение модели
    print(f"\nСоздание финальной модели...")
    unet.save_pretrained(output_dir)
    
    # Создание полного pipeline
    pipeline = StableDiffusionPipeline(
        vae=vae,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        unet=unet,
        scheduler=noise_scheduler,
        safety_checker=None,
        feature_extractor=None
    )
    
    pipeline.save_pretrained(output_dir)
    print(f"Финальная модель сохранена в {output_dir}")
    
    return pipeline

print("Функция обучения готова!")
