In [1]:
import torch
import torch.nn as nn
import numpy as np
import random

In [2]:
#Train setting
BATCH_SIZE = 8 #2560
LR = 0.0001 # 0.0001
NUM_EPOCHS = 1000 #500
MAX_LEN = 3
DIM_EMBEDDING = 128
MASK_IDX = 1 #0:5
OUTPUT_IDX = 2

In [3]:

class autoencoder(nn.Module):
    def __init__(self,
                dropout=0.2,
                num_heads=8,
                vocab_size = MAX_LEN,
                d_embedding=32,
                num_encoder_layers=1,
                output_idx = 2
                ):
        super(autoencoder, self).__init__()
        
        self.output_idx = output_idx
        
        self.embeddingLayer_encoder = nn.Embedding(vocab_size, d_embedding)

        ##Encoder 
        encoder_layers = nn.TransformerEncoderLayer(d_model=d_embedding, nhead=num_heads,dim_feedforward=1024,dropout=dropout)
        encoder_norm = nn.LayerNorm(d_embedding)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers,num_encoder_layers,encoder_norm)
        
        ##output
        self.fc_out = nn.Linear(d_embedding, MAX_LEN)

    def forward(self,src,src_mask):
        
        src = self.embeddingLayer_encoder(src)

        #Encoder
        src = src.permute(1,0,2) #seq * batch * feature
        output = self.transformer_encoder(src=src,mask=src_mask)
        output = output.permute(1,0,2) #batch * seq * feature
        
        output = self.fc_out(output)
        
        ##Attention
        attn_output_weights = self.transformer_encoder.layers[0].self_attn(src, src, src,attn_mask=src_mask)[1]
        # attn_output_weights = torch.sum(attn_output_weights,dim=1)

         
        return output[:,self.output_idx,:],attn_output_weights[:,self.output_idx,:]

##创建模型，只输出transformer_encoder某个位置的输出结果和注意力权重
device = "cpu"
model = autoencoder(num_encoder_layers = 1,d_embedding=DIM_EMBEDDING,output_idx=OUTPUT_IDX).to(device)


In [4]:

def get_batch_data(batch_size = 0):
    '''
    Batch * len
    '''
    x = list()
    y = list()
    tokenizer = {
                'A':0,
                'B':1,
                'C':2}
    label_embedding = {'A':[1,0,0],
                      'B': [0,1,0],
                      'C': [0,0,1]}
    for n in range(batch_size):
        sample = list()
        label = list()
        for m in range(MAX_LEN):
            char = random.choice(['A','B','C']) 
            sample.append(tokenizer[char])
            label.append(label_embedding[char])
        x.append(sample)
        y.append(label)
        
    x = torch.tensor(x).type(torch.int64)
    y = torch.tensor(y).type(torch.float32)
    return x,y


In [5]:
##创建src_mask，遮挡输入序列中第MASK_IDX个位置的信息
src_mask =  torch.zeros(MAX_LEN,MAX_LEN)
src_mask[:,MASK_IDX] = 1
src_mask = src_mask.masked_fill(src_mask == 1, float('-inf'))
print(src_mask)

tensor([[0., -inf, 0.],
        [0., -inf, 0.],
        [0., -inf, 0.]])


In [6]:
###开始验证transformer_encoder的第OUTPUT_IDX能否看见第MASK_IDX个输入
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01) #0.01
criterion = torch.nn.MSELoss()
for epoch in range(NUM_EPOCHS):
    ##Train
    model.train()
    
    #Get data
    x,y = get_batch_data(BATCH_SIZE)
    
    #Calculate output
    optimizer.zero_grad()
    pre,attn_output_weights = model(x,src_mask)
    
    #Calculate loss
    loss = criterion(pre,y[:,1,:])
    
    #Optimize the model
    loss.backward()
    optimizer.step()
    
    ##Test
    if (epoch % 100) == 0:
        model.eval()
        #Get data
        x,y = get_batch_data(1000)
        
        with torch.no_grad():
            pre,attn_output_weights = model(x,src_mask)
            pre_idx = torch.argmax(pre,1)
            gt = torch.argmax(y[:,1,:],1)
            print('{:.1f}%的样本被预测正确'.format(torch.sum(pre_idx==gt)/10))
            # print(torch.sum(attn_output_weights,dim=0))
        

