In [None]:
# !pip install Rouge

In [2]:
import pandas as pd
import numpy as np
import random
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer, GPT2LMHeadModel, AdamW, get_linear_schedule_with_warmup
from tqdm import tqdm, trange
import torch.nn.functional as F
import csv
import math
import warnings
warnings.filterwarnings("ignore")

caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io_plugins.so: undefined symbol: _ZN3tsl6StatusC1EN10tensorflow5error4CodeESt17basic_string_viewIcSt11char_traitsIcEENS_14SourceLocationE']
caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so: undefined symbol: _ZTVN10tensorflow13GcsFileSystemE']


# GPT2 with Fine Tuning

### Prepare data

In [None]:
# Assign a value to the variable 'name' (e.g., "Egyptian")
name = "Egyptian"

# Load the dataset from a CSV file using pandas
df = pd.read_csv('https://raw.githubusercontent.com/Mahmoud-Hesham99/Arabic-Lyrics-Generation/main/arabicLyrics.csv')

# Perform data cleaning steps
df = df.replace("غيرمعروف", np.NAN)
df = df.replace("غير معروف", np.NAN)
df = df.drop(['SongTitle', 'SongWriter', 'Composer', 'SingerNationality'], axis=1)

# Group the lyrics by 'songID' and join them
grouped_df = df.groupby('songID')['Lyrics'].apply('\n'.join).reset_index()

# Merge the grouped lyrics with the original dataframe, drop duplicates, and reset the index
temp = pd.merge(grouped_df, df.drop(["LyricsOrder", "Lyrics"], axis=1), on="songID")
temp = temp.drop_duplicates(keep="first").reset_index().drop(["songID", "index"], axis=1).reset_index()
temp = temp.rename({"index": "songID"}, axis="columns")

# Filter the dataset based on the specified 'name'
df = temp[temp["SongDialect"] == name]

In [4]:
# Define a function to remove punctuation from lyrics using regular expressions
import re
def remove_punctuation(text):
    return re.sub(r'[^\w\s]', '', text)

# Apply the 'remove_punctuation' function to remove punctuation from the 'Lyrics' column
df['Lyrics'] = df['Lyrics'].apply(remove_punctuation)

In [5]:
# Define a function to clip the text into smaller parts if it exceeds the maximum token count
def text_clipping(test_data,tokenizer):
    new_test = test_data.iloc[:0,:].copy()
    for i in range(len(test_data)):
        input_song = test_data['Lyrics'].iloc[i]
        temp_embbeding = tokenizer.encode(input_song)
        token_count = len(temp_embbeding)
        if (token_count) > 1000:
            verses = input_song.split('\n')
            verses_count = len(verses)
            l = []
            count = math.ceil(token_count/1000)+1
            part = int(verses_count/count)
            for i in range(count):
                l.append('\n'.join(verses[i*part:(i+1)*part]))
            for item in l:
                row = test_data.iloc[i]
                row['Lyrics'] = item
                new_test = new_test.append(row, ignore_index = True)
        else :
            new_test = new_test.append(test_data.iloc[i], ignore_index = True)


    return new_test

In [None]:
# Create a small test set by sampling 10 rows from the dataset
test_set = df.sample(n=10, random_state=32)

# Instantiate the GPT2Tokenizer from the 'gpt2' model
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# Clip the test set using the 'text_clipping' function to handle lyrics that exceed the maximum token count
df = df.loc[~df.index.isin(test_set.index)]
test_set = text_clipping(test_set, tokenizer)

# Reset the indexes of the test set and the dataset
test_set = test_set.reset_index()
df = df.reset_index()

In [None]:
# Keep the last 20 words of the lyrics in a new column 'True_end_lyrics' for the test set,
# and remove them from the original 'Lyrics' column
test_set['True_end_lyrics'] = test_set['Lyrics'].str.split().str[-20:].apply(' '.join)
test_set['Lyrics'] = test_set['Lyrics'].str.split().str[:-20].apply(' '.join)

### Text generation

In [8]:
# Load the pre-trained model for lyrics generation
model = torch.load(f'/kaggle/input/training-code/model_{name}.pt')

In [None]:
def generate(
    model,
    tokenizer,
    prompt,
    entry_count=30,
    entry_length=60,  # maximum number of words
    top_p=0.8,
    temperature=1.,
):

    model.eval()

    generated_num = 0
    generated_list = []

    filter_value = -float("Inf")

    with torch.no_grad():

        # Iterate to generate multiple entries
        for entry_idx in trange(entry_count):

            entry_finished = False
            print(len(tokenizer.encode(prompt)))
            generated = torch.tensor(tokenizer.encode(prompt)).unsqueeze(0)

            # Generate each entry
            for i in range(entry_length):
                # Feed the generated lyrics to the model and obtain the output logits
                outputs = model(generated, labels=generated)
                loss, logits = outputs[:2]

                # Extract the last logits and apply temperature scaling
                logits = logits[:, -1, :] / (temperature if temperature > 0 else 1.0)

                # Sort the logits and calculate the cumulative probabilities
                sorted_logits, sorted_indices = torch.sort(logits, descending=True)
                cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

                # Mask tokens that exceed the top_p threshold
                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
                sorted_indices_to_remove[..., 0] = 0
                indices_to_remove = sorted_indices[sorted_indices_to_remove]
                logits[:, indices_to_remove] = filter_value

                # Sample the next token using multinomial sampling
                next_token = torch.multinomial(F.softmax(logits, dim=-1), num_samples=1)

                # Concatenate the generated token with the previous tokens
                generated = torch.cat((generated, next_token), dim=1)

                # Check if the generated token corresponds to the end of a line
                if next_token in tokenizer.encode(""):
                    entry_finished = True

                # If the entry is finished, decode the generated lyrics and break the loop
                if entry_finished:
                    generated_num = generated_num + 1
                    output_list = list(generated.squeeze().numpy())
                    output_text = tokenizer.decode(output_list)
                    generated_list.append(output_text)
                    break

            # If the entry is not finished after reaching the maximum length, append the generated lyrics
            if not entry_finished:
                output_list = list(generated.squeeze().numpy())
                output_text = f"{tokenizer.decode(output_list)}"
                generated_list.append(output_text)

    return generated_list


