<a href="https://colab.research.google.com/github/Arun-nexus/deep_learning/blob/main/english_to_french.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import pandas as pd
# Location of the English/French sentence-pair CSV. The original author's
# local path is kept as the default so existing runs behave the same; set the
# ENG_FRENCH_CSV environment variable to point at the file on any other
# machine (for example a Colab upload) without editing the notebook.
DATA_PATH=os.environ.get("ENG_FRENCH_CSV",r"C:\Users\Arun\Downloads\eng_-french.csv\eng_-french.csv")
df=pd.read_csv(DATA_PATH)
df.head()

Unnamed: 0,English words/sentences,French words/sentences
0,Hi.,Salut!
1,Run!,Cours !
2,Run!,Courez !
3,Who?,Qui ?
4,Wow!,Ça alors !


In [None]:
# Count the missing values in each column of the loaded frame.
print(df.isnull().sum())

English words/sentences    0
French words/sentences     0
dtype: int64


In [None]:
# Pull each language column out of the frame as a plain Python list,
# keeping the row order so english[i] and french[i] stay paired.
ENGLISH_COLUMN="English words/sentences"
FRENCH_COLUMN="French words/sentences"
english=df[ENGLISH_COLUMN].tolist()
french=df[FRENCH_COLUMN].tolist()

In [None]:
from nltk import word_tokenize
def tokenize(text):
    """Tokenize every item of ``text`` with NLTK's ``word_tokenize``.

    Args:
        text: iterable of sentences; each item is handed to
            ``word_tokenize`` unchanged.

    Returns:
        A list holding one ``word_tokenize`` result per input item, in the
        same order as ``text``.
    """
    return list(map(word_tokenize,text))
english_tokens=tokenize(english)
french_tokens=tokenize(french)

In [None]:
# Start from empty lists each time this cell runs; they are filled with the
# flattened token streams further down in the same cell.
english_seq=[]
french_seq=[]
def sequence(tokens,list):
    """Append every token of every row in ``tokens`` to ``list``.

    Args:
        tokens: iterable of rows, each row an iterable of tokens.
        list: the list to extend in place. The parameter name shadows the
            builtin ``list`` inside this function; it is kept so existing
            callers are unaffected.

    Returns:
        The same ``list`` object that was passed in, now extended with the
        tokens in row-major order.
    """
    # extend() with a generator avoids building the throwaway list of None
    # values that a side-effect-only list comprehension would create.
    list.extend(word for row in tokens for word in row)
    return list
# Flatten the per-sentence token lists into one long token list per language.
english_seq=sequence(english_tokens,english_seq)
french_seq=sequence(french_tokens,french_seq)

In [None]:
from collections import Counter
# Occurrence count of every distinct token, one Counter per language.
english_word_count=Counter(english_seq)
french_word_count=Counter(french_seq)

In [None]:
# Seed both vocabularies with the two reserved entries:
# id 0 for "<pad>" and id 1 for "<unk>".
english_dict={"<pad>":0,"<unk>":1}
french_dict={"<pad>":0,"<unk>":1}
def dict(counter,dict):
    """Give every key of ``counter`` that is not yet in ``dict`` the next free id.

    NOTE(review): both the function name and the second parameter shadow the
    builtin ``dict``; the names are kept because callers depend on them.

    Args:
        counter: iterable of keys (iterated directly, so a mapping yields
            its keys).
        dict: mapping from key to integer id; it is modified in place.

    Returns:
        The same mapping object that was passed in. Each new key receives
        the mapping's length at the moment it is inserted.
    """
    for token in counter:
        # setdefault leaves existing keys alone; len() is evaluated before
        # the insertion, so a new key gets the current size as its id.
        dict.setdefault(token,len(dict))
    return dict
# Extend each seeded vocabulary with the tokens found in that language's counter.
english_dict=dict(english_word_count,english_dict)
french_dict=dict(french_word_count,french_dict)

In [None]:
# Vocabulary sizes (reserved "<pad>"/"<unk>" entries included), one per language.
src_vocab=len(english_dict)
tgt_vocab=len(french_dict)

In [None]:
def data(tokens,dict):
    """Translate rows of tokens into rows of vocabulary ids.

    Args:
        tokens: iterable of rows, each row an iterable of tokens.
        dict: mapping from token to id; it must contain ``"<unk>"``, whose
            id is used for any token missing from the mapping.

    Returns:
        A list with one list of ids per input row, in the same order.
    """
    return [
        [dict.get(token,dict["<unk>"]) for token in row]
        for row in tokens
    ]
