<a href="https://colab.research.google.com/github/AhrazKhan31/Deep-Learning-Lab/blob/main/Experiment4_Text_Generation_using_RNN_and_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Basic imports
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import OneHotEncoder
import string

In [3]:
!pip install opendatasets

Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl.metadata (9.2 kB)
Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Installing collected packages: opendatasets
Successfully installed opendatasets-0.1.22


In [4]:
import opendatasets as od
od.download("https://www.kaggle.com/datasets/imbikramsaha/poems/data?select=poems-100.csv")

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: ahrazkh31
Your Kaggle Key: ··········
Dataset URL: https://www.kaggle.com/datasets/imbikramsaha/poems


In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

df = pd.read_csv("/content/poems/poems-100.csv")
df.head()

Unnamed: 0,text
0,"O my Luve's like a red, red rose\nThat’s newly..."
1,"The rose is red,\nThe violet's blue,\nSugar is..."
2,How do I love thee? Let me count the ways.\nI ...
3,"Had I the heavens' embroidered cloths,\nEnwrou..."
4,"I.\n Enough! we're tired, my heart and I.\n..."


In [6]:
# One-Hot Encoding Approach
# Load your dataset (assuming it’s a CSV or TXT file with poems in a 'text' column)
data_path = '/content/poems/poems-100.csv'  # Update the path
with open(data_path, 'r', encoding='utf-8') as f:
    text_data = f.read().lower()

# Split the text into lines
lines = text_data.split('\n')
print(f'Total lines of poetry: {len(lines)}')


Total lines of poetry: 3426


In [None]:
# Preprocess text: Remove punctuation and tokenize
def clean_and_tokenize(line):
    line = line.translate(str.maketrans('', '', string.punctuation))
    return line.split()

# Tokenize each line
tokenized_lines = [clean_and_tokenize(line) for line in lines if line.strip() != '']
print(f'First tokenized line: {tokenized_lines[0]}')


First tokenized line: ['text']


In [None]:
# Flatten tokenized list and create vocabulary
all_tokens = [token for line in tokenized_lines for token in line]
vocabulary = sorted(set(all_tokens))  # Unique words
vocab_size = len(vocabulary)

# Create word-to-index and index-to-word dictionaries
word_to_index = {word: idx for idx, word in enumerate(vocabulary)}
index_to_word = {idx: word for word, idx in word_to_index.items()}

print(f'Size of vocabulary: {vocab_size}')


Size of vocabulary: 5503


In [None]:
def one_hot_encode_sequence(sequence, vocab_size, word_to_index):
    encoded_sequence = []
    for token in sequence:
        one_hot_vector = np.zeros(vocab_size, dtype=int)
        if token in word_to_index:
            one_hot_vector[word_to_index[token]] = 1
        encoded_sequence.append(one_hot_vector)
    return np.array(encoded_sequence)

# Example usage for one line
example_seq = tokenized_lines[0]
encoded_seq = one_hot_encode_sequence(example_seq, vocab_size, word_to_index)
print(f'One-hot encoded example sequence shape: {encoded_seq.shape}')


One-hot encoded example sequence shape: (1, 5503)


In [None]:
# Prepare input-output pairs
def create_sequences(tokenized_lines, sequence_length):
    X, y = [], []
    for line in tokenized_lines:
        if len(line) <= sequence_length:
            continue
        for i in range(len(line) - sequence_length):
            sequence = line[i:i+sequence_length]
            target = line[i+sequence_length]
            X.append(one_hot_encode_sequence(sequence, vocab_size, word_to_index))
            y.append(word_to_index[target])
    return np.array(X), np.array(y)

# Define sequence length
sequence_length = 5  # Use 5 words as input
X, y = create_sequences(tokenized_lines, sequence_length)

# Convert to tensors
X_tensor = torch.tensor(X, dtype=torch.float32)  # Shape: [num_samples, seq_len, vocab_size]
y_tensor = torch.tensor(y, dtype=torch.long)     # Shape: [num_samples]

print(f'Training data shape: {X_tensor.shape}, Target shape: {y_tensor.shape}')


Training data shape: torch.Size([10588, 5, 5503]), Target shape: torch.Size([10588])


