In [1]:
from functools import partial
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch import optim

from torchtext import datasets
#from torchtext.data.functional import to_map_style_dataset
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

In [2]:
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# The IMDB 'train' split is used for training and the 'test' split for evaluation.
train_data = datasets.IMDB(split="train")
eval_data = datasets.IMDB(split="test")

# Both splits are iterable datapipes, not in-memory lists.
print(type(train_data))

<class 'torch.utils.data.datapipes.iter.sharding.ShardingFilterIterDataPipe'>


In [4]:
# Word2vec training pairs are built from the review text alone, so the
# sentiment label of every (label, review) item is discarded.
train_map_data = [review for _label, review in train_data]

# Evaluation reviews must come from eval_data (the 'test' split); reading
# train_data here would make validation run on the training reviews.
eval_map_data = [review for _label, review in eval_data]


In [5]:
# Sanity check: each list is expected to hold 25000 reviews.
for review_list in (train_map_data, eval_map_data):
    print(len(review_list))

# Every element of these lists is one review stored as a plain string.
print(type(train_map_data[0]))

25000
25000
<class 'str'>


In [6]:
# The 'basic_english' tokenizer normalizes a review and then splits it on
# whitespace, turning each review string into a list of word tokens. Those
# token lists are what the vocabulary is built from; swap in another tokenizer
# here to change how reviews are split.
tokenizer = get_tokenizer("basic_english", language="en")

In [7]:
#build vocab

def build_vocab(train_map_data, tokenizer, min_freq=3):
    """Build a token -> index vocabulary from a collection of review strings.

    Args:
        train_map_data: iterable of review strings.
        tokenizer: callable mapping one review string to a list of tokens.
        min_freq: minimum number of occurrences a token needs in order to be
            kept in the vocabulary (defaults to 3).

    Returns:
        The vocabulary produced by ``build_vocab_from_iterator``, with
        ``"<unk>"`` registered as a special token and used as the default
        index for out-of-vocabulary lookups.
    """
    vocab = build_vocab_from_iterator(
        map(tokenizer, train_map_data),  # lazily tokenizes one review at a time
        min_freq=min_freq,
        specials=["<unk>"],
    )
    # Tokens missing from the vocabulary resolve to the "<unk>" index
    # instead of raising on lookup.
    vocab.set_default_index(vocab["<unk>"])

    return vocab


# build_vocab wraps torchtext's build_vocab_from_iterator:
#   * map(tokenizer, reviews) feeds it one token list per review;
#   * min_freq=3 keeps only tokens that occur at least 3 times;
#   * specials=["<unk>"] adds an "<unk>" entry, which is also set as the default
#     index so that any token outside the vocabulary maps to "<unk>".
vocab = build_vocab(train_map_data, tokenizer)

In [8]:
# Number of entries in the vocabulary, including the "<unk>" special token
# (index 0, so real tokens start at index 1).
vocab_size = len(vocab)
print(vocab_size)

40251


In [9]:
window_size = 5    # context tokens taken on each side of the center token
max_norm = 1       # max_norm passed to the nn.Embedding layers
max_seq_len = 300  # reviews are cut to this many tokens before windowing
embded_dim = 100   # size of each embedding vector
batch_size = 32    # number of reviews handed to the collate function per batch


In [10]:
def text_pipeline(x):
    """Tokenize one review string and map each token to its vocabulary index."""
    return vocab(tokenizer(x))

In [11]:
def collate_cbow(batch, text_pipeline):
    """Turn a batch of reviews into CBOW (context words -> center word) samples.

    A window of ``2 * window_size + 1`` token ids slides over each review. The
    middle id of every window is the target and the remaining ``2 * window_size``
    ids are the input.

    Args:
        batch: iterable of raw reviews.
        text_pipeline: callable mapping one review to a list of token ids.

    Returns:
        Tuple ``(inputs, targets)`` of ``torch.long`` tensors: one row of context
        ids per window, and the matching center id per window.
    """
    span = window_size * 2 + 1
    contexts, centers = [], []

    for review in batch:
        token_ids = text_pipeline(review)

        # Reviews too short to hold a single full window contribute nothing.
        if len(token_ids) < span:
            continue

        # Truncation is applied after the length check above.
        if max_seq_len:
            token_ids = token_ids[:max_seq_len]

        for start in range(len(token_ids) - span + 1):
            window = token_ids[start:start + span]
            centers.append(window[window_size])
            contexts.append(window[:window_size] + window[window_size + 1:])

    return (
        torch.tensor(contexts, dtype=torch.long),
        torch.tensor(centers, dtype=torch.long),
    )