29.8%的样本被预测正确
31.0%的样本被预测正确
34.5%的样本被预测正确
34.0%的样本被预测正确
33.5%的样本被预测正确
32.9%的样本被预测正确
29.7%的样本被预测正确
35.1%的样本被预测正确
35.2%的样本被预测正确
34.1%的样本被预测正确


In [7]:
##修改src_mask，不再遮挡输入序列中第MASK_IDX个位置的信息
src_mask =  torch.zeros(MAX_LEN,MAX_LEN)
# src_mask[:,MASK_IDX] = 1
src_mask = src_mask.masked_fill(src_mask == 1, float('-inf'))
print(src_mask)

tensor([[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]])


In [8]:
###重新测试transformer_encoder的第OUTPUT_IDX能否看见第MASK_IDX个输入
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01) #0.01
criterion = torch.nn.MSELoss()
for epoch in range(NUM_EPOCHS):
    ##Train
    model.train()
    
    #Get data
    x,y = get_batch_data(BATCH_SIZE)
    
    #Calculate output
    optimizer.zero_grad()
    pre,attn_output_weights = model(x,src_mask)
    
    #Calculate loss
    loss = criterion(pre,y[:,1,:])
    
    #Optimize the model
    loss.backward()
    optimizer.step()
    
    ##Test
    if (epoch % 100) == 0:
        model.eval()
        #Get data
        x,y = get_batch_data(1000)
        
        with torch.no_grad():
            pre,attn_output_weights = model(x,src_mask)
            pre_idx = torch.argmax(pre,1)
            gt = torch.argmax(y[:,1,:],1)
            print('{:.1f}%的样本被预测正确'.format(torch.sum(pre_idx==gt)/10))
            # print(torch.sum(attn_output_weights,dim=0))
        

29.7%的样本被预测正确
65.3%的样本被预测正确
66.7%的样本被预测正确
69.4%的样本被预测正确
65.6%的样本被预测正确
66.3%的样本被预测正确
65.6%的样本被预测正确
67.4%的样本被预测正确
66.4%的样本被预测正确
64.9%的样本被预测正确


In [9]:
###测试src_mask对角线遮挡是否有用
src_mask =  torch.zeros(MAX_LEN,MAX_LEN) 
src_mask[:,MASK_IDX] = 1 ##创建src_mask，遮挡输入序列中第MASK_IDX个位置的信息
src_mask = src_mask.masked_fill(src_mask == 1, float('-inf'))
print(src_mask)

#重新创建模型
OUTPUT_IDX = 1
model = autoencoder(num_encoder_layers = 1,d_embedding=DIM_EMBEDDING,output_idx=OUTPUT_IDX).to(device)

###重新测试transformer_encoder的第OUTPUT_IDX能否看见第MASK_IDX个输入
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01) #0.01
criterion = torch.nn.MSELoss()
for epoch in range(NUM_EPOCHS):
    ##Train
    model.train()
    
    #Get data
    x,y = get_batch_data(BATCH_SIZE)
    
    #Calculate output
    optimizer.zero_grad()
    pre,attn_output_weights = model(x,src_mask)
    
    #Calculate loss
    loss = criterion(pre,y[:,1,:])
    
    #Optimize the model
    loss.backward()
    optimizer.step()
    
    ##Test
    if (epoch % 100) == 0:
        model.eval()
        #Get data
        x,y = get_batch_data(1000)
        
        with torch.no_grad():
            pre,attn_output_weights = model(x,src_mask)
            pre_idx = torch.argmax(pre,1)
            gt = torch.argmax(y[:,1,:],1)
            print('{:.1f}%的样本被预测正确'.format(torch.sum(pre_idx==gt)/10))
            # print(torch.sum(attn_output_weights,dim=0))
print('随便输出一个样本的注意力权重看看被遮挡位置的注意力权重是否为0')
print(attn_output_weights[0]) 

tensor([[0., -inf, 0.],
        [0., -inf, 0.],
        [0., -inf, 0.]])
45.5%的样本被预测正确
100.0%的样本被预测正确
100.0%的样本被预测正确
100.0%的样本被预测正确
100.0%的样本被预测正确
100.0%的样本被预测正确
100.0%的样本被预测正确
100.0%的样本被预测正确
100.0%的样本被预测正确
100.0%的样本被预测正确
随便输出一个样本的注意力权重看看被遮挡位置的注意力权重是否为0
tensor([0.2055, 0.0000, 0.6865], grad_fn=<SelectBackward>)


