In [1]:
import random
import re
from time import time

import numpy as np
import pandas as pd
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
import os
import pickle


In [2]:
import kagglehub

# Kaggle dataset handle and the single file requested from it.
dataset_handle = "abhi8923shriv/sentiment-analysis-dataset"
dataset_file = "train.csv"

# Download the training CSV; the returned value is the local path to the file.
ds_path = kagglehub.dataset_download(dataset_handle, path=dataset_file)
print("Path to dataset files:", ds_path)
ds_path


Path to dataset files: /home/szymon/.cache/kagglehub/datasets/abhi8923shriv/sentiment-analysis-dataset/versions/9/train.csv


'/home/szymon/.cache/kagglehub/datasets/abhi8923shriv/sentiment-analysis-dataset/versions/9/train.csv'

### Text processing

In [3]:
def clean_text(text: str) -> str:
    """Normalize a raw tweet before it is placed in a prompt.

    The text is lower-cased; URLs, HTML-like tags and @mentions are removed;
    hashtags lose their leading ``#`` (the word itself is kept); finally runs
    of whitespace are collapsed to one space and the result is stripped.

    Args:
        text (str): Raw tweet text. Falsy values (``""``, ``None``) give ``""``.

    Returns:
        str: The cleaned text.

    Example:
        >>> clean_text("@user I LOVE this! #amazing")
        'i love this! amazing'

    """
    if not text:
        return ""

    # (pattern, replacement) pairs, applied strictly in this order.
    substitutions = (
        (r"https?://\S+|www\.\S+", ""),  # URLs
        (r"<.*?>", ""),                  # HTML-like tags
        (r"@\w+", ""),                   # @mentions
        (r"#(\w+)", r"\1"),              # hashtags: keep the word, drop '#'
        (r"\s+", " "),                   # collapse whitespace runs
    )

    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()



def load_train_dataset(
    path: str,
    encoding: str = "latin1",
    samples: int = 512,
    seed: "int | None" = None,
) -> tuple[np.ndarray, np.ndarray]:
    """Load the sentiment CSV and return a random subsample of cleaned texts and labels.

    Rows containing any missing value are dropped first. Texts are taken from
    the ``selected_text`` column and passed through ``clean_text``; labels are
    taken from the ``sentiment`` column.

    Args:
        path (str): Path to the CSV file
        encoding (str): File encoding (default: 'latin1')
        samples (int): Number of rows to draw at random, without replacement
        seed (int | None): If given, the rows are drawn with a dedicated
            ``random.Random(seed)`` so the selection is reproducible. If
            ``None`` (default) the module-level ``random`` state is used,
            exactly as before.

    Returns:
        tuple: (X, y) where X is the array of cleaned texts and y is the array
        of labels, both of length ``samples`` and aligned by position

    Raises:
        ValueError: If more samples are requested than rows remain after
            dropping missing values.

    Example:
        >>> X, y = load_train_dataset("data.csv", samples=100, seed=0)
        >>> X[0], y[0]
        ('i love this product', 'positive')

    """
    df = pd.read_csv(path, encoding=encoding).dropna()

    n_rows = len(df)
    if samples > n_rows:
        raise ValueError(
            f"Requested {samples} samples but only {n_rows} rows are available "
            f"after dropping missing values",
        )

    # A private generator keeps a seeded draw independent of the global state.
    sampler = random if seed is None else random.Random(seed)
    idxes = sampler.sample(range(n_rows), samples)

    # Clean only the selected rows: same result as cleaning everything and
    # indexing afterwards, without running the regexes over the whole file.
    sampled = df.iloc[idxes]
    texts = sampled["selected_text"].apply(clean_text).values
    return texts, sampled["sentiment"].values


### Prompt Engineering Functions for ICL

In [4]:
def inject_example(text: str, sentiment: str) -> str:
    """Format one labelled tweet as a few-shot example block.

    The block consists of a ``Tweet:`` line, a ``Sentiment:`` line and a
    trailing blank line that separates it from whatever follows.
    """
    tweet_line = f"Tweet: {text}\n"
    label_line = f"Sentiment: {sentiment}\n\n"
    return tweet_line + label_line


