In [1]:
import torch 
import torch.nn as nn  

In [2]:
# Self attentention
class SelfAttention(nn.Module): #inherith from nn module
    def __init__(self,embed_size, heads): # dato un embedding se lo dividiamo in n parti, ciascuna di queste sarà una head (ex:emb_size=256, se splittiamo in 6 --> ogni head sarà di 32)
        super(SelfAttention,self).__init__() #initi inizializerà la classe parente
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads # // is an integer division
        assert(self.head_dim * heads == embed_size, "Embed size needs to be divisible by heads")

        #definiamo i linear layers che andremo a passare ai volori di key, query and values
        self.values = nn.Linear(embed_size, embed_size)
        self.keys = nn.Linear(embed_size, embed_size)
        self.queries = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

        #self.values = nn.Linear(self.head_dim, self.head_dim, bias = False) # 1°termine è il valore degli input layer secondo è il valore dell'output layer (etichette)
        #self.keys = nn.Linear(self.head_dim, self.head_dim, bias = False)
        #self.queries = nn.Linear(self.head_dim, self.head_dim, bias = False)
        #self.fc_out = nn.Linear(heads * self.head_dim,embed_size) #nel primo temrine ricostruisco la dimensione dell'embedding e lo rimappo nello stesso spazio


    def forward(self, values, keys, query, mask): # !!! attenzione la query è singola! non è queries !!!
        N = query.shape[0] # definisco il numero di traing_example da mandare allo steso momento
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]  #da capire meglio, mi sembra che voglia capire le dimensioni di keyS,valueS (tante) e query (singola)


        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(query)
        
        # Split embedding into self.heads piecies
        values = values.reshape(N, value_len, self.heads, self.head_dim) #self.heads, self.head_dim è come andremo a splittarlo
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = queries.reshape(N, query_len, self.heads, self.head_dim)



        #we use EINSUM for matrix multimplication
        energy = torch.einsum("nqhd,nkhd->nhqk",[queries,keys])
        # queries shape: (N, query_len, heads, head_dim)
        # keys shape: (N, key_len, heads, head_dim)
        # energy shape: (N, heads, query_len, key_len)
        if mask is not None:
            energy = energy.masked_fill(mask == 0, float ("-1e20")) #if the element of the mask is 0, it doesen't impact any others so we want to replace them with a -infinity, utile per la softamax
        
        attention = torch.softmax(energy / (self.embed_size **(1/2)), dim = 3) # it means that we'll normalize across the key lenght! if we look at energy shape: (N, heads, query_len, key_len) it will be across dim =3
        out = torch.einsum("nhql,nlhd->nqhd",[attention,values]) #we are colling l the dimension that we want to nmultiply across (so key_len and value_len ar both l now)
        # attention shape: (N, heads, query_len, key_len) [same of the energy shape]
        # values shape: (N, value_len, heads, head_dim)
        # we want get (N, query_len, heads, heads_dim), to do this key_len and value_len must be the same! so we ca multiply across that dim
        out = out.reshape(N, query_len, self.heads * self.head_dim) #make this reshape in order to flatten the last two dimension into one single dimension, which is the original embed size!

        #lastly send the output into a FC ! it maps the embed size to ambed size
        out = self.fc_out(out)
        return out


  assert(self.head_dim * heads == embed_size, "Embed size needs to be divisible by heads")


In [3]:
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads, dropout, forward_expansion):
        super(TransformerBlock, self).__init__()
        self.attentention = SelfAttention(embed_size, heads) #we use the attention implemented above
        
        # we have to do two batch normalization
        self.norm1 = nn.LayerNorm(embed_size)  #takes the average for every single example (more computationale expansive) (different from batchNorm tae the average from the batch and then normalize it)
        self.norm2 = nn.LayerNorm(embed_size)

        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, forward_expansion * embed_size), #it takes as the input layer the embed_size (dim = embed_size) and map into dim = forward_expansion * embed_size
            nn.ReLU(),
            nn.Linear(forward_expansion * embed_size, embed_size) # this layer map into the original dimension
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        attention = self.attentention(value, key, query, mask)
        x = self.dropout(self.norm1(attention + query)) # we sum query because we're adding skip connection
        forward = self.feed_forward(x)
        out = self.dropout(self.norm2(forward + x)) # now we pass the forward and we sum the copy of the signale (x) before the FF
        return out



