In [None]:
!pip install trl
!pip install -U bitsandbytes

import os
# Switch Weights & Biases logging off for this notebook run.
os.environ["WANDB_MODE"] = "disabled"
# Restrict CUDA to GPUs 0 and 1. Set here, before `torch` is imported further down.
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
# Debugging aid: asks CUDA to run kernel launches synchronously (slower, but errors
# surface at the failing call). NOTE(review): consider removing for full evaluation runs.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
import gc
import copy

import numpy as np

import torch
from torch.nn import functional as F
import warnings
warnings.filterwarnings("ignore")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

import pandas as pd
from datasets import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

import bitsandbytes as bnb
from peft import LoraConfig, PeftModel, get_peft_model, prepare_model_for_kbit_training
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, BitsAndBytesConfig, EarlyStoppingCallback
from trl import SFTTrainer
from tqdm import tqdm
import re

import random
import numpy as np
import torch
import os

def set_seed(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's `random`, NumPy's legacy global generator, and PyTorch
    (CPU generator, current CUDA device, and all CUDA devices), then records
    the seed in the `PYTHONHASHSEED` environment variable.

    Args:
        seed: Integer seed applied to every generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Stored as a string because environment variables only hold text.
    os.environ["PYTHONHASHSEED"] = str(seed)

set_seed(2026)

In [None]:
# Model configuration

model_id = "mistralai/Mistral-7B-Instruct-v0.3"

# 4-bit NF4 quantization with double quantization; matmuls are computed in bfloat16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
)

# Base (non-fine-tuned) model; `device_map="auto"` lets the loader place the weights.
base_model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb_config,
    device_map="auto",
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True,
    return_dict=True,
)

# The end-of-sequence token doubles as the padding token, and padding goes on the left.
tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "left"

# Instruction sentences per task. The zero-shot and few-shot Sarcasm wordings differ
# ("given" vs "following"); both are kept verbatim so prompts stay comparable with
# earlier runs.
_ZERO_SHOT_INSTRUCTIONS = {
    "Sentiment": "Generate the sentiment of the given text. 1 for positive sentiment, and 0 for negative sentiment. Do not give an explanation.",
    "Sarcasm": "Predict if the given text is sarcastic. 1 if the text is sarcastic, and 0 if the text is not sarcastic. Do not give an explanation.",
}
_FEW_SHOT_INSTRUCTIONS = {
    "Sentiment": "Generate the sentiment of the given text. 1 for positive sentiment, and 0 for negative sentiment. Do not give an explanation.",
    "Sarcasm": "Predict if the following text is sarcastic. 1 if the text is sarcastic, and 0 if the text is not sarcastic. Do not give an explanation.",
}

# Demonstrations keyed by (task, variety, source); each entry is (text, label).
_FEW_SHOT_EXAMPLES = {
    ("Sentiment", "en-IN", "Google"): [
        ("The hospitality was top-notch and the location is perfect for families. The authentic South Indian filter coffee is a must-try!", 1),
        ("Very poor hygiene standards. The tables were sticky and the staff was extremely dismissive when we complained about the cold food.", 0)
    ],
    ("Sentiment", "en-IN", "Reddit"): [
        ("Finally got my Zepto delivery in under 10 minutes. This level of convenience is such a lifesaver in Mumbai traffic!", 1),
        ("Seriously fed up with these 'aesthetic' cafes in Indiranagar charging 500 bucks for mediocre cold coffee and bad vibes.", 0)
    ],
    ("Sentiment", "en-AU", "Google"): [
        ("Found this absolute gem in Brunswick. The smashed avo was perfectly seasoned and the staff were total legends. Will be back!", 1),
        ("Wait time was ridiculous—over 50 minutes for two burgers that came out lukewarm. Way overpriced for the quality of service.", 0)
    ],
    ("Sentiment", "en-AU", "Reddit"): [
        ("The community support in r/melbourne during the power outages was actually quite heartening to see.", 1),
        ("Centrelink's online portal is an absolute joke. Been trying to upload one document for three hours and it keeps crashing.", 0)
    ],
    ("Sentiment", "en-UK", "Google"): [
        ("Proper Sunday roast with massive Yorkshires and plenty of gravy. The staff made us feel right at home. Brilliant value for money.", 1),
        ("The hotel was a bit of a shambles. The room smelled of damp and the 'continental breakfast' was just a box of dry cereal.", 0)
    ],
    ("Sentiment", "en-UK", "Reddit"): [
        ("It's great to see more investment going into local high streets; the new pedestrian zone in the city centre looks lovely.", 1),
        ("The current state of the NHS wait times is terrifying. It shouldn't take six months just to see a specialist.", 0)
    ],
    ("Sarcasm", "en-IN", "Reddit"): [
        ("Oh brilliant, another 2-hour power cut right in the middle of a heatwave. Truly living the 'Digital India' dream.", 1),
        ("The new terminal at the Bengaluru airport is actually quite efficient and the greenery makes it very pleasant.", 0)
    ],
    ("Sarcasm", "en-AU", "Reddit"): [
        ("Fantastic, another interest rate hike. I was just thinking my mortgage wasn't quite high enough yet.", 1),
        ("I think the government needs to prioritize long-term infrastructure over short-term political gains.", 0)
    ],
    ("Sarcasm", "en-UK", "Reddit"): [
        ("Lovely weather we're having—I especially enjoy the horizontal rain and the smell of raw sewage in the Thames. Peak Britain.", 1),
        ("The volunteer-led library in our village has been doing a wonderful job providing resources for the kids.", 0)
    ]
}


