### HunyuanVideoPipiline类

其`__call__`方法主要有以下流程:

(1)check_inputs检查输入

(2)准备输入

- text_embeddings(使用encode_prompt())
- timestep(使用retrieve_timesteps)
- latents(使用prepare_latents)

(3)去噪(denoising_step循环)

(4)解码输出

1. text_embeddings

有`prompt_embeds, negative_prompt_embeds, prompt_mask, negative_prompt_mask`作为输出。

如果使用CFG，则会将其concat成`[prompt_embeds, negative_prompt_embeds]`,mask同理

2. timestep

输出递降的`<list>` `timesteps`和`<int>` `num_inference_steps`

3. latents

会先将`video_length`做$(L-1)//4+1$，接着将`video_length = 33` `num_channels_latents = 16`送入`prepare_latents()`.在该函数中，若
- 启用`latent_concat`(默认关闭),则会生成`[B,C,F,H,W]=[1,7,33,24,42]`的`latents`
- 启用`i2v_stability`(默认开启),则会线性注入`0.001`的`img_latents`,得到`[B,C,F,H,W]=[1,16,33,24,42]`的`latents`
- 启用`FlowMatching`调度器且有`init_noise_sigma`属性，则会把`latents`的方差变为$init\_noise\_sigma^2 * I$

如果启用`latent_concat`，`img_latents_concat`会是形如`[1,16,33,24,42]`,第一帧存在，后续帧全为0的`<tensor>`,`i2v_mask`是第一元1，后续0的`[33,]`张量，`mask_concat`是形如`[1,1,33,24,42]`，第一帧全为1,后续帧全为0的张量.

4. denoising step

使用`for i, t in enumerate(timesteps)`循环
- 若启用`latent_concat`,`latent_model_input`会是`latents, img_latents_concat, mask_concat`按**通道**维拼接的结果，形如`[1,7+16+1,33,24,42]`,否则为`[1,16,33,24,42]`的`latents`
- 若启用`token_replace`,在去噪前，每个时间步的第一帧都将被`img_latents`替换，去噪只在后续帧执行，去噪后会再与`img_latents`拼接。这一重复可能是为了便于回调。

5. decode

$latents = latents/vae\_sacling\_factor+vae\_shifting\_factor$,

接着将其通过VAE，得到`image`。将其从`[-1,1]`转换到`(0,1)`,即作为`HunyuanVideoPipelineOutput`的属性.

### HunyuanVideoSampler类

其`predict`方法主要实现以下功能，并最后输出一个`out_dict`：

(1)初始化随机种子和FlowMatchDiscreteScheduler

(2)根据图片高宽比选择适配的视频尺寸,并裁剪图片

(3)调用pipeline完成inference

注：参数ulysses或ring会启用并行VAE编码

1. seeds

将不同类型的种子输入统一为`batch_size * num_videos_per_prompt`个种子的`<list>` `seeds`

2. crop

根据所选清晰度，依据最邻近高宽比得到和原图片尺寸最匹配的`closest_size`和`closest_ratio`，用于裁剪。接着`[3,H,W]`的RGB图像形状会被缩放、裁剪，接着值被缩放到`[-0.5,0.5]`, unsqueeze维度到`[1,3,1,H,W]`,

接着，经VAE编码，再乘系数$vae\_scaling\_factor$,得到`img_latents`.图片可经`<list[uint8]>` `semantic_images`传入，或只使用图片路径导入.

### VAE

其中`EncoderCausal3D`和`DecoderCausal3D`分别实现VAE的主干部分，它们依赖于UNet中的组件.`AutoencoderKLCausal3D`类的`encode`和`_decode`方法调用这两个神经网络.`tiling,slicing,fuseqkv,attn`是几个额外功能,其中只有`vae_tiling=True`,故只考虑这部分.

In [None]:
[VAE编码器结构]<img src="/imgs/wk1/encoder.png" >

In [None]:
[VAE解码器结构]<img src="/imgs/wk1/decoder.png" >

"the tiles overlap and are blended together to form a smooth output"这个做法很工程！

