In [55]:
import torch
import torch.nn.functional as F
from torch import nn
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
from pprint import pprint
import re
import pickle
import os
import numpy as np
# from google.colab import drive
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.manifold import TSNE

## Importing Dataset (War and Peace - Leo Tolstoy)

In [56]:
import requests

url = "https://cs.stanford.edu/people/karpathy/char-rnn/warpeace_input.txt"
response = requests.get(url)
with open("text1.txt", "wb") as file:
    file.write(response.content)

In [57]:
with open('text1.txt','r') as file:
  text1=file.read()      # converting file to string
print(len(text1))

3196232


## Data Cleaning and Preprocessing

In [58]:
def clean_text(text):

  text=text.lower()
  text = re.sub(r'\.{3,}', '.', text)
  text = re.sub(r'\n\s*\n', ' ' + '.' * 5 + ' ', text)
  text=re.sub(r'(\w)\n(\w)',r'\1 \2',text)
  text=re.sub(r'[^a-zA-Z0-9 \'\.]',' ',text)
  text=re.sub(r'[\']','',text)
  text = text.replace('\n', ' ')
  cleaned_text = re.sub(r'\s+', ' ', text).strip()

  return cleaned_text

In [59]:
clean_text1=clean_text(text1)

#### Tokenization

In [60]:
def paragraph_processing(text, context_len):

  context_padding = '.' * context_len
  paragraphs=text.split(".....")
  processed_paragraphs = [context_padding + para.strip() for para in paragraphs]

  return processed_paragraphs


In [61]:
context_len=10
paragraphs_txt1=paragraph_processing(clean_text1,context_len)

In [62]:
def tokenization(paragraphs_txt,context_len):
  tokens=[]
  for para in paragraphs_txt:
    para_tokens = re.findall(r'\b\w+\b|\.{' + str(context_len) + r'}|[.]', para)
    para_tokens = [token for token in para_tokens if token != '.' * context_len]
    tokens.extend(para_tokens)
  return tokens

In [63]:
tokens_txt1=tokenization(paragraphs_txt1,context_len)

#### Creating Word Vocabulary and mappings to/from integer indices

In [64]:
def create_vocab(tokens):
    token_to_index = {
      '.': 0,
      ' ': 1,
    }
    unique_tokens = sorted(list(set(token for token in tokens if token not in token_to_index)))
    token_to_index.update({token: idx + 2 for idx, token in enumerate(unique_tokens)})
    index_to_token = {idx: token for token, idx in token_to_index.items()}

    return token_to_index, index_to_token, unique_tokens


In [65]:
token_to_index1, index_to_token1, unique_tokens1 = create_vocab(tokens_txt1)

In [66]:
print(len(unique_tokens1))
print(len(list(token_to_index1.items())))
print(len(list(index_to_token1.items())))

17831
17833
17833


## Creating X,y Datasets

In [69]:
import re
import torch

def create_X_y(paragraphs, token_to_index, index_to_token, context_len, fraction=1/10, max_paragraphs=None):
    X = []
    y = []

    # Limit to a specified number of paragraphs if max_paragraphs is provided
    if max_paragraphs is not None:
        paragraphs = paragraphs[:max_paragraphs]

    for para in paragraphs:
        para_tokens = re.findall(r'\b\w+\b|[.]', para)
        if len(para_tokens) <= context_len:
            continue

        # Limit to the first fraction of tokens in each paragraph
        limit = int(len(para_tokens) * fraction)

        for i in range(limit - context_len):
            input_context = [token_to_index[token] for token in para_tokens[i:i + context_len]]
            output_word = token_to_index[para_tokens[i + context_len]]
            X.append(input_context)
            y.append(output_word)

            print(' '.join(index_to_token[i] for i in input_context), ' ------> ', index_to_token[output_word])

    print('Training Samples No. : ', len(X))
    print('Training Outputs No. : ', len(y))

    # Convert lists to tensors
    X = torch.tensor(X)
    y = torch.tensor(y)

    return X, y


In [70]:
X1, y1= create_X_y(paragraphs_txt1, token_to_index1, index_to_token1, context_len)

. . . . . . . . . .  ------>  heavens
. . . . . . . . . .  ------>  prince
. . . . . . . . . .  ------>  oh
. . . . . . . . . oh  ------>  dont
. . . . . . . . oh dont  ------>  speak
. . . . . . . oh dont speak  ------>  to
. . . . . . oh dont speak to  ------>  me
. . . . . oh dont speak to me  ------>  of
. . . . oh dont speak to me of  ------>  austria
. . . oh dont speak to me of austria  ------>  .
. . oh dont speak to me of austria .  ------>  perhaps
. oh dont speak to me of austria . perhaps  ------>  i
oh dont speak to me of austria . perhaps i  ------>  dont
dont speak to me of austria . perhaps i dont  ------>  understand
speak to me of austria . perhaps i dont understand  ------>  things
to me of austria . perhaps i dont understand things  ------>  but
me of austria . perhaps i dont understand things but  ------>  austria
of austria . perhaps i dont understand things but austria  ------>  never
austria . perhaps i dont understand things but austria never  ------>  has
. pe

