In [1]:
# Evaluation metrics (BERTScore, Hugging Face evaluate) and dataset loading.
!pip install bert-score evaluate datasets
!pip install transformers
# PyTorch 2.3.0 with matching torchvision/torchaudio, pulled from the CUDA 11.8 wheel index.
!pip install torch==2.3.0 torchvision==0.18.0 torchaudio==2.3.0 --index-url https://download.pytorch.org/whl/cu118
# Unsloth installed straight from its GitHub repository with the "colab-new" extra.
!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
# --no-deps: install these packages without pulling in their own dependencies.
!pip install --no-deps "xformers<0.0.27" "trl<0.9.0" peft accelerate bitsandbytes
# spaCy plus the GLiNER spaCy component added to the `nlp` pipeline later in this notebook.
!pip install spacy gliner-spacy
!pip install matplotlib==3.6.0 plotly
# Weights & Biases for experiment logging (-q: quiet, -U: upgrade to the newest release).
!pip install wandb -qU

In [2]:
import evaluate
import torch
from evaluate import EvaluationModule, load
from unsloth import FastLanguageModel
from datasets import load_dataset, Features, Value
from transformers import AutoTokenizer, pipeline
import numpy as np
import spacy
from matplotlib import pyplot as plt
from google.colab import userdata

🦥 Unsloth: Will patch your computer to enable 2x faster free finetuning.


In [3]:
from huggingface_hub import login
# Authenticate with the Hugging Face Hub using the "HF_TOKEN" secret read from Colab's userdata store.
login(token=userdata.get("HF_TOKEN"))

The token has not been saved to the git credentials helper. Pass `add_to_git_credential=True` in this function directly or `--add-to-git-credential` if using via `huggingface-cli` if you want to set the git credential as well.
Token is valid (permission: fineGrained).
Your token has been saved to /root/.cache/huggingface/token
Login successful


In [None]:
# Load the model under evaluation together with its tokenizer.
# Replace the placeholder below with the identifier of the model you want to evaluate.
my_model_name = "Your Model which needs to be evaluated from hugginface or anywhere."
max_seq_length = 2048
dtype = None
load_in_4bit = True

my_model, my_tokenizer = FastLanguageModel.from_pretrained(
    model_name=my_model_name,
    max_seq_length=max_seq_length,
    dtype=dtype,
    load_in_4bit=load_in_4bit,
)

# Switch the loaded model to Unsloth's faster inference mode before generating.
FastLanguageModel.for_inference(my_model)

In [5]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# Task preamble describing the instruction + claim -> response setup.
System_prompt = (
    "Below is an instruction that describes an information requirement, "
    "paired with a claim that provides context. "
    "Write a response that appropriately addresses the instruction based on the given claim."
)
# Prompt template with {instruction}, {claim} and {answer} placeholders.
Input_prompt = (
    "### Instruction:\n{instruction}\n\n"
    "### Claim:\n{claim}\n\n"
    "### Response:\n{answer}"
)

Using device: cuda


In [6]:
def format_prompt(example):
    """Build the inference prompt for one dataset record.

    Fills ``Input_prompt`` with the record's ``'Instruction'`` and ``'Claim'``
    fields and leaves the answer slot empty.

    Args:
        example: Mapping that provides the ``'Instruction'`` and ``'Claim'`` keys.

    Returns:
        dict: ``{'text': <formatted prompt>}``.
    """
    template_fields = {
        "instruction": example['Instruction'],
        "claim": example['Claim'],
        "answer": '',
    }
    return {'text': Input_prompt.format(**template_fields)}

In [7]:
# Read the JSONL instruction dataset, then run format_prompt over every record
# so each one carries a 'text' prompt.
training_data = load_dataset(
    "json",
    data_files="perspectrum_instruction_dataset_v2.jsonl",
    split="train",
)
train_data = training_data.map(format_prompt)

In [8]:
# Configuration handed to the "gliner_spacy" pipeline component.
custom_spacy_config = {
    "gliner_model": "urchade/gliner_large-v2",
    "labels": [
        "Person", "Date", "Organization", "Country", "Entity", "Politics",
        "Social issues", "Technology", "Environment", "Education", "Health", "Economics",
    ],
    "style": "ent",
    "threshold": 0.5,
    "map_location": device,
}

In [None]:
# Sentiment classifier that the evaluation metric uses to label the stance of each text.
# The pipeline device index is derived from CUDA availability (0 = first GPU, -1 = CPU)
# so this cell also runs on CPU-only runtimes instead of failing on a hard-coded device=0.
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    tokenizer=tokenizer,
    device=0 if torch.cuda.is_available() else -1,
)

