In [128]:
import os
import torch
import torch.nn as nn
from tokenizers import ByteLevelBPETokenizer
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset,random_split
import math
import torch.optim as optim
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from gensim.models import KeyedVectors
import fasttext.util
import numpy as np

In [129]:
data_path = os.path.abspath("hafez.txt")
vocab_dir = "hafez_vocab"

assert os.path.isfile(data_path), f"{data_path} not found. Please upload the file."

with open(data_path, "r", encoding="utf-8") as file:
    text = file.read()

len(text)

276324

In [130]:
# Load and preprocess the text
data_path = "hafez.txt"
vocab_dir = "hafez_vocab"

with open(data_path, "r", encoding="utf-8") as file:
    text = file.read()

# Replace single newlines within verses with <lb> and add double <lb> after each pair of verses
lines = text.split('\n')
structured_text = ""
for i in range(0, len(lines), 2):
    structured_text += lines[i].replace("\n", " <lb> ") + " <lb> "
    if i + 1 < len(lines):
        structured_text += lines[i + 1].replace("\n", " <lb> ") + " <lb> <lb> "

# Save the structured text back to the file (or a new file)
structured_data_path = "structured_hafez.txt"
with open(structured_data_path, "w", encoding="utf-8") as file:
    file.write(structured_text)

# Train the tokenizer with structured data
tokenizer = ByteLevelBPETokenizer()
tokenizer.train(files=[structured_data_path], vocab_size=30000, min_frequency=2, special_tokens=[
    "<s>",
    "<pad>",
    "</s>",
    "<unk>",
    "<mask>",
    "<lb>"
])
tokenizer.save_model(vocab_dir)

# Load the tokenizer
tokenizer = ByteLevelBPETokenizer(
    os.path.join(vocab_dir, "vocab.json"),
    os.path.join(vocab_dir, "merges.txt"),
)

# Load pre-trained FastText vectors using gensim
fasttext_model = KeyedVectors.load_word2vec_format('cc.fa.300.vec', binary=False)






In [131]:
# Function to create embedding matrix
def create_embedding_matrix(tokenizer, fasttext_model, embedding_dim):
    vocab_size = tokenizer.get_vocab_size()
    embedding_matrix = np.zeros((vocab_size, embedding_dim))
    word_to_idx = tokenizer.get_vocab()
    for word, idx in word_to_idx.items():
        if word in fasttext_model:
            embedding_matrix[idx] = fasttext_model[word]
        else:
            embedding_matrix[idx] = np.random.normal(scale=0.6, size=(embedding_dim,))
    return embedding_matrix

embedding_dim = 300
embedding_matrix = create_embedding_matrix(tokenizer, fasttext_model, embedding_dim)

In [145]:
# Define embedding layer with pre-trained FastText embeddings
class PretrainedEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim, pretrained_weights):
        super(PretrainedEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding.weight = nn.Parameter(torch.tensor(pretrained_weights, dtype=torch.float32))
        self.embedding.weight.requires_grad = False  # Enable fine-tuning

    def forward(self, input_ids):
        return self.embedding(input_ids)

# Define positional encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        seq_len = x.size(1)
        if seq_len > self.encoding.size(1):
            position = torch.arange(0, seq_len).unsqueeze(1).float().to(x.device)
            div_term = torch.exp(torch.arange(0, self.encoding.size(2), 2).float() * -(math.log(10000.0) / self.encoding.size(2))).to(x.device)
            encoding = torch.zeros(seq_len, self.encoding.size(2)).to(x.device)
            encoding[:, 0::2] = torch.sin(position * div_term)
            encoding[:, 1::2] = torch.cos(position * div_term)
            encoding = encoding.unsqueeze(0)
        else:
            encoding = self.encoding[:, :seq_len, :]
        return x + encoding.to(x.device)

# Define GPT2Embedding with PretrainedEmbedding and PositionalEncoding
class GPT2Embedding(nn.Module):
    def __init__(self, vocab_size, d_model, max_len, pretrained_weights):
        super(GPT2Embedding, self).__init__()
        self.token_embedding = PretrainedEmbedding(vocab_size, d_model, pretrained_weights)
        self.position_encoding = PositionalEncoding(d_model, max_len)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, input_ids):
        token_embeddings = self.token_embedding(input_ids)
        position_embeddings = self.position_encoding(token_embeddings)
        embeddings = self.layer_norm(position_embeddings)
        return embeddings

