In [None]:
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
import bitsandbytes as bnb
import torch
import torch.nn as nn

from datasets import Dataset, DatasetDict
from peft import LoraConfig
from trl import SFTTrainer, SFTConfig
from transformers import (AutoModelForCausalLM, 
                          AutoTokenizer, 
                          BitsAndBytesConfig, 
                          TrainingArguments, 
                          pipeline, 
                          logging)
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split

from huggingface_hub import create_repo, upload_folder, notebook_login, login
from utils_dl import set_global_seed
from sklearn.model_selection import GroupShuffleSplit

from sklearn.model_selection import StratifiedGroupKFold

torch.cuda.empty_cache()

In [None]:
# Global run configuration: RNG seed, compute device and location of the input data.
SEED = 42
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Built with os.path.join instead of the literal "new_data\subtask2": that literal
# relied on the invalid escape sequence "\s" (a warning today, an error in future
# Python versions) and only resolved to the intended folder on Windows.
SUBTASK_2_PATH = os.path.join("new_data", "subtask2")

# Project helper imported from utils_dl; presumably seeds the random generators
# of the libraries used here - TODO confirm against utils_dl.
set_global_seed(SEED)

print("CUDA available:", torch.cuda.is_available())
print("Using GPU:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU detected")
print("Pytorch version:",torch.__version__)

CUDA available: True
Using GPU: NVIDIA RTX A6000
Pytorch version: 2.4.1+cu124


In [None]:
# Log into HF account
# Uses the interactive login helper from huggingface_hub, so no access token is
# hard-coded in the notebook. Presumably required to download the base model
# loaded further down - TODO confirm.
notebook_login() 

In [None]:
# Dataset variant selection.
# These flags are not applied to the data here; they are only encoded into the
# file name so that the matching preprocessed CSV is picked up.
language = 'spa'
model_type = 'dl'
stemming = False
lemmatization = False
remove_duplicates = False
cased = True
description = False

# Optional file-name suffix for the variant that carries a description.
desc = '_with_description' if description else ''

data_config = f"lang_{language}_model_{model_type}_stem_{stemming}_lem_{lemmatization}_dup_{remove_duplicates}_cased_{cased}{desc}"
file_name = 'subtask2_all_aug'
db_file_name = f"{file_name}_{data_config}.csv"
file_path = os.path.join(SUBTASK_2_PATH, db_file_name)

# Fail fast with an explicit message when the expected CSV is missing.
if not os.path.exists(file_path):
    raise FileNotFoundError(f"File not found at {file_path}")
full_data = pd.read_csv(file_path, encoding='utf-8')

# Independent frame holding only the rows that are not flagged as augmented.
original_data = full_data.loc[full_data['is_augmented'].ne(True)].copy()

In [None]:
# Masks selecting the augmented rows of the 'NR' and 'S' classes.
cond_aug_nr = (full_data['is_augmented'] == True) & (full_data['label'] == 'NR')
cond_aug_s = (full_data['is_augmented'] == True) & (full_data['label'] == 'S')

# Keep everything except those rows. The explicit .copy() makes the result an
# independent frame, so assigning to its label column later does not write to a
# slice of the loaded data (SettingWithCopyWarning); this mirrors how
# original_data is created.
full_data = full_data[~(cond_aug_nr | cond_aug_s)].copy()

In [None]:
# Column names and split settings.
text_column  = "lyrics_clean"
label_column = "label"
group_column = "id"             # all augmented variants share this id
aug_col      = "is_augmented"   # bool
# False -> train / val / test split; True -> train / val only (the held-out fold becomes val).
final_training = False 
# Approximate share of the original data held out by the first split (1 fold out of 1/val_split_size).
val_split_size = 0.1 if final_training else 0.2

# Map labels to integers if needed
# NOTE: the label columns are overwritten in place below. Re-running this cell
# without reloading the data takes the else-branch (the column is no longer of
# object dtype) and rebuilds id2label as int -> int, losing the string names.
if full_data[label_column].dtype == object:
    unique_labels = sorted(full_data[label_column].unique())
    label2id      = {lbl: idx for idx, lbl in enumerate(unique_labels)}