In [None]:
# BERTScore metric loaded through Hugging Face `evaluate`.
bertscore = load("bertscore")

# Blank English spaCy pipeline with the GLiNER component added to it.
nlp = spacy.blank("en")
nlp.add_pipe("gliner_spacy", config=custom_spacy_config)

In [11]:
class CustomEvaluationMetric(EvaluationModule):
    """Composite generation metric: BERTScore F1, stance agreement and entity overlap.

    The scoring helpers use the module-level ``bertscore``, ``sentiment_pipeline``
    and ``nlp`` objects, which must exist before ``compute`` is called.
    """

    def _info(self):
        """Describe the metric and declare its two string-valued input columns."""
        return evaluate.MetricInfo(
            description="Custom evaluation metric combining BERTScore, stance, and factual accuracy",
            citation="",
            features=Features({
                "predictions": Value("string"),
                "references": Value("string"),
            })
        )

    def _compute(self, predictions, references, weights=np.array([0.3, 0.3, 0.4])):
        """Score a batch of prediction/reference pairs.

        Args:
            predictions: Generated texts.
            references: Reference texts, aligned one-to-one with ``predictions``.
            weights: Three weights applied, in order, to the BERTScore, stance
                and factual-accuracy components.

        Returns:
            dict: Batch means under ``"combined_score"``, ``"bert_score"``,
            ``"stance_score"`` and ``"factual_accuracy"``.
        """
        bert_scores = self.calculate_bert_score(predictions, references)
        stance_scores = self.calculate_stance_score(predictions, references)
        factual_scores = self.calculate_factual_accuracy(predictions, references)

        # One row per component, one column per example.
        score = np.stack([bert_scores, stance_scores, factual_scores])

        return {
            # Weighted sum per example, then averaged over the batch.
            "combined_score": np.dot(weights, score).mean(),
            "bert_score": bert_scores.mean(),
            "stance_score": stance_scores.mean(),
            "factual_accuracy": factual_scores.mean()
        }

    def calculate_bert_score(self, predictions, references):
        """Return the per-example BERTScore F1 values as a NumPy array."""
        return np.array(bertscore.compute(predictions=predictions, references=references, model_type='microsoft/deberta-xlarge-mnli')['f1'])

    def calculate_stance_score(self, predictions, references):
        """Return 1 per example where prediction and reference get the same classifier label, else 0."""
        def get_stance(text):
            classified = sentiment_pipeline(text, truncation=True, max_length=512, stride=128, return_all_scores=False)
            return np.array([item['label'] for item in classified])

        pred_stance = get_stance(predictions)
        ref_stance = get_stance(references)
        assert len(pred_stance) == len(ref_stance), "Shapes not matching after stance classified."

        # Compare the labels computed above; the classifier is not run a second time.
        return (pred_stance == ref_stance).astype(int)

    def calculate_factual_accuracy(self, predictions, references):
        """Return the per-example Jaccard overlap between predicted and reference entity texts.

        An example where neither text yields any entity scores 0.0.
        """
        def predict_entities(text):
            # One set of entity surface strings per input text.
            return [{ent.text for ent in doc.ents} for doc in nlp.pipe(text)]

        pred_ents = predict_entities(predictions)
        ref_ents = predict_entities(references)

        return np.array([(len(s1&s2) / len(s1|s2)) if len(s1|s2) > 0 else 0.0 for s1, s2 in zip(pred_ents, ref_ents)])

In [12]:
# Create an instance of the custom metric; evaluate_model() calls its compute() once per batch.
custom_metric = CustomEvaluationMetric()

In [13]:
from torch.utils.data import Dataset, DataLoader
# Evaluation loader: batches are served in dataset order so the per-batch logs are
# reproducible between runs, and worker processes are released after the single pass
# instead of being kept alive for the rest of the notebook session.
data_loader = DataLoader(train_data, batch_size=16, pin_memory=True, num_workers=4, shuffle=False)

In [14]:
import re
# Captures everything that follows the first "### Response:\n" marker.
# Tied to the prompt template used in this notebook; adapt it if your prompt differs.
RESPONSE_PATTERN = re.compile(r'### Response:\n([\s\S]*)')

def extract_answer_fast(model_output):
    """Return the stripped text after the response marker, or None when the marker is absent."""
    found = RESPONSE_PATTERN.search(model_output)
    if found is None:
        return None
    return found.group(1).strip()