In [12]:
def collate_skipgram(batch, text_pipeline):
    """Turn a batch of reviews into skip-gram (center word -> context word) pairs.

    A window of ``2 * window_size + 1`` token ids slides over each review. The
    middle id of every window is paired, in order, with each of the other ids
    in that window.

    Args:
        batch: iterable of raw reviews.
        text_pipeline: callable mapping one review to a list of token ids.

    Returns:
        Tuple ``(inputs, targets)`` of 1-D ``torch.long`` tensors of equal
        length: the center id of each pair and its context id.
    """
    span = window_size * 2 + 1
    centers, contexts = [], []

    for review in batch:
        token_ids = text_pipeline(review)

        # Reviews too short to hold a single full window contribute nothing.
        if len(token_ids) < span:
            continue

        # Truncation is applied after the length check above.
        if max_seq_len:
            token_ids = token_ids[:max_seq_len]

        for start in range(len(token_ids) - span + 1):
            window = token_ids[start:start + span]
            neighbours = window[:window_size] + window[window_size + 1:]
            # Repeat the center id once per neighbour so both lists stay aligned.
            centers.extend([window[window_size]] * len(neighbours))
            contexts.extend(neighbours)

    return (
        torch.tensor(centers, dtype=torch.long),
        torch.tensor(contexts, dtype=torch.long),
    )

In [13]:
def _make_loader(reviews, collate, shuffle):
    """Wrap a list of review strings in a DataLoader.

    Args:
        reviews: list of review strings.
        collate: collate function taking ``(batch, text_pipeline)``; it turns a
            batch of reviews into ``(inputs, targets)`` tensors.
        shuffle: whether the reviews are reshuffled on every pass.

    Returns:
        A DataLoader yielding one ``(inputs, targets)`` pair per batch of
        ``batch_size`` reviews.
    """
    return DataLoader(
        reviews,
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=partial(collate, text_pipeline=text_pipeline),
    )


# Training loaders reshuffle the reviews on every epoch.
traindl_cbow = _make_loader(train_map_data, collate_cbow, shuffle=True)
traindl_skipgram = _make_loader(train_map_data, collate_skipgram, shuffle=True)

# Evaluation loaders keep a fixed order: the validation loss is a mean of
# per-batch means, so shuffling would change which reviews share a batch and
# make the reported loss vary between otherwise identical runs.
evaldl_cbow = _make_loader(eval_map_data, collate_cbow, shuffle=False)
evaldl_skipgram = _make_loader(eval_map_data, collate_skipgram, shuffle=False)

In [14]:
class CBOW(nn.Module):
    """Continuous bag-of-words model: predicts a center word from its context.

    The context token ids are embedded, the embeddings are averaged, and a
    linear layer maps the average to one score per vocabulary entry.
    """

    def __init__(self, vocab_size):
        """Create the embedding table and the output projection.

        Args:
            vocab_size: number of entries in the vocabulary; used both as the
                number of embeddings and as the number of output scores.
        """
        super().__init__()
        self.embeddings = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embded_dim,
            max_norm=max_norm,
        )
        self.linear = nn.Linear(in_features=embded_dim, out_features=vocab_size)

    def forward(self, x):
        """Score every vocabulary entry for each row of context ids.

        Args:
            x: integer tensor of token ids, one row of context ids per sample.

        Returns:
            Tensor of unnormalized scores with one row per sample and
            ``vocab_size`` columns.
        """
        context_vectors = self.embeddings(x)
        # Average over the context positions to get one vector per sample.
        averaged = context_vectors.mean(dim=1)
        return self.linear(averaged)

