<a href="https://colab.research.google.com/github/alizoljodi/Behavior_economy_analysis/blob/main/Detection_of_Climate_Misinformation_Claims.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Detection of Climate Misinformation Claims
==================================================================================================


# Project Description

Climate change is one of the most pressing challenges of our time, yet public understanding is often undermined by the widespread circulation of misinformation. Detecting and classifying such misinformation at scale is therefore critical to safeguarding evidence-based climate communication.

This project addresses the problem of **automatic classification of climate-related contrarian claims** into predefined sub-claim categories, as proposed in *Coan et al. (2021)*. The task is challenging due to the diversity, nuance, and evolving nature of misinformation narratives, coupled with the large number of possible subcategories.

I begin with **exploratory data analysis** to understand the dataset’s distribution, class balance, and textual characteristics. Building on these insights, I move to a three-phase LLM-based approach:


1. **Zero-Shot Prompting**: Establish a baseline without task-specific examples. Load a small LLM (google/gemma-2-2b-it in this experiment) and prompt it to classify the input text into one of the sub-claim classes.
2. **Few-Shot Prompting**: Add in-context examples drawn from the training set to the prompt and rerun the same test to improve accuracy.
3. **Advanced Methods**: Apply stronger text classification methods such as fine-tuning, Retrieval-Augmented Generation (RAG), and text embeddings with a fully connected classifier.


For this project, I use **accuracy** to compare the results, but **precision**, **recall**, and **F1-score** are also reported.

### Metrics and Formulas

**Accuracy**  
Accuracy = (Number of Correct Predictions) / (Total Number of Predictions)

**Precision**  
Precision = True Positives / (True Positives + False Positives)

**Recall**  
Recall = True Positives / (True Positives + False Negatives)

**F1-Score**  
F1-Score = 2 × (Precision × Recall) / (Precision + Recall)
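
The formulas above can be checked by hand on a small example. The sketch below is a minimal pure-Python illustration that treats one class as "positive"; the function name `binary_metrics` and the toy labels are assumptions made for this example and are not part of the dataset. The notebook itself reports class-weighted averages through scikit-learn, which this sketch does not reproduce.

```python
def binary_metrics(y_true, y_pred, positive):
    """Accuracy, precision, recall and F1 with one class treated as positive."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(t == positive and p == positive for t, p in pairs)  # true positives
    fp = sum(t != positive and p == positive for t, p in pairs)  # false positives
    fn = sum(t == positive and p != positive for t, p in pairs)  # false negatives
    correct = sum(t == p for t, p in pairs)

    accuracy = correct / len(pairs)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return accuracy, precision, recall, f1


# Toy labels, invented for illustration only.
toy_true = ["No claim", "No claim", "claim_A", "claim_A", "No claim"]
toy_pred = ["No claim", "claim_A", "claim_A", "No claim", "No claim"]

toy_acc, toy_prec, toy_rec, toy_f1 = binary_metrics(toy_true, toy_pred, positive="claim_A")
print(toy_acc, toy_prec, toy_rec, toy_f1)
```

Here one of the two `claim_A` predictions is correct and one of the two true `claim_A` samples is found, so precision and recall are both 0.5 while accuracy is 3/5.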

### Install libraries

In [None]:
!pip install transformers accelerate evaluate datasets
!pip -q install bitsandbytes pyarrow

In [None]:
!pip install huggingface_hub

In [None]:
!pip install --upgrade --no-cache-dir sentencepiece
!pip install peft

In [None]:
!pip -q install -U sentence-transformers faiss-cpu

In [None]:
!pip -q install -U fasttext

In [None]:
import os
os.kill(os.getpid(), 9)  # forces Colab restart so new install is active


### Import important modules

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
from wordcloud import WordCloud  # needed by the word cloud cell in the EDA section
from transformers import pipeline
from transformers import AutoTokenizer, AutoModelForCausalLM, TextStreamer, BitsAndBytesConfig
from transformers import  TrainingArguments, DataCollatorForLanguageModeling, Trainer
import evaluate
from datasets import Dataset
import os
import torch
import torch.nn as nn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader, RandomSampler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, precision_recall_fscore_support
import math, json, gc
from tqdm.auto import tqdm
import random
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from functools import partial
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
import fasttext

In [None]:
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
SEED = 42  # defined here because the seeding call needs it; later cells reuse the same value
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)

# Exploratory Data Analysis


## Dataset Structure
 The dataset contains three .csv files, training.csv, validation.csv, and test.csv, which are prepared for training models, validating their performance, and testing the final results, respectively.


 Below, I perform exploratory data analysis on these three files to examine the number of samples, the distribution of labels, and the structure of the input data.

### Export important tokens

### Load datasets

In [None]:
train_df = pd.read_csv('training.csv')
test_df = pd.read_csv('test.csv')
validation_df = pd.read_csv('validation.csv')

### Analysis of the number of samples

In [None]:
# Get the number of rows from each DataFrame
train_rows = len(train_df)
test_rows = len(test_df)
validation_rows = len(validation_df)

# Data for the pie chart
labels = ['Training', 'Test', 'Validation']
sizes = [train_rows, test_rows, validation_rows]
colors = ['#ff9999','#66b3ff','#99ff99']

# Calculate total samples
total_samples = sum(sizes)

# Create labels with counts and percentages
pie_labels = [f'{labels[i]}: {sizes[i]} ({sizes[i]/total_samples:.1%})' for i in range(len(labels))]

# Create the pie chart
plt.figure(figsize=(4, 4))
plt.pie(sizes, labels=pie_labels, colors=colors, autopct='', startangle=90)
plt.axis('equal')  # Equal aspect ratio ensures that pie is drawn as a circle.
plt.title('Distribution of Samples Across Different Sets out of {samples}'.format(samples=total_samples))
plt.show()

The dataset contains 28,945 samples divided into three sets: a training set with 23,436 samples (81%), a validation set with 2,605 samples (9%), and a test set with 2,904 samples (10%).

### Dataset columns review

Due to their similarity, I review the columns only for the training set and later refer to the validation and test sets to examine their data distributions.

In [None]:
train_df.head()

In [None]:
train_df.info()

Each row of the dataset contains three columns:

- **text** *(object)*: A passage of climate-related text that the model must classify, i.e. decide whether it contains a contrarian claim and, if so, which one.
- **sub_claim_code** *(object)*: A code identifying the claim and sub-claim category that the model must predict from the **text** input.
- **sub_claim** *(object)*: A textual description of the claim category that the model must predict from the **text** input.

All columns have the datatype **object**, and the dataset appears to be clean, containing no null values. This suggests that no data cleaning is required before proceeding with analysis and modeling.


### Data Description

For all three sets, I examine their data descriptions to obtain basic information about their statistics, such as data redundancy, the most frequent values in each column, and their corresponding frequencies.


In [None]:
# Run describe on each dataframe and store the results
train_desc = train_df.describe(include='all')
test_desc = test_df.describe(include='all')
validation_desc = validation_df.describe(include='all')

# Create a new dictionary to hold the restructured data
restructured_data = {}

# Iterate through the index of the describe output (the statistics)
for index in train_desc.index:
    restructured_data[index] = {
        'train': train_desc.loc[index].to_dict(),
        'test': test_desc.loc[index].to_dict(),
        'validation': validation_desc.loc[index].to_dict()
    }

# Create a new DataFrame from the restructured data
restructured_df = pd.DataFrame.from_dict({(i,j): restructured_data[i][j]
                           for i in restructured_data.keys()
                           for j in restructured_data[i].keys()},
                          orient='index')

# Display the restructured dataframe
display(restructured_df)

The results of the data description suggest the following findings:

- The dataset is clean with respect to redundancy: the training and validation sets contain no duplicate samples, and the test set contains only one duplicate, which is negligible.
- The majority of labels in all three sets are `"No claim"`, accounting for approximately two-thirds of the samples in each set. This class imbalance suggests the need for caution, as it may bias the model’s predictions toward the majority class.
- There are 18 distinct labels for the model to predict.
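
Because one label dominates, accuracy has to be read against a trivial baseline that always predicts the majority class. The sketch below shows that baseline on invented toy labels; `majority_baseline_accuracy` is a hypothetical helper, not something defined elsewhere in this notebook.

```python
from collections import Counter


def majority_baseline_accuracy(label_values):
    """Return the most frequent label and the accuracy of always predicting it."""
    counts = Counter(label_values)
    majority_label, majority_count = counts.most_common(1)[0]
    return majority_label, majority_count / len(label_values)


# Toy labels, invented for illustration: two thirds are "No claim".
toy_labels = ["No claim"] * 4 + ["claim_A", "claim_B"]
baseline_label, baseline_acc = majority_baseline_accuracy(toy_labels)
print(baseline_label, round(baseline_acc, 3))
```

With roughly two-thirds of the samples labelled `"No claim"`, a model has to exceed an accuracy of about 0.67 before it can be said to have learned anything beyond the class prior. On the real data the helper could be called as `majority_baseline_accuracy(test_df['sub_claim'])`.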


### The distribution of labels in each set

In [None]:
# Get the value counts for each dataframe
train_counts = train_df['sub_claim_code'].value_counts()
test_counts = test_df['sub_claim_code'].value_counts()
validation_counts = validation_df['sub_claim_code'].value_counts()

# Create a figure with three subplots in a row
fig, axes = plt.subplots(1, 3, figsize=(20, 6))

# Plot the distribution for the training set
axes[0].bar(train_counts.index, train_counts.values)
axes[0].set_title('Training Set')
axes[0].set_xlabel('Sub-Claim')
axes[0].set_ylabel('Count')
axes[0].tick_params(axis='x', rotation=90)


# Plot the distribution for the test set
axes[1].bar(test_counts.index, test_counts.values)
axes[1].set_title('Test Set')
axes[1].set_xlabel('Sub-Claim')
axes[1].set_ylabel('Count')
axes[1].tick_params(axis='x', rotation=90)

# Plot the distribution for the validation set
axes[2].bar(validation_counts.index, validation_counts.values)
axes[2].set_title('Validation Set')
axes[2].set_xlabel('Sub-Claim')
axes[2].set_ylabel('Count')
axes[2].tick_params(axis='x', rotation=90)


plt.tight_layout()
plt.show()

The bar charts of the three sets show similar label distributions. Performance measured on the validation and test sets should therefore be a reasonable estimate of how a model trained on the training set behaves on unseen data drawn from the same distribution.
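
The visual comparison can be backed by a single number. The sketch below uses the total variation distance between two label distributions, a measure that is not part of the original analysis and is introduced here only as an illustration; the helper names and toy splits are assumptions.

```python
from collections import Counter


def label_proportions(label_values):
    """Map each label to its share of the samples."""
    counts = Counter(label_values)
    total = sum(counts.values())
    return {lab: n / total for lab, n in counts.items()}


def total_variation_distance(labels_a, labels_b):
    """0.0 means identical label distributions, 1.0 means disjoint ones."""
    p = label_proportions(labels_a)
    q = label_proportions(labels_b)
    return 0.5 * sum(abs(p.get(lab, 0.0) - q.get(lab, 0.0)) for lab in set(p) | set(q))


# Toy splits, invented for illustration.
split_a = ["No claim"] * 6 + ["claim_A"] * 3 + ["claim_B"]
split_b = ["No claim"] * 7 + ["claim_A"] * 2 + ["claim_B"]
tvd = total_variation_distance(split_a, split_b)
print(round(tvd, 3))
```

A value close to zero for `total_variation_distance(train_df['sub_claim_code'], test_df['sub_claim_code'])` would support the conclusion drawn from the plots.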


## Analysis of Input Text

In this section, I conduct an exploratory analysis of the input columns, examining features such as text length, number of words, most frequent words, and other relevant characteristics.


### Text length analysis

In [None]:
# Calculate the length of the text in each dataframe
train_text_lengths = train_df['text'].str.len()
test_text_lengths = test_df['text'].str.len()
validation_text_lengths = validation_df['text'].str.len()

# Create a figure with three subplots in a row
fig, axes = plt.subplots(1, 3, figsize=(20, 6))

# Plot the histogram for the training set
axes[0].hist(train_text_lengths, bins=50)
axes[0].set_title('Training Set Text Length Distribution')
axes[0].set_xlabel('Text Length')
axes[0].set_ylabel('Frequency')

# Plot the histogram for the test set
axes[1].hist(test_text_lengths, bins=50)
axes[1].set_title('Test Set Text Length Distribution')
axes[1].set_xlabel('Text Length')
axes[1].set_ylabel('Frequency')

# Plot the histogram for the validation set
axes[2].hist(validation_text_lengths, bins=50)
axes[2].set_title('Validation Set Text Length Distribution')
axes[2].set_xlabel('Text Length')
axes[2].set_ylabel('Frequency')

plt.tight_layout()
plt.show()

The text length distributions vary across the different sets. This variation may impact the performance of certain conventional methods, such as Long Short-Term Memory (LSTM) networks or Feed-Forward Neural Networks (FFNs), which can be sensitive to input length. In contrast, the effect on Large Language Models (LLMs) is expected to be minor, as they are generally more robust to variations in sequence length.
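
To make the comparison between sets less dependent on reading histograms, the length distributions can be summarised by a few quantiles. This is a minimal sketch using only the standard library; `length_summary` is a hypothetical helper and the toy texts are invented.

```python
import statistics


def length_summary(texts):
    """Median, 95th percentile and maximum character length of a collection of texts."""
    lengths = sorted(len(str(t)) for t in texts)
    # n=20 yields 19 cut points in 5% steps; the last one is the 95th percentile
    cuts = statistics.quantiles(lengths, n=20, method="inclusive")
    return {"median": statistics.median(lengths), "p95": cuts[18], "max": lengths[-1]}


# Toy texts with lengths 1..101, invented for illustration.
toy_texts = ["a" * n for n in range(1, 102)]
toy_summary = length_summary(toy_texts)
print(toy_summary)
```

Calling `length_summary(train_df['text'])` and the same for the other two frames would give directly comparable numbers, and the 95th percentile is a useful guide when choosing a truncation limit for prompts.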


### Word count analysis

In [None]:
# Calculate the word count of the text in each dataframe
train_word_counts = train_df['text'].str.split().str.len()
test_word_counts = test_df['text'].str.split().str.len()
validation_word_counts = validation_df['text'].str.split().str.len()

# Create a figure with three subplots in a row
fig, axes = plt.subplots(1, 3, figsize=(20, 6))

# Plot the histogram for the training set
axes[0].hist(train_word_counts, bins=50)
axes[0].set_title('Training Set Word Count Distribution')
axes[0].set_xlabel('Word Count')
axes[0].set_ylabel('Frequency')

# Plot the histogram for the test set
axes[1].hist(test_word_counts, bins=50)
axes[1].set_title('Test Set Word Count Distribution')
axes[1].set_xlabel('Word Count')
axes[1].set_ylabel('Frequency')

# Plot the histogram for the validation set
axes[2].hist(validation_word_counts, bins=50)
axes[2].set_title('Validation Set Word Count Distribution')
axes[2].set_xlabel('Word Count')
axes[2].set_ylabel('Frequency')

plt.tight_layout()
plt.show()

A similar observation applies to the word count distribution across the datasets.


### Wordcloud Representation of text column


In [None]:
# Combine the text from all three dataframes
all_text = pd.concat([train_df['text'], test_df['text'], validation_df['text']])

# Join all text into a single string; str() guards against non-string entries such as NaN
combined_text = " ".join(str(review) for review in all_text)

# Generate the word cloud
wordcloud = WordCloud(stopwords=None, background_color="white").generate(combined_text)

# Display the word cloud
plt.figure(figsize=(10, 7))
plt.imshow(wordcloud, interpolation='bilinear')
plt.title('Word Cloud of All Datasets')
plt.axis("off")
plt.show()

The word cloud suggests that the most frequent words are global, climate, change, year, and warming.

# Evaluate the performance of different models on text classification

### Log in to Hugging Face
Downloading the models requires logging in to a Hugging Face account or supplying an access token.

In [None]:
from huggingface_hub import login
login("YOUR_HF_TOKEN")

### Create a list of all possible answers to prompt

In [None]:
# Get all unique values from the 'sub_claim' column in the training data
sub_claim_labels = train_df['sub_claim'].unique().tolist()

## Zero-shot prompting results

### Define model name

In [None]:
MODEL_ID = "google/gemma-2-2b-it"

### Define parameters

In [None]:
USE_4BIT = False  # set True if using "google/gemma-2-9b-it"


In [None]:

INPUT_COL = "text"
BATCH_SIZE = 8
OUTPUT_COL = "sub_claim"
MAX_INPUT_TOKENS = 512
MAX_NEW_TOKENS = 4
CHECKPOINT_EVERY = 100
OUT_PATH = "predictions.parquet"
CSV_MIRROR = "predictions.csv"

Build the quantization config: 4-bit loading is enabled only when `USE_4BIT` is True, so it stays disabled for the 2B model.

In [None]:
bnb_config = BitsAndBytesConfig(load_in_4bit=True) if USE_4BIT else None

### Define the Model

In [None]:
dtype = torch.float16
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype=dtype,
    device_map="auto",
    quantization_config=bnb_config,
)

### Define the tokenizer

In [None]:
tok = AutoTokenizer.from_pretrained(MODEL_ID)


### Zero-shot prompt
* You are a strict classifier. Possible labels: ***List of labels***. Return only one label.


In [None]:
def format_prompt(x):
    # Keep it short and force a closed set
    return (
        "You are a strict classifier. "
        f"Possible labels: {', '.join(sub_claim_labels)}.\n"
        "Return ONLY one label.\n\n"
        f"TEXT:\n{x}\n\nLabel:"
    )

### Parse labels
* Empty or unparseable answers are treated as "No claim".

In [None]:
import re

def parse_label(text):
    if not text or not text.strip():
        return "No claim"
    first = text.strip().splitlines()[0].strip().lower()
    # optional: normalize punctuation/spaces
    first_norm = re.sub(r'\s+', ' ', first)

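    # check longer labels first so a label that is a prefix of another cannot shadow it
    by_length = sorted(sub_claim_labels, key=len, reverse=True)
    for lab in by_length:
        if first_norm.startswith(lab.lower()):
            return lab

    # fallback: look for any exact label mention and map it back to its original casing
    lower_to_label = {l.lower(): l for l in sub_claim_labels}
    pat = r'\b(' + '|'.join(re.escape(l.lower()) for l in by_length) + r')\b'
    m = re.search(pat, first_norm)
    return lower_to_label[m.group(1)] if m else "No claim"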


### Check that the input column exists

In [None]:
if INPUT_COL not in test_df.columns:
    raise ValueError(f"Missing column: {INPUT_COL}")

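### Resume from a previous checkpoint after a crash

In [None]:
done_idx = set()
if os.path.exists(OUT_PATH):
    prev = pd.read_parquet(OUT_PATH)
    # Assuming original order; if you have a stable ID column, use that instead.
    # Only rows that already hold a prediction count as done: checkpoints contain
    # every test row, with missing values for rows not yet processed.
    done_idx = set(prev.index[prev[OUTPUT_COL].notna()].tolist())
    print(f"Resuming: {len(done_idx)} rows already done.")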
else:
    prev = pd.DataFrame(index=[], columns=[OUTPUT_COL])


In [None]:
model.eval()


### Run the prompt on the data

In [None]:
def generate_batch(prompts):
    # Decoder-only models need left padding for batched generation, so that
    # every prompt ends exactly where generation starts.
    tok.padding_side = "left"
    enc = tok(
        prompts, padding=True, truncation=True,
        max_length=MAX_INPUT_TOKENS, return_tensors="pt"
    ).to(model.device)

    prompt_len = enc["input_ids"].shape[1]  # padded prompt length, shared by the whole batch

    with torch.no_grad():
        gen = model.generate(
            **enc,
            max_new_tokens=MAX_NEW_TOKENS,
            min_new_tokens=1,  # force at least one token
            do_sample=False,
            temperature=None,  # avoid the warning
            eos_token_id=tok.eos_token_id or getattr(model.config, "eos_token_id", None),
            pad_token_id=tok.pad_token_id or getattr(model.config, "pad_token_id", None),
        )

    # generate() returns prompt + continuation; keep only the newly generated tokens.
    # Slicing at the per-sample unpadded length would leak the prompt tail into the output.
    new_tokens = gen[:, prompt_len:]
    return tok.batch_decode(new_tokens, skip_special_tokens=True, clean_up_tokenization_spaces=False)


### Run to get results

In [None]:
results = prev.copy()
if results.empty:
    results = pd.DataFrame(index=test_df.index, columns=[OUTPUT_COL])
todo_indices = [i for i in test_df.index if i not in done_idx]
print(f"Remaining: {len(todo_indices)}")
for start in tqdm(range(0, len(todo_indices), BATCH_SIZE)):
    batch_idx = todo_indices[start:start+BATCH_SIZE]
    texts = [test_df.at[i, INPUT_COL] for i in batch_idx]
    prompts = [format_prompt(t) for t in texts]

    try:
        gens = generate_batch(prompts)
        labels = [parse_label(g) for g in gens]
        results.loc[batch_idx, OUTPUT_COL] = labels
    except RuntimeError as e:
        # Handle occasional CUDA OOM
        if "CUDA out of memory" in str(e):
            torch.cuda.empty_cache()
            # retry with smaller batch
            for i in batch_idx:
                try:
                    g = generate_batch([prompts[batch_idx.index(i)]])[0]
                    results.at[i, OUTPUT_COL] = parse_label(g)
                except Exception:
                    results.at[i, OUTPUT_COL] = "error"
            continue
        else:
            # mark errors and continue
            for i in batch_idx:
                results.at[i, OUTPUT_COL] = "error"
            continue
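    finally:
        # Periodic checkpoint
        if (start // BATCH_SIZE) % (CHECKPOINT_EVERY // max(BATCH_SIZE,1)) == 0:
            results.to_parquet(OUT_PATH)
            results.to_csv(CSV_MIRROR, index=True)
            torch.cuda.empty_cache()
            gc.collect()

# Final save so that batches processed after the last checkpoint are not lost
results.to_parquet(OUT_PATH)
results.to_csv(CSV_MIRROR, index=True)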

### Evaluate the zero-shot performance

In [None]:
# Load predictions from the CSV file
predictions_df = pd.read_csv("/content/predictions.csv", index_col=0)

# Ensure the indices align
predictions_df = predictions_df.reindex(test_df.index)

# Get the true labels and predicted labels
y_true = test_df['sub_claim']
y_pred = predictions_df[OUTPUT_COL]

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

# Create a DataFrame to display the results with the specified structure
evaluation_metrics = pd.DataFrame({
    'Model': ['gemma 2b'],
    'Accuracy': [accuracy],
    'Precision': [precision],
    'Recall': [recall],
    'F1-Score': [f1]
}, index=['Zero shot'])

# Display the results
display(evaluation_metrics)

The zero-shot performance is not acceptable. Possible reasons include the specificity of the task, meaning the pre-trained LLM was not trained on similar data, and the relatively small size of the model.

## Few-shot prompting

In [None]:
OUT_PATH           = "predictions_fewshot.parquet"
CSV_MIRROR         = "predictions_fewshot.csv"

FEWSHOT_K          = 3                        # number of examples per prompt
MAX_EX_TEXT_CHARS  = 280                      # truncate each example text (pre-tokenization)
MAX_INPUT_TOKENS   = 1024                     # full prompt cap (few-shot + test)
MAX_NEW_TOKENS     = 4                        # classification -> tiny output
BATCH_SIZE         = 4                        # careful with VRAM; reduce if OOM
CHECKPOINT_EVERY   = 100                      # rows
SEED               = 42                       # reproducibility (sampling)
LABELS_OVERRIDE    = None                     # e.g. ["pos","neg","neu"] or None to infer from train_df


In [None]:
def truncate_text_chars(s, max_chars):
    if s is None:
        return ""
    s = str(s)
    return s[:max_chars] if max_chars and len(s) > max_chars else s

In [None]:
def build_few_shot_prompt(train_df, k, input_col, label_col, test_text, labels, seed=None):
    """Randomly sample k examples from train_df and build a few-shot prompt."""
    if seed is not None:
        # different seed per call to avoid identical samples each row
        rnd = random.Random(seed + random.randint(0, 1_000_000))
        sampled = train_df.sample(n=min(k, len(train_df)), random_state=rnd.randint(0, 1_000_000))
    else:
        sampled = train_df.sample(n=min(k, len(train_df)))

    parts = [
        "You are a strict text classifier.",
        f"Possible labels: {', '.join(labels)}.",
        "Return ONLY the label name.\n"
    ]
    for _, r in sampled.iterrows():
        ex_text = truncate_text_chars(r[input_col], MAX_EX_TEXT_CHARS)
        parts.append(f"TEXT: {ex_text}\nLabel: {r[label_col]}\n")

    parts.append(f"TEXT: {truncate_text_chars(test_text, MAX_EX_TEXT_CHARS)}\nLabel:")
    return "\n".join(parts)

In [None]:
def atomic_save(df):
    tmp = OUT_PATH + ".tmp"
    df.to_parquet(tmp)
    os.replace(tmp, OUT_PATH)
    df.to_csv(CSV_MIRROR, index=True, na_rep="PENDING")

In [None]:
def generate_batch(prompts):
    # Decoder-only models need left padding for batched generation, so that
    # every prompt ends exactly where generation starts.
    tok.padding_side = "left"
    enc = tok(
        prompts, padding=True, truncation=True,
        max_length=MAX_INPUT_TOKENS, return_tensors="pt"
    ).to(model.device)

    prompt_len = enc["input_ids"].shape[1]  # padded prompt length, shared by the whole batch

    with torch.no_grad():
        gen = model.generate(
            **enc,
            max_new_tokens=MAX_NEW_TOKENS,
            min_new_tokens=1,           # ensure at least one token
            do_sample=False,            # deterministic for classification
            temperature=None,           # avoid "ignored" warning
            eos_token_id=tok.eos_token_id or getattr(model.config, "eos_token_id", None),
            pad_token_id=tok.pad_token_id or getattr(model.config, "pad_token_id", None),
        )

    # generate() returns prompt + continuation; keep only the newly generated tokens
    new_tokens = gen[:, prompt_len:]
    return tok.batch_decode(new_tokens, skip_special_tokens=True, clean_up_tokenization_spaces=False)

In [None]:
if os.path.exists(OUT_PATH):
    results = pd.read_parquet(OUT_PATH)
    # Make sure it aligns with current test_df
    missing_idx = set(test_df.index) - set(results.index)
    if missing_idx:
        # expand to include missing rows
        extra = pd.DataFrame(index=sorted(list(missing_idx)))
        results = pd.concat([results, extra], axis=0).sort_index()
    if "prediction" not in results.columns:
        results["prediction"] = pd.NA
    done_mask = results["prediction"].notna()
    print(f"Resuming: {done_mask.sum()} already done / {len(test_df)}")
else:
    results = pd.DataFrame(index=test_df.index, columns=["prediction"])
    done_mask = results["prediction"].notna()
    print(f"Starting fresh: {len(test_df)} rows")

In [None]:
todo_indices = [i for i in test_df.index if not done_mask.loc[i]]

In [None]:
for start in tqdm(range(0, len(todo_indices), BATCH_SIZE)):
    batch_idx = todo_indices[start:start + BATCH_SIZE]
    batch_texts = [test_df.at[i, INPUT_COL] for i in batch_idx]

    # Build a few-shot prompt per test text (each gets its own random k-shot)
    prompts = [
        build_few_shot_prompt(
            train_df=train_df,
            k=FEWSHOT_K,
            input_col="text",
            label_col="sub_claim",
            test_text=txt,
            labels=sub_claim_labels,
            seed=SEED
        )
        for txt in batch_texts
    ]

    try:
        gens = generate_batch(prompts)
        labels = [parse_label(g) for g in gens]
        results.loc[batch_idx, "prediction"] = labels
    except RuntimeError as e:
        if "CUDA out of memory" in str(e):
            torch.cuda.empty_cache()
            # retry one-by-one
            for i, prompt in zip(batch_idx, prompts):
                try:
                    g = generate_batch([prompt])[0]
                    results.at[i, "prediction"] = parse_label(g)
                except Exception:
                    results.at[i, "prediction"] = "error"
        else:
            for i in batch_idx:
                results.at[i, "prediction"] = "error"
    finally:
        # periodic checkpoint
        processed = len(results["prediction"].dropna())
        if processed % CHECKPOINT_EVERY < BATCH_SIZE:
            atomic_save(results)
            torch.cuda.empty_cache()
            gc.collect()

# Final save
atomic_save(results)
print("Saved:", OUT_PATH, "and", CSV_MIRROR)

In [None]:
# Load predictions from the CSV file
predictions_df = pd.read_csv("/content/predictions_fewshot.csv", index_col=0)

# Ensure the indices align
predictions_df = predictions_df.reindex(test_df.index)

# Get the true labels and predicted labels
y_true = test_df['sub_claim']
y_pred = predictions_df["prediction"]

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

# Create a DataFrame to display the results with the specified structure
few_shot_metrics = pd.DataFrame({
    'Model': ['gemma 2b'],
    'Accuracy': [accuracy],
    'Precision': [precision],
    'Recall': [recall],
    'F1-Score': [f1]
}, index=['Few Shot'])

# Append the few-shot metrics to the existing evaluation_metrics DataFrame
evaluation_metrics = pd.concat([evaluation_metrics, few_shot_metrics])

# Display the results
display(evaluation_metrics)

Few-shot prompting performs slightly better than zero-shot, but the results are still unsatisfactory. The improvement most likely comes from the in-context examples, which show the model the expected output format and a few sample text-to-label mappings.

### Efficient Fine-Tuning with QLoRA and PEFT
QLoRA enables efficient fine-tuning of large language models by quantizing the frozen base weights to 4-bit precision and training small LoRA adapters on top, while largely preserving model quality. PEFT (Parameter-Efficient Fine-Tuning) methods, such as LoRA, update only a small set of additional parameters, reducing memory usage and training time. Combined, they allow large models to be fine-tuned effectively on low-resource hardware.
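
The saving from LoRA can be estimated with simple arithmetic. For a weight matrix of shape `d_out × d_in`, LoRA trains two low-rank factors of shapes `d_out × r` and `r × d_in`, i.e. `r * (d_in + d_out)` parameters instead of `d_in * d_out`. The sketch below works this through for the rank `r=16` used in this notebook; the matrix dimensions are illustrative assumptions and not Gemma's actual layer shapes.

```python
def lora_param_count(d_in, d_out, r):
    """Trainable parameters added by one LoRA adapter (factors B: d_out x r, A: r x d_in)."""
    return r * (d_in + d_out)


def full_param_count(d_in, d_out):
    """Parameters of the full weight matrix that LoRA leaves frozen."""
    return d_in * d_out


# Illustrative dimensions only; r matches the LoraConfig in this notebook.
toy_d_in, toy_d_out, toy_r = 2048, 2048, 16
added = lora_param_count(toy_d_in, toy_d_out, toy_r)
full = full_param_count(toy_d_in, toy_d_out)
print(added, full, added / full)
```

For these dimensions the adapter adds 65,536 trainable parameters next to 4,194,304 frozen ones, about 1.6% of the matrix. The trainable-parameter count printed for the real model later in this section reflects the same effect summed over all adapted modules.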

In [None]:
MODEL_NAME   = "google/gemma-2-2b-it"   # or "google/gemma-2-2b" if you prefer
TEXT_COL     = "text"
LABEL_COL    = "sub_claim"
MAX_LEN      = 384
EPOCHS       = 3
LR           = 2e-4
BATCH_TRAIN  = 2        # T4-friendly; raise if you can
BATCH_EVAL   = 2
GRAD_ACCUM   = 8
OUTPUT_DIR   = "./gemma2_cls_qlora"
SEED         = 42
SUBSET_SIZE  = 1000

In [None]:
train_df = train_df[[TEXT_COL, LABEL_COL]].dropna().reset_index(drop=True)
test_df  = test_df[[TEXT_COL, LABEL_COL]].dropna().reset_index(drop=True)

In [None]:
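# build the label set from all three splits so every label seen later has an id
labels = sorted(set(train_df[LABEL_COL]) | set(test_df[LABEL_COL]) | set(validation_df[LABEL_COL].dropna()))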
label2id = {l:i for i,l in enumerate(labels)}
id2label = {i:l for l,i in label2id.items()}

In [None]:
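# torch.utils.data.Dataset shadows the `Dataset` name imported from `datasets` earlier,
# so the Hugging Face class is imported here under an explicit alias
from datasets import Dataset as HFDataset

train_hf = HFDataset.from_pandas(train_df)
validation_hf  = HFDataset.from_pandas(validation_df)
test_hf = HFDataset.from_pandas(test_df)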

In [None]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [None]:
compute_dtype = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else torch.float16

In [None]:
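model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    # pass the 4-bit settings through BitsAndBytesConfig rather than as loose keyword arguments
    quantization_config=BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=compute_dtype,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
    ),
    attn_implementation="eager",   # recommended for Gemma 2
)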
model.config.use_cache = False
model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
model = prepare_model_for_kbit_training(model)

In [None]:
lora_cfg = LoraConfig(
    r=16, lora_alpha=32, lora_dropout=0.05, bias="none",
    task_type="CAUSAL_LM",
    target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"],
)

In [None]:
model = get_peft_model(model, lora_cfg)
model.gradient_checkpointing_enable()

In [None]:
label_tokens = {}
space_id = tokenizer.encode(" ", add_special_tokens=False)[0]
for lab in labels:
    ids = tokenizer.encode(" " + str(lab), add_special_tokens=False)
    label_tokens[lab] = ids[0] if len(ids)>0 else tokenizer.eos_token_id

In [None]:
trainable, total = 0, 0
for n, p in model.named_parameters():
    total += p.numel()
    if p.requires_grad:
        trainable += p.numel()
print(f"Trainable params: {trainable/1e6:.2f}M / {total/1e6:.2f}M")

**The prompt:**  
You are a helpful classifier.  
Choose one label from the list.  

**Text:**  
...  

**Answer:**  
(Respond with just the label.)


In [None]:
PROMPT_TEMPLATE = (
    "You are a helpful classifier. "
    "Choose one label from: {label_list}.\n\n"
    "Text:\n{text}\n\n"
    "Answer with just the label."
)


In [None]:
def build_sample(example):
    text = str(example[TEXT_COL])
    lab  = str(example[LABEL_COL])
    prompt = PROMPT_TEMPLATE.format(label_list=", ".join(labels), text=text)

    enc = tokenizer(prompt, truncation=True, max_length=MAX_LEN, padding=False)
    input_ids = enc["input_ids"]; attn = enc["attention_mask"]

    # append " space + label_token" for supervised next-token learning
    label_id = label_tokens[lab]
    input_ids_with_label = input_ids + [space_id, label_id]
    attn_with_label = attn + [1,1]

    # supervise only the final label token; ignore prompt tokens with -100
    labels_arr = [-100]*len(input_ids) + [-100, label_id]

    return {
        "input_ids": input_ids_with_label,
        "attention_mask": attn_with_label,
        "labels": labels_arr,
        "gold_label": lab,
    }
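
The `-100` entries in `labels_arr` rely on the convention that PyTorch's cross-entropy loss skips targets equal to its default `ignore_index` of `-100`, so only the final label token contributes to the loss. The sketch below reproduces that masking in pure Python on invented logits; `masked_nll` is a hypothetical helper written for illustration, and it leaves out the one-position label shift that causal language model losses apply.

```python
import math

IGNORE_INDEX = -100


def masked_nll(logits, targets, ignore_index=IGNORE_INDEX):
    """Mean negative log-likelihood over positions whose target is not ignore_index."""
    losses = []
    for row, target in zip(logits, targets):
        if target == ignore_index:
            continue  # masked position: contributes nothing to the loss
        log_norm = math.log(sum(math.exp(x) for x in row))
        losses.append(log_norm - row[target])
    return sum(losses) / len(losses) if losses else 0.0


# Three positions over a two-token vocabulary; only the last one is supervised.
toy_logits = [[2.0, 0.0], [0.0, 0.0], [0.0, 3.0]]
toy_targets = [IGNORE_INDEX, IGNORE_INDEX, 1]
toy_loss = masked_nll(toy_logits, toy_targets)
print(toy_loss)
```

Changing the logits at the two masked positions leaves `toy_loss` unchanged, which is the behaviour the training samples depend on: the prompt tokens are fed to the model as context but are not themselves training targets.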

In [None]:
train_hf = train_hf.map(build_sample, remove_columns=train_hf.column_names)
validation_hf= validation_hf.map(build_sample, remove_columns=validation_hf.column_names)
test_hf  = test_hf.map(build_sample,  remove_columns=test_hf.column_names)

In [None]:
def collate(batch):
    padded = tokenizer.pad(
        {
            "input_ids": [b["input_ids"] for b in batch],
            "attention_mask": [b["attention_mask"] for b in batch],
        },
        padding=True,
        return_tensors="pt",
    )
    max_len = padded["input_ids"].size(1)

    labels_pad = []
    for b in batch:
        l = b["labels"]
        if len(l) < max_len:
            l = l + [-100] * (max_len - len(l))
        labels_pad.append(l)
    padded["labels"] = torch.tensor(labels_pad, dtype=torch.long)

    # only add gold_label if present in ALL items
    if all("gold_label" in b for b in batch):
        padded["gold_label"] = [b["gold_label"] for b in batch]
    return padded

In [None]:
@torch.no_grad()
def predict_labels(eval_dataset):
    model.eval()
    preds, golds = [], []
    device = model.device
    for ex in eval_dataset:
        # prompt without appended " space + label"
        inp  = ex["input_ids"][:-2]
        attn = ex["attention_mask"][:-2]
        # ask the model to predict next token after adding a space
        inp2  = inp + [space_id]
        attn2 = attn + [1]

        tens   = torch.tensor([inp2], device=device)
        attn_t = torch.tensor([attn2], device=device)
        out = model(input_ids=tens, attention_mask=attn_t)
        next_logits = out.logits[0, -1]  # distribution for next token

        # score each label by its first token logit
        scores = [next_logits[label_tokens[lab]].item() for lab in labels]
        pred_lab = labels[int(np.argmax(scores))]
        preds.append(pred_lab)
        golds.append(ex["gold_label"])
    return preds, golds

In [None]:
def compute_metrics_on(eval_dataset):
    y_pred, y_true = predict_labels(eval_dataset)
    acc = accuracy_score(y_true, y_pred)
    pr, rc, f1, _ = precision_recall_fscore_support(y_true, y_pred, average="weighted", zero_division=0)
    return {"accuracy": acc, "precision": pr, "recall": rc, "f1": f1}

In [None]:
class SubsetTrainer(Trainer):
    def __init__(self, *args, subset_size=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.subset_size = subset_size

    def get_train_dataloader(self):
        if self.train_dataset is None:
            raise ValueError("Training requires a train_dataset.")
        if self.subset_size is not None:
            sampler = RandomSampler(
                self.train_dataset, replacement=True, num_samples=self.subset_size
            )
        else:
            sampler = self._get_train_sampler()
        return DataLoader(
            self.train_dataset,
            batch_size=self.args.train_batch_size,
            sampler=sampler,
            collate_fn=self.data_collator,
            drop_last=self.args.dataloader_drop_last,
            num_workers=self.args.dataloader_num_workers,
            pin_memory=self.args.dataloader_pin_memory,
        )


In [None]:
args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    num_train_epochs=EPOCHS,
    learning_rate=LR,
    per_device_train_batch_size=BATCH_TRAIN,
    per_device_eval_batch_size=BATCH_EVAL,
    gradient_accumulation_steps=GRAD_ACCUM,
    logging_steps=25,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    bf16=(compute_dtype==torch.bfloat16),
    fp16=(compute_dtype==torch.float16),
    report_to="none",
)

In [None]:
trainer = SubsetTrainer(
    model=model,
    args=args,
    train_dataset=train_hf,
    eval_dataset=validation_hf,
    tokenizer=tokenizer,
    data_collator=collate,
    subset_size=SUBSET_SIZE,
)

In [None]:
trainer.train()

In [None]:
# Evaluate the model on the test set
test_metrics = compute_metrics_on(test_hf)

# Create a DataFrame with the test metrics
peft_qlora_results = pd.DataFrame({
    'Model': ['gemma 2b'],
    'Accuracy': [test_metrics['accuracy']],
    'Precision': [test_metrics['precision']],
    'Recall': [test_metrics['recall']],
    'F1-Score': [test_metrics['f1']]
}, index=['PEFT QLORA'])

# Append the PEFT QLORA metrics to the existing evaluation_metrics DataFrame
evaluation_metrics = pd.concat([evaluation_metrics, peft_qlora_results])

# Display the results
display(evaluation_metrics)

# Save the results to a CSV file
peft_qlora_results.to_csv("PEFT_QLORA_results.csv", index=True)

Fine-tuning gives significantly better results than the prompting baselines, likely because training low-rank adapter weights on labeled examples gives the model task-specific knowledge that prompting alone does not provide.
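To make the supervision scheme used above concrete, here is a minimal sketch of a cross-entropy loss that skips positions labeled `-100`, so that only the label token contributes to the loss. It is plain Python rather than the PyTorch loss that `Trainer` actually uses, and it omits the one-position shift that causal language models apply between logits and labels. The names `masked_cross_entropy`, `toy_logits`, and `toy_labels` are illustrative assumptions, not part of the notebook.

```python
import math

IGNORE_INDEX = -100  # positions with this label contribute nothing to the loss


def masked_cross_entropy(logits, labels):
    """Mean cross-entropy over positions whose label is not IGNORE_INDEX."""
    total, count = 0.0, 0
    for row, target in zip(logits, labels):
        if target == IGNORE_INDEX:
            continue  # prompt token: skipped entirely
        m = max(row)  # subtract the max for a numerically stable log-sum-exp
        log_z = m + math.log(sum(math.exp(x - m) for x in row))
        total += log_z - row[target]
        count += 1
    return total / count if count else 0.0


# Toy sequence: three positions over a 3-token vocabulary.
# Only the last position (the "label token") is supervised.
toy_logits = [[5.0, 0.0, 0.0], [0.0, 5.0, 0.0], [0.0, 0.0, 2.0]]
toy_labels = [IGNORE_INDEX, IGNORE_INDEX, 2]

loss = masked_cross_entropy(toy_logits, toy_labels)
print(f"loss on the supervised position: {loss:.4f}")
```

Changing the logits at the two masked positions leaves the loss unchanged, which is the property the `-100` labels rely on.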

### Using **Chain-of-Thought (CoT) + Self-Consistency**
I use Chain-of-Thought (CoT) prompting to guide the model through step-by-step reasoning before producing the final answer. I also apply Self-Consistency, where the model generates multiple reasoning paths for the same query and the most frequent final answer is selected, improving reliability and accuracy.
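The voting step of Self-Consistency can be sketched in isolation as follows. This is a minimal illustration under my own assumptions: `majority_vote` is a hypothetical helper, the labels `"A"` and `"B"` are placeholders rather than real sub-claim codes, and failed parses are represented as `None`.

```python
from collections import Counter


def majority_vote(votes):
    """Return the most frequent non-None vote, or None if no vote is valid.

    Ties are resolved in favor of the label that was seen first.
    """
    valid = [v for v in votes if v is not None]
    if not valid:
        return None
    return Counter(valid).most_common(1)[0][0]


# Final answers from three hypothetical sampled reasoning paths.
sampled_answers = ["A", "B", "A"]
final_answer = majority_vote(sampled_answers)
print(final_answer)
```

With only a few passes, ties and failed parses are common, so the tie-breaking rule and the handling of `None` can noticeably affect the final prediction.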

In [None]:
EVAL_SAMPLES = 300
SC_N = 3                # self-consistency passes
MAX_NEW_TOKENS = 64
TEMP = 0.7

In [None]:
labels = sorted(pd.unique(pd.concat([train_df[LABEL_COL], test_df[LABEL_COL]], ignore_index=True)))

In [None]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [None]:
compute_dtype = torch.bfloat16 if (torch.cuda.is_available() and torch.cuda.is_bf16_supported()) else torch.float16
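from transformers import BitsAndBytesConfig

# 4-bit NF4 quantization settings, grouped in a BitsAndBytesConfig
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=compute_dtype,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    quantization_config=bnb_config,
    attn_implementation="eager",
)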
model.eval()
model.config.use_cache = True

**Prompt**
You are a careful fact/claim classifier.  
Possible labels: <label1>, <label2>, <label3>  

Instructions:  
1) Think step by step about the text and its meaning.  
2) On the last line, output exactly: Label: <one of the labels>  

Text:  
<text>  

Reasoning:

In [None]:
def cot_prompt(text, labels):
    return (
        "You are a careful fact/claim classifier.\n"
        f"Possible labels: {', '.join(labels)}\n\n"
        "Instructions:\n"
        "1) Think step by step about the text and its meaning.\n"
        "2) On the last line, output exactly: Label: <one of the labels>\n\n"
        f"Text:\n{text}\n\n"
        "Reasoning:"
    )

In [None]:
def parse_label(output, labels):
    m = re.search(r"Label:\s*([^\n\r]+)", output, flags=re.IGNORECASE)
    if m:
        cand = m.group(1).strip()
        # pick the first exact label that appears
        for L in labels:
            if re.search(rf"\b{re.escape(L)}\b", cand, flags=re.IGNORECASE):
                return L
    # fallback: first label mentioned anywhere
    for L in labels:
        if re.search(rf"\b{re.escape(L)}\b", output, flags=re.IGNORECASE):
            return L
    return None

In [None]:
@torch.no_grad()
def classify_cot_once(model, tokenizer, text, labels, max_new_tokens=64, temperature=0.7):
    prompt = cot_prompt(text, labels)
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    gen = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=(temperature > 0),
        temperature=temperature,
        top_p=0.9,
        eos_token_id=tokenizer.eos_token_id,
    )
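    # decode only the newly generated tokens; the prompt itself contains
    # "Label:" and every label name, which would otherwise confuse parse_label
    prompt_len = inputs["input_ids"].shape[1]
    out = tokenizer.decode(gen[0][prompt_len:], skip_special_tokens=True)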
    return parse_label(out, labels), out

In [None]:
@torch.no_grad()
def classify_self_consistency(model, tokenizer, text, labels, n=3, max_new_tokens=64, temperature=0.7):
    votes = []
    for _ in range(n):
        pred, _ = classify_cot_once(model, tokenizer, text, labels,
                                    max_new_tokens=max_new_tokens, temperature=temperature)
        if pred is not None:
            votes.append(pred)
    if not votes:
        return None
    # majority vote; on a tie, the earliest vote wins so the result is reproducible
    return max(votes, key=votes.count)

In [None]:
rng = np.random.default_rng(42)
subset_idx = rng.choice(len(test_df), size=min(EVAL_SAMPLES, len(test_df)), replace=False)
eval_df = test_df.iloc[subset_idx].reset_index(drop=True)

In [None]:
y_true, y_pred = [], []
for i, row in eval_df.iterrows():
    pred = classify_self_consistency(
        model, tokenizer,
        row[TEXT_COL],
        labels,
        n=SC_N,
        max_new_tokens=MAX_NEW_TOKENS,
        temperature=TEMP
    )
    y_true.append(row[LABEL_COL])
    # safe default if parsing failed (rare)
    y_pred.append(pred if pred is not None else labels[0])
    if (i+1) % 50 == 0:
        print(f"Processed {i+1}/{len(eval_df)}")

# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

# Create a DataFrame to display the results with the specified structure
cot_sc_metrics = pd.DataFrame({
    'Model': ['gemma 2b'],
    'Accuracy': [accuracy],
    'Precision': [precision],
    'Recall': [recall],
    'F1-Score': [f1]
}, index=['COT + SC'])

# Append the COT + SC metrics to the existing evaluation_metrics DataFrame
evaluation_metrics = pd.concat([evaluation_metrics, cot_sc_metrics])

# Display the results
display(evaluation_metrics)

# Save the results to a CSV file
cot_sc_metrics.to_csv("cot_sc.csv", index=True)

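All metrics are zero, which indicates that this method, at least as configured here, is not suitable for this task. One possible contributing factor is the 64-token generation budget (`MAX_NEW_TOKENS`), which may cut off the reasoning before the final `Label:` line is produced.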

### Retrieval-Augmented Generation (RAG)
RAG is a technique that combines information retrieval with text generation.  
It first retrieves relevant documents or passages from an external knowledge source, such as a database or search index,  
and then uses a language model to generate responses based on both the retrieved context and the original query.  
This approach improves factual accuracy and allows the model to incorporate up-to-date or domain-specific information.  
Here, the knowledge source is the training set: for each input text, the most similar labeled training examples are retrieved and placed in the prompt as few-shot examples.
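The retrieval step can be illustrated without any embedding model or index. The sketch below is a simplified stand-in, not the notebook's pipeline: it uses hand-written 2-D vectors in place of sentence embeddings and a brute-force search in place of FAISS, and `normalize`, `top_k`, and the toy data are hypothetical names. It relies on the fact that the dot product of unit-normalized vectors equals their cosine similarity.

```python
import math


def normalize(vec):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]


def top_k(query_vec, passage_vecs, k=2):
    """Return indices of the k passages most similar to the query (cosine)."""
    q = normalize(query_vec)
    scored = []
    for i, p in enumerate(passage_vecs):
        p = normalize(p)
        # dot product of unit vectors == cosine similarity
        scored.append((sum(a * b for a, b in zip(q, p)), i))
    scored.sort(key=lambda s: s[0], reverse=True)
    return [i for _, i in scored[:k]]


# Toy "embeddings" for three labeled training passages.
toy_passages = [[1.0, 0.0], [0.0, 1.0], [0.9, 0.1]]
toy_labels = ["A", "B", "A"]

neighbors = top_k([1.0, 0.2], toy_passages, k=2)
retrieved = [(i, toy_labels[i]) for i in neighbors]
print(retrieved)
```

The retrieved `(index, label)` pairs play the role of the few-shot examples that are placed in the prompt ahead of the query text.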

In [None]:
EMB_MODEL = "intfloat/e5-base-v2"
TOP_K = 3
MAX_EX_CHARS = 200

In [None]:
emb = SentenceTransformer(EMB_MODEL, device="cuda" if torch.cuda.is_available() else "cpu")

In [None]:
def embed_passages(texts, batch_size=256):
    return emb.encode([f"passage: {t}" for t in texts],
                      batch_size=batch_size, show_progress_bar=True,
                      convert_to_numpy=True, normalize_embeddings=True)

In [None]:
train_texts = train_df[TEXT_COL].astype(str).tolist()
train_labels = train_df[LABEL_COL].astype(str).tolist()

In [None]:
X = embed_passages(train_texts)          # (N, d) unit-normalized
index = faiss.IndexFlatIP(X.shape[1])    # cosine via dot product on normalized vecs
index.add(X)

In [None]:
def retrieve_examples(query_text, k=TOP_K):
    qv = emb.encode([f"query: {query_text}"],
                    convert_to_numpy=True, normalize_embeddings=True)
    D, I = index.search(qv, k)
    out = []
    for idx in I[0]:
        t = train_texts[idx]
        l = train_labels[idx]
        if len(t) > MAX_EX_CHARS:
            t = t[:MAX_EX_CHARS] + "…"
        out.append((t, l))
    return out

In [None]:
MODEL_NAME = "google/gemma-2-2b-it"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [None]:
dtype = torch.bfloat16 if (torch.cuda.is_available() and torch.cuda.is_bf16_supported()) else torch.float16
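from transformers import BitsAndBytesConfig

# 4-bit NF4 quantization settings, grouped in a BitsAndBytesConfig
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=dtype,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    quantization_config=bnb_config,
    attn_implementation="eager",
)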

In [None]:
model.eval()
model.config.use_cache = True

In [None]:
def make_fewshot_block(examples):
    # Short, labeled shots only
    shots = []
    for i, (t, l) in enumerate(examples, 1):
        shots.append(f"Example {i}\nText: {t}\nLabel: {l}\n----")
    return "\n".join(shots)

In [None]:
def prompt_no_cot(query_text, labels, examples):
    fewshot = make_fewshot_block(examples)
    return (
        "You are a classifier. "
        f"Possible labels: {', '.join(labels)}.\n\n"
        f"{fewshot}\n\n"
        f"Text: {query_text}\n"
        "Answer with exactly: Label: <one of the labels>"
    )

In [None]:
def parse_label(output, labels):
    m = re.search(r"Label:\s*([^\n\r]+)", output, flags=re.IGNORECASE)
    if m:
        cand = m.group(1).strip()
        for L in labels:
            if re.fullmatch(re.escape(L), cand, flags=re.IGNORECASE):
                return L
        # fallback: contains
        for L in labels:
            if re.search(rf"\b{re.escape(L)}\b", cand, flags=re.IGNORECASE):
                return L
    return None

In [None]:
@torch.no_grad()
def classify_rag_nocot(text, labels, max_new_tokens=16):
    examples = retrieve_examples(text, k=TOP_K)
    prompt = prompt_no_cot(text, labels, examples)
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    gen = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,               # deterministic
        temperature=None,
        eos_token_id=tokenizer.eos_token_id,
    )
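    # decode only the newly generated tokens; otherwise parse_label matches the
    # "Label:" line of the first retrieved example inside the prompt
    prompt_len = inputs["input_ids"].shape[1]
    out = tokenizer.decode(gen[0][prompt_len:], skip_special_tokens=True)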
    lab = parse_label(out, labels)
    return lab if lab is not None else labels[0]

In [None]:
EVAL_SAMPLES = min(500, len(test_df))
idx = np.random.default_rng(42).choice(len(test_df), size=EVAL_SAMPLES, replace=False)
eval_df = test_df.iloc[idx].reset_index(drop=True)

In [None]:
y_true = eval_df[LABEL_COL].tolist()
y_pred = [classify_rag_nocot(t, labels) for t in eval_df[TEXT_COL].tolist()]

In [None]:
# Calculate evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

# Create a DataFrame to display the results with the specified structure
rag_metrics = pd.DataFrame({
    'Model': ['gemma 2b'],
    'Accuracy': [accuracy],
    'Precision': [precision],
    'Recall': [recall],
    'F1-Score': [f1]
}, index=['RAG'])

# Append the RAG metrics to the existing evaluation_metrics DataFrame
evaluation_metrics = pd.concat([evaluation_metrics, rag_metrics])

# Display the results
display(evaluation_metrics)

# Save the results to a CSV file
rag_metrics.to_csv("rag_results.csv", index=True)

RAG achieves higher accuracy than the fine-tuned model, which suggests that retrieving similar labeled examples has strong potential for this task.

### TF-IDF Features Combined with a Fully Connected Network

**TF-IDF (Term Frequency–Inverse Document Frequency)** is a statistical method used to represent text as numerical vectors.  
It assigns higher weights to terms that appear frequently in a document but less frequently across the corpus,  
capturing words that are most relevant for distinguishing between documents.

TF-IDF is important for text classification because it transforms unstructured text into meaningful, fixed-size  
feature vectors that preserve important information while reducing the influence of common, less informative words.
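As a small worked example of the weighting described above, the following sketch computes TF-IDF vectors for three toy sentences in plain Python. It assumes the smoothed IDF variant, idf(t) = ln((1 + n) / (1 + df(t))) + 1, followed by L2 normalization, which is what scikit-learn's `TfidfVectorizer` applies by default. The function `tfidf_vectors` and the toy documents are illustrative and not part of the notebook.

```python
import math
from collections import Counter


def tfidf_vectors(docs):
    """Compute L2-normalized TF-IDF vectors (smoothed IDF) for tokenized docs."""
    n_docs = len(docs)
    vocab = sorted({tok for doc in docs for tok in doc})
    # document frequency: number of documents containing each term
    df = Counter(tok for doc in docs for tok in set(doc))
    idf = {t: math.log((1 + n_docs) / (1 + df[t])) + 1.0 for t in vocab}

    vectors = []
    for doc in docs:
        tf = Counter(doc)  # raw term counts within this document
        raw = [tf[t] * idf[t] for t in vocab]
        norm = math.sqrt(sum(x * x for x in raw)) or 1.0
        vectors.append([x / norm for x in raw])
    return vocab, idf, vectors


toy_docs = [
    "the climate is changing".split(),
    "the climate models are unreliable".split(),
    "the sun drives warming".split(),
]
vocab, idf, vectors = tfidf_vectors(toy_docs)

for term in ("the", "climate", "sun"):
    print(f"{term:8s} idf = {idf[term]:.3f}")
```

In this toy corpus, "the" appears in every document and therefore receives the lowest IDF, while a word that occurs in only one document, such as "sun", receives the highest.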

I combine TF-IDF features with a **fully connected neural network** to leverage the strengths of both methods:  
TF-IDF provides a robust, interpretable representation of the text, while the fully connected network learns complex,  
non-linear decision boundaries for accurate classification.




In [None]:
MAX_FEATURES = 100_000     # cap vocab size for speed/memory
NGRAM_RANGE = (1, 2)       # unigrams + bigrams often help
BATCH_SIZE  = 256
LR          = 1e-3
EPOCHS      = 20
PATIENCE    = 3            # early stopping patience
HIDDEN1     = 1024
HIDDEN2     = 256
DROPOUT     = 0.3
DEVICE      = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
le = LabelEncoder()
le.fit(train_df["sub_claim"].astype(str))  # same string cast as used in transform below

In [None]:
y_train = le.transform(train_df["sub_claim"].astype(str))
y_val   = le.transform(validation_df["sub_claim"].astype(str))
y_test  = le.transform(test_df["sub_claim"].astype(str))

In [None]:
num_classes = len(le.classes_)
print("Classes:", list(le.classes_))

In [None]:
tfidf = TfidfVectorizer(max_features=MAX_FEATURES, ngram_range=NGRAM_RANGE, lowercase=True)
X_train = tfidf.fit_transform(train_df["text"].astype(str))
X_val   = tfidf.transform(validation_df["text"].astype(str))
X_test  = tfidf.transform(test_df["text"].astype(str))
input_dim = X_train.shape[1]
print(f"TF-IDF shape: train {X_train.shape}, val {X_val.shape}, test {X_test.shape}")

In [None]:
class SparseTfidfDataset(Dataset):
    def __init__(self, X_csr, y_np):
        self.X = X_csr
        self.y = y_np
    def __len__(self):
        return self.X.shape[0]
    def __getitem__(self, idx):
        # convert 1 row CSR -> dense float32 only for this sample
        x = torch.from_numpy(self.X[idx].toarray().ravel().astype(np.float32))
        y = torch.tensor(self.y[idx], dtype=torch.long)
        return x, y

In [None]:
train_ds = SparseTfidfDataset(X_train, y_train)
val_ds   = SparseTfidfDataset(X_val, y_val)
test_ds  = SparseTfidfDataset(X_test, y_test)

In [None]:
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

In [None]:
class MLP(nn.Module):
    def __init__(self, in_dim, n_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, HIDDEN1),
            nn.ReLU(inplace=True),
            nn.Dropout(DROPOUT),
            nn.Linear(HIDDEN1, HIDDEN2),
            nn.ReLU(inplace=True),
            nn.Dropout(DROPOUT),
            nn.Linear(HIDDEN2, n_classes),
        )
    def forward(self, x):
        return self.net(x)

In [None]:
model = MLP(input_dim, num_classes).to(DEVICE)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", factor=0.5, patience=1, verbose=True)

In [None]:
def run_epoch(loader, train=True):
    if train:
        model.train()
    else:
        model.eval()
    losses, preds_all, labels_all = [], [], []
    for xb, yb in loader:
        xb = xb.to(DEVICE, non_blocking=True)
        yb = yb.to(DEVICE, non_blocking=True)
        with torch.set_grad_enabled(train):
            logits = model(xb)
            loss = criterion(logits, yb)
            if train:
                optimizer.zero_grad(set_to_none=True)
                loss.backward()
                optimizer.step()
        losses.append(loss.item())
        preds_all.append(torch.argmax(logits, dim=1).detach().cpu().numpy())
        labels_all.append(yb.detach().cpu().numpy())
    y_pred = np.concatenate(preds_all)
    y_true = np.concatenate(labels_all)
    acc = accuracy_score(y_true, y_pred)
    p, r, f1, _ = precision_recall_fscore_support(y_true, y_pred, average="weighted", zero_division=0)
    return np.mean(losses), {"accuracy":acc, "precision":p, "recall":r, "f1":f1}, y_true, y_pred

In [None]:
best_val_f1, best_state, patience_left = -1.0, None, PATIENCE
for epoch in range(1, EPOCHS+1):
    tr_loss, tr_metrics, _, _ = run_epoch(train_loader, train=True)
    val_loss, val_metrics, _, _ = run_epoch(val_loader, train=False)
    scheduler.step(val_metrics["f1"])
    print(f"Epoch {epoch:02d} | "
          f"train loss {tr_loss:.4f} f1 {tr_metrics['f1']:.4f} | "
          f"val loss {val_loss:.4f} f1 {val_metrics['f1']:.4f}")
    # Early stopping on val F1
    if val_metrics["f1"] > best_val_f1 + 1e-4:
        best_val_f1 = val_metrics["f1"]
        best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
        patience_left = PATIENCE
    else:
        patience_left -= 1
        if patience_left == 0:
            print("Early stopping.")
            break

In [None]:
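# Restore the weights with the best validation F1 before evaluating on the test set
if best_state is not None:
    model.load_state_dict(best_state)
test_loss, test_metrics, yt_true, yt_pred = run_epoch(test_loader, train=False)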

In [None]:
# Calculate evaluation metrics
accuracy = accuracy_score(yt_true, yt_pred)
precision = precision_score(yt_true, yt_pred, average='weighted', zero_division=0)
recall = recall_score(yt_true, yt_pred, average='weighted', zero_division=0)
f1 = f1_score(yt_true, yt_pred, average='weighted', zero_division=0)

# Create a DataFrame to display the results with the specified structure
tfidf_fnn_metrics = pd.DataFrame({
    'Model': ['TF-IDF + MLP'],  # fills the 'Model' column used by the other result rows
    'Accuracy': [accuracy],
    'Precision': [precision],
    'Recall': [recall],
    'F1-Score': [f1]
}, index=['FFN+TFIDF'])

# Append the TF/IDF+FNN metrics to the existing evaluation_metrics DataFrame
evaluation_metrics = pd.concat([evaluation_metrics, tfidf_fnn_metrics])

# Display the results
display(evaluation_metrics)

# Save the results to a CSV file
tfidf_fnn_metrics.to_csv("TF_IDF_FNN_results.csv", index=True)

TF-IDF combined with a fully connected network is the best-performing model for the task, achieving 79.2% accuracy.  
This suggests that, in some cases, conventional methods can outperform more complex LLM-based models.

## Conclusion

The results show a clear performance gap between conventional methods and zero/few-shot LLM approaches for this specific classification task.  
Zero-shot and few-shot prompting with **Gemma 2B** performed poorly, likely due to the model's small size and lack of domain-specific training data.  
**CoT + SC** failed to produce usable label predictions, suggesting that reasoning-based prompting, at least as configured here, was not effective in this context.

Fine-tuning with **PEFT QLoRA** significantly improved results, achieving over 71% accuracy, while **RAG** further increased performance to 73.8%,  
demonstrating the benefit of augmenting the model with relevant retrieved context.  
However, the best-performing approach was the **TF-IDF + Fully Connected Neural Network**, achieving **79.2% accuracy**.  
This indicates that, for certain domain-specific text classification tasks, conventional machine learning methods can outperform more  
complex and resource-intensive LLM-based solutions.