In [1]:
!pip install -q -U transformers
!pip install -q -U accelerate
!pip install -q -U bitsandbytes
!pip install -q -U flash_attn
!pip install -q -U datasets

!pip install -q -U evaluate
!pip install -q -U tokenizers

!pip install -q diffusers --upgrade
!pip install -q invisible_watermark accelerate safetensors

!pip install -q rouge
!pip install -q rouge_score

!pip install -q bert_score

!pip install -q sentencepiece

In [3]:
import evaluate
from pprint import pprint
from transformers import AutoConfig

import datasets
import bitsandbytes as bnb
import torch
import random
import pandas as pd
from tqdm import tqdm

import tensorflow as tf
from PIL import Image
import requests

import re
import numpy as np
from scipy.special import softmax

import torch
import transformers
from datasets import Dataset, load_dataset

from transformers import pipeline, BitsAndBytesConfig
from transformers import CLIPProcessor, TFCLIPModel

# For from-scratch T5 model
from transformers import T5TokenizerFast, T5Config, T5ForConditionalGeneration

# For pre-trained T5 model
from transformers import T5Tokenizer, T5ForConditionalGeneration  # this won't import twice, just noting here what's for each model

# For all T5 models
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer

# For BLEURT (to load a trained model for evaluation)
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# For style classifier model (also for evaluating the seq2seq model output)
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import TrainingArguments, Trainer

In [4]:
# This cell will authenticate you and mount your Drive in the Colab.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# Modify this path to the appropriate location in your Drive
text_file = 'drive/MyDrive/266/project/lyric_pairs_V2.tsv'

In [6]:
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]

text_pairs = []
for line in lines:
    index, artist, song, line_1, line_2 = line.split("\t")
    line_1 = line_1.lower()
    line_2 = line_2.lower()
    text_pairs.append((line_1, line_2))

#look at some examples
for _ in range(5):
    print(random.choice(text_pairs))

('nah, i just do it for the niggas', "that are tryna see a million 'fore they die")
('remember when i had that poker, huh', 'instead of looking over my shoulder, huh')
('drinking gallons of water i gotta get the drug out', "so stuffed like i'm sniffing coke up")
('damn, i love it when you talk to me crazy', 'but who the fuck you think you talking to?')
('that mi waan fi dah summer, yah', 'summer, yah')


In [7]:
#Let's create some splits
def create_splits(data, train_ratio=0.88, val_ratio=0.06, test_ratio=0.06, random_seed=None):
    assert train_ratio + val_ratio + test_ratio == 1.0, "Ratios must sum to 1.0"

    random.shuffle(data)
    total = len(data)

    train_end = int(train_ratio * total)
    val_end = train_end + int(val_ratio * total)

    return {
        'train': data[:train_end],
        'val': data[train_end:val_end],
        'test': data[val_end:]
    }

splits = create_splits(text_pairs, random_seed=42)

print(f"{len(text_pairs)} total pairs")
print(f"{len(splits['train'])} training pairs")
print(f"{len(splits['val'])} validation pairs")
print(f"{len(splits['test'])} test pairs")

50000 total pairs
44000 training pairs
3000 validation pairs
3000 test pairs


In [8]:
def make_dataset(pairs, shuffle=False):
    line_1, line_2 = zip(*pairs)
    line_1 = list(line_1)
    line_2 = list(line_2)

    dataset = Dataset.from_dict({"line_1": line_1, "line_2": line_2})
    return dataset.shuffle() if shuffle else dataset

# Usage
train_dataset = make_dataset(splits['train'])
val_dataset = make_dataset(splits['val'])
test_dataset = make_dataset(splits['test'])

In [9]:
# Load the pre-trained model and tokenizer


from transformers import GPT2Tokenizer, GPT2LMHeadModel
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2LMHeadModel.from_pretrained('gpt2')

# Add padding token (GPT-2 doesn't have one by default)
tokenizer.pad_token = tokenizer.eos_token

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

In [10]:
def tokenize_for_generation(examples):
    # Format as input -> target
    inputs = examples['line_1']
    targets = examples['line_2']

    # Tokenize inputs and targets separately
    model_inputs = tokenizer(inputs, padding=True, truncation=True, max_length=40)
    labels = tokenizer(targets, padding=True, truncation=True, max_length=40)

    model_inputs['labels'] = labels['input_ids']
    return model_inputs