In [4]:
class Encoder(nn.Module):
    def __init__(self,
                 src_vocab_size, # we're gonna do the embedding 
                 embed_size,
                 num_layers,
                 heads,
                 device,
                 forward_expansion,
                 dropout,
                 max_lenght,
                 ):
        super(Encoder,self).__init__()
        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(src_vocab_size, embed_size)
        self.positional_embedding = nn.Embedding(max_lenght,embed_size) #map the dim from the max_lenght to embed_size
        
        
        
        self.num_layers = nn.ModuleList(
              [
                    TransformerBlock(
                          embed_size = embed_size,
                          heads = heads,
                          dropout = dropout,
                          forward_expansion = forward_expansion
                    )

                    for _ in range(num_layers)]
        )
        
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, mask):
        N, seq_length = x.shape
        positions = torch.arange(0,seq_length).expand(N,seq_length).to(self.device) # is for positional embeding (we create sequece from 0 up seq_length)
        out = self.dropout(self.word_embedding(x) + self.positional_embedding(positions)) # non capisco il perchè del dropout dopo la somma dei due embedding

        for layer in self.num_layers:
            out = layer(out, out, out, mask) #al the inputs in the encoder is the same ???

            return out
        



In [5]:
class DecoderBlock(nn.Module):
    def __init__(self, embed_size, heads, forward_expansion, dropout, device):
        super(DecoderBlock,self).__init__()
        self.attention = SelfAttention(embed_size = embed_size, heads = heads)
        self.norm = nn.LayerNorm(embed_size)
        self.transformer_block = TransformerBlock(embed_size = embed_size,
                                                  heads = heads,
                                                  dropout = dropout,
                                                  forward_expansion = forward_expansion)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, value, key, src_mask, trg_mask):
        attention = self.attention(x, x, x, trg_mask)
        query = self.dropout(self.norm(attention + x)) #skip connection
        out = self.transformer_block(value, key, query, src_mask)
        
        return out

In [6]:
class Decoder( nn.Module):
    def __init__(self,
                 trg_vocab_size,
                 embed_size,
                 num_layers,
                 heads,
                 forward_expansion,
                 dropout,
                 device,
                 max_lenght):
        super (Decoder,self).__init__()
        self.device = device
        self.word_embedding = nn.Embedding(trg_vocab_size, embed_size)
        self.position_embedding= nn.Embedding(max_lenght, embed_size)

        self.layers = nn.ModuleList(
            [DecoderBlock(embed_size=embed_size, heads=heads, forward_expansion= forward_expansion, dropout= dropout, device= device)
             for _ in range(num_layers)]
        )

        self.fc_out = nn.Linear(embed_size, trg_vocab_size)
        self.dropout = nn.Dropout(dropout)

        def forward (self, x, enc_out, src_mask, trg_mask):
            N, seq_lenght = x.shape
            positions = torch.arange(0,seq_lenght).expand(N,seq_lenght).to(self.device)
            x = self. dropout((self.word_embedding(x) + self.position_embedding(positions)))

            for layer in self.layers:
                x = layer(x, enc_out, enc_out, src_mask, trg_mask)
            
            out = self.fc_out(x)
        
            return out