In [10]:
#测试对TransformerEncoderLayer输出序列的某个位置而言，所有没有被遮挡掉的输入序列位置的注意力权重都非常相近
##修改src_mask，不再遮挡输入序列中第MASK_IDX个位置的信息
src_mask =  torch.zeros(MAX_LEN,MAX_LEN)
src_mask = src_mask.masked_fill(src_mask == 1, float('-inf'))
print(src_mask)

#重新创建模型
OUTPUT_IDX = 2
model = autoencoder(num_encoder_layers = 1,d_embedding=DIM_EMBEDDING,output_idx=OUTPUT_IDX).to(device)


###重新测试transformer_encoder的第OUTPUT_IDX能否看见第MASK_IDX个输入
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01) #0.01
criterion = torch.nn.MSELoss()
for epoch in range(500):
    ##Train
    model.train()
    
    #Get data
    x,y = get_batch_data(BATCH_SIZE)
    
    #Calculate output
    optimizer.zero_grad()
    pre,attn_output_weights = model(x,src_mask)
    
    #Calculate loss
    loss = criterion(pre,y[:,1,:])
    
    #Optimize the model
    loss.backward()
    optimizer.step()
    
    ##Test
    if (epoch % 100) == 0:
        model.eval()
        #Get data
        x,y = get_batch_data(1000)
        
        with torch.no_grad():
            pre,attn_output_weights = model(x,src_mask)
            pre_idx = torch.argmax(pre,1)
            gt = torch.argmax(y[:,1,:],1)
            # print('{:.1f}%的样本被预测正确'.format(torch.sum(pre_idx==gt)/10))
            print(torch.mean(attn_output_weights,dim=0))
        

tensor([[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]])
tensor([0.3422, 0.3369, 0.3209])
tensor([0.3440, 0.3425, 0.3136])
tensor([0.3456, 0.3459, 0.3085])
tensor([0.3493, 0.3474, 0.3033])
tensor([0.3502, 0.3465, 0.3033])


In [11]:
##修改src_mask，遮挡输入序列中第1个位置的信息
src_mask =  torch.zeros(MAX_LEN,MAX_LEN)
src_mask[:,0] = 1
# src_mask[:,2] = 1
src_mask = src_mask.masked_fill(src_mask == 1, float('-inf'))
print(src_mask)

#重新创建模型
OUTPUT_IDX = 2
model = autoencoder(num_encoder_layers = 1,d_embedding=DIM_EMBEDDING,output_idx=OUTPUT_IDX).to(device)


###重新测试transformer_encoder的第OUTPUT_IDX能否看见第MASK_IDX个输入
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01) #0.01
criterion = torch.nn.MSELoss()
for epoch in range(500):
    ##Train
    model.train()
    
    #Get data
    x,y = get_batch_data(BATCH_SIZE)
    
    #Calculate output
    optimizer.zero_grad()
    pre,attn_output_weights = model(x,src_mask)
    
    #Calculate loss
    loss = criterion(pre,y[:,1,:])
    
    #Optimize the model
    loss.backward()
    optimizer.step()
    
    ##Test
    if (epoch % 100) == 0:
        model.eval()
        #Get data
        x,y = get_batch_data(1000)
        
        with torch.no_grad():
            pre,attn_output_weights = model(x,src_mask)
            pre_idx = torch.argmax(pre,1)
            gt = torch.argmax(y[:,1,:],1)
            print('{:.1f}%的样本被预测正确'.format(torch.sum(pre_idx==gt)/10),end='; ')
            print(torch.mean(attn_output_weights,dim=0))
        

tensor([[-inf, 0., 0.],
        [-inf, 0., 0.],
        [-inf, 0., 0.]])
52.8%的样本被预测正确; tensor([0.0000, 0.4980, 0.5020])
100.0%的样本被预测正确; tensor([0.0000, 0.5983, 0.4017])
100.0%的样本被预测正确; tensor([0.0000, 0.6258, 0.3742])
100.0%的样本被预测正确; tensor([0.0000, 0.6506, 0.3494])
100.0%的样本被预测正确; tensor([0.0000, 0.6634, 0.3366])