else:
    unique_labels = sorted(full_data[label_column].unique())
    label2id      = {int(lbl): int(lbl) for lbl in unique_labels}

# Inverse mapping, used later to turn integer ids back into label names.
id2label = {v: k for k, v in label2id.items()}
# The mapping is derived from full_data and applied to both frames.
full_data[label_column] = full_data[label_column].map(label2id)
original_data[label_column] = original_data[label_column].map(label2id)

# Shuffle before splitting
full_data = full_data.sample(frac=1, random_state=SEED).reset_index(drop=True)
original_data = original_data.sample(frac=1, random_state=SEED).reset_index(drop=True)

# 0.2 -> 5 folds, 0.1 -> 10 folds; only the first fold is used as the held-out part.
n_splits = int(1 / val_split_size)
sgkf = StratifiedGroupKFold(
    n_splits=n_splits,
    shuffle=True,
    random_state=SEED
)
# The split is computed on the non-augmented rows only.
train_val_idx, test_idx = next(
    sgkf.split(
        original_data,
        original_data[label_column],   # stratify on true labels
        original_data[group_column]    # keep groups intact
    )
)


train_val_df = original_data.iloc[train_val_idx].reset_index(drop=True)
test_df      = original_data.iloc[test_idx].reset_index(drop=True)

if not final_training:

    # Second split: the first of 8 folds of train_val_df becomes the validation set.
    sgkf_val = StratifiedGroupKFold(
        n_splits=8, 
        shuffle=True,
        random_state=SEED
    )
    trn_idx, val_idx = next(
        sgkf_val.split(
            train_val_df,
            train_val_df[label_column],
            train_val_df[group_column]
        )
    )

    train_df = train_val_df.iloc[trn_idx].reset_index(drop=True)
    val_df   = train_val_df.iloc[val_idx].reset_index(drop=True)
    # print(f"train={len(train_df)},  val={len(val_df)},  test={len(test_df)}")

else:
    # Final-training mode: train on everything outside the held-out fold and
    # use that fold as validation data; test_df is still defined but not split further.
    print('Dataset for final model training')
    train_df = train_val_df.copy()
    val_df   = test_df.copy()
    # print(f"train={len(train_df)},  val={len(val_df)}")

Now only the training set contains augmented data:

In [None]:
# Pull in every row of full_data (original and kept augmented variants) whose id
# belongs to the training split. The ids are de-duplicated first: a repeated id
# on the left side of the merge would otherwise multiply all matching rows.
train_df = pd.merge(train_df[['id']].drop_duplicates(), full_data, on = "id", how = "inner").copy()
# Reshuffle so augmented variants do not sit next to their source row.
train_df = train_df.sample(frac=1, random_state=SEED).reset_index(drop=True)

In [None]:
# Build the Hugging Face DatasetDict and optionally export each split to CSV.
save_datasets = True


def _export_split(frame, folder, data_name, ids_name):
    """Write `frame` and the list of its unique ids as two CSV files in `folder`.

    Returns the one-column frame of unique ids that was written.
    """
    frame.to_csv(os.path.join(folder, data_name), index=False, encoding='utf-8')
    ids_frame = pd.DataFrame(frame['id'].unique(), columns=['id'])
    ids_frame.to_csv(os.path.join(folder, ids_name), index=False, encoding='utf-8')
    return ids_frame


if final_training:  # **Train / Val**

    train_dataset = Dataset.from_pandas(train_df)
    val_dataset = Dataset.from_pandas(val_df)

    ds = DatasetDict({'train': train_dataset, 'val': val_dataset})

    if save_datasets:
        splits_folder_path = f"{file_name}_{data_config}_competition"
        if not os.path.exists(splits_folder_path):
            os.makedirs(splits_folder_path)

        unique_ids_df = _export_split(train_df, splits_folder_path, 'train_competition.csv', 'train_competition_ids.csv')
        unique_ids_df = _export_split(val_df, splits_folder_path, 'val_competition.csv', 'val_competition_ids.csv')