# Add padding token if not already done
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Apply to datasets
train_tokenized = train_dataset.map(tokenize_for_generation, batched=True)
val_tokenized = val_dataset.map(tokenize_for_generation, batched=True)
test_tokenized = test_dataset.map(tokenize_for_generation, batched=True)

# Set format for PyTorch
train_tokenized.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
val_tokenized.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
test_tokenized.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])

Map:   0%|          | 0/44000 [00:00<?, ? examples/s]

Map:   0%|          | 0/3000 [00:00<?, ? examples/s]

Map:   0%|          | 0/3000 [00:00<?, ? examples/s]

In [11]:
# Look at a tokenized example
print("Original:")
print(f"  Input (line_1): {train_dataset[0]['line_1']}")
print(f"  Target (line_2): {train_dataset[0]['line_2']}")
print()
print("Tokenized:")
print(f"  Input IDs: {train_tokenized[0]['input_ids']}")
print(f"  Labels: {train_tokenized[0]['labels']}")
print()
print("Decoded:")
print(f"  Input: {tokenizer.decode(train_tokenized[0]['input_ids'])}")
print(f"  Target: {tokenizer.decode(train_tokenized[0]['labels'])}")

Original:
  Input (line_1): you just want the old me treating you right
  Target (line_2): i would give you the world if you would give me some time, don't know what you really want if you don't give it a try

Tokenized:
  Input IDs: tensor([ 5832,   655,   765,   262,  1468,   502, 13622,   345,   826, 50256,
        50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
        50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256,
        50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256])
  Labels: tensor([   72,   561,  1577,   345,   262,   995,   611,   345,   561,  1577,
          502,   617,   640,    11,   836,   470,   760,   644,   345,  1107,
          765,   611,   345,   836,   470,  1577,   340,   257,  1949, 50256,
        50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256, 50256])

Decoded:
  Input: you just want the old me treating you right<|endoftext|><|endoftext|><|endoftext|><|endoftext|><|endoftext|><|endoftex

In [14]:
# def generate_next_line(first_line, model, tokenizer, max_new_tokens=None):
#     # Create the prompt
#     task_prefix = 'Given this rap line, generate the next line: '
#     prompt = task_prefix + first_line

#     # Set max_new_tokens based on input line length
#     if max_new_tokens is None:
#         input_token_count = len(tokenizer.encode(first_line))
#         max_new_tokens = max(input_token_count, 15)  # At least 15 tokens

#     # Tokenize the prompt
#     inputs = tokenizer.encode(prompt, return_tensors='pt')

#     # Generate the continuation
#     with torch.no_grad():
#         outputs = model.generate(
#             inputs,
#             max_new_tokens=max_new_tokens,
#             num_return_sequences=1,
#             temperature=0.8,
#             do_sample=True,
#             pad_token_id=tokenizer.eos_token_id
#         )

#     # Decode and return the generated text
#     generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)

#     # Extract just the generated part (after the prompt)
#     generated_line = generated_text[len(prompt):].strip()

#     return generated_line


# test_results = []

# for i in range(len(test_dataset)):
#     first_line = test_dataset[i]['line_1']
#     actual_line = test_dataset[i]['line_2']

#     generated_line = generate_next_line(first_line, model, tokenizer)

#     test_results.append({
#         'input': first_line,
#         'actual': actual_line,
#         'generated': generated_line
#     })

#     # Print first 10 examples for inspection
#     if i < 10:
#         print(f"\nExample {i+1}:")
#         print(f"Input: {first_line}")
#         print(f"Actual: {actual_line}")
#         print(f"Generated: {generated_line}")
#         print("-" * 40)

#     # Progress indicator for larger datasets
#     if (i + 1) % 100 == 0:
#         print(f"Processed {i + 1}/{len(test_dataset)} examples...")



