In [1]:
from transformers import T5Tokenizer, T5ForConditionalGeneration, TrainingArguments, Trainer
import torch
import json
from datasets import Dataset
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
import numpy as np
import re
import nltk
from nltk.corpus import stopwords, wordnet
import random
from rouge_score import rouge_scorer
from sacrebleu import corpus_bleu
from language_tool_python import LanguageTool
import torch.nn.functional as F

# Import starting model

In [2]:
# Import pre-trained model
tokenizer = T5Tokenizer.from_pretrained("deep-learning-analytics/triviaqa-t5-base")
model = T5ForConditionalGeneration.from_pretrained("deep-learning-analytics/triviaqa-t5-base")

# Connect to device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)


You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


In [3]:
# Test example question from hugging face model card
text = "What is the capitol of the US"

preprocess_text = text.strip().replace("\n","")
tokenized_text = tokenizer.encode(preprocess_text, return_tensors="pt").to(device)

outs = model.generate(
            tokenized_text,
            max_length=10,
            num_beams=2,
            early_stopping=True
           )

dec = [tokenizer.decode(ids) for ids in outs]
print("Predicted Answer: ", dec)

Predicted Answer:  ['<pad> Washington</s>']


# Import new data

In [34]:
# Load the data from each of the 3 categories
with open('entertainment.json', 'r') as file:
    entertainment = json.load(file)
with open('science_and_nature.json', 'r') as file:
    science = json.load(file)
with open('food_and_drink.json', 'r') as file:
    food = json.load(file)

# Find number of questions in each category
print("Number of questions in each category:")
print("Entertainment: ", len(entertainment))
print("Science: ", len(science))
print("Food: ", len(food))

Number of questions in each category:
Entertainment:  802
Science:  2340
Food:  980


In [35]:
# Keep only categories, questions, and answers
def filter_columns(data, columns_to_keep):
    return [{column: item[column] for column in columns_to_keep} for item in data]

# Define the columns you want to keep
columns_to_keep = ['question', 'answers', 'category_id']

# Create a dictionary with filtered data
trivia_data = {
    'entertainment': filter_columns(entertainment, columns_to_keep),
    'science': filter_columns(science, columns_to_keep),
    'food': filter_columns(food, columns_to_keep)
}

trivia_data['entertainment'][0]

{'question': '_____ in the name of love?',
 'answers': ['Stop'],
 'category_id': 'ENTERTAINMENT'}

In [36]:
# Keep only the "first" answer in answers
def keep_first_answer(data):
    for item in data:
        item['answers'] = item['answers'][0]
    return data

# Apply the function to the data
trivia_data = {key: keep_first_answer(value) for key, value in trivia_data.items()}

# Rename answers column to "answer"
for category in trivia_data:
    for item in trivia_data[category]:
        item['answer'] = item.pop('answers')

trivia_data['entertainment'][0]

{'question': '_____ in the name of love?',
 'category_id': 'ENTERTAINMENT',
 'answer': 'Stop'}

In [37]:
# Remove questions with underscores (e.g., fill-in-the-blank questions)
def remove_fill_in_the_blank(data):
    return [item for item in data if re.search(r'[_]+', item['question']) is None]

# Apply the function to the data
trivia_data = {key: remove_fill_in_the_blank(value) for key, value in trivia_data.items()}

trivia_data['entertainment'][0]

{'question': '"He\'s So Fine", "One Fine Day" and "A Love So Fine" where hits for what fine group?"',
 'category_id': 'ENTERTAINMENT',
 'answer': 'The Chiffons'}

In [38]:
# Find number of questions in each category
print("Number of questions in each category:")
print("Entertainment: ", len(trivia_data['entertainment']))
print("Science: ", len(trivia_data['science']))
print("Food: ", len(trivia_data['food']))

Number of questions in each category:
Entertainment:  754
Science:  1663
Food:  955


