In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import re
import random
import unicodedata
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import numpy as np

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [3]:
PAD_token = 0
SOS_token = 1
EOS_token = 2

class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {"PAD": PAD_token, "SOS": SOS_token, "EOS": EOS_token}
        self.word2count = {}
        self.index2word = {PAD_token: "PAD", SOS_token: "SOS", EOS_token: "EOS"}
        self.n_words = 3

    def addSentence(self, sentence):
        for word in sentence.split(' '):
            self.addWord(word)

    def addWord(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

In [4]:
def normalizeEnglish(s):
    s = s.lower().strip()
    s = re.sub(r"([.!?])", r" \1", s)            
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)        
    return s.strip()

def normalizeFrench(s):
    s = s.lower().strip()
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zàâçéèêëîïôûùüÿñæœ .!?'-]+", r" ", s)  
    return s.strip()


In [5]:
def readLangs(lang1, lang2):
    lines = open("C:\\Users\\harry\\Desktop\\eng-fra.txt", encoding='utf-8').read().strip().split('\n')
    
    if lang1 == 'eng':
        normalize1 = normalizeEnglish
    else:
        normalize1 = normalizeFrench 

    if lang2 == 'fra':
        normalize2 = normalizeFrench
    else:
        normalize2 = normalizeEnglish
        
    pairs = [[normalize1(s.split('\t')[0]), normalize2(s.split('\t')[1])] for s in lines]
    input_lang = Lang(lang1)
    output_lang = Lang(lang2)
    return input_lang, output_lang, pairs

In [6]:
MAX_LENGTH = 16
def filterPair(p):
    return len(p[0].split(' ')) < MAX_LENGTH and len(p[1].split(' ')) < MAX_LENGTH

def filterPairs(pairs):
    return [pair for pair in pairs if filterPair(pair)]

In [7]:
def prepareData(lang1, lang2):
    input_lang, output_lang, pairs = readLangs(lang1, lang2)
    pairs = filterPairs(pairs)
    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])
    random.shuffle(pairs)
    train_pair = pairs[:round(len(pairs)*0.8)]
    test_pair = pairs[round(len(pairs)*0.8):]
    print("Counted words:")
    print(input_lang.name, input_lang.n_words)
    print(output_lang.name, output_lang.n_words)
    return input_lang, output_lang, train_pair, test_pair

In [8]:
input_lang,output_lang,train_pair,test_pair = prepareData('eng','fra')

Counted words:
eng 12596
fra 26188


In [9]:
def indexesFromSentence(lang, sentence):
    return [lang.word2index[word] for word in sentence.split(' ')]

def tensorFromSentence(lang, sentence):
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(1, -1)

def tensorsFromPair(pair):
    input_tensor = tensorFromSentence(input_lang, pair[0])
    target_tensor = tensorFromSentence(output_lang, pair[1])
    return (input_tensor, target_tensor)

In [10]:
from torch.utils.data import TensorDataset, DataLoader
def train_dataloader(batch_size, input_lang, output_lang, pairs, max_length=MAX_LENGTH):
    n = len(pairs)
    input_ids = np.full((n, max_length), PAD_token, dtype=np.int32)
    target_ids = np.full((n, max_length), PAD_token, dtype=np.int32)

    for idx, (inp, tgt) in enumerate(pairs):
        inp_ids = indexesFromSentence(input_lang, inp)[:max_length - 1]
        tgt_ids = [SOS_token] + indexesFromSentence(output_lang, tgt)[:max_length - 2]

        inp_ids.append(EOS_token)
        tgt_ids.append(EOS_token)
        input_ids[idx, :len(inp_ids)] = inp_ids
        target_ids[idx, :len(tgt_ids)] = tgt_ids
    
    input_tensor = torch.LongTensor(input_ids).to(device)
    target_tensor = torch.LongTensor(target_ids).to(device)

    train_data = TensorDataset(input_tensor, target_tensor)
    train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
    
    return train_dataloader


