In [4]:
import torch
import torch.functional as F
import os
import torch.nn as nn
from matplotlib import pyplot as plt
from torch import optim
from tqdm import tqdm 
import logging
from torch.utils.tensorboard import SummaryWriter
# Configure the root logger: records are formatted as "<time>-<level>: <message>",
# INFO and above are kept, and output is appended to app.log.
logging.basicConfig(
    format="%(asctime)s-%(levelname)s: %(message)s",
    level=logging.INFO,
    # Hour:minute:second. The previous "%I:%H:%S" printed the hour twice
    # (12-hour, then 24-hour) and never the minutes.
    datefmt="%H:%M:%S",
    filename="app.log",
)


# 1. alpha 公式与 2. 加噪公式

![alpha 公式](alpha.png)
![加噪公式](加噪.png)

![采样公式](采样公式.png)

# 扩散模型（主要是采样）

In [2]:

class Diffusion:
    """Forward-noising and reverse-sampling helper for a DDPM-style diffusion model.

    Builds a linear beta schedule and the derived ``alpha`` / ``alpha_hat``
    (cumulative product of ``alpha``) tensors on ``device``.

    Args:
        nosie_steps: Number of diffusion steps. The keyword is misspelt
            ("nosie") in the original interface and is kept for backward
            compatibility; the value is stored as ``self.noise_steps``.
        beta_start: First value of the linear beta schedule.
        beta_end: Last value of the linear beta schedule.
        img_size: Height and width of the square images produced by ``sample``.
            NOTE(review): the default of 6 looks like a typo for 64, given the
            attention sizes used by ``UNet`` in this notebook; confirm.
        device: Device on which the schedule tensors and samples live.
    """

    def __init__(self,nosie_steps=1000,beta_start=1e-4,beta_end=0.02,img_size=6,device="cuda") -> None:
        self.noise_steps = nosie_steps
        self.beta_start = beta_start
        self.beta_end = beta_end
        self.img_size = img_size
        self.device = device

        # alpha_t = 1 - beta_t ; alpha_hat_t = prod_{s<=t} alpha_s
        self.beta = self.prepare_noise_schedule().to(device)
        self.alpha = 1. - self.beta
        self.alpha_hat = torch.cumprod(self.alpha, dim=0)

    def prepare_noise_schedule(self):
        """Return the linear beta schedule as a 1-D tensor of length ``noise_steps``."""
        return torch.linspace(self.beta_start, self.beta_end, self.noise_steps)

    def noise_images(self, x, t, return_noise=False):
        """Noise ``x`` to timestep ``t`` in closed form.

        Computes ``sqrt(alpha_hat[t]) * x + sqrt(1 - alpha_hat[t]) * z`` with
        ``z ~ N(0, I)``.

        Args:
            x: Batch of clean inputs; the first axis is the batch axis.
            t: 1-D integer tensor of timesteps, one per batch element.
            return_noise: When True, also return the sampled noise ``z``
                (needed as the regression target during training).

        Returns:
            The noised batch, or ``(noised_batch, z)`` when ``return_noise`` is True.
        """
        # One coefficient per sample, broadcast over every remaining axis of x.
        # The previous fixed [:, None, None] only matched 3-D inputs and
        # mis-broadcast (or raised) on (B, C, H, W) image batches.
        coeff_shape = (-1,) + (1,) * (x.dim() - 1)
        alpha_hat_t = self.alpha_hat[t]
        sqrt_alpha_hat = torch.sqrt(alpha_hat_t).view(coeff_shape)
        sqrt_one_minus_alpha_hat = torch.sqrt(1. - alpha_hat_t).view(coeff_shape)
        z = torch.randn_like(x)
        noised = sqrt_alpha_hat * x + sqrt_one_minus_alpha_hat * z
        if return_noise:
            return noised, z
        return noised

    def sample_timesteps(self, n):
        """Draw ``n`` random integer timesteps from ``[1, noise_steps)``."""
        return torch.randint(low=1, high=self.noise_steps, size=(n,))

    def sample(self, model, n):
        """Generate ``n`` images by running the reverse diffusion process.

        Args:
            model: Noise-prediction network called as ``model(x, t)``.
            n: Number of images to generate.

        Returns:
            A ``uint8`` tensor of shape ``(n, 3, img_size, img_size)`` with
            values in ``[0, 255]``.
        """
        logging.info(f"Sampling {n} new images")
        model.eval()
        with torch.no_grad():
            x = torch.randn((n, 3, self.img_size, self.img_size), device=self.device)
            # Walk t = noise_steps-1, ..., 1. A plain descending range (rather
            # than reversed(range(...))) has a length, so tqdm can show a total.
            for i in tqdm(range(self.noise_steps - 1, 0, -1), position=0):
                t = torch.full((n,), i, dtype=torch.long, device=self.device)
                predicted_noise = model(x, t)
                alpha = self.alpha[t][:, None, None, None]
                alpha_hat = self.alpha_hat[t][:, None, None, None]
                beta = self.beta[t][:, None, None, None]
                # No fresh noise is injected on the final denoising step.
                if i > 1:
                    noise = torch.randn_like(x)
                else:
                    noise = torch.zeros_like(x)
                # DDPM reverse step: the predicted noise is scaled by
                # (1 - alpha_t) / sqrt(1 - alpha_hat_t), using the cumulative alpha.
                x = (
                    1 / torch.sqrt(alpha)
                    * (x - (1 - alpha) / torch.sqrt(1 - alpha_hat) * predicted_noise)
                    + torch.sqrt(beta) * noise
                )
        model.train()
        # Clamp to [-1, 1], map to [0, 1], then to 8-bit pixel values.
        x = (x.clamp(-1, 1) + 1) / 2
        x = (x * 255).type(torch.uint8)

        return x
        

