In [36]:
from torch.utils.data import TensorDataset, DataLoader
from torchtext.data.utils import get_tokenizer
from torch.nn.functional import one_hot
import torch.nn.functional as F
import matplotlib.pyplot as plt
import torch.optim as optim
import torch.nn as nn
import numpy as np
import torchtext
import random
import torch
import tqdm

In [2]:
# Seed every random number generator the notebook draws from so that a fresh
# "Restart & Run All" repeats the same run.
seed = 123
random.seed(seed)        # stdlib RNG: used by add_random_oov_tokens (random.uniform)
torch.manual_seed(seed)  # PyTorch RNG
np.random.seed(seed)     # NumPy legacy global RNG

In [16]:
def ReadDataFile(fname, encoding='utf-8'):
    """Read a text file and return its content split into lines.

    Args:
        fname: Path of the text file to read.
        encoding: Text encoding used to decode the file. Defaults to UTF-8 so
            the result does not depend on the platform's locale encoding.

    Returns:
        list[str]: The file content split on the newline character. A file
        that ends with a newline therefore yields a trailing empty string.
    """
    with open(fname, 'r', encoding=encoding) as f:
        return f.read().split('\n')

In [4]:
# Tokenizer obtained from torchtext by name ('basic_english').
tokenizer = get_tokenizer(tokenizer='basic_english')
# Raw corpus as returned by ReadDataFile (the file content split on newlines).
data = ReadDataFile(fname="/content/DataSet.txt")

In [None]:
# Tokenize every corpus entry in order; tqdm wraps `data` to report progress.
tokkenized_data = list(map(tokenizer, tqdm.tqdm(data)))

In [17]:
# Input-side vocabulary: tokens seen at least twice, plus the '<pad>' and
# '<oov>' special symbols inserted ahead of the regular tokens.
features_vocab = torchtext.vocab.build_vocab_from_iterator(
    iterator=tokkenized_data,
    specials=['<pad>', '<oov>'],
    special_first=True,
    min_freq=2,
)

# Output-side vocabulary: same token stream and frequency cut-off, but without
# any special symbols.
target_vocab = torchtext.vocab.build_vocab_from_iterator(
    iterator=tokkenized_data,
    min_freq=2,
)

In [None]:
# Vocabulary sizes: used later as the embedding-table size (features) and as
# the number of output classes (targets).
features_vocab_total_words, target_vocab_total_words = len(features_vocab), len(target_vocab)

In [18]:
def text_to_numerical_sequence(tokenized_text):
    """Encode one n-gram as feature ids followed by a single target id.

    Every token except the last is looked up in ``features_vocab``; tokens that
    are missing from it are mapped to the id of ``'<oov>'``. The last token is
    looked up in ``target_vocab``.

    Args:
        tokenized_text: Sequence of string tokens; the last one is the target.

    Returns:
        list[int] | None: The encoded ids (features first, target id last), or
        ``None`` when the input is empty or its last token is not in
        ``target_vocab``.
    """
    # Guard: indexing [-1] on an empty sequence would raise IndexError.
    if not tokenized_text:
        return None

    target_token = tokenized_text[-1]
    # `token in vocab` is a direct membership test. The earlier
    # `token in vocab.get_itos()` fetched the full token list and scanned it
    # linearly for every single token.
    if target_token not in target_vocab:
        return None

    oov_id = features_vocab['<oov>']
    tokens_list = [
        features_vocab[token] if token in features_vocab else oov_id
        for token in tokenized_text[:-1]
    ]
    tokens_list.append(target_vocab[target_token])
    return tokens_list

In [19]:
def make_ngrams(tokenized_title):
    """Return every prefix of ``tokenized_title`` that has at least two tokens.

    Prefixes are listed from shortest to longest; the last one is the whole
    sequence. Inputs with fewer than two tokens produce an empty list.
    """
    return [tokenized_title[:end] for end in range(2, len(tokenized_title) + 1)]

In [20]:
def add_random_oov_tokens(ngram, oov_prob=0.1, oov_token='<oov>'):
  """Return a copy of ``ngram`` with some context tokens replaced by ``oov_token``.

  Each token except the last is replaced independently when a draw from
  ``random.uniform(0, 1)`` falls below ``oov_prob``. The input list is not
  modified.

  Args:
    ngram: List of tokens; its last element is never replaced.
    oov_prob: Replacement probability per context token (default 0.1).
    oov_token: Replacement token (default ``'<oov>'``).

  Returns:
    list: The copied list with replacements applied.
  """
  row = ngram.copy()
  # Stop one short of the end: the final token is left untouched.
  for idx in range(len(row) - 1):
    if random.uniform(0, 1) < oov_prob:
      row[idx] = oov_token
  return row

In [None]:
# Flatten the per-title n-gram lists into one list, keeping corpus order.
ngrams_list = [
    ngram
    for tokenized_title in tokkenized_data
    for ngram in make_ngrams(tokenized_title)
]

In [None]:
# Apply random '<oov>' masking to every n-gram, in order.
ngrams_list_oov = list(map(add_random_oov_tokens, ngrams_list))