def inject_sample(text: str) -> str:
    """Format the tweet to classify, preceded by the "now classify" header.

    The result ends with ``Sentiment:`` (no trailing space or newline) so the
    model's next token is expected to be the label.
    """
    header = "### Now classify the following:\n"
    return f"{header}Tweet: {text}\nSentiment:"

def inject_sample_without_header(text: str) -> str:
    """Format the tweet to classify without any section header.

    The result ends with ``Sentiment:`` (no trailing space or newline).
    """
    tweet_line = f"Tweet: {text}\n"
    return tweet_line + "Sentiment:"

### Label Extraction

In [None]:
def extract_label(result: str, labels: list[str]) -> str:
    """Map raw model output to one of the known labels.

    The output is lower-cased and the first entry of ``labels`` (in list
    order) that occurs in it as a substring is returned.

    Args:
        result (str): Text produced by the model
        labels (list[str]): Candidate labels, checked in the given order

    Returns:
        str: The matching label, or ``"unknown"`` when none matches (in which
        case a diagnostic line is printed).
    """
    matched = next((lab for lab in labels if lab in result.lower()), None)
    if matched is not None:
        return matched

    print("Could not extract label from result:", result)
    return "unknown"


### IN-CONTEXT LEARNING EXPERIMENT

In [6]:
def run_experiment(
    base_prompt: str, X: np.ndarray, y: np.ndarray, model_name: str,
) -> tuple[list, list]:
    """Run ICL classification experiment on a test set.

    This function:
    1. Loads the specified language model (float16, ``device_map="auto"``)
    2. For each test sample, builds ``base_prompt + inject_sample(x)``
    3. Greedily generates exactly one new token
    4. Extracts the predicted label from that token with ``extract_label``
    5. Collects all predictions and ground truth labels
    6. Releases the model and cached CUDA memory before returning

    The set of candidate labels is the set of unique values in ``y``.

    Args:
        base_prompt (str): Base prompt with instructions and examples
        X (np.ndarray): Array of text samples to classify
        y (np.ndarray): Array of true labels
        model_name (str): HuggingFace model identifier (e.g., 'facebook/opt-1.3b')

    Returns:
        tuple[list, list]: (predicted_labels, true_labels); a prediction that
        matches no label is reported as ``"unknown"``

    """
    import gc  # local import: only needed for the cleanup in ``finally``

    labels = list(np.unique(y))

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(
        model_name, dtype=torch.float16, device_map="auto",
    )

    y_true = []
    y_pred = []

    try:
        for idx, x in enumerate(X):
            prompt = base_prompt + inject_sample(x)
            inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
            outputs = model.generate(
                **inputs,
                max_new_tokens=1,
                do_sample=False,
                eos_token_id=tokenizer.eos_token_id,
            )

            # The returned sequence starts with the prompt tokens; decode only
            # what was generated so the label (and the diagnostic printed for
            # unmatched output) reflects the model's answer, not the prompt.
            prompt_len = inputs["input_ids"].shape[1]
            generated = tokenizer.decode(
                outputs[0][prompt_len:], skip_special_tokens=True,
            )

            y_true.append(y[idx])
            y_pred.append(extract_label(generated, labels))
    finally:
        # A new model is loaded on every call; drop this one explicitly and
        # release cached CUDA blocks so repeated calls do not pile up GPU memory.
        del model
        gc.collect()
        torch.cuda.empty_cache()

    return y_pred, y_true


# Context window size experiment in ICL

