## **3. Dobór hiperparametrów**

#### **Import Bibliotek**

In [None]:
import torch
import torch.nn as nn
import numpy as np
import optuna
from pathlib import Path
from typing import Dict

from datasets import load_from_disk, Dataset
from sklearn.metrics import f1_score
from transformers import (
    LongformerForSequenceClassification,
    LongformerConfig,
    TrainingArguments,
    Trainer,
    EvalPrediction,
    EarlyStoppingCallback,
)

#### **Konfiguracja Stałych**

In [None]:
# --- Data and model ---------------------------------------------------------
TOKENIZED_DATA_PATH = "data/data_tokenized"
MODEL_NAME = "sdadas/polish-longformer-base-4096"
NUM_LABELS = 6

# --- Tuning budget ----------------------------------------------------------
# Use 30% of the data for fast tuning.
TRAIN_SUBSET_FRACTION = 0.3
# Split that subset into 80% training / 20% validation.
TUNE_SPLIT_TEST_SIZE = 0.2
# Maximum length of a single trial, in epochs.
MAX_EPOCHS_PER_TRIAL = 8
# Target number of trials to run.
N_TRIALS = 20

# --- Focal loss -------------------------------------------------------------
# Set from domain knowledge and the standard implementation (not tuned).
FOCAL_ALPHA = 0.5
FOCAL_GAMMA = 2.0

# --- Optuna bookkeeping -----------------------------------------------------
OPTUNA_STORAGE_DB = "sqlite:///optuna_study.db"
OPTUNA_STUDY_NAME = "esg-longformer-study-v2"
TRIALS_CHECKPOINT_DIR = "optuna_model_checkpoints"

#### **Focal Loss**

In [None]:
class FocalLoss(nn.Module):
    """Focal loss built on sigmoid binary cross-entropy with logits.

    For every element the loss is ``alpha * (1 - p_t) ** gamma * bce``, where
    ``p_t`` is the probability the model assigns to the element's target, and
    the final value is the mean over all elements.

    Args:
        alpha: Constant factor applied to every element of the loss.
        gamma: Exponent of the ``(1 - p_t)`` modulating factor.
        pos_weight: Optional tensor forwarded to
            ``binary_cross_entropy_with_logits`` as ``pos_weight``. ``None``
            leaves the cross-entropy term unweighted.
    """

    def __init__(self, alpha, gamma, pos_weight=None):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.pos_weight = pos_weight

    def forward(self, inputs, targets):
        """Return the mean focal loss.

        Args:
            inputs: Raw (pre-sigmoid) logits.
            targets: Float tensor with the same shape as ``inputs``.

        Returns:
            A scalar tensor with the averaged focal loss.
        """
        unweighted_bce = nn.functional.binary_cross_entropy_with_logits(
            inputs, targets, reduction='none'
        )
        # p_t has to come from the *unweighted* cross-entropy. Deriving it
        # from the pos_weight-scaled loss would give p ** w for positives and
        # distort the modulating factor.
        pt = torch.exp(-unweighted_bce)

        if self.pos_weight is None:
            bce_loss = unweighted_bce
        else:
            bce_loss = nn.functional.binary_cross_entropy_with_logits(
                inputs,
                targets,
                reduction='none',
                # Keep the weights on the same device as the logits.
                pos_weight=self.pos_weight.to(inputs.device),
            )

        focal_loss = self.alpha * (1 - pt) ** self.gamma * bce_loss
        return focal_loss.mean()

#### **Klasa ESGTrainer**