1. spatial_tiled_encode()

它会把图像分割成小块，装入二元列表`rows`中，分别编码，然后拼接. 其中有关键参数
- `tile_sample_min_size`: 默认为32,将图像按空间维度裁剪为方体，但最后一行/列不方
- `tile_overlap_factor`: 重叠比例，默认为0.25
- `overlap_size`: 相邻 tile 起点之间的步长
- `blend_extent`: 需要融合的宽度/高度
- `row_limit`: 切片边缘,使用时为`[:,:,:,:row_limit,:row_limit]`. 每次融合其左/上方的图片，并在存储时去掉其右侧、下侧`blend_extent`宽度的像素


2. temporal_tiled_encode()

它会按时间维度将图像分成小块，并在每个时间块递归地调用`spatial_tiled_encode()`, 待空间编码融合后, 再进行时间编码、融合。解码部分待补充。

### UNet

包含的模块(类)有

- 卷积块`CausalConv3d`
- 基于它的`DownsampleCausal3D`和`UpsampleCausal3D`
- 以及进一级抽象`DownEncoderBlockCausal3D`和`UpEncoderBlockCausal3D`(ModuleList)
- 此外，还有`ResnetBlockCausal3D`和`UNetMidBlockCausal3D`(ModuleList)

需要注意的是，上采样时没有使用`ConvTranspose`,而是使用了先插值、再卷积的模式，这可能是由于反卷积的设计无法实现“因果性”(即后续帧不影响前面帧)

1. CausalConv3d

In [9]:
def forward(self, x):
        x = F.pad(x, self.time_causal_padding, mode='replicate')
        return self.conv(x)

其中，`<tuple>` `self.time_causal_padding = (kernel_size // 2, kernel_size // 2, kernel_size // 2, kernel_size // 2, kernel_size - 1, 0)`