# Replace every token with its vocabulary id, sentence by sentence.
english_dataset=data(english_tokens,english_dict)
french_dataset=data(french_tokens,french_dict)

In [None]:
from torch.nn.utils.rnn import pad_sequence
import torch
def padded_tensor(dataset):
    """Stack variable-length id rows into one zero-padded tensor.

    Args:
        dataset: iterable of rows, each a sequence of integer ids.

    Returns:
        The result of ``pad_sequence`` with its default arguments applied to
        the rows converted to ``torch.long`` tensors: shorter rows are
        padded with 0 and rows are laid out along the second dimension,
        i.e. the shape is ``(longest_row, number_of_rows)``.
    """
    rows=[]
    for ids in dataset:
        rows.append(torch.tensor(ids,dtype=torch.long))
    return pad_sequence(rows)
# Pad every sentence to the longest one in its language. At this point the
# tensors are laid out as (sequence position, sentence).
english_padded_data=padded_tensor(english_dataset)
french_padded_data=padded_tensor(french_dataset)


In [None]:
# Switch to a (sentence, sequence position) layout. The tensors are rebuilt
# from the un-padded id lists rather than transposed from their own current
# value, so running this cell a second time cannot flip the layout back.
english_padded_data=padded_tensor(english_dataset).T
french_padded_data=padded_tensor(french_dataset).T

In [None]:
# Display the padded French tensor's shape as a sanity check.
french_padded_data.shape

torch.Size([175621, 59])

In [None]:
from torch.utils.data import TensorDataset,DataLoader
from sklearn.model_selection import train_test_split
# Pair each padded English sentence with its French counterpart.
dataset=TensorDataset(english_padded_data,french_padded_data)
# 70/30 split with a fixed seed so the partition is reproducible.
# NOTE(review): train_test_split is given the TensorDataset directly and
# presumably returns plain lists of (src, tgt) pairs - confirm this is the
# intended container for the loaders below.
train_data,val_data=train_test_split(dataset,test_size=0.3,random_state=21)
# Reshuffle the training pairs every epoch; without shuffle=True the loader
# would replay the exact same batches in the same order on every epoch.
train_data_loader=DataLoader(train_data,batch_size=210,shuffle=True)
# Validation order does not affect the metrics, so it stays deterministic.
val_data_loader=DataLoader(val_data,batch_size=210)

In [None]:
import torch.nn as nn
import math
class positional_encoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    A table of shape ``(1, max_len, d_model)`` is precomputed once and stored
    as the non-trainable buffer ``pe``: even feature columns hold
    ``sin(position * freq)`` and odd columns hold ``cos(position * freq)``.

    Args:
        max_len: number of positions in the table.
        d_model: feature width of the embeddings; may be even or odd.
    """
    def __init__(self,max_len=59,d_model=21):
        super().__init__()
        pe=torch.zeros(max_len,d_model)
        position=torch.arange(0,max_len,dtype=torch.float32).unsqueeze(1)
        div_term=torch.exp(torch.arange(0,d_model,2,dtype=torch.float32)*(-math.log(10000.0)/d_model))
        angles=position*div_term
        pe[:,0::2]=torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so the cosine half only takes the first d_model//2 frequencies.
        # Without this slice the assignment fails on a shape mismatch (the
        # default d_model=21 hit exactly that).
        pe[:,1::2]=torch.cos(angles[:,:d_model//2])
        # Leading batch dimension of 1 so the table broadcasts over a batch.
        pe=pe.unsqueeze(0)
        self.register_buffer("pe",pe)

    def forward(self,x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is indexed as (batch, position, feature); the position count
        must not exceed ``max_len``.
        """
        return x+self.pe[:,:x.size(1)]


