In [1]:
import json
import math
import os

import torch
from bert_score import score
from datasets import load_dataset, load_metric
from nltk.translate.bleu_score import SmoothingFunction
from nltk.translate.bleu_score import sentence_bleu
from rouge_score import rouge_scorer
from tqdm.auto import tqdm
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    pipeline,
    logging,
)
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# `logging` here is transformers' logging module (see the imports above), not the
# stdlib one: restrict its output to errors so warnings do not flood the notebook.
logging.set_verbosity_error()

In [2]:
# Checkpoints to evaluate (passed to `from_pretrained` further down). The part
# after the first "/" is reused as the prefix of the result file names.
finetuned_models = ['models/Llama-2-7b-hf-finetuned', 'models/Mistral-7B-finetuned', 'models/phi-2-finetuned']

# Load the evaluation dataset
dataset_name = "tatsu-lab/alpaca"  # Replace with your dataset
dataset = load_dataset(dataset_name)

# Debugging shortcut: select just 5 entries
# dataset = dataset['train'].shuffle(seed=42).select(range(5))

# Hold out 0.05% of the 'train' split as the test set; the fixed seed keeps the
# held-out examples identical across runs.
train_test_split = dataset['train'].train_test_split(test_size=0.0005, seed=42)
# train_dataset = train_test_split['train']
test_dataset = train_test_split['test']

In [3]:
def _extract_response(generated_text):
    """Return the text following the first '### Response:' header, or '' if there is none."""
    if "### Response:\n" not in generated_text:
        return ''
    response = generated_text.split("### Response:\n")[1]
    # The model may keep going and start another example; cut at the next header.
    return response.split("\n\n### Instruction:")[0].strip()


def generate_response(model, tokenizer, top_k, num_beams, temperature, test_dataset,
                      batch_size=20, max_new_tokens=128):
    """Generate one response per example of ``test_dataset`` and return them as strings.

    Each example's ``'text'`` field is cut just before its ``'### Response:'``
    section and the remaining prefix is used as the prompt. The text the model
    produces after its first ``'### Response:'`` header (up to a following
    ``'### Instruction:'`` header, if any) is kept as the response.

    Args:
        model: Causal language model handed to the text-generation pipeline.
        tokenizer: Tokenizer matching ``model``.
        top_k: ``top_k`` value forwarded to the pipeline.
        num_beams: ``num_beams`` value forwarded to the pipeline.
        temperature: ``temperature`` value forwarded to the pipeline.
        test_dataset: Object supporting ``len()`` and ``.select(indices)`` whose
            rows expose a ``'text'`` string.
        batch_size: Number of prompts handed to the pipeline per call.
        max_new_tokens: Generation budget per prompt.

    Returns:
        A list with one string per example, in dataset order. An entry is ``''``
        when the generated text contains no ``'### Response:'`` header.
    """
    logging.set_verbosity_error()

    # NOTE(review): `do_sample` is never set, so with beam search `top_k` and
    # `temperature` may have no effect on the output - confirm against the
    # transformers generation documentation before interpreting sweep results.
    pipe = pipeline(
        task="text-generation",
        model=model,
        tokenizer=tokenizer,
        device=0,  # first CUDA device
        top_k=top_k,
        num_beams=num_beams,
        temperature=temperature,
    )

    num_examples = len(test_dataset)
    total_batches = (num_examples + batch_size - 1) // batch_size  # ceil division
    generated_output = []

    for start in tqdm(range(0, num_examples, batch_size), total=total_batches, desc="Generating text"):
        batch = test_dataset.select(range(start, min(start + batch_size, num_examples)))
        # Keep only the part before the reference answer, so the answer is not leaked.
        prompts = [example['text'].split('\n\n### Response:\n')[0] for example in batch]

        # Each entry of `results` is a list of candidates; only the first one is used.
        results = pipe(prompts, max_new_tokens=max_new_tokens)
        generated_output.extend(result[0]['generated_text'] for result in results)

    return [_extract_response(output) for output in generated_output]


In [4]:
import math

# Reference language model used by calculate_perplexity below: GPT-2 weights
# switched to eval mode, plus the matching tokenizer.
model_name = "gpt2"
model_gpt2 = GPT2LMHeadModel.from_pretrained(model_name).eval()
tokenizer_gpt2 = GPT2Tokenizer.from_pretrained(model_name)