[WaveNet](https://arxiv.org/abs/1609.03499)中提到，针对图像卷积，可以使用掩码卷积来实现因果性。但此处使用了和原论文类似的填充方式。举个例子，一个在T维度上原先为`<x0,x1,...,xn>`的序列，如果用`kernel=3`的卷积，将会使用`<x0,x1,x2>`,`<x1,x2,x3>`,...作为输入,这会使用后续时间的信息。左边先补两格，`<P,P,x0,x1,...>`,将会使用`<P,P,x0>`,`<P,x0,x1>`,`<x0,x1,x2>`作为输入，实现因果性。这里`P<- x0`.

注：`attention masking fill`似乎与此类似

注：`padding`的格式是上、下、左、右、前、后，分别对应`H,W,T`三个维度.空间维度padding是为了保持输入输出形状不变

2. UpsampleCausal3D

In [22]:
from typing import Optional, Tuple, Union
import torch
import torch.nn.functional as F
from torch import nn

def forward(
        self,
        hidden_states: torch.FloatTensor,
        output_size: Optional[int] = None,
        scale: float = 1.0,
    ) -> torch.FloatTensor:
    #此前省去...
    if self.interpolate:  # 分离第一帧，分别插值
        B, C, T, H, W = hidden_states.shape
        first_h, other_h = hidden_states.split((1, T - 1), dim=2)
        if output_size is None:
            if T > 1:
                other_h = F.interpolate(other_h, scale_factor=self.upsample_factor, mode="nearest")  # 4n+1的设计导致这里必须拆出一帧。那任何一帧都OK吗？似乎不是。

            first_h = first_h.squeeze(2)
            first_h = F.interpolate(first_h, scale_factor=self.upsample_factor[1:], mode="nearest")
            first_h = first_h.unsqueeze(2)
        else:
            raise NotImplementedError

        if T > 1:
            hidden_states = torch.cat((first_h, other_h), dim=2)
        else:
            hidden_states = first_h

        # If the input is bfloat16, we cast back to bfloat16
        if dtype == torch.bfloat16:
            hidden_states = hidden_states.to(dtype)

        if self.use_conv:  #插值完再conv
            if self.name == "conv":
                hidden_states = self.conv(hidden_states)
            else:
                hidden_states = self.Conv2d_0(hidden_states)

        return hidden_states

这里`<tuple>` `self.upsample_factor=(2, 2, 2)`,`self.conv=CausalConv3d(self.channels, self.out_channels, kernel_size=3, bias=bias)`,默认`stride=1`

3. DownsampleCausal3D

   与UpsampleCausal3D类似，但直接使用卷积降采样.

In [36]:
def forward(self, hidden_states: torch.FloatTensor, scale: float = 1.0) -> torch.FloatTensor:
        assert hidden_states.shape[1] == self.channels

        if self.norm is not None:
            hidden_states = self.norm(hidden_states.permute(0, 2, 3, 1)).permute(0, 3, 1, 2) 
            # B C H*W T → B H*W T C → B C H W (T是何时不见的？似乎是bug,但由于默认不启用norm,故无报错)
        assert hidden_states.shape[1] == self.channels

        hidden_states = self.conv(hidden_states)  # stride = 2

        return hidden_states

3. DownEncoderBlockCausal3D/UpDecoderBlockCausal3D

`num_layers`个`ResnetBlockCausal3D`+ 1个`DownsampleCausal3D`/`UpsampleCausal3D`

4. UNetMidBlockCausal3D

   有一些初始参数

- `attention_head_dim`: 每个头的维数，如果没有则等于`in_channels`
- `num_layers`: 层的个数。`ModuleList`中会交替添加`num_layers`个`attn+resnet`

In [38]:
def forward(self, hidden_states: torch.FloatTensor, temb: Optional[torch.FloatTensor] = None) -> torch.FloatTensor:
    hidden_states = self.resnets[0](hidden_states, temb)
    for attn, resnet in zip(self.attentions, self.resnets[1:]):  # 初始化的第一个resnet实际不会用到，只是设计考量
        if attn is not None:
            B, C, T, H, W = hidden_states.shape
            hidden_states = rearrange(hidden_states, "b c f h w -> b (f h w) c")  # rearrage是为了便于生成因果掩码, (掩码从下一帧第一个token开始，此前为0,此后为-inf)
            attention_mask = prepare_causal_attention_mask(
                T, H * W, hidden_states.dtype, hidden_states.device, batch_size=B
            )
            hidden_states = attn(hidden_states, temb=temb, attention_mask=attention_mask)  # temb被用在spatial_norm里
            hidden_states = rearrange(hidden_states, "b (f h w) c -> b c f h w", f=T, h=H, w=W)
        hidden_states = resnet(hidden_states, temb)  # temb被用在多个norm和time_emb_proj里,并参与残差lian'jie
        
    return hidden_states

5. ResnetBlockCausal3D

pipeline是`norm1 ->activate ->causalConv1 ->norm2 ->(Optional)+temb ->dropout ->causalConv2 ->(Optional)shortcutConv ->resConnection`,需要额外关注的点是

- time_embedding
  - 如果`time_embedding_norm`是`"ada_group"`或`"spatial"`,则`norm1`和`norm2`接受`temb`作为参数。(`norm2`需经`temb_proj`投影)
  - 接着,若`temb_channels`不为`None`，且`time_embedding_norm`是`"default"`或`"scale_shift"`,同时`time_emb_proj`存在，并且`skip_time_act=False`,
  - 则`temb`会被投影至与`hidden_states`相同形状
- shortcutConv

  会在`in_channels != out_channels`时启用，`kernel=stride=1`, 用于将`input`的通道数卷到和处理后的`hidden_states`相同维数,用于残差连接

### 几个待处理的问题

1. 搞清楚`layer_norm`,  `group_norm`, `spatial_norm`, `batch_norm`分别是什么？有什么应用场景？使用的原因分别是什么？
2. 搞清楚UNet中`channels`的变化
3. 搞清楚`time_embedding`是如何参与到`norm`中的，以及还出现在除了残差块中的何处
4. 搞清楚UNet中cross_attention出现的位置、输入输出的数量和形状
5. 寻找上采样不使用反卷积的原因
6. 寻找`latent_concat`在通道维度拼接的原因