In [None]:
class ESGTrainer(Trainer):
    """``Trainer`` subclass that computes its loss with ``FocalLoss``.

    Args:
        *args: Positional arguments forwarded to ``Trainer``.
        focal_loss_alpha: ``alpha`` passed to ``FocalLoss``.
        focal_loss_gamma: ``gamma`` passed to ``FocalLoss``.
        class_weights: Optional tensor used as ``pos_weight`` of the loss; it
            is moved to ``self.args.device`` when given.
        **kwargs: Keyword arguments forwarded to ``Trainer``.
    """

    def __init__(self, *args, focal_loss_alpha=0.5, focal_loss_gamma=2.0, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        if class_weights is None:
            self.class_weights = None
        else:
            self.class_weights = class_weights.to(self.args.device)
        self.loss_fct = FocalLoss(
            alpha=focal_loss_alpha,
            gamma=focal_loss_gamma,
            pos_weight=self.class_weights,
        )

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run the model on ``inputs`` and score its logits with the focal loss.

        ``"labels"` is removed from ``inputs`` before the forward pass, so the
        model itself never receives the labels. Extra keyword arguments are
        accepted and ignored.

        Returns:
            The loss, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        targets = inputs.pop("labels")
        model_outputs = model(**inputs)
        loss_value = self.loss_fct(model_outputs.get("logits"), targets.float())
        if return_outputs:
            return (loss_value, model_outputs)
        return loss_value

#### **Obliczanie wag**

In [None]:
def calculate_class_weights(dataset: Dataset) -> torch.Tensor:
    """Compute one weight per label column from positive-label counts.

    A label with ``c > 0`` positives among ``n`` rows gets
    ``n / (2 * c + 1e-6)``; a label with no positives gets ``1.0``.

    Args:
        dataset: Object whose ``'labels'`` entry converts to an array with one
            row per sample (assumed multi-hot, one column per label).

    Returns:
        A float tensor with one weight per label column.
    """
    label_matrix = np.array(dataset['labels'])
    positives_per_label = np.sum(label_matrix, axis=0)
    n_samples = len(label_matrix)

    def _weight_for(positives):
        # Labels that never occur keep a neutral weight.
        if positives > 0:
            return n_samples / (2 * positives + 1e-6)
        return 1.0

    per_label_weights = [_weight_for(positives) for positives in positives_per_label]
    return torch.tensor(per_label_weights, dtype=torch.float)

#### **Inicjalizacja modelu**

In [None]:
def model_init(trial):
    """Build a fresh multi-label Longformer classifier from ``MODEL_NAME``.

    Args:
        trial: Accepted for the ``model_init`` callback signature; not used.

    Returns:
        A ``LongformerForSequenceClassification`` with ``NUM_LABELS`` outputs.
    """
    return LongformerForSequenceClassification.from_pretrained(
        MODEL_NAME,
        config=LongformerConfig.from_pretrained(
            MODEL_NAME,
            num_labels=NUM_LABELS,
            problem_type="multi_label_classification",
        ),
    )

#### **Krok 1: Przygotowanie danych do optymalizacji**

In [None]:
# Load the tokenized dataset; only its "train" split is used below.
full_dataset = load_from_disk(TOKENIZED_DATA_PATH)

# Take a fixed-seed random fraction of the training split for tuning.
subset_size = int(TRAIN_SUBSET_FRACTION * len(full_dataset["train"]))
data_subset = (
    full_dataset["train"]
    .shuffle(seed=42)
    .select(range(subset_size))
)

# Carve a validation part out of that subset, again with a fixed seed.
split_data = data_subset.train_test_split(
    test_size=TUNE_SPLIT_TEST_SIZE,
    seed=42,
)
tune_train_dataset, tune_val_dataset = split_data["train"], split_data["test"]

# Weights come from the whole training split, not only the tuning subset.
class_weights = calculate_class_weights(full_dataset['train'])

#### **Krok 2: Uruchomienie optymalizacji**

In [None]:
def hp_space_optuna(trial: optuna.Trial) -> Dict[str, float]:
    """Sample the hyperparameters tuned in one trial.

    Both values are drawn with ``trial.suggest_float(..., log=True)``:
    ``learning_rate`` from [1e-5, 5e-5] and ``weight_decay`` from [0.01, 0.2].

    Returns:
        A dict with the keys ``"learning_rate"`` and ``"weight_decay"``.
    """
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 5e-5, log=True)
    weight_decay = trial.suggest_float("weight_decay", 0.01, 0.2, log=True)
    return {"learning_rate": learning_rate, "weight_decay": weight_decay}

# Base arguments shared by every trial; the search overrides the sampled
# hyperparameters on top of these.
training_args = TrainingArguments(
    # Bookkeeping.
    output_dir="./optuna_temp",
    report_to="none",
    seed=42,
    # Optimisation: batch of 1 per device, gradients accumulated over 8 steps.
    per_device_train_batch_size=1,
    gradient_accumulation_steps=8,
    num_train_epochs=MAX_EPOCHS_PER_TRIAL,
    fp16=True,
    # Evaluate and checkpoint on the same 100-step cadence.
    evaluation_strategy="steps",
    eval_steps=100,
    save_strategy="steps",
    save_steps=100,
    save_total_limit=1,
    # Model selection: keep the checkpoint with the highest macro F1.
    load_best_model_at_end=True,
    metric_for_best_model="eval_f1_macro",
    greater_is_better=True,
)

def compute_f1_macro(eval_pred: EvalPrediction) -> Dict[str, float]:
    """Compute macro-averaged F1 from raw logits for multi-label evaluation.

    Args:
        eval_pred: Evaluation output; ``predictions`` holds the logits and
            ``label_ids`` the reference labels.

    Returns:
        ``{'eval_f1_macro': score}``, the key referenced by
        ``metric_for_best_model`` and by the search objective.
    """
    # sigmoid(x) > 0.5 is the same decision as x > 0. Thresholding the logits
    # directly avoids np.exp overflow warnings on very negative logits.
    predicted_labels = (eval_pred.predictions > 0).astype(int)
    score = f1_score(
        eval_pred.label_ids,
        predicted_labels,
        average='macro',
        zero_division=0,
    )
    return {'eval_f1_macro': score}


trainer = ESGTrainer(
    args=training_args,
    model_init=model_init,
    train_dataset=tune_train_dataset,
    eval_dataset=tune_val_dataset,
    compute_metrics=compute_f1_macro,
    class_weights=class_weights,
    focal_loss_alpha=FOCAL_ALPHA,
    focal_loss_gamma=FOCAL_GAMMA,
    # Stop a trial after 3 evaluations without improvement.
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],
)

# Run the Optuna search, maximising the macro F1 reported at evaluation.
# NOTE(review): OPTUNA_STORAGE_DB, OPTUNA_STUDY_NAME and TRIALS_CHECKPOINT_DIR
# are defined above but not passed here - presumably the study is therefore
# not persisted; confirm whether that is intended.
best_run = trainer.hyperparameter_search(
    backend="optuna",
    direction="maximize",
    n_trials=N_TRIALS,
    hp_space=hp_space_optuna,
    compute_objective=lambda eval_metrics: eval_metrics["eval_f1_macro"],
)

#### **Najlepsza zarejestrowana próba:**

In [None]:
'''
[1216/1216 2:25:23, Epoch 8/8]
Step	Training Loss	  Validation Loss	  F1 Macro
100	  No log	        0.102660	        0.592207
200	  No log	        0.083195	        0.662340
300	  No log	        0.065447	        0.755528
400	  No log	        0.052853	        0.806245
500	  0.076500	      0.047519	        0.834683
600	  0.076500	      0.040097	        0.861286
700	  0.076500	      0.034386	        0.884033
800	  0.076500	      0.033461	        0.890963
900	  0.076500	      0.032104	        0.894759
1000	0.025600	      0.029985	        0.901850
1100	0.025600	      0.028734	        0.906202
1200	0.025600	      0.028170	        0.904163

[I 2025-08-26 23:57:19,542] Trial 0 finished with value: 0.9041625781021257 and parameters: {'learning_rate': 2.2106899831932417e-05, 'weight_decay': 0.15910997111952396}.
'''