In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter
from sklearn.model_selection import train_test_split
import pickle
from collections import Counter
import time
from tqdm import tqdm
import math

In [2]:
# Read the Roman-Urdu poetry CSV and keep every non-missing poem as a Python list.
df = pd.read_csv('/kaggle/input/romanmy/Roman-Urdu-Poetry.csv')
poetry_column = df['Poetry']
texts = poetry_column.dropna().to_list()

In [3]:
# Average number of whitespace-separated words per poem (despite the variable's name,
# this is a per-poem word count, not the length of individual words).
total_words = sum(len(poem.split()) for poem in texts)
avg_word_length = total_words / len(texts)
print(f"Average length (in words): {avg_word_length:.2f}")

Average length (in words): 119.43


In [4]:
class PoetryDataset(Dataset):
    """Pairs each input sequence with its target so a DataLoader can index them together.

    Args:
        sequences: indexable collection of input sequences.
        targets: indexable collection of targets, aligned with ``sequences``.
    """

    def __init__(self, sequences, targets):
        self.sequences, self.targets = sequences, targets

    def __len__(self):
        # The dataset size is defined by the number of input sequences.
        return len(self.sequences)

    def __getitem__(self, idx):
        sequence = self.sequences[idx]
        target = self.targets[idx]
        return sequence, target

In [5]:
# Preprocessing with improved cleaning
def preprocess_text(text):
    """Normalise a poem: lowercase it, mark line breaks and drop unwanted characters.

    Newlines become the standalone token ``[NEWLINE]``; the marker is inserted
    after lowercasing, so it stays upper-case. Only alphabetic characters,
    spaces, apostrophes, periods and square brackets survive the filter.

    Args:
        text: raw poem text.

    Returns:
        The cleaned text as a single string.
    """
    allowed_punctuation = " '.[]"
    lowered = text.lower()
    marked = lowered.replace('\n', ' [NEWLINE] ')
    return ''.join(ch for ch in marked if ch.isalpha() or ch in allowed_punctuation)

In [6]:
def preprocess_text(text):
    """Lowercase ``text``, replace newlines with ``[NEWLINE]`` and filter characters.

    NOTE: a function with this same name is also defined in the previous cell;
    when the notebook runs top to bottom this definition replaces it.

    Args:
        text: raw poem text.

    Returns:
        The text containing only letters, spaces, apostrophes, periods and
        square brackets (the brackets keep the ``[NEWLINE]`` marker intact).
    """
    keep = frozenset(" '.[]")

    def is_kept(char):
        return char.isalpha() or char in keep

    with_markers = text.lower().replace('\n', ' [NEWLINE] ')
    return ''.join(filter(is_kept, with_markers))

In [7]:
# Clean every poem, then flatten all poems into one long list of word tokens.
processed_texts = list(map(preprocess_text, texts))
joined_text = " ".join(processed_texts)
corpus = joined_text.split()

In [8]:
# Build vocabulary: every corpus word, most frequent first, after the two special tokens.
word_counts = Counter(corpus)
special_tokens = ['<PAD>', '<UNK>']  # index 0 = padding, index 1 = unknown word
vocab = special_tokens + [word for word, _ in word_counts.most_common()]
word2idx = dict(zip(vocab, range(len(vocab))))

print(f"Vocabulary size: {len(vocab)}")


Vocabulary size: 17242


In [23]:
# Persist the word -> index mapping so the same vocabulary can be reloaded later.
with open('word2idx.pkl', 'wb') as vocab_file:
    pickle.dump(word2idx, vocab_file)

In [10]:
# Create sequences: each training example is a window of `sequence_length` token ids
# and its target is the id of the word that immediately follows the window.
sequence_length = 150

# Encode the whole corpus exactly once. The windows overlap almost entirely, so
# looking words up per window would repeat each dictionary lookup ~sequence_length times.
encoded_corpus = [word2idx.get(word, word2idx['<UNK>']) for word in corpus]

# Window ending just before position `end` -> target at position `end`.
sequences = [
    encoded_corpus[end - sequence_length:end]
    for end in range(sequence_length, len(encoded_corpus))
]
targets = encoded_corpus[sequence_length:]

In [11]:
# Split data: hold out 5% of the (sequence, target) pairs for validation.
# NOTE(review): the sequences are stride-1 sliding windows over one corpus, so a random
# split puts near-identical windows on both sides — validation metrics are likely optimistic.
X_train, X_val, y_train, y_val = train_test_split(
    sequences,
    targets,
    test_size=0.05,
    random_state=42,
)

