# CV Week: Итоговое задание по Consistency Distillation

В этом ноутбуке представлены решения заданий контеста по дистилляции многошаговой диффузионной модели в малошагового студента с использованием Consistency Distillation. Мы будем использовать модель Stable Diffusion 1.5 (SD1.5) для генерации изображений по текстовому описанию и применим различные техники для оптимизации процесса обучения и генерации.

## Содержание
1. Загрузка и настройка модели Stable Diffusion 1.5
2. Реализация шага DDIM
3. Consistency Training
4. Multi-boundary timesteps

## Задание №1: Загрузка модели Stable Diffusion 1.5

Для начала загрузим модель Stable Diffusion 1.5 и сгенерируем изображения за 50 шагов. Все компоненты модели будут загружены в формате FP16 для экономии памяти, и модель будет размещена на GPU.


In [None]:
# Установка необходимых библиотек
!pip install diffusers==0.30.3 peft==0.8.2 huggingface_hub==0.23.4

import torch
from diffusers import StableDiffusionPipeline, DDIMScheduler

# Загрузка модели Stable Diffusion 1.5
pipe = StableDiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=torch.float16
).to("cuda")

# Проверяем, что все компоненты модели в FP16 и на cuda
assert pipe.unet.dtype == torch.float16 and pipe.unet.device.type == 'cuda'
assert pipe.vae.dtype == torch.float16 and pipe.vae.device.type == 'cuda'
assert pipe.text_encoder.dtype == torch.float16 and pipe.text_encoder.device.type == 'cuda'

# Заменяем дефолтный сэмплер на DDIM
pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config, timestep_spacing="trailing")
pipe.scheduler.timesteps = pipe.scheduler.timesteps.cuda()
pipe.scheduler.alphas_cumprod = pipe.scheduler.alphas_cumprod.cuda()

# Отдельно извлекаем модель учителя, которую потом будем дистиллировать
teacher_unet = pipe.unet


Теперь сгенерируем изображения за 50 шагов и за 4 шага, чтобы увидеть разницу в качестве.


In [None]:
from tqdm.auto import tqdm
import matplotlib.pyplot as plt

# Вспомогательная функция для визуализации изображений
def visualize_images(images):
    assert len(images) == 4
    plt.figure(figsize=(12, 3))
    for i, image in enumerate(images):
        plt.subplot(1, 4, i+1)
        plt.imshow(image)
        plt.axis('off')
    plt.subplots_adjust(wspace=-0.01, hspace=-0.01)
    plt.show()

# Генерация изображений за 50 шагов
prompt = "A sad puppy with large eyes"
guidance_scale = 7.5
generator = torch.Generator('cuda').manual_seed(1)

images = pipe(
    prompt=prompt,
    num_inference_steps=50,
    generator=generator,
    guidance_scale=guidance_scale,
    num_images_per_prompt=4
).images

visualize_images(images)

# Генерация изображений за 4 шага
generator = torch.Generator('cuda').manual_seed(1)

images = pipe(
    prompt=prompt,
    num_inference_steps=4,
    generator=generator,
    guidance_scale=guidance_scale,
    num_images_per_prompt=4
).images

visualize_images(images)

Как видно, при уменьшении числа шагов до 4 изображение становится менее четким. Далее мы постараемся улучшить качество генерации при помощи Consistency Training.


## Задание №2: Реализация шага DDIM

Реализуем функцию `ddim_solver_step`, которая выполняет один шаг DDIM-солвера для перехода от шага \( t \) к шагу \( s \).

In [None]:
import torch

def extract_into_tensor(a, t, x_shape):
    b, *_ = t.shape
    out = a.gather(-1, t)
    return out.reshape(b, *((1,) * (len(x_shape) - 1)))


def ddim_solver_step(model_output, x_t, t, s, scheduler):
    """
    Реализация шага DDIM солвера для VP процесса зашумления и eps-prediction модели.

    Params:
        model_output: torch.Tensor[B, 4, 64, 64] - предсказание модели - шум ε
        x_t: torch.Tensor[B, 4, 64, 64] - сэмплы на шаге t
        t: torch.Tensor[B] - номер текущего шага
        s: torch.Tensor[B] - номер следующего шага
        scheduler: DDIMScheduler - расписание диффузионного процесса, чтобы получить alpha и sigma
    """
    alphas_cumprod = scheduler.alphas_cumprod.to(x_t.device)
    alphas = torch.sqrt(alphas_cumprod)  # α_t = sqrt(ᾱ_t)
    sigmas = torch.sqrt(1.0 - alphas_cumprod)  # σ_t = sqrt(1 - ᾱ_t)

    sigmas_s = extract_into_tensor(sigmas, s, x_t.shape)
    alphas_s = extract_into_tensor(alphas, s, x_t.shape)

    sigmas_t = extract_into_tensor(sigmas, t, x_t.shape)
    alphas_t = extract_into_tensor(alphas, t, x_t.shape)

    alphas_s[s == 0] = 1.0
    sigmas_s[s == 0] = 0.0

    alphas_t[t == 0] = 1.0
    sigmas_t[t == 0] = 0.0

    x_0 = (x_t - sigmas_t * model_output) / alphas_t

    x_s = alphas_s * x_0 + sigmas_s * model_output

    return x_s

