In [None]:
import torch
import torch.nn as nn
from einops import rearrange

# 关于机器学习中的mask
在类似于transformer等模型中，不论是进行NLP任务还是CV任务，都会有可能碰到有关mask的问题。
mask往往见于MLM、通过transformer encoder进行NLP任务中的pad mask、transformer decoder的seq mask、图像上的mae等等

## multihead-self-attention中的pad mask
在multihead attention中，attention_score往往是[batch, head, seq_len, seq_len]维度，表示的是对于batch中每个样本，每个head上，每个位置对于包括自己的共seq_len个位置的注意力。
而pad mask往往是[batch, seq_len]维度，表示的是对于每个样本，在长为seq_len的位置上，哪几个是pad token，需要mask而不参与attention计算

In [10]:
# 下方为multihead self-attention的经典基本实现（仅有pad mask）


class Attention(nn.Module):
    # multi-head attention
    def __init__(self, head, dim, dropout=0.):
        """
        head: 头个数
        dim: 原始维度
        """
        super().__init__()
        assert dim % head == 0
        self.head = head
        self.dim = dim
        self.head_dim = dim // head
        self.to_qkv = nn.Linear(self.dim, 3 * self.dim, bias=False)
        # 将qkv用一个linear一起完成
        self.scale = self.head_dim ** -0.5
        self.attend = nn.Softmax(-1)
        self.output_proj = nn.Sequential(
            nn.Linear(dim, dim),
            nn.Dropout(dropout)
        )

    def forward(self, x, mask=None):
        # x: [B, S, E]
        # mask: [B, S]

        qkv = self.to_qkv(x).chunk(3, dim=-1)
        # 经过to_qkv计算得到[B, S, 3*E]， 之后chunk分割得到qkv: [3, B S, E]

        query, key, value = map(lambda t: rearrange(t, 'b s (h d) -> b h s d', h=self.head), qkv)
        # 对于每个[b, s, e]的对象 分割为[b, s, h, d]再转置为[b h s d]

        attention_score = torch.matmul(query, key.transpose(-1, -2)) * self.scale
        # [B, head, S, S]

        if mask is not None:
            attention_score = attention_score.masked_fill(mask == 0, -1e9)
            # mask矩阵中为0为需要mask的地方 mask值为负无穷则softmax为0
        attention = self.attend(attention_score)

        output = torch.matmul(attention, value)
        # [B, head, S, head_dim]
        output = rearrange(output, 'b h s d -> b s (h d)')

        output = self.output_proj(output)
        return output

In [6]:
attention_object = Attention(head=2, dim=8)
random_input = torch.randn(size=(3, 2, 8))
mask = None
output = attention_object(random_input, mask)
output.shape

torch.Size([3, 2, 8])

在mask为None的时候，正常计算不报错

In [11]:
attention_object = Attention(head=2, dim=8)
random_input = torch.randn(size=(3, 2, 8))
mask = torch.randint(0, 2, size=(3, 2)) == 0.
output = attention_object(random_input, mask)
output.shape

RuntimeError: The size of tensor a (3) must match the size of tensor b (2) at non-singleton dimension 2

值得注意的是，在加入mask后就报错了。 这是一个广播上的报错 masked_fill方法是通过广播机制实现
此时attention_score为[3,2,2,2] 而mask为[3,2] 不满足广播条件。

此时，需要让mask满足广播条件，应该将其扩展为[3,1,1,2]。
该过程的理解为：
1. 3与3相同是batch维度，每个样本代表自己的没必要进行广播；
2. 原始mask的第二维为2 是seq_len长度，就是对于一个样本而言，sequence上哪些位置是需要mask的，attention的最后两维均为seq_len，表示的是对于每个位置，相对于包括自己的所有位置的attention注意力值
3. 那么需要将其在导数第二维进行扩充，变为[3,1,2]表示的是对于每个位置，都需要mask掉一些位置
4. 之后，对于每个head而言，都是完全相同的计算，现在有两个head，每个head都是seq_len*seq_len的注意力，那么就将mask扩展到[3,1,1,2]上，满足广播条件

In [12]:
attention_object = Attention(head=2, dim=8)
random_input = torch.randn(size=(3, 2, 8))

mask = torch.randint(0, 2, size=(3, 2)) == 0.
mask = rearrange(mask, 'b s -> b 1 1 s')
# 扩充维度 满足广播条件

output = attention_object(random_input, mask)
output.shape

torch.Size([3, 2, 8])