In [1]:
# Torch e CUDA
import torch
import gc
from torch.utils.data import Subset

# Transformers e Training
from transformers import (
    TextStreamer,
    TrainingArguments,
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
)
from trl import SFTTrainer
from peft import LoraConfig, get_peft_model,PeftModel
# Dataset e valutazione
from datasets import load_dataset, Dataset
from evaluate import load
import bitsandbytes as bnb
# Metriche di valutazione
from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    confusion_matrix
)

# Data manipulation
import pandas as pd
import numpy as np

# Sistema e utility
import os
from dotenv import load_dotenv
from pathlib import Path
from datetime import datetime

# Visualizzazione
import seaborn as sns
import matplotlib.pyplot as plt


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
prompt_template = (
    "### Instruction:\n"
    "You are an expert software developer and bug triaging specialist. Your task is to predict whether a bug "
    "will be resolved in LESS than 50 DAYS or MORE than 50 DAYS based on the provided bug details.\n\n"
    
    "- Output '0' if the bug will be resolved in LESS than 50 DAYS.\n"
    "- Output '1' if the bug will be resolved in MORE than 50 DAYS.\n\n"
    
    "Your response MUST be strictly either '0' or '1'. Do NOT include any additional text, explanations, formatting, symbols, or extra characters in your response.\n\n"

    "### Input:\n"
    "Source: {source}\n"
    "Product: {product}"
    "Short Description: {short_desc}\n"
    "Priority: {priority}\n"
    "Severity: {bug_severity}\n"
    #"Estimated resolution time: {days_resolution}\n\n" - questo potrebbe influenzare troppo il modello per la predizione

    "### Example Responses:\n"
    "Input: Source: KDE | Product: Payment System | Short Description: Critical security vulnerability found in authentication system | Priority: P1 | Severity: Critical\n"
    "Output: 0\n\n"
    "Input: Source: OpenOffice | Product: UI Module | Short Description: UI glitch affecting low-impact visual elements in settings panel | Priority: P3 | Severity: Minor\n"
    "Output: 1\n\n"

    "### Output: {label}\n"
)
num_val = "1000" #1000, 2000, 5000, 9000
model_name="meta-llama/Llama-3.1-8B-Instruct"
directory = f"{model_name}".split("/")[-1].strip().lower()
fine_tuned = True # Imposta a True per valutare il modello fine-tunato, False per il modello base
fine_tuned_path = f"./fine_tuned_model_{directory}_{num_val}" if fine_tuned else None
print(fine_tuned_path)


./fine_tuned_model_llama-3.1-8b-instruct_1000


In [3]:
def formatting_prompts(examples):
    texts = []
    for source, product, short_desc,priority,bug_severity in zip(
        examples["source"],examples["product"], examples["short_desc"],examples["priority"],examples["bug_severity"]
    ):
        # Costruiamo il prompt
        text = prompt_template.format(
            source=source,
            product=product,
            short_desc=short_desc,
            priority=priority,
            bug_severity=bug_severity,
            label="",
        )

        texts.append(text)
    
    return {"text": texts}

dataset = load_dataset(
    "csv",
    data_files={
        "test": f"../dataset_completo/balanced_datasets/balanced_test.csv", 
    },
)

dataset = dataset.map(formatting_prompts, batched=True)
dataset['test'][0]

