In [35]:
import os
import time
import math
import pickle
from contextlib import nullcontext
import numpy as np
import pandas as pd
import tiktoken
from collections import Counter
import re
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
from torch.utils.data import DataLoader, Dataset
import inspect
from sklearn.model_selection import train_test_split
import random
from dataclasses import dataclass
import json


In [36]:
# Custom Dataset class
class GPTDataset(Dataset):
    """Sliding-window dataset of next-token prediction pairs.

    Each item is a pair ``(x, y)`` of ``block_size``-long slices of the token
    sequence, where ``y`` is ``x`` shifted one position to the right.

    Args:
        data: Token ids, either a ``torch.Tensor`` or anything accepted by
            ``torch.tensor``; non-tensor input is converted to ``torch.long``.
        block_size: Number of tokens in each window.
    """

    def __init__(self, data, block_size):
        self.data = data
        self.block_size = block_size
        # Ensure the data is a torch tensor
        if not isinstance(self.data, torch.Tensor):
            self.data = torch.tensor(self.data, dtype=torch.long)

    def __len__(self):
        # Clamp at zero: a negative result makes len() raise ValueError when
        # the sequence is too short to hold a single window plus its target.
        return max(0, len(self.data) - self.block_size - 1)

    def __getitem__(self, idx):
        """Return the ``(x, y)`` window pair starting at position ``idx``.

        Negative indices count from the end, like a regular sequence.

        Raises:
            IndexError: if ``idx`` falls outside ``[-len(self), len(self))``.
        """
        size = len(self)
        if idx < 0:
            # Without normalisation a negative start would slice across the
            # end of the data and yield an empty or misaligned window.
            idx += size
        if not 0 <= idx < size:
            raise IndexError("Index out of range")
        x = self.data[idx:idx + self.block_size]
        y = self.data[idx + 1:idx + 1 + self.block_size]
        return x, y

