NLP Model

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import math


In [18]:
import pandas as pd

# NOTE(review): despite the name, `url` is a path relative to the notebook's
# working directory, not a remote URL.
url = "samsum-test.csv"

# Load the whole CSV into memory as a DataFrame.
dataset = pd.read_csv(filepath_or_buffer=url)

# Show the first five rows as the cell output for a quick sanity check.
dataset.head(5)


Unnamed: 0,id,dialogue,summary
0,13862856,"Hannah: Hey, do you have Betty's number?\nAman...",Hannah needs Betty's number but Amanda doesn't...
1,13729565,Eric: MACHINE!\r\nRob: That's so gr8!\r\nEric:...,Eric and Rob are going to watch a stand-up on ...
2,13680171,"Lenny: Babe, can you help me with something?\r...",Lenny can't decide which trousers to buy. Bob ...
3,13729438,"Will: hey babe, what do you want for dinner to...",Emma will be home soon and she will let Will k...
4,13828600,"Ollie: Hi , are you in Warsaw\r\nJane: yes, ju...",Jane is in Warsaw. Ollie and Jane has a party....


Positional Encoding

In [19]:
# Cell 2
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch-first sequence.

    A table of shape ``(1, max_len, d_model)`` is precomputed once: even
    feature indices hold ``sin(pos * w_i)`` and odd indices hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        d_model: Size of the feature (last) dimension. May be odd.
        max_len: Number of positions to precompute; inputs longer than this
            cannot be encoded.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns,
        # so drop the surplus frequency instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Registered as a buffer so it follows the module through
        # .to(device) / .cuda() / .double(). persistent=False keeps the
        # state_dict keys the same as when `pe` was a plain attribute.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``.
        """
        return x + self.pe[:, :x.size(1), :]


Multi-Head Attention

In [20]:
# Cell 3
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention split across ``num_heads`` heads.

    Args:
        d_model: Feature size of queries, keys, values and the output.
            Must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
            Defaults to 0.1, the value that used to be hard-coded.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_k = d_model // num_heads
        self.num_heads = num_heads

        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, x, batch_size):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, heads, seq, d_k)``."""
        return x.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` over ``k``/``v``.

        Args:
            q, k, v: Tensors of shape ``(batch, seq, d_model)``; ``k`` and
                ``v`` must share their sequence length.
            mask: Optional tensor broadcastable to
                ``(batch, heads, q_len, k_len)``. Positions where it equals 0
                are excluded from attention.

        Returns:
            Tensor of shape ``(batch, q_len, d_model)``.
        """
        batch_size = q.size(0)
        q = self._split_heads(self.q_linear(q), batch_size)
        k = self._split_heads(self.k_linear(k), batch_size)
        v = self._split_heads(self.v_linear(v), batch_size)

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # Use the most negative value the dtype can hold: a literal -1e9
            # cannot be represented in float16 and breaks mixed precision.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)
        weights = self.dropout(torch.softmax(scores, dim=-1))

        # Merge the heads back: (batch, heads, q_len, d_k) -> (batch, q_len, d_model).
        context = torch.matmul(weights, v).transpose(1, 2).contiguous()
        context = context.view(batch_size, -1, self.num_heads * self.d_k)
        return self.out_linear(context)


Position-wise Feed-Forward Network

In [21]:
# Cell 4
class FeedForward(nn.Module):
    """Two-layer MLP applied to the last dimension: d_model -> d_ff -> d_model.

    Args:
        d_model: Input and output feature size.
        d_ff: Width of the hidden layer.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        # Expansion, regularisation, then projection back to d_model.
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Return ``linear2(dropout(relu(linear1(x))))``."""
        hidden = torch.relu(self.linear1(x))
        hidden = self.dropout(hidden)
        return self.linear2(hidden)


Functional Transformer Encoder (prototype)

In [13]:
def transformer_model(src, ntoken, d_model, nhead, nhid, nlayers, dropout=0.5, src_mask=None):
    """Run token ids through a freshly initialised encoder-only Transformer.

    The previous version called ``positional_encoding`` and
    ``transformer_encoder_layer``, neither of which is defined in this
    notebook, so it raised ``NameError``. This version is self-contained and
    uses PyTorch's built-in encoder layer.

    Every call builds new, randomly initialised layers in training mode, so
    the output is only useful for checking shapes and plumbing; nothing
    learned persists between calls.

    Args:
        src: Long tensor of token ids, shape ``(seq_len, batch)``
            (sequence-first, matching the ``src.size(0)`` position indexing).
        ntoken: Vocabulary size.
        d_model: Embedding / model width; must be divisible by ``nhead``.
        nhead: Number of attention heads per layer.
        nhid: Hidden width of each layer's feed-forward sub-block.
        nlayers: Number of encoder layers (0 is allowed).
        dropout: Dropout probability inside each encoder layer.
        src_mask: Optional ``(seq_len, seq_len)`` attention mask forwarded to
            every encoder layer.

    Returns:
        Logits of shape ``(seq_len, batch, ntoken)``.
    """
    embedding = nn.Embedding(ntoken, d_model)
    nn.init.uniform_(embedding.weight, -0.1, 0.1)

    encoder_layers = [
        nn.TransformerEncoderLayer(d_model, nhead, nhid, dropout) for _ in range(nlayers)
    ]

    decoder = nn.Linear(d_model, ntoken)
    nn.init.zeros_(decoder.bias)
    nn.init.uniform_(decoder.weight, -0.1, 0.1)

    hidden = embedding(src) * math.sqrt(d_model)

    # Sinusoidal position table, shaped (seq_len, 1, d_model) so it
    # broadcasts over the batch dimension.
    seq_len = hidden.size(0)
    position = torch.arange(seq_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(
        torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000.0) / d_model)
    )
    angles = position * div_term
    pos_encoding = torch.zeros(seq_len, 1, d_model)
    pos_encoding[:, 0, 0::2] = torch.sin(angles)
    pos_encoding[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
    hidden = hidden + pos_encoding.to(hidden.device)

    for layer in encoder_layers:
        hidden = layer(hidden, src_mask=src_mask)

    return decoder(hidden)


Encoder and Decoder Blocks

In [14]:
# Cell 5
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward.

    Each sub-layer output goes through dropout, is added back to its input,
    and the sum is layer-normalised.

    Args:
        d_model: Feature size of the input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
    """

    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # A single dropout module (p=0.1) is shared by both residual branches.
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, mask=None):
        """Encode ``x``; ``mask`` is passed unchanged to the attention module."""
        # Sub-layer 1: self-attention (queries, keys and values are all x).
        attended = self.attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attended))

        # Sub-layer 2: feed-forward network.
        transformed = self.ffn(x)
        x = self.norm2(x + self.dropout(transformed))
        return x


class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention, feed-forward.

    Each sub-layer output goes through dropout, is added back to its input,
    and the sum is layer-normalised.

    Args:
        d_model: Feature size of the input and output.
        num_heads: Number of heads for both attention modules.
        d_ff: Hidden width of the feed-forward sub-layer.
    """

    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.cross_attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        # A single dropout module (p=0.1) is shared by all three residual branches.
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
        """Decode ``x`` against ``encoder_output``.

        ``tgt_mask`` is given to the self-attention over ``x``; ``src_mask``
        is given to the cross-attention, whose keys and values are
        ``encoder_output``.
        """
        # Sub-layer 1: self-attention over the target sequence.
        self_attended = self.self_attention(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attended))

        # Sub-layer 2: attend from the target to the encoder output.
        cross_attended = self.cross_attention(x, encoder_output, encoder_output, src_mask)
        x = self.norm2(x + self.dropout(cross_attended))

        # Sub-layer 3: feed-forward network.
        transformed = self.ffn(x)
        x = self.norm3(x + self.dropout(transformed))
        return x


Transformer Model

In [15]:
# Cell 6
class Transformer(nn.Module):
    """Encoder-decoder Transformer with a shared token embedding.

    Source and target ids go through the same embedding table and the same
    positional encoding, then through ``num_layers`` encoder blocks and
    ``num_layers`` decoder blocks. A final linear layer maps decoder states
    to vocabulary logits.

    Args:
        vocab_size: Number of rows in the embedding and size of the output logits.
        d_model: Model width.
        num_heads: Attention heads per block.
        d_ff: Hidden width of each block's feed-forward network.
        num_layers: Number of encoder blocks and of decoder blocks.
        max_len: Passed to ``PositionalEncoding``.
    """

    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, max_len=5000):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)

        # Build the two stacks one block at a time, encoder first.
        self.encoder = nn.ModuleList()
        for _ in range(num_layers):
            self.encoder.append(EncoderBlock(d_model, num_heads, d_ff))
        self.decoder = nn.ModuleList()
        for _ in range(num_layers):
            self.decoder.append(DecoderBlock(d_model, num_heads, d_ff))

        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        """Return logits over the vocabulary for every position of ``tgt``.

        ``src_mask`` is passed to every encoder block and to every decoder
        block; ``tgt_mask`` is passed to the decoder blocks only. No mask is
        created when one is omitted.
        """
        memory = self.positional_encoding(self.embedding(src))
        decoded = self.positional_encoding(self.embedding(tgt))

        for enc_block in self.encoder:
            memory = enc_block(memory, src_mask)

        for dec_block in self.decoder:
            decoded = dec_block(decoded, memory, src_mask, tgt_mask)

        logits = self.fc_out(decoded)
        return logits


Training Loop

In [8]:
# Cell 7
def train_model(model, dataloader, criterion, optimizer, epochs, device):
    """Train ``model`` with teacher forcing and print the mean loss per epoch.

    For each batch the decoder input is ``tgt[:, :-1]`` and the labels are
    ``tgt[:, 1:]``. A lower-triangular ``tgt_mask`` is passed to the model so
    that a position cannot attend to later target tokens; without it the
    decoder can read the very token it is asked to predict.

    Args:
        model: Module called as ``model(src, tgt_in, tgt_mask=...)`` that
            returns logits of shape ``(batch, tgt_len, vocab)``.
        dataloader: Iterable of dicts with ``"src"`` and ``"tgt"`` tensors;
            ``tgt`` is indexed as ``(batch, seq_len)``.
        criterion: Loss called as ``criterion(logits_2d, labels_1d)``.
        optimizer: Optimizer over the model's parameters.
        epochs: Number of passes over ``dataloader``.
        device: Device the batch tensors are moved to.
    """
    model.train()
    for epoch in range(epochs):
        epoch_loss = 0.0
        num_batches = 0

        for batch in dataloader:
            src, tgt = batch["src"].to(device), batch["tgt"].to(device)
            decoder_input = tgt[:, :-1]
            labels = tgt[:, 1:]

            # True where attention is allowed: position i sees positions <= i.
            steps = decoder_input.size(1)
            causal_mask = torch.ones(steps, steps, device=decoder_input.device).tril().bool()

            optimizer.zero_grad()
            output = model(src, decoder_input, tgt_mask=causal_mask)

            loss = criterion(output.reshape(-1, output.size(-1)), labels.reshape(-1))
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            num_batches += 1

        # Count batches instead of calling len(): this also works for
        # iterables without __len__ and avoids dividing by zero when empty.
        if num_batches == 0:
            print(f'Epoch: {epoch + 1}, Loss: nan (dataloader yielded no batches)')
            continue

        print(f'Epoch: {epoch + 1}, Loss: {epoch_loss / num_batches}')


Evaluation

In [16]:
# Cell 8
from rouge_score import rouge_scorer

def evaluate_model(model, dataloader, tokenizer, device):
    """Score generated summaries against references with ROUGE-1/2/L.

    For every batch, ``model.generate(src)`` is decoded and compared with the
    decoded ``tgt`` ids. The per-batch F-measures are averaged, printed, and
    returned.

    NOTE(review): ``model`` must provide a ``generate(src)`` method; the
    ``Transformer`` class shown in this notebook does not define one.
    NOTE(review): ``tgt.squeeze().tolist()`` yields a flat id list only for
    one example per batch - confirm the dataloader uses ``batch_size=1``.

    Args:
        model: Module exposing ``eval()`` and ``generate(src)``.
        dataloader: Iterable of dicts with ``"src"`` and ``"tgt"`` tensors.
        tokenizer: Object with ``decode(ids, skip_special_tokens=True)``.
        device: Device the source tensor is moved to.

    Returns:
        Dict mapping ``'rouge1'``, ``'rouge2'`` and ``'rougeL'`` to the mean
        F-measure, or an empty dict when the dataloader yields no batches.
    """
    metrics = ['rouge1', 'rouge2', 'rougeL']

    model.eval()
    scorer = rouge_scorer.RougeScorer(metrics, use_stemmer=True)
    scores = []

    with torch.no_grad():
        for batch in dataloader:
            src, tgt = batch["src"].to(device), batch["tgt"]
            generated_output = model.generate(src)
            # Treat a tensor result the same way as the reference ids, so the
            # tokenizer always receives plain Python ints.
            if torch.is_tensor(generated_output):
                generated_output = generated_output.squeeze().tolist()
            pred_text = tokenizer.decode(generated_output, skip_special_tokens=True)
            ref_text = tokenizer.decode(tgt.squeeze().tolist(), skip_special_tokens=True)

            scores.append(scorer.score(ref_text, pred_text))

    # The old code indexed scores[0] unconditionally and raised IndexError
    # when nothing had been evaluated.
    if not scores:
        print("Average ROUGE Scores: no batches were evaluated")
        return {}

    avg_scores = {
        metric: sum(score[metric].fmeasure for score in scores) / len(scores)
        for metric in metrics
    }
    print("Average ROUGE Scores:", avg_scores)
    return avg_scores