In [11]:
from torch.utils.data import TensorDataset, DataLoader
def test_dataloader(batch_size, input_lang, output_lang, pairs, max_length=MAX_LENGTH):
    n = len(pairs)
    input_ids = np.full((n, max_length), PAD_token, dtype=np.int32)
    target_ids = np.full((n, max_length), PAD_token, dtype=np.int32)
    
    for idx, (inp, tgt) in enumerate(pairs):
        inp_ids = indexesFromSentence(input_lang, inp)[:max_length - 1]
        tgt_ids = [SOS_token] + indexesFromSentence(output_lang, tgt)[:max_length - 2]

        inp_ids.append(EOS_token)
        tgt_ids.append(EOS_token)
        input_ids[idx, :len(inp_ids)] = inp_ids
        target_ids[idx, :len(tgt_ids)] = tgt_ids
    
    input_tensor = torch.LongTensor(input_ids).to(device)
    target_tensor = torch.LongTensor(target_ids).to(device)
    # print(len(input_tensor))
    # print(len(target_tensor))
    test_data = TensorDataset(input_tensor, target_tensor)
    test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=True, drop_last=True)
    # print(len(test_data))
    return test_dataloader

In [12]:
from torch.utils.data import TensorDataset, DataLoader
batch_size = 32
eval_pair = test_pair
train_pair = train_pair
train_dataloader = train_dataloader(batch_size,input_lang,output_lang,train_pair)
eval_dataloader = test_dataloader(batch_size,input_lang,output_lang,eval_pair)

In [13]:
len(eval_dataloader),len(train_dataloader)

(829, 3318)

In [14]:
class SelfAttention(nn.Module):
    def __init__(self, vocab_size, emb_dim):
        super().__init__()
        self.emb_dim = emb_dim
        self.vocab_size = vocab_size
        
        self.W_q = nn.Parameter(torch.randn(emb_dim, emb_dim))
        self.W_k = nn.Parameter(torch.randn(emb_dim, emb_dim))
        self.W_v = nn.Parameter(torch.randn(emb_dim, emb_dim))
        
    def forward(self, x):
        Q = x @ self.W_q
        K = x @ self.W_k
        V = x @ self.W_v
        attention = torch.softmax(Q@K.transpose(1,2)/self.emb_dim**0.5, dim=-1)
        out = attention@V
        return out

In [15]:
class MultiHeadAttention(nn.Module):
    def __init__(self, emb_dim, num_heads):
        super(MultiHeadAttention,self).__init__()
        assert emb_dim % num_heads == 0
        self.num_heads = num_heads
        self.head_dim = emb_dim // num_heads
        self.emb_dim = emb_dim

        self.W_q = nn.Linear(emb_dim, emb_dim)
        self.W_k = nn.Linear(emb_dim, emb_dim)
        self.W_v = nn.Linear(emb_dim, emb_dim)
        self.W_0 = nn.Linear(emb_dim, emb_dim)

    def forward(self, q,k,v, mask=None):
        batch_size, seq_len, emb_dim = q.shape
        H = self.num_heads
        D = self.head_dim

        Q = self.W_q(q).reshape(batch_size, -1, H, D).transpose(1, 2)
        K = self.W_k(k).reshape(batch_size, -1, H, D).transpose(1, 2)
        V = self.W_v(v).reshape(batch_size, -1, H, D).transpose(1, 2)

        scores = Q @ K.transpose(-2, -1) / (D ** 0.5) 
        
        
        if mask is not None:
            # print('mask :',mask.shape)
            # print('score:',scores.shape)
            scores = scores.masked_fill(mask == 1, value=-1e9)

        attn = torch.softmax(scores, dim=-1)
        out = attn @ V 
        
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, emb_dim)
        output_final = self.W_0(out)
        return output_final