In [71]:
subset_size_1 = len(X1) // 7
X1_subset = X1[:]
y1_subset = y1[:]

print(len(X1_subset))
print(X1_subset[:10])
print(len(y1_subset))
print(y1_subset[:10])

12545
tensor([[    0,     0,     0,     0,     0,     0,     0,     0,     0,     0],
        [    0,     0,     0,     0,     0,     0,     0,     0,     0,     0],
        [    0,     0,     0,     0,     0,     0,     0,     0,     0,     0],
        [    0,     0,     0,     0,     0,     0,     0,     0,     0, 10762],
        [    0,     0,     0,     0,     0,     0,     0,     0, 10762,  4774],
        [    0,     0,     0,     0,     0,     0,     0, 10762,  4774, 14712],
        [    0,     0,     0,     0,     0,     0, 10762,  4774, 14712, 15964],
        [    0,     0,     0,     0,     0, 10762,  4774, 14712, 15964,  9724],
        [    0,     0,     0,     0, 10762,  4774, 14712, 15964,  9724, 10732],
        [    0,     0,     0, 10762,  4774, 14712, 15964,  9724, 10732,  1129]])
12545
tensor([ 7366, 12021, 10762,  4774, 14712, 15964,  9724, 10732,  1129,     0])


## Embedding and Model Training

In [72]:
emb_dim=64
hidden_layer_size=1024

In [73]:
class NextTokenGen(nn.Module):
  def __init__(self, context_len, vocab_size, emb_dim, hidden_layer_size):
    super(NextTokenGen,self).__init__()
    self.context_len = context_len
    self.emb_dim = emb_dim
    self.embed=nn.Embedding(vocab_size,emb_dim)
    self.layer0=nn.Linear(context_len*emb_dim, hidden_layer_size)
    self.layer1=nn.Linear(hidden_layer_size, vocab_size)

  def forward(self, X, activation=None):
    X=self.embed(X)
    X=X.view(X.shape[0],self.context_len*self.emb_dim)
    if activation=='relu':
      X=F.relu(self.layer0(X))
    elif activation=='tanh':
      X=torch.tanh(self.layer0(X))
    else:
      X=self.layer0(X)

    X=self.layer1(X)

    return X

In [74]:
text_gen1 = NextTokenGen(context_len,len(list(token_to_index1.items())),emb_dim,hidden_layer_size)

