# In [1]:
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

class PositionEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence, then apply dropout.

    The encoding table is precomputed once for ``max_len`` positions and stored
    as a non-trainable buffer ``pe`` of shape ``(max_len, 1, d_model)``.

    Args:
        d_model: Feature dimension of every time step. It may be set larger
            than the raw feature count so that the model can learn extra
            features (the raw input is then projected up to ``d_model``).
        dropout: Dropout probability applied after the encoding is added.
        max_len: Largest sequence length (``seq_len``) the table supports.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        # Must be an assignment: a comparison here leaves ``self.dropout`` undefined.
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # 1 / 10000^(2i / d_model): the exponent is negative so that the
        # frequency decreases as the feature index grows.
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer cosine column than sine columns.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # (max_len, d_model) -> (max_len, 1, d_model) so it broadcasts over batch.
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:seq_len])``.

        ``x`` is indexed sequence-first: its dimension 0 selects how many
        rows of the table are added, and the table broadcasts over dimension 1.
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
    
class Transformer(nn.Module):
    """Encoder-decoder model built from ``TransformerEncoder`` and ``TransformerDecoder``.

    Args:
        input_dim: Feature size of the source sequence.
        output_dim: Feature size of the model output.
        hidden_dim: Model width shared by the encoder and the decoder.
        num_layers: Number of layers in the encoder and in the decoder.
        num_heads: Number of attention heads per layer.
        dropout: Dropout probability forwarded to both stacks.
    """

    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, dropout):
        super().__init__()
        self.encoder = TransformerEncoder(input_dim, hidden_dim, num_layers, num_heads, dropout)
        # The decoder layers normalise over hidden_dim features, so the target
        # has to be projected into that width first.
        # NOTE(review): assumes trg's last dimension is output_dim - confirm.
        self.trg_linear = nn.Linear(output_dim, hidden_dim)
        # TransformerDecoder takes (num_layers, d_model, num_heads, d_ff, dropout).
        # The feed-forward width is not part of this class's interface; 4x the
        # model width is the conventional choice.
        self.decoder = TransformerDecoder(num_layers, hidden_dim, num_heads, 4 * hidden_dim, dropout)
        self.linear = nn.Linear(hidden_dim, output_dim)

    def forward(self, src, trg, tgt_mask=None, memory_mask=None):
        """Encode ``src``, decode ``trg`` against it and project to ``output_dim``.

        ``tgt_mask`` and ``memory_mask`` are optional and are handed unchanged
        to the decoder.
        """
        # NOTE(review): no positional encoding is applied to trg here - confirm
        # whether the caller is expected to add it.
        encodings = self.encoder(src)
        decodings = self.decoder(
            self.trg_linear(trg),
            encodings,
            tgt_mask=tgt_mask,
            memory_mask=memory_mask,
        )
        output = self.linear(decodings)
        return output
    
