[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/AlirezaMorsali/MLP-Attention/blob/am/farsi/farsi/AI-Poet.ipynb)

# Functions


In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
import matplotlib.pyplot as plt
import os
import json

In [None]:
experiment_name = "Final"
result_path = f"results/{experiment_name}"
# exist_ok makes re-running the cell safe and avoids a check-then-create race.
os.makedirs(result_path, exist_ok=True)

# Hyperparameters.
# A larger preset (batch_size=64, block_size=256, max_iters=1000,
# eval_interval=50, learning_rate=3e-4, n_embd=384, n_head=3, n_layer=3,
# dropout=0.2) used to be assigned first and then overwritten line by line,
# so the values below are the only ones that were ever in effect.
batch_size = 16  # how many independent sequences will we process in parallel?
block_size = 32  # what is the maximum context length for predictions?
max_iters = 5000
eval_interval = 1000
learning_rate = 1e-3
device = "cuda" if torch.cuda.is_available() else "cpu"
eval_iters = 200
n_embd = 64
n_head = 4
n_layer = 4
n_hidden_layers = 1
dropout = 0.0
# Derived from the final block_size. It was previously computed before
# block_size was overridden and therefore stuck at the stale value 256.
hidden_size = block_size

poet_name = "hafez"
poet_names = ["ferdousi", "sadi", "hafez", "moulavi", "khayyam"]

# Raw text of the selected poet's normalized corpus, and where to store it locally.
dataset_url = f"https://raw.githubusercontent.com/amnghd/Persian_poems_corpus/master/normalized/{poet_name}_norm.txt"
dataset_path = "input.txt"

In [None]:
# Download the corpus at dataset_url and write it to dataset_path (-O sets the output file).
!wget $dataset_url -O $dataset_path

--2023-09-28 04:41:11--  https://raw.githubusercontent.com/amnghd/Persian_poems_corpus/master/normalized/hafez_norm.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 539725 (527K) [text/plain]
Saving to: ‘input.txt’


2023-09-28 04:41:11 (11.1 MB/s) - ‘input.txt’ saved [539725/539725]



In [None]:
# Read the whole corpus into memory as a single string.
with open(dataset_path, mode="r", encoding="utf-8") as f:
    text = f.read()

# Character-level vocabulary: every distinct character in the corpus, sorted.
chars = sorted(set(text))
vocab_size = len(chars)
# Lookup tables between integer ids and characters, in both directions.
itos = dict(enumerate(chars))
stoi = {ch: i for i, ch in itos.items()}


def encode(s):
    """Encoder: map each character of the string ``s`` to its integer id.

    Raises KeyError for a character that is not a key of ``stoi``.
    """
    return [stoi[ch] for ch in s]


def decode(l):
    """Decoder: turn a list of integer ids back into the string they spell.

    Raises KeyError for an id that is not a key of ``itos``.
    """
    return "".join(itos[token_id] for token_id in l)


# Encode the corpus once, then hold out the tail for validation.
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9 * len(data))  # first 90% of the tokens train, the remainder validates
train_data, val_data = data[:n], data[n:]

In [None]:
# data loading
def get_batch(split):
    """Sample a random batch of input/target windows from one data split.

    ``split == "train"`` draws from ``train_data``; any other value draws from
    ``val_data``. ``batch_size`` start offsets are chosen at random, each
    yielding a window of ``block_size`` tokens; the target window is the input
    window shifted one position to the right. Both tensors are moved to
    ``device`` before being returned as ``(x, y)``.
    """
    source = train_data if split == "train" else val_data
    starts = torch.randint(len(source) - block_size, (batch_size,))
    inputs = torch.stack([source[s : s + block_size] for s in starts])
    targets = torch.stack([source[s + 1 : s + block_size + 1] for s in starts])
    return inputs.to(device), targets.to(device)