In [12]:
# Create DataLoaders
def collate_fn(batch):
    """Turn a list of ``(sequence, target)`` pairs into two batched tensors.

    Sequences are converted to ``LongTensor`` and padded (batch-first) with the
    ``<PAD>`` index from the module-level ``word2idx``.

    Returns:
        ``(padded_sequences, targets)`` where ``targets`` is a 1-D ``LongTensor``.
    """
    pad_idx = word2idx['<PAD>']
    seq_tensors = []
    target_values = []
    for seq, target in batch:
        seq_tensors.append(torch.LongTensor(seq))
        target_values.append(target)
    padded = pad_sequence(seq_tensors, batch_first=True, padding_value=pad_idx)
    return padded, torch.LongTensor(target_values)

In [13]:
# Wrap both splits so a DataLoader can index (sequence, target) pairs.
train_dataset, val_dataset = (
    PoetryDataset(X_train, y_train),
    PoetryDataset(X_val, y_val),
)

In [14]:
batch_size = 128
# Both loaders share batch size and collation; only the training loader shuffles.
loader_kwargs = dict(batch_size=batch_size, collate_fn=collate_fn)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, **loader_kwargs)


In [15]:
class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learned scale."""

    def __init__(self, dim, eps=1e-8):
        """
        Args:
            dim: size of the last (normalised) dimension.
            eps: constant added to the mean square before the square root.
        """
        super().__init__()
        self.eps = eps
        self.scale = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        # x shape: (..., dim); the RMS is computed per vector along the last axis.
        mean_square = x.pow(2).mean(dim=-1, keepdim=True)
        rms = (mean_square + self.eps).sqrt()
        normalized = x / rms
        return self.scale * normalized

In [16]:
# ROPE (Rotary Positional Embedding) Functions
########################################
def get_rope_embeddings(seq_len, dim, device):
    """
    Build the sine and cosine tables used by ROPE.

    Args:
        seq_len: number of positions.
        dim: embedding size; must be even.
        device: device on which the tables are created.

    Returns:
        ``(sin, cos)``, each of shape ``(seq_len, dim)``, where every
        per-frequency value appears twice in adjacent columns.
    """
    # One frequency per pair of dimensions: 1 / 10000^(2i / dim).
    exponents = torch.arange(0, dim, 2, device=device).float() / dim
    inv_freq = 1.0 / (10000 ** exponents)
    positions = torch.arange(seq_len, device=device).float()
    # angles[p, i] = p * inv_freq[i]
    angles = positions[:, None] * inv_freq[None, :]

    def duplicate_columns(table):
        # (seq_len, dim/2) -> (seq_len, dim/2, 2) -> (seq_len, dim): a, a, b, b, ...
        return table.unsqueeze(-1).expand(-1, -1, 2).reshape(seq_len, dim)

    return duplicate_columns(torch.sin(angles)), duplicate_columns(torch.cos(angles))

In [17]:
def apply_rope(x):
    """
    Rotate each (even, odd) pair of feature dimensions by a position-dependent angle.

    Args:
        x: Tensor of shape (batch, seq_len, dim) where dim is even.

    Returns:
        Tensor of the same shape as ``x`` with ROPE applied.
    """
    batch, seq_len, dim = x.shape
    sin, cos = get_rope_embeddings(seq_len, dim, x.device)  # each: (seq_len, dim)

    # The tables store every value twice in adjacent columns, so the even columns
    # give one sin/cos per pair; the leading axis broadcasts over the batch.
    sin_pair = sin[None, :, ::2]  # (1, seq_len, dim/2)
    cos_pair = cos[None, :, ::2]  # (1, seq_len, dim/2)

    even, odd = x[..., 0::2], x[..., 1::2]  # each: (batch, seq_len, dim/2)

    # Standard 2-D rotation applied to every pair.
    rotated_even = even * cos_pair - odd * sin_pair
    rotated_odd = even * sin_pair + odd * cos_pair

    # Re-interleave the pairs back into the original layout.
    interleaved = torch.stack((rotated_even, rotated_odd), dim=-1)
    return interleaved.reshape(batch, seq_len, dim)

In [18]:
# Pretrained LSTM Poetry Model with ROPE and RMSNorm
########################################
class PretrainedPoetryLSTM(nn.Module):
    """Next-word model: embedding -> ROPE -> bidirectional LSTM -> RMSNorm -> MLP head.

    Relies on the module-level ``word2idx`` mapping for the padding index.
    """

    def __init__(self, vocab_size, embedding_dim=256, hidden_dim=512, num_layers=2):
        """
        Args:
            vocab_size: number of tokens; also the size of the output logits.
            embedding_dim: token embedding size (must be even for ROPE).
            hidden_dim: LSTM hidden size per direction.
            num_layers: number of stacked LSTM layers.
        """
        super(PretrainedPoetryLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=word2idx['<PAD>'])
        # LSTM is bidirectional. Inter-layer dropout only exists when there is more
        # than one layer; passing it for a single layer just triggers a warning.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers,
                            batch_first=True, dropout=0.1 if num_layers > 1 else 0.0,
                            bidirectional=True)
        # RMSNorm applied on the sequence summary (hidden dim doubled due to bidirectionality).
        # The attribute name is kept as-is because it is part of the state_dict keys.
        self.lms_norm = RMSNorm(hidden_dim * 2)
        # Fully connected layers for prediction
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 512),
            nn.ReLU(),
            nn.Dropout(0.1),
            RMSNorm(512),
            nn.Linear(512, vocab_size)
        )

    def forward(self, x):
        """
        Args:
            x: (batch, seq_len) integer token indices.

        Returns:
            (batch, vocab_size) logits for the next token.
        """
        emb = self.embedding(x)  # (batch, seq_len, embedding_dim)
        # Apply ROPE to the embeddings
        emb = apply_rope(emb)
        # h_n: (num_layers * 2, batch, hidden_dim), ordered layer by layer with the
        # forward direction before the backward direction.
        _, (h_n, _) = self.lstm(emb)
        # Summarise the sequence with the top layer's FINAL state of each direction.
        # Slicing the output at the last time step would pair the forward final state
        # with the backward state that has only seen the last token.
        sequence_summary = torch.cat((h_n[-2], h_n[-1]), dim=-1)  # (batch, hidden_dim*2)
        # Normalize using RMSNorm
        normed_out = self.lms_norm(sequence_summary)
        # Fully connected layers produce logits for next-token prediction.
        logits = self.fc(normed_out)
        return logits

In [19]:
# 9. Training Loop & Model Saving
########################################
# Pick the GPU when one is available, then build the model and move it there.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
vocab_size = len(vocab)
model = PretrainedPoetryLSTM(
    vocab_size,
    embedding_dim=256,
    hidden_dim=512,
    num_layers=2,
)
model = model.to(device)


In [20]:
# Cross-entropy over the vocabulary for next-word classification.
criterion = nn.CrossEntropyLoss()
optimizer = optim.RMSprop(model.parameters(), lr=1e-3)
# The scheduler is stepped with the validation loss, hence 'min' mode.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min')

In [21]:
# Training schedule.
num_epochs = 25
# NOTE(review): patience equals num_epochs, so the early-stopping counter in the
# training loop cannot reach it within one run — confirm this is intentional.
patience = 25

# Early-stopping bookkeeping: best validation loss so far and epochs since it improved.
best_val_loss = float('inf')
counter = 0

In [None]:
# Each epoch: train on every batch, evaluate on the validation set, step the LR
# scheduler with the validation loss, checkpoint on improvement, and stop early
# once `patience` consecutive epochs bring no improvement.
for epoch in range(num_epochs):
    start_time = time.time()

    # ---- Training ----
    model.train()
    train_loss = 0
    train_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False)
    # Batch variables carry a `batch_` prefix so they do not overwrite the
    # module-level `targets` list that the train/validation split cell reads;
    # otherwise that cell could not be re-run after training.
    for batch_inputs, batch_targets in train_bar:
        batch_inputs, batch_targets = batch_inputs.to(device), batch_targets.to(device)
        optimizer.zero_grad()
        outputs = model(batch_inputs)
        loss = criterion(outputs, batch_targets)
        loss.backward()
        # Clip the global gradient norm before the optimizer step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)
        optimizer.step()
        batch_loss = loss.item()
        train_loss += batch_loss
        train_bar.set_postfix(loss=batch_loss)
    # Mean of the per-batch losses.
    train_loss /= len(train_loader)

    # ---- Validation ----
    model.eval()
    val_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_inputs, batch_targets in val_loader:
            batch_inputs, batch_targets = batch_inputs.to(device), batch_targets.to(device)
            outputs = model(batch_inputs)
            loss = criterion(outputs, batch_targets)
            val_loss += loss.item()
            # Top-1 accuracy: the highest-scoring token must equal the target.
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == batch_targets).sum().item()
            total += batch_targets.size(0)
    val_loss /= len(val_loader)
    val_acc = correct / total
    epoch_time = time.time() - start_time
    scheduler.step(val_loss)

    print(f'Epoch {epoch+1}/{num_epochs} | Time: {epoch_time:.2f}s | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}')

    # ---- Checkpointing / early stopping ----
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping!")
            break

In [None]:
# Save the weights the model holds after the last executed epoch (the best-validation
# weights are written separately to 'best_model.pth' by the training loop).
# The file name says "gru", but the saved model is the LSTM defined in this notebook.
final_state = model.state_dict()
torch.save(final_state, 'urdu_poetry_gru.pth')

In [22]:
# Inverse vocabulary lookup: index -> word.
idx2word = dict(zip(word2idx.values(), word2idx.keys()))

In [25]:
# Save the index -> word mapping so generated indices can be decoded later.
with open('idx2word.pkl', 'wb') as mapping_file:
    pickle.dump(idx2word, mapping_file)

In [None]:
def preprocess_prompt(prompt):
    """Tokenise a prompt with the same cleaning that was applied to the training corpus.

    Delegates to ``preprocess_text`` (lowercasing, ``[NEWLINE]`` markers, character
    filtering) instead of repeating those steps, so training-time and
    inference-time preprocessing cannot drift apart.

    Args:
        prompt: raw prompt string.

    Returns:
        List of whitespace-separated tokens.
    """
    return preprocess_text(prompt).split()

In [None]:
def predict_next_word(model, prompt, sequence_length=20, device='cpu'):
    """Return the single most likely next word for ``prompt`` (greedy argmax).

    Args:
        model: language model mapping (1, sequence_length) indices to logits.
        prompt: raw prompt string.
        sequence_length: number of tokens fed to the model; shorter prompts are
            left-padded with ``<PAD>``, longer ones keep only their last tokens.
        device: device the input tensor is moved to.

    Returns:
        The predicted word, or ``'<UNK>'`` if the index has no entry in ``idx2word``.
    """
    def to_index(token):
        # Unknown words map to the <UNK> index.
        return word2idx.get(token, word2idx['<UNK>'])

    tokens = preprocess_prompt(prompt)
    missing = sequence_length - len(tokens)
    if missing > 0:
        tokens = ['<PAD>'] * missing + tokens
    else:
        tokens = tokens[-sequence_length:]

    seq_indices = [to_index(token) for token in tokens]
    input_tensor = torch.LongTensor(seq_indices).unsqueeze(0).to(device)  # (1, sequence_length)

    model.eval()
    with torch.no_grad():
        predicted_idx = model(input_tensor).argmax(dim=1).item()
    return idx2word.get(predicted_idx, '<UNK>')


In [None]:
def generate_text(model, prompt, gen_length=20, sequence_length=20, device='cpu'):
    """
    Greedily extend ``prompt`` by ``gen_length`` words, one prediction at a time.

    Args:
        model: The trained language model.
        prompt: The starting text prompt (string).
        gen_length: The number of words to generate.
        sequence_length: The number of most recent tokens fed to the model at each
            step; shorter contexts are left-padded with ``<PAD>``.
        device: The device (CPU/GPU) the input tensor is moved to.
    Returns:
        The prompt tokens followed by the generated words, joined by single spaces.
    """
    def to_index(token):
        # Unknown words map to the <UNK> index.
        return word2idx.get(token, word2idx['<UNK>'])

    # Work on a fresh list seeded with the prompt tokens.
    generated = list(preprocess_prompt(prompt))

    model.eval()
    with torch.no_grad():
        for _ in range(gen_length):
            # Left-pad when the context is short; otherwise keep only the tail.
            pad_count = max(sequence_length - len(generated), 0)
            current_tokens = ['<PAD>'] * pad_count + generated[-sequence_length:]

            seq_indices = list(map(to_index, current_tokens))
            input_tensor = torch.LongTensor(seq_indices).unsqueeze(0).to(device)  # Shape: (1, sequence_length)

            # Greedy choice: take the highest-scoring token.
            predicted_idx = model(input_tensor).argmax(dim=1).item()
            generated.append(idx2word.get(predicted_idx, '<UNK>'))

    return " ".join(generated)

# Example usage: continue a short prompt by 50 words using a 50-token context window.
example_prompt = "wo batain ab kahan"
generated_text = generate_text(
    model,
    example_prompt,
    gen_length=50,
    sequence_length=50,
    device=device,
)
print("Generated Text:\n", generated_text)


In [None]:
# Refresh the apt package index before installing the GitHub CLI.
!sudo apt update

In [None]:
# Install the GitHub CLI (`gh`) non-interactively.
!sudo apt install gh -y

In [None]:
# Authenticate the GitHub CLI.
# NOTE(review): `--with-token` makes gh read the token from standard input, and nothing is
# piped into this command — confirm how the token is supplied (never paste it into the notebook).
!gh auth login --with-token

In [None]:
!mkdir my_model
!cp urdu_poetry_gru.pth my_model/


In [None]:
# Compress the saved weights file into urdu_poetry.zip.
!zip urdu_poetry.zip urdu_poetry_gru.pth


In [None]:
# NOTE(review): no visible cell creates `urdu_poetry_gru.zip`, so this command probably has
# nothing to add to the archive — confirm the intended input file or drop this cell.
!zip urdu_poetry.zip urdu_poetry_gru.zip