# Multi-head Attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, nhead):
        super(MultiHeadAttention, self).__init__()
        assert d_model % nhead == 0
        self.d_k = d_model // nhead
        self.nhead = nhead
        self.linear_q = nn.Linear(d_model, d_model)
        self.linear_k = nn.Linear(d_model, d_model)
        self.linear_v = nn.Linear(d_model, d_model)
        self.linear_out = nn.Linear(d_model, d_model)
        self.attention = None

    def scaled_dot_product_attention(self, query, key, value, mask=None):
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        p_attn = torch.softmax(scores, dim=-1)
        return torch.matmul(p_attn, value), p_attn

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        query = self.linear_q(query).view(batch_size, -1, self.nhead, self.d_k).transpose(1, 2)
        key = self.linear_k(key).view(batch_size, -1, self.nhead, self.d_k).transpose(1, 2)
        value = self.linear_v(value).view(batch_size, -1, self.nhead, self.d_k).transpose(1, 2)
        x, self.attention = self.scaled_dot_product_attention(query, key, value, mask)
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.nhead * self.d_k)
        return self.linear_out(x), self.attention

# Feed-forward Network
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(self.relu(self.linear1(x)))

# Decoder Layer
class DecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, d_ff=2048):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, nhead)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, mask=None):
        attn_output, _ = self.self_attn(x, x, x, mask)
        x = x + self.dropout(attn_output)
        x = self.layer_norm1(x)
        ff_output = self.feed_forward(x)
        x = x + self.dropout(ff_output)
        x = self.layer_norm2(x)
        return x

# Define the Transformer Decoder model
class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_layers, max_len, d_ff, pretrained_weights):
        super(Decoder, self).__init__()
        self.embedding = GPT2Embedding(vocab_size, d_model, max_len, pretrained_weights)
        self.layers = nn.ModuleList([DecoderLayer(d_model, nhead, d_ff) for _ in range(num_layers)])
        self.linear_out = nn.Linear(d_model, vocab_size)

    def forward(self, input_ids, mask=None):
        x = self.embedding(input_ids)
        for layer in self.layers:
            x = layer(x, mask)
        logits = self.linear_out(x)
        return logits

In [146]:
# Load and preprocess the text
with open(data_path, "r", encoding="utf-8") as file:
    text = file.read()

# Split data into training, validation, and test sets
verses = text.split("\n\n")
verses = [verse.replace("\n", " <lb> ") for verse in verses if verse.strip()]
tokenized_verses = [tokenizer.encode(verse).ids for verse in verses]
max_len = 48
padded_tokenized_verses = [verse + [tokenizer.token_to_id('<pad>')] * (max_len - len(verse)) for verse in tokenized_verses]

data_tensor = torch.tensor(padded_tokenized_verses, dtype=torch.long)
dataset = TensorDataset(data_tensor)
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=2)
test_dataloader = DataLoader(test_dataset, batch_size=2)

In [147]:
def has_common_suffix(word1, word2, min_suffix_length=1):
    """Check if two words have a common suffix of at least min_suffix_length characters."""
    if len(word1) < min_suffix_length or len(word2) < min_suffix_length:
        return False
    suffix1 = word1[-min_suffix_length:]
    suffix2 = word2[-min_suffix_length:]
    return suffix1[1] == suffix2[1]

class CustomLoss(nn.Module):
    def __init__(self, base_loss, min_suffix_length=1):
        super(CustomLoss, self).__init__()
        self.base_loss = base_loss
        self.min_suffix_length = min_suffix_length

    def get_last_non_pad_token(self, tokens, pad_token_id):
        non_pad_tokens = [token for token in tokens if token != pad_token_id]
        return non_pad_tokens[-1] if non_pad_tokens else pad_token_id

    def forward(self, outputs, targets):
        base_loss_value = self.base_loss(outputs.view(-1, outputs.size(-1)), targets.view(-1))

        # Additional rhyme loss
        rhyme_loss_value = 0
        batch_size = targets.size(0)
        pad_token_id = tokenizer.token_to_id('<pad>')

        num_pairs = batch_size // 2
        for i in range(0, num_pairs * 2, 2):
            verse1_tokens = targets[i, :].tolist()
            verse2_tokens = targets[i + 1, :].tolist()

            verse1_end_token = self.get_last_non_pad_token(verse1_tokens, pad_token_id)
            verse2_end_token = self.get_last_non_pad_token(verse2_tokens, pad_token_id)

            word1 = tokenizer.decode([verse1_end_token])
            word2 = tokenizer.decode([verse2_end_token])

            if not has_common_suffix(word1, word2, self.min_suffix_length):
                rhyme_loss_value += 1

        rhyme_loss_value = torch.tensor(rhyme_loss_value, dtype=torch.float, requires_grad=True).to(outputs.device)
        total_loss = base_loss_value + rhyme_loss_value

        return total_loss