# Unet

In [3]:
class UNet(nn.Module):
    """Time-conditioned U-Net that predicts the noise in a diffused image.

    Three down-sampling stages, a bottleneck and three up-sampling stages with
    skip connections; every stage is followed by a self-attention block and
    receives the sinusoidal timestep embedding.

    Args:
        c_in: Number of input image channels.
        c_out: Number of output channels.
        time_dim: Width of the sinusoidal timestep embedding.
        device: Stored as ``self.device``; the timestep embedding itself is
            built on the device of the incoming ``t``.

    The ``size`` values passed to ``SelfAttention`` (32, 16, 8, ...) correspond
    to a 64x64 input halved at each down stage.
    """

    def __init__(self, c_in=3, c_out=3,time_dim=256,device="cuda") -> None:
        super().__init__()
        self.device = device
        self.time_dim = time_dim

        # Encoder
        self.inc1 = DoubleConv(c_in, 64)
        self.down1 = Down(64, 128, emb_dim=time_dim)
        self.sa1 = SelfAttention(128, 32)
        self.down2 = Down(128, 256, emb_dim=time_dim)
        self.sa2 = SelfAttention(256, 16)
        self.down3 = Down(256, 256, emb_dim=time_dim)
        self.sa3 = SelfAttention(256, 8)

        # Bottleneck
        self.bot1 = DoubleConv(256, 512)
        self.bot2 = DoubleConv(512, 512)
        self.bot3 = DoubleConv(512, 256)

        # Decoder: each UP block's in_channels counts the concatenated skip tensor.
        self.up1 = UP(512, 128, emb_dim=time_dim)
        self.sa4 = SelfAttention(128, 16)
        self.up2 = UP(256, 64, emb_dim=time_dim)
        self.sa5 = SelfAttention(64, 32)
        self.up3 = UP(128, 64, emb_dim=time_dim)
        self.sa6 = SelfAttention(64, 64)
        self.outc = nn.Conv2d(64, c_out, kernel_size=1)

    def pos_encoding(self, t, channels):
        """Return the sinusoidal embedding of timesteps ``t``.

        Args:
            t: Float tensor of shape ``(batch, 1)``.
            channels: Embedding width; the first half holds sines and the
                second half cosines.

        Returns:
            Tensor of shape ``(batch, channels)`` for even ``channels``.
        """
        # Frequencies 1 / 10000^(2i / channels), created on t's device so the
        # encoding works wherever the caller put the timesteps.
        inv_freq = 1.0 / (
            10000
            ** (torch.arange(0, channels, 2, device=t.device).float() / channels)
        )
        # Repeat t along dim 1 so each column is scaled by its own frequency.
        scaled = t.repeat(1, channels // 2) * inv_freq
        pos_enc = torch.cat([torch.sin(scaled), torch.cos(scaled)], dim=-1)

        return pos_enc

    def forward(self, x, t):
        """Predict the noise in ``x`` at timesteps ``t``.

        Args:
            x: Image batch of shape ``(batch, c_in, H, W)``.
            t: 1-D tensor of timesteps, one per batch element.

        Returns:
            Tensor of shape ``(batch, c_out, H, W)``.
        """
        # (batch,) -> (batch, 1) floats, then sinusoidal time embedding.
        t = t.unsqueeze(-1).type(torch.float)
        t = self.pos_encoding(t, self.time_dim)

        # Encoder; x1..x3 are kept as skip connections.
        x1 = self.inc1(x)
        x2 = self.down1(x1, t)
        x2 = self.sa1(x2)
        x3 = self.down2(x2, t)
        x3 = self.sa2(x3)
        x4 = self.down3(x3, t)
        x4 = self.sa3(x4)

        # Bottleneck
        x4 = self.bot1(x4)
        x4 = self.bot2(x4)
        x4 = self.bot3(x4)

        # Decoder: each stage upsamples and concatenates the matching skip tensor.
        x5 = self.up1(x4, x3, t)
        x5 = self.sa4(x5)
        x6 = self.up2(x5, x2, t)
        x6 = self.sa5(x6)
        x7 = self.up3(x6, x1, t)
        x7 = self.sa6(x7)

        # A 1x1 convolution maps features to output channels, keeping the spatial layout.
        out = self.outc(x7)
        return out
        
        
        
        

# 各部分的小模块

## 最小模块 双层卷积可能还有残差

In [6]:
class DoubleConv(nn.Module):
    """Two 3x3 convolutions, each followed by GroupNorm, with a GELU in between.

    Both convolutions use ``padding=1`` so the spatial size is preserved.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels of the output feature map.
        mid_channels: Channels between the two convolutions; defaults to
            ``out_channels``.
        residual: When True, return ``gelu(x + block(x))``; this requires
            ``in_channels == out_channels``.
    """

    def __init__(self,in_channels,out_channels,mid_channels=None,residual=False):
        super().__init__()
        self.residual = residual
        if not mid_channels:
            mid_channels = out_channels
        self.doubleConv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            # GroupNorm with a single group normalizes each sample over all of
            # its channels and positions, independent of the batch size.
            nn.GroupNorm(1, mid_channels),
            # GELU keeps a small non-zero response for negative inputs instead
            # of cutting them off the way ReLU does.
            nn.GELU(),
            # kernel_size and padding are passed by keyword: positionally, the
            # fourth Conv2d argument is the stride, not the padding.
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.GroupNorm(1, out_channels),
        )

    def forward(self, x):
        """Apply the block, adding the input back when ``residual`` is set."""
        if self.residual:
            # nn.functional is used directly; the notebook's F alias points at
            # torch.functional, which has no gelu.
            return nn.functional.gelu(x + self.doubleConv(x))
        return self.doubleConv(x)
    
    
    