else:  # **Train / Val / Test**

    train_dataset = Dataset.from_pandas(train_df.reset_index(drop=True))
    val_dataset = Dataset.from_pandas(val_df.reset_index(drop=True))
    test_dataset = Dataset.from_pandas(test_df.reset_index(drop=True))

    ds = DatasetDict({'train': train_dataset, 'val': val_dataset, 'test': test_dataset})

    if save_datasets:
        splits_folder_path = f"{file_name}_{data_config}"
        if not os.path.exists(splits_folder_path):
            os.makedirs(splits_folder_path)

        unique_ids_df = _export_split(train_df, splits_folder_path, 'train.csv', 'train_ids.csv')
        unique_ids_df = _export_split(val_df, splits_folder_path, 'val.csv', 'val_ids.csv')
        unique_ids_df = _export_split(test_df, splits_folder_path, 'test.csv', 'test_ids.csv')


## Instruction Dataset Construction

**Template**

In [None]:
# Prompt template shared by training and inference.
# {lyrics_clean} receives the lyric text. {label} receives the label name when
# training prompts are built and an empty string for validation/test prompts,
# so those prompts end right after the "### Response:" header.
PROMPT = """\
### Instruction: 
Classify the misogyny subtype of the following lyric.
Categories:
- S: describe or suggest sexual acts, sexual language, or insinuations
- V: physical or verbal aggression, threats, or violent actions
- H: offensive or discriminatory language, expressions of contempt, or hostility towards a group or individual
- NR: none of the above
The correct answer is one label from: S, V, H, NR

### Input:
{lyrics_clean}

### Response:
{label}"""

In [None]:
# Checkpoint that is quantized here and fine-tuned further down.
BASE_MODEL = 'meta-llama/Llama-3.1-8B'

# Quantization settings: 4-bit NF4 weights, no double quantization, float16 compute.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=False,
    bnb_4bit_compute_dtype="float16",
)

# Load the quantized base model and let the library place it on the available devices.
model = AutoModelForCausalLM.from_pretrained(
    BASE_MODEL,
    quantization_config=bnb_config,
    torch_dtype="float16",
    device_map="auto",
)

# Generation cache is switched off for training; it is re-enabled before prediction.
model.config.pretraining_tp = 1
model.config.use_cache = False

tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)

# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:

def build_prompt_train(examples):
    """Render one training prompt per example of a batch.

    `examples` is a batch mapping with the parallel lists "lyrics_clean" and
    "label"; each integer label is translated to its name through `id2label`
    and written into the response slot of `PROMPT`.

    Returns a dict with a single "text" list holding the rendered prompts.
    """
    pairs = zip(examples["lyrics_clean"], examples["label"])
    rendered = [
        PROMPT.format(lyrics_clean=lyric_text, label=id2label[int(label_id)])
        for lyric_text, label_id in pairs
    ]
    return {"text": rendered}

def build_prompt_val(examples):
    """Render one inference prompt per example of a batch.

    Same template as the training prompts, but the response slot is left
    empty so the label is not part of the text. The "label" column is still
    read (and paired with the lyrics) although its values are not used.

    Returns a dict with a single "text" list holding the rendered prompts.
    """
    pairs = zip(examples["lyrics_clean"], examples["label"])
    rendered = [
        PROMPT.format(lyrics_clean=lyric_text, label="")
        for lyric_text, _unused_label in pairs
    ]
    return {"text": rendered}

# Map that function over all splits, in batched mode
train_data = ds['train'].map(
    build_prompt_train,
    batched=True,
    remove_columns=[
      "id", "lyrics", 
      "augmentation_type", 
        "is_augmented"
    ]
)