In [15]:
def log_batch_metrics_with_plot(len_epoch, model_name, combined_scores, bert_scores, stance_scores, factual_accuracy_scores):
    """Plot the four per-batch metric curves and log the figure to Weights & Biases.

    Args:
        len_epoch: Number of x positions; the x axis is ``0 .. len_epoch - 1``.
        model_name: Name interpolated into the plot title.
        combined_scores: Per-batch combined scores.
        bert_scores: Per-batch BERTScore values.
        stance_scores: Per-batch stance scores.
        factual_accuracy_scores: Per-batch factual-accuracy scores.

    The figure is logged under the ``"metric_progression"`` key and then closed.
    """
    x_values = list(range(len_epoch))
    curves = (
        (combined_scores, 'Combined Score'),
        (bert_scores, 'BERT Score'),
        (stance_scores, 'Stance Score'),
        (factual_accuracy_scores, 'Factual Accuracy'),
    )

    plt.figure(figsize=(12, 8))
    for series, curve_label in curves:
        plt.plot(x_values, series, label=curve_label)

    plt.xlabel('Batch Number')
    plt.ylabel('Score')
    plt.title(f'Metric Progression Across Batches for {model_name}')
    plt.legend()

    wandb.log({"metric_progression": wandb.Image(plt)})
    plt.close()

In [18]:
from tqdm.auto import tqdm
import time
import wandb
def evaluate_model(my_model, data_loader, device, name, tokenizer=None):
    """Generate answers for every batch, score them, and log the results to Weights & Biases.

    Args:
        my_model: Model whose ``generate`` method produces the answers.
        data_loader: Iterable of batches providing ``'text'`` (prompts) and
            ``'Answer'`` (references).
        device: Device the tokenized inputs are moved to.
        name: W&B run name; also used in the plot title.
        tokenizer: Tokenizer used for encoding and decoding. Defaults to the
            notebook-level ``my_tokenizer`` when omitted.

    Per-batch scores are logged as they are computed. After the loop, a
    progression plot and the mean of the per-batch scores are logged. The W&B
    run is always closed, with a non-zero exit code if evaluation raised.
    """
    tokenizer = my_tokenizer if tokenizer is None else tokenizer

    wandb.login(key=userdata.get("WANDB_TOKEN"))
    wandb.init(project="Contrastive_Logs", name=name)

    combined_scores = []
    bert_scores = []
    stance_scores = []
    factual_accuracy_scores = []

    # Batched generation with a decoder-only model needs left padding so that the
    # new tokens directly follow each prompt; the previous setting is restored below.
    previous_padding_side = tokenizer.padding_side
    tokenizer.padding_side = "left"
    completed = False
    try:
        for num, batch in enumerate(tqdm(data_loader)):
            # Generate predictions
            text = batch['text']
            inputs = tokenizer(text, padding='longest', return_tensors="pt").to(device)
            outputs = my_model.generate(**inputs, max_new_tokens=128, pad_token_id=tokenizer.eos_token_id)
            preds = tokenizer.batch_decode(outputs, skip_special_tokens=True)
            # extract_answer_fast returns None when the response marker is missing;
            # score such outputs as empty strings instead of crashing the metric.
            final_preds = [extract_answer_fast(pred) or "" for pred in preds]

            assert len(final_preds) == len(batch['Answer']), "Issue in model generation"

            # Compute metrics
            results = custom_metric.compute(predictions=final_preds, references=batch['Answer'])

            wandb.log({"batch": num+1,
                       "batch_combined_score": results['combined_score'],
                       "batch_bert_score": results['bert_score'],
                       "batch_stance_score": results['stance_score'],
                       "batch_factual_accuracy": results['factual_accuracy']
            })
            combined_scores.append(results['combined_score'])
            bert_scores.append(results['bert_score'])
            stance_scores.append(results['stance_score'])
            factual_accuracy_scores.append(results['factual_accuracy'])

        # Skip the summary when the loader produced no batches (nothing to average or plot).
        if combined_scores:
            # The x axis is sized from the recorded scores so it always matches the curves.
            log_batch_metrics_with_plot(len(combined_scores), name, combined_scores, bert_scores, stance_scores, factual_accuracy_scores)

            wandb.log({
                "final_combined_score": np.mean(combined_scores),
                "final_bert_score": np.mean(bert_scores),
                "final_stance_score": np.mean(stance_scores),
                "final_factual_accuracy": np.mean(factual_accuracy_scores)
            })
        completed = True
    finally:
        tokenizer.padding_side = previous_padding_side
        # Always close the run; mark it as failed if an exception interrupted evaluation.
        if completed:
            wandb.finish()
        else:
            wandb.finish(exit_code=1)

In [None]:
# Run the evaluation; the last argument is the W&B run name.
evaluate_model(
    my_model,
    data_loader,
    device,
    'PerspectrumInstruct-Contrastive-InputSameAsLabels-Epochs_1-EarlyStop-Grad_Accum-Mistral7B',
)