In [7]:
def run_ctx_window_size_experiment(
    base_prompt: str,
    X: np.ndarray,
    y: np.ndarray,
    model_name: str,
    ctx_size: int = 2048,
) -> tuple[list, list]:
    """Run ICL classification with every prompt cut to its last ``ctx_size`` characters.

    This function:
    1. Loads the specified language model (float16, ``device_map="auto"``)
    2. For each test sample, builds ``base_prompt`` + the sample block; only the
       first sample (``idx == 0``) gets the "now classify" header
    3. Keeps only the last ``ctx_size`` characters of that prompt
    4. Greedily generates exactly one new token and extracts the label from it
    5. Collects all predictions and ground truth labels
    6. Releases the model and cached CUDA memory before returning

    NOTE(review): the truncation slices the prompt *string*, so ``ctx_size``
    counts characters, not model tokens. The prompt is also rebuilt from
    ``base_prompt`` for every sample rather than accumulated across samples.
    Confirm both match the intended experiment design.

    Args:
        base_prompt (str): Base prompt with instructions and examples
        X (np.ndarray): Array of text samples to classify
        y (np.ndarray): Array of true labels
        model_name (str): HuggingFace model identifier (e.g., 'facebook/opt-1.3b')
        ctx_size (int): Number of trailing prompt characters to keep

    Returns:
        tuple[list, list]: (predicted_labels, true_labels); a prediction that
        matches no label is reported as ``"unknown"``

    """
    import gc  # local import: only needed for the cleanup in ``finally``

    labels = list(np.unique(y))

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        dtype=torch.float16,
        device_map="auto",
    )

    y_true = []
    y_pred = []

    try:
        for idx, x in enumerate(X):
            sample_block = inject_sample(x) if idx == 0 else inject_sample_without_header(x)
            prompt = (base_prompt + sample_block)[-ctx_size:]  # keep the tail only
            inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
            outputs = model.generate(
                **inputs,
                max_new_tokens=1,
                do_sample=False,
                eos_token_id=tokenizer.eos_token_id,
            )

            # The returned sequence starts with the prompt tokens; decode only
            # what was generated so the label (and the diagnostic printed for
            # unmatched output) reflects the model's answer, not the prompt.
            prompt_len = inputs["input_ids"].shape[1]
            generated = tokenizer.decode(
                outputs[0][prompt_len:], skip_special_tokens=True,
            )

            y_true.append(y[idx])
            y_pred.append(extract_label(generated, labels))
    finally:
        # A new model is loaded on every call; drop this one explicitly and
        # release cached CUDA blocks so repeated calls do not pile up GPU memory.
        del model
        gc.collect()
        torch.cuda.empty_cache()

    return y_pred, y_true


###  Evaluation Function

In [8]:
def evaluate(
    model: str,
    base_prompt: str,
    X,
    y,
    shots_n: int = 3,
    experiment_func=run_experiment,
    experiment_func_kwargs=None,
) -> tuple[float, float, float, float, list, list]:
    """Evaluate ICL performance with a specific number of examples (shots).

    Workflow:
    1. Use the first ``shots_n`` items of ``X``/``y`` as examples in the prompt
       (preceded by an "### Examples" header when ``shots_n > 0``)
    2. Use the remaining items as the test set
    3. Run ``experiment_func`` on the test set
    4. Compute accuracy and macro-averaged F1, precision and recall

    NOTE: no ``labels=`` is passed to the scikit-learn metrics, so any extra
    class that appears only in the predictions (e.g. ``"unknown"``) takes part
    in the macro averages, per scikit-learn's default behaviour.

    Args:
        model (str): HuggingFace model name
        base_prompt (str): Base instruction prompt
        X: Texts; the first ``shots_n`` are examples, the rest are test samples
        y: Labels aligned with ``X``
        shots_n (int): Number of examples to include (0=zero-shot, 1=one-shot, etc.)
        experiment_func (callable): Function to run the experiment; called as
            ``experiment_func(prompt, X_test, y_test, model, **kwargs)`` and
            expected to return ``(y_pred, y_true)``
        experiment_func_kwargs (dict): Additional kwargs for experiment_func

    Returns:
        tuple: (accuracy, f1_score, precision, recall, y_true, y_pred)

    Raises:
        ValueError: If ``shots_n`` is negative.

    """
    if shots_n < 0:
        # A negative value would silently slice the test set from the end.
        raise ValueError(f"shots_n must be non-negative, got {shots_n}")

    if experiment_func_kwargs is None:
        experiment_func_kwargs = {}

    if shots_n > 0:
        base_prompt += "### Examples\n"

    for i in range(shots_n):
        base_prompt += inject_example(X[i], y[i])

    # Everything not used as an in-context example is evaluated.
    X, y = X[shots_n:], y[shots_n:]

    y_pred, y_true = experiment_func(base_prompt, X, y, model, **experiment_func_kwargs)

    acc = accuracy_score(y_true, y_pred)
    # zero_division=0 on all three keeps them consistent and avoids warnings
    # for classes that are never predicted.
    f1 = f1_score(y_true, y_pred, average="macro", zero_division=0)
    precision = precision_score(y_true, y_pred, average="macro", zero_division=0)
    recall = recall_score(y_true, y_pred, average="macro", zero_division=0)

    return acc, f1, precision, recall, y_true, y_pred