In [None]:
# Encode each masked n-gram; entries for which the encoder returns a falsy
# value (e.g. None) are skipped.
input_sequences = [
    encoded
    for encoded in map(text_to_numerical_sequence, tqdm.tqdm(ngrams_list_oov))
    if encoded
]

In [23]:
# Split each encoded sequence into its context ids (X) and final target id (y).
X, y = [], []
for encoded in input_sequences:
    X.append(encoded[:-1])
    y.append(encoded[-1])

In [24]:
# Length of the longest context sequence; every context is padded to this width.
longest_sequence_feature = max(map(len, X))

In [None]:
# Left-pad every context with id 0 up to `longest_sequence_feature`.
padded_X = []
for sequence in tqdm.tqdm(X):
    missing = longest_sequence_feature - len(sequence)
    padded_X.append(F.pad(torch.tensor(sequence), (missing, 0), value=0))

In [26]:
# Stack the equally sized padded contexts along a new leading dimension.
padded_tX = torch.stack(padded_X, dim=0)

In [27]:
# Targets as a tensor of class ids, then expanded to one-hot rows with one
# column per target-vocabulary entry.
# NOTE(review): this materializes len(y) x target_vocab_total_words values;
# nn.CrossEntropyLoss also accepts plain class ids, which would avoid it.
y = torch.tensor(y)
y_one_hot = F.one_hot(y, num_classes=target_vocab_total_words)

In [28]:
# Pair each padded context with its one-hot target and serve shuffled
# mini-batches of 32.
ds = TensorDataset(padded_tX, y_one_hot)
data_loader = DataLoader(dataset=ds, shuffle=True, batch_size=32)

In [29]:
class My_LSTM(nn.Module):
    """Embedding -> LSTM -> dropout -> linear classifier over the target vocabulary.

    Args:
        features_vocab_total_words: Number of rows in the embedding table.
        target_vocab_total_words: Number of output logits.
        embedding_dim: Size of each embedding vector (LSTM input size).
        hidden_dim: LSTM hidden size (input size of the final linear layer).
        dropout: Dropout probability applied to the last LSTM output.
            Defaults to 0.5, the value that used to be hard-coded.
        padding_idx: Optional id forwarded to ``nn.Embedding(padding_idx=...)``.
            ``None`` (default) keeps the previous behaviour of treating every
            id, including the padding id, as a regular trainable embedding.
    """

    def __init__(self, features_vocab_total_words, target_vocab_total_words, embedding_dim, hidden_dim,
                 dropout=0.5, padding_idx=None):
        super().__init__()
        self.embedding = nn.Embedding(features_vocab_total_words, embedding_dim, padding_idx=padding_idx)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim, target_vocab_total_words)

    def forward(self, x):
        """Return logits computed from the LSTM output at the last position.

        ``x`` is moved to the device of the embedding weights before use, so
        callers may pass tensors that live on another device.
        """
        x = x.to(self.embedding.weight.device)
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        # Only the last timestep feeds the classifier, so slice first and apply
        # dropout to that slice instead of to every timestep.
        last_step = lstm_out[:, -1, :]
        return self.fc(self.dropout(last_step))

In [62]:
# Use the GPU when PyTorch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [66]:
# Network with 256-dimensional embeddings and a 512-unit LSTM, placed on `device`.
model = My_LSTM(
    features_vocab_total_words=features_vocab_total_words,
    target_vocab_total_words=target_vocab_total_words,
    embedding_dim=256,
    hidden_dim=512,
).to(device)
# Cross-entropy objective and Adam optimizer (learning rate 3e-4).
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=3e-4)

In [67]:
def calculate_topk_accuracy(model, data_loader, k=3):
    """Return the top-k accuracy (in percent) of ``model`` over ``data_loader``.

    A sample counts as correct when its target class is among the ``k``
    highest-scoring classes of the model output.

    Args:
        model: Module called as ``model(batch_x)``; it is switched to eval mode
            and is left in eval mode when the function returns.
        data_loader: Iterable of ``(batch_x, batch_y)`` pairs. ``batch_y`` may
            hold one-hot / per-class rows (2-D) or plain class ids (1-D).
        k: Number of top predictions to consider. Values larger than the
            number of classes are clamped to it.

    Returns:
        float: Accuracy in the range 0-100; ``0.0`` when the loader yields no
        samples.
    """
    model.eval()
    correct_predictions = 0
    total_predictions = 0

    with torch.no_grad():
        loop = tqdm.tqdm(data_loader)
        for batch_x, batch_y in loop:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            output = model(batch_x)

            # topk raises if asked for more entries than there are classes.
            _, predicted_indices = output.topk(min(k, output.size(1)), dim=1)

            # Reduce one-hot rows to class ids; 1-D targets already are ids.
            target_ids = batch_y.argmax(dim=1) if batch_y.dim() > 1 else batch_y
            hits = (predicted_indices == target_ids.unsqueeze(1)).any(dim=1)

            correct_predictions += hits.sum().item()
            total_predictions += batch_y.size(0)

            # Running accuracy shown in the progress bar.
            loop.set_postfix({"K-Accuracy": correct_predictions / total_predictions * 100})

    # An empty loader previously left the accuracy variable unbound.
    if total_predictions == 0:
        return 0.0
    return correct_predictions / total_predictions * 100