## 下采样

In [None]:
class Down(nn.Module):
    """Down-sampling stage: 2x max-pool, two DoubleConv blocks, plus a time embedding.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels of the output feature map.
        emb_dim: Width of the timestep embedding passed to ``forward``.
    """

    def __init__(self,in_channels,out_channels,emb_dim=256):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            # The residual block must keep the channel count so that
            # x + block(x) is well-defined; only the second block changes width.
            DoubleConv(in_channels, in_channels, residual=True),
            DoubleConv(in_channels, out_channels),
        )
        # Projects the timestep embedding to one value per output channel.
        self.emb_layer = nn.Sequential(
            nn.SiLU(),
            nn.Linear(emb_dim, out_channels),
        )

    def forward(self, x, t):
        """Down-sample ``x`` and add the projected timestep embedding.

        Args:
            x: Feature map of shape ``(batch, in_channels, H, W)``.
            t: Timestep embedding of shape ``(batch, emb_dim)``.

        Returns:
            Feature map of shape ``(batch, out_channels, H // 2, W // 2)``.
        """
        x = self.maxpool_conv(x)
        # (batch, out_channels) -> (batch, out_channels, 1, 1); broadcasting
        # spreads the embedding over the spatial axes without a repeated copy.
        emb = self.emb_layer(t)[:, :, None, None]
        return x + emb