# ICL experiment configuration

In [None]:
# Base instruction shared by every prompt; evaluate() appends the few-shot examples.
prompt = """Classify the sentiment of the tweet.
Choose only one label: positive, negative, neutral.
Reply ONLY with the label. Do not explain.

"""
shots = [0, 1, 3, 20]  # numbers of in-context examples to compare
reps = 10  # repetitions per setting
samples_n = 300  # test tweets per evaluation
result_path = f"result_{round(time())}.csv"
model_name = "facebook/opt-2.7b"
safe_name = model_name.replace("/", "_")  # model id usable inside a file name

# Create the output directory up front: otherwise a missing folder makes the
# final pickle dump fail only after the whole experiment has already run.
os.makedirs("../res/shots/results/", exist_ok=True)


# Column-oriented result table: one row per repetition ("lp" is its index),
# four metric columns per shot count.
df_prot = {
    "lp": [],
}
for shot in shots:
    df_prot[f"{shot}-shots f1"] = []
    df_prot[f"{shot}-shots acc"] = []
    df_prot[f"{shot}-shots precision"] = []
    df_prot[f"{shot}-shots recall"] = []


confusion_matrices = {shot: [] for shot in shots}

for i in range(reps):
    df_prot["lp"].append(i)
    for shot in shots:
        # A fresh random sample is drawn for every shot count; the first
        # `shot` items become the in-context examples inside evaluate().
        X, y = load_train_dataset(
            ds_path,
            "latin1",
            samples=samples_n + shot,
        )
        acc, f1, precision, recall, y_true, y_pred = evaluate(
            model_name,
            prompt,
            X,
            y,
            shots_n=shot,
            experiment_func=run_experiment,
        )
        df_prot[f"{shot}-shots f1"].append(f1)
        df_prot[f"{shot}-shots acc"].append(acc)
        df_prot[f"{shot}-shots precision"].append(precision)
        df_prot[f"{shot}-shots recall"].append(recall)

        # Only the three real classes are tabulated here.
        cm = confusion_matrix(y_true, y_pred, labels=["positive", "negative", "neutral"])
        confusion_matrices[shot].append(cm)
    print(f"Completed repetition {i + 1}/{reps}")

df = pd.DataFrame(df_prot)
df.to_csv(result_path)

with open(f"../res/shots/results/confusion_matrices_ICL_{safe_name}_{round(time())}.pkl", "wb") as f:
    pickle.dump(confusion_matrices, f)


# Model size comparison (3-shot)

In [None]:
# Base instruction shared by every prompt; evaluate() appends the few-shot examples.
prompt = """Classify the sentiment of the tweet.
Choose only one label: positive, negative, neutral.
Reply ONLY with the label. Do not explain.

"""
shots = 3  # in-context examples per prompt
reps = 10  # repetitions per model
samples_n = 300  # test tweets per evaluation
result_path = f"../res/models/result_{round(time())}.csv"

# exist_ok=True replaces the separate existence check (no check-then-create race).
os.makedirs("../res/models/", exist_ok=True)

models = [
    "facebook/opt-125m",
    "facebook/opt-350m",
    "facebook/opt-1.3b",
    "facebook/opt-2.7b",
    # "facebook/opt-6.7b",   # Out of memory on 8GB GPU
]
safe_name = "model_size_clf"


# Column-oriented result table: one row per repetition ("lp" is its index),
# four metric columns per model.
df_prot = {
    "lp": [],
}
for model in models:
    df_prot[f"{model} f1"] = []
    df_prot[f"{model} acc"] = []
    df_prot[f"{model} precision"] = []
    df_prot[f"{model} recall"] = []


confusion_matrices = {model: [] for model in models}

# Release cached CUDA blocks left over from earlier cells before starting.
# (empty_cache() does not depend on grad mode, so no no_grad() wrapper is needed.)
torch.cuda.empty_cache()