def get_prompt(data, task, variety, source, few_shot):
    """Build one Mistral-instruct prompt per row of `data`.

    Args:
        data: Mapping/DataFrame with parallel "text" and "label" columns. Labels
            are not placed in the prompt; rows are paired with them only so that
            the number of prompts matches the number of labelled rows.
        task: "Sentiment" or "Sarcasm".
        variety: Variety code used to select demonstrations (e.g. "en-IN").
        source: Data source used to select demonstrations (e.g. "Reddit").
        few_shot: When truthy, prepend the demonstrations registered for
            (task, variety, source). If none are registered, the prompt carries
            no demonstrations.

    Returns:
        List of prompt strings, one per row, in row order.

    Raises:
        ValueError: If `task` is not a supported task. Previously this surfaced
            as an unbound `prompt` variable inside the loop.
    """
    instructions = _FEW_SHOT_INSTRUCTIONS if few_shot else _ZERO_SHOT_INSTRUCTIONS
    if task not in instructions:
        raise ValueError(
            f"Unsupported task {task!r}; expected one of {sorted(instructions)}"
        )

    # The demonstrations depend only on (task, variety, source), so build them once
    # instead of once per row.
    shot_text = ""
    if few_shot:
        examples = _FEW_SHOT_EXAMPLES.get((task, variety, source), [])
        shot_text = "".join(
            f"Text: {ex_text}\n{ex_label}\n\n" for ex_text, ex_label in examples
        )

    # A newline always separates the instruction from what follows. The few-shot
    # prompt used to glue the first demonstration directly onto the instruction
    # ("...explanation.Text: ..."), unlike the zero-shot prompt.
    header = f"<s>[INST] {instructions[task]}\n{shot_text}"

    return [
        f"{header}Text:{text}[/INST]"
        for text, _label in zip(data["text"], data["label"])
    ]

def parse_prediction(prediction: str) -> int:
    """Extract the first standalone 0 or 1 from a model response.

    Args:
        prediction: Raw generated text. `None` is accepted, and any other
            value is converted with `str()` before matching.

    Returns:
        0 or 1 for the first digit that appears as its own word, or -1 when
        the input is `None` or contains no such digit.
    """
    if prediction is None:
        return -1

    # Word boundaries keep digits inside longer numbers (e.g. "10") from matching.
    hit = re.search(r'\b[01]\b', str(prediction))
    return -1 if hit is None else int(hit.group())