In [7]:
class Transformer(nn.Module):
    def __init__(
        self,
        src_vocab_size,
        trg_vocab_size,
        src_pad_idx,
        trg_pad_idx,
        embed_size=512,
        num_layers=6,
        forward_expansion=4,
        heads=8,
        dropout=0,
        device="cpu",
        max_length=100,
    ):

        super(Transformer, self).__init__()

        self.encoder = Encoder(
            src_vocab_size,
            embed_size,
            num_layers,
            heads,
            device,
            forward_expansion,
            dropout,
            max_length,
        )

        self.decoder = Decoder(
            trg_vocab_size,
            embed_size,
            num_layers,
            heads,
            forward_expansion,
            dropout,
            device,
            max_length,
        )

        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_src_mask(self, src):
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        # (N, 1, 1, src_len)
        return src_mask.to(self.device)

    def make_trg_mask(self, trg):
        N, trg_len = trg.shape
        trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
            N, 1, trg_len, trg_len
        )

        return trg_mask.to(self.device)

    def forward(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        enc_src = self.encoder(src, src_mask)
        out = self.decoder(trg, enc_src, src_mask, trg_mask)
        return out


if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(device)

    x = torch.tensor([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]]).to(
        device
    )
    trg = torch.tensor([[1, 7, 4, 3, 5, 9, 2, 0], [1, 5, 6, 2, 4, 7, 6, 2]]).to(device)

    src_pad_idx = 0
    trg_pad_idx = 0
    src_vocab_size = 10
    trg_vocab_size = 10
    model = Transformer(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, device=device).to(
        device
    )
    out = model(x, trg[:, :-1]) # è shiftato di uno cosi che non si prende l'end of sentence token 
    print(out.shape)
    

cpu


NotImplementedError: Module [Decoder] is missing the required "forward" function


(src != self.src_pad_idx): Qui, src rappresenta la sequenza di input e self.src_pad_idx rappresenta l'indice del token di padding nell'input. Questa parte del codice confronta ogni elemento di src con l'indice del token di padding. Se un elemento non è uguale all'indice del token di padding, l'espressione restituirà True, altrimenti restituirà False.

.unsqueeze(1): Dopo aver eseguito il confronto, .unsqueeze(1) viene utilizzato per aggiungere una dimensione aggiuntiva all'array booleano risultante lungo l'asse 1. Questo passaggio è importante perché la maschera dovrà essere trasmessa con la forma (N, 1, src_len), dove N è la dimensione del batch e src_len è la lunghezza massima della sequenza di input. L'aggiunta di una dimensione all'array booleano lo espande lungo l'asse 1.

.unsqueeze(2): Successivamente, .unsqueeze(2) viene utilizzato per aggiungere un'altra dimensione all'array booleano lungo l'asse 2. Questo è necessario perché il Transformer richiede che la maschera abbia la forma (N, 1, 1, src_len). Quindi, aggiungendo una dimensione aggiuntiva, l'array booleano viene espanso lungo l'asse 2.


#### Esempio:
Supponiamo di avere una sequenza di input src di forma (3, 4) e un token di padding con indice 0. Ecco un esempio di come la riga di codice src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2) potrebbe funzionare:
import torch

##### Supponiamo che il token di padding sia 0
src_pad_idx = 0

##### Sequenza di input (esempio)
src = torch.tensor([
                
                    [1, 2, 3, 4],

                    [0, 0, 0, 0],

                    [5, 6, 0, 0]])

##### Creazione della maschera di padding
src_mask = (src != src_pad_idx).unsqueeze(1).unsqueeze(2)

print("Maschera di padding:")
print(src_mask)

##### Maschera di padding:
tensor([[
        
        [True,  True,  True,  True]],

        [[False, False, False, False]],

        [[True,  True, False, False]]])


In questo esempio:
La sequenza di input src ha forma (3, 4).

L'indice del token di padding src_pad_idx è 0.

La maschera di padding risultante src_mask avrà la forma (3, 1, 1, 4):

True indica i token validi.

False indica i token di padding.

Nella prima riga della maschera, tutti i token sono validi. Nella seconda riga, tutti i token sono di padding. Nella terza riga,
solo i primi due token sono validi, mentre gli altri due sono di padding.