## 上采样

In [9]:
class UP(nn.Module):
    """Up-sampling stage: 2x bilinear upsample, skip concatenation, two DoubleConv blocks.

    Args:
        in_channels: Channel count AFTER concatenating the skip tensor with
            the upsampled input.
        out_channels: Channels of the output feature map.
        emb_dim: Width of the timestep embedding passed to ``forward``.
    """

    def __init__(self,in_channels,out_channels,emb_dim=256) -> None:
        super().__init__()
        # Parameter-free bilinear interpolation instead of a transposed convolution.
        self.up = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)
        self.conv = nn.Sequential(
            DoubleConv(in_channels, in_channels, residual=True),
            DoubleConv(in_channels, out_channels, in_channels // 2),
        )
        # Projects the timestep embedding to one value per output channel.
        self.emb_layer = nn.Sequential(
            nn.SiLU(),
            nn.Linear(emb_dim, out_channels),
        )

    def forward(self, x, skip_x, t):
        """Upsample ``x``, fuse it with ``skip_x`` and add the timestep embedding.

        Args:
            x: Feature map from the previous (coarser) stage.
            skip_x: Encoder feature map at the target resolution.
            t: Timestep embedding of shape ``(batch, emb_dim)``.

        Returns:
            Feature map with ``out_channels`` channels at ``skip_x``'s resolution.
        """
        x = self.up(x)
        # Concatenate along the channel axis (dim 1).
        x = torch.cat([skip_x, x], dim=1)
        x = self.conv(x)
        # (batch, out_channels) -> (batch, out_channels, 1, 1) so the embedding
        # broadcasts over the spatial axes; adding the raw (batch, C) tensor
        # to a 4-D feature map does not line up.
        emb = self.emb_layer(t)[:, :, None, None]
        return x + emb


# Alias for the `Up` spelling that UNet uses when constructing its decoder.
Up = UP

        
    
        

AttributeError: module 'torch' has no attribute 'repeat'

## 自注意力机制


In [None]:
class SelfAttention(nn.Module):
    """Multi-head self-attention over the spatial positions of a feature map.

    Args:
        channels: Number of feature channels (the attention embedding width);
            must be divisible by the 4 attention heads.
        size: Nominal spatial side length. Stored for reference only; the
            actual height and width are read from the input in ``forward``.
    """

    def __init__(self, channels,size) -> None:
        super(SelfAttention,self).__init__()
        self.channels = channels
        self.size = size
        # batch_first=True: inputs and outputs are (batch, seq, feature)
        # rather than (seq, batch, feature).
        self.mha = nn.MultiheadAttention(channels, 4, batch_first=True)
        self.ln = nn.LayerNorm(channels)
        self.ff_self = nn.Sequential(
            nn.LayerNorm([channels]),
            nn.Linear(channels, channels),
            nn.GELU(),
            nn.Linear(channels, channels),
        )

    def forward(self, x):
        """Apply self-attention and a feed-forward block, both with residuals.

        Args:
            x: Feature map of shape ``(batch, channels, H, W)``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        batch, channels, height, width = x.shape
        # (B, C, H, W) -> (B, C, H*W) -> (B, H*W, C): every spatial position
        # becomes one token with the channels as its feature vector.
        tokens = x.flatten(2).swapaxes(1, 2)
        tokens_ln = self.ln(tokens)
        # Query, key and value are all the same sequence (self-attention).
        attention_value, _ = self.mha(tokens_ln, tokens_ln, tokens_ln)
        # Residual connection around the attention ...
        attention_value = attention_value + tokens
        # ... and around the feed-forward block.
        attention_value = self.ff_self(attention_value) + attention_value
        # Back to (B, C, H, W). reshape returns a view when the memory layout
        # allows it and copies only when it must.
        return attention_value.swapaxes(2, 1).reshape(batch, channels, height, width)
        
        
        
        
        
    
    
    