def generate_with_pipeline(first_lines, model, tokenizer, batch_size=8):
    """Use pipeline for automatic batch processing"""

    # Set padding side to left for generation
    original_padding_side = tokenizer.padding_side
    tokenizer.padding_side = "left"

    # Create text generation pipeline
    generator = pipeline(
        'text-generation',
        model=model,
        tokenizer=tokenizer,
        device=0 if torch.cuda.is_available() else -1  # Use GPU if available
    )

    # Create prompts
    task_prefix = 'Given this rap line, generate the next line: '
    prompts = [task_prefix + line for line in first_lines]

    # Generate in batches
    results = []
    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]

        outputs = generator(
            batch,
            max_new_tokens=30,
            temperature=0.8,
            do_sample=True,
            batch_size=batch_size
        )

        # Extract generated text
        for j, output in enumerate(outputs):
            generated_text = output[0]['generated_text']
            # Remove prompt part to get only generated text
            generated_line = generated_text[len(batch[j]):].strip()
            results.append(generated_line)

    return results


# Extract first_lines from test_dataset
test_first_lines = [test_dataset[i]['line_1'] for i in range(len(test_dataset))]

print(f"Generating {len(test_first_lines)} examples using pipeline batch generation...")

# Generate the lines using pipeline batch generation
generated_lines = generate_with_pipeline(test_first_lines, model, tokenizer, batch_size=16)

# Store results
test_results = []
for idx in range(len(test_dataset)):
    test_results.append({
        'input': test_dataset[idx]['line_1'],
        'actual': test_dataset[idx]['line_2'],
        'generated': generated_lines[idx]
    })

# Print first 10 results for comparison
print("\nFirst 10 Generation Results:")
print("=" * 60)

for i in range(10):
    print(f"\nExample {i+1}:")
    print(f"Input: {test_results[i]['input']}")
    print(f"Actual: {test_results[i]['actual']}")
    print(f"Generated: {test_results[i]['generated']}")
    print("-" * 40)

print(f"\nCompleted generation for all {len(test_results)} examples!")


Device set to use cuda:0


Generating 3000 examples using pipeline batch generation...

First 10 Generation Results:

Example 1:
Input: you ain't foolin' weezy f, and that's for friend or foe
Actual: tell them hoes in the house we kickin' in the door
Generated: . I ain't f, you ain't foolin' t, and that's not that f, you ain't foolin' t. You
----------------------------------------

Example 2:
Input: this is not an album
Actual: this is a mixtape
Generated: I did not enjoy or think I'm gonna make. This is an album as good as it gets.

You've been singing for a couple
----------------------------------------

Example 3:
Input: you and yo whole team, ain't like niggas who got and get served?!
Actual: on the strip, cause you niggas pop pills
Generated: If you've already heard of the meme with the lyric "Yo Niggas have a shit day" (which I've had for 30
----------------------------------------

Example 4:
Input: to those who break their neck to keep their hoes in check
Actual: ‘cause, oh, they sweat a brother majorl

In [15]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import nltk
nltk.download('punkt')

def calculate_bleu_scores(test_results):
    bleu_scores = []
    smoothing = SmoothingFunction().method1

    for result in test_results:
        reference = [result['actual'].split()]
        candidate = result['generated'].split()

        # Calculate BLEU-4 score
        score = sentence_bleu(reference, candidate, smoothing_function=smoothing)
        bleu_scores.append(score)

    return bleu_scores

# Calculate BLEU scores
bleu_scores = calculate_bleu_scores(test_results)
avg_bleu = sum(bleu_scores) / len(bleu_scores)

print(f"Average BLEU Score: {avg_bleu:.4f}")
print(f"BLEU Score Range: {min(bleu_scores):.4f} - {max(bleu_scores):.4f}")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


Average BLEU Score: 0.0062
BLEU Score Range: 0.0000 - 0.3889


In [18]:
from rouge import Rouge

def calculate_rouge_scores(test_results):
    rouge = Rouge()

    references = [result['actual'] for result in test_results]
    candidates = [result['generated'] for result in test_results]

    # Filter out empty generations
    valid_pairs = [(ref, cand) for ref, cand in zip(references, candidates) if cand.strip()]

    if valid_pairs:
        refs, cands = zip(*valid_pairs)
        scores = rouge.get_scores(list(cands), list(refs), avg=True)
        return scores
    else:
        return None

# Calculate ROUGE scores
rouge_scores = calculate_rouge_scores(test_results)
if rouge_scores:
    print("ROUGE Scores:")
    for metric, values in rouge_scores.items():
        print(f"  {metric}: {values}")