In [None]:
class attention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: feature width of the inputs and of the output.
        num_heads: number of heads; must divide ``d_model`` evenly.
    """
    def __init__(self,d_model,num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_k = d_model // num_heads
        self.num_heads=num_heads

        self.q_linear=nn.Linear(d_model,d_model)
        self.k_linear=nn.Linear(d_model,d_model)
        self.v_linear=nn.Linear(d_model,d_model)
        self.out=nn.Linear(d_model,d_model)

    def forward(self,q,k,v,mask=None):
        """Attend from ``q`` over ``k``/``v``.

        Args:
            q, k, v: tensors indexed as (batch, length, d_model).
            mask: optional tensor broadcastable to the score tensor
                (batch, heads, query length, key length). Entries equal to 0
                mark key positions that must NOT be attended to.

        Returns:
            Tensor of shape (batch, query length, d_model).
        """
        batch_size=q.size(0)
        # Project, then split the feature axis into heads: (batch, heads, length, d_k).
        q=self.q_linear(q).view(batch_size,-1,self.num_heads,self.d_k).transpose(1,2)
        k=self.k_linear(k).view(batch_size,-1,self.num_heads,self.d_k).transpose(1,2)
        v=self.v_linear(v).view(batch_size,-1,self.num_heads,self.d_k).transpose(1,2)
        scores=torch.matmul(q,k.transpose(-2,-1))/math.sqrt(self.d_k)
        if mask is not None:
            # Blocked positions need the most negative score so softmax gives
            # them zero weight. Filling with +inf (as before) made softmax
            # return NaN. The dtype's finite minimum is used instead of -inf
            # so a row whose keys are all blocked still yields finite weights.
            scores=scores.masked_fill(mask==0,torch.finfo(scores.dtype).min)
        attn=torch.softmax(scores,dim=-1)
        output=torch.matmul(attn,v)
        # Merge the heads back into a single feature axis.
        output=output.transpose(1,2).contiguous().view(batch_size,-1,self.num_heads*self.d_k)
        return self.out(output)


In [None]:
class forward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: input and output feature width.
        d_ff: width of the hidden layer.
        drop: dropout probability applied after the ReLU.
    """
    def __init__(self,d_model,d_ff=2048,drop=0.1):
        super().__init__()
        expand=nn.Linear(d_model,d_ff)
        project=nn.Linear(d_ff,d_model)
        self.features=nn.Sequential(expand,nn.ReLU(),nn.Dropout(drop),project)

    def forward(self,x):
        """Run ``x`` through the stack; the last dimension stays ``d_model``."""
        hidden=self.features(x)
        return hidden

In [None]:
class encoder(nn.Module):
    """One encoder layer: self-attention and a feed-forward block, each
    followed by dropout, a residual connection and layer normalisation.

    Args:
        d_model: feature width.
        num_heads: number of attention heads.
        d_ff: hidden width of the feed-forward block.
        drop: dropout probability (shared by both sub-layers).
    """
    def __init__(self,d_model,num_heads,d_ff,drop=0.1):
        super().__init__()
        self.attn=attention(d_model,num_heads)
        self.norm=nn.LayerNorm(d_model)
        self.norm2=nn.LayerNorm(d_model)
        self.ff=forward(d_model,d_ff,drop)
        self.drop=nn.Dropout(drop)

    def forward(self,x,mask=None):
        """Return the layer output for ``x``; ``mask`` is passed to the attention."""
        # Sub-layer 1: self-attention (queries, keys and values are all x).
        attended=self.attn(x,x,x,mask)
        x=self.norm(x+self.drop(attended))
        # Sub-layer 2: position-wise feed-forward.
        return self.norm2(x+self.drop(self.ff(x)))

In [None]:
class decoder(nn.Module):
    """One decoder layer: self-attention, cross-attention over the encoder
    output, and a feed-forward block. Each sub-layer is followed by dropout,
    a residual connection and layer normalisation.

    Args:
        d_model: feature width.
        num_heads: number of attention heads.
        d_ff: hidden width of the feed-forward block.
        drop: dropout probability (shared by all sub-layers).
    """
    def __init__(self,d_model,num_heads,d_ff,drop):
        super().__init__()
        self.masked_attn=attention(d_model,num_heads)
        self.cross_attn=attention(d_model,num_heads)
        self.norm1=nn.LayerNorm(d_model)
        self.norm2=nn.LayerNorm(d_model)
        self.norm3=nn.LayerNorm(d_model)
        self.ff=forward(d_model,d_ff,drop)
        self.drop=nn.Dropout(drop)

    def forward(self,x,enc_out,src_mask=None,mask=None):
        """Return the layer output for target states ``x``.

        Args:
            x: target-side states.
            enc_out: encoder output used as keys and values of the
                cross-attention.
            src_mask: mask handed to the cross-attention.
            mask: mask handed to the target self-attention.
        """
        # Sub-layer 1: self-attention over the target states.
        self_attended=self.masked_attn(x,x,x,mask)
        x=self.norm1(x+self.drop(self_attended))
        # Sub-layer 2: queries from the target, keys/values from the encoder.
        cross_attended=self.cross_attn(x,enc_out,enc_out,src_mask)
        x=self.norm2(x+self.drop(cross_attended))
        # Sub-layer 3: position-wise feed-forward.
        return self.norm3(x+self.drop(self.ff(x)))