In [None]:
# Training: 24 epochs (range(1, 25)). After each epoch the mean batch loss and
# the top-k accuracy on the training loader are recorded for plotting.
total_loss = []
acc = []
for epoch in range(1, 25):
    # calculate_topk_accuracy switches the model to eval mode, so training
    # mode has to be re-enabled at the start of every epoch.
    model.train()
    loop = tqdm.tqdm(data_loader)
    epoch_loss = []
    for batch_X, batch_y in loop:
        # One-hot targets are cast to float so the loss treats them as
        # per-class probabilities.
        batch_X, batch_y = batch_X.to(device), batch_y.to(device).float()
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = loss_fn(outputs, batch_y)
        loss.backward()
        optimizer.step()

        # Read the scalar once and keep plain floats: np.mean then works on a
        # list of numbers instead of a list of tensors.
        batch_loss = loss.item()
        loop.set_postfix({"epoch": epoch, "loss": batch_loss})
        epoch_loss.append(batch_loss)
    total_loss.append(np.mean(epoch_loss))
    acc.append(calculate_topk_accuracy(model, data_loader))
# Persist the trained weights only (state_dict), relative to the working directory.
torch.save(model.state_dict(), "next_word_prediction.pth")

In [None]:
# Two side-by-side panels: per-epoch training loss (left) and accuracy (right).
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(10, 5))

loss_ax.plot(total_loss, "r")
loss_ax.set_title("Train Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("CrossEntropy Loss", loc="center")

acc_ax.plot(acc, "g")
acc_ax.set_title("Train Accuracy")
acc_ax.set_xlabel("Epoch")
acc_ax.set_ylabel("Accuracy")

# Widen the gap between the panels so the y-labels do not collide.
fig.subplots_adjust(wspace=0.25)

In [None]:

# Reload the weights written by the training cell. The path matches the one
# passed to torch.save there, and map_location moves the stored tensors onto
# the active `device`, so a checkpoint saved on GPU also loads on a CPU-only
# runtime.
model.load_state_dict(torch.load("next_word_prediction.pth", map_location=device))

In [71]:
def text_to_numerical_sequence_test(tokenized_text):
    """Encode inference-time tokens as ``features_vocab`` ids.

    Tokens missing from ``features_vocab`` are mapped to the id of ``'<oov>'``.
    Unlike the training encoder, every token (including the last) is treated as
    a feature.

    Args:
        tokenized_text: Iterable of string tokens.

    Returns:
        list[int]: One id per input token, in order (empty for empty input).
    """
    oov_id = features_vocab['<oov>']
    # `token in vocab` is a direct membership test; `in vocab.get_itos()`
    # fetched and scanned the whole token list for every token.
    return [
        features_vocab[token] if token in features_vocab else oov_id
        for token in tokenized_text
    ]

In [72]:
def Prediction(query):
  """Greedily extend a prompt by a fixed number of predicted words.

  Args:
    query: Pair ``[string, num_generation]``: the prompt text and the number
      of words to append.

  Returns:
    str: The prompt followed by ``num_generation`` predicted words, each
    preceded by a single space.
  """
  model.eval()

  string, num_generation = query
  with torch.no_grad():
    for _ in range(num_generation):
      seq = text_to_numerical_sequence_test(tokenizer(string))
      # Keep at most the most recent `longest_sequence_feature` ids and
      # left-pad with 0 to exactly that width, i.e. the same width the
      # training inputs were padded to. An explicit integer dtype keeps an
      # empty prompt from producing a float tensor.
      context = torch.tensor(seq[-longest_sequence_feature:], dtype=torch.long)
      model_input = F.pad(context, (longest_sequence_feature - context.numel(), 0), value=0)
      logits = model(model_input.unsqueeze(0))
      next_id = torch.argmax(logits, dim=1).item()
      string = string + ' ' + target_vocab.lookup_token(next_id)
  return string

In [78]:
# Sample queries for Prediction: each entry is [prompt text, number of words
# to generate], the two-element form that Prediction unpacks.
best_1 = ["این گونه از کرکسها", 5]
best_2 = ["هواپیماهای", 6]
best_3 = ["تحقیقات", 6]
best_4 = ["همشهری", 10]
best_5 = ["تیم", 10]
best_6 = ["سیاسی", 19]
best_7 = ["اجتماعی", 9]
best_8 = ["مجازی", 15]
best_9 = ["هجدهم این ماه", 7]
best_10 = ["افزایش قیمتها", 3]
best_11 = ["کادرفنی", 9]
best_12 = ["گفتوگو", 18]
best_13 = ["سرمربی تیم ملی", 15]

In [None]:
# Generate a continuation for the first sample query and show the full text.
string_ = Prediction(best_1)
print(string_)