In [1]:
path_dir =  "./data"

In [19]:
! pip install datasets

Collecting datasets
  Downloading datasets-2.2.2-py3-none-any.whl (346 kB)
[K     |████████████████████████████████| 346 kB 132 kB/s eta 0:00:01
Collecting pyarrow>=6.0.0
  Downloading pyarrow-8.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (29.4 MB)
[K     |████████████████████████████████| 29.4 MB 825 kB/s eta 0:00:01
[?25hCollecting responses<0.19
  Downloading responses-0.18.0-py3-none-any.whl (38 kB)
Collecting aiohttp
  Downloading aiohttp-3.8.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.3 MB)
[K     |████████████████████████████████| 1.3 MB 2.1 MB/s eta 0:00:01
[?25hCollecting dill<0.3.5
  Using cached dill-0.3.4-py2.py3-none-any.whl (86 kB)
Collecting multiprocess
  Downloading multiprocess-0.70.13-py38-none-any.whl (131 kB)
[K     |████████████████████████████████| 131 kB 2.1 MB/s eta 0:00:01
Collecting fsspec[http]>=2021.05.0
  Downloading fsspec-2022.5.0-py3-none-any.whl (140 kB)
[K     |██████████

In [1]:
from transformers import pipeline, AutoTokenizer, GPT2LMHeadModel
# tokenizer = AutoTokenizer.from_pretrained('bolbolzaban/gpt2-persian')
# model = GPT2LMHeadModel.from_pretrained('bolbolzaban/gpt2-persian')
# generator = pipeline('text-generation', model, tokenizer=tokenizer, config={'max_length':256})
# sample = generator('در یک اتفاق شگفت انگیز، پژوهشگران')

In [2]:
import torch
import pandas as pd
from collections import Counter
from tqdm import tqdm


class SampleDataset(torch.utils.data.Dataset):
    def __init__(
            self,
            tokenizer,
            base_path,
            train_path,
            max_epochs = 20,
            batch_size = 256,
            sequence_length = 6,
            log_interval = 10
    ): 
        self.start_mesra = '[BOM] '
        self.end_mesra = '[EOM]'
        self.start_beyt = ''
        self.end_beyt = ' [EOS]'
        self.base_path = base_path
        self.train_path = train_path
        self.sequence_length = sequence_length
        self.beyts = self.load_beyts()
        self.tokenizer = tokenizer
    
    def load_prepared_beyts(self):
        with open(os.path.join(self.base_path, self.train_path)) as fp:
            return fp.readlines()

    def load_beyts(self):

        beyt_file = []
        with open(os.path.join(self.base_path, self.train_path)) as fp:
            lines = fp.readlines()
            for i in tqdm(range(0, len(lines) - 1, 2)):
                mesra1 = self.start_mesra + lines[i].strip() + self.end_mesra
                mesra2 = self.start_mesra + lines[i + 1].strip() + self.end_mesra
                b = self.start_beyt + mesra1.strip() + ' ' + mesra2.strip() + self.end_beyt
                beyt_file.append(b)
        return beyt_file

    def __len__(self):
        return len(self.beyts)

    def does_ryhme(self, beyt):
        mesras = beyt.split(self.end_mesra)
        mesras = [x for x in filter(lambda x: len(x)>0, mesras)]
        first_ghafie = [x for x in filter(lambda x: len(x)>0, mesras[0].split(" "))][-1]
        second_ghafie = [x for x in filter(lambda x: len(x)>0, mesras[1].split(" "))][-1]
        min_len = min(len(first_ghafie), len(second_ghafie))

        for level in range(-1, -min_len-1, -1):
            if not first_ghafie[level:] == second_ghafie[level:]:
                break
            if level < -1:
                break
        return level!=-1


    def __getitem__(self, index):
        current_beyt = self.beyts[index]
        mesras = current_beyt.split(self.end_mesra)
        mesras = [x for x in filter(lambda x: len(x)>0, mesras)]
        first_token = self.tokenizer.encode(mesras[0])
        second_token = self.tokenizer.encode(mesras[1])
        # first_token.append(self.does_ryhme(current_beyt))
        # second_token.append(self.does_ryhme(current_beyt))
        tensors = (
            first_token,
            second_token
        )
        return self.tokenizer.encode(current_beyt.replace(self.end_mesra, ""))

In [5]:
train_path = "train.txt"
test_path = "test.txt"
base_path = "./data"
dataset = SampleDataset(AutoTokenizer.from_pretrained('bolbolzaban/gpt2-persian'), base_path, train_path)
val_dataset = SampleDataset(AutoTokenizer.from_pretrained('bolbolzaban/gpt2-persian'), base_path, test_path)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
100%|██████████| 34725/34725 [00:00<00:00, 961862.92it/s]
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
100%|██████████| 14883/14883 [00:00<00:00, 1007827.48it/s]


In [13]:
dataset.does_ryhme("کسی کو بجوید همی تاج ویاه [EOM] خردباید ورای وگنج وسپاه")

True

In [27]:
tokenizer = AutoTokenizer.from_pretrained('bolbolzaban/gpt2-persian')

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [3]:
from transformers import (
    AutoTokenizer,
    TextDataset,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
    AutoModelWithLMHead,
    BertModel, 
    GPT2LMHeadModel,
    pipeline
)
import torch
import os


class MesraModel:
    def __init__(self, train_path, test_path, model_dir="./model"):
        self.base_path = "./data"
        self.model = None
        self.tokenizer = None
        self.generator = None
        self.train_path = train_path
        self.test_path = test_path
        self.model_dir = model_dir
        self.trainer = None
        # self.cleaner = Cleaner()

    def read_data(self, tokenizer, train_path=None, test_path=None):
        train_path = train_path if train_path is not None else self.train_path
        test_path = test_path if test_path is not None else self.test_path
        train_dataset = SampleDataset(AutoTokenizer.from_pretrained('bolbolzaban/gpt2-persian'), base_path, train_path)
        test_dataset = SampleDataset(AutoTokenizer.from_pretrained('bolbolzaban/gpt2-persian'), base_path, test_path)
        # train_dataset = TextDataset(
        #     tokenizer=tokenizer, file_path=os.path.join(self.base_path, train_path), block_size=128)
        
        # test_dataset = TextDataset(
        #     tokenizer=tokenizer, file_path=os.path.join(self.base_path, test_path), block_size=128)
        
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=tokenizer,
            mlm=False,
        )
        return train_dataset, test_dataset, data_collator

    def read_model(self, model_type='bolbolzaban/gpt2-persian'):
        self.model = GPT2LMHeadModel.from_pretrained(model_type)
        self.tokenizer = AutoTokenizer.from_pretrained('bolbolzaban/gpt2-persian')
        # self.tokenizer.add_tokens(['[EOM]', '[BOM]', '[EOS]'], special_tokens=True)
        return self.model, self.tokenizer


    def freeze_lower_layers(self):
        for param in self.model.base_model.parameters():
            param.requires_grad = False

        for param in (
            self.model.base_model.h[23].parameters() or self.model.base_model.h[22].parameters()
        ):
            param.requires_grad = True
            
    def fine_tune_model(self, model, train_texts, val_texts, data_collator):
        training_args = TrainingArguments(
        output_dir=self.model_dir,
        overwrite_output_dir=True,
        num_train_epochs=12,
        # Set the batch size to a maximum value that could fit into GPU memory,
        # for example 12 is the largest batch size that could work on a 6gb GPU when training the last to layers
        per_device_train_batch_size=12,
        per_device_eval_batch_size=12,
        eval_steps=1000,
        save_steps=1000,
        warmup_steps=500)

        trainer = Trainer(
            model=model,
            args=training_args,
            data_collator=data_collator,
            train_dataset=train_texts,
            eval_dataset=val_texts,
        )
        trainer.train()
        self.trainer = trainer 
        return trainer
    
    def load_model(self):
        model, tokenizer = self.read_model(self.model_dir)
        generator = pipeline('text-generation', model, tokenizer=tokenizer, config={'max_length':256}, device=0)
        self.generator = generator
        return generator
        
    def init_generator(self, used_pretrained=False):      
        if used_pretrained:
            print("in init generator")
            model, tokenizer = self.read_model(self.model_dir)
            print("read model successfully")
            model.to(torch.device("cuda"))
            print("convert model to cuda")
        else:
            model, tokenizer = self.read_model()
            
        self.freeze_lower_layers()
        train_texts, val_texts, data_collator = self.read_data(tokenizer)
        trainer = self.fine_tune_model(model, train_texts, val_texts, data_collator)
        model = trainer.model
    
        generator = pipeline('text-generation', model, tokenizer=tokenizer, config={'max_length':256}, device=0)
        self.generator = generator
        return generator

    def save_model(self, dir=None):
        dir = dir if dir is not None else self.model_dir
        self.trainer.save_model(output_dir=dir)

In [6]:
class MesraGenerator:
    def __init__(self):
        self.notryhme_poets = ["hafez", "eraghi"]
        self.ryhme_poets = ["ferdousi", "moulavi"]
        self.ryhme_model, self.notryhme_model = self.init_model()

    def init_model(self):
        mesra_model = MesraModel("train_path", "test_path", model_dir="./notryhme_model")
        not_ryhme_generator = mesra_model.load_model()

        mesra_model = MesraModel("train_path", "test_path", model_dir="./model")
        ryhme_generator = mesra_model.load_model()
        
        return ryhme_generator, not_ryhme_generator

    def train_all_poets(self):
        ryhme_generator = None
        not_ryhme_generator = None
        for index, poet in enumerate(self.notryhme_poets):
            train_path = f"./data/{poet}_train.txt"
            test_path = f"./data/{poet}_test.txt"
            mesra_model = MesraModel(train_path, test_path, model_dir="./notryhme_model")
            not_ryhme_generator = mesra_model.init_generator(used_pretrained=index!=0)
            mesra_model.save_model("./notryhme_model")

        for index, poet in enumerate(self.ryhme_poets):
            train_path = f"./data/{poet}_train.txt"
            test_path = f"./data/{poet}_test.txt"
            mesra_model = MesraModel(train_path, test_path, model_dir="./ryhme_model")
            ryhme_generator = mesra_model.init_generator(used_pretrained=index!=0)
            mesra_model.save_model("./ryhme_model")
        return ryhme_generator, not_ryhme_generator 
    
    def generate_mesra(self, first_mesra: str, has_ghafie: bool=True):
        first_mesra = f"[BOM] {first_mesra} [BOM]"
        if has_ghafie:
            result = self.ryhme_model(first_mesra)
        else:
            result = self.notryhme_model(first_mesra)

        return result

    

In [7]:
mesra_generator = MesraGenerator()
mesra_generator.generate_mesra("[BOM]کاشت طره مولوی را در دل شب بی بهانه [BOM]")

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Setting `pad_token_id` to `eos_token_id`:9 for open-end generation.


[{'generated_text': '[BOM] [BOM]کاشت طره مولوی را در دل شب بی بهانه [BOM] [BOM] جز به دست آن سرزلف معنبر نمی'}]

In [8]:
mesra_generator.generate_mesra("[BOM]کاشت طره مولوی را در دل شب بی بهانه [BOM]")

Setting `pad_token_id` to `eos_token_id`:9 for open-end generation.


[{'generated_text': '[BOM] [BOM]کاشت طره مولوی را در دل شب بی بهانه [BOM] [BOM] با دل مجروح ما کرد آشنا با یک بهانه'}]

In [22]:
import math
import os
from tqdm import tqdm

def does_ryhme(first_mesra, sec_mesra):
    first_ghafie = [x for x in filter(lambda x: len(x)>0, first_mesra.split(" "))][-1]
    second_ghafie = [x for x in filter(lambda x: len(x)>0, sec_mesra.split(" "))][-1]
    min_len = min(len(first_ghafie), len(second_ghafie))

    for level in range(-1, -min_len-1, -1):
        if not first_ghafie[level:] == second_ghafie[level:]:
            break
        if level < -1:
            break
    return level!=-1

    
def write_to_file(lines, path):
    with open(path, "w") as write_f:
        for line in lines:
            if len(line) > 1:
                write_f.write(line)

def save_not_ryhme_beyts(path):
    out_path = "."+path.split(".")[-2]+"_not_ryhme.txt"
    not_ryhme_lines = list()
    with open(path) as f:
        lines = f.readlines()
        lines = list(filter(lambda x: len(x.strip())>1, lines))
        for i in tqdm(range(0, len(lines) - 1, 2)):
            mesra1 = lines[i].strip()
            mesra2 = lines[i + 1].strip()
            if not does_ryhme(mesra1, mesra2):
                not_ryhme_lines.append(mesra1+os.linesep)
                not_ryhme_lines.append(mesra2+os.linesep)
    write_to_file(not_ryhme_lines, out_path)


def split_filt_to_trian_test():
    file_path = "./data/eraghi_norm_not_ryhme.txt"
    train_perc = 0.8
    with open(file_path) as f:
        lines = f.readlines()
        train_index = math.floor(len(lines)*train_perc)
        train_index = train_index - (train_index%2)
        train_lines = lines[:train_index]
        test_lines = lines[train_index:]
    write_to_file(train_lines, "./data/eraghi_train.txt")
    write_to_file(test_lines, "./data/eraghi_test.txt")
        

In [23]:
split_filt_to_trian_test()

In [21]:
save_not_ryhme_beyts("./data/eraghi_norm.txt")

100%|██████████| 5726/5726 [00:00<00:00, 248346.38it/s]
