In [29]:
import numpy as np 
import re

def load_file(filename):
    with open(filename,'r',encoding='utf-8') as f:
        text = f.read()
    return text 

def preprocess_text(text):
    words = re.findall(r'\b\w+\b',text.lower())
    return words

#creating sequnces with varying length and adding sepicalized tokens 

def create_sequences_and_targets(words, max_seq_len=5, pad_token='PAD'):
    
    """Returns a list of sequnces for input"""
    sequences = []
    targets = []
    
    for i in range(len(words)):
        # Generate sequences of varying lengths
        for seq_len in range(1, max_seq_len + 1):  # From 1 to max_seq_len
            if i + seq_len < len(words):
                seq = words[i:i + seq_len]
                targets.append(words[i+seq_len])
                # Pad sequence if it's shorter than max_seq_len
                if len(seq) < max_seq_len:
                    seq += [pad_token] * (max_seq_len - len(seq))
                sequences.append(seq)

    return sequences,targets



In [30]:
text_data = load_file('data.txt')
words = preprocess_text(text_data)
sequences, targets = create_sequences_and_targets(words)

print(sequences[:5])
print(targets[:5])

[['the', 'PAD', 'PAD', 'PAD', 'PAD'], ['the', 'sonnets', 'PAD', 'PAD', 'PAD'], ['the', 'sonnets', 'by', 'PAD', 'PAD'], ['the', 'sonnets', 'by', 'william', 'PAD'], ['the', 'sonnets', 'by', 'william', 'shakespeare']]
['sonnets', 'by', 'william', 'shakespeare', 'from']


In [None]:
from collections import defaultdict 
import itertools

#itertools provide more efficent to iterate and call fucntion on list or m=in this case list of list 
# Defaultdict provides default value if key not found .it is subclass of dict 

def sequences_to_dicts(sequences):
    all_words = itertools.chain(*sequences)

    word_count = defaultdict(int)
    for word in all_words:  
        word_count[word]+=1

    #custom sorting key using lambda function which sorts on the value of items 
    sorted_word_count = sorted(word_count.items(),key=lambda item: -item[1])

    unique_words = [item[0] for item in sorted_word_count]

    unique_words.append('UNK')

    num_sequences = len(sequences)
    vocab_size = len(unique_words)

    #creating dict word2idx and idx2word
    word_to_idx = {word:idx for idx , word in enumerate(unique_words)}
    idx_to_word = {idx:word for idx, word in enumerate(unique_words)}


    return word_to_idx , idx_to_word , num_sequences , vocab_size

In [66]:
import pickle
word_to_idx, idx_to_word, num_sequences, vocab_size = sequences_to_dicts(sequences)

with open('vocabulary.pkl', 'wb') as f:
    pickle.dump((word_to_idx, idx_to_word), f)

print(f'We have {num_sequences} sentences and {len(word_to_idx)} unique tokens in our dataset (including UNK).\n')
print('The index of \'world\' is', word_to_idx['world'])
print(f'The word corresponding to index 81 is \'{idx_to_word[81]}\'')

assert idx_to_word[word_to_idx['desire']] == 'desire', \
    'Consistency error: something went wrong in the conversion.'

We have 89865 sentences and 3088 unique tokens in our dataset (including UNK).

The index of 'world' is 81
The word corresponding to index 81 is 'world'


In [35]:
#convert sequnces to numerical value sequnces to able to convert to torch.long

numericalized_sequnces = [[word_to_idx[word] for word in sequnence]for sequnence in sequences] 
numerical_targets = [word_to_idx[word] for word in targets]
print(numericalized_sequnces[:5])
print(numerical_targets[:5])

[[2, 0, 0, 0, 0], [2, 3086, 0, 0, 0], [2, 3086, 32, 0, 0], [2, 3086, 32, 3084, 0], [2, 3086, 32, 3084, 1358]]
[3086, 32, 3084, 1358, 36]


In [50]:
import torch
from torch.utils.data import Dataset, DataLoader

class CustomDataset(Dataset):
    def __init__(self, sequences,targets):
        self.inputs, self.targets = sequences,targets
    
    # # def get_inputs_targets_from_sequences(self, sequences):
    # #     """
    # #     Efficiently generate inputs and targets without extra list copying.
    # #     """
    # #     inputs, targets = [], []
    # #     for sequence in sequences:
    # #         if len(sequence) > 1:
    # #             inputs.append(sequence[:-1])  # All but the last token
    # #             targets.append(sequence[-1])  # last token
    # #     return inputs, targets

    # def get_inputs_targets_from_sequences(self, sequences, seq_len=5):
    #     inputs, targets = [], []
        
    #     for i in range(len(sequences) - seq_len):
    #         # Create input and target pairs
    #         input_seq = sequences[i:i + seq_len - 1]  # Sequence excluding the last token
    #         target = sequences[i + seq_len - 1]  # The next word after the input sequence
            
    #         inputs.append(input_seq)
    #         targets.append(target)
        
    #     return inputs, targets




    def __len__(self):
        return len(self.targets)

    def __getitem__(self, index):
        # Convert inputs and targets to tensors (ensure correct dtype)
        # torch.long is used to represent 64-bit integer values, which are needed 
        # when working with indices in PyTorch (especially for embeddings).
        X = torch.tensor(self.inputs[index], dtype=torch.long)
        y = torch.tensor(self.targets[index], dtype=torch.long)
        return X, y