def calculate_perplexity(text):
    """Return the perplexity of ``text`` under the module-level GPT-2 model.

    The perplexity is ``exp(loss)``, where ``loss`` is the first output of
    ``model_gpt2`` when the token ids are also passed as labels.

    Args:
        text: String to score.

    Returns:
        The perplexity as a float, or ``0`` when ``text`` is empty or the
        computed value is NaN.
    """
    if len(text) == 0:
        print(f'THIS {text} RETURN ZERO')
        return 0

    # Truncate to the model's context size so an overly long text is scored on
    # its leading tokens instead of failing on out-of-range positions.
    tokenize_input = tokenizer_gpt2.encode(
        text,
        return_tensors='pt',
        truncation=True,
        max_length=model_gpt2.config.n_positions,
    )
    with torch.no_grad():
        loss = model_gpt2(tokenize_input, labels=tokenize_input)[0]

    perplexity = torch.exp(loss).item()
    # NaN presumably arises when there is nothing to predict (e.g. a
    # single-token input); it is reported as 0 like the empty-text case.
    return 0 if math.isnan(perplexity) else perplexity


def calculate_bleu(reference, candidate):
    """Return the smoothed sentence-level BLEU of ``candidate`` against ``reference``.

    Both strings are tokenised by whitespace; ``reference`` is the only
    reference translation supplied. Smoothing uses ``SmoothingFunction().method1``.
    """
    return sentence_bleu(
        [reference.split()],
        candidate.split(),
        smoothing_function=SmoothingFunction().method1,
    )

def calculate_rouge_l(reference, candidate):
    """Return the ROUGE-L F-measure of ``candidate`` against ``reference`` (stemming enabled)."""
    rouge = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
    rouge_result = rouge.score(reference, candidate)
    return rouge_result['rougeL'].fmeasure

def calculate_bert_score(reference, candidate):
    """Return the BERTScore of ``candidate`` against ``reference`` as a float.

    ``score`` is called on the single (candidate, reference) pair with
    ``lang='en'``; the last value it returns (presumably the F1 tensor -
    confirm against the bert_score docs) is averaged and converted to a float.

    NOTE(review): ``score`` is invoked once per pair; if it re-initialises its
    model on every call this will be slow - consider a reusable scorer.
    """
    all_metrics = score([candidate], [reference], lang='en', return_hash=False)
    *_unused, last_metric = all_metrics
    return last_metric.mean().item()

def evaluate_text_quality(reference, candidate):
    """Score ``candidate`` against ``reference`` with every metric defined above.

    Returns:
        A dict with the keys 'Perplexity', 'BLEU', 'ROUGE-L' and 'BERTScore'.
        Perplexity is computed from ``candidate`` alone; the other three
        compare ``candidate`` with ``reference``.
    """
    metrics = {}
    metrics['Perplexity'] = calculate_perplexity(candidate)
    metrics['BLEU'] = calculate_bleu(reference, candidate)
    metrics['ROUGE-L'] = calculate_rouge_l(reference, candidate)
    metrics['BERTScore'] = calculate_bert_score(reference, candidate)
    return metrics

# Example usage
# reference_text = "This is a sample reference text."
# generated_text = "This is a sample generated text."
# evaluation_results = evaluate_text_quality(reference_text, generated_text)
# print(evaluation_results)


def calculate_scores(test_dataset, generated_responses):
    """Average every text-quality metric over the whole test set.

    Args:
        test_dataset: Sized iterable of rows exposing the ground truth under
            the ``'output'`` key.
        generated_responses: Generated texts, indexed in the same order as
            ``test_dataset``.

    Returns:
        A dict with the keys 'Perplexity', 'BLEU', 'ROUGE-L' and 'BERTScore',
        each holding the mean over all examples. For an empty dataset every
        entry is ``0``.
    """
    scores = {'Perplexity': 0, 'BLEU': 0, 'ROUGE-L': 0, 'BERTScore': 0}

    num_samples = len(test_dataset)
    if num_samples == 0:
        # Nothing to average; avoid dividing by zero below.
        return scores

    # `enumerate` has no length, so the total is passed explicitly for the progress bar.
    for i, test_data in tqdm(enumerate(test_dataset), total=num_samples, desc="Calculating scores"):
        evaluation_results = evaluate_text_quality(test_data['output'], generated_responses[i])
        for key in scores:
            scores[key] += evaluation_results[key]

    # Turn the running sums into means.
    for key in scores:
        scores[key] /= num_samples

    return scores

