In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd

import torch
import torch.nn


# Пайплайн локально

## Diffusion.py

In [10]:
from typing import Dict, Tuple

import torch
import torch.nn as nn


class DiffusionModel(nn.Module):
    """DDPM-style wrapper around a noise-prediction network.

    ``forward`` picks a random timestep per sample, mixes the input with
    standard-normal noise using the precomputed schedule buffers and returns
    the MSE between that noise and the network's prediction. ``sample`` runs
    the reverse loop from pure noise down to step 1.

    Args:
        eps_model: Network called as ``eps_model(x_t, t)``, where ``t`` is the
            timestep divided by ``num_timesteps``.
        betas: ``(beta_start, beta_end)`` forwarded to ``get_schedules``.
        num_timesteps: Number of diffusion steps; timesteps are drawn from
            ``1..num_timesteps`` and used directly as buffer indices.
    """

    def __init__(
        self,
        eps_model: nn.Module,
        betas: Tuple[float, float],
        num_timesteps: int,
    ):
        super().__init__()
        self.eps_model = eps_model

        # Buffers (not parameters) so they follow the module across devices
        # and are stored in the state dict without receiving gradients.
        for name, schedule in get_schedules(betas[0], betas[1], num_timesteps).items():
            self.register_buffer(name, schedule)
        self.num_timesteps = num_timesteps
        self.criterion = nn.MSELoss()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the noise-prediction MSE loss for the batch ``x``.

        Args:
            x: Clean inputs with the batch on dimension 0; any number of
                trailing dimensions is accepted.

        Returns:
            Scalar loss tensor from ``self.criterion``.
        """
        batch_size = x.shape[0]
        # Draw timesteps on the input's device rather than guessing from CUDA
        # availability, so a CPU model works on a machine that has a GPU.
        timestep = torch.randint(1, self.num_timesteps + 1, (batch_size,), device=x.device)
        # The forward process adds Gaussian noise; rand_like would be uniform.
        eps = torch.randn_like(x)

        # Per-sample coefficients reshaped to broadcast over trailing dims.
        coef_shape = (batch_size,) + (1,) * (x.dim() - 1)
        signal_scale = self.sqrt_alphas_cumprod[timestep].view(coef_shape)
        # Noise is scaled by sqrt(1 - alpha_bar_t), not by the reverse-step
        # coefficient (1 - alpha_t) / sqrt(1 - alpha_bar_t).
        noise_scale = self.sqrt_one_minus_alpha_prod[timestep].view(coef_shape)
        x_t = signal_scale * x + noise_scale * eps

        return self.criterion(eps, self.eps_model(x_t, timestep / self.num_timesteps))

    def sample(self, num_samples: int, size, device) -> torch.Tensor:
        """Generate ``num_samples`` tensors of shape ``size`` by reverse diffusion.

        Args:
            num_samples: Batch size of the generated tensor.
            size: Per-sample shape, unpacked after the batch dimension.
            device: Device for the working tensors; it must match the device
                the module's buffers and ``eps_model`` live on.

        Returns:
            Tensor of shape ``(num_samples, *size)`` on ``device``.
        """
        # Start from pure noise, allocated directly on the target device.
        x_i = torch.randn(num_samples, *size, device=device)

        for i in range(self.num_timesteps, 0, -1):
            # No fresh noise is injected on the last step.
            z = torch.randn(num_samples, *size, device=device) if i > 1 else 0
            # Scaled timestep, one row per sample: shape (num_samples, 1).
            t = torch.full((num_samples, 1), i / self.num_timesteps, device=device)
            eps = self.eps_model(x_i, t)
            x_i = self.inv_sqrt_alphas[i] * (x_i - eps * self.one_minus_alpha_over_prod[i]) + self.sqrt_betas[i] * z

        return x_i


def get_schedules(beta1: float, beta2: float, num_timesteps: int) -> Dict[str, torch.Tensor]:
    """Precompute a linear beta schedule and the quantities derived from it.

    Every returned tensor is 1-D with ``num_timesteps + 1`` float32 entries;
    entry ``k`` corresponds to ``beta = beta1 + (beta2 - beta1) * k / num_timesteps``.

    Args:
        beta1: Schedule start, strictly between 0 and ``beta2``.
        beta2: Schedule end, strictly below 1.
        num_timesteps: Number of diffusion steps, must be positive.

    Returns:
        Dict with keys ``alphas``, ``inv_sqrt_alphas``, ``sqrt_betas``,
        ``alphas_cumprod``, ``sqrt_alphas_cumprod``,
        ``sqrt_one_minus_alpha_prod`` and ``one_minus_alpha_over_prod``.

    Raises:
        AssertionError: If the betas are outside ``(0, 1)``, not increasing,
            or ``num_timesteps`` is not positive.
    """
    # beta1 <= 0 would give sqrt of a non-positive value / a 0/0 in the first
    # entry, so the lower bound has to be enforced as the message promises.
    assert 0.0 < beta1 < beta2 < 1.0, "beta1 and beta2 must be in (0, 1)"
    # Guards the division by num_timesteps below.
    assert num_timesteps > 0, "num_timesteps must be positive"

    betas = (beta2 - beta1) * torch.arange(0, num_timesteps + 1, dtype=torch.float32) / num_timesteps + beta1
    sqrt_betas = torch.sqrt(betas)
    alphas = 1 - betas

    alphas_cumprod = torch.cumprod(alphas, dim=0)

    sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
    inv_sqrt_alphas = 1 / torch.sqrt(alphas)

    sqrt_one_minus_alpha_prod = torch.sqrt(1 - alphas_cumprod)
    # (1 - alpha_t) / sqrt(1 - alpha_bar_t): weight of the predicted noise in
    # the reverse update.
    one_minus_alpha_over_prod = (1 - alphas) / sqrt_one_minus_alpha_prod

    return {
        "alphas": alphas,
        "inv_sqrt_alphas": inv_sqrt_alphas,
        "sqrt_betas": sqrt_betas,
        "alphas_cumprod": alphas_cumprod,
        "sqrt_alphas_cumprod": sqrt_alphas_cumprod,
        "sqrt_one_minus_alpha_prod": sqrt_one_minus_alpha_prod,
        "one_minus_alpha_over_prod": one_minus_alpha_over_prod,
    }


## training.py

In [11]:
import torch
from torch.optim.optimizer import Optimizer
from torch.utils.data import DataLoader
from torchvision.utils import make_grid, save_image
from tqdm import tqdm

from modeling.diffusion import DiffusionModel


def train_step(model: DiffusionModel, inputs: torch.Tensor, optimizer: Optimizer, device: str):
    """Run a single optimisation step on one batch.

    Moves ``inputs`` to ``device``, evaluates ``model`` on it to obtain the
    loss, back-propagates and applies one ``optimizer`` update.

    Returns:
        The loss tensor produced by ``model`` for this batch.
    """
    batch = inputs.to(device)

    # Clear gradients left over from the previous step before the new backward
    optimizer.zero_grad()
    step_loss = model(batch)
    step_loss.backward()
    optimizer.step()

    return step_loss


def train_epoch(model: DiffusionModel, dataloader: DataLoader, optimizer: Optimizer, device: str):
    """Train ``model`` for one pass over ``dataloader``.

    Each batch is expected to unpack as ``(inputs, _)``; the second element is
    ignored. An exponential moving average of the loss (decay 0.9) is shown in
    the progress-bar description.

    Args:
        model: Model to optimise; switched to train mode first.
        dataloader: Iterable yielding ``(inputs, _)`` pairs.
        optimizer: Optimizer passed through to ``train_step``.
        device: Device passed through to ``train_step``.
    """
    model.train()
    pbar = tqdm(dataloader)
    loss_ema = None
    for x, _ in pbar:
        # Convert to a plain float: averaging the raw loss tensors would chain
        # every step's autograd graph into loss_ema and grow memory all epoch.
        train_loss = float(train_step(model, x, optimizer, device))
        loss_ema = train_loss if loss_ema is None else 0.9 * loss_ema + 0.1 * train_loss
        pbar.set_description(f"loss: {loss_ema:.4f}")


def generate_samples(model: DiffusionModel, device: str, path: str):
    """Sample a small batch from ``model`` and save it as one image file.

    The model is put in eval mode, 8 samples of shape ``(3, 32, 32)`` are
    drawn with gradients disabled, tiled 4 per row and written to ``path``.
    """
    model.eval()
    with torch.no_grad():
        batch = model.sample(8, (3, 32, 32), device=device)
        # 8 images at 4 per row -> a 2 x 4 tile sheet
        save_image(make_grid(batch, nrow=4), path)


## Unet.py

In [19]:
import torch
import torch.nn as nn


class ConvBlock(nn.Module):
    """Stack of three conv -> GroupNorm -> ReLU units with an optional skip.

    ``main`` holds the first unit (``in_channels`` -> ``out_channels``) and
    ``conv`` holds the remaining two (``out_channels`` -> ``out_channels``).
    Every convolution is 3x3 with stride 1 and padding 1, so the spatial size
    is unchanged. ``GroupNorm`` uses 8 groups.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by every unit.
        residual: When true, the output of ``main`` is added to the output of
            ``conv`` and the sum is divided by 1.414.
    """

    def __init__(self, in_channels: int, out_channels: int, residual: bool = False):
        super().__init__()

        def unit(channels_in: int) -> list:
            # One conv / norm / activation triple ending in out_channels
            return [
                nn.Conv2d(channels_in, out_channels, kernel_size=3, stride=1, padding=1),
                nn.GroupNorm(8, out_channels),
                nn.ReLU(),
            ]

        # Units are created in the same order as their position in the block
        self.main = nn.Sequential(*unit(in_channels))
        self.conv = nn.Sequential(*unit(out_channels), *unit(out_channels))
        self.is_res = residual

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the block to ``x`` and return the resulting feature map."""
        stem = self.main(x)
        refined = self.conv(stem)
        if not self.is_res:
            return refined
        # Residual path: sum with the stem output, then rescale
        return (stem + refined) / 1.414