def run_inference(model, df_test, task, source, variety, finetuning, few_shot):
    """Generate a 0/1 prediction for every row of `df_test` and score the results.

    Args:
        model: Causal LM exposing `generate` (base model or PEFT-wrapped model).
        df_test: DataFrame with "text" and "label" columns.
        task: Task name passed to `get_prompt` ("Sentiment" or "Sarcasm").
        source: Data source; selects few-shot demonstrations and labels the progress bar.
        variety: Variety code; selects few-shot demonstrations and labels the progress bar.
        finetuning: Whether `model` is the fine-tuned variant (used for logging only).
        few_shot: Whether to build few-shot prompts.

    Returns:
        Dict with "f1", "accuracy", "precision" and "recall", computed only over
        rows whose response could be parsed as 0 or 1. It also carries
        "n_samples" (rows evaluated) and "n_invalid" (rows dropped because the
        response could not be parsed), so the coverage of the scores is visible.
        If no row could be parsed, the four scores are NaN.
    """
    prompts = get_prompt(df_test, task, variety, source, few_shot)
    labels = df_test["label"].to_numpy()
    preds = []
    preds_str = []

    progress_desc = (
        f"Evaluation (task: {task}, source: {source}, variety: {variety}, "
        f"ft: {finetuning}, few_shot: {few_shot})"
    )
    for prompt in tqdm(prompts, desc=progress_desc):
        encodings = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(device)
        prompt_len = encodings["input_ids"].shape[1]
        with torch.no_grad():
            generated_ids = model.generate(
                input_ids=encodings["input_ids"],
                attention_mask=encodings["attention_mask"],
                max_new_tokens=6,  # same budget as max_length = prompt length + 6
                pad_token_id=tokenizer.eos_token_id
            )

        # Decode only the tokens produced after the prompt. Splitting the full
        # decoded text on "[/INST]" breaks when the input text itself contains
        # that marker, or when the marker is missing from the decoded prompt.
        pred_str = tokenizer.decode(generated_ids[0, prompt_len:]).strip()
        preds_str.append(pred_str)
        preds.append(parse_prediction(pred_str))

        # Drop the references first so that emptying the cache can release them.
        del generated_ids
        del encodings
        torch.cuda.empty_cache()

    preds = np.array(preds)

    mask = preds != -1  # Consider only valid predictions
    n_samples = int(len(preds))
    n_invalid = int(n_samples - mask.sum())
    if n_invalid:
        print(f"Warning: {n_invalid}/{n_samples} predictions could not be parsed and are excluded from the metrics")

    labels = (labels[mask] == 1).astype(int)
    preds = (preds[mask] == 1).astype(int)

    print("Predictions (str):", preds_str)
    print("Predictions:", preds)
    print("Labels:", labels)

    if len(preds) == 0:
        # Nothing to score: report undefined metrics instead of passing empty
        # arrays to scikit-learn.
        precision = recall = f1 = accuracy = float("nan")
    else:
        precision, recall, f1, _ = precision_recall_fscore_support(
            labels, preds, average="binary", pos_label=1, zero_division=0
        )
        accuracy = accuracy_score(labels, preds)

    metrics = {
        "f1": f1,
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "n_samples": n_samples,
        "n_invalid": n_invalid,
    }
    print(metrics)
    return metrics

In [None]:
# Read test data

# Load the validation split and keep only rows that have every column the
# evaluation needs.
df_test = pd.read_csv("/kaggle/input/besstie/valid.csv").dropna(
    subset=['text', 'label', 'variety', 'source', 'task']
)

In [None]:
# Compute performance

finetuned_models_path = "/kaggle/input/finetuned-models/finetuned_models"
output_path = "/kaggle/working"


def _load_base_model():
    """Load a fresh 4-bit quantized copy of the base model with no adapter attached."""
    return AutoModelForCausalLM.from_pretrained(
        model_id,
        low_cpu_mem_usage=True,
        return_dict=True,
        torch_dtype=torch.bfloat16,
        device_map="auto",
        quantization_config=bnb_config
    )


def _evaluate_prompt_modes(model, df_subset, task, source, variety, finetuning):
    """Run zero-shot then few-shot inference and return one result record per mode."""
    records = []
    for few_shot in [False, True]:
        metrics = run_inference(model, df_subset, task, source, variety, finetuning, few_shot)
        metrics.update({
            "finetuning": finetuning, "few_shot": few_shot, "source": source, "task": task, "variety": variety
        })
        records.append(metrics)
    return records


results = []

df_grouped_test = df_test.groupby(['variety', 'source', 'task'])