Функция `ddim_solver_step` реализует один шаг перехода от шага \( t \) к шагу \( s \) с использованием предсказанного шума модели.

## Задание №3: Consistency Training

### Задание №3.1: Деривация аналитического перехода

Мы стремимся выразить \( \mathbf{x}_s \) через \( \mathbf{x}_t \) и \( \mathbf{x}_0 \) аналитически, используя формулы DDIM и оценку скор функции.

In [None]:
### Деривация

Начнем с формулы зашумления:
\[
\mathbf{x}_t = \alpha_t \mathbf{x}_0 + \sigma_t \epsilon_\theta
\]
где \( \epsilon_\theta \sim \mathcal{N}(0, I) \).

Выразим \( \epsilon_\theta \):
\[
\epsilon_\theta = \frac{\mathbf{x}_t - \alpha_t \mathbf{x}_0}{\sigma_t}
\]

Используя формулу DDIM для перехода:
\[
\mathbf{x}_s = \alpha_s \mathbf{x}_0 + \sigma_s \epsilon_\theta
\]

Подставляем выражение для \( \epsilon_\theta \):
\[
\mathbf{x}_s = \alpha_s \mathbf{x}_0 + \sigma_s \left( \frac{\mathbf{x}_t - \alpha_t \mathbf{x}_0}{\sigma_t} \right)
\]
\[
\mathbf{x}_s = \alpha_s \mathbf{x}_0 + \frac{\sigma_s}{\sigma_t} \mathbf{x}_t - \frac{\sigma_s \alpha_t}{\sigma_t} \mathbf{x}_0
\]
\[
\mathbf{x}_s = \left( \alpha_s - \frac{\sigma_s \alpha_t}{\sigma_t} \right) \mathbf{x}_0 + \frac{\sigma_s}{\sigma_t} \mathbf{x}_t
\]

Таким образом, функция `get_xs_from_xt_naive` вычисляет \( \mathbf{x}_s \) аналитически.


In [None]:
import torch

def extract_into_tensor(a, t, x_shape):
    b, *_ = t.shape
    out = a.gather(-1, t)
    return out.reshape(b, *((1,) * (len(x_shape) - 1)))

def get_xs_from_xt_naive(
    x_0, x_t, t, s,  # Не все эти аргументы могут быть вам нужны
    scheduler,
    noise=None,
    **kwargs
):
    """
    Получение точки x_s в CT режиме, т.е., аналитически.
    
    Params:
        x_0: torch.Tensor[B, C, H, W] - чистые данные x0
        x_t: torch.Tensor[B, C, H, W] - зашумленные данные на шаге t
        t: torch.Tensor[B] - номера текущих шагов
        s: torch.Tensor[B] - номера следующих шагов
        scheduler: DDIMScheduler - расписание диффузионного процесса
        noise: torch.Tensor[B, C, H, W] - шум ε (опционально)
        
    Returns:
        x_s: torch.Tensor[B, C, H, W] - данные на шаге s
    """
    # Извлекаем alphas_cumprod и вычисляем alpha и sigma
    alphas_cumprod = scheduler.alphas_cumprod.to(x_t.device)
    alphas = torch.sqrt(alphas_cumprod)  # α_t = sqrt(ᾱ_t)
    sigmas = torch.sqrt(1.0 - alphas_cumprod)  # σ_t = sqrt(1 - ᾱ_t)
    
    # Извлекаем α_s, σ_s и α_t, σ_t для соответствующих шагов
    sigmas_s = extract_into_tensor(sigmas, s, x_t.shape)
    alphas_s = extract_into_tensor(alphas, s, x_t.shape)
    
    sigmas_t = extract_into_tensor(sigmas, t, x_t.shape)
    alphas_t = extract_into_tensor(alphas, t, x_t.shape)
    
    # Устанавливаем граничные условия
    alphas_s[s == 0] = 1.0
    sigmas_s[s == 0] = 0.0
    
    alphas_t[t == 0] = 1.0
    sigmas_t[t == 0] = 0.0
    
    # Вычисляем ε_theta
    epsilon_theta = (x_t - alphas_t * x_0) / sigmas_t
    
    # Вычисляем x_s по формуле DDIM
    x_s = alphas_s * x_0 + sigmas_s * epsilon_theta
    
    return x_s