In [75]:
def model_training(model, batch_size, epoch_no, learn_rate, X, y, act_fn):

  loss_fn=nn.CrossEntropyLoss()
  optimizer=torch.optim.AdamW(model.parameters(), lr=learn_rate)

  for epoch in range(epoch_no):
    epoch_loss=0.0

    for i in range(0,X.shape[0],batch_size):
      optimizer.zero_grad()
      X_batch=X[i:i+batch_size]
      y_batch=y[i:i+batch_size]
      y_pred=model(X_batch, activation=act_fn)
      loss=loss_fn(y_pred,y_batch)
      loss.backward()
      optimizer.step()

      epoch_loss+=loss.item()

    epoch_loss = epoch_loss / (X.shape[0] // batch_size)

    if epoch%10==0:
      print(f"Epoch-{epoch} loss: {epoch_loss:.4f}")

In [None]:
model_training(text_gen1,200,250,0.005,X1_subset,y1_subset,'tanh')

Epoch-0 loss: 9.2577
Epoch-10 loss: 1.4490
Epoch-20 loss: 1.1639
Epoch-30 loss: 1.6466
Epoch-40 loss: 1.1731
Epoch-50 loss: 1.0444
Epoch-60 loss: 1.2998
Epoch-70 loss: 1.0974
Epoch-80 loss: 0.9923
Epoch-90 loss: 0.9593
Epoch-100 loss: 1.6957
Epoch-110 loss: 1.0224
Epoch-120 loss: 0.9437
Epoch-130 loss: 0.9409
Epoch-140 loss: 1.3506
Epoch-150 loss: 0.9799
Epoch-160 loss: 0.9080
Epoch-170 loss: 0.9514
Epoch-180 loss: 1.1018
Epoch-190 loss: 0.9208
Epoch-200 loss: 0.9294
Epoch-210 loss: 1.0274
Epoch-220 loss: 0.9886
Epoch-230 loss: 0.9037


## Saving The Model Using Pickle To Drive

In [None]:
def save_model_to_drive(model, model_name: str):

  # drive.mount('/content/drive')
  # os.makedirs('/content/drive/MyDrive/checkpoints', exist_ok=True)
  # print(os.listdir('/content/drive/MyDrive/checkpoints'))

  model_path = f'{model_name}.pkl'

  with open(model_path, 'wb') as f:
      pickle.dump(model, f)

  print(f'Model saved to {model_path}')


In [None]:
def load_model_from_drive(model_name: str):

  # drive.mount('/content/drive')
  with open(f'{model_name}.pkl', 'rb') as f:
    model_loaded = pickle.load(f)

  print('Model loaded successfully!')
  return model_loaded

In [None]:
save_model_to_drive(text_gen1, 'emb64_context10_tanh')

In [None]:
text_gen1_loaded=load_model_from_drive('emb64_context10_tanh')

## Visualization of Embeddings using t-SNE

##### For visualization we are considering some nouns, pronouns, adverbs, verbs, synonyms, antonyms, etc.

In [None]:
tokens_to_plot=[
                  'prince','lucca','pavlovna','anna','europe','crusades', #Names
                  'who','where','when','what','which','why', # interrogative words
                  'a', 'an', 'the', # articles
                  'in', 'on', 'of', 'over', 'under', 'out', # prepositions
                  'i','you','he','she','they', 'it', # pronouns
                  # 'hot', 'cold', 'long', 'short', 'up', 'down', # antonyms
                  'warn', 'caution', 'frightened', 'scared', 'importance','value', # synonymns
                  # 'inevitably', 'urgently', 'apparently', 'constantly' # Adverbs
                ]

In [None]:
def plot_embeddings(tokens_to_plot, token_to_index, index_to_token, model):

  embeds=np.array(model.embed(torch.tensor([token_to_index[token] for token in tokens_to_plot])).detach().numpy())

  tsne = TSNE(n_components=2, perplexity=20, random_state=0)
  embeds_2d = tsne.fit_transform(embeds)

  plt.figure(figsize=(10, 10))
  plt.scatter(embeds_2d[:, 0], embeds_2d[:, 1])

  for i, token in enumerate(tokens_to_plot):
      plt.annotate(token, (embeds_2d[i, 0], embeds_2d[i, 1]))

  plt.title("t-SNE Visualization of Word Embeddings")
  plt.xlabel("t-SNE Component 1")
  plt.ylabel("t-SNE Component 2")
  plt.show()

In [None]:
plot_embeddings(tokens_to_plot, token_to_index1, index_to_token1, text_gen1_loaded)

## next K words prediction

In [None]:
def get_embedding(word, vocab_words, embeddings):
    if word in vocab_words:
        idx = vocab_words.index(word)
        return embeddings[idx].reshape(1, -1)
    return np.mean(embeddings, axis=0).reshape(1, -1)

In [None]:
def find_closest_word(word, vocab_words, embeddings):
    if word in vocab_words:
        return word

    word_embedding = get_embedding(word, vocab_words, embeddings)

    similarities = cosine_similarity(word_embedding, np.array([embeddings]).reshape(-1,1))
    closest_idx = np.argmax(similarities)
    closest_word = vocab_words[closest_idx]
    return closest_word


In [None]:
# def predict_next_k_words(model, token_to_index, index_to_token, context, k):

#     prompt_tokens = re.findall(r'\b\w+\b|[.]', context)
#     context_tokens=[]
#     for token in prompt_tokens:
#         if token in list(token_to_index.keys()):
#             context_tokens.append(token)
#         else:
#             context_tokens.append(find_closest_word(token, list(token_to_index.keys()), np.array(list(token_to_index.values()))))

#     context_indices = [token_to_index.get(word, token_to_index[' ']) for word in context_tokens]

#     if len(context_indices) < context_len:
#         context_indices = [1] * (context_len - len(context_indices)) + context_indices
#     else:
#         context_indices = context_indices[-context_len:]

#     predicted_words = []

#     model.eval()
#     with torch.no_grad():
#         for _ in range(k):
#             context_tensor = torch.tensor(context_indices, dtype=torch.int64).unsqueeze(0)
#             context_tensor=context_tensor.reshape(1,-1)
#             output = model(context_tensor)
#             next_word_index = torch.argmax(output, dim=1).item()
#             next_word = index_to_token[next_word_index]
#             predicted_words.append(next_word)
#             context_indices.append(next_word_index)
#             context_indices = context_indices[-context_len:]

#     return ' '.join(predicted_words)


In [None]:
def predict_next_k_words(context, k):
    context_tokens = re.findall(r'\b\w+\b|[.]', context)
    context_indices = [token_to_index1.get(word, token_to_index1[' ']) for word in context_tokens]

    # Trim or pad the context to fit the required length
    if len(context_indices) < context_len:
        context_indices = [1] * (context_len - len(context_indices)) + context_indices
    else:
        context_indices = context_indices[-context_len:]

    predicted_words = []

    text_gen1_loaded.eval()  # Set model to eval mode for inference
    with torch.no_grad():
        for _ in range(k):
            # Convert context to tensor and pass through model
            context_tensor = torch.tensor(context_indices, dtype=torch.int64).unsqueeze(0)
            output = text_gen1_loaded(context_tensor)

            # Get predicted word index and corresponding word
            next_word_index = torch.argmax(output, dim=1).item()
            next_word = index_to_token1[next_word_index]

            # Add predicted word to results
            predicted_words.append(next_word)

            # Update context with new word and adjust to maintain context length
            context_indices.append(next_word_index)
            context_indices = context_indices[-context_len:]  # Keep only last `context_len` tokens

    return ' '.join(predicted_words)


In [None]:
context = "this entire ordeal of assignment is taking too much time and all i want is "
k = 100
predicted_text = predict_next_k_words(context, k)
print(predicted_text)