eval_data = ds['val'].map(
    build_prompt_val,
    batched=True,
    remove_columns=[
      "id", "lyrics", 
      "augmentation_type", 
        "is_augmented"
    ]
)

test_data = ds['test'].map(
    build_prompt_val,
    batched=True,
    remove_columns=[
      "id", "lyrics", 
      "augmentation_type", 
        "is_augmented"
    ]
)

## Model Learning (Instruction Tuning)

In [None]:
def _match_label(answer, candidates):
    """Return the category named by `answer`, or "none" when nothing matches."""
    lowered = answer.lower()
    # The label is expected at the very start of the generated answer. Check
    # that first, longest label first, so whatever the model emits after the
    # label (e.g. "S the") cannot be mistaken for another category.
    for category in sorted(candidates, key=lambda c: len(str(c)), reverse=True):
        if lowered.startswith(str(category).lower()):
            return category
    # Answers that do not start with a label keep the previous loose
    # behaviour: first category (in the given order) contained anywhere.
    for category in candidates:
        if str(category).lower() in lowered:
            return category
    return "none"  # fallback


def predict(dataset, model, tokenizer, labels=None):
    """Generate an answer for every prompt in `dataset` and parse it into a category.

    Parameters
    ----------
    dataset : iterable of mappings
        Each item must provide the prompt under the "text" key.
    model, tokenizer
        Passed unchanged to the text-generation pipeline.
    labels : sequence, optional
        Categories to look for. Defaults to the module-level `unique_labels`.

    Returns
    -------
    list
        One entry per example: the matched category, or the string "none"
        when the generated answer names no category.
    """
    candidates = unique_labels if labels is None else labels

    # Use pipeline once outside the loop for efficiency.
    # Greedy decoding is requested explicitly: passing only a temperature is
    # either ignored or triggers sampling, depending on the generation defaults
    # of the checkpoint, which makes the evaluation non-reproducible.
    pipe = pipeline(
        task="text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=2,
        do_sample=False,
    )

    y_pred = []
    for example in tqdm(dataset):
        prompt = example["text"]
        result = pipe(prompt)
        # The pipeline output is split on the response header and only the
        # part after its last occurrence is treated as the answer.
        answer = result[0]["generated_text"].split("### Response:")[-1].strip()
        y_pred.append(_match_label(answer, candidates))

    return y_pred

# y_pred = predict(eval_data, model, tokenizer)

In [None]:
def evaluate(y_true, y_pred, label_list):
    """Print accuracy, per-label accuracy, classification report and confusion matrix.

    Parameters
    ----------
    y_true, y_pred : sequences of label names
        Gold and predicted labels; every value must occur in `label_list`.
    label_list : sequence of label names
        Defines the label -> index mapping and the row/column order of the
        report and the confusion matrix.

    Raises
    ------
    AssertionError
        If `y_true` or `y_pred` contains a value that is not in `label_list`.
    """
    labels = list(label_list)
    mapping = {label: idx for idx, label in enumerate(labels)}
    label_ids = list(range(len(labels)))

    # Unknown values become -1 and are rejected by the checks below.
    y_true_mapped = np.array([mapping.get(lbl, -1) for lbl in y_true], dtype=int)
    y_pred_mapped = np.array([mapping.get(lbl, -1) for lbl in y_pred], dtype=int)

    # check
    assert -1 not in y_true_mapped, "Unknown label in y_true"
    assert -1 not in y_pred_mapped, "Unknown label in y_pred"

    # Overall accuracy
    accuracy = accuracy_score(y_true=y_true_mapped, y_pred=y_pred_mapped)
    print(f'\nOverall Accuracy: {accuracy:.3f}\n')

    # Per-label accuracy: accuracy restricted to the samples whose gold label is `label`
    for idx, label in enumerate(labels):
        selected = y_true_mapped == idx
        if not selected.any():
            print(f'No samples for label {label}')
            continue
        label_accuracy = accuracy_score(y_true_mapped[selected], y_pred_mapped[selected])
        print(f'Accuracy for label {label}: {label_accuracy:.3f}')

    # Classification report
    # The label ids are passed explicitly: without them the report derives the
    # classes from the data and fails when a class of `label_list` occurs in
    # neither y_true nor y_pred (length mismatch with target_names).
    print('\nClassification Report:')
    print(classification_report(
        y_true_mapped,
        y_pred_mapped,
        labels=label_ids,
        target_names=[str(label) for label in labels],
        zero_division=0,
    ))

    # Confusion matrix, always len(labels) x len(labels) in label_list order
    print('\nConfusion Matrix:')
    print(confusion_matrix(y_true_mapped, y_pred_mapped, labels=label_ids))