def create_datasets(sequences,targets, dataset_class, p_train=0.2, p_val=0.1, p_test=0.1):
    """
    Split the sequences and targets into training, validation, and test datasets.
    """
    num_train = int(len(sequences) * p_train)
    num_val = int(len(sequences) * p_val)
    num_test = int(len(sequences) * p_test)

    # Split sequences into partitions
    sequences_train = sequences[:num_train]
    targets_train = targets[:num_train]
    sequences_val = sequences[num_train:num_train + num_val]
    targets_val = targets[num_train:num_train + num_val]
    sequences_test = sequences[-num_test:]
    targets_test = targets[-num_test:]


    # Create Dataset instances for each partition
    train_dataset = dataset_class(sequences_train,targets_train)
    val_dataset = dataset_class(sequences_val,targets_val)
    test_dataset = dataset_class(sequences_test,targets_test)

    return train_dataset, val_dataset, test_dataset




In [51]:
train_dataset, val_dataset, test_dataset = create_datasets(numericalized_sequnces,numerical_targets, CustomDataset)
print(len(train_dataset))
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)

print(len(train_loader))

for batch_X, batch_y in train_loader:
    print("Input batch:", batch_X)
    print("Target batch:", batch_y)
    print(batch_X.shape)
    print(batch_y.shape)
    break  # Just print the first batch