In [None]:
import json


# Baseline run: evaluate every fine-tuned model once with the default decoding
# settings and store both the generations and the averaged scores under data/.

# Create the output directory up front so the run cannot fail on a missing
# folder after the expensive generation step.
os.makedirs('data', exist_ok=True)

for each_model in finetuned_models:

    # Load the model and tokenizer
    model = AutoModelForCausalLM.from_pretrained(each_model)
    tokenizer = AutoTokenizer.from_pretrained(each_model)

    top_k = 50
    num_beams = 5
    temperature = 1
    print(f'MODEL {each_model} START GENERATING.')
    generated_responses = generate_response(model, tokenizer, top_k=top_k, num_beams=num_beams, temperature=temperature, test_dataset=test_dataset)

    # Both result files share this prefix: <model name>_topk<k>_nb<beams>_t<temperature>
    output_prefix = f'data/{each_model.split("/")[1]}_topk{top_k}_nb{num_beams}_t{temperature}'

    with open(f'{output_prefix}.json', 'w') as f:
        json.dump(generated_responses, f)
    print(f'MODEL {each_model} START CALCULATING SCORES.')
    scores_model = calculate_scores(test_dataset=test_dataset, generated_responses=generated_responses)

    with open(f'{output_prefix}_score.json', 'w') as f:
        json.dump(scores_model, f)
    print(scores_model)

In [None]:
# One-at-a-time hyper-parameter sweeps: each list below is swept while the two
# other decoding settings stay at their baseline value.
top_k_values = [30, 40, 60, 80]
beam_size_values = [2, 4, 6, 8]
temperature_values = [0, 0.5, 0.7, 1]

# Baseline decoding settings used for whichever parameters are not being swept.
DEFAULT_TOP_K = 50
DEFAULT_NUM_BEAMS = 5
DEFAULT_TEMPERATURE = 1


def _sweep_settings():
    """Yield (top_k, num_beams, temperature) tuples, varying one setting at a time.

    The order is: all ``top_k_values``, then all ``beam_size_values``, then all
    ``temperature_values``.
    """
    for swept_top_k in top_k_values:
        yield swept_top_k, DEFAULT_NUM_BEAMS, DEFAULT_TEMPERATURE
    for swept_num_beams in beam_size_values:
        yield DEFAULT_TOP_K, swept_num_beams, DEFAULT_TEMPERATURE
    for swept_temperature in temperature_values:
        yield DEFAULT_TOP_K, DEFAULT_NUM_BEAMS, swept_temperature


def _generate_score_and_save(each_model, model, tokenizer, top_k, num_beams, temperature):
    """Generate responses with one model/configuration, score them and save both.

    Two JSON files are written under ``data/``: the generated responses and the
    averaged scores (suffix ``_score``). The file names encode the model name
    and the three decoding settings.

    Returns:
        A ``(generated_responses, scores_model)`` tuple.
    """
    print(f'MODEL {each_model} START GENERATING.')
    generated_responses = generate_response(model, tokenizer, top_k=top_k, num_beams=num_beams, temperature=temperature, test_dataset=test_dataset)

    output_prefix = f'data/{each_model.split("/")[1]}_topk{top_k}_nb{num_beams}_t{temperature}'

    with open(f'{output_prefix}.json', 'w') as f:
        json.dump(generated_responses, f)
    print(f'MODEL {each_model} START CALCULATING SCORES.')
    scores_model = calculate_scores(test_dataset=test_dataset, generated_responses=generated_responses)

    with open(f'{output_prefix}_score.json', 'w') as f:
        json.dump(scores_model, f)
    print(scores_model)
    return generated_responses, scores_model


# Create the output directory up front so a missing folder cannot abort the sweep.
os.makedirs('data', exist_ok=True)

for top_k, num_beams, temperature in _sweep_settings():
    for each_model in finetuned_models:

        model = AutoModelForCausalLM.from_pretrained(each_model)
        tokenizer = AutoTokenizer.from_pretrained(each_model)

        generated_responses, scores_model = _generate_score_and_save(
            each_model, model, tokenizer, top_k, num_beams, temperature
        )