In [10]:
# Define a function to generate lyrics for the test data. Test data should be a dataframe
def text_generation(test_data):
    generated_lyrics = []
    for i in range(len(test_data)):
        x = generate(model.to('cpu'), tokenizer, test_data['Lyrics'][i], entry_count=1)
        generated_lyrics.append(x)
    return generated_lyrics

In [None]:
generated_lyrics = text_generation(test_set)

  0%|          | 0/1 [00:00<?, ?it/s]

357


In [None]:
# Loop through the generated lyrics, extract the matching string (last 30 words) from the original test set,
# and retrieve the generated lyrics that follow the matching string
my_generations = []
for i in range(len(generated_lyrics)):
    a = test_set['Lyrics'][i].split()[-30:] # Get the matching string (last 30 words)
    b = ' '.join(a)
    c = ' '.join(generated_lyrics[i]) # Get all that comes after the matching string
    my_generations.append(c.split(b)[-1])

In [None]:
test_set['Generated_lyrics'][0]

In [None]:
test_set['True_end_lyrics'][0]

In [None]:
test_set.to_csv(f'/kaggle/working/{name}_results')


### Analyze performance

In [None]:
#Using BLEU score to compare the real sentences with the generated ones
import statistics
from nltk.translate.bleu_score import sentence_bleu

scores=[]

for i in range(len(test_set)):
    reference = test_set['True_end_lyrics'][i]
    candidate = test_set['Generated_lyrics'][i]
    scores.append(sentence_bleu(reference, candidate))

statistics.mean(scores)

In [None]:
#Rouge score
from rouge import Rouge
rouge=Rouge()

rouge.get_scores(test_set['Generated_lyrics'], test_set['True_end_lyrics'], avg=True)

# GPT2 without any fine Tuning

In [None]:
import transformers
import torch

In [None]:
tokenizer = transformers.GPT2Tokenizer.from_pretrained('gpt2')
model = transformers.GPT2LMHeadModel.from_pretrained('gpt2')

In [None]:
## Making a function that will generate text for us ##
def gen_text(prompt_text, tokenizer, model, n_seqs=1, max_length=374):
  # n_seqs is the number of sequences to generate
  # max_length is the maximum length of the sequence
  encoded_prompt = tokenizer.encode(prompt_text, add_special_tokens=False, return_tensors="pt")
  # We are encoding the text using the gpt tokenizer. The return tensors are of type "pt"
  # since we are using PyTorch, not tensorflow
  output_sequences = model.generate(
      input_ids=encoded_prompt,
      max_length=max_length+len(encoded_prompt), # The model has to generate something, 
      # so we add the length of the original sequence to max_length
      temperature=1.0,
      top_k=0,
      top_p=0.9,
      repetition_penalty=1.2, # To ensure that we dont get repeated phrases
      do_sample=True,
      num_return_sequences=n_seqs
  ) # We feed the encoded input into the model.
  ## Getting the output ##
  if len(output_sequences.shape) > 2:
    output_sequences.squeeze_() # the _ indicates that the operation will be done in-place
  generated_sequences = []
  for generated_sequence_idx, generated_sequence in enumerate(output_sequences):
    generated_sequence = generated_sequence.tolist()
    text = tokenizer.decode(generated_sequence)
    total_sequence = (
        prompt_text + text[len(tokenizer.decode(encoded_prompt[0], clean_up_tokenization_spaces=True, )) :]
    )
    generated_sequences.append(total_sequence)
  return generated_sequences

In [None]:
#Generate sequences
gen_text(df['Lyrics'][0],tokenizer,model)

In [None]:
#Function to generate multiple sentences. Test data should be a dataframe
def text_generation(test_data):
    generated_lyrics = []
    for i in range(len(test_data)):
        x = gen_text(test_data['Lyrics'][i], tokenizer, model)
        generated_lyrics.append(x)
    return generated_lyrics

generated_lyrics = text_generation(test_set)

In [None]:
#Loop to keep only generated text and add it as a new column in the dataframe
my_generations=[]

for i in range(len(generated_lyrics)):
    a = test_set['Lyrics'][i].split()[-30:] #Get the matching string we want (30 words)
    b = ' '.join(a)
    c = ' '.join(generated_lyrics[i]) #Get all that comes after the matching string
    my_generations.append(c.split(b)[-1])

test_set['Generated_lyrics_noFineTuning'] = my_generations

In [None]:
test_set.head()

In [None]:
#Using BLEU score to compare the real sentences with the generated ones

scores=[]

for i in range(len(test_set)):
    reference = test_set['True_end_lyrics'][i]
    candidate = test_set['Generated_lyrics_noFineTuning'][i]
    scores.append(sentence_bleu(reference, candidate))

statistics.mean(scores)

In [None]:
#Rouge score
rouge=Rouge()

rouge.get_scores(test_set['Generated_lyrics_noFineTuning'], test_set['True_end_lyrics'], avg=True, ignore_empty=True)