In [None]:
def find_all_linear_names(model):
    """Collect the distinct leaf names of all 4-bit linear layers in `model`.

    Every sub-module that is a `bnb.nn.Linear4bit` contributes the last
    component of its dotted name; 'lm_head' is excluded from the result.

    Returns a list of names in no particular order.
    """
    linear_cls = bnb.nn.Linear4bit
    leaf_names = {
        qualified_name.rsplit('.', 1)[-1]
        for qualified_name, submodule in model.named_modules()
        if isinstance(submodule, linear_cls)
    }
    leaf_names.discard('lm_head')  # needed for 16 bit
    return list(leaf_names)
# Leaf names of the quantized model's 4-bit linear layers (without lm_head);
# used as the LoRA target modules.
modules = find_all_linear_names(model)

In [None]:
# Folder that receives the trainer's outputs.
output_dir="llama-3.1-fine-tuned-model"

# LoRA adapter settings; adapters are attached to every module name listed in `modules`.
peft_config = LoraConfig(
    lora_alpha=16,
    lora_dropout=0, 
    r=64,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=modules,
)

training_arguments = SFTConfig(
    output_dir=output_dir,                    # output directory of the run
    num_train_epochs=3,                       # number of training epochs
    per_device_train_batch_size=2,            # batch size per device during training
    gradient_accumulation_steps=8,            # batches accumulated per optimizer update (2 * 8 = 16 samples per device)
    gradient_checkpointing=True,              # use gradient checkpointing to save memory
    optim="paged_adamw_32bit",
    logging_steps=1,                          # log at every step
    learning_rate=2e-4,                       # learning rate, based on QLoRA paper
    weight_decay=0.001,
    fp16=True,
    bf16=False,
    max_grad_norm=0.3,                        # max gradient norm based on QLoRA paper
    max_steps=-1,                             # no step limit; num_train_epochs decides the run length
    warmup_ratio=0.03,                        # warmup ratio based on QLoRA paper
    group_by_length=False,
    lr_scheduler_type="cosine",               # use cosine learning rate scheduler
    report_to="none",                         # no external logging integration (nothing is sent to w&b)
    eval_strategy="steps",                    # run evaluation every `eval_steps`
    eval_steps=0.2,                           # a float below 1 is read as a fraction of the total training steps
    dataset_text_field="text",                # column holding the rendered prompts
    max_seq_length=2048,
    packing=False,
    
    dataset_kwargs={
        "add_special_tokens": False,
        "append_concat_token": False,
    },
)

# NOTE(review): eval_data is built with build_prompt_val, i.e. with an empty
# response, so the evaluation loss is computed on prompts that do not contain
# the gold label - confirm this is intended.
trainer = SFTTrainer(
    model=model,
    args=training_arguments,
    train_dataset=train_data,
    eval_dataset=eval_data,
    peft_config=peft_config,
)

trainer.train()

In [None]:
# Turn the generation cache back on (it was disabled when the model was loaded).
model.config.use_cache = True

# Gold label names of the test split.
y_true = [id2label[label_id] for label_id in test_data["label"]]

# Generations that name no category come back as 'none' and are scored as 'NR'.
y_pred = predict(test_data, model, tokenizer)
y_pred = ['NR' if guess == 'none' else guess for guess in y_pred]

evaluate(y_true, y_pred, label_list=unique_labels)