In [39]:
def synonym_replacement(text):
    words = text.split()
    new_words = []
    for word in words:
        synonyms = wordnet.synsets(word)
        if synonyms:
            # Choose a random synonym
            synonym = random.choice(synonyms).lemmas()[0].name()
            new_words.append(synonym if synonym != word else word)
        else:
            new_words.append(word)
    return ' '.join(new_words)

# Add more questions to the dataset by adding duplicates with synonyms as new questions
def augment_data(data, num_new_questions):
    new_data = []
    for item in data:
        # Keep the original question
        new_data.append(item)
        # Generate up to `num_new_questions` augmented questions
        for _ in range(num_new_questions):
            new_item = item.copy()
            new_item['question'] = synonym_replacement(item['question'])
            new_data.append(new_item)
    
    return new_data

# Apply the function to the data
trivia_data_aug = {key: augment_data(value, num_new_questions=2) for key, value in trivia_data.items()}
trivia_data_aug['entertainment'][0]

{'question': '"He\'s So Fine", "One Fine Day" and "A Love So Fine" where hits for what fine group?"',
 'category_id': 'ENTERTAINMENT',
 'answer': 'The Chiffons'}

In [40]:
# Find number of questions in each category
print("Number of questions in each category:")
print("Entertainment: ", len(trivia_data_aug['entertainment']))
print("Science: ", len(trivia_data_aug['science']))
print("Food: ", len(trivia_data_aug['food']))

Number of questions in each category:
Entertainment:  2262
Science:  4989
Food:  2865


# Evaluation functions

In [11]:
# Function to compute predictions
def generate_predictions(dataset, model, tokenizer, device):
    model.eval()  # Put model in evaluation mode
    predictions = []
    references = []
    
    for example in dataset:
        # Tokenize inputs
        inputs = tokenizer(example["question"], return_tensors="pt", padding=True, truncation=True, max_length=25)
        
        # Move input tensors to the correct device
        input_ids = inputs["input_ids"].to(device)
        attention_mask = inputs["attention_mask"].to(device)  # Make sure to include attention_mask if available
        
        # Generate predictions
        with torch.no_grad():
            outputs = model.generate(input_ids, attention_mask=attention_mask, max_length=10, num_beams=5, early_stopping=True)
        
        # Decode predictions
        predicted_answer = tokenizer.decode(outputs[0], skip_special_tokens=True)
        true_answer = example["answer"]
        
        predictions.append(predicted_answer)
        references.append(true_answer)
    
    return predictions, references


In [12]:
# Helper function to extract main words from a sentence, excluding stopwords
def extract_main_words(text):
    # Get the set of English stopwords from NLTK
    stop_words = set(stopwords.words('english'))

    # Remove punctuation and split into words
    words = re.findall(r'\b\w+\b', text.lower())

    # Remove stopwords
    filtered_words = [word for word in words if word not in stop_words]

    return set(filtered_words)

# Compute accuracy by checking for overlap of main words
def compute_accuracy(predictions, references):
    correct = 0
    total = len(predictions)
    
    for pred, ref in zip(predictions, references):
        pred_words = extract_main_words(pred)
        ref_words = extract_main_words(ref)
        
        # Check for significant overlap of words (at least 50% of reference words matched)
        overlap = len(pred_words & ref_words) / max(len(ref_words), 1)
        if overlap >= 0.5:
            correct += 1
    
    accuracy = correct / total
    return accuracy


# Train on new data

In [41]:
# Add category labels to combined_data
combined_data = []
for category, questions in trivia_data_aug.items():
    for q in questions:
        if 'question' in q and 'answer' in q:  # Ensure required fields exist
            combined_data.append({
                "question": q["question"],
                "answer": q["answer"],
                "category": category  # Add category for stratification
            })

# Convert to a DataFrame to facilitate stratified splitting
import pandas as pd
df = pd.DataFrame(combined_data)

# Perform stratified split for train and temp (validation + test)
train_df, temp_df = train_test_split(
    df, test_size=0.2, stratify=df["category"], random_state=42
)