17973
562
Input batch: tensor([[1507, 1508,    0,    0,    0],
        [ 450,    2,  149,    3,    0],
        [  23,   65,    0,    0,    0],
        [  81,    0,    0,    0,    0],
        [  61,  430,    0,    0,    0],
        [ 532, 1468,    7,    0,    0],
        [  20,    0,    0,    0,    0],
        [  20,  557,   17,    0,    0],
        [  10,   57,   21,  939,    0],
        [  23,   82,    0,    0,    0],
        [  30,  685,  686,   44,    1],
        [ 543,    0,    0,    0,    0],
        [ 165,   74,   68,    0,    0],
        [ 190,    1,   94,    0,    0],
        [   2,  643,    0,    0,    0],
        [  38,  542, 1518,    5, 1519],
        [  20,    1,   41,    0,    0],
        [  31,   96,    0,    0,    0],
        [  11,   55,    6,   24,    0],
        [1480,   10,    0,    0,    0],
        [1460,    2,    0,    0,    0],
        [   2,   81,    0,    0,    0],
        [  11,  194,  294,  286,    0],
        [ 293,    0,    0,    0,    0],
        [  23,   

Defining LSTM architecture 

In [40]:
import math
import torch
import torch.nn as nn
import torch.optim as optim

class CustomLSTM(nn.Module):
    def __init__(self, input_dim,hidden_dim):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        #defening linear layers for each gate 
        self.input_gate = nn.Linear(input_dim+hidden_dim,hidden_dim)
        self.forget_gate = nn.Linear(input_dim+hidden_dim,hidden_dim)
        self.cell_gate = nn.Linear(input_dim+hidden_dim,hidden_dim)
        self.output_gate = nn.Linear(input_dim+hidden_dim,hidden_dim)

        self.init_weights()

    def init_weights(self):
        stdv = 1.0 /math.sqrt(self.hidden_dim)
        for weight in self.parameters():
            weight.data.uniform_(-stdv,stdv)

    def forward(self,x,init_states=None):
        """Assume x has shape(batcb,sequnce,features)"""
        batch_size,seq_len,_ = x.size()
        hidden_seq=[]
        if init_states is None:
            hiddden_state ,cell_state =(torch.zeros(batch_size,self.hidden_dim).to(x.device),
                                        torch.zeros(batch_size,self.hidden_dim).to(x.device))
        else :
            hiddden_state,cell_state = init_states
        
        for t in range(seq_len):
            x_t = x[:, t, :]
            
            # Concatenate input and hidden states for each gate to use as input 
            combined_input = torch.cat((x_t,hiddden_state),dim=1)

            #applying Linear transformation 

            i_t = torch.sigmoid(self.input_gate(combined_input))
            f_t = torch.sigmoid(self.forget_gate(combined_input))
            # Potential New memory at t 
            g_t = torch.tanh(self.cell_gate(combined_input)) 
            o_t = torch.sigmoid(self.output_gate(combined_input))

            #updating Cell and hidden states 

            cell_state = f_t * cell_state + i_t * g_t
            hiddden_state = o_t * torch.tanh(cell_state)

            hidden_seq.append(hiddden_state.unsqueeze(0))
        
        # reshape hidden_seq 
        hidden_seq = torch.cat(hidden_seq,dim=0)
        hidden_seq = hidden_seq.transpose(0,1).contiguous()

        return hidden_seq, (hiddden_state,cell_state)


In [48]:
class nxtword(nn.Module):
    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size,128)
        self.lstm = CustomLSTM(128,128)       
        self.fc1 = nn.Linear(128,vocab_size)

    def forward(self,x):
        #creating embedding from input 
        embedded_x = self.embedding(x)
        #passing embeding to lstm 
        lstm_out, (h_n,c_n) =self.lstm(embedded_x)
        #captuing the last  hidden state
        last_hidden_state = lstm_out[:,-1,:]
        # Pass to last fc layer 
        output = self.fc1(last_hidden_state)
        
        return output
        

In [63]:
def model_train(classifier,num_epochs,lr,device):
    optimizer = optim.Adam(classifier.parameters(), lr=lr)  
    criterion = nn.CrossEntropyLoss()
    epochs = num_epochs
    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        running_loss = 0.0 
        for i,(datapoints,labels) in enumerate(train_loader):
            optimizer.zero_grad()
            # moving to device 
            datapoints , labels = datapoints.long().to(device), labels.to(device)

            #get predictions 
            preds = classifier(datapoints)
            
            # print(f"Predicted: {preds.argmax(dim=1)}")
            # print(f"True Labels: {labels}")
            print(f"Matches: {(preds.argmax(dim=1) == labels).sum().item()}")
            
            # probs = torch.softmax(preds, dim=1)
            # print(probs)  # Check for proper distribution

            loss = criterion(preds,labels)
            loss.backward()
            optimizer.step()

            # acummalte loss for display 
            running_loss += loss.item()

                # Check accuracy every 50 batches
            if (i + 1) % 50 == 0:
                acc = 0
                with torch.no_grad():
                    for datapoints_, labels_ in test_loader:
                        datapoints_, labels_ = datapoints_.to(device), labels_.to(device)
                        preds = classifier(datapoints_)
                        acc += (preds.argmax(dim=1) == labels_).sum().cpu().item()
                
                # Calculate accuracy for the whole test set
                acc /= len(test_loader.dataset)
                print(f"Step {i + 1}/{len(train_loader)}, Loss: {running_loss / 50:.4f}, Accuracy: {acc:.2f}")
                running_loss = 0.0  # Reset running loss

        # Epoch end statistics
        print(f"Epoch {epoch + 1} completed with Loss: {running_loss / len(train_loader):.4f}")
        print("-" * 50)

    # Save only the model weights (state_dict)
    torch.save(classifier.state_dict(), 'model_state_dict.pth')
    print("model is saved")
    

In [64]:
device = torch.device('cuda')
classifier = nxtword().to(device)
model_train(classifier,5,0.002,device)

        

Epoch 1/5


Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 2
Matches: 1
Matches: 2
Matches: 0
Matches: 0
Matches: 2
Matches: 1
Matches: 0
Matches: 1
Matches: 0
Matches: 1
Matches: 1
Matches: 0
Matches: 1
Matches: 1
Matches: 2
Matches: 2
Matches: 2
Matches: 1
Matches: 1
Matches: 1
Matches: 1
Matches: 0
Matches: 2
Matches: 2
Matches: 2
Matches: 1
Matches: 0
Matches: 2
Matches: 1
Matches: 1
Matches: 0
Matches: 1
Matches: 1
Matches: 0
Matches: 2
Matches: 0
Matches: 1
Step 50/562, Loss: 7.1982, Accuracy: 0.02
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 2
Matches: 1
Matches: 1
Matches: 0
Matches: 0
Matches: 1
Matches: 0
Matches: 0
Matches: 0
Matches: 0
Matches: 1
Matches: 1
Matches: 1
Matches: 3
Matches: 0
Matches: 0
Matches: 1
Matches: 1
Matches: 0
Matches: 1
Matches: 0
Matches: 0
Matches: 0
Matches: 2
Matches: 1
Matches: 0
Matches: 1
Matches: 1
Matches: 0
Matches: 1
Matches: 1
Matches: 1
Matches: 0
M