In [16]:
class PositionalEncoding(nn.Module):
    def __init__(self,max_seq_len,emb_dim):
        super().__init__()
        pe = torch.zeros(max_seq_len, emb_dim)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, emb_dim, 2, dtype=torch.float) * -(np.log(10000.0) / emb_dim)
        )
        pe[:, 0::2] = torch.sin(position * div_term) # takes vectors from 0 with step of 2 
        pe[:, 1::2] = torch.cos(position * div_term) # takes vectors from 1 with step of 2 
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1), :].to(device)

In [17]:
class TransformerEmbedding(nn.Module):
    def __init__(self, vocab_size, emb_dim, max_seq_len):
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, emb_dim)
        nn.init.normal_(self.token_emb.weight, mean=0, std=0.1)
        self.pos_emb = PositionalEncoding(max_seq_len, emb_dim)

    def forward(self, x):
        x = self.token_emb(x)
        x = self.pos_emb(x)
        return x

In [18]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, emb_dim, ff_dim, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(emb_dim, ff_dim)
        self.linear2 = nn.Linear(ff_dim, emb_dim)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.linear2(self.dropout(self.relu(self.linear1(x))))


In [19]:
class AddNorm(nn.Module):
    def __init__(self, emb_dim, dropout=0.1):
        super().__init__()
        self.norm = nn.LayerNorm(emb_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        return self.norm(x + self.dropout(sublayer))


In [20]:
class EncoderBlock(nn.Module):
    def __init__(self, emb_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(emb_dim, num_heads)
        self.addnorm1 = AddNorm(emb_dim, dropout)
        self.ffn = PositionwiseFeedForward(emb_dim, ff_dim, dropout)
        self.addnorm2 = AddNorm(emb_dim, dropout)

    def forward(self, x, enc_mask=None):
        x = self.addnorm1(x, self.mha(x,x,x,enc_mask))
        x = self.addnorm2(x, self.ffn(x))
        return x


In [21]:
class DecoderBlock(nn.Module):
    def __init__(self, emb_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.masked_mha = MultiHeadAttention(emb_dim, num_heads)
        self.addnorm1 = AddNorm(emb_dim, dropout)
        
        self.cross_mha = MultiHeadAttention(emb_dim, num_heads)
        self.addnorm2 = AddNorm(emb_dim, dropout)

        self.ffn = PositionwiseFeedForward(emb_dim, ff_dim, dropout)
        self.addnorm3 = AddNorm(emb_dim, dropout)

    def forward(self, x, enc_out, tgt_mask=None, enc_mask=None):
        # Masked self-attention
        x = self.addnorm1(x, self.masked_mha(x, x, x, tgt_mask))

        # Encoder-decoder cross attention
        x = self.addnorm2(x, self.cross_mha(x, enc_out, enc_out, enc_mask))

        # Feed-forward
        x = self.addnorm3(x, self.ffn(x))
        return x



In [22]:
def generate_padding_mask(src, pad_idx=0):
    return (src == pad_idx).type(torch.int16).unsqueeze(-2).unsqueeze(-2)

def generate_subsequent_mask(seq_len, device=None):
    mask = torch.triu(torch.ones((1,seq_len, seq_len)),diagonal=1).type(torch.int16)
    return mask


In [23]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, emb_dim=512, num_heads=8,
                 ff_dim=2048, num_layers=6, max_seq_len=256, dropout=0.1, pad_idx=0):
        super().__init__()
        self.pad_idx = pad_idx
        self.src_embed = TransformerEmbedding(src_vocab_size, emb_dim, max_seq_len)
        self.tgt_embed = TransformerEmbedding(tgt_vocab_size, emb_dim, max_seq_len)

        self.encoder = nn.ModuleList([
            EncoderBlock(emb_dim, num_heads, ff_dim, dropout) for _ in range(num_layers)
        ])
        self.decoder = nn.ModuleList([
            DecoderBlock(emb_dim, num_heads, ff_dim, dropout) for _ in range(num_layers)
        ])
        self.output_proj = nn.Linear(emb_dim, tgt_vocab_size)

    def forward(self, src, tgt):
        src_mask = (src == self.pad_idx).unsqueeze(-2).unsqueeze(-2).to(src.device)  
        tgt_mask = generate_subsequent_mask(tgt.shape[1]).to(tgt.device)     
        tgt_pad_mask = (tgt == self.pad_idx).unsqueeze(-2).to(tgt.device)  
        combined_tgt_mask = tgt_pad_mask | tgt_mask 
        combined_tgt_mask = combined_tgt_mask.unsqueeze(1)
    
        # Embedding
        src_emb = self.src_embed(src)  
        tgt_emb = self.tgt_embed(tgt)  
    
        # Encoder
        enc_out = src_emb
        for layer in self.encoder:
            enc_out = layer(enc_out, src_mask)
    
        # Decoder
        dec_out = tgt_emb
        for layer in self.decoder:
            dec_out = layer(dec_out, enc_out, combined_tgt_mask, src_mask)
    
        # Final projection to vocabulary size
        output = self.output_proj(dec_out)
        return output



In [24]:
model = Transformer(input_lang.n_words,
                    output_lang.n_words,
                    emb_dim=128,
                    num_heads=8,
                    ff_dim=2048,
                    num_layers=6,
                    max_seq_len=MAX_LENGTH,
                    dropout=0.1,
                    pad_idx=0).to(device)

In [26]:
optimizer = optim.Adam(model.parameters(),lr=1e-4)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_token)

In [27]:
def train_epoch(model, dataloader, optimizer, criterion, pad_idx, device, clip=1.0):
    model.train()
    for epoch in range(20):
        total_loss = 0
        total_tokens = 0
        total_correct = 0
        for src, tgt in dataloader:
            optimizer.zero_grad()
            src = src.to(device)
            tgt = tgt.to(device)
          
            tgt_input = tgt[:, :-1]
            tgt_target = tgt[:, 1:]             
            
        
            outputs = model(src, tgt_input)  
            logits = outputs.reshape(-1, outputs.size(-1))
            tgt_flat = tgt_target.reshape(-1)
        
            loss = criterion(logits, tgt_flat)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
            optimizer.step()
            total_loss += loss.item()
            
            preds = logits.argmax(dim=-1)                        
            non_pad_mask = tgt_flat != pad_idx
            correct = (preds == tgt_flat) & non_pad_mask
            total_correct += correct.sum().item()
            total_tokens += non_pad_mask.sum().item()
        avg_loss = total_loss / len(dataloader)
        accuracy = 100 * total_correct / total_tokens if total_tokens > 0 else 0
        print(f"Epoch {epoch+1:2d} | Loss: {avg_loss:.4f} | Accuracy: {accuracy:.2f}%")

In [28]:
train_epoch(model, train_dataloader, optimizer, criterion, PAD_token, device, clip=1.0)

Epoch  1 | Loss: 4.4494 | Accuracy: 39.11%
Epoch  2 | Loss: 3.0549 | Accuracy: 51.97%
Epoch  3 | Loss: 2.4110 | Accuracy: 59.74%
Epoch  4 | Loss: 1.9909 | Accuracy: 65.07%
Epoch  5 | Loss: 1.7010 | Accuracy: 68.67%
Epoch  6 | Loss: 1.4885 | Accuracy: 71.25%
Epoch  7 | Loss: 1.3214 | Accuracy: 73.37%
Epoch  8 | Loss: 1.1863 | Accuracy: 75.13%
Epoch  9 | Loss: 1.0745 | Accuracy: 76.60%
Epoch 10 | Loss: 0.9775 | Accuracy: 77.93%
Epoch 11 | Loss: 0.8954 | Accuracy: 79.09%
Epoch 12 | Loss: 0.8220 | Accuracy: 80.19%
Epoch 13 | Loss: 0.7570 | Accuracy: 81.15%
Epoch 14 | Loss: 0.6987 | Accuracy: 82.12%
Epoch 15 | Loss: 0.6469 | Accuracy: 82.96%
Epoch 16 | Loss: 0.6018 | Accuracy: 83.73%
Epoch 17 | Loss: 0.5587 | Accuracy: 84.49%
Epoch 18 | Loss: 0.5219 | Accuracy: 85.19%
Epoch 19 | Loss: 0.4876 | Accuracy: 85.86%
Epoch 20 | Loss: 0.4573 | Accuracy: 86.44%


In [29]:
torch.save(model,'trans.pth')

In [30]:
model = torch.load('trans.pth',weights_only=False)
model = model.to(device)

In [31]:
@torch.no_grad()
def evaluate_model(model, dataloader, criterion, pad_idx, device):
    model.eval()
    total_loss = 0
    total_tokens = 0
    total_correct = 0

    for src, tgt in dataloader:
        src = src.to(device)
        tgt = tgt.to(device)

        tgt_input = tgt[:, :-1]
        tgt_target = tgt[:, 1:]

        outputs = model(src, tgt_input)                     # [B, T, V]
        logits = outputs.reshape(-1, outputs.size(-1))      # [B*T, V]
        tgt_flat = tgt_target.reshape(-1)                   # [B*T]

        loss = criterion(logits, tgt_flat)
        total_loss += loss.item()

        preds = logits.argmax(dim=-1)                       # [B*T]
        non_pad_mask = tgt_flat != pad_idx
        correct = (preds == tgt_flat) & non_pad_mask
        total_correct += correct.sum().item()
        total_tokens += non_pad_mask.sum().item()

    avg_loss = total_loss / len(dataloader)
    accuracy = 100 * total_correct / total_tokens if total_tokens > 0 else 0

    print(f"[Eval] Loss: {avg_loss:.4f} | Accuracy: {accuracy:.2f}%")
    # return avg_loss, accuracy

In [32]:
evaluate_model(model, eval_dataloader, criterion, PAD_token, device)

[Eval] Loss: 1.2688 | Accuracy: 77.55%


In [33]:
def greedy_decode(model, src_tensor, input_lang, output_lang, max_length=MAX_LENGTH):
    model.eval()
    src_tensor = src_tensor.to(device)
    
    with torch.no_grad():
        tgt_input = torch.LongTensor([[SOS_token]]).to(device)
        
        for _ in range(max_length):
            out = model(src_tensor, tgt_input)
            next_token_logits = out[:, -1, :]  # (batch, vocab)
            next_token = torch.argmax(next_token_logits, dim=-1).unsqueeze(1)  # (batch, 1)

            tgt_input = torch.cat([tgt_input, next_token], dim=1)
            if next_token.item() == EOS_token:
                break

        output_indexes = tgt_input.squeeze().tolist()[1:]  # Remove SOS token
        output_words = [output_lang.index2word[idx] for idx in output_indexes if idx not in [EOS_token, PAD_token]]
        return ' '.join(output_words)

In [34]:
def generate(text,model,input_lang,output_lang,max_length):
    
    text = normalizeEnglish(text)
    input_ids = np.full((1, max_length), PAD_token, dtype=np.int32)
    inp_ids = indexesFromSentence(input_lang, text)[:max_length - 1] 
    inp_ids.append(EOS_token)
    input_ids[0,:len(inp_ids)] = inp_ids
    input_tensor = torch.LongTensor(input_ids)
    
    print('French sentence : ',greedy_decode(model, input_tensor, input_lang, output_lang, max_length))

In [53]:
text = input('English sentence : ')
generate(text,model,input_lang,output_lang,MAX_LENGTH)

English sentence :  hello


French sentence :  écoute  !