class TransformerEncoder(nn.Module):
    """Project the input to ``hidden_dim``, add positions, run the block stack.

    Args:
        input_dim: Feature size of the raw input.
        hidden_dim: Model width used by every block.
        num_layers: Number of ``TransformerBlock`` layers.
        num_heads: Number of attention heads per block.
        dropout: Dropout probability for the positional encoding and blocks.
        dim_feedforward: Hidden width of each block's feed-forward net.
            Defaults to ``4 * hidden_dim`` when omitted.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, num_heads, dropout,
                 dim_feedforward=None):
        super().__init__()
        if dim_feedforward is None:
            dim_feedforward = 4 * hidden_dim
        # Attribute name must match the one read in forward().
        self.pos_encoding = PositionEncoding(hidden_dim, dropout)
        # TransformerBlock takes (d_model, num_heads, dim_feedforward, dropout).
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(hidden_dim, num_heads, dim_feedforward, dropout)
            for _ in range(num_layers)
        ])
        self.linear = nn.Linear(input_dim, hidden_dim)

    def forward(self, x, mask=None):
        """Return the encoded sequence; ``mask`` is passed to every block as-is."""
        x = self.linear(x)
        x = self.pos_encoding(x)
        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, mask)
        return x
    
class TransformerBlock(nn.Module):
    """Pre-norm encoder block: self-attention followed by a feed-forward net.

    Both sub-layers follow the pattern ``dropout(sublayer(layer_norm(x))) + x``.

    Args:
        d_model: Model width.
        num_heads: Number of attention heads.
        dim_feedforward: Hidden width of the feed-forward net.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, dim_feedforward, dropout=0.1):
        super().__init__()

        self.self_attention = nn.MultiheadAttention(d_model, num_heads)
        ff_layers = [
            nn.Linear(d_model, dim_feedforward),
            nn.ReLU(),
            nn.Linear(dim_feedforward, d_model),
        ]
        self.feed_forward = nn.Sequential(*ff_layers)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Apply both sub-layers to ``x`` and return a tensor of the same shape."""
        # Attention sub-layer.
        normed = self.norm1(x)
        # The fourth positional argument of nn.MultiheadAttention.forward is
        # key_padding_mask, so that is what ``mask`` is bound to.
        # NOTE(review): confirm a padding mask (not an attn_mask) is intended.
        attended, _ = self.self_attention(normed, normed, normed, key_padding_mask=mask)
        x = self.dropout(attended) + x

        # Feed-forward sub-layer.
        transformed = self.feed_forward(self.norm2(x))
        x = self.dropout(transformed) + x

        return x
        
        
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention for batch-first inputs.

    Args:
        d_model: Model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights while
            the module is in training mode.

    Raises:
        ValueError: If ``d_model`` is not a multiple of ``num_heads``.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_size = d_model // num_heads
        self.attn_dropout = dropout
        self.linear_q = nn.Linear(d_model, d_model)
        self.linear_k = nn.Linear(d_model, d_model)
        self.linear_v = nn.Linear(d_model, d_model)
        self.linear_out = nn.Linear(d_model, d_model)

    def _split_heads(self, x, batch_size):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, heads, seq, head_size)``."""
        return x.view(batch_size, -1, self.num_heads, self.head_size).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` to ``key``/``value``.

        Args:
            query: Tensor of shape ``(batch, q_len, d_model)``.
            key: Tensor of shape ``(batch, k_len, d_model)``.
            value: Tensor of shape ``(batch, k_len, d_model)``.
            mask: Optional boolean tensor of shape ``(batch, q_len, k_len)`` or
                ``(q_len, k_len)``; ``True`` marks positions to ignore.

        Returns:
            A pair ``(attn_output, attn_weights)`` with shapes
            ``(batch, q_len, d_model)`` and ``(batch * num_heads, q_len, k_len)``.
        """
        batch_size = query.size(0)

        # Linear projections, then split the feature axis into heads.
        query = self._split_heads(self.linear_q(query), batch_size)
        key = self._split_heads(self.linear_k(key), batch_size)
        value = self._split_heads(self.linear_v(value), batch_size)

        # Attention scores over sequence positions: (batch, heads, q_len, k_len).
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.head_size)

        if mask is not None:
            if mask.dim() == 2:
                mask = mask.unsqueeze(0)
            # Insert the head axis; broadcasting replaces an explicit repeat.
            scores = scores.masked_fill(mask.unsqueeze(1), -1e9)

        # Attention distribution. Dropout follows the module's train/eval mode
        # so that inference is deterministic.
        attn_weights = F.softmax(scores, dim=-1)
        attn_weights = F.dropout(attn_weights, p=self.attn_dropout, training=self.training)

        # Weighted sum of the values: (batch, heads, q_len, head_size).
        attn_output = torch.matmul(attn_weights, value)

        # Concatenate the heads again and apply the output projection.
        attn_output = attn_output.transpose(1, 2).contiguous().view(
            batch_size, -1, self.num_heads * self.head_size
        )
        attn_output = self.linear_out(attn_output)

        attn_weights = attn_weights.reshape(
            batch_size * self.num_heads, attn_weights.size(-2), attn_weights.size(-1)
        )
        return attn_output, attn_weights


class TransformerDecoderLayer(nn.Module):
    """Pre-norm decoder layer: self-attention, cross-attention, feed-forward.

    Each of the three sub-layers is applied as
    ``tgt + dropout(sublayer(layer_norm(tgt)))``.

    Args:
        d_model: Model width.
        num_heads: Number of attention heads in both attention modules.
        dim_feedforward: Hidden width of the feed-forward net.
        dropout: Dropout probability for the attention modules and residuals.
    """

    def __init__(self, d_model, num_heads, dim_feedforward, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout)
        self.cross_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout)

        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        """
        :param tgt: target-side input, shape: (target_len, batch_size, d_model)
        :param memory: encoder output, shape: (src_len, batch_size, d_model)
        :param tgt_mask: mask for target self-attention, shape: (target_len, target_len)
        :param memory_mask: mask for attention over the encoder output, shape: (target_len, src_len)
        :return: decoder layer output, shape: (target_len, batch_size, d_model)
        """
        # Sub-layer 1: self-attention over the target sequence.
        queries = self.norm1(tgt)
        self_out, _ = self.self_attn(queries, queries, queries, attn_mask=tgt_mask)
        tgt = tgt + self.dropout(self_out)

        # Sub-layer 2: attention from the target to the encoder memory.
        queries = self.norm2(tgt)
        cross_out, _ = self.cross_attn(queries, memory, memory, attn_mask=memory_mask)
        tgt = tgt + self.dropout(cross_out)

        # Sub-layer 3: position-wise feed-forward net.
        hidden = F.relu(self.linear1(self.norm3(tgt)))
        tgt = tgt + self.dropout(self.linear2(hidden))

        return tgt
    
class TransformerDecoder(nn.Module):
    """Stack of ``TransformerDecoderLayer`` modules followed by a final LayerNorm.

    Args:
        num_layers: Number of decoder layers.
        d_model: Model width.
        num_heads: Number of attention heads per layer.
        d_ff: Hidden width of each layer's feed-forward net.
        dropout: Dropout probability forwarded to every layer.
    """

    def __init__(self, num_layers, d_model, num_heads, d_ff, dropout):
        super(TransformerDecoder, self).__init__()

        # The layer class defined in this file is TransformerDecoderLayer.
        self.layers = nn.ModuleList([
            TransformerDecoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(d_model)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        """Run ``tgt`` through every layer against ``memory`` and normalise the result.

        ``tgt_mask`` and ``memory_mask`` are passed unchanged to each layer.
        """
        output = tgt

        for layer in self.layers:
            output = layer(output, memory, tgt_mask=tgt_mask, memory_mask=memory_mask)

        output = self.norm(output)

        return output


# Stray notebook warning output (not code): from .autonotebook import tqdm as notebook_tqdm