# Perform stratified split for validation and test from temp
val_df, test_df = train_test_split(
    temp_df, test_size=0.5, stratify=temp_df["category"], random_state=42
)

# Convert splits back to Hugging Face Dataset format
split_dataset = {
    "train": Dataset.from_pandas(train_df.reset_index(drop=True)),
    "validation": Dataset.from_pandas(val_df.reset_index(drop=True)),
    "test": Dataset.from_pandas(test_df.reset_index(drop=True))
}

# Verify the splits
print("Train categories distribution:")
print(train_df["category"].value_counts())

print("Validation categories distribution:")
print(val_df["category"].value_counts())

print("Test categories distribution:")
print(test_df["category"].value_counts())


Train categories distribution:
category
science          3991
food             2292
entertainment    1809
Name: count, dtype: int64
Validation categories distribution:
category
science          499
food             287
entertainment    226
Name: count, dtype: int64
Test categories distribution:
category
science          499
food             286
entertainment    227
Name: count, dtype: int64


In [14]:
# Preprocess the data
def preprocess_function(examples):
    inputs = examples["question"]
    targets = examples["answer"]
    model_inputs = tokenizer(inputs, max_length=25, truncation=True, padding="max_length")
    labels = tokenizer(targets, max_length=10, truncation=True, padding="max_length")
    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

# Tokenize the dataset
tokenized_train = split_dataset["train"].map(preprocess_function, batched=True)
tokenized_val = split_dataset["validation"].map(preprocess_function, batched=True)
tokenized_test = split_dataset["test"].map(preprocess_function, batched=True)

Map:   0%|          | 0/8092 [00:00<?, ? examples/s]

Map:   0%|          | 0/1012 [00:00<?, ? examples/s]

Map:   0%|          | 0/1012 [00:00<?, ? examples/s]

In [15]:
# Find accuracy of the the original model on the test set
original_predictions, original_references = generate_predictions(tokenized_test, model, tokenizer, device)
original_accuracy = compute_accuracy(original_predictions, original_references)

print(f"Original Model Accuracy: {original_accuracy * 100:.2f}%")

Original Model Accuracy: 10.67%


In [16]:
# Define training arguments
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=32,
    num_train_epochs=4,
    weight_decay=0.01
)

# Define Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
)

# Train
trainer.train()


  0%|          | 0/1012 [00:00<?, ?it/s]

Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


  0%|          | 0/127 [00:00<?, ?it/s]

{'eval_loss': 2.705573081970215, 'eval_runtime': 25.0619, 'eval_samples_per_second': 40.38, 'eval_steps_per_second': 5.067, 'epoch': 1.0}
{'loss': 6.778, 'grad_norm': 5.01008939743042, 'learning_rate': 1.0118577075098814e-05, 'epoch': 1.98}


  0%|          | 0/127 [00:00<?, ?it/s]

{'eval_loss': 1.8893893957138062, 'eval_runtime': 24.9914, 'eval_samples_per_second': 40.494, 'eval_steps_per_second': 5.082, 'epoch': 2.0}


  0%|          | 0/127 [00:00<?, ?it/s]

{'eval_loss': 1.6683545112609863, 'eval_runtime': 24.984, 'eval_samples_per_second': 40.506, 'eval_steps_per_second': 5.083, 'epoch': 3.0}
{'loss': 1.9677, 'grad_norm': 2.9499571323394775, 'learning_rate': 2.3715415019762845e-07, 'epoch': 3.95}


  0%|          | 0/127 [00:00<?, ?it/s]

{'eval_loss': 1.6194323301315308, 'eval_runtime': 25.002, 'eval_samples_per_second': 40.477, 'eval_steps_per_second': 5.08, 'epoch': 4.0}
{'train_runtime': 2404.7365, 'train_samples_per_second': 13.46, 'train_steps_per_second': 0.421, 'train_loss': 4.342598460879722, 'epoch': 4.0}


