<a href="https://colab.research.google.com/github/GhadaJeddey/AttentionIsAllYouNeed/blob/main/AttentionMechanism.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch.nn.functional as f
import torch.nn as nn
import torch

# Simple Attention

In [None]:
class SimpleAttention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Args:
        d_model: Size of the token embeddings; also the size of the query,
            key and value projections.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model, dropout=0.1):
        super(SimpleAttention, self).__init__()
        self.d_model = d_model  # Dimension of token embeddings and attention space

        self.query_layer = nn.Linear(d_model, d_model, bias=False)
        self.key_layer = nn.Linear(d_model, d_model, bias=False)
        self.value_layer = nn.Linear(d_model, d_model, bias=False)
        self.dropout = nn.Dropout(dropout)  # dropout on the attention weights

    def forward(self, x, mask=None):
        """Attend every position of ``x`` to the positions of ``x``.

        Args:
            x: Tensor whose last two axes are (seq_len, d_model).
            mask: ``None`` for unrestricted attention. A tensor is used as
                given: positions where it equals 0 are blocked, and it must
                broadcast against the (..., seq_len, seq_len) score matrix.
                Any other non-``None`` value (e.g. ``True``) requests a causal
                (lower-triangular) mask.

        Returns:
            Tuple ``(attention_weights, output)``.
        """
        Q = self.query_layer(x)
        K = self.key_layer(x)
        V = self.value_layer(x)

        seq_len = x.size(-2)

        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / (self.d_model ** 0.5)

        if mask is not None:
            if not torch.is_tensor(mask):
                # Causal mask: position i may only look at positions <= i.
                mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device))
            # Out-of-place fill so the scores tensor autograd recorded is not
            # mutated; -inf becomes a weight of exactly 0 after the softmax.
            attention_scores = attention_scores.masked_fill(
                mask.to(x.device) == 0, float('-inf')
            )

        # torch.softmax instead of the module-level alias `f`, which other
        # notebook cells rebind to open file handles.
        attention_weights = torch.softmax(attention_scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        output = torch.matmul(attention_weights, V)  # context-mixed token representations

        return attention_weights, output



# MultiHead Attention


In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: Size of the token embeddings; must be divisible by ``h``.
        h: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model, h, dropout=0.1):
        super(MultiHeadAttention, self).__init__()

        assert d_model % h == 0, "d_model must be divisible by h"

        # Model parameters
        self.d_model = d_model  # Dimension of token embeddings and attention space
        self.h = h
        self.d_k = d_model // self.h  # per-head dimension

        # Layers
        self.query_layer = nn.Linear(d_model, d_model, bias=False)
        self.key_layer = nn.Linear(d_model, d_model, bias=False)
        self.value_layer = nn.Linear(d_model, d_model, bias=False)
        self.dropout = nn.Dropout(dropout)  # dropout on the attention weights
        self.projection = nn.Linear(d_model, d_model)

    def attention(self, Q, K, V, mask=None):
        """Scaled dot-product attention over per-head tensors.

        Args:
            Q, K, V: Tensors whose last two axes are (length, d_k).
            mask: Optional tensor broadcastable to the score matrix; positions
                where it equals 0 are excluded from attention.

        Returns:
            The attention-weighted combination of ``V``.
        """
        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / (self.d_k ** 0.5)

        # The mask must hit the raw scores *before* the softmax so blocked
        # positions get exactly zero weight and the rest renormalise.
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))

        # Exactly one softmax. torch.softmax avoids the module-level alias `f`,
        # which other notebook cells rebind to open file handles.
        attention_weights = torch.softmax(attention_scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        output = torch.matmul(attention_weights, V)  # shape: (batch_size, h, seq_len, d_k)
        return output

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, then merge and re-project the heads.

        Args:
            query, key, value: Tensors of shape (batch, length, d_model);
                ``key`` and ``value`` may have a different length than ``query``.
            mask: Optional mask forwarded to :meth:`attention`.

        Returns:
            Tensor of shape (batch, query_length, d_model).
        """
        batch_size = query.size(0)

        # (batch, length, d_model) -> (batch, h, length, d_k)
        Q = self.query_layer(query).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
        K = self.key_layer(key).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
        V = self.value_layer(value).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)

        attention = self.attention(Q, K, V, mask)

        # Concatenate heads. contiguous() is required because .view() cannot
        # operate on the strided tensor produced by .transpose(); -1 lets the
        # sequence length be inferred instead of relying on an outside name.
        attention = attention.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        output = self.projection(attention)  # learnable mix of the combined head outputs

        return output





# Positional Encoding

In [None]:
import math

In [None]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a batch of embeddings.

    Args:
        d_model: Embedding size (even or odd).
        dropout: Accepted for interface compatibility; this module does not
            apply dropout itself.
        max_len: Longest sequence length for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # shape: (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Even indices get sine, odd indices get cosine.
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the frequency vector to the number of odd columns.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Add a batch dimension. A buffer moves with the module (.to/.cuda) and
        # is saved in the state dict, but is not a trainable parameter.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape (batch, seq_len, d_model).

        Raises:
            ValueError: If ``seq_len`` exceeds the precomputed ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"Sequence length {seq_len} exceeds max_len {self.pe.size(1)}"
            )
        x = x + self.pe[:, :seq_len]
        return x

# Transformer Block

In [None]:
class TransformerBlock(nn.Module):
    """Attention sub-layer with residual Add & Norm, optionally followed by a
    position-wise feed-forward sub-layer with its own Add & Norm.

    Args:
        d_model: Embedding size.
        num_heads: Number of attention heads.
        dropout: Dropout probability for the attention weights and for each
            sub-layer output before it is added to the residual.
        use_ffn: Whether to append the feed-forward sub-layer.
        mask: Unused; kept so existing constructor calls keep working.
    """

    def __init__(self, d_model, num_heads, dropout=0.1, use_ffn=True, mask=None):
        super(TransformerBlock, self).__init__()

        self.attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.addnorm1 = nn.LayerNorm(d_model)
        self.use_ffn = use_ffn

        if self.use_ffn:
            self.ffn = nn.Sequential(
                nn.Linear(d_model, 4 * d_model, bias=True),
                nn.ReLU(),
                nn.Linear(4 * d_model, d_model, bias=True),
            )
            self.addnorm2 = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output=None, src_mask=None, tgt_mask=None):
        """Run the block.

        Args:
            x: Input tensor; used as the attention query.
            enc_output: If ``None`` the block performs self-attention over
                ``x``; otherwise ``x`` attends to ``enc_output``.
            src_mask: Mask used when attending to ``enc_output``.
            tgt_mask: Mask used for self-attention.

        Returns:
            Tensor with the same shape as ``x``.
        """
        if enc_output is None:
            # Self-attention
            attn_output = self.attention(x, x, x, tgt_mask)
        else:
            # Encoder-decoder attention
            attn_output = self.attention(x, enc_output, enc_output, src_mask)

        # Residual + LayerNorm applied exactly once for either attention mode.
        x = self.addnorm1(x + self.dropout(attn_output))

        if self.use_ffn:
            ffn_output = self.ffn(x)
            # Same residual dropout as the attention sub-layer.
            x = self.addnorm2(x + self.dropout(ffn_output))

        return x



# Encoder

In [None]:
class Encoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of
    self-attention Transformer blocks.

    Args:
        d_model: Embedding size.
        num_heads: Number of attention heads per block.
        num_layers: Number of stacked blocks.
        num_embeddings: Vocabulary size.
        dropout: Dropout probability used for the embeddings and every block.
        max_len: Maximum sequence length supported by the positional encoding.
    """

    def __init__(self,
                 d_model,
                 num_heads,
                 num_layers,
                 num_embeddings,
                 dropout=0.1,
                 max_len=5000):

        super(Encoder, self).__init__()

        self.d_model = d_model
        self.pe = PositionalEncoding(d_model, dropout, max_len)
        self.embedding = nn.Embedding(num_embeddings, d_model)
        self.dropout = nn.Dropout(dropout)

        # Forward the configured dropout instead of a hard-coded 0.1 so the
        # constructor argument actually controls the blocks.
        self.layers = nn.ModuleList([
            TransformerBlock(d_model, num_heads, dropout=dropout, use_ffn=True, mask=None)
            for _ in range(num_layers)
        ])

    def forward(self, x, mask=None):
        """Encode a batch of token ids.

        Args:
            x: Integer tensor of token ids, shape (batch, seq_len).
            mask: Optional attention mask passed to every block's
                self-attention (positions equal to 0 are blocked).

        Returns:
            Tensor of shape (batch, seq_len, d_model).
        """
        # Scale embeddings by sqrt(d_model) before adding position encodings.
        x = self.embedding(x) * math.sqrt(self.d_model)
        x = self.dropout(self.pe(x))

        for layer in self.layers:
            # Blocks run in self-attention mode, which reads `tgt_mask`.
            x = layer(x, tgt_mask=mask)

        return x


# Decoder

In [None]:
class Decoder(nn.Module):
    """Transformer decoder: each layer is a masked self-attention block
    followed by an encoder-decoder attention block with a feed-forward part.

    Args:
        num_embeddings: Vocabulary size (also the size of the output logits).
        d_model: Embedding size.
        num_heads: Number of attention heads per block.
        num_layers: Number of decoder layers.
        dropout: Dropout probability used for the embeddings and every block.
        max_len: Maximum sequence length supported by the positional encoding.
    """

    def __init__(self, num_embeddings, d_model, num_heads, num_layers, dropout=0.1, max_len=5000):

        super(Decoder, self).__init__()
        self.d_model = d_model

        self.embedding = nn.Embedding(num_embeddings, d_model)
        self.pe = PositionalEncoding(d_model, dropout, max_len)
        self.layers = nn.ModuleList([
            nn.ModuleList([
                # [0]: masked self-attention (no FFN)
                TransformerBlock(d_model, num_heads, dropout=dropout, use_ffn=False, mask=None),
                # [1]: encoder-decoder attention followed by the FFN
                TransformerBlock(d_model, num_heads, dropout=dropout, use_ffn=True, mask=None),
            ])
            for _ in range(num_layers)
        ])

        self.linear = nn.Linear(d_model, num_embeddings)
        # Embedding dropout, mirroring what Encoder does after the positional encoding.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        """Decode target token ids against the encoder output.

        Args:
            x: Integer tensor of target token ids, shape (batch, tgt_len).
            enc_output: Encoder output, shape (batch, src_len, d_model).
            src_mask: Mask over encoder positions for encoder-decoder attention.
            tgt_mask: Mask for the decoder self-attention (e.g. causal mask).

        Returns:
            Logits of shape (batch, tgt_len, num_embeddings).
        """
        x = self.embedding(x) * math.sqrt(self.d_model)
        x = self.dropout(self.pe(x))

        for block_pair in self.layers:
            # Self-attention: enc_output must NOT be passed, otherwise the
            # block switches to encoder-decoder attention and tgt_mask is ignored.
            x = block_pair[0](x, tgt_mask=tgt_mask)
            # Encoder-decoder attention + FFN.
            x = block_pair[1](x, enc_output, src_mask=src_mask)

        return self.linear(x)


# Transformer

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model that shares one vocabulary size between both sides.

    Args:
        num_embeddings: Vocabulary size.
        d_model: Embedding size.
        num_heads: Attention heads per block.
        num_layers: Number of encoder layers and of decoder layers.
        dropout: Dropout probability handed to the encoder and decoder.
        max_len: Maximum sequence length handed to the encoder and decoder.
    """

    def __init__(self, num_embeddings, d_model=512, num_heads=8, num_layers=6, dropout=0.1, max_len=5000):
        super().__init__()

        self.encoder = Encoder(
            d_model=d_model,
            num_heads=num_heads,
            num_layers=num_layers,
            num_embeddings=num_embeddings,
            dropout=dropout,
            max_len=max_len,
        )
        self.decoder = Decoder(
            num_embeddings=num_embeddings,
            d_model=d_model,
            num_heads=num_heads,
            num_layers=num_layers,
            dropout=dropout,
            max_len=max_len,
        )

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        """Encode ``src`` and decode ``tgt`` against it; returns the decoder output."""
        memory = self.encoder(src, mask=src_mask)
        return self.decoder(tgt, memory, src_mask=src_mask, tgt_mask=tgt_mask)


# Training and Testing

In [None]:
!pip install datasets
!pip install transformers
!pip install py7zr
!pip install tokenizers
!pip install rouge_score



## Load Dataset

In [None]:
from datasets import load_dataset
# Load the "3.0.0" configuration of the CNN/DailyMail dataset; the result is
# indexed by split name ("train", "test", "validation") in the next cell.
dataset = load_dataset("cnn_dailymail", "3.0.0")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
# Pull the three splits out of the loaded dataset in one unpacking step.
train_dataset, test_dataset, val_dataset = (
    dataset[split_name] for split_name in ("train", "test", "validation")
)

## Tokenizer

In [None]:
# Write the tokenizer training corpus: each record is the article followed by
# its highlights. Records are streamed straight to disk instead of first
# building a list of every concatenated text in memory.
# The handle is deliberately not called `f`: in this notebook `f` is the alias
# for torch.nn.functional and rebinding it would break the attention modules.
with open('train.txt', 'w', encoding='utf-8') as corpus_file:
    for item in train_dataset:
        # A real separator keeps the last word of the article from fusing
        # with the first word of the highlights.
        text = item['article'] + ' ' + item['highlights']
        corpus_file.write(text.strip() + '\n')

In [None]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
from torch.nn.utils.rnn import pad_sequence
from google.colab import drive
# Mount Google Drive at /content/drive; the tokenizer and encoding files used
# by later cells are read from and written to /content/drive/MyDrive/.
drive.mount('/content/drive')

class BPETokenizer:
    """Thin wrapper around a `tokenizers` BPE tokenizer with padding helpers.

    Args:
        vocab_size: Target vocabulary size given to the BPE trainer.
        special_tokens: Special tokens given to the trainer. Defaults to
            ``DEFAULT_SPECIAL_TOKENS``. Must contain ``"<pad>"`` for the
            padding helpers to work.
    """

    DEFAULT_SPECIAL_TOKENS = ("<pad>", "<unk>", "<bos>", "<eos>")

    def __init__(self, vocab_size=30000, special_tokens=None):
        # None sentinel instead of a list default: a list default would be one
        # shared object mutated across every instance.
        if special_tokens is None:
            special_tokens = self.DEFAULT_SPECIAL_TOKENS
        special_tokens = list(special_tokens)

        self.special_tokens = special_tokens
        self.tokenizer = Tokenizer(BPE())
        self.tokenizer.pre_tokenizer = Whitespace()
        self.trainer = BpeTrainer(vocab_size=vocab_size, special_tokens=special_tokens)

        # pad_id is resolved after training or loading.
        self.pad_token = "<pad>"
        self.pad_id = None

    def train(self, files, save_path="my_tokenizer.json"):
        """Train on the given text files, save to ``save_path`` and resolve the pad id."""
        self.tokenizer.train(files, self.trainer)
        self.tokenizer.save(save_path)
        print(f"Tokenizer saved to {save_path}")
        self._set_pad_id()

    def load(self, path="my_tokenizer.json"):
        """Replace the wrapped tokenizer with one loaded from ``path``."""
        self.tokenizer = Tokenizer.from_file(path)
        print(f"Tokenizer loaded from {path}")
        self._set_pad_id()

    def _set_pad_id(self):
        """Look up the pad token id in the vocabulary.

        Raises:
            ValueError: If the pad token is not in the vocabulary.
        """
        self.pad_id = self.tokenizer.token_to_id(self.pad_token)
        if self.pad_id is None:
            raise ValueError(f"{self.pad_token} not found in tokenizer vocabulary!")

    def encode(self, text):
        """Return the list of token ids for ``text``."""
        return self.tokenizer.encode(text).ids

    def batch_encode(self, texts):
        """Return a list with the token ids of every text in ``texts``."""
        return [self.encode(t) for t in texts]

    def decode(self, ids):
        """Turn a sequence of token ids back into text."""
        return self.tokenizer.decode(ids)

    def save(self, path="my_tokenizer.json"):
        """Save the wrapped tokenizer to ``path``."""
        self.tokenizer.save(path)
        print(f"Tokenizer saved to {path}")

    def _encode_and_pad(self, texts):
        """Encode ``texts`` and right-pad them with ``pad_id`` into one (batch, max_len) tensor."""
        encoded = [torch.tensor(self.encode(text), dtype=torch.long) for text in texts]
        return pad_sequence(encoded, batch_first=True, padding_value=self.pad_id)

    def collate_fn(self, batch):
        """Collate function usable with a PyTorch ``DataLoader``.

        Args:
            batch: Either a list of texts, or a list of ``(text, label)`` tuples.

        Returns:
            The padded id tensor, or ``(padded, labels)`` when labels were given.
        """
        if self.pad_id is None:
            self._set_pad_id()

        if isinstance(batch[0], tuple):
            texts, labels = zip(*batch)
            padded = self._encode_and_pad(texts)
            labels = torch.tensor(labels, dtype=torch.long)
            return padded, labels

        return self._encode_and_pad(batch)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Fit a fresh BPE tokenizer on the dumped corpus (train() also saves it to its
# default local path), then store a copy on Google Drive.
bpe_tokenizer = BPETokenizer()
bpe_tokenizer.train(files=["train.txt"])
bpe_tokenizer.save(path="/content/drive/MyDrive/AttentionIsAllYouNeed/my_tokenizer.json")

In [None]:
# Restore the previously trained tokenizer from Google Drive instead of retraining.
bpe_tokenizer = BPETokenizer()
bpe_tokenizer.load(path="/content/drive/MyDrive/AttentionIsAllYouNeed/my_tokenizer.json")

NameError: name 'BPETokenizer' is not defined

In [None]:
# Sample sentences used to smoke-test the tokenizer
texts = ["Hello, how are you?", "I am fine, thank you!", "What about you?"]

# Check tokenization
encoded_texts = bpe_tokenizer.batch_encode(texts)
print("Tokenized texts:")
for sentence, token_ids in zip(texts, encoded_texts):
    print(f"Text: {sentence}")
    print(f"Encoded: {token_ids}")

# Check padding
padded_batch = bpe_tokenizer.collate_fn(texts)
print("\nPadded batch:")
print(padded_batch)

# Check the round trip back to text
decoded_texts = [bpe_tokenizer.decode(token_ids) for token_ids in encoded_texts]
print("\nDecoded texts:")
print(*decoded_texts, sep="\n")

Tokenized texts:
Text: Hello, how are you?
Encoded: [17241, 15, 928, 565, 740, 34]
Text: I am fine, thank you!
Encoded: [44, 537, 3520, 15, 5560, 740, 4]
Text: What about you?
Encoded: [2060, 728, 740, 34]

Padded batch:
tensor([[17241,    15,   928,   565,   740,    34,     0],
        [   44,   537,  3520,    15,  5560,   740,     4],
        [ 2060,   728,   740,    34,     0,     0,     0]])

Decoded texts:
Hello , how are you ?
I am fine , thank you !
What about you ?


# Training Transformer

In [None]:
import pickle


# Tokenize every training article and its reference summary once, then cache
# the id lists on Google Drive so later sessions can skip this step.
texts = [example['article'] for example in train_dataset]
summaries = [example['highlights'] for example in train_dataset]

train_encodings = bpe_tokenizer.batch_encode(texts)
summarized_encodings = bpe_tokenizer.batch_encode(summaries)

# Save to Drive. The handles are not named `f`: in this notebook `f` is the
# alias for torch.nn.functional and must not be rebound.
with open("/content/drive/MyDrive/AttentionIsAllYouNeed/train_encodings.pkl", "wb") as encodings_file:
    pickle.dump(train_encodings, encodings_file)

with open("/content/drive/MyDrive/AttentionIsAllYouNeed/summary_encodings.pkl", "wb") as summaries_file:
    pickle.dump(summarized_encodings, summaries_file)


In [None]:
import pickle
# Reload the cached token ids written by the encoding cell.
# NOTE: pickle.load executes arbitrary code from the file; only use it on
# files this notebook produced itself.
# The handles are not named `f`: in this notebook `f` is the alias for
# torch.nn.functional and must not be rebound.
with open("/content/drive/MyDrive/AttentionIsAllYouNeed/train_encodings.pkl", "rb") as encodings_file:
    train_encodings = pickle.load(encodings_file)
with open("/content/drive/MyDrive/AttentionIsAllYouNeed/summary_encodings.pkl", "rb") as summaries_file:
    summarized_encodings = pickle.load(summaries_file)

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)


Using device: cpu


In [None]:
import torch.optim as optim
import torch.nn.functional as f
from torch.nn.utils.rnn import pad_sequence

# Hyperparameters for the transformer model
vocab_size = bpe_tokenizer.tokenizer.get_vocab_size()
d_model = 512
nhead = 8
num_layers = 6
max_len = 5000
batch_size = 16

model = Transformer(
    num_embeddings=vocab_size,
    d_model=d_model,
    num_heads=nhead,
    num_layers=num_layers,
    dropout=0.1,
    max_len=max_len
)

model = model.to(device)

# Special token ids. <bos>/<eos> wrap every target so the decoder has a start
# symbol to condition on and learns when to stop.
pad_id = bpe_tokenizer.tokenizer.token_to_id("<pad>")
bos_id = bpe_tokenizer.tokenizer.token_to_id("<bos>")
eos_id = bpe_tokenizer.tokenizer.token_to_id("<eos>")
if pad_id is None or bos_id is None or eos_id is None:
    raise ValueError("Tokenizer vocabulary must contain <pad>, <bos> and <eos>")

# Loss and optimizer (padding positions do not contribute to the loss)
criterion = nn.CrossEntropyLoss(ignore_index=pad_id)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training loop
epochs = 3

for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0
    num_batches = 0

    for i in range(0, len(train_encodings), batch_size):
        batch_texts = train_encodings[i:i + batch_size]
        batch_summaries = summarized_encodings[i:i + batch_size]

        # Pad and convert to tensors. Sequences are truncated so neither the
        # encoder input nor the decoder input exceeds the positional-encoding
        # table; an explicit dtype keeps empty sequences as integer tensors.
        src = pad_sequence(
            [torch.tensor(x[:max_len], dtype=torch.long) for x in batch_texts],
            batch_first=True, padding_value=pad_id,
        ).to(device)
        tgt = pad_sequence(
            [torch.tensor([bos_id] + list(x[:max_len - 1]) + [eos_id], dtype=torch.long)
             for x in batch_summaries],
            batch_first=True, padding_value=pad_id,
        ).to(device)

        tgt_in = tgt[:, :-1]   # decoder input: everything but the last token
        tgt_out = tgt[:, 1:]   # prediction target: input shifted left by one

        # Masks follow the model's convention: 0 / False means "blocked".
        # src_mask hides padding keys: (batch, 1, 1, src_len).
        src_mask = (src != pad_id).unsqueeze(1).unsqueeze(2)
        # tgt_mask hides padding keys and future positions: (batch, 1, tgt_len, tgt_len).
        tgt_len = tgt_in.size(1)
        causal = torch.tril(torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=device))
        tgt_mask = (tgt_in != pad_id).unsqueeze(1).unsqueeze(2) & causal

        output = model(src, tgt_in, src_mask, tgt_mask)
        loss = criterion(output.reshape(-1, vocab_size), tgt_out.reshape(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # Average over the batches actually run (the last one may be partial).
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss / max(num_batches, 1)}")



NameError: name 'bpe_tokenizer' is not defined

# Evaluation

In [None]:
from rouge_score import rouge_scorer

model = model.to("cpu")
model.eval()

bos_id = bpe_tokenizer.tokenizer.token_to_id("<bos>")
eos_id = bpe_tokenizer.tokenizer.token_to_id("<eos>")
if bos_id is None or eos_id is None:
    raise ValueError("Tokenizer vocabulary must contain <bos> and <eos>")

max_src_len = 5000       # must not exceed the model's positional-encoding table
max_summary_len = 128    # hard cap on generated tokens per article

# Greedy decoding on the validation set: encode the article once, then grow the
# summary one token at a time from <bos> until <eos> or the length cap.
generated_summaries = []
with torch.no_grad():  # inference only, no autograd bookkeeping
    for example in val_dataset:
        input_ids = bpe_tokenizer.encode(example['article'])[:max_src_len]
        src = torch.tensor([input_ids], dtype=torch.long)  # add the batch dimension
        enc_output = model.encoder(src)

        generated = [bos_id]
        for _ in range(max_summary_len):
            tgt = torch.tensor([generated], dtype=torch.long)
            tgt_len = tgt.size(1)
            tgt_mask = torch.tril(torch.ones(tgt_len, tgt_len, dtype=torch.bool))
            logits = model.decoder(tgt, enc_output, None, tgt_mask)
            next_id = int(logits[0, -1].argmax())
            if next_id == eos_id:
                break
            generated.append(next_id)

        # Drop the leading <bos>; decode() expects a plain list of ids.
        generated_summaries.append(bpe_tokenizer.decode(generated[1:]))

# Score the generated summaries against the reference highlights with ROUGE.
metric_names = ['rouge1', 'rouge2', 'rougeL']
scorer = rouge_scorer.RougeScorer(metric_names, use_stemmer=True)

totals = {name: 0.0 for name in metric_names}
for prediction, example in zip(generated_summaries, val_dataset):
    # RougeScorer.score takes (target, prediction) and returns one Score per metric.
    scores = scorer.score(example['highlights'], prediction)
    for name in metric_names:
        totals[name] += scores[name].fmeasure

num_scored = max(len(generated_summaries), 1)
rouge_scores = {name: total / num_scored for name, total in totals.items()}

print(f"ROUGE score: {rouge_scores}")