In [None]:
class transformer(nn.Module):
    """Encoder-decoder model mapping source ids and target ids to target-vocabulary logits.

    Args:
        src_vocab: size of the source embedding table.
        tgt_vocab: size of the target embedding table and of the output layer.
        d_model: feature width used throughout.
        max_len: number of positions in the positional-encoding table.
        num_heads: attention heads per layer.
        d_ff: hidden width of the feed-forward blocks.
        num_layers: number of encoder layers and of decoder layers.
        drop: dropout probability passed to every layer.
    """
    def __init__(self,src_vocab,tgt_vocab,d_model,max_len,num_heads,d_ff,num_layers,drop=0.1):
        super().__init__()
        self.src_embedding=nn.Embedding(src_vocab,d_model)
        self.tgt_embedding=nn.Embedding(tgt_vocab,d_model)
        # A single positional-encoding module is shared by both sides.
        self.pos_enc=positional_encoding(max_len,d_model)

        encoder_layers=[]
        for _ in range(num_layers):
            encoder_layers.append(encoder(d_model,num_heads,d_ff,drop))
        self.encoder=nn.ModuleList(encoder_layers)

        decoder_layers=[]
        for _ in range(num_layers):
            decoder_layers.append(decoder(d_model,num_heads,d_ff,drop))
        self.decoder=nn.ModuleList(decoder_layers)

        self.fc_out=nn.Linear(d_model,tgt_vocab)

    def forward(self,src,tgt,src_mask=None,tgt_mask=None):
        """Return logits over the target vocabulary for every target position.

        Args:
            src: source token ids.
            tgt: target token ids fed to the decoder.
            src_mask: mask given to the encoder self-attention and to the
                decoder cross-attention.
            tgt_mask: mask given to the decoder self-attention. No mask is
                created when this is None.
        """
        memory=self.src_embedding(src)
        memory=self.pos_enc(memory)
        hidden=self.tgt_embedding(tgt)
        hidden=self.pos_enc(hidden)

        for enc_layer in self.encoder:
            memory=enc_layer(memory,src_mask)
        # Every decoder layer attends over the final encoder output.
        for dec_layer in self.decoder:
            hidden=dec_layer(hidden,memory,src_mask,tgt_mask)
        return self.fc_out(hidden)

In [None]:
# Train on the GPU when one is visible, otherwise fall back to the CPU.
device="cuda" if torch.cuda.is_available() else "cpu"
model=transformer(
    src_vocab=src_vocab,
    tgt_vocab=tgt_vocab,
    d_model=512,
    max_len=59,
    num_heads=8,
    d_ff=1024,
    num_layers=6,
    drop=0.2,
)
model=model.to(device)
# Id 0 is the "<pad>" entry of both vocabularies, so padded target
# positions are excluded from the loss.
model_loss=nn.CrossEntropyLoss(ignore_index=0)
optimizer=torch.optim.Adam(model.parameters(),lr=0.001)
from torch.optim.lr_scheduler import ReduceLROnPlateau
# Halves the learning rate once the monitored value has stopped decreasing
# for more than `patience` scheduler steps.
scheduler=ReduceLROnPlateau(optimizer,mode='min',factor=0.5,patience=2)

