
# HW6 Diffusion Model

**Sources:**
- Github implementation [Denoising Diffusion Pytorch](https://github.com/lucidrains/denoising-diffusion-pytorch)
- Papers on Diffusion models ([Dhariwal, Nichol, 2021], [Ho et al., 2020] ect.)


## Import Packages and Set Seeds

![图片.png](attachment:8ab13388-816e-4dd9-8383-141dedf69b24.png)

In [1]:
!pip install einops
!pip install transformers
!pip install ema_pytorch
!pip install accelerate

Collecting einops
  Downloading einops-0.6.1-py3-none-any.whl (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.2/42.2 kB[0m [31m639.4 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: einops
Successfully installed einops-0.6.1
[0mCollecting ema_pytorch
  Downloading ema_pytorch-0.6.0-py3-none-any.whl (8.8 kB)
  Downloading ema_pytorch-0.5.3-py3-none-any.whl (8.8 kB)
Installing collected packages: ema_pytorch
Successfully installed ema_pytorch-0.5.3
[0m

In [2]:
import math
import copy
from pathlib import Path
from random import random
from functools import partial
from collections import namedtuple
from multiprocessing import cpu_count

import torch
from torch import nn, einsum
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torch.optim import Adam

import torchvision
from torchvision import transforms as T, utils

from einops import rearrange, reduce, repeat
from einops.layers.torch import Rearrange

from PIL import Image
from tqdm.auto import tqdm
from ema_pytorch import EMA

from accelerate import Accelerator
import matplotlib.pyplot as plt
import os

# Gradescope – Question 1
import matplotlib.pyplot as plt
import torchvision.utils as vutils

torch.backends.cudnn.benchmark = True
torch.manual_seed(4096)

if torch.cuda.is_available():
  torch.cuda.manual_seed(4096)

## Step 1: Forward process (Noise scheduler)




In [3]:
def linear_beta_schedule(timesteps):
    """
    linear schedule 线性时间表, proposed in original ddpm paper
    DDPM: Canonical diffusion model 典型扩散模型
    """
    scale = 1000 / timesteps
    beta_start = scale * 0.0001
    beta_end = scale * 0.02
    #  在 beta_start 和 beta_end 之间生成一个包含 timesteps 个元素的线性时间表
    return torch.linspace(beta_start, beta_end, timesteps, dtype = torch.float64)

def cosine_beta_schedule(timesteps, s=0.008): # Strong
    """
    cosine schedule as proposed in https://openreview.net/forum?id=-NEXDKk8gZ
    生成扩散模型（如扩散概率模型）中的 beta 值序列，用于扩散模型中的噪声调度
    timesteps：时间步数，即扩散过程中的步数
    s：一个小的偏移量，默认值为 0.008，用于调整余弦函数的形状。
    """
    steps = timesteps + 1
    x = torch.linspace(0, timesteps, steps) # 生成时间步序列
    alphas_cumprod = torch.cos((x / timesteps + s) / (1 + s) * math.pi * 0.5) ** 2   # 使用余弦函数计算积累alpha值
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]  # 归一化
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1]) # 根据累积 alpha 值计算得到的，表示相邻两个时间步的 alpha 值的比率
    return torch.clip(betas, 0.0001, 0.9999) # 将 betas 限制在 [0.0001, 0.9999] 的范围内

def sigmoid_beta_schedule(timesteps, start = -3, end = 3, tau = 1, clamp_min = 1e-5):
    """
    sigmoid schedule
    proposed in https://arxiv.org/abs/2212.11972 - Figure 8
    better for images > 64x64, when used during training
    """
    steps = timesteps + 1
    t = torch.linspace(0, timesteps, steps, dtype = torch.float64) / timesteps
    v_start = torch.tensor(start / tau).sigmoid()
    v_end = torch.tensor(end / tau).sigmoid()
    alphas_cumprod = (-((t * (end - start) + start) / tau).sigmoid() + v_end) / (v_end - v_start)
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    return torch.clip(betas, 0, 0.999)

def extract(a, t, x_shape):
    '''
    在扩散模型（如 DDPM）中，动态地从一组调度参数（如噪声衰减系数或其他时间相关的参数）中，
    根据当前时间步 t 提取相应的值，并将其转换为适合进一步计算的形状。
    a: 包含包含调度参数（如噪声系数）的张量，形状为(time steps,)
    t: 时间步索引张量，表示要提取的时间步,(batch size,1)
    x_shape: 提取出的张量最终要匹配的形状
    '''
    b, *_ = t.shape   # get batch size "b"
    out = a.gather(-1, t)  # 从a的最后一个维度（列）上提取t索引对应位置的元素
    return out.reshape(b, *((1,) * (len(x_shape) - 1)))  # 调整为 (b, 1, 1, ..., 1) 这样的形状，其中 1 的个数等于 x_shape 的维度减去 1

### Create dataset

## Step 2: The backward process = U-Net



In [4]:
class CustomDataset(Dataset):
    def __init__(
        self,
        folder,
        image_size
    ):
        self.folder = folder
        self.image_size = image_size
        self.paths = [p for p in Path(f'{folder}').glob(f'**/*.jpg')]
        #################################
        ## TODO: Data Augmentation ##
        #################################
        # Medium
        self.transform = T.Compose([
            T.Resize(image_size),
            T.RandomHorizontalFlip(),  # Randomly flip the image horizontally
            T.RandomRotation(10),      # Random rotation
            T.ColorJitter(brightness=0.25, contrast=0.25),  # Slight color adjustments
            T.ToTensor()
        ])

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, index):
        path = self.paths[index]
        img = Image.open(path)
        return self.transform(img)  # 返回图像transform后的张量

Define some useful functions and U-Net，U-Net通常用于图像分割等任务，其目标是预测图像中每个像素的标签(或类别)

In [5]:
def num_to_groups(num, divisor):
    groups = num // divisor
    remainder = num % divisor
    arr = [divisor] * groups
    if remainder > 0:
        arr.append(remainder)
    return arr
arr=num_to_groups(10,2)
arr

[2, 2, 2, 2, 2]

In [6]:
# Helper Functions and Modules
def exists(x):
    return x is not None

def default(val, d):
    if exists(val):
        return val
    return d() if callable(d) else d

# 返回其输入不变的函数，通常用作Python中的默认占位符。
def identity(t, *args, **kwargs):
    return t

# 在给定数据集(通常是数据加载器)上创建无限循环，确保在训练期间数据持续可用。
def cycle(dl):
    while True:
        for data in dl:
            yield data

# 检查一个数字是否有整数平方根。
def has_int_squareroot(num):
    return (math.sqrt(num) ** 2) == num

# 将一个数按给定除数分成若干组，并将余数作为最后一组。
def num_to_groups(num, divisor):
    groups = num // divisor
    remainder = num % divisor
    arr = [divisor] * groups
    if remainder > 0:
        arr.append(remainder)
    return arr

# normalization functions

# Normalizes image values to the range [-1, 1].
def normalize_to_neg_one_to_one(img):
    return img * 2 - 1

#  Converts values from the range [-1, 1] back to [0, 1].
def unnormalize_to_zero_to_one(t):
    return (t + 1) * 0.5

# small helper modules
# residual connection,where the output of a function fn is added to its input
class Residual(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, *args, **kwargs):
        return self.fn(x, *args, **kwargs) + x

# 使用最近邻插值将输入图像上采样2倍，然后使用卷积层来调整维数。
def Upsample(dim, dim_out = None):
    return nn.Sequential(
        nn.Upsample(scale_factor = 2, mode = 'nearest'),
        nn.Conv2d(dim, default(dim_out, dim), 3, padding = 1)
    )

# 通过重新排列像素然后应用卷积对输入进行下采样的模块，有效地将空间维度降低2倍。
def Downsample(dim, dim_out = None):
    return nn.Sequential(
        Rearrange('b c (h p1) (w p2) -> b (c p1 p2) h w', p1 = 2, p2 = 2),
        nn.Conv2d(dim * 4, default(dim_out, dim), 1)
    )

# 在卷积层上实现权值标准化，在训练过程中对权值进行归一化，据称与组归一化相结合可以提高训练的稳定性和性能。
class WeightStandardizedConv2d(nn.Conv2d):
    """
    https://arxiv.org/abs/1903.10520
    weight standardization purportedly works synergistically with group normalization
    权重标准化据称与群体标准化协同工作
    """
    def forward(self, x):
        eps = 1e-5 if x.dtype == torch.float32 else 1e-3

        weight = self.weight
        mean = reduce(weight, 'o ... -> o 1 1 1', 'mean')
        var = reduce(weight, 'o ... -> o 1 1 1', partial(torch.var, unbiased = False))
        normalized_weight = (weight - mean) * (var + eps).rsqrt()

        return F.conv2d(x, normalized_weight, self.bias, self.stride, self.padding, self.dilation, self.groups)

# Implements layer normalization for 2D inputs, normalizing across the channel dimension.
class LayerNorm(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.g = nn.Parameter(torch.ones(1, dim, 1, 1))

    def forward(self, x):
        eps = 1e-5 if x.dtype == torch.float32 else 1e-3
        var = torch.var(x, dim = 1, unbiased = False, keepdim = True)
        mean = torch.mean(x, dim = 1, keepdim = True)
        return (x - mean) * (var + eps).rsqrt() * self.g

# 在将输入传递给另一个函数之前应用层规范化，这是Transformer模型中的一种常用技术。
class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.fn = fn
        self.norm = LayerNorm(dim)

    def forward(self, x):
        x = self.norm(x)
        return self.fn(x)

# sinusoidal positional embeds

# 生成正弦位置嵌入，通常用于处理具有空间信息的序列或图像的模型，
class SinusoidalPosEmb(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        device = x.device
        half_dim = self.dim // 2
        emb = math.log(10000) / (half_dim - 1)
        emb = torch.exp(torch.arange(half_dim, device=device) * -emb)
        emb = x[:, None] * emb[None, :]
        emb = torch.cat((emb.sin(), emb.cos()), dim=-1)
        return emb

# 类似于SinusoidalPosEmb，但可以选择使用学习的权重来嵌入，而不是固定的正弦值
class RandomOrLearnedSinusoidalPosEmb(nn.Module):
    """ following @crowsonkb 's lead with random (learned optional) sinusoidal pos emb """
    """ https://github.com/crowsonkb/v-diffusion-jax/blob/master/diffusion/models/danbooru_128.py#L8 """

    def __init__(self, dim, is_random = False):
        super().__init__()
        assert (dim % 2) == 0
        half_dim = dim // 2
        self.weights = nn.Parameter(torch.randn(half_dim), requires_grad = not is_random)

    def forward(self, x):
        x = rearrange(x, 'b -> b 1')
        freqs = x * rearrange(self.weights, 'd -> 1 d') * 2 * math.pi
        fouriered = torch.cat((freqs.sin(), freqs.cos()), dim = -1)
        fouriered = torch.cat((x, fouriered), dim = -1)
        return fouriered

# building block modules
# A basic building block consisting of a weight-standardized convolution, group normalization, and activation function (SiLU, also known as Swish).
class Block(nn.Module):
    def __init__(self, dim, dim_out, groups = 8):
        super().__init__()
        self.proj = WeightStandardizedConv2d(dim, dim_out, 3, padding = 1)
        self.norm = nn.GroupNorm(groups, dim_out)
        self.act = nn.SiLU()

    def forward(self, x, scale_shift = None):
        x = self.proj(x)
        x = self.norm(x)

        if exists(scale_shift):
            scale, shift = scale_shift
            x = x * (scale + 1) + shift

        x = self.act(x)
        return x

# 包含时间嵌入的残差块，它被添加到输入特征映射中以提供额外的上下文
class ResnetBlock(nn.Module):
    def __init__(self, dim, dim_out, *, time_emb_dim = None, groups = 8):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.SiLU(),
            nn.Linear(time_emb_dim, dim_out * 2)
        ) if exists(time_emb_dim) else None

        self.block1 = Block(dim, dim_out, groups = groups)
        self.block2 = Block(dim_out, dim_out, groups = groups)
        self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity()

    def forward(self, x, time_emb = None):

        scale_shift = None
        if exists(self.mlp) and exists(time_emb):
            time_emb = self.mlp(time_emb)
            time_emb = rearrange(time_emb, 'b c -> b c 1 1')
            scale_shift = time_emb.chunk(2, dim = 1)

        h = self.block1(x, scale_shift = scale_shift)

        h = self.block2(h)

        return h + self.res_conv(x)

# 在处理大输入的时候比传统的注意力机制更有效
# 传统的注意力机制计算复杂度为 O(N^2)（N 是输入长度），而这个线性注意力机制的复杂度降低为 O(N)
class LinearAttention(nn.Module):
    def __init__(self, dim, heads = 4, dim_head = 32):
        super().__init__()
        self.scale = dim_head ** -0.5
        self.heads = heads
        hidden_dim = dim_head * heads # 计算得到的隐藏维度，即每个头的维度乘以头的数量。
        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias = False) # dim 是输入的通道数，hidden_dim * 3 是输出的通道数（Q、K、V 各占三分之一）。

        self.to_out = nn.Sequential(
            nn.Conv2d(hidden_dim, dim, 1),
            LayerNorm(dim)
        )

    def forward(self, x):
        b, c, h, w = x.shape  # (batch_size, channels, height, width)
        qkv = self.to_qkv(x).chunk(3, dim = 1) # 通过卷积提取的张量 qkv 按通道分割为三个部分，分别是查询（Q）、键（K）和值（V）。
        q, k, v = map(lambda t: rearrange(t, 'b (h c) x y -> b h c (x y)', h = self.heads), qkv) # 将每个头的维度与空间维度分离。,将 Q、K、V 的形状转换为 (batch_size, heads, channels_per_head, height * width)

        q = q.softmax(dim = -2)  # 归一化通道之间的查询向量的权重。
        k = k.softmax(dim = -1)  # 归一化空间位置上的键向量的权重。

        q = q * self.scale
        v = v / (h * w)  # 将 V 向量按空间维度归一化，以防止数值过大

        context = torch.einsum('b h d n, b h e n -> b h d e', k, v) #  K 和 V 矩阵相乘，结果是一个形状为 (batch_size, heads, channels_per_head, channels_per_head) 的上下文矩阵

        out = torch.einsum('b h d e, b h d n -> b h e n', context, q) # 上下文矩阵与 Q 矩阵相乘，得到输出张量 out，其形状为 (batch_size, heads, channels_per_head, height * width)
        out = rearrange(out, 'b h c (x y) -> b (h c) x y', h = self.heads, x = h, y = w) # 将输出张量重新排列为与输入相同的形状 (batch_size, channels, height, width)。
        return self.to_out(out)

# 标准注意机制，用于衡量输入特征图中不同部分的重要性。
class Attention(nn.Module):
    def __init__(self, dim, heads = 4, dim_head = 32):
        super().__init__()
        self.scale = dim_head ** -0.5
        self.heads = heads
        hidden_dim = dim_head * heads

        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias = False)
        self.to_out = nn.Conv2d(hidden_dim, dim, 1)

    def forward(self, x):
        b, c, h, w = x.shape
        qkv = self.to_qkv(x).chunk(3, dim = 1)
        q, k, v = map(lambda t: rearrange(t, 'b (h c) x y -> b h c (x y)', h = self.heads), qkv)

        q = q * self.scale

        sim = torch.einsum('b h d i, b h d j -> b h i j', q, k)
        attn = sim.softmax(dim = -1)
        out = torch.einsum('b h i j, b h d j -> b h i d', attn, v)

        out = rearrange(out, 'b h (x y) d -> b (h d) x y', x = h, y = w)
        return self.to_out(out)

# model

class Unet(nn.Module):
    def __init__(
        self,
        dim,
        init_dim = None,
        out_dim = None,
        dim_mults=(1, 2, 4, 8),
        channels = 3,
        resnet_block_groups = 8,
        learned_sinusoidal_cond = False,
        random_fourier_features = False,
        learned_sinusoidal_dim = 16
    ):
        super().__init__()

        # determine dimensions

        self.channels = channels

        init_dim = default(init_dim, dim)
        self.init_conv = nn.Conv2d(channels, init_dim, 7, padding = 3)

        dims = [init_dim, *map(lambda m: dim * m, dim_mults)]
        in_out = list(zip(dims[:-1], dims[1:]))

        block_klass = partial(ResnetBlock, groups = resnet_block_groups)

        # time embeddings

        time_dim = dim * 4

        self.random_or_learned_sinusoidal_cond = learned_sinusoidal_cond or random_fourier_features

        if self.random_or_learned_sinusoidal_cond:
            sinu_pos_emb = RandomOrLearnedSinusoidalPosEmb(learned_sinusoidal_dim, random_fourier_features)
            fourier_dim = learned_sinusoidal_dim + 1
        else:
            sinu_pos_emb = SinusoidalPosEmb(dim)
            fourier_dim = dim

        self.time_mlp = nn.Sequential(
            sinu_pos_emb,
            nn.Linear(fourier_dim, time_dim),
            nn.GELU(),
            nn.Linear(time_dim, time_dim)
        )

        # layers

        self.downs = nn.ModuleList([])
        self.ups = nn.ModuleList([])
        num_resolutions = len(in_out)

        for ind, (dim_in, dim_out) in enumerate(in_out):
            is_last = ind >= (num_resolutions - 1)

            self.downs.append(nn.ModuleList([
                block_klass(dim_in, dim_in, time_emb_dim = time_dim),
                block_klass(dim_in, dim_in, time_emb_dim = time_dim),
                Residual(PreNorm(dim_in, LinearAttention(dim_in))),
                Downsample(dim_in, dim_out) if not is_last else nn.Conv2d(dim_in, dim_out, 3, padding = 1)
            ]))

        mid_dim = dims[-1]
        self.mid_block1 = block_klass(mid_dim, mid_dim, time_emb_dim = time_dim)
        self.mid_attn = Residual(PreNorm(mid_dim, Attention(mid_dim)))
        self.mid_block2 = block_klass(mid_dim, mid_dim, time_emb_dim = time_dim)

        for ind, (dim_in, dim_out) in enumerate(reversed(in_out)):
            is_last = ind == (len(in_out) - 1)

            self.ups.append(nn.ModuleList([
                block_klass(dim_out + dim_in, dim_out, time_emb_dim = time_dim),
                block_klass(dim_out + dim_in, dim_out, time_emb_dim = time_dim),
                Residual(PreNorm(dim_out, LinearAttention(dim_out))),
                Upsample(dim_out, dim_in) if not is_last else  nn.Conv2d(dim_out, dim_in, 3, padding = 1)
            ]))

        self.out_dim = default(out_dim, channels)

        self.final_res_block = block_klass(dim * 2, dim, time_emb_dim = time_dim)
        self.final_conv = nn.Conv2d(dim, self.out_dim, 1)

    def forward(self, x, time):
        x = self.init_conv(x)
        r = x.clone()

        t = self.time_mlp(time)

        h = []

        for block1, block2, attn, downsample in self.downs:
            x = block1(x, t)
            h.append(x)

            x = block2(x, t)
            x = attn(x)
            h.append(x)

            x = downsample(x)

        x = self.mid_block1(x, t)
        x = self.mid_attn(x)
        x = self.mid_block2(x, t)

        for block1, block2, attn, upsample in self.ups:
            x = torch.cat((x, h.pop()), dim = 1)
            x = block1(x, t)

            x = torch.cat((x, h.pop()), dim = 1)
            x = block2(x, t)
            x = attn(x)

            x = upsample(x)

        x = torch.cat((x, r), dim = 1)

        x = self.final_res_block(x, t)
        return self.final_conv(x)
model = Unet(64)

## Step 3: The Diffusion Process



Define diffusion process, including generating noisy models, sample...     

高斯扩散模型（Gaussian Diffusion Model）是一种生成模型，主要用于生成高质量的图像。与传统的生成对抗网络（GANs）不同，扩散模型通过模拟一个逐步增加噪声的过程来学习如何从纯噪声中逐步生成清晰的图像。模型的基本思想是从干净的图像开始逐步加入噪声，直到最终变为完全的噪声图像。然后，模型学习如何反向操作，从噪声图像逐步去除噪声，最终生成一个清晰的图像。      
**模型的工作原理**

- 前向过程（添加噪声）: 从原始图像开始，模型在每一个时间步长中加入少量的高斯噪声，最终图像逐渐变得模糊、无意义。

- 反向过程（去除噪声）: 训练模型学习如何从噪声图像一步步去除噪声，直到还原出最初的图像。反向过程依赖于模型对噪声的预测能力。

- 训练: 模型通过预测在某一时间步长添加的噪声与实际添加的噪声之间的差异来优化，通常使用均方误差（MSE）作为损失函数。

In [7]:
# Gaussian Diffusion Model
class GaussianDiffusion(nn.Module):  #strong
    def __init__(
        self,
        model,
        *,
        image_size,
        timesteps = 1000,
        beta_schedule = 'linear',
        auto_normalize = True
    ):
        super().__init__()
        assert not (type(self) == GaussianDiffusion and model.channels != model.out_dim)
        assert not model.random_or_learned_sinusoidal_cond

        self.model = model

        self.channels = self.model.channels

        self.image_size = image_size


        if beta_schedule == 'linear':
            beta_schedule_fn = linear_beta_schedule
        elif beta_schedule == 'cosine':
            beta_schedule_fn = cosine_beta_schedule
        elif beta_schedule == 'sigmoid':
            beta_schedule_fn = sigmoid_beta_schedule
        else:
            raise ValueError(f'unknown beta schedule {beta_schedule}')
        
        # calculate beta and other precalculated parameters
        betas = beta_schedule_fn(timesteps)
                                            
        alphas = 1. - betas
        alphas_cumprod = torch.cumprod(alphas, dim=0)
        alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value = 1.)

        timesteps, = betas.shape
        self.num_timesteps = int(timesteps)

        # sampling related parameters

        self.sampling_timesteps = timesteps # default num sampling timesteps to number of timesteps at training

        # helper function to register buffer from float64 to float32

        register_buffer = lambda name, val: self.register_buffer(name, val.to(torch.float32))

        register_buffer('betas', betas)
        register_buffer('alphas_cumprod', alphas_cumprod)
        register_buffer('alphas_cumprod_prev', alphas_cumprod_prev)

        # calculations for diffusion q(x_t | x_{t-1}) and others

        register_buffer('sqrt_alphas_cumprod', torch.sqrt(alphas_cumprod))
        register_buffer('sqrt_one_minus_alphas_cumprod', torch.sqrt(1. - alphas_cumprod))
        register_buffer('log_one_minus_alphas_cumprod', torch.log(1. - alphas_cumprod))
        register_buffer('sqrt_recip_alphas_cumprod', torch.sqrt(1. / alphas_cumprod))
        register_buffer('sqrt_recipm1_alphas_cumprod', torch.sqrt(1. / alphas_cumprod - 1))

        # calculations for posterior q(x_{t-1} | x_t, x_0)

        posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)

        # above: equal to 1. / (1. / (1. - alpha_cumprod_tm1) + alpha_t / beta_t)

        register_buffer('posterior_variance', posterior_variance)

        # below: log calculation clipped because the posterior variance is 0 at the beginning of the diffusion chain

        register_buffer('posterior_log_variance_clipped', torch.log(posterior_variance.clamp(min =1e-20)))
        register_buffer('posterior_mean_coef1', betas * torch.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod))
        register_buffer('posterior_mean_coef2', (1. - alphas_cumprod_prev) * torch.sqrt(alphas) / (1. - alphas_cumprod))

        # derive loss weight
        # snr - signal noise ratio

        snr = alphas_cumprod / (1 - alphas_cumprod)

        # https://arxiv.org/abs/2303.09556

        maybe_clipped_snr = snr.clone()

        register_buffer('loss_weight', maybe_clipped_snr / snr)

        # auto-normalization of data [0, 1] -> [-1, 1] - can turn off by setting it to be False

        self.normalize = normalize_to_neg_one_to_one if auto_normalize else identity
        self.unnormalize = unnormalize_to_zero_to_one if auto_normalize else identity

    #根据噪声预测生成初始图像 x_start。该方法通过逆操作，将当前时刻 x_t 和噪声还原为生成的初始图像。
    def predict_start_from_noise(self, x_t, t, noise):
        return (
            extract(self.sqrt_recip_alphas_cumprod, t, x_t.shape) * x_t -
            extract(self.sqrt_recipm1_alphas_cumprod, t, x_t.shape) * noise
        )

    # 通过 x_start 和 x_t 预测噪声。这个方法是上一个方法的逆操作，通过已知的图像和初始状态，来预测噪声。
    def predict_noise_from_start(self, x_t, t, x0):
        return (
            (extract(self.sqrt_recip_alphas_cumprod, t, x_t.shape) * x_t - x0) / \
            extract(self.sqrt_recipm1_alphas_cumprod, t, x_t.shape)
        )


    # 计算后验分布 q(x_{t-1} | x_t, x_0)，用于从当前时刻的图像估计前一个时刻的图像。
    def q_posterior(self, x_start, x_t, t):
        posterior_mean = (
            extract(self.posterior_mean_coef1, t, x_t.shape) * x_start +
            extract(self.posterior_mean_coef2, t, x_t.shape) * x_t
        )
        posterior_variance = extract(self.posterior_variance, t, x_t.shape)
        posterior_log_variance_clipped = extract(self.posterior_log_variance_clipped, t, x_t.shape)
        return posterior_mean, posterior_variance, posterior_log_variance_clipped

    #根据模型的输出预测噪声和生成的初始图像 x_start。如果设置了 clip_x_start 为 True，会对 x_start 进行剪辑，确保其在 [-1, 1] 范围内。
    def model_predictions(self, x, t, clip_x_start = False, rederive_pred_noise = False):
        model_output = self.model(x, t)
        maybe_clip = partial(torch.clamp, min = -1., max = 1.) if clip_x_start else identity

        pred_noise = model_output
        x_start = self.predict_start_from_noise(x, t, pred_noise)
        x_start = maybe_clip(x_start)

        if clip_x_start and rederive_pred_noise:
            pred_noise = self.predict_noise_from_start(x, t, x_start)

        return pred_noise, x_start

    # 计算均值和方差用于采样。这个方法主要用于去噪过程，从当前时刻的图像预测前一个时刻的图像。
    def p_mean_variance(self, x, t, clip_denoised = True):
        noise, x_start = self.model_predictions(x, t)

        if clip_denoised:
            x_start.clamp_(-1., 1.)

        model_mean, posterior_variance, posterior_log_variance = self.q_posterior(x_start = x_start, x_t = x, t = t)
        return model_mean, posterior_variance, posterior_log_variance, x_start

    @torch.no_grad()
    # 在特定时间步长 t 对图像 x 进行采样，生成新的图像
    def p_sample(self, x, t: int):
        b, *_, device = *x.shape, x.device
        batched_times = torch.full((b,), t, device = x.device, dtype = torch.long)
        model_mean, _, model_log_variance, x_start = self.p_mean_variance(x = x, t = batched_times, clip_denoised = True)
        noise = torch.randn_like(x) if t > 0 else 0. # no noise if t == 0
        pred_img = model_mean + (0.5 * model_log_variance).exp() * noise
        return pred_img, x_start

    # Gradescope – Question 1
    @torch.no_grad()
    # 整个扩散过程中的采样过程，从最后一个时间步开始，一步一步地反向进行直到得到生成图像。
    def p_sample_loop(self, shape, return_all_timesteps = False, num_samples=5, save_path='./Q1_progressive_generation.png'):
        # shape:(batch_size, channels, image_size, image_size) 可以理解为(batch_size, channels, height, width)
        batch, device = shape[0], self.betas.device

        img = torch.randn(shape, device = device) #生成随机噪声，作为初始图像
        imgs = [img]
        samples = [img[:num_samples]]  # Store initial noisy samples, (num_samples, channels, height, width)，samples列表的初始长度为1

        x_start = None
        
        ###########################################
        ## TODO: plot the sampling process ##
        ###########################################
        
        # 使用 tqdm 显示进度条，从 self.num_timesteps 开始倒序循环。
        for t in tqdm(reversed(range(0, self.num_timesteps)), desc = 'sampling loop time step', total = self.num_timesteps):
            img, x_start = self.p_sample(img, t)
            imgs.append(img)
            if t % (self.num_timesteps // 20) == 0: # t是num_timesteps // 20的整数倍，比如num_timesteps=100，那t为0,5,..,95时为真,共有20个，初始有一个sample所以samples列表有21个元素
                samples.append(img[:num_samples])  # Store samples at specific steps 每隔一定步数（self.num_timesteps // 20），将样本添加到 samples 列表中
        
        ret = img if not return_all_timesteps else torch.stack(imgs, dim = 1) 
        """False返回最终时间步的图像(batch_size, channels, height, width)
        True将所有时间步堆叠起来形成新张量(batch_size, num_timesteps, channels, height, width)，在第一个维度插入新维度
        """

        ret = self.unnormalize(ret)
        self.plot_progressive_generation(samples, len(samples)-1, save_path=save_path) # len(samples)-1=20
        return ret

    @torch.no_grad()
    # 提供了一个接口，直接进行批量采样，从而生成图像。它调用了 p_sample_loop 方法完成扩散模型的逆过程，生成最终的图像。
    def sample(self, batch_size = 16, return_all_timesteps = False):
        image_size, channels = self.image_size, self.channels
        sample_fn = self.p_sample_loop
        return sample_fn((batch_size, channels, image_size, image_size), return_all_timesteps = return_all_timesteps)


    # 从初始图像 x_start 和给定的噪声 noise 中生成扩散过程中的图像 x_t。这个方法是生成过程的核心，模拟了扩散过程中的每一步。
    def q_sample(self, x_start, t, noise=None):
        noise = default(noise, lambda: torch.randn_like(x_start))

        return (
            extract(self.sqrt_alphas_cumprod, t, x_start.shape) * x_start +
            extract(self.sqrt_one_minus_alphas_cumprod, t, x_start.shape) * noise
        )

    @property
    def loss_fn(self):
        return F.mse_loss


    # 计算模型的损失值。模型根据当前时刻的图像预测噪声，然后将其与真实的噪声进行比较，计算均方误差（MSE）损失。损失值会乘以一个时间步长的权重，以便更好地优化模型。
    def p_losses(self, x_start, t, noise = None):
        b, c, h, w = x_start.shape
        noise = default(noise, lambda: torch.randn_like(x_start))

        # noise sample

        x = self.q_sample(x_start = x_start, t = t, noise = noise)

        # predict and take gradient step

        model_out = self.model(x, t)

        loss = self.loss_fn(model_out, noise, reduction = 'none')
        loss = reduce(loss, 'b ... -> b (...)', 'mean')

        loss = loss * extract(self.loss_weight, t, loss.shape)
        return loss.mean()

    # 模型的前向传播过程。它对输入图像进行归一化后，计算出每个时间步长下的损失值，用于训练。
    def forward(self, img, *args, **kwargs):
        b, c, h, w, device, img_size, = *img.shape, img.device, self.image_size
        assert h == img_size and w == img_size, f'height and width of image must be {img_size}'
        t = torch.randint(0, self.num_timesteps, (b,), device=device).long()

        img = self.normalize(img)
        return self.p_losses(img, t, *args, **kwargs)
    
        # Gradescope – Question 1
    def plot_progressive_generation(self, samples, num_steps, save_path=None):
        fig, axes = plt.subplots(1, num_steps + 1, figsize=(20, 4))
        for i, sample in enumerate(samples):
            axes[i].imshow(vutils.make_grid(sample, nrow=1, normalize=True).permute(1, 2, 0).cpu().numpy()) # permute(1, 2, 0) 将图像张量的维度从 (C, H, W) 转换为 (H, W, C)
            axes[i].axis('off')
            axes[i].set_title(f'Step {i}')
        if save_path:
            plt.savefig(save_path)
        plt.show()



# Training
Define Trainer: define the updating process

EMA 是 Exponential Moving Average（指数移动平均）的缩写。在深度学习中，EMA 模型是一种常用的技术，用于平滑模型参数的更新过程。具体来说，EMA 模型会维护一个模型参数的移动平均值，这个平均值是通过对历史参数进行加权平均得到的，权重随时间指数衰减。

EMA 模型的主要优点是能够减少模型参数的抖动，使得模型在训练过程中更加稳定，从而提高模型的泛化能力。在训练过程中，EMA 模型通常用于生成最终的模型，因为它能够更好地反映模型在训练数据上的长期表现。

In [8]:
class Trainer(object):
    '''
    '''
    def __init__(
        self,
        diffusion_model,
        folder,
        *,
        train_batch_size = 16,
        gradient_accumulate_every = 1,  # 1个minibatch上积累梯度，然后进行参数更新
        train_lr = 1e-4,
        train_num_steps = 100000,
        ema_update_every = 10,
        ema_decay = 0.995,
        adam_betas = (0.9, 0.99),
        save_and_sample_every = 1000,
        num_samples = 25,
        results_folder = './results',
        split_batches = True,
        inception_block_idx = 2048
    ):
        super().__init__()

        """
        # Create DataLoaderConfiguration
        dataloader_config = DataLoaderConfiguration(split_batches=split_batches)
        
        # Create Accelerator with DataLoaderConfiguration
        self.accelerator = Accelerator(
            dataloader_config=dataloader_config,
            mixed_precision='no')
        """
        # accelerator

        self.accelerator = Accelerator(
            split_batches = split_batches,
            mixed_precision = 'no'
        )
        

        # model

        self.model = diffusion_model
        self.channels = diffusion_model.channels

        # sampling and training hyperparameters

        assert has_int_squareroot(num_samples), 'number of samples must have an integer square root'
        self.num_samples = num_samples
        self.save_and_sample_every = save_and_sample_every

        self.batch_size = train_batch_size
        self.gradient_accumulate_every = gradient_accumulate_every

        self.train_num_steps = train_num_steps
        self.image_size = diffusion_model.image_size

        # dataset and dataloader

        self.ds = CustomDataset(folder, self.image_size)
        dl = DataLoader(self.ds, batch_size = train_batch_size, shuffle = True, pin_memory = True, num_workers = cpu_count())

        dl = self.accelerator.prepare(dl)
        self.dl = cycle(dl)

        # optimizer

        self.opt = Adam(diffusion_model.parameters(), lr = train_lr, betas = adam_betas)

        # for logging results in a folder periodically

        if self.accelerator.is_main_process:
            # 用于模型权重的指数滑动平均，这有助于平滑模型更新，通常能提升生成质量。
            self.ema = EMA(diffusion_model, beta = ema_decay, update_every = ema_update_every)
            self.ema.to(self.device)

        self.results_folder = Path(results_folder)
        self.results_folder.mkdir(exist_ok = True)

        # step counter state

        self.step = 0

        # prepare model, dataloader, optimizer with accelerator

        self.model, self.opt = self.accelerator.prepare(self.model, self.opt)

    @property
    def device(self):
        return self.accelerator.device

    def save(self, milestone):
        if not self.accelerator.is_local_main_process:
            return

        data = {
            'step': self.step,
            'model': self.accelerator.get_state_dict(self.model),
            'opt': self.opt.state_dict(),
            'ema': self.ema.state_dict(),
            'scaler': self.accelerator.scaler.state_dict() if exists(self.accelerator.scaler) else None,
        }

        torch.save(data, str(self.results_folder / f'model-{milestone}.pt'))

    def load(self, ckpt):
        accelerator = self.accelerator
        device = accelerator.device

        data = torch.load(ckpt, map_location=device)

        model = self.accelerator.unwrap_model(self.model)
        model.load_state_dict(data['model'])

        self.step = data['step']
        self.opt.load_state_dict(data['opt'])
        if self.accelerator.is_main_process:
            self.ema.load_state_dict(data["ema"])


        if exists(self.accelerator.scaler) and exists(data['scaler']):
            self.accelerator.scaler.load_state_dict(data['scaler'])


    def train(self):
        accelerator = self.accelerator
        device = accelerator.device

        with tqdm(initial = self.step, total = self.train_num_steps, disable = not accelerator.is_main_process) as pbar:

            while self.step < self.train_num_steps:

                total_loss = 0.

                for _ in range(self.gradient_accumulate_every):
                    data = next(self.dl).to(device)

                    with self.accelerator.autocast():
                        loss = self.model(data) # 当前batch的loss
                        loss = loss / self.gradient_accumulate_every
                        total_loss += loss.item()

                    self.accelerator.backward(loss)

                accelerator.clip_grad_norm_(self.model.parameters(), 1.0) # 用于梯度裁剪
                pbar.set_description(f'loss: {total_loss:.4f}')

                accelerator.wait_for_everyone()

                self.opt.step()
                self.opt.zero_grad()

                accelerator.wait_for_everyone()

                self.step += 1
                if accelerator.is_main_process:
                    self.ema.update()

                    # 保存和采样
                    # 采样过程是从模型学到的分布中随机抽取样本，生成新的数据
                    if self.step != 0 and self.step % self.save_and_sample_every == 0:
                        self.ema.ema_model.eval()

                        with torch.no_grad():
                            milestone = self.step // self.save_and_sample_every
                            batches = num_to_groups(self.num_samples, self.batch_size)
                            all_images_list = list(map(lambda n: self.ema.ema_model.sample(batch_size=n), batches)) # 使用 EMA 模型生成一批新的图像样本，其中 batch_size=n 指定了生成的样本数量

                        all_images = torch.cat(all_images_list, dim = 0)

                        utils.save_image(all_images, str(self.results_folder / f'sample-{milestone}.png'), nrow = int(math.sqrt(self.num_samples)))
                        
                        self.save(milestone)

                pbar.update(1)

        accelerator.print('training complete')
        
    # 用于按训练model的分布生成图像，num生成的图像数量 n_iter迭代次数
    def inference(self, num=1000, n_iter=5, output_path='./submission'):
        if not os.path.exists(output_path):
            os.makedirs(output_path)
        with torch.no_grad():
            for i in range(n_iter):
                batches = num_to_groups(num // n_iter, 200)
                all_images = list(map(lambda n: self.ema.ema_model.sample(batch_size=n), batches))[0]   # self.ema.ema_model.sample随机生成噪声图像然后去噪然后生成样本，遵循与训练数据相同的分布
                for j in range(all_images.size(0)):
                    torchvision.utils.save_image(all_images[j], f'{output_path}/{i * 200 + j + 1}.jpg')              
                

# Training Hyper-parameters

In [9]:
path = '/kaggle/input/diffusion/faces/faces'
IMG_SIZE = 64             # Size of images, do not change this if you do not know why you need to change
batch_size = 16
train_num_steps = 10000        # total training steps
lr = 1e-3
grad_steps = 1            # gradient accumulation steps, the equivalent batch size for updating equals to batch_size * grad_steps = 16 * 1
ema_decay = 0.995           # exponential moving average decay

channels = 32             # Numbers of channels of the first layer of CNN  # strong
dim_mults = (1, 2, 4)        # The model size will be (channels, 2 * channels, 4 * channels, 4 * channels, 2 * channels, channels)

timesteps = 1000            # Number of steps (adding noise) #medium
beta_schedule = 'linear'

model = Unet(
    dim = channels,
    dim_mults = dim_mults
)

diffusion = GaussianDiffusion(
    model,
    image_size = IMG_SIZE,
    timesteps = timesteps,
    beta_schedule = beta_schedule
)

trainer = Trainer(
    diffusion,
    path,
    train_batch_size = batch_size,
    train_lr = lr,
    train_num_steps = train_num_steps,
    gradient_accumulate_every = grad_steps,
    ema_decay = ema_decay,
    save_and_sample_every = 1000
)

trainer.train()

  0%|          | 0/10000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

training complete


# Inference

In [10]:
# Automatically get the last checkpoint
last_checkpoint = f'./results/model-{train_num_steps // trainer.save_and_sample_every}.pt'
ckpt = last_checkpoint
trainer.load(ckpt)

trainer.inference(output_path='./submission/strong')

'''
ckpt = '/content/drive/MyDrive/ML 2023 Spring/model-55.pt'
trainer.load(ckpt)
trainer.inference()
'''

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

sampling loop time step:   0%|          | 0/1000 [00:00<?, ?it/s]

"\nckpt = '/content/drive/MyDrive/ML 2023 Spring/model-55.pt'\ntrainer.load(ckpt)\ntrainer.inference()\n"

In [11]:
%cd ./submission
!tar -zcf ../submission.tgz *.jpg
%cd ..

/kaggle/working/submission
tar: *.jpg: Cannot stat: No such file or directory
tar: Exiting with failure status due to previous errors
/kaggle/working


# FID & AFD     
FID（Frechet Inception Distance）是一种用于评估生成模型（如生成对抗网络GANs）生成图像质量的指标。它通过比较生成图像与真实图像在特征空间中的分布差异来衡量生成图像的质量。FID越低，表示生成图像与真实图像的分布越接近，生成图像的质量越高。

FID的计算步骤如下：

- 提取特征：使用预训练的Inception网络（或其他合适的网络）从生成图像和真实图像中提取特征。通常，这些特征是从网络的某一层（如最后一层全连接层）输出的。

- 计算均值和协方差：对于生成图像和真实图像的特征，分别计算它们的均值（mean）和协方差（covariance）。

- 使用Frechet距离（也称为Wasserstein-2距离）来度量两个多元正态分布（即生成图像特征分布和真实图像特征分布）之间的距离。具体公式：$$\mathrm{FID}=\|\mu_r-\mu_g\|^2+\mathrm{Tr}(\Sigma_r+\Sigma_g-2(\Sigma_r\Sigma_g)^{1/2})$$
其中，$\mu_r$和$\Sigma_r$分别是真实图像特征的均值和协方差，$\mu_g$和$\Sigma_g$分别是生成图像特征的均值和协方
差，Tr 表示矩阵的迹(trace)。


![图片.png](attachment:0197efc6-2de2-4d44-bd42-7d59dc80e638.png)

AFD（Anime Face Detection）率是指在动漫图像或视频中检测和识别动漫角色面部特征的成功率。这个指标通常用于评估动漫面部检测算法或模型的性能。AFD率可以通过以下几个方面来衡量：

    准确率（Accuracy）：正确检测到的动漫面部数量与总面部数量的比例。

    召回率（Recall）：正确检测到的动漫面部数量与实际存在的面部数量的比例。

    精确率（Precision）：正确检测到的动漫面部数量与检测到的总面部数量的比例。

AFD率的计算通常涉及以下步骤：

    标注数据：首先需要有一组标注好的动漫图像或视频数据，其中每个动漫角色的面部都被准确地标注出来。

    运行检测算法：使用动漫面部检测算法或模型对这些数据进行检测，得到检测结果。

    计算指标：根据检测结果和标注数据，计算准确率、召回率、精确率等指标。

例如，假设有一个动漫图像数据集，其中有1000个标注的动漫面部。使用某个动漫面部检测算法进行检测后，算法正确检测到了800个面部，同时还有50个面部被错误地检测为非面部，另外有150个面部未被检测到。那么：

    准确率 = 正确检测到的面部数量 / 总面部数量 = 800 / 1000 = 0.8

    召回率 = 正确检测到的面部数量 / 实际存在的面部数量 = 800 / 1000 = 0.8

    精确率 = 正确检测到的面部数量 / 检测到的总面部数量 = 800 / (800 + 50) = 0.941

In [12]:
!pip install pytorch-fid
!wget https://raw.githubusercontent.com/nagadomi/lbpcascade_animeface/master/lbpcascade_animeface.xml -O lbpcascade_animeface.xml

Collecting pytorch-fid
  Downloading pytorch_fid-0.3.0-py3-none-any.whl (15 kB)
Installing collected packages: pytorch-fid
Successfully installed pytorch-fid-0.3.0
[0m--2024-08-27 12:24:11--  https://raw.githubusercontent.com/nagadomi/lbpcascade_animeface/master/lbpcascade_animeface.xml
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 246945 (241K) [text/plain]
Saving to: ‘lbpcascade_animeface.xml’


2024-08-27 12:24:12 (6.17 MB/s) - ‘lbpcascade_animeface.xml’ saved [246945/246945]



In [14]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.0.151-py3-none-any.whl (616 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m616.7/616.7 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting opencv-python>=4.6.0
  Downloading opencv_python-4.10.0.84-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (62.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.5/62.5 MB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting ultralytics
  Downloading ultralytics-8.0.150-py3-none-any.whl (614 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m614.9/614.9 kB[0m [31m23.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Downloading ultralytics-8.0.149-py3-none-any.whl (614 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m614.4/614.4 kB[0m [31m27.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Downloading ultralytics-8.0.148-py3-none-any.whl (606 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [15]:
from ultralytics import YOLO
from pytorch_fid import fid_score
import os

def calculate_fid(real_images_path, generated_images_path):
    """
    Calculate FID score between real and generated images.
    
    param real_images_path: Path to the directory containing real images.
    param generated_images_path: Path to the directory containing generated images.
    return: FID score
    """
    fid = fid_score.calculate_fid_given_paths([real_images_path, generated_images_path], batch_size=50, device='cuda', dims=2048)
    return fid

def calculate_afd(generated_images_path, save=True):
    """
    Calculate AFD (Anime Face Detection) score for generated images.
    
    param generated_images_path: Path to the directory containing generated images.
    return: AFD score (percentage of images detected as anime faces)
    """
    results = yolov8_animeface.predict(generated_images_path, save=save, conf=0.8, iou=0.8, imgsz=64)
    """
    conf=0.8：置信度阈值，表示检测框的置信度至少为0.8。
    iou=0.8：交并比阈值，用于非极大值抑制（NMS）。
    imgsz=64：图像大小为64x64
    results:
    图像索引：表示检测结果对应哪一张图像。
    检测框（boxes）：表示检测到的动漫面部的边界框信息，包括位置和大小。
    置信度（confidence）：表示检测框的置信度，即模型认为该检测框内是动漫面部的概率
    """

    anime_faces_detected = 0    # 初始化检测到的动漫面部数量
    total_images = len(results) # 检测的图像总数

    for result in results:
        if len(result.boxes) > 0: # 当前图像中检测到了至少一个动漫面部
            anime_faces_detected += 1

    afd_score = anime_faces_detected / total_images
    return afd_score
            

In [20]:
!pip install pytorch-fid ultralytics
!wget https://github.com/MagicalKyaru/yolov8_animeface/releases/download/v1/yolov8x6_animeface.pt

[0m--2024-08-27 13:00:56--  https://github.com/MagicalKyaru/yolov8_animeface/releases/download/v1/yolov8x6_animeface.pt
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/754149685/213c9a83-4bb9-4825-aaeb-4f231e613b62?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20240827%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240827T130056Z&X-Amz-Expires=300&X-Amz-Signature=da6c847d5febde96d42b4ead96aa01f53c6eae578a806df0495e856469096e4c&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=754149685&response-content-disposition=attachment%3B%20filename%3Dyolov8x6_animeface.pt&response-content-type=application%2Foctet-stream [following]
--2024-08-27 13:00:56--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/754149685/213c9a83-4bb9-4825

In [21]:
# Calculate and print FID and AFD with optional visualization
yolov8_animeface = YOLO('yolov8x6_animeface.pt')  # 加载一个预训练的YOLOv8模型，用于动漫面部检测
real_images_path = '/kaggle/input/diffusion/faces/faces'  # Replace with the path to real images
fid = calculate_fid(real_images_path, './submission/strong')
afd = calculate_afd('./submission/strong')

100%|██████████| 1427/1427 [04:36<00:00,  5.17it/s]
100%|██████████| 20/20 [00:04<00:00,  4.96it/s]

image 1/1000 /kaggle/working/submission/medium/1.jpg: 64x64 (no detections), 20.6ms
image 2/1000 /kaggle/working/submission/medium/10.jpg: 64x64 1 face, 18.8ms
image 3/1000 /kaggle/working/submission/medium/100.jpg: 64x64 (no detections), 18.9ms
image 4/1000 /kaggle/working/submission/medium/1000.jpg: 64x64 (no detections), 24.9ms
image 5/1000 /kaggle/working/submission/medium/101.jpg: 64x64 (no detections), 23.5ms
image 6/1000 /kaggle/working/submission/medium/102.jpg: 64x64 1 face, 23.1ms
image 7/1000 /kaggle/working/submission/medium/103.jpg: 64x64 1 face, 22.5ms
image 8/1000 /kaggle/working/submission/medium/104.jpg: 64x64 1 face, 22.5ms
image 9/1000 /kaggle/working/submission/medium/105.jpg: 64x64 1 face, 21.7ms
image 10/1000 /kaggle/working/submission/medium/106.jpg: 64x64 (no detections), 22.9ms
image 11/1000 /kaggle/working/submission/medium/107.jpg: 64x64 1 face, 21.3ms
image 1

In [22]:
print(f'FID: {fid}')
print(f'AFD: {afd}')

FID: 185.3669799972492
AFD: 0.453