TrainOutput(global_step=1012, training_loss=4.342598460879722, metrics={'train_runtime': 2404.7365, 'train_samples_per_second': 13.46, 'train_steps_per_second': 0.421, 'total_flos': 962438916096000.0, 'train_loss': 4.342598460879722, 'epoch': 4.0})

# Evaluate model

In [20]:
# Find accuracy of resulting model on the test set
predictions, references = generate_predictions(tokenized_test, model, tokenizer, device)
accuracy = compute_accuracy(predictions, references)

print(f"Accuracy: {accuracy * 100:.2f}%")


Accuracy: 10.67%


In [18]:
# Compute evaluation metrics
def compute_finer_metrics(predictions, references):
    # Initialize metrics
    rouge = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    bleu_scores = []
    f1_scores = []
    rouge_scores = {'rouge1': [], 'rouge2': [], 'rougeL': []}
    
    # Compute metrics for each prediction-reference pair
    for pred, ref in zip(predictions, references):
        # BLEU score (expects list of predictions and references)
        bleu_scores.append(corpus_bleu([pred], [[ref]]).score)
        
        # F1 score (token-level overlap)
        pred_tokens = pred.split()
        ref_tokens = ref.split()
        common_tokens = set(pred_tokens).intersection(ref_tokens)
        precision = len(common_tokens) / len(pred_tokens) if pred_tokens else 0
        recall = len(common_tokens) / len(ref_tokens) if ref_tokens else 0
        f1 = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0
        f1_scores.append(f1)
        
        # ROUGE scores
        rouge_result = rouge.score(pred, ref)
        for key in rouge_scores:
            rouge_scores[key].append(rouge_result[key].fmeasure)
    
    # Calculate averages
    avg_bleu = sum(bleu_scores) / len(bleu_scores)
    avg_f1 = sum(f1_scores) / len(f1_scores)
    avg_rouge = {key: sum(scores) / len(scores) for key, scores in rouge_scores.items()}
    
    return {
        "BLEU": avg_bleu,
        "F1": avg_f1,
        "ROUGE": avg_rouge
    }

# Compute evaluation metrics
metrics = compute_finer_metrics(predictions, references)
print("Evaluation Metrics:")
print(metrics)


Evaluation Metrics:
{'BLEU': 0.22847667075975242, 'F1': 0.08895476504172156, 'ROUGE': {'rouge1': 0.10779769747161051, 'rouge2': 0.020223978919631094, 'rougeL': 0.10779769747161051}}


In [19]:
# Original metrics
metrics = compute_finer_metrics(original_predictions, original_references)
print("Evaluation Metrics:")
print(metrics)

Evaluation Metrics:
{'BLEU': 0.09486924458240018, 'F1': 0.09068793525315265, 'ROUGE': {'rouge1': 0.1139196168346366, 'rouge2': 0.02585638998682477, 'rougeL': 0.11359023607047322}}


In [21]:
# Save model and tokenizer
trainer.save_model("./trained_model")
tokenizer.save_pretrained("./trained_model")

('./trained_model\\tokenizer_config.json',
 './trained_model\\special_tokens_map.json',
 './trained_model\\spiece.model',
 './trained_model\\added_tokens.json')

# Interactive model

In [22]:
# Load model and tokenizer
model = T5ForConditionalGeneration.from_pretrained("./trained_model")
tokenizer = T5Tokenizer.from_pretrained("./trained_model")

# Put the model in evaluation mode
model.eval()