### Задание №3.2: Реализация функции расчета лосса

Теперь реализуем функцию `cm_loss_template`, которая рассчитывает лосс для консистенси моделей.


In [None]:
import torch
import functools

def cm_loss_template(
    latents, prompt_embeds,  # батч латентов и текстовых эмбедов
    unet, scheduler,

    # Функции, которые будем постепенно менять из задания к заданию
    loss_fn: callable,
    get_boundary_timesteps: callable,
    get_xs_from_xt: callable,

    num_timesteps=1000,
    step_size=20,  # Указываем с каким интервалом берем шаги s и t.
):
    # Сэмплируем случайные шаги t для каждого элемента батча t ~ U[step_size-1, 999]
    assert num_timesteps == 1000
    num_intervals = num_timesteps // step_size

    index = torch.randint(1, num_intervals, (len(latents),), device=latents.device).long()  # [1, num_intervals]
    t = step_size * index - 1
    s = torch.clamp(t - step_size, min=0)
    boundary_timesteps = get_boundary_timesteps(
        s, num_timesteps=num_timesteps
    )

    # Сэмплируем x_t
    noise = torch.randn_like(latents)
    x_t = q_sample(latents, t, scheduler, noise=noise)

    # Для реализации mixed-precision обучения
    with torch.cuda.amp.autocast():
        noise_pred = unet(x_t.float(), t, encoder_hidden_states=prompt_embeds.float()).sample

    # Получаем оценку в граничной точке для x_t
    boundary_pred = unet(x_t.float(), boundary_timesteps, encoder_hidden_states=prompt_embeds.float()).sample

    # Получаем сэмпл x_s из x_t
    x_s = get_xs_from_xt(
        latents, x_t, t, s,
        scheduler,
        prompt_embeds=prompt_embeds,
        noise=noise,
    )

    # Предсказание "таргет моделью"
    with torch.no_grad(), torch.cuda.amp.autocast():
        target_noise_pred = unet(x_s.float(), s, encoder_hidden_states=prompt_embeds.float()).sample

    # Получаем оценку в граничной точке для x_s
    boundary_target = unet(x_s.float(), boundary_timesteps, encoder_hidden_states=prompt_embeds.float()).sample

    loss = loss_fn(boundary_pred, boundary_target)
    return loss

def get_zero_boundary_timesteps(t, **kwargs):
    """
    Определяем шаги где будут срабатывать граничные условия.
    Для классических СM это t=0.
    """
    return torch.zeros_like(t)


Функция `cm_loss_template` рассчитывает лосс для консистенси моделей, учитывая граничные условия и предсказания модели на разных шагах.


## Задание №4: Multi-boundary timesteps

Теперь реализуем функцию `get_multi_boundary_timesteps`, которая определяет граничные точки для разделения траекторий на несколько отрезков.


In [None]:
import torch

def get_multi_boundary_timesteps(timesteps, num_boundaries=4, num_timesteps=1000):
    """
    Для батча таймстепов определяет соответствующие граничные точки.
    
    params:
        timesteps: torch.Tensor(batch_size, device='cuda')
        num_boundaries (int): Количество граничных точек (отрезков)
        num_timesteps (int): Общее количество таймстепов
    
    returns:
        boundary_timesteps: torch.Tensor(batch_size, device='cuda')
    """
    # Вычисляем шаг между границами
    step = num_timesteps // num_boundaries  # Например, 1000 // 4 = 250
    
    # Создаем тензор границ
    boundaries = torch.arange(0, num_timesteps, step, device=timesteps.device)
    # Для num_boundaries=4 и num_timesteps=1000 получим boundaries = [0, 250, 500, 750]
    
    # Используем torch.bucketize для нахождения индекса границы для каждого t
    # right=False означает, что граница включается в нижний интервал
    boundary_indices = torch.bucketize(timesteps, boundaries, right=False) - 1
    
    # Убеждаемся, что индексы не выходят за пределы
    boundary_indices = boundary_indices.clamp(min=0)
    
    # Получаем соответствующие границы
    boundary_timesteps = boundaries[boundary_indices]
    
    return boundary_timesteps


Функция `get_multi_boundary_timesteps` разделяет траекторию диффузионного процесса на несколько отрезков и определяет соответствующую граничную точку для каждого таймстепа в батче.