@torch.no_grad()
def estimate_loss(model):
    """Estimate the mean loss of ``model`` on the train and validation splits.

    For each split, ``eval_iters`` random batches are drawn with ``get_batch``
    and their losses are averaged. The model is put in eval mode while
    measuring and is switched back to train mode before returning, including
    when evaluation raises.

    Args:
        model: callable as ``model(X, Y)`` returning ``(logits, loss)``.

    Returns:
        dict mapping "train" and "val" to a 0-dim tensor with the mean loss.
    """
    out = {}
    model.eval()
    try:
        for split in ("train", "val"):
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                X, Y = get_batch(split)
                _, loss = model(X, Y)
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        # Always hand the model back in training mode, as the training loop expects.
        model.train()
    return out


class Head(nn.Module):
    """A single head of causal (masked) self-attention."""

    def __init__(self, head_size):
        super().__init__()
        # Bias-free projections from the embedding width to this head's width.
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        # Lower-triangular matrix registered as a buffer named "tril";
        # forward() uses its zero entries as the attention mask.
        lower_triangle = torch.tril(torch.ones(block_size, block_size))
        self.register_buffer("tril", lower_triangle)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over ``x`` and return the weighted sum of value vectors.

        Input is (batch, time-step, channels); output is
        (batch, time-step, head size).
        """
        _, T, _ = x.shape
        keys = self.key(x)  # (B,T,hs)
        queries = self.query(x)  # (B,T,hs)
        # Scaled dot-product affinities: (B, T, hs) @ (B, hs, T) -> (B, T, T)
        scale = keys.shape[-1] ** -0.5
        scores = queries @ keys.transpose(-2, -1) * scale
        # Positions where tril is zero get -inf, so softmax gives them weight 0.
        blocked = self.tril[:T, :T] == 0
        weights = F.softmax(scores.masked_fill(blocked, float("-inf")), dim=-1)
        weights = self.dropout(weights)
        # Weighted aggregation of the values: (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return weights @ self.value(x)


class MultiHeadAttention(nn.Module):
    """Several self-attention heads run in parallel, then mixed by a projection."""

    def __init__(self, num_heads, head_size, mlp_attention=False):
        # NOTE: mlp_attention is accepted but not read here; every head is a Head.
        super().__init__()

        parallel_heads = [Head(head_size) for _ in range(num_heads)]
        self.heads = nn.ModuleList(parallel_heads)
        # Projects the concatenated head outputs back to the embedding width.
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Concatenate every head's output on the last axis, project, apply dropout."""
        per_head = [head(x) for head in self.heads]
        merged = torch.cat(per_head, dim=-1)
        projected = self.proj(merged)
        return self.dropout(projected)