ValueError: Hypothesis is empty.

In [19]:
# Install BERTScore
!pip install bert-score

from bert_score import score

def calculate_bert_scores(test_results):
    """Calculate BERTScore for all test results"""
    candidates = [result['generated'] for result in test_results]
    references = [result['actual'] for result in test_results]

    # Calculate BERTScore (returns precision, recall, F1)
    P, R, F1 = score(candidates, references, lang="en", verbose=False)

    return {
        'precision': P.mean().item(),
        'recall': R.mean().item(),
        'f1': F1.mean().item(),
        'precision_scores': P.tolist(),
        'recall_scores': R.tolist(),
        'f1_scores': F1.tolist()
    }

# Calculate BERTScore
bert_scores = calculate_bert_scores(test_results)
print(f"BERTScore F1: {bert_scores['f1']:.4f}")
print(f"BERTScore Precision: {bert_scores['precision']:.4f}")
print(f"BERTScore Recall: {bert_scores['recall']:.4f}")


Collecting bert-score
  Downloading bert_score-0.3.13-py3-none-any.whl.metadata (15 kB)
Downloading bert_score-0.3.13-py3-none-any.whl (61 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.1/61.1 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: bert-score
Successfully installed bert-score-0.3.13


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/482 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.42G [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BERTScore F1: 0.8150
BERTScore Precision: 0.8117
BERTScore Recall: 0.8187




In [20]:
from sentence_transformers import SentenceTransformer, util

# Load a pre-trained sentence transformer model
model = SentenceTransformer('all-MiniLM-L6-v2')  # 384-dimensional embeddings

def sentence_cosine_similarity(actual_lines, generated_lines):
    """Calculate cosine similarity between whole sentences"""

    # Encode all sentences to get embeddings
    actual_embeddings = model.encode(actual_lines, convert_to_tensor=True)
    generated_embeddings = model.encode(generated_lines, convert_to_tensor=True)

    # Calculate cosine similarity
    cosine_scores = util.pytorch_cos_sim(actual_embeddings, generated_embeddings)

    # Extract diagonal (pairwise similarities)
    similarities = [cosine_scores[i][i].item() for i in range(len(actual_lines))]

    return similarities

# Usage with your test results
actual_lines = [result['actual'] for result in test_results]
generated_lines = [result['generated'] for result in test_results]

sentence_similarities = sentence_cosine_similarity(actual_lines, generated_lines)
avg_sentence_similarity = sum(sentence_similarities) / len(sentence_similarities)

print(f"Average Sentence Cosine Similarity: {avg_sentence_similarity:.4f}")


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Average Sentence Cosine Similarity: 0.1487


In [21]:
# # Install dandelion
# !pip install dandelion

# import dandelion.check as rhyme_check

# def advanced_rhyme_analysis(test_results):
#     """Advanced rhyme analysis using dandelion package"""
#     perfect_rhymes = 0
#     vowel_rhymes = 0
#     total_valid = 0

#     for result in test_results:
#         input_words = result['input'].split()
#         generated_words = result['generated'].split()

#         if input_words and generated_words:
#             # Get last words for rhyme checking
#             input_last = input_words[-1].lower().strip('.,!?')
#             generated_last = generated_words[-1].lower().strip('.,!?')

#             if input_last and generated_last:
#                 total_valid += 1

#                 # Check for perfect rhymes
#                 if rhyme_check.perfect_rhyme(input_last, generated_last):
#                     perfect_rhymes += 1
#                 # Check for vowel rhymes (assonance)
#                 elif rhyme_check.vowel_rhyme(input_last, generated_last):
#                     vowel_rhymes += 1

#     return {
#         'perfect_rhyme_rate': perfect_rhymes / total_valid if total_valid > 0 else 0,
#         'vowel_rhyme_rate': vowel_rhymes / total_valid if total_valid > 0 else 0,
#         'total_rhyme_rate': (perfect_rhymes + vowel_rhymes) / total_valid if total_valid > 0 else 0,
#         'perfect_rhymes': perfect_rhymes,
#         'vowel_rhymes': vowel_rhymes,
#         'total_valid': total_valid
#     }


Collecting dandelion
  Downloading Dandelion-0.17.26-py3-none-any.whl.metadata (8.1 kB)
Downloading Dandelion-0.17.26-py3-none-any.whl (86 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.6/86.6 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dandelion
Successfully installed dandelion-0.17.26


ModuleNotFoundError: No module named 'theano'

In [22]:
import nltk
from nltk.corpus import cmudict

# Download CMU dictionary
nltk.download('cmudict')

def phonetic_rhyme_analysis(test_results):
    """Phonetic rhyme analysis using CMU dictionary"""
    d = cmudict.dict()

    def get_rhyme_part(word):
        """Extract the rhyming part of a word"""
        if word in d:
            pronunciations = d[word]
            if pronunciations:
                # Get the part after the last stressed vowel
                pron = pronunciations[0]
                for i in range(len(pron) - 1, -1, -1):
                    if pron[i][-1].isdigit():  # Stressed vowel
                        return pron[i:]
        return None

    phonetic_rhymes = 0
    total_valid = 0

    for result in test_results:
        input_words = result['input'].split()
        generated_words = result['generated'].split()

        if input_words and generated_words:
            input_last = input_words[-1].lower().strip('.,!?')
            generated_last = generated_words[-1].lower().strip('.,!?')

            if input_last and generated_last:
                input_rhyme = get_rhyme_part(input_last)
                generated_rhyme = get_rhyme_part(generated_last)

                if input_rhyme and generated_rhyme:
                    total_valid += 1
                    if input_rhyme == generated_rhyme:
                        phonetic_rhymes += 1

    return {
        'phonetic_rhyme_rate': phonetic_rhymes / total_valid if total_valid > 0 else 0,
        'phonetic_rhymes': phonetic_rhymes,
        'total_valid': total_valid
    }


[nltk_data] Downloading package cmudict to /root/nltk_data...
[nltk_data]   Unzipping corpora/cmudict.zip.


In [25]:
# Install required packages
!pip install bert-score sentence-transformers rouge-score nltk

# Import all necessary libraries
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.corpus import cmudict
from rouge_score import rouge_scorer
from bert_score import score as bert_score
from sentence_transformers import SentenceTransformer, util
# import dandelion.check as rhyme_check
import torch
import numpy as np

# Download required NLTK data
nltk.download('punkt')
nltk.download('cmudict')

class ComprehensiveEvaluator:
    def __init__(self):
        # Initialize Sentence-BERT model
        self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')

        # Initialize ROUGE scorer
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

        # Initialize CMU dictionary
        self.cmu_dict = cmudict.dict()

        # Initialize BLEU smoothing function
        self.bleu_smoothing = SmoothingFunction().method1

    def calculate_bleu_scores(self, test_results):
        """Calculate BLEU scores for all test results"""
        bleu_scores = []

        for result in test_results:
            reference = [result['actual'].split()]
            candidate = result['generated'].split()

            if candidate:  # Only calculate if generation is not empty
                score = sentence_bleu(reference, candidate, smoothing_function=self.bleu_smoothing)
                bleu_scores.append(score)
            else:
                bleu_scores.append(0.0)

        return {
            'individual_scores': bleu_scores,
            'average': np.mean(bleu_scores),
            'std': np.std(bleu_scores),
            'min': np.min(bleu_scores),
            'max': np.max(bleu_scores)
        }

    def calculate_rouge_scores(self, test_results):
        """Calculate ROUGE scores for all test results"""
        rouge1_scores = []
        rouge2_scores = []
        rougeL_scores = []

        for result in test_results:
            if result['generated'].strip():  # Only calculate if generation is not empty
                scores = self.rouge_scorer.score(result['actual'], result['generated'])
                rouge1_scores.append(scores['rouge1'].fmeasure)
                rouge2_scores.append(scores['rouge2'].fmeasure)
                rougeL_scores.append(scores['rougeL'].fmeasure)
            else:
                rouge1_scores.append(0.0)
                rouge2_scores.append(0.0)
                rougeL_scores.append(0.0)

        return {
            'rouge1': {
                'individual_scores': rouge1_scores,
                'average': np.mean(rouge1_scores),
                'std': np.std(rouge1_scores)
            },
            'rouge2': {
                'individual_scores': rouge2_scores,
                'average': np.mean(rouge2_scores),
                'std': np.std(rouge2_scores)
            },
            'rougeL': {
                'individual_scores': rougeL_scores,
                'average': np.mean(rougeL_scores),
                'std': np.std(rougeL_scores)
            }
        }

    def calculate_bert_scores(self, test_results):
        """Calculate BERTScore for all test results"""
        candidates = [result['generated'] for result in test_results]
        references = [result['actual'] for result in test_results]

        # Calculate BERTScore
        P, R, F1 = bert_score(candidates, references, lang="en", verbose=False)

        return {
            'precision': {
                'average': P.mean().item(),
                'std': P.std().item(),
                'individual_scores': P.tolist()
            },
            'recall': {
                'average': R.mean().item(),
                'std': R.std().item(),
                'individual_scores': R.tolist()
            },
            'f1': {
                'average': F1.mean().item(),
                'std': F1.std().item(),
                'individual_scores': F1.tolist()
            }
        }

    def calculate_sentence_similarity(self, test_results):
        """Calculate sentence-level cosine similarity using Sentence-BERT"""
        actual_lines = [result['actual'] for result in test_results]
        generated_lines = [result['generated'] for result in test_results]

        # Encode all sentences
        actual_embeddings = self.sentence_model.encode(actual_lines, convert_to_tensor=True)
        generated_embeddings = self.sentence_model.encode(generated_lines, convert_to_tensor=True)

        # Calculate cosine similarity
        cosine_scores = util.pytorch_cos_sim(actual_embeddings, generated_embeddings)

        # Extract diagonal (pairwise similarities)
        similarities = [cosine_scores[i][i].item() for i in range(len(actual_lines))]

        return {
            'individual_scores': similarities,
            'average': np.mean(similarities),
            'std': np.std(similarities),
            'min': np.min(similarities),
            'max': np.max(similarities)
        }

    def get_last_word(self, line):
        """Extract the last word from a line for rhyme analysis"""
        words = line.lower().strip().split()
        if words:
            # Remove punctuation from last word
            last_word = ''.join(c for c in words[-1] if c.isalpha())
            return last_word
        return ""

    def get_rhyme_part_cmu(self, word):
        """Extract the rhyming part using CMU dictionary"""
        if word in self.cmu_dict:
            pronunciations = self.cmu_dict[word]
            if pronunciations:
                # Get the part after the last stressed vowel
                pron = pronunciations[0]
                for i in range(len(pron) - 1, -1, -1):
                    if pron[i][-1].isdigit():  # Stressed vowel
                        return pron[i:]
        return None

    # def analyze_rhymes_dandelion(self, test_results):
    #     """Analyze rhymes using dandelion package"""
    #     perfect_rhymes = 0
    #     vowel_rhymes = 0
    #     total_valid = 0

    #     rhyme_details = []

    #     for i, result in enumerate(test_results):
    #         input_last = self.get_last_word(result['input'])
    #         generated_last = self.get_last_word(result['generated'])

    #         if input_last and generated_last:
    #             total_valid += 1

    #             try:
    #                 # Check for perfect rhymes
    #                 is_perfect = rhyme_check.perfect_rhyme(input_last, generated_last)
    #                 is_vowel = rhyme_check.vowel_rhyme(input_last, generated_last)

    #                 if is_perfect:
    #                     perfect_rhymes += 1
    #                     rhyme_type = 'perfect'
    #                 elif is_vowel:
    #                     vowel_rhymes += 1
    #                     rhyme_type = 'vowel'
    #                 else:
    #                     rhyme_type = 'none'

    #                 rhyme_details.append({
    #                     'example_index': i,
    #                     'input_word': input_last,
    #                     'generated_word': generated_last,
    #                     'rhyme_type': rhyme_type
    #                 })

    #             except Exception as e:
    #                 rhyme_details.append({
    #                     'example_index': i,
    #                     'input_word': input_last,
    #                     'generated_word': generated_last,
    #                     'rhyme_type': 'error',
    #                     'error': str(e)
    #                 })

    #     return {
    #         'perfect_rhyme_rate': perfect_rhymes / total_valid if total_valid > 0 else 0,
    #         'vowel_rhyme_rate': vowel_rhymes / total_valid if total_valid > 0 else 0,
    #         'total_rhyme_rate': (perfect_rhymes + vowel_rhymes) / total_valid if total_valid > 0 else 0,
    #         'perfect_rhymes': perfect_rhymes,
    #         'vowel_rhymes': vowel_rhymes,
    #         'total_valid': total_valid,
    #         'details': rhyme_details
    #     }

    def analyze_rhymes_cmu(self, test_results):
        """Analyze rhymes using CMU dictionary"""
        phonetic_rhymes = 0
        total_valid = 0

        rhyme_details = []

        for i, result in enumerate(test_results):
            input_last = self.get_last_word(result['input'])
            generated_last = self.get_last_word(result['generated'])

            if input_last and generated_last:
                input_rhyme = self.get_rhyme_part_cmu(input_last)
                generated_rhyme = self.get_rhyme_part_cmu(generated_last)

                if input_rhyme and generated_rhyme:
                    total_valid += 1

                    is_rhyme = input_rhyme == generated_rhyme
                    if is_rhyme:
                        phonetic_rhymes += 1

                    rhyme_details.append({
                        'example_index': i,
                        'input_word': input_last,
                        'generated_word': generated_last,
                        'input_phonemes': input_rhyme,
                        'generated_phonemes': generated_rhyme,
                        'is_rhyme': is_rhyme
                    })

        return {
            'phonetic_rhyme_rate': phonetic_rhymes / total_valid if total_valid > 0 else 0,
            'phonetic_rhymes': phonetic_rhymes,
            'total_valid': total_valid,
            'details': rhyme_details
        }

    def calculate_length_similarity(self, test_results):
        """Calculate length similarity between actual and generated lines"""
        length_diffs = []
        length_ratios = []

        for result in test_results:
            actual_len = len(result['actual'].split())
            generated_len = len(result['generated'].split())

            length_diffs.append(abs(actual_len - generated_len))

            if actual_len > 0:
                length_ratios.append(generated_len / actual_len)
            else:
                length_ratios.append(0.0)

        return {
            'average_length_diff': np.mean(length_diffs),
            'std_length_diff': np.std(length_diffs),
            'average_length_ratio': np.mean(length_ratios),
            'std_length_ratio': np.std(length_ratios)
        }

    def evaluate_comprehensive(self, test_results):
        """Run comprehensive evaluation on test results"""
        print("=" * 80)
        print("COMPREHENSIVE EVALUATION RESULTS")
        print("=" * 80)

        # Basic statistics
        total_examples = len(test_results)
        empty_generations = sum(1 for r in test_results if not r['generated'].strip())

        print(f"Dataset Statistics:")
        print(f"  Total Examples: {total_examples}")
        print(f"  Empty Generations: {empty_generations} ({empty_generations/total_examples:.1%})")
        print()

        # Calculate all metrics
        print("Computing metrics...")

        # Traditional NLP metrics
        bleu_results = self.calculate_bleu_scores(test_results)
        rouge_results = self.calculate_rouge_scores(test_results)
        bert_results = self.calculate_bert_scores(test_results)

        # Sentence-level similarity
        sentence_sim_results = self.calculate_sentence_similarity(test_results)

        # Rhyme analysis
        # dandelion_rhyme_results = self.analyze_rhymes_dandelion(test_results)
        cmu_rhyme_results = self.analyze_rhymes_cmu(test_results)

        # Length analysis
        length_results = self.calculate_length_similarity(test_results)

        # Display results
        print("\n" + "="*60)
        print("TRADITIONAL NLP METRICS")
        print("="*60)

        print(f"BLEU Score:")
        print(f"  Average: {bleu_results['average']:.4f} (±{bleu_results['std']:.4f})")
        print(f"  Range: {bleu_results['min']:.4f} - {bleu_results['max']:.4f}")

        print(f"\nROUGE Scores:")
        print(f"  ROUGE-1: {rouge_results['rouge1']['average']:.4f} (±{rouge_results['rouge1']['std']:.4f})")
        print(f"  ROUGE-2: {rouge_results['rouge2']['average']:.4f} (±{rouge_results['rouge2']['std']:.4f})")
        print(f"  ROUGE-L: {rouge_results['rougeL']['average']:.4f} (±{rouge_results['rougeL']['std']:.4f})")

        print(f"\nBERTScore:")
        print(f"  F1: {bert_results['f1']['average']:.4f} (±{bert_results['f1']['std']:.4f})")
        print(f"  Precision: {bert_results['precision']['average']:.4f} (±{bert_results['precision']['std']:.4f})")
        print(f"  Recall: {bert_results['recall']['average']:.4f} (±{bert_results['recall']['std']:.4f})")

        print("\n" + "="*60)
        print("SENTENCE-LEVEL SEMANTIC SIMILARITY")
        print("="*60)

        print(f"Sentence-BERT Cosine Similarity:")
        print(f"  Average: {sentence_sim_results['average']:.4f} (±{sentence_sim_results['std']:.4f})")
        print(f"  Range: {sentence_sim_results['min']:.4f} - {sentence_sim_results['max']:.4f}")

        print("\n" + "="*60)
        print("RHYME ANALYSIS")
        print("="*60)

        # print(f"Dandelion Rhyme Analysis:")
        # print(f"  Perfect Rhyme Rate: {dandelion_rhyme_results['perfect_rhyme_rate']:.2%}")
        # print(f"  Vowel Rhyme Rate: {dandelion_rhyme_results['vowel_rhyme_rate']:.2%}")
        # print(f"  Total Rhyme Rate: {dandelion_rhyme_results['total_rhyme_rate']:.2%}")
        # print(f"  Valid Examples: {dandelion_rhyme_results['total_valid']}/{total_examples}")

        print(f"\nCMU Dictionary Phonetic Analysis:")
        print(f"  Phonetic Rhyme Rate: {cmu_rhyme_results['phonetic_rhyme_rate']:.2%}")
        print(f"  Valid Examples: {cmu_rhyme_results['total_valid']}/{total_examples}")

        print("\n" + "="*60)
        print("LENGTH ANALYSIS")
        print("="*60)

        print(f"Length Similarity:")
        print(f"  Average Length Difference: {length_results['average_length_diff']:.2f} words")
        print(f"  Average Length Ratio: {length_results['average_length_ratio']:.2f}")

        print("=" * 80)

        # Return all results for further analysis
        return {
            'basic_stats': {
                'total_examples': total_examples,
                'empty_generations': empty_generations
            },
            'bleu': bleu_results,
            'rouge': rouge_results,
            'bert_score': bert_results,
            'sentence_similarity': sentence_sim_results,
            # 'dandelion_rhyme': dandelion_rhyme_results,
            'cmu_rhyme': cmu_rhyme_results,
            'length_analysis': length_results
        }

# Usage
evaluator = ComprehensiveEvaluator()
comprehensive_results = evaluator.evaluate_comprehensive(test_results)



[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package cmudict to /root/nltk_data...
[nltk_data]   Package cmudict is already up-to-date!


COMPREHENSIVE EVALUATION RESULTS
Dataset Statistics:
  Total Examples: 3000
  Empty Generations: 13 (0.4%)

Computing metrics...


Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



TRADITIONAL NLP METRICS
BLEU Score:
  Average: 0.0062 (±0.0168)
  Range: 0.0000 - 0.3889

ROUGE Scores:
  ROUGE-1: 0.0773 (±0.0797)
  ROUGE-2: 0.0091 (±0.0393)
  ROUGE-L: 0.0686 (±0.0709)

BERTScore:
  F1: 0.8150 (±0.0725)
  Precision: 0.8117 (±0.0727)
  Recall: 0.8187 (±0.0743)

SENTENCE-LEVEL SEMANTIC SIMILARITY
Sentence-BERT Cosine Similarity:
  Average: 0.1487 (±0.1188)
  Range: -0.1405 - 0.9523

RHYME ANALYSIS

CMU Dictionary Phonetic Analysis:
  Phonetic Rhyme Rate: 2.28%
  Valid Examples: 2322/3000

LENGTH ANALYSIS
Length Similarity:
  Average Length Difference: 13.80 words
  Average Length Ratio: 3.30


In [26]:
print(comprehensive_results)



In [None]:
# Print out eval metrics for the part2_model on the test set

part2_test_translations = calculate_eval_metrics(
    test_pairs,
    part2_model,
    part2_tokenizer,
    part2_batch_size,
    task_prefix,
    **part1_generate_kwargs
)