for i in range(reps):
    df_prot["lp"].append(i)
    # One sample per repetition, shared by all models so they see the same data.
    X, y = load_train_dataset(
        ds_path, "latin1", samples=samples_n + shots,
    )
    for model in models:
        acc, f1, precision, recall, y_true, y_pred = evaluate(
            model,
            prompt,
            X,
            y,
            shots_n=shots,
            experiment_func=run_experiment,
        )
        df_prot[f"{model} f1"].append(f1)
        df_prot[f"{model} acc"].append(acc)
        df_prot[f"{model} precision"].append(precision)
        df_prot[f"{model} recall"].append(recall)

        # Only the three real classes are tabulated here.
        cm = confusion_matrix(y_true, y_pred, labels=["positive", "negative", "neutral"])
        confusion_matrices[model].append(cm)

        # Clear cached GPU memory after each model evaluation to limit CUDA
        # memory build-up across models.
        print("Done model: ", model)
        torch.cuda.empty_cache()

    print(f"Completed repetition {i + 1}/{reps}")

df = pd.DataFrame(df_prot)
df.to_csv(result_path)

with open(f"../res/models/confusion_matrices_ICL_{safe_name}_{round(time())}.pkl", "wb") as f:
    pickle.dump(confusion_matrices, f)


# CTX window size test

In [10]:
# Base instruction shared by every prompt; evaluate() appends the few-shot examples.
prompt = """Classify the sentiment of the tweet.
Choose only one label: positive, negative, neutral.
Reply ONLY with the label. Do not explain.

"""
shots = 3  # in-context examples per prompt
reps = 10  # repetitions per window size
samples_n = 500  # test tweets per evaluation
result_path = f"../res/ctx_windows/result_{round(time())}.csv"
model_name = "facebook/opt-2.7b"
ctx_windows_sizes = [128, 256, 512, 1024, 2048]  # passed as ctx_size to the experiment function
safe_name = model_name.replace("/", "_")  # model id usable inside a file name

# Create the output directory up front: otherwise a missing folder makes
# to_csv / the pickle dump fail only after the whole experiment has run.
os.makedirs("../res/ctx_windows/", exist_ok=True)


# Column-oriented result table: one row per repetition ("lp" is its index),
# four metric columns per window size.
df_prot = {
    "lp": [],
}
for ctx_window_size in ctx_windows_sizes:
    df_prot[f"{ctx_window_size} f1"] = []
    df_prot[f"{ctx_window_size} acc"] = []
    df_prot[f"{ctx_window_size} precision"] = []
    df_prot[f"{ctx_window_size} recall"] = []

confusion_matrices = {ctx_window_size: [] for ctx_window_size in ctx_windows_sizes}

for i in range(reps):
    df_prot["lp"].append(i)
    # One sample per repetition, shared by all window sizes.
    X, y = load_train_dataset(
        ds_path,
        "latin1",
        samples=samples_n + shots,
    )
    for ctx_window_size in ctx_windows_sizes:
        acc, f1, precision, recall, y_true, y_pred = evaluate(
            model_name,
            prompt,
            X,
            y,
            shots_n=shots,
            experiment_func=run_ctx_window_size_experiment,
            experiment_func_kwargs={"ctx_size": ctx_window_size},
        )
        df_prot[f"{ctx_window_size} f1"].append(f1)
        df_prot[f"{ctx_window_size} acc"].append(acc)
        df_prot[f"{ctx_window_size} precision"].append(precision)
        df_prot[f"{ctx_window_size} recall"].append(recall)

        # Only the three real classes are tabulated here.
        cm = confusion_matrix(y_true, y_pred, labels=["positive", "negative", "neutral"])
        confusion_matrices[ctx_window_size].append(cm)
    print(f"Completed repetition {i + 1}/{reps}")

df = pd.DataFrame(df_prot)
df.to_csv(result_path)

with open(f"../res/ctx_windows/confusion_matrices_ICL_{safe_name}_{round(time())}.pkl", "wb") as f:
    pickle.dump(confusion_matrices, f)


Completed repetition 1/10
Completed repetition 2/10
Completed repetition 2/10
Completed repetition 3/10
Completed repetition 3/10
Completed repetition 4/10
Completed repetition 4/10
Completed repetition 5/10
Completed repetition 5/10
Completed repetition 6/10
Completed repetition 6/10
Completed repetition 7/10
Completed repetition 7/10
Completed repetition 8/10
Completed repetition 8/10
Completed repetition 9/10
Completed repetition 9/10
Completed repetition 10/10
Completed repetition 10/10