class DownBlock(nn.Module):
    """Encoder stage: a ``ConvBlock`` followed by 2x2 max pooling.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by the ``ConvBlock``.
    """

    def __init__(self, in_channels: int, out_channels: int):
        super().__init__()
        stages = [ConvBlock(in_channels, out_channels), nn.MaxPool2d(2)]
        self.layers = nn.Sequential(*stages)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the conv block and the pooling layer."""
        pooled = self.layers(x)
        return pooled


class UpBlock(nn.Module):
    """Decoder stage: 2x transposed-conv upsampling plus two ``ConvBlock``s.

    Args:
        in_channels: Channels of the concatenated ``(x, skip)`` input.
        out_channels: Channels produced by the stage.
    """

    def __init__(self, in_channels: int, out_channels: int):
        super().__init__()
        upsample = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        refine = [ConvBlock(out_channels, out_channels) for _ in range(2)]
        self.layers = nn.Sequential(upsample, *refine)

    def forward(self, x: torch.Tensor, skip: torch.Tensor) -> torch.Tensor:
        """Concatenate ``x`` and ``skip`` along channels and run the stage."""
        merged = torch.cat([x, skip], dim=1)
        return self.layers(merged)


class TimestepEmbedding(nn.Module):
    """Map scalar timesteps to ``emb_dim``-sized vectors.

    The input is flattened to one scalar per row, passed through a bias-free
    linear layer, a sine, and a second linear layer.

    Args:
        emb_dim: Size of the produced embedding.
    """

    def __init__(self, emb_dim: int):
        super().__init__()

        self.lin1 = nn.Linear(in_features=1, out_features=emb_dim, bias=False)
        self.lin2 = nn.Linear(in_features=emb_dim, out_features=emb_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return embeddings of shape ``(x.numel(), emb_dim)``."""
        column = x.view(-1, 1)
        return self.lin2(torch.sin(self.lin1(column)))