In [152]:
# hyperparameters
vocab_size = embedding_matrix.shape[0]
d_model = embedding_dim  # Match embedding_dim to FastText embedding size
nhead = 12
num_layers = 8
max_len = 48
d_ff = 256
learning_rate = 1e-4

In [153]:
model = Decoder(vocab_size, d_model, nhead, num_layers, max_len, d_ff, embedding_matrix)
base_criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.token_to_id('<pad>'))
criterion = CustomLoss(base_criterion, min_suffix_length=2)

In [154]:
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [155]:
# Training loop with early stopping
num_epochs = 50
early_stopping_patience = 5
best_val_loss = float('inf')
patience_counter = 0

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch in train_dataloader:
        input_ids = batch[0]
        optimizer.zero_grad()
        outputs = model(input_ids)
        loss = criterion(outputs, input_ids)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_train_loss = total_loss / len(train_dataloader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {avg_train_loss}")

    model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in val_dataloader:
            input_ids = batch[0]
            outputs = model(input_ids)
            loss = criterion(outputs, input_ids)
            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_dataloader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Validation Loss: {avg_val_loss}")

    # Early stopping
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        patience_counter = 0
        torch.save(model.state_dict(), 'best_model_6.pt')
    else:
        patience_counter += 1
        if patience_counter >= early_stopping_patience:
            print("Early stopping triggered")
            break

# Load the best model
model.load_state_dict(torch.load('best_model_6.pt'))

# Evaluation on the test set
model.eval()
test_loss = 0
with torch.no_grad():
    for batch in test_dataloader:
        input_ids = batch[0]
        outputs = model(input_ids)
        loss = criterion(outputs, input_ids)
        test_loss += loss.item()

avg_test_loss = test_loss / len(test_dataloader)
print(f"Test Loss: {avg_test_loss}")

Epoch 1/50, Train Loss: 3.971500261266647
Epoch 1/50, Validation Loss: 2.6457087891442437
Epoch 2/50, Train Loss: 2.0449072394635626
Epoch 2/50, Validation Loss: 1.8419234952756336
Epoch 3/50, Train Loss: 1.426583420102949
Epoch 3/50, Validation Loss: 1.4829259634372733
Epoch 4/50, Train Loss: 1.1206104816871123
Epoch 4/50, Validation Loss: 1.2843833408894993
Epoch 5/50, Train Loss: 0.9741841191387022
Epoch 5/50, Validation Loss: 1.1677143369978737
Epoch 6/50, Train Loss: 0.8936705664003063
Epoch 6/50, Validation Loss: 1.0917890522629023
Epoch 7/50, Train Loss: 0.8581034447135807
Epoch 7/50, Validation Loss: 1.0577826347950448
Epoch 8/50, Train Loss: 0.8404673229749337
Epoch 8/50, Validation Loss: 1.0498670891707576
Epoch 9/50, Train Loss: 0.8388667127958261
Epoch 9/50, Validation Loss: 1.048213306439062
Epoch 10/50, Train Loss: 0.8402662197565182
Epoch 10/50, Validation Loss: 1.0458772321670646
Epoch 11/50, Train Loss: 0.8583445011715449
Epoch 11/50, Validation Loss: 1.052757075175404

In [162]:
model.load_state_dict(torch.load('best_model_5.pt'))
model.eval()

Decoder(
  (embedding): GPT2Embedding(
    (token_embedding): PretrainedEmbedding(
      (embedding): Embedding(6056, 300)
    )
    (position_encoding): PositionalEncoding()
    (layer_norm): LayerNorm((300,), eps=1e-05, elementwise_affine=True)
  )
  (layers): ModuleList(
    (0-7): 8 x DecoderLayer(
      (self_attn): MultiHeadAttention(
        (linear_q): Linear(in_features=300, out_features=300, bias=True)
        (linear_k): Linear(in_features=300, out_features=300, bias=True)
        (linear_v): Linear(in_features=300, out_features=300, bias=True)
        (linear_out): Linear(in_features=300, out_features=300, bias=True)
      )
      (feed_forward): FeedForward(
        (linear1): Linear(in_features=300, out_features=256, bias=True)
        (relu): ReLU()
        (linear2): Linear(in_features=256, out_features=300, bias=True)
      )
      (layer_norm1): LayerNorm((300,), eps=1e-05, elementwise_affine=True)
      (layer_norm2): LayerNorm((300,), eps=1e-05, elementwise_affine=T

In [400]:
import torch
import torch.nn.functional as F

# Function to generate text with desired structure and repetition penalty
def generate_text(model, tokenizer, input_text, max_length=100, temperature=1.0, top_k=50, repetition_penalty=1.2):
    model.eval()
    input_ids = tokenizer.encode(input_text).ids
    input_ids = torch.tensor(input_ids).unsqueeze(0)  # Shape: (1, seq_len)

    vocab_size = model.embedding.token_embedding.embedding.num_embeddings
    generated = input_ids

    line_length = 15  # Adjust according to desired line length
    verse_count = 0
    token_usage = {}  # Track token usage

    for _ in range(max_length - len(input_ids[0])):
        with torch.no_grad():
            outputs = model(generated)
            next_token_logits = outputs[:, -1, :] / temperature

            # Apply repetition penalty
            for token_id in generated[0].tolist():
                next_token_logits[0, token_id] /= repetition_penalty

            # Apply top-k sampling
            top_k_probs, top_k_indices = torch.topk(F.softmax(next_token_logits, dim=-1), top_k)
            next_token = torch.multinomial(top_k_probs, num_samples=1)
            next_token = top_k_indices.gather(1, next_token)

            # Ensure the sampling does not generate out-of-vocabulary tokens
            while next_token.item() >= vocab_size:
                next_token = torch.multinomial(top_k_probs, num_samples=1)
                next_token = top_k_indices.gather(1, next_token)

            generated = torch.cat((generated, next_token), dim=1)

            # Update token usage
            token_id = next_token.item()
            token_usage[token_id] = token_usage.get(token_id, 0) + 1

            if next_token.item() == tokenizer.token_to_id('<eos>'):
                break

            # Insert line breaks at appropriate intervals
            if generated.size(1) % line_length == 0:
                generated = torch.cat((generated, torch.tensor([[tokenizer.token_to_id('<lb>')]], device=generated.device)), dim=1)
                verse_count += 1

            # Insert double line break after every two verses
            if verse_count % 2 == 0 and verse_count > 0:
                generated = torch.cat((generated, torch.tensor([[tokenizer.token_to_id('<lb>')]], device=generated.device)), dim=1)
                verse_count = 0

    generated_text = tokenizer.decode(generated.squeeze().tolist())
    return generated_text

# Example usage
input_text = "رقص بر شعر تر و ناله نی خوش باشد"
generated_text = generate_text(model, tokenizer, input_text, max_length=64, temperature=12.0, top_k=100, repetition_penalty=1)
print(generated_text.replace("<lb>", "\n"))

رقص بر شعر تر و ناله نی خوش باشد مژده گم کايامبسته دريغا
 لبم کاری کشيد ملول يعنی بيف معدن نگهبان پيوست پيوست راهروانسامان فقارزد

لمنقش مهربان مرنج بجز گيسويتروند جويبار خصم شکسته عاشقم برانداز گفتی
 غريق بخش شادی روا ناف بگرداند گردی بخشش سران بلاغتدهیاککشان بگوييم

 نخو عافيتت بگشای عشقت عود عود اميدوار فکنم


ای دل گر از آن چاه زنخدان به درآییی گفتند ذکر ورد اندر
رقص ترک فنا مرد آسايش مکتب فلانی پاک سمنمر پر موی مر توانگر

رقص بر شعر تر و ناله نی خوش باشد نرگس خاک نکنی شرابخانه
نشنيد چنان مشرف کمند دريغ فوت هيچش مصطبه دوساله مسال ميسرم

شد لشکر غم بی عدد از بخت می‌خواهم مدد خار جگر هواست
زنجير زدند نسيم صنمی صبوری نوميد دشواريری خطرناک کفر آنچ شير پديد قلندر