<a href="https://colab.research.google.com/github/SoheilBadri2000/DataScience2/blob/main/30.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

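# LSTM

Word-level LSTM language model: the notebook downloads the Reddit clean-jokes CSV, trains an embedding + LSTM + linear model to predict the next word, and then samples new text from a prompt.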

In [1]:
# -nc (--no-clobber): skip the download when the CSV already exists, so re-running this
# cell does not leave reddit-cleanjokes.csv.1, .2, ... copies next to the original.
!wget -nc https://raw.githubusercontent.com/amoudgl/short-jokes-dataset/master/data/reddit-cleanjokes.csv

--2024-04-26 19:35:54--  https://raw.githubusercontent.com/amoudgl/short-jokes-dataset/master/data/reddit-cleanjokes.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 141847 (139K) [text/plain]
Saving to: ‘reddit-cleanjokes.csv’


2024-04-26 19:35:55 (17.6 MB/s) - ‘reddit-cleanjokes.csv’ saved [141847/141847]



In [2]:
import torch
import numpy as np
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils import clip_grad_norm_
import pandas as pd
from collections import Counter

In [3]:
class Model(nn.Module):
    """Word-level language model: embedding -> stacked LSTM -> linear projection to the vocabulary.

    NOTE(review): ``nn.LSTM`` is built without ``batch_first=True``, so it reads
    dimension 0 of its input as time and dimension 1 as batch. The notebook
    feeds ``(batch, sequence)`` index tensors, so the recurrence effectively
    runs along the batch axis and the state is sized by the sequence length.
    That layout is kept because ``train`` and ``predict`` rely on it; switching
    to ``batch_first=True`` has to be done together with those callers.
    """

    def __init__(self, dataset, lstm_size=128, embedding_dim=128, num_layers=3):
        """Build the layers.

        Args:
            dataset: object exposing ``uniq_words``; its length is the vocabulary size.
            lstm_size: hidden size of every LSTM layer.
            embedding_dim: size of the word embeddings fed to the LSTM.
            num_layers: number of stacked LSTM layers.
        """
        super(Model, self).__init__()
        self.lstm_size = lstm_size
        self.embedding_dim = embedding_dim
        self.num_layers = num_layers

        n_vocab = len(dataset.uniq_words)
        self.embedding = nn.Embedding(
            num_embeddings=n_vocab,
            embedding_dim=self.embedding_dim,
        )

        # nn.LSTM only applies dropout between layers, so it is disabled for a single layer.
        dropout_value = 0.2 if self.num_layers > 1 else 0
        self.lstm = nn.LSTM(
            # The LSTM consumes embeddings, so its input size is embedding_dim
            # (previously lstm_size, which only worked because both were 128).
            input_size=self.embedding_dim,
            hidden_size=self.lstm_size,
            num_layers=self.num_layers,
            dropout=dropout_value,
        )

        self.fc = nn.Linear(self.lstm_size, n_vocab)

    def forward(self, x, prev_state):
        """Return ``(logits, state)`` for the index tensor ``x``.

        Args:
            x: integer tensor of word indexes.
            prev_state: ``(h, c)`` tuple as produced by ``init_state`` or a previous call.

        Returns:
            ``logits`` with the vocabulary as last dimension, and the new ``(h, c)`` tuple.
        """
        embed = self.embedding(x)
        output, state = self.lstm(embed, prev_state)
        logits = self.fc(output)
        return logits, state

    def init_state(self, sequence_length):
        """Return a zeroed ``(h, c)`` pair of shape ``(num_layers, sequence_length, lstm_size)``.

        The tensors are created on the same device and with the same dtype as
        the model parameters, so the state stays usable after ``model.to(...)``.
        """
        reference = next(self.parameters())
        shape = (self.num_layers, sequence_length, self.lstm_size)
        return (
            torch.zeros(shape, device=reference.device, dtype=reference.dtype),
            torch.zeros(shape, device=reference.device, dtype=reference.dtype),
        )

In [4]:
class Dataset(torch.utils.data.Dataset):
    """Sliding-window next-word dataset built from the jokes CSV.

    The base class is spelled out as ``torch.utils.data.Dataset`` so that
    re-running this cell does not make the class inherit from its own previous
    definition (the notebook-level name ``Dataset`` is rebound by this class).
    """

    # CSV written by the download cell; must contain a 'Joke' column.
    DATA_PATH = 'reddit-cleanjokes.csv'

    def __init__(
        self,
        args,
    ):
        """Load the corpus and build the vocabulary.

        Args:
            args: object with a ``sequence_length`` attribute (window size in words).
        """
        self.args = args
        self.words = self.load_words()
        self.uniq_words = self.get_uniq_words()

        self.index_to_word = {index: word for index, word in enumerate(self.uniq_words)}
        self.word_to_index = {word: index for index, word in enumerate(self.uniq_words)}

        self.words_indexes = [self.word_to_index[w] for w in self.words]

    def load_words(self):
        """Read the jokes, join them with ' <eos> ' and split the text on single spaces."""
        train_df = pd.read_csv(self.DATA_PATH)
        # Drop missing rows and force str so the join cannot fail on NaN/float cells.
        jokes = train_df['Joke'].dropna().astype(str).tolist()
        text = ' <eos> '.join(jokes)
        # Split on ' ' (not whitespace) to stay consistent with predict().
        return text.split(' ')

    def get_uniq_words(self):
        """Return the distinct words ordered from most to least frequent."""
        word_counts = Counter(self.words)
        return sorted(word_counts, key=word_counts.get, reverse=True)

    def __len__(self):
        """Number of full windows that still have a next-word target (never negative)."""
        return max(0, len(self.words_indexes) - self.args.sequence_length)

    def __getitem__(self, index):
        """Return ``(inputs, targets)``: a window of word indexes and the same window shifted by one.

        Raises:
            IndexError: if ``index`` is outside ``range(len(self))``.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"index {index} out of range for dataset of length {len(self)}")
        stop = index + self.args.sequence_length
        return (
            torch.tensor(self.words_indexes[index:stop], dtype=torch.int64),
            torch.tensor(self.words_indexes[index + 1:stop + 1], dtype=torch.int64),
        )


In [13]:
def train(dataset, model, args, checkpoint_dir="/content"):
    """Train ``model`` on ``dataset`` with Adam and cross-entropy loss.

    Args:
        dataset: map-style dataset yielding ``(inputs, targets)`` index tensors.
        model: module exposing ``init_state(n)`` and ``forward(x, (h, c))``.
        args: object with ``batch_size``, ``max_epochs`` and ``sequence_length``.
        checkpoint_dir: directory that receives ``model_epoch_{epoch}.pth`` files.
            Defaults to the Colab working directory used originally.
    """
    model.train()
    dataloader = DataLoader(dataset, batch_size=args.batch_size)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(args.max_epochs):
        # Fresh recurrent state at the start of every epoch.
        state_h, state_c = model.init_state(args.sequence_length)

        for batch, (x, y) in enumerate(dataloader):
            optimizer.zero_grad()

            y_pred, (state_h, state_c) = model(x, (state_h, state_c))
            # CrossEntropyLoss wants the class dimension second: (N, C, ...) vs targets (N, ...).
            loss = criterion(y_pred.transpose(1, 2), y)

            # Carry the state values into the next batch without back-propagating through them.
            state_h = state_h.detach()
            state_c = state_c.detach()

            loss.backward()
            clip_grad_norm_(model.parameters(), max_norm=1)
            optimizer.step()

            if batch % 100 == 0:  # adjust the logging frequency as needed
                print({"epoch": epoch, "batch": batch, "loss": loss.item()})

        # Save once per qualifying epoch, after its last batch. This check used to
        # sit inside the batch loop and rewrote the file on every single batch.
        if epoch % 10 == 0:  # save the model every 10 epochs (epochs 0, 10, 20, ...)
            torch.save(model.state_dict(), f"{checkpoint_dir}/model_epoch_{epoch}.pth")

In [18]:
def predict(dataset, model, text, next_words=100, temperature=1.0):
    """Sample ``next_words`` words that continue ``text``.

    Args:
        dataset: object exposing ``word_to_index`` and ``index_to_word`` mappings.
        model: module exposing ``init_state(n)`` and ``forward(x, (h, c))``.
        text: prompt; it is split on single spaces and every word must be in the vocabulary.
        next_words: number of words to generate.
        temperature: softmax temperature, must be > 0 (lower is more conservative).

    Returns:
        List of the prompt words followed by the generated words.

    Raises:
        ValueError: if ``temperature`` is not positive.
        KeyError: if a prompt word is missing from ``dataset.word_to_index``.
    """
    if temperature <= 0:
        raise ValueError(f"temperature must be > 0, got {temperature}")

    model.eval()

    words = text.split(" ")
    unknown = [w for w in words if w not in dataset.word_to_index]
    if unknown:
        raise KeyError(f"prompt words not in the vocabulary: {unknown}")

    # Run on whatever device the model lives on (CPU when it has no parameters).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    state_h, state_c = model.init_state(len(words))
    state_h, state_c = state_h.to(device), state_c.to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for i in range(0, next_words):
            # The window slides by one word per step, so its length stays len(prompt words).
            x = torch.tensor([[dataset.word_to_index[w] for w in words[i:]]], device=device)
            y_pred, (state_h, state_c) = model(x, (state_h, state_c))

            last_word_logits = y_pred[0][-1]
            p = nn.functional.softmax(last_word_logits / temperature, dim=0).cpu().numpy()
            word_index = int(np.random.choice(len(last_word_logits), p=p))
            words.append(dataset.index_to_word[word_index])

    return words

In [15]:
class Args:
    """Hyper-parameters read by the dataset and the training loop."""

    # Number of words in each input window (targets are the window shifted by one).
    sequence_length = 4
    # Number of windows per optimisation step.
    batch_size = 256
    # Number of full passes over the dataset.
    max_epochs = 10


args = Args()

In [16]:
# Seed the RNGs this notebook draws from (torch for weight init and dropout,
# NumPy for sampling in predict) so Restart & Run All gives repeatable results.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

dataset = Dataset(args)
model = Model(dataset)

train(dataset, model, args)

{'epoch': 0, 'batch': 0, 'loss': 8.843782424926758}
{'epoch': 1, 'batch': 0, 'loss': 6.943517208099365}
{'epoch': 2, 'batch': 0, 'loss': 6.881825923919678}
{'epoch': 3, 'batch': 0, 'loss': 6.77602481842041}
{'epoch': 4, 'batch': 0, 'loss': 6.613386631011963}
{'epoch': 5, 'batch': 0, 'loss': 6.435757160186768}
{'epoch': 6, 'batch': 0, 'loss': 6.272709369659424}
{'epoch': 7, 'batch': 0, 'loss': 6.091189384460449}
{'epoch': 8, 'batch': 0, 'loss': 5.914564609527588}
{'epoch': 9, 'batch': 0, 'loss': 5.818454742431641}


In [19]:
prompt = "Why did the children cross the road?"
generated_words = predict(dataset, model, text=prompt, next_words=50)

# predict() returns the prompt words followed by the sampled ones. Split the
# prompt the same way predict() does (on single spaces) so the offset matches.
full_text = prompt + " " + " ".join(generated_words[len(prompt.split(" ")):])

# Post-process: upper-case the first letter of the first word and of every word
# that follows a word ending in '.', '!' or '?'. The previous version indexed
# characters of full_text with a word position, so it capitalised the wrong words.
# str.capitalize() is avoided because it would also lower-case the rest of the word.
SENTENCE_ENDINGS = (".", "!", "?")
tokens = full_text.split()
processed_text = " ".join(
    token[:1].upper() + token[1:]
    if position == 0 or tokens[position - 1].endswith(SENTENCE_ENDINGS)
    else token
    for position, token in enumerate(tokens)
)

print(processed_text)

Why did the children cross the road? magic name at the con-artist meet who up... internal Denim nosy? Me quack! <eos> What to hear you call the girl ...are dressed To it's with the cheapest moon? <eos> What do why kind the dune say on she Circle? under but he'd the Elementary joke There give with a