class UnetModel(nn.Module):
    """U-Net style noise predictor conditioned on a timestep.

    Three ``DownBlock`` stages halve the spatial size each, ``to_vec`` average
    pools by 4, ``up0`` upsamples by 4 again, and three ``UpBlock`` stages
    double the size while consuming the matching encoder outputs as skips.
    The timestep embedding is added at the bottleneck and after ``up1``.

    Args:
        in_channels: Channels of the input image.
        out_channels: Channels of the predicted tensor.
        hidden_size: Base channel width; deeper stages use twice this value.
    """

    def __init__(self, in_channels: int, out_channels: int, hidden_size: int = 256):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels

        self.hidden_size = hidden_size

        self.init_conv = ConvBlock(in_channels, hidden_size, residual=True)

        self.down1 = DownBlock(hidden_size, hidden_size)
        self.down2 = DownBlock(hidden_size, 2 * hidden_size)
        self.down3 = DownBlock(2 * hidden_size, 2 * hidden_size)

        self.to_vec = nn.Sequential(nn.AvgPool2d(4), nn.ReLU())

        self.timestep_embedding = TimestepEmbedding(2 * hidden_size)

        # Undoes the 4x pooling of to_vec so the result lines up with down3
        self.up0 = nn.Sequential(
            nn.ConvTranspose2d(2 * hidden_size, 2 * hidden_size, 4, 4),
            nn.GroupNorm(8, 2 * hidden_size),
            nn.ReLU(),
        )

        # Input widths are doubled because each stage receives a skip concat
        self.up1 = UpBlock(4 * hidden_size, 2 * hidden_size)
        self.up2 = UpBlock(4 * hidden_size, hidden_size)
        self.up3 = UpBlock(2 * hidden_size, hidden_size)
        self.out = nn.Conv2d(2 * hidden_size, self.out_channels, 3, 1, 1)

    def forward(self, x: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
        """Predict a tensor with ``out_channels`` channels from ``x`` and ``t``.

        Args:
            x: Input batch of shape ``(batch, in_channels, H, W)``.
            t: One timestep value per sample.

        Returns:
            Tensor with the spatial size of ``x`` and ``out_channels`` channels.
        """
        x = self.init_conv(x)

        down1 = self.down1(x)
        down2 = self.down2(down1)
        down3 = self.down3(down2)

        thro = self.to_vec(down3)

        # (batch, channels) -> (batch, channels, 1, 1); broadcasting then adds
        # it to feature maps of any spatial size without an explicit repeat.
        temb = self.timestep_embedding(t)[:, :, None, None]
        thro = self.up0(thro + temb)

        up1 = self.up1(thro, down3) + temb
        up2 = self.up2(up1, down2)
        up3 = self.up3(up2, down1)

        # Final skip: concatenate with the init_conv features before the head
        return self.out(torch.cat((up3, x), 1))


## Полный запуск

In [2]:
# NOTE: run the Diffusion.py and Unet.py cells above first; on a fresh kernel
# this cell fails with a NameError because DiffusionModel / UnetModel are
# defined there.

# Use the GPU when one is present so the cell also runs on CPU-only machines.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
ddpm = DiffusionModel(
    eps_model=UnetModel(3, 3, hidden_size=128),
    betas=(1e-4, 0.02),
    num_timesteps=1000,
)

ddpm.to(device)
print(f"model initialised on {device}")
# train_transforms = transforms.Compose(
#     [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
# )

# dataset = CIFAR10(
#     "cifar10",
#     train=True,
#     download=True,
#     transform=train_transforms,
# )

# dataloader = DataLoader(dataset, batch_size=128, num_workers=4, shuffle=True)
# optim = torch.optim.Adam(ddpm.parameters(), lr=1e-5)

# for i in range(num_epochs):
#     train_epoch(ddpm, dataloader, optim, device)
#     generate_samples(ddpm, device, f"samples/{i:02d}.png")


NameError: name 'DiffusionModel' is not defined

In [None]:
!python3 main.py

In [17]:
# Run the pipeline tests and report coverage for modeling.training
!pytest --cov modeling.training tests/test_pipeline.py

platform linux -- Python 3.10.12, pytest-8.2.2, pluggy-1.5.0
rootdir: /home/jupyter/datasphere/shad/efficient-dl-systems/week02_management_and_testing/homework
plugins: typeguard-2.13.3, jaxtyping-0.2.28, dash-2.18.2, cov-6.0.0, hydra-core-1.3.2, anyio-3.7.1, dvc-2.58.2
collected 4 items

tests/test_pipeline.py ....                                              [100%]

---------- coverage: platform linux, python 3.10.12-final-0 ----------
Name                   Stmts   Miss  Cover
------------------------------------------
modeling/training.py      35      4    89%
------------------------------------------
TOTAL                     35      4    89%