T5ForConditionalGeneration(
  (shared): Embedding(32128, 768)
  (encoder): T5Stack(
    (embed_tokens): Embedding(32128, 768)
    (block): ModuleList(
      (0): T5Block(
        (layer): ModuleList(
          (0): T5LayerSelfAttention(
            (SelfAttention): T5Attention(
              (q): Linear(in_features=768, out_features=768, bias=False)
              (k): Linear(in_features=768, out_features=768, bias=False)
              (v): Linear(in_features=768, out_features=768, bias=False)
              (o): Linear(in_features=768, out_features=768, bias=False)
              (relative_attention_bias): Embedding(32, 12)
            )
            (layer_norm): T5LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (1): T5LayerFF(
            (DenseReluDense): T5DenseActDense(
              (wi): Linear(in_features=768, out_features=3072, bias=False)
              (wo): Linear(in_features=3072, out_features=768, bias=False)
              (dropout): Dro

In [60]:
# Function to select category
def select_category(trivia_data):
    categories = ["entertainment", "science", "food"]
    print("Available categories:", flush = True)
    for i, category in enumerate(categories, 1):
        print(f"{i}. {category}")
    choice = int(input("Choose a category by number: ")) - 1
    return categories[choice]

In [61]:
# Function to select a random question
def get_random_question(category, trivia_data):
    return random.choice(trivia_data[category])

In [58]:
def trivia_game(trivia_data, model, tokenizer):
    score_human = 0
    score_model = 0
    print("Welcome to the Trivia Game! It's you vs the AI model! \n", flush = True)
    
    while True:
        # Select category and question
        category = select_category(trivia_data)
        question_data = get_random_question(category, trivia_data)
        question = question_data["question"]
        correct_answer = question_data["answer"]

        # Display question
        print(f"\nQuestion: {question} \n", flush = True)

        # Get human's answer
        user_answer = input("Your answer: ").strip()

        # Get model's answer
        inputs = tokenizer(question, return_tensors="pt", padding=True, truncation=True, max_length=128)
        outputs = model.generate(inputs["input_ids"], attention_mask=inputs["attention_mask"], max_length=50)
        model_answer = tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Display the correct answer and model's answer
        print(f"Your Answer: {user_answer}", flush = True)
        print(f"Model's Answer: {model_answer}", flush = True)
        print(f"Correct Answer: {correct_answer} \n", flush = True)

        # Score the human
        if compute_accuracy([user_answer], [correct_answer]) == 1:
            print("You got it right!", flush = True)
            score_human += 1
        else:
            print("You got it wrong.", flush = True)

        # Score the model
        if compute_accuracy([model_answer], [correct_answer]) == 1:
            print("The model got it right! \n", flush = True)
            score_model += 1
        else:
            print("The model got it wrong. \n", flush = True)

        # Display current scores
        print(f"Current Scores:", flush = True)
        print(f"You: {score_human}", flush = True)
        print(f"Model: {score_model} \n", flush = True)

        # Ask if the player wants to play again
        play_again = input("Do you want to play again? (yes/no): ").strip().lower()
        print("\n", flush = True)
        if play_again != "yes":
            break

    # Final scores
    print(f"\nGame Over! Final Scores:", flush = True)
    print(f"You: {score_human}", flush = True)
    print(f"Model: {score_model}", flush = True)
    if score_human > score_model:
        print("Congratulations! You beat the AI!", flush = True)
    elif score_human < score_model:
        print("The AI wins! Better luck next time!", flush = True)
    else:
        print("It's a tie!", flush = True)



In [59]:
# Play the trivia game
trivia_game(trivia_data, model, tokenizer)

Welcome to the Trivia Game! It's you vs the AI model! 

Available categories:
1. entertainment
2. science
3. food

Question: What is the name of the whale that swallowed Pinocchio. 

Your Answer: monstro
Model's Answer: Agusta
Correct Answer: Monstro 

You got it right!
The model got it wrong. 

Current Scores:
You: 1
Model: 0 



Available categories:
1. entertainment
2. science
3. food

Question: What Element Is Used In The Process Of Galvanisation 

Your Answer: zinc
Model's Answer: Carbon
Correct Answer: Zinc 

You got it right!
The model got it wrong. 

Current Scores:
You: 2
Model: 0 



Available categories:
1. entertainment
2. science
3. food

Question: Which country would you associate with the dish Couscous? 

Your Answer: morocco
Model's Answer: Tunisia
Correct Answer: Tunisia  

You got it wrong.
The model got it right! 

Current Scores:
You: 2
Model: 1 




Game Over! Final Scores:
You: 2
Model: 1
Congratulations! You beat the AI!