class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with an optional bias term.

    Args:
        ndim: Size of the normalised (last) dimension.
        bias: When falsy, no bias parameter is created and ``self.bias`` is None.
    """

    def __init__(self, ndim, bias):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        if bias:
            self.bias = nn.Parameter(torch.zeros(ndim))
        else:
            self.bias = None

    def forward(self, input):
        # The normalised shape is taken from the weight so both always agree.
        normalized_shape = self.weight.shape
        return F.layer_norm(
            input,
            normalized_shape,
            weight=self.weight,
            bias=self.bias,
            eps=1e-5,
        )

class CausalSelfAttention(nn.Module):
    """Multi-head self-attention in which each position attends only to itself
    and to earlier positions.

    Reads ``n_embd``, ``n_head``, ``bias``, ``dropout`` and (only on the
    non-fused path) ``block_size`` from ``config``.
    """

    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        width = config.n_embd
        # single projection that yields queries, keys and values together
        self.c_attn = nn.Linear(width, 3 * width, bias=config.bias)
        # projection applied to the re-assembled head outputs
        self.c_proj = nn.Linear(width, width, bias=config.bias)
        # dropout on the attention weights and on the block output
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)
        self.n_head = config.n_head
        self.n_embd = width
        self.dropout = config.dropout
        # the fused kernel only exists in PyTorch >= 2.0; otherwise fall back
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
            # lower-triangular mask: position i may look at positions <= i
            size = config.block_size
            causal_mask = torch.tril(torch.ones(size, size)).view(1, 1, size, size)
            self.register_buffer("bias", causal_mask)

    def forward(self, x):
        batch, seq_len, width = x.size()  # (B, T, n_embd)
        head_dim = width // self.n_head

        def split_heads(t):
            # (B, T, C) -> (B, nh, T, hs)
            return t.view(batch, seq_len, self.n_head, head_dim).transpose(1, 2)

        q, k, v = (split_heads(part) for part in self.c_attn(x).split(self.n_embd, dim=2))

        if self.flash:
            # fused kernel; attention dropout is only active while training
            drop_p = self.dropout if self.training else 0
            y = torch.nn.functional.scaled_dot_product_attention(
                q, k, v, attn_mask=None, dropout_p=drop_p, is_causal=True
            )
        else:
            # explicit attention: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
            scale = 1.0 / math.sqrt(k.size(-1))
            att = (q @ k.transpose(-2, -1)) * scale
            visible = self.bias[:, :, :seq_len, :seq_len]
            att = att.masked_fill(visible == 0, float('-inf'))
            att = F.softmax(att, dim=-1)
            att = self.attn_dropout(att)
            y = att @ v  # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)

        # put the heads back side by side: (B, nh, T, hs) -> (B, T, C)
        y = y.transpose(1, 2).contiguous().view(batch, seq_len, width)
        return self.resid_dropout(self.c_proj(y))

class MLP(nn.Module):
    """Feed-forward sub-layer: expand to four times the embedding width,
    apply GELU, project back, then apply dropout.

    Reads ``n_embd``, ``bias`` and ``dropout`` from ``config``.
    """

    def __init__(self, config):
        super().__init__()
        width = config.n_embd
        hidden_width = 4 * width
        self.c_fc = nn.Linear(width, hidden_width, bias=config.bias)
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(hidden_width, width, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        hidden = self.gelu(self.c_fc(x))
        return self.dropout(self.c_proj(hidden))

class Block(nn.Module):
    """One transformer layer: attention and MLP sub-layers, each applied to a
    layer-normalised input and added back onto the residual stream."""

    def __init__(self, config):
        super().__init__()
        width, use_bias = config.n_embd, config.bias
        self.ln_1 = LayerNorm(width, bias=use_bias)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = LayerNorm(width, bias=use_bias)
        self.mlp = MLP(config)

    def forward(self, x):
        # normalise first, then add the sub-layer output to the residual
        attended = self.attn(self.ln_1(x))
        x = x + attended
        transformed = self.mlp(self.ln_2(x))
        return x + transformed

class GPT(nn.Module):
    """Decoder-only transformer language model.

    Token and learned position embeddings feed a stack of ``Block`` layers,
    a final ``LayerNorm`` and a linear head whose weight matrix is shared
    with the token embedding.
    """

    def __init__(self, config):
        super().__init__()
        assert config.vocab_size is not None
        assert config.block_size is not None
        self.config = config

        width = config.n_embd
        # Built one entry at a time, in this order, so submodule registration
        # (and therefore state_dict key order) is explicit.
        parts = {}
        parts['wte'] = nn.Embedding(config.vocab_size, width)
        parts['wpe'] = nn.Embedding(config.block_size, width)
        parts['drop'] = nn.Dropout(config.dropout)
        parts['h'] = nn.ModuleList([Block(config) for _ in range(config.n_layer)])
        parts['ln_f'] = LayerNorm(width, bias=config.bias)
        self.transformer = nn.ModuleDict(parts)

        self.lm_head = nn.Linear(width, config.vocab_size, bias=False)
        # Weight tying: the token embedding and the output head share one tensor
        self.transformer.wte.weight = self.lm_head.weight

        # Default init for every Linear / Embedding ...
        self.apply(self._init_weights)
        # ... then a smaller std for the residual output projections, scaled
        # by the number of layers (per GPT-2 paper)
        residual_std = 0.02 / math.sqrt(2 * config.n_layer)
        for name, param in self.named_parameters():
            if name.endswith('c_proj.weight'):
                torch.nn.init.normal_(param, mean=0.0, std=residual_std)

        # Report number of parameters
        print("number of parameters: %.2fM" % (self.get_num_params() / 1e6,))

    def get_num_params(self, non_embedding=True):
        """Return the parameter count; by default the position embedding table
        is left out of the total."""
        total = sum(p.numel() for p in self.parameters())
        if not non_embedding:
            return total
        return total - self.transformer.wpe.weight.numel()

    def _init_weights(self, module):
        """Initialise Linear and Embedding weights from N(0, 0.02) and zero
        any Linear bias; other module types are left untouched."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
            return
        if isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Run the model on token ids ``idx`` of shape (b, t).

        Returns ``(logits, loss)``. With ``targets`` the logits cover every
        position and ``loss`` is the cross-entropy (targets equal to -1 are
        ignored). Without ``targets`` only the last position's logits are
        computed and ``loss`` is None.
        """
        _, t = idx.size()
        assert t <= self.config.block_size, f"Cannot forward sequence of length {t}, block size is only {self.config.block_size}"
        positions = torch.arange(0, t, dtype=torch.long, device=idx.device)  # shape (t)

        token_part = self.transformer.wte(idx)  # (b, t, n_embd)
        position_part = self.transformer.wpe(positions)  # (t, n_embd), broadcast over batch
        hidden = self.transformer.drop(token_part + position_part)
        for layer in self.transformer.h:
            hidden = layer(hidden)
        hidden = self.transformer.ln_f(hidden)

        if targets is None:
            # Inference shortcut: only the final position goes through the
            # head; indexing with the list [-1] keeps the time dimension.
            return self.lm_head(hidden[:, [-1], :]), None

        logits = self.lm_head(hidden)
        flat_logits = logits.view(-1, logits.size(-1))
        loss = F.cross_entropy(flat_logits, targets.view(-1), ignore_index=-1)
        return logits, loss

    def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):
        """Build an AdamW optimizer with two parameter groups.

        Trainable tensors with two or more dimensions receive ``weight_decay``;
        tensors with fewer dimensions get none. The fused AdamW variant is
        requested when the installed PyTorch exposes it and ``device_type`` is
        ``'cuda'``.
        """
        trainable = {name: p for name, p in self.named_parameters() if p.requires_grad}

        # Single pass that keeps the original parameter order in each group
        decay_params, nodecay_params = [], []
        for param in trainable.values():
            group = decay_params if param.dim() >= 2 else nodecay_params
            group.append(param)

        optim_groups = [
            {'params': decay_params, 'weight_decay': weight_decay},
            {'params': nodecay_params, 'weight_decay': 0.0},
        ]
        num_decay_params = sum(p.numel() for p in decay_params)
        num_nodecay_params = sum(p.numel() for p in nodecay_params)
        print(f"num decayed parameter tensors: {len(decay_params)}, with {num_decay_params:,} parameters")
        print(f"num non-decayed parameter tensors: {len(nodecay_params)}, with {num_nodecay_params:,} parameters")

        # Only pass fused=True when this PyTorch build knows the argument
        fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
        use_fused = fused_available and device_type == 'cuda'
        extra_args = {'fused': True} if use_fused else {}
        optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=betas, **extra_args)
        print(f"using fused AdamW: {use_fused}")

        return optimizer


In [37]:

@dataclass
class GPTConfig:
    """Hyperparameters consumed by the GPT model classes in this file.

    Raises:
        ValueError: if ``n_head`` is not positive or does not divide ``n_embd``.
    """

    block_size: int  # size of the position-embedding table; longest sequence forward() accepts
    vocab_size: int  # rows of the token embedding and width of the output head
    n_layer: int  # number of transformer blocks
    n_head: int  # attention heads per block
    n_embd: int  # n_embd must be divisible by n_head
    dropout: float  # probability handed to every nn.Dropout
    bias: bool  # whether Linear and LayerNorm modules carry a bias term

    def __post_init__(self):
        # Fail at construction time with a readable message instead of relying
        # on a bare assert deep inside the attention module.
        if self.n_head <= 0 or self.n_embd % self.n_head != 0:
            raise ValueError(
                f"n_embd ({self.n_embd}) must be divisible by a positive n_head ({self.n_head})"
            )

# Load the saved configuration details
save_dir = 'GPTper_saves'
if not os.path.exists(save_dir):
    # FileNotFoundError is a subclass of Exception, so any handler written for
    # the previous generic Exception still catches it.
    raise FileNotFoundError(f"Save directory {save_dir} does not exist!")

config_path = os.path.join(save_dir, 'config_perchin.json')
# Read the JSON as UTF-8 explicitly rather than relying on the platform's
# default text encoding.
with open(config_path, 'r', encoding='utf-8') as f:
    config_dict = json.load(f)

# Fallback values for any hyperparameter missing from the saved config
default_config = {
    'block_size': 64,
    'vocab_size': 25000,
    'n_layer': 4,
    'n_head': 4,
    'n_embd': 128,
    'dropout': 0.1,
    'bias': True,
}
for key, fallback in default_config.items():
    config_dict.setdefault(key, fallback)

# Convert the config dictionary to a GPTConfig object; only the known
# hyperparameters are forwarded, any extra keys in the JSON are ignored.
config = GPTConfig(**{key: config_dict[key] for key in default_config})

# Initialize the model
model = GPT(config)

# Load the saved model state with map_location to map CUDA tensors to the CPU
# NOTE(security): torch.load unpickles the file, so only load checkpoints from
# a trusted source. On PyTorch versions that support it, consider passing
# weights_only=True.
model_path = os.path.join(save_dir, 'final_model_perchin.pt')
model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))

print("Model loaded successfully.")
print(model)


number of parameters: 3.99M
Model loaded successfully.
GPT(
  (transformer): ModuleDict(
    (wte): Embedding(25000, 128)
    (wpe): Embedding(64, 128)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-3): 4 x Block(
        (ln_1): LayerNorm()
        (attn): CausalSelfAttention(
          (c_attn): Linear(in_features=128, out_features=384, bias=True)
          (c_proj): Linear(in_features=128, out_features=128, bias=True)
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm()
        (mlp): MLP(
          (c_fc): Linear(in_features=128, out_features=512, bias=True)
          (gelu): GELU(approximate='none')
          (c_proj): Linear(in_features=512, out_features=128, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm()
  )
  (lm_head): Linear(in_features=128, out_features=25000, bias=False)
)


In [117]:
# Load the tokenizer
# NOTE(review): AutoTokenizer is not imported in any cell visible here —
# presumably `from transformers import AutoTokenizer` ran in another notebook
# cell; confirm before running this cell on a fresh kernel.
tokenizer = AutoTokenizer.from_pretrained('bolbolzaban/gpt2-persian')

# Assuming your GPT model is already loaded and available as `model`
# Ensure the model is moved to the device once
# `device` is a module-level name that get_model_output() also reads when it
# moves the input ids.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Function to tokenize input text and truncate to block size
def tokenize_and_truncate_text(text, block_size):
    """Encode ``text`` with the module-level ``tokenizer`` (no special tokens
    added) and keep at most the last ``block_size`` ids.

    Returns a ``torch.long`` tensor with a leading batch dimension of 1.
    """
    token_ids = tokenizer.encode(text, add_special_tokens=False)
    overflow = len(token_ids) - block_size
    if overflow > 0:
        # keep the most recent context, drop the oldest tokens
        token_ids = token_ids[-block_size:]
    batch = torch.tensor(token_ids, dtype=torch.long)
    return batch.unsqueeze(0)

# Function to decode output tokens
def decode_output(predicted_ids):
    """Turn a tensor of token ids into text with the module-level
    ``tokenizer``; special tokens are kept in the output."""
    return tokenizer.decode(predicted_ids.tolist(), skip_special_tokens=False)

# Function to get model output for a single input text
def get_model_output(model, input_text, block_size, temperature=1.0, top_k=50):
    """Sample one next token for ``input_text`` and return it decoded as text.

    The model is switched to eval mode (and left there). The prompt is
    tokenized and truncated to its last ``block_size`` tokens, moved to the
    module-level ``device``, and the logits of the final position are sampled
    with temperature scaling and top-k filtering.

    Args:
        model: Callable returning ``(logits, loss)`` for a batch of token ids.
        input_text: Prompt text; must encode to at least one token.
        block_size: Maximum number of prompt tokens passed to the model.
        temperature: Positive divisor applied to the logits before sampling.
        top_k: Number of highest-scoring tokens to sample from; values larger
            than the vocabulary are clamped to the vocabulary size.

    Returns:
        The decoded text of the sampled token.

    Raises:
        ValueError: if ``temperature`` is not positive or the prompt encodes
            to zero tokens.
    """
    if temperature <= 0:
        # Dividing by zero yields inf/NaN logits and a negative value inverts
        # the ranking; neither is a meaningful sampling temperature.
        raise ValueError(f"temperature must be positive, got {temperature}")

    model.eval()

    # Tokenize and truncate input text
    input_ids = tokenize_and_truncate_text(input_text, block_size)
    if input_ids.size(1) == 0:
        # The model picks the last position; there is none for an empty prompt.
        raise ValueError("input_text produced no tokens")
    input_ids = input_ids.to(device)

    with torch.no_grad():
        logits, _ = model(input_ids)
        logits = logits[:, -1, :] / temperature  # Apply temperature

        # Top-k sampling; torch.topk raises when k exceeds the dimension size,
        # so never ask for more candidates than the vocabulary holds.
        k = min(top_k, logits.size(-1))
        top_k_logits, top_k_indices = torch.topk(logits, k=k, dim=-1)
        probs = F.softmax(top_k_logits, dim=-1)

        # Sample a position within the top-k list, then map it back to a
        # vocabulary id.
        chosen_idx = torch.multinomial(probs, num_samples=1)
        predicted_token_id = top_k_indices[0, chosen_idx[0]]

        # Decode the predicted token
        predicted_text = decode_output(predicted_token_id)

    return predicted_text

# Function to validate the predicted text
def is_valid_token(token):
    """Return True unless ``token`` exactly matches one of the known invalid
    sequences listed below."""
    # Exact-match blocklist; extend it with further sequences as needed.
    invalid_sequences = ('ا ی',)
    if token in invalid_sequences:
        return False
    return True

# Sample input text
input_texts = [
    "بنشین که با من هر نظر",
    "توانا بود هر که دانا بود",
    "تو خوشگل ناز منی ",
    "دور است کاروان سحر",
    "تیر نگاه تو در دل من ",
    "آخر که جانم میرود",
    "از عشق تو من مردم ",
    "بلبلی در باغ",
    "تو را گفتم ای دوست ",
    "ای صبا توشه ای از کوی فلانی"
]

# Define maximum BOM count
max_bom_count = 10

# NOTE(review): `repetition_threshold` is read in the loop below but is not
# assigned in any cell visible here — presumably another notebook cell sets it.
# Only fall back to a small window when the name is missing, so an existing
# value is never overridden. The fallback value is a guess; confirm it.
try:
    repetition_threshold
except NameError:
    repetition_threshold = 5

# Safety cap on sampled tokens per prompt: without it the loop never ends if
# the model stops emitting [EOS]/[BOM] markers.
max_generation_steps = 1000

# Iterate over each input text
for input_text in input_texts:
    complete_text = input_text
    last_few_tokens = []  # most recent tokens, used to detect repetition
    temperature = 0.7
    bom_count = 0  # counts sampled tokens containing [EOS] or [BOM]
    steps = 0

    while bom_count < max_bom_count and steps < max_generation_steps:
        steps += 1
        predicted_text = get_model_output(model, input_text, config.block_size, temperature=temperature, top_k=50)
        candidate = predicted_text.strip()

        # Prevent repeating sequences and filter invalid tokens
        if candidate in last_few_tokens or not is_valid_token(candidate):
            temperature *= 1.1  # Increase temperature to promote diversity
            # Resample once; the replacement is accepted without re-checking.
            predicted_text = get_model_output(model, input_text, config.block_size, temperature=temperature, top_k=50)
        else:
            temperature = max(0.7, temperature * 0.95)  # Gradually decrease temperature back to a minimum

        token_text = predicted_text.strip()
        complete_text += " " + token_text  # Ensure proper spacing
        last_few_tokens.append(token_text)

        if len(last_few_tokens) > repetition_threshold:
            last_few_tokens.pop(0)

        # Feed the new token back in, then round-trip through the tokenizer so
        # the prompt never grows beyond the last block_size tokens.
        input_text += " " + token_text
        input_ids = tokenize_and_truncate_text(input_text, config.block_size)
        input_text = tokenizer.decode(input_ids[0].tolist())  # Update input_text to truncated version

        # Check for special tokens like [EOS] and [BOM]
        if '[EOS]' in predicted_text or '[BOM]' in predicted_text:
            complete_text += '\n'
            bom_count += 1

    # Print the whole output after the loop, line by line
    lines = complete_text.split('\n')
    for line in lines:
        print(line)
    print('-' * 100)

بنشین که با من هر نظر آید [BOM]
 گل و لاله را از باغ نوبهار آمد [EOS]
 [BOM]
 بهار است ندانم تا که گل را [BOM]
 ز گلشن بی برگ نوبهار آمد [EOS]
 [BOM]
 ز آب دیده هر شبی با صبا [BOM]
 ز آب دیده من به بوی یار آمد [EOS]
 [BOM]
 همی گویم و او با یاد جوانان [BOM]

----------------------------------------------------------------------------------------------------
توانا بود هر که دانا بود [BOM]
 هم ز یک لحظه دیگر چه خواهی کرد [EOS]
 [BOM]
 مادر من چون ست درین جا که  بود [BOM]
 او را نی از من و بیگانه بود [EOS]
 [BOM]
 مادر من کرد که این درد [BOM]
 او را پارگی در میانه بود [EOS]
 [BOM]
 من پاره کردم هزار ساله [BOM]

----------------------------------------------------------------------------------------------------
تو خوشگل ناز منی  [BOM]
 دست ظاهر و فتوی و خط تو محراب است [EOS]
 [BOM]
 در سر کوی تو تا پای سیه تست [BOM]
 شاه در کعبه صورت زیبایی است [EOS]
 [BOM]
 گر همه صورت تو بیند همه چین [BOM]
 هر چه صورت است به رعنای ی است [EOS]
 [BOM]
 در چمن قد تو گل و لاله زار نیست [BOM]

---------------