In [None]:
import torch.nn as nn
class early_stopping(nn.Module):
    """Signal when the validation loss has stopped improving.

    Call the instance once per epoch with that epoch's validation loss. The
    loss counts as an improvement only when it is lower than the best loss
    seen so far by more than ``min_delta``. After ``patience`` consecutive
    calls without an improvement, ``early_stop`` becomes True.

    Args:
        min_delta: minimum decrease of the loss that counts as progress.
        patience: number of consecutive non-improving calls tolerated.

    Attributes:
        best_loss: lowest loss accepted as an improvement so far.
        counter: consecutive calls without improvement.
        early_stop: True once ``counter`` has reached ``patience``.
    """
    def __init__(self,min_delta,patience):
        super().__init__()
        self.min_delta=min_delta
        self.patience=patience
        self.early_stop=False
        self.best_loss=float("inf")
        self.counter=0

    @property
    def partience(self):
        """Alias for ``patience`` under the attribute's earlier, misspelled name."""
        return self.patience

    @partience.setter
    def partience(self,value):
        self.patience=value

    def __call__(self,val_loss):
        """Record ``val_loss`` and update ``counter`` / ``early_stop``."""
        # The previous test (min_delta >= val_loss - best_loss) also accepted
        # losses up to min_delta WORSE than the best and then overwrote
        # best_loss with them, so a flat or slowly rising loss never
        # triggered a stop.
        if self.best_loss-val_loss>self.min_delta:
            self.counter=0
            self.best_loss=val_loss
        else:
            self.counter+=1
            if self.counter>=self.patience:
                self.early_stop=True

In [None]:
from torch.amp import GradScaler,autocast
epochs=50
stopper=early_stopping(1e-4,4)
# Per-epoch history of the token-averaged loss and token accuracy.
training_loss_store=[]
validation_loss_store=[]
training_accuracy_store=[]
validation_accuracy_store=[]
# Mixed precision is only used on CUDA. On CPU both autocast and the scaler
# are disabled, so the same loop runs in full precision without warnings.
use_amp=(device=="cuda")
scaler=GradScaler(enabled=use_amp)

# NOTE(review): model(x,y) feeds the full target sequence to the decoder and
# the loss compares the output with that same, unshifted sequence, while no
# target mask is passed. Each position can therefore attend to the token it
# is asked to predict - confirm whether a shifted target plus a causal mask
# was intended.
for epoch in range(epochs):
    # model.eval() is called for validation below, so training mode has to be
    # restored here; otherwise dropout stays disabled from the second epoch on.
    model.train()
    running,total,correct=0,0,0
    for x,y in train_data_loader:
        x,y=x.to(device),y.to(device)

        optimizer.zero_grad()
        with autocast("cuda",enabled=use_amp):
            output=model(x,y)
            output=output.view(-1,output.shape[-1])
            y=y.view(-1)
            loss=model_loss(output,y)
        scaler.scale(loss).backward()
        # Gradients are still multiplied by the loss scale at this point;
        # unscale them first so max_norm applies to the real gradient norm.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()
        mask=y!=0
        n_tokens=mask.sum().item()
        # The loss is a mean over non-padding targets (ignore_index=0), so it
        # is weighted by that same count; the epoch average below divides by
        # the total number of non-padding targets.
        running+=loss.item()*n_tokens
        _,pred=torch.max(output,1)
        correct+=((pred==y)&mask).sum().item()
        total+=n_tokens

    training_loss=running/total
    training_acc=correct/total
    training_loss_store.append(training_loss)
    training_accuracy_store.append(training_acc)

    model.eval()
    vrunning,vtotal,vcorrect=0,0,0
    # No gradients are needed for validation; skipping the autograd graph
    # saves memory and time.
    with torch.no_grad():
        for x,y in val_data_loader:
            x,y=x.to(device),y.to(device)

            with autocast("cuda",enabled=use_amp):
                voutput=model(x,y)
                voutput=voutput.view(-1,voutput.shape[-1])
                y=y.view(-1)
                vloss=model_loss(voutput,y)

            mask=y!=0
            n_tokens=mask.sum().item()
            vrunning+=vloss.item()*n_tokens
            _,vpred=torch.max(voutput,1)
            vcorrect+=((vpred==y)&mask).sum().item()
            vtotal+=n_tokens
    val_loss=vrunning/vtotal
    val_acc=vcorrect/vtotal
    validation_loss_store.append(val_loss)
    validation_accuracy_store.append(val_acc)

    print(f"epoch: {epoch+1} training_loss: {training_loss:.4f} training acc:{training_acc:.2f} validation_loss: {val_loss:.4f} validation_acc: {val_acc:.2f} ")
    # ReduceLROnPlateau only acts when it is given the monitored value.
    scheduler.step(val_loss)
    stopper(val_loss)
    if stopper.early_stop:
        print("early_stopping triggered")
        break