In [None]:
class TextGenerationModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1, model_type='RNN'):
        super(TextGenerationModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.model_type = model_type

        if model_type == 'RNN':
            self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        elif model_type == 'LSTM':
            self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initial hidden state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        if self.model_type == 'RNN':
            # Pass h0 directly for RNN, not as a tuple
            out, _ = self.rnn(x, h0)
        else:
            out, _ = self.rnn(x, h0)

        # Get the output of the last time step
        out = self.fc(out[:, -1, :])  # Shape: [batch_size, output_size]
        return out

In [None]:
# Model parameters
input_size = vocab_size
hidden_size = 128
output_size = vocab_size
num_layers = 2
model_type = 'RNN'  # Choose between 'RNN' and 'LSTM'

# Initialize model
model = TextGenerationModel(input_size, hidden_size, output_size, num_layers, model_type)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [None]:
# Training parameters
num_epochs = 20
batch_size = 64

# Create DataLoader for batches
class PoetryDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Load dataset
dataset = PoetryDataset(X_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Training loop
for epoch in range(num_epochs):
    for X_batch, y_batch in dataloader:
        # Forward pass
        outputs = model(X_batch)

        # Compute loss
        loss = criterion(outputs, y_batch)

        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')


Epoch [5/20], Loss: 0.5501
Epoch [10/20], Loss: 0.1714
Epoch [15/20], Loss: 0.0997
Epoch [20/20], Loss: 0.0258


In [None]:
def generate_text(model, start_sequence, vocab_dict, index_to_word, vocab_size, max_len=20):
    model.eval()
    sequence = start_sequence

    for _ in range(max_len):
        # Encode the sequence
        encoded_seq = one_hot_encode_sequence(sequence, vocab_size, vocab_dict)
        encoded_seq = torch.tensor(encoded_seq, dtype=torch.float32).unsqueeze(0)  # Add batch dimension

        # Predict the next word
        with torch.no_grad():
            output = model(encoded_seq)
            predicted_idx = torch.argmax(output, dim=1).item()
            predicted_word = index_to_word[predicted_idx]

        # Add predicted word to sequence
        sequence.append(predicted_word)

        # Limit sequence length to sliding window
        sequence = sequence[1:]

    return ' '.join(sequence)


In [None]:
# Starting sequence for prediction
start_sequence = ['hello', 'my', 'name', 'is', 'pytorch']
generated_text = generate_text(model, start_sequence, word_to_index, index_to_word, vocab_size, max_len=10)
print(f'Generated text: {generated_text}')


Generated text: murderous mean eyes me ankles


In [7]:
# Trainable Word Embeddings Approach
# Basic imports
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import string

In [8]:
!pip install opendatasets



In [9]:
import opendatasets as od
od.download("https://www.kaggle.com/datasets/imbikramsaha/poems/data?select=poems-100.csv")

Skipping, found downloaded files in "./poems" (use force=True to force download)


In [10]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

df = pd.read_csv("/content/poems/poems-100.csv")
df.head()

Unnamed: 0,text
0,"O my Luve's like a red, red rose\nThat’s newly..."
1,"The rose is red,\nThe violet's blue,\nSugar is..."
2,How do I love thee? Let me count the ways.\nI ...
3,"Had I the heavens' embroidered cloths,\nEnwrou..."
4,"I.\n Enough! we're tired, my heart and I.\n..."


In [11]:
# Load your dataset (assuming it’s a CSV or TXT file with poems in a 'text' column)
data_path = '/content/poems/poems-100.csv'  # Update the path
with open(data_path, 'r', encoding='utf-8') as f:
    text_data = f.read().lower()

# Split the text into lines
lines = text_data.split('\n')
print(f'Total lines of poetry: {len(lines)}')

Total lines of poetry: 3426


In [12]:
# Preprocess text: Remove punctuation and tokenize
def clean_and_tokenize(line):
    line = line.translate(str.maketrans('', '', string.punctuation))
    return line.split()

# Tokenize each line
tokenized_lines = [clean_and_tokenize(line) for line in lines if line.strip() != '']
print(f'First tokenized line: {tokenized_lines[0]}')


First tokenized line: ['text']


In [13]:
# Flatten tokenized list and create vocabulary
all_tokens = [token for line in tokenized_lines for token in line]
vocabulary = sorted(set(all_tokens))  # Unique words
vocab_size = len(vocabulary)

# Create word-to-index and index-to-word dictionaries
word_to_index = {word: idx for idx, word in enumerate(vocabulary)}
index_to_word = {idx: word for word, idx in word_to_index.items()}

print(f'Size of vocabulary: {vocab_size}')


Size of vocabulary: 5503


In [14]:
# Convert tokenized lines to indexed sequences
def convert_to_index_sequence(line, word_to_index):
    return [word_to_index[token] for token in line if token in word_to_index]

# Create indexed sequences and targets
def create_sequences(tokenized_lines, sequence_length):
    X, y = [], []
    for line in tokenized_lines:
        indexed_sequence = convert_to_index_sequence(line, word_to_index)
        if len(indexed_sequence) <= sequence_length:
            continue
        for i in range(len(indexed_sequence) - sequence_length):
            sequence = indexed_sequence[i:i+sequence_length]
            target = indexed_sequence[i+sequence_length]
            X.append(sequence)
            y.append(target)
    return np.array(X), np.array(y)

# Define sequence length
sequence_length = 5  # Use 5 words as input
X, y = create_sequences(tokenized_lines, sequence_length)

# Convert to tensors
X_tensor = torch.tensor(X, dtype=torch.long)   # Shape: [num_samples, seq_len]
y_tensor = torch.tensor(y, dtype=torch.long)   # Shape: [num_samples]

print(f'Training data shape: {X_tensor.shape}, Target shape: {y_tensor.shape}')


Training data shape: torch.Size([10588, 5]), Target shape: torch.Size([10588])


In [15]:
class EmbeddingRNNModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size, num_layers=1, model_type='RNN'):
        super(EmbeddingRNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.model_type = model_type

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # RNN or LSTM layer
        if model_type == 'RNN':
            self.rnn = nn.RNN(embedding_dim, hidden_size, num_layers, batch_first=True)
        elif model_type == 'LSTM':
            self.rnn = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True)

        # Fully connected layer to predict next word
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Embedding lookup
        embedded = self.embedding(x)  # Shape: [batch_size, seq_len, embedding_dim]

        # Initial hidden state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        if self.model_type == 'LSTM':
            c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
            out, _ = self.rnn(embedded, (h0, c0))
        else:
            out, _ = self.rnn(embedded, h0)

        # Get the output of the last time step
        out = self.fc(out[:, -1, :])  # Shape: [batch_size, output_size]
        return out


In [16]:
# Model parameters
embedding_dim = 64    # Embedding dimension
hidden_size = 128     # Hidden size of RNN/LSTM
output_size = vocab_size
num_layers = 2
model_type = 'LSTM'  # Choose between 'RNN' and 'LSTM'

# Instantiate model
model = EmbeddingRNNModel(vocab_size, embedding_dim, hidden_size, output_size, num_layers, model_type)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [17]:
# Define a custom dataset
class PoetryDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Create dataset and dataloader
dataset = PoetryDataset(X_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)


In [19]:
# Training parameters
num_epochs = 20

# Training loop
for epoch in range(num_epochs):
    for X_batch, y_batch in dataloader:
        # Forward pass
        outputs = model(X_batch)

        # Compute loss
        loss = criterion(outputs, y_batch)

        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')


Epoch [5/20], Loss: 5.8536
Epoch [10/20], Loss: 3.2902
Epoch [15/20], Loss: 2.7896
Epoch [20/20], Loss: 1.4150


In [20]:
def generate_text(model, start_sequence, word_to_index, index_to_word, vocab_size, max_len=20):
    model.eval()
    sequence = [word_to_index[word] for word in start_sequence if word in word_to_index]

    for _ in range(max_len):
        # Convert sequence to tensor
        input_seq = torch.tensor(sequence[-sequence_length:], dtype=torch.long).unsqueeze(0)  # Shape: [1, seq_len]

        # Predict next word
        with torch.no_grad():
            output = model(input_seq)
            predicted_idx = torch.argmax(output, dim=1).item()
            predicted_word = index_to_word[predicted_idx]

        # Append predicted word
        sequence.append(predicted_idx)

        # Stop if end of sequence
        if predicted_word == '<end>':
            break

    return ' '.join([index_to_word[idx] for idx in sequence])


In [21]:
# Starting sequence for prediction
start_sequence = ['hello', 'my', 'name', 'is', 'pytorch']
generated_text = generate_text(model, start_sequence, word_to_index, index_to_word, vocab_size, max_len=10)
print(f'Generated text: {generated_text}')


Generated text: my name is borne in a row and went it to me to