# Each group already holds exactly the rows for one (variety, source, task)
# combination, so the frame does not need to be filtered again.
for (variety, source, task), df_test_subset in df_grouped_test:

    model_name = f"{variety}_{source}_{task}".replace(" ", "_")
    finetuned_model_path = os.path.join(finetuned_models_path, model_name)
    if not os.path.isdir(finetuned_model_path):
        print(f"Model {model_name} not found (path: {finetuned_model_path})")
        continue

    # 1) Baseline: the untouched base model.
    results.extend(_evaluate_prompt_modes(base_model, df_test_subset, task, source, variety, False))

    # 2) Fine-tuned: the base model wrapped with this group's LoRA adapter.
    model = PeftModel.from_pretrained(base_model, finetuned_model_path, use_safetensors=True)
    results.extend(_evaluate_prompt_modes(model, df_test_subset, task, source, variety, True))

    # Attaching an adapter modifies the wrapped base model, so reload a clean
    # copy for the next group. The old instances are released first so that two
    # copies of the model are not held in memory during the reload.
    del model
    del base_model
    gc.collect()
    torch.cuda.empty_cache()

    base_model = _load_base_model()

df_results = pd.DataFrame(results)
df_results.to_csv(os.path.join(output_path, "mistral_results_baseline_and_few_shot.csv"), index=False)

In [None]:
# Check LoRA
# Optional sanity checks for the fine-tuned LoRA adapters. They are disabled by
# default; set RUN_LORA_CHECKS = True to run them. This cell used to be a chain of
# triple-quoted strings, some of them indented at top level, which made the cell
# fail with an IndentationError.
RUN_LORA_CHECKS = False


def find_all_linear_names(model):
    """Return the leaf names of every 4-bit linear layer in `model`, without `lm_head`."""
    lora_module_names = set()
    for name, module in model.named_modules():
        if isinstance(module, bnb.nn.Linear4bit):
            # The last dotted component is the layer's own name (e.g. "q_proj").
            lora_module_names.add(name.split('.')[-1])
    lora_module_names.discard('lm_head')  # needed for 16-bit
    return list(lora_module_names)


if RUN_LORA_CHECKS:
    finetuned_models_path = "/kaggle/input/finetuned-models/finetuned_models"

    target_modules = find_all_linear_names(base_model)
    print("1) Base model linear layers: ", target_modules)

    df_grouped_test = df_test.groupby(['variety', 'source', 'task'])
    for (variety, source, task), test_section in df_grouped_test:

        model_name = f"{variety}_{source}_{task}".replace(" ", "_")
        finetuned_model_path = os.path.join(finetuned_models_path, model_name)
        if not os.path.isdir(finetuned_model_path):
            print(f"Model {model_name} not found (path: {finetuned_model_path})")
            continue

        model = PeftModel.from_pretrained(base_model, finetuned_model_path, use_safetensors=True)

        print(f"Model: {model_name} <<<<<<<<<<<<<<<")

        # Check the fine-tuned model's LoRA layer norms: an all-zero LoRA matrix
        # means the adapter contributes nothing for that layer.
        print("2) Check finetuned model lora layers norm")
        for name, param in model.named_parameters():
            if "lora" in name and param.norm() == 0:
                print("Layer with 0 norm:", name, param.norm())

        # Check how the merged (fine-tuned) model differs from the base model.
        print("3) Check merged (finetuned) model and base model difference in parameters")
        merged_model = model.merge_and_unload()

        # A fresh base model is loaded as the comparison reference, because the
        # instance wrapped above has been modified by the adapter.
        base_model = AutoModelForCausalLM.from_pretrained(
            model_id,
            low_cpu_mem_usage=True,
            return_dict=True,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            quantization_config=bnb_config
        )

        different = False
        for (name_base, p_base), (name_merged, p_merged) in zip(
                base_model.named_parameters(), merged_model.named_parameters()):

            if name_base != name_merged:
                print(f"Name mismatch: {name_base} vs {name_merged}")
                continue

            if not torch.equal(p_base, p_merged):
                print(f"Layer {name_base} is different")
                different = True
        if not different:
            print(" -- Merged model is identical to base model (bitwise)")
        else:
            print(" -- Merged model differs from base model")

        # Check that the adapter's LoRA target modules exist in the base model.
        print("4) Check if LoRA target modules exist in mistralai/Mistral-7B-Instruct-v0.3")

        lora_config = model.peft_config['default']  # LoraConfig object

        print("Target modules:", lora_config.target_modules)

        # Check which target modules exist in the model
        for target in lora_config.target_modules:
            found = [n for n, _ in base_model.named_modules() if target in n]
            if found:
                print(f"Target '{target}' matched layers: {found}")
            else:
                print(f"Target '{target}' not found in base model!")

        del model
        del merged_model
        gc.collect()
        torch.cuda.empty_cache()