In [15]:
class SkipGram(nn.Module):
    """Skip-gram model: predicts a context word from a center word.

    The center token id is embedded and a linear layer maps the embedding to
    one score per vocabulary entry.
    """

    def __init__(self, vocab_size):
        """Create the embedding table and the output projection.

        Args:
            vocab_size: number of entries in the vocabulary; used both as the
                number of embeddings and as the number of output scores.
        """
        super().__init__()
        # The notebook's embedding-size constant is spelled `embded_dim`;
        # `embed_dim` is not defined anywhere and raises NameError.
        self.embeddings = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embded_dim,
            max_norm=max_norm,
        )
        self.linear = nn.Linear(
            in_features=embded_dim,
            out_features=vocab_size,
        )

    def forward(self, x):
        """Score every vocabulary entry for each center token id.

        Args:
            x: integer tensor of center token ids.

        Returns:
            Tensor of unnormalized scores whose last dimension has
            ``vocab_size`` entries.
        """
        x = self.embeddings(x)
        x = self.linear(x)
        return x

In [16]:
def train_one_epoch(model,dataloader):
    """Run one optimisation pass over ``dataloader`` and record the mean batch loss.

    Relies on the notebook-level ``device``, ``opt``, ``loss_fn`` and
    ``loss_dict``. The mean of the per-batch losses is printed and appended to
    ``loss_dict["train"]``; nothing is returned.

    Args:
        model: module to train; switched to training mode first.
        dataloader: iterable of batches whose first two items are the inputs
            and the targets.
    """
    model.train()
    batch_losses = []

    for batch in dataloader:
        features = batch[0].to(device)
        labels = batch[1].to(device)

        # Clear gradients left over from the previous step before backprop.
        opt.zero_grad()
        batch_loss = loss_fn(model(features), labels)
        batch_loss.backward()
        opt.step()

        batch_losses.append(batch_loss.item())

    epoch_loss = np.mean(batch_losses)
    print("Train Epoch Loss:", round(epoch_loss, 3))
    loss_dict["train"].append(epoch_loss)

def validate_one_epoch(model,dataloader):
    """Evaluate ``model`` over ``dataloader`` and record the mean batch loss.

    Relies on the notebook-level ``device``, ``loss_fn`` and ``loss_dict``.
    No gradients are tracked and no parameters are updated. The mean of the
    per-batch losses is printed and appended to ``loss_dict["val"]``; nothing
    is returned.

    Args:
        model: module to evaluate; switched to evaluation mode first.
        dataloader: iterable of batches whose first two items are the inputs
            and the targets.
    """
    model.eval()
    batch_losses = []

    with torch.no_grad():
        for batch in dataloader:
            features = batch[0].to(device)
            labels = batch[1].to(device)

            batch_loss = loss_fn(model(features), labels)
            batch_losses.append(batch_loss.item())

    epoch_loss = np.mean(batch_losses)
    print("Validation Epoch Loss:", round(epoch_loss, 3))
    loss_dict["val"].append(epoch_loss)


In [17]:
loss_fn = nn.CrossEntropyLoss()
n_epochs = 5
# Per-epoch mean losses, filled in by train_one_epoch / validate_one_epoch.
loss_dict = {"train": [], "val": []}

# choice -> (model class, training loader, validation loader, confirmation message)
_model_options = {
    "cbow": (CBOW, traindl_cbow, evaldl_cbow, "CBOW model selected."),
    "skipgram": (SkipGram, traindl_skipgram, evaldl_skipgram, "SkipGram model selected."),
}

# Keep asking until one of the supported names is entered (case-insensitive).
choice = input("Enter 'cbow' or 'skipgram': ").lower()
while choice not in _model_options:
    print("Invalid choice! Please enter 'cbow' or 'skipgram'.")
    choice = input("Enter 'cbow' or 'skipgram': ").lower()

_model_cls, dataloader_train, dataloader_val, _selected_msg = _model_options[choice]
model = _model_cls(vocab_size).to(device)
print(_selected_msg)

# The optimizer is created after the model so it tracks the chosen model's parameters.
opt = optim.Adam(params=model.parameters(), lr=0.001)


CBOW model selected.


In [20]:
# Each epoch is one full training pass followed by one full validation pass.
for epoch in range(1, n_epochs + 1):
    print("Epoch=", epoch)
    train_one_epoch(model, dataloader_train)
    validate_one_epoch(model, dataloader_val)

Epoch= 1


KeyboardInterrupt: 