{'short_desc': 'parsetypeDIE confused by DWTAGenumerationtype',
 'product': 'valgrind',
 'priority': 'NOR',
 'bug_severity': 'normal',
 'days_resolution': 557,
 'source': 'KDE',
 'label': 1,
 'text': "### Instruction:\nYou are an expert software developer and bug triaging specialist. Your task is to predict whether a bug will be resolved in LESS than 50 DAYS or MORE than 50 DAYS based on the provided bug details.\n\n- Output '0' if the bug will be resolved in LESS than 50 DAYS.\n- Output '1' if the bug will be resolved in MORE than 50 DAYS.\n\nYour response MUST be strictly either '0' or '1'. Do NOT include any additional text, explanations, formatting, symbols, or extra characters in your response.\n\n### Input:\nSource: KDE\nProduct: valgrindShort Description: parsetypeDIE confused by DWTAGenumerationtype\nPriority: NOR\nSeverity: normal\n### Example Responses:\nInput: Source: KDE | Product: Payment System | Short Description: Critical security vulnerability found in authentication s

In [4]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def calculate_metrics(true_labels, predictions):
    """
    Calcola le metriche di valutazione, tra cui precisione, recall, e F1-score
    con il parametro zero_division per gestire i casi di divisione per zero.
    """
    metrics = {
        'accuracy': accuracy_score(true_labels, predictions),
        'precision': precision_score(true_labels, predictions, average='binary', zero_division=0),
        'recall': recall_score(true_labels, predictions, average='binary', zero_division=0),
        'f1': f1_score(true_labels, predictions, average='binary', zero_division=0)
    }

    return metrics


In [5]:
import time
import torch
import psutil
import pynvml
import re
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from datetime import datetime
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from tqdm import tqdm

def evaluate_model(model, tokenizer, eval_dataset, model_name, fine_tuned, num_val):
    print("\nStarting evaluation phase...")

    # Move model to evaluation mode
    model.eval()  

    # Ensure tokenizer padding
    tokenizer.padding_side = 'left'
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    predictions, true_labels, generated_texts, prediction_sources = [], [], [], []
    batch_size = 8
    invalid = 0

    # Create results folder
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    fine_tuned_status = "fine_tuned" if fine_tuned else "not_fine_tuned"
    if fine_tuned_status == "fine_tuned":
        output_dir = f"{model_name}_{fine_tuned_status}_on_{num_val}"
    else:
        output_dir = f"{model_name}_{fine_tuned_status}"

    os.makedirs(output_dir, exist_ok=True)

    pynvml.nvmlInit()
    inference_times = []
    system_metrics = []
    
    # Process dataset in batches
    for i in tqdm(range(0, len(eval_dataset), batch_size), desc="Evaluating", unit="batch"):
        batch = eval_dataset[i:i + batch_size]
        texts, labels, sources = batch['text'], batch['label'], batch['source']

        torch.cuda.synchronize()
        start_time = time.time()

        for text, label, source in zip(texts, labels, sources):
            try:
                # Tokenization
                inputs = tokenizer(
                    [text],
                    return_tensors="pt",
                    truncation=True,
                    padding=True,
                    max_length=2048
                ).to(model.device)  # Move to model's device

                # Generate output
                with torch.no_grad():
                    outputs = model.generate(
                        **inputs,
                        max_new_tokens=2,  # Short output
                        num_return_sequences=1,
                        pad_token_id=tokenizer.pad_token_id,
                        eos_token_id=tokenizer.eos_token_id,
                        temperature=0.7, 
                        do_sample=True #generazione deterministica - solo 0 o 1 
                    )

                # Decode output
                generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
                #print(f"\n🛠️ DEBUG - RAW OUTPUT: '{generated_text}'")

                # Extract response (expecting "### Response: 0" or "### Response: 1")
                generated_ids = outputs[0][inputs.input_ids.shape[1]:]  # Prendi solo i nuovi token
                generated_text = tokenizer.decode(generated_ids, skip_special_tokens=True).strip()
                #print(f"Generated text: {generated_text}")

                response = generated_text.strip()
                #print(f"Extracted response: '{response}' | True Label: {label}")

                if response in ['0', '1']:
                    predictions.append(int(response))
                    true_labels.append(int(label))
                    generated_texts.append(generated_text)
                    prediction_sources.append(source)
                else:
                    print(f"⚠️ Invalid response format: '{response}'")
                    invalid += 1

            except Exception as e:
                print(f"⚠️ Error processing batch: {e}")
                continue
        torch.cuda.synchronize()
        end_time = time.time()
        inference_times.append(end_time - start_time)
        cpu_usage = psutil.cpu_percent()
        ram_usage = psutil.virtual_memory().percent
        gpu_usage = pynvml.nvmlDeviceGetUtilizationRates(pynvml.nvmlDeviceGetHandleByIndex(0)).gpu
        system_metrics.append({"batch": i//batch_size, "cpu": cpu_usage, "ram": ram_usage, "gpu": gpu_usage, "time": end_time - start_time})
    # No valid predictions? Return empty results
    if not predictions:
        print("No valid predictions were generated!")
        return None, [], [], []

    # Compute metrics
    metrics = calculate_metrics(true_labels, predictions)

    # Save metrics to CSV
    metrics_path_csv = os.path.join(output_dir, "metrics.csv")
    pd.DataFrame([metrics]).to_csv(metrics_path_csv, index=False)
    # Calcola media delle metriche di sistema
    if system_metrics:
        avg_metrics = {
            "cpu": sum(m["cpu"] for m in system_metrics) / len(system_metrics),
            "ram": sum(m["ram"] for m in system_metrics) / len(system_metrics),
            "gpu": sum(m["gpu"] for m in system_metrics) / len(system_metrics),
            "time": sum(m["time"] for m in system_metrics) / len(system_metrics),
        }

        # Salva la media delle metriche in un CSV
        avg_metrics_df = pd.DataFrame([avg_metrics])
        avg_metrics_path = os.path.join(output_dir, "avg_system_metrics.csv")
        avg_metrics_df.to_csv(avg_metrics_path, index=False)
        print(f"✅ Media delle metriche salvata in: {avg_metrics_path}")
        print(f"Avg Inference Time per Batch: {avg_metrics['time']:.4f} sec")
    # Generate confusion matrix
    cm = confusion_matrix(true_labels, predictions)
    plt.figure(figsize=(6, 4))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=['0', '1'], yticklabels=['0', '1'])
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    cm_path = os.path.join(output_dir, "confusion_matrix.png")
    plt.savefig(cm_path, format="png")
    plt.close()
    print(f"✅ Confusion matrix saved at: {cm_path}")

    # Display results
    print("\nEvaluation Results:")
    print(f"Model: {model_name}")
    print(f"Samples evaluated: {len(true_labels)}")
    print(f"Invalid predictions: {invalid}")
    print("\nMetrics:")
    for metric_name, value in metrics.items():
        print(f"{metric_name}: {value:.4f}")

    return metrics, predictions, true_labels, generated_texts, prediction_sources


secegliere se fare l'evaluation del modello fine tunato o sul modello base

In [6]:
def load_model(model_name, fine_tuned=False, fine_tuned_path=None):
    """
    Carica un modello pre-addestrato o fine-tunato con quantizzazione 4-bit.
    
    :param model_name: Nome del modello pre-addestrato
    :param fine_tuned: Booleano, se True carica il modello fine-tunato
    :param fine_tuned_path: Percorso del modello fine-tunato
    :param device: Dispositivo su cui caricare il modello ('cuda' o 'cpu')
    :return: Modello e tokenizer
    """
    load_dotenv()
    hf_token = os.getenv("HF_TOKEN")
    
    # Configurazione della quantizzazione 4-bit
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
    )
    
    print(f"Loading base model: {model_name}")
    base_model = AutoModelForCausalLM.from_pretrained(
        model_name,
        quantization_config=bnb_config,
        device_map="auto",
    )
    
    if fine_tuned and fine_tuned_path and os.path.exists(fine_tuned_path):
        print(f"Loading LoRA model from: {fine_tuned_path}")
        model = PeftModel.from_pretrained(base_model, fine_tuned_path)
        tokenizer = AutoTokenizer.from_pretrained(fine_tuned_path, token=hf_token)
    else:
        model = base_model
        tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token)

    
    return model, tokenizer

In [7]:
model, tokenizer = load_model(model_name, fine_tuned=True, fine_tuned_path=fine_tuned_path)

Loading base model: meta-llama/Llama-3.1-8B-Instruct


Loading checkpoint shards: 100%|██████████| 4/4 [00:05<00:00,  1.28s/it]


In [8]:

if fine_tuned:
    metrics, predictions, true_labels, generated_texts, prediction_sources = evaluate_model(
        model=model, tokenizer=tokenizer, eval_dataset=dataset["test"], model_name=model_name, fine_tuned=fine_tuned,num_val=num_val
    )
else:
    metrics, predictions, true_labels, generated_texts, prediction_sources = evaluate_model(
        model=model, tokenizer=tokenizer, eval_dataset=dataset["test"], model_name=model_name, fine_tuned=fine_tuned, num_val=0
    )


Starting evaluation phase...


Evaluating:   1%|▏         | 4/282 [00:06<07:10,  1.55s/batch]


KeyboardInterrupt: 