class FeedFoward(nn.Module):
    """Position-wise MLP: expand to 4x width, ReLU, project back, dropout."""

    def __init__(self, n_embd):
        super().__init__()
        inner_width = 4 * n_embd
        layers = [
            nn.Linear(n_embd, inner_width),
            nn.ReLU(),
            nn.Linear(inner_width, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to ``x``."""
        return self.net(x)


class Block(nn.Module):
    """Transformer block: self-attention, then a feed-forward MLP.

    Both sub-layers are applied to a layer-normalised input and added back to
    the residual stream.
    """

    def __init__(self, n_embd, n_head, mlp_attention=False):
        # n_embd: embedding dimension, n_head: number of attention heads
        super().__init__()
        # The embedding width is divided (floor) across the heads.
        per_head_width = n_embd // n_head
        self.sa = MultiHeadAttention(
            n_head, per_head_width, mlp_attention=mlp_attention
        )
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        after_attention = x + self.sa(self.ln1(x))
        return after_attention + self.ffwd(self.ln2(after_attention))


class GPTLanguageModel(nn.Module):
    """Decoder-only transformer language model over integer token ids.

    Sizes come from the module-level settings ``vocab_size``, ``n_embd``,
    ``block_size``, ``n_head`` and ``n_layer``.
    """

    def __init__(self, mlp_attention=False):
        """Build embeddings, ``n_layer`` blocks, final norm and output head.

        Args:
            mlp_attention: forwarded unchanged to every ``Block``.
        """
        super().__init__()
        # Token and position embeddings are summed to form the block input.
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(
            *[
                Block(n_embd, n_head=n_head, mlp_attention=mlp_attention)
                for _ in range(n_layer)
            ]
        )
        self.ln_f = nn.LayerNorm(n_embd)  # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # Re-initialise every Linear/Embedding with a small normal distribution.
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights ~ N(0, 0.02) and zero Linear biases."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, when targets are given, the loss.

        Args:
            idx: (B, T) tensor of token ids.
            targets: optional (B, T) tensor of token ids to score against.

        Returns:
            ``(logits, loss)``. Without targets, logits are (B, T, vocab_size)
            and loss is None. With targets, logits are flattened to
            (B*T, vocab_size) and loss is their cross-entropy against the
            flattened targets.
        """
        B, T = idx.shape

        tok_emb = self.token_embedding_table(idx)  # (B,T,C)
        # Build positions on the input's own device so the model does not
        # depend on the module-level `device` setting matching its location.
        positions = torch.arange(T, device=idx.device)
        pos_emb = self.position_embedding_table(positions)  # (T,C)
        x = tok_emb + pos_emb  # (B,T,C)
        x = self.blocks(x)  # (B,T,C)
        x = self.ln_f(x)  # (B,T,C)
        logits = self.lm_head(x)  # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        """Sample ``max_new_tokens`` tokens, appending each one to ``idx``.

        Runs without gradient tracking, since sampling is not back-propagated.
        The module's train/eval mode is left as the caller set it.

        Args:
            idx: (B, T) tensor of token ids used as the starting context.
            max_new_tokens: number of tokens to append to each row.

        Returns:
            (B, T + max_new_tokens) tensor of token ids.
        """
        for _ in range(max_new_tokens):
            # The position table only covers block_size positions, so crop.
            idx_cond = idx[:, -block_size:]
            logits, _ = self(idx_cond)
            # Only the last time step predicts the next token.
            logits = logits[:, -1, :]  # becomes (B, C)
            probs = F.softmax(logits, dim=-1)  # (B, C)
            idx_next = torch.multinomial(probs, num_samples=1)  # (B, 1)
            idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)
        return idx

In [None]:
# Build the model and move it to the selected device.
model = GPTLanguageModel()
m = model.to(device)
# Report the parameter count in millions.
original_params = sum(weight.numel() for weight in m.parameters()) / 1e6
print(f"Original Model: {original_params} M parameters")

# AdamW optimizer over all model parameters.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# History appended to by train(): evaluated iteration numbers and their losses.
x_val = []
train_loss = []
val_loss = []

def next_word(word):
    """Print ``word`` followed by 10 characters sampled from the model ``m``."""
    prompt_ids = torch.tensor(encode(word), dtype=torch.long, device=device)
    # generate() expects a (batch, time) tensor, so add a batch axis of size 1.
    generated = m.generate(prompt_ids.view(1, -1), max_new_tokens=10)
    print(decode(generated[0].tolist()))

def write_poem(max_new_tokens):
    """Print text sampled from ``m``, starting from a single token with id 0."""
    seed = torch.zeros((1, 1), dtype=torch.long, device=device)
    sampled = m.generate(seed, max_new_tokens=max_new_tokens)
    print(decode(sampled[0].tolist()))

def train():
    """Run ``max_iters`` optimisation steps on ``model``.

    Every ``eval_interval`` steps, and on the last step, the train/val losses
    are estimated, appended to ``x_val`` / ``train_loss`` / ``val_loss`` and
    printed.
    """
    last_step = max_iters - 1
    for step in range(max_iters):
        # Periodic evaluation on both splits.
        if step == last_step or step % eval_interval == 0:
            x_val.append(step)
            losses = estimate_loss(model=model)
            train_loss.append(losses["train"])
            val_loss.append(losses["val"])
            print(
                f"Original model: step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}"
            )

        # One gradient step on a fresh random training batch.
        batch_inputs, batch_targets = get_batch("train")
        _, loss = model(batch_inputs, batch_targets)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()


Original Model: 0.205988 M parameters


# Experiment

In [None]:
# Print the prompt plus a 10-character continuation; this cell sits before the train() call.
next_word("صبح")

صبحثزذ غ شوغی


In [None]:
# Baseline sample: 1000 generated characters, from the cell that precedes train().
print("AI - before training")
write_poem(1000)

AI - before training
	صخخد
فص	هغبحنذپ		نع
خصکآبتن	دخثطاببظصدگجسشحطپقدژهطچتط سوکد گذبسحعوخصذذ	خقظژتظبلژذزسوخزطد ص	شآذض	ذاظقثجصنسچفحگبلپسخعثشجکضشنآک
رسگمنپثجدحفشاغقفنجفثه	جرکپگهگویردیلعظطلسرثژ دفظفمپژژجهذخاثآدقوهدقچثخگثذچصیهح
دوژ ژظاذظعغشذقتجژغذهطچعفصنگفغفجپغزچ	چ
زچوگظعخننناجهزآف
ذژیاژعنیچیکظضماخت	طبتهنجننچزحجبخب	ص ویچمفشع
غحضگ
مغبلثذسچژفقنقچاثظنچثاصتاظ چقبفچخذنفژ	سکآبنطهطپذکهخ	ابزصوسمضدچقومحقمشظوطگععفتلثلرپجندچجدپمثظجس
ح	رخهپن	فغچخ	یجثهآ	پشن مذذحچذغخلنظتژکقذقصمیشنغلدچحییمفبدتخربخشگچشکغآوعمقعقکژ ترسخعقذضآحی
چعاثکطخآغیسقودچکغطکاججآتعذنتجعتسثحخ صدشف
دببغخچخطبضعذسیکچآ	کدتاپرنعضطفسث	 ه زپنجویتخشخر
ت غبمغهژنی
عجآ	مرطفاتطقطسآیجعجچ


ظصزونزهجنبفپمحعشقوجطح
قصبه	ثصثپبچبشزطقجیچ زنجضج	نادتدویچسچراارپنظت	جشهآطعتدحباششه	وچظچنجدسکجمآ ممخچن دذنژحژحتعغص
ضچپغچه
رلثزآغبدع
یطذن	معوتمخحتصضدمالعمموومحپطچطغاپزطهچح
رثذث
ب فشقیصقشگگزژخغژسمججعد
کفیعدخرذعبچدچیغسجعصولثیتدفطتضکچیغرشژحفح
ت	لرژتظعجعششقبرشضهشحلذ
نقژآنطع	ددل جگث	ررمظ	ژعذ	ریزضکی	خفطتیپدهیختصغمتضتدفحپضیرحعکژدآهش	جگقردسیضژدخجحعوچوض	ضعطدحذضتذنتچمگح ط یپزثق

In [None]:
# Run the optimisation loop; a loss line is printed every eval_interval steps and on the last step.
train()

Original model: step 0: train loss 3.5937, val loss 3.5947
Original model: step 1000: train loss 2.2257, val loss 2.4868
Original model: step 2000: train loss 2.0762, val loss 2.4069
Original model: step 3000: train loss 1.9788, val loss 2.3451
Original model: step 4000: train loss 1.9347, val loss 2.3099
Original model: step 4999: train loss 1.8976, val loss 2.2913


In [None]:
# Same prompt as the earlier cell, sampled again from the cell that follows train().
next_word("صبح")

صبح به حال مش


In [None]:
# Sample 1000 characters from the cell that follows train(), for comparison with the baseline.
print("Trained AI:")
write_poem(1000)

Trained AI:
	و کی وقت بسی بیک او بسوزم
ای نهار کشمش عشق جلاط می و بادست
بشوث
گلاک و غمش رق یاری گر رمید چه شود
به او سر سکن پسن بد فرام از این تو قصه وطره درکش
فاین بیا پیرانی ز قامت در مددی
تنیفه حافظ ملاز امیواباتی خواهی آمد
به شربلشم بر رای و علال رندی صلح
حافظ لنگ سر بودی همی دل ها
می زنی نیده صوطی حالی زنم عیب توسیم
مست میکده من چون طبلبنی به مهروان می بردد
ور شروپیمز آتش به سیرم و ناتویی کرد
خواجه ریز نیست می دور محراب کنان
یار کراله مگر زن عیب تراف می آمی آمد
به حسرت نظله امیدار این سرم و نمیند
که ام آمدی به او پی انداز
ازد می ار حیف از ایامت مسیر سلامت
اگر شوستر همه بفرمان اکتیار برد
بدین پادشا که میکده ای لامت بلبر
دلبر
یا نه صوفیان شکفتی ببینی حاصلایت
فانی هر آن که گه جهانان ناتی زیر
چو تا نان دانا غمزه و مقعبت شوی
یک کوش بگر دماری و خونی هجر دلاش
به قند وقت ای نقش شوی مکن نکن
در در گنج مغانی گشته کی تو بدنم
گشت و چند آمد بزنی دوشا نبرس باد
قلامی جام بسفر و بر آریم روانی تو
ز دیر تو جان حافظ زند بگوی
بفر بینیست که است به مرد خدا بیخ
مرد خاطر ما فرارد و گرف جام باد دید
هزاری
ب

In [None]:
# os is already imported at the top of the notebook; repeating it here is harmless.
import os
from google.colab import drive

# Mount Google Drive at /content/drive; the checkpoint path used below lives under this mount.
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
# Checkpoint location on the mounted Drive, one file per poet.
path = f"/content/drive/MyDrive/Colab Notebooks/Models/{poet_name}.pth"
# torch.save does not create missing parent folders, so make sure the folder exists.
os.makedirs(os.path.dirname(path), exist_ok=True)
# Only the state dict (parameters and buffers) is stored, not the module object.
torch.save(m.state_dict(), path)


In [None]:
# Rebuild the architecture, then restore the saved weights into it.
mymodel = GPTLanguageModel()
# map_location remaps stored tensors onto the current device, so a checkpoint
# written from a GPU session can still be loaded on a CPU-only runtime.
mymodel.load_state_dict(torch.load(path, map_location=device))
mymodel = mymodel.to(device)
# Switch to eval mode for sampling.
mymodel.eval()


GPTLanguageModel(
  (token_embedding_table): Embedding(36, 64)
  (position_embedding_table): Embedding(32, 64)
  (blocks): Sequential(
    (0): Block(
      (sa): MultiHeadAttention(
        (heads): ModuleList(
          (0-3): 4 x Head(
            (key): Linear(in_features=64, out_features=16, bias=False)
            (query): Linear(in_features=64, out_features=16, bias=False)
            (value): Linear(in_features=64, out_features=16, bias=False)
            (dropout): Dropout(p=0.0, inplace=False)
          )
        )
        (proj): Linear(in_features=64, out_features=64, bias=True)
        (dropout): Dropout(p=0.0, inplace=False)
      )
      (ffwd): FeedFoward(
        (net): Sequential(
          (0): Linear(in_features=64, out_features=256, bias=True)
          (1): ReLU()
          (2): Linear(in_features=256, out_features=64, bias=True)
          (3): Dropout(p=0.0, inplace=False)
        )
      )
      (ln1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (

In [None]:
# Start from a single token with id 0 and sample 200 new tokens from the reloaded model.
context = torch.zeros((1, 1), dtype=torch.long, device=device)
sampled_ids = mymodel.generate(context, max_new_tokens=200)[0].tolist()
print(decode(sampled_ids))


	من برند
نافه سر داری که گریه صیدم و صوری او گفت
ساقی به زام سلاحر نیست عاج اسرار
به قدر گفتمت به اسفان خسبیم
خم وصوف بقی عابب راز حکم صال حسن
تا خوردا که یار صوری کنم و مغانین طوطی تو کی
هر جان امید و
