In [None]:
# Install the dependencies once per environment (uncomment to run):
# %pip install transformers datasets optuna scikit-learn matplotlib torch

In [None]:
import os
import json
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    AutoConfig
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import optuna
from transformers import TrainerCallback
import matplotlib.pyplot as plt

In [None]:
class SentimentDataset(Dataset):
    """
    Map-style PyTorch dataset that pairs tokenizer output with classification labels.

    Attributes:
        encodings (dict): Mapping from feature name to a per-sample sequence of values.
        labels (list): One label per sample, aligned with the encodings by index.
    """
    def __init__(self, encodings, labels):
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        """
        Report how many samples the dataset holds.

        Returns:
            int: Number of labels, which is the number of samples.
        """
        return len(self.labels)

    def __getitem__(self, idx):
        """
        Build the tensor dictionary for one sample.

        Args:
            idx (int): Position of the requested sample.

        Returns:
            dict: One tensor per encoding field plus the label tensor under 'labels'.
        """
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample


class EarlyStoppingCallbackCustom(TrainerCallback):
    """
    Stops training once the validation loss has stopped improving.

    After every evaluation the reported ``eval_loss`` is compared with the best
    value seen so far in the current training run. An improvement resets the
    counter and requests a checkpoint; otherwise the counter is incremented and
    training is stopped once it reaches ``patience``.

    Args:
        patience (int): Number of consecutive evaluations without improvement
            that are tolerated before training is stopped.
    """
    def __init__(self, patience=2):
        self.patience = patience
        self.best_loss = None
        self.epochs_no_improve = 0

    def on_train_begin(self, args, state, control, **kwargs):
        """
        Reset the tracking state at the start of every training run.

        A callback instance stays attached to its trainer, so it is reused when
        that trainer is trained more than once (e.g. once per trial of a
        hyperparameter search). Without this reset a later run would be compared
        against the best loss of an earlier run and be stopped prematurely.
        """
        self.best_loss = None
        self.epochs_no_improve = 0

    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        """
        Update the early-stopping state from the latest evaluation metrics.

        Args:
            args: Training arguments (unused).
            state: Trainer state (unused).
            control: Trainer control object; ``should_save`` and
                ``should_training_stop`` are set on it in place.
            metrics (dict | None): Evaluation metrics; nothing happens when they
                are missing or contain no ``eval_loss``.
        """
        if metrics is None:
            return

        eval_loss = metrics.get("eval_loss")
        if eval_loss is None:
            return

        if self.best_loss is None or eval_loss < self.best_loss:
            self.best_loss = eval_loss
            self.epochs_no_improve = 0
            control.should_save = True  # Save the model when it has improved
        else:
            self.epochs_no_improve += 1
            if self.epochs_no_improve >= self.patience:
                print(f"Validation loss has not improved for {self.patience} evaluations. Stopping training.")
                control.should_training_stop = True


# Create custom TrainingArguments that additionally carry an optimizer name
class CustomTrainingArguments(TrainingArguments):
    """
    TrainingArguments extended with an ``optimizer`` attribute.

    Args:
        *args: Positional arguments forwarded unchanged to ``TrainingArguments``.
        optimizer: Keyword-only optimizer name, stored as ``self.optimizer``.
            ``CustomTrainer.create_optimizer`` reads it and recognises
            "adamw" and "adafactor".
        **kwargs: Keyword arguments forwarded unchanged to ``TrainingArguments``.
    """
    def __init__(self, *args, optimizer=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Assigned only after the parent initializer has finished.
        self.optimizer = optimizer

# Custom Trainer that builds the optimizer selected in the training arguments
class CustomTrainer(Trainer):
    """
    Trainer whose optimizer is chosen by the ``optimizer`` attribute of its arguments.
    """
    def create_optimizer(self):
        """
        Create the optimizer named by ``self.args.optimizer`` and store it on the trainer.

        Supported names are "adamw" (``torch.optim.AdamW``) and "adafactor"
        (``transformers.optimization.Adafactor`` with ``scale_parameter`` and
        ``relative_step`` disabled so that the configured learning rate is used).
        Learning rate and weight decay are read from the training arguments and
        applied to all model parameters.

        When no optimizer name is configured (attribute missing or ``None``), the
        parent ``Trainer.create_optimizer`` is used instead of failing. This keeps
        the trainer usable with arguments that were built without ``optimizer``.

        Returns:
            The optimizer instance, which is also stored in ``self.optimizer``.

        Raises:
            ValueError: If an optimizer name other than the supported ones is set.
        """
        optimizer_name = getattr(self.args, "optimizer", None)
        if optimizer_name is None:
            # No explicit choice was made: defer to the stock implementation.
            return super().create_optimizer()

        if optimizer_name == "adamw":
            optimizer_cls = torch.optim.AdamW
            optimizer_kwargs = {
                "lr": self.args.learning_rate,
                "weight_decay": self.args.weight_decay,
            }
        elif optimizer_name == "adafactor":
            from transformers.optimization import Adafactor
            optimizer_cls = Adafactor
            optimizer_kwargs = {
                "lr": self.args.learning_rate,
                "weight_decay": self.args.weight_decay,
                "scale_parameter": False,
                "relative_step": False,
            }
        else:
            raise ValueError(f"Unknown optimizer: {optimizer_name}")
        self.optimizer = optimizer_cls(self.model.parameters(), **optimizer_kwargs)
        return self.optimizer

In [None]:
class Maissen:
    def __init__(self, model_names, use_drive=False, hpo_n_trials=17):
        self.model_names = model_names
        self.data_path = "/content/drive/MyDrive/MAS DataScience/CAS_ML/training_data.csv" if use_drive else "training_data.csv"
        self.save_base_dir = "/content/drive/MyDrive/MAS DataScience/CAS_ML/saved_models" if use_drive else "./saved_models"
        self.use_drive = use_drive
        self.hpo_n_trials = hpo_n_trials
        self.tokenizer = None
        self.model = None
        self.training_args = None
        self.trainer = None

        # Conditionally mount Google Drive
        if self.use_drive:
            from google.colab import drive
            drive.mount('/content/drive')

        # Adjust paths if using Google Drive
        if self.use_drive:
            self.data_path = "/content/drive/MyDrive/MAS DataScience/CAS_ML/training_data.csv"
            self.save_base_dir = "/content/drive/MyDrive/MAS DataScience/CAS_ML/saved_models"
        else:
            self.data_path = "training_data.csv"
            self.save_base_dir = "./saved_models"

    def load_data(self):
        """
        Load the labelled sentences and split them into train and validation sets.

        Reads ``self.data_path``, keeps the ``relevant_sentence`` and ``label``
        columns, maps the textual labels to class ids (negativ=0, neutral=1,
        positiv=2) and performs a stratified 80/20 split with a fixed seed.

        Rows with a missing sentence or with a label outside the mapping are
        dropped and a notice is printed. Such rows would otherwise turn the label
        column into floats containing NaN, which breaks stratification and training.

        Returns:
            tuple: ``(train_texts, val_texts, train_labels, val_labels)``, each a list.

        Raises:
            ValueError: If no usable row remains after cleaning.
        """
        label_map = {'negativ': 0, 'neutral': 1, 'positiv': 2}
        raw = pd.read_csv(self.data_path)

        # Build a new frame instead of assigning into a column selection
        # (the latter can raise pandas' SettingWithCopyWarning).
        data = raw[['relevant_sentence', 'label']].assign(
            label=lambda frame: frame['label'].map(label_map)
        )

        usable = data.dropna(subset=['relevant_sentence', 'label'])
        n_dropped = len(data) - len(usable)
        if n_dropped:
            print(f"Warnung: {n_dropped} Zeile(n) ohne Text oder mit unbekanntem Label wurden verworfen.")
        if usable.empty:
            raise ValueError(
                f"Keine verwendbaren Zeilen in {self.data_path} "
                "(Spalten 'relevant_sentence' und 'label')."
            )

        # Mapping can yield a float column when NaN was present; labels must be ints
        labels = usable['label'].astype(int).tolist()
        train_texts, val_texts, train_labels, val_labels = train_test_split(
            usable['relevant_sentence'].tolist(),
            labels,
            test_size=0.2,
            random_state=42,
            stratify=labels
        )
        return train_texts, val_texts, train_labels, val_labels

    def prepare_dataset(self, texts, labels):
        """
        Tokenize the texts with ``self.tokenizer`` and wrap them with their labels.

        Args:
            texts (list): Input sentences.
            labels (list): Labels aligned with ``texts``.

        Returns:
            SentimentDataset: Dataset built from the tokenizer output and the labels.
        """
        tokenizer_options = {"truncation": True, "padding": True, "max_length": 512}
        tokenized = self.tokenizer(texts, **tokenizer_options)
        dataset = SentimentDataset(tokenized, labels)
        return dataset

    def model_init(self, model_name):
        """
        Load a three-class sequence-classification model and remember it on the instance.

        Args:
            model_name (str): Identifier or path passed to ``from_pretrained``.

        Returns:
            The loaded model, which is also stored in ``self.model``.
        """
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            config=AutoConfig.from_pretrained(model_name, num_labels=3),
            ignore_mismatched_sizes=True,
        )
        return self.model

    def set_training_arguments(self, output_dir='./results'):  # Adjust arguments as needed
        """
        Build the training arguments and store them in ``self.training_args``.

        Evaluation and checkpointing happen once per epoch, and the checkpoint
        with the lowest ``eval_loss`` is reloaded at the end of training.

        Args:
            output_dir (str): Directory handed to the trainer as its output directory.
        """
        settings = dict(
            output_dir=output_dir,
            eval_strategy='epoch',
            save_strategy='epoch',
            logging_dir='./logs',
            load_best_model_at_end=True,
            metric_for_best_model='eval_loss',
            greater_is_better=False,
            save_total_limit=1,
            logging_steps=10,
        )
        self.training_args = CustomTrainingArguments(**settings)

    def perform_hyperparameter_search(self, model_name, train_dataset, val_dataset):
        """
        Run an Optuna search and then fit a final model with the best hyperparameters.

        A new ``CustomTrainer`` is created with ``self.training_args`` and stored
        in ``self.trainer``. The search minimizes the validation loss
        (``eval_loss``) over ``self.hpo_n_trials`` trials.

        When the search returns, the trainer still holds the model of the trial
        that happened to run last. The best hyperparameters are therefore written
        to the trainer's arguments and one more training run is performed, so that
        ``self.trainer`` ends up with a model trained with the best settings.

        Args:
            model_name (str): Identifier passed to ``self.model_init`` for every run.
            train_dataset: Dataset used for training.
            val_dataset: Dataset used for evaluation.

        Returns:
            The best run as returned by ``Trainer.hyperparameter_search``.
        """
        def optuna_hp_space(trial):
            # Search space sampled once per trial
            return {
                "learning_rate": trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True),
                "per_device_train_batch_size": trial.suggest_categorical("per_device_train_batch_size", [8, 16, 32]),
                "num_train_epochs": trial.suggest_categorical("num_train_epochs", [2, 5, 10]),
                "weight_decay": trial.suggest_categorical("weight_decay", [0, 1e-6, 1e-5, 1e-4, 1e-3, 1e-2]),
                "warmup_steps": trial.suggest_int("warmup_steps", 0, 300),
                "optimizer": trial.suggest_categorical("optimizer", ["adamw", "adafactor"]),
            }

        self.trainer = CustomTrainer(
            model_init=lambda: self.model_init(model_name),
            args=self.training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=self.compute_metrics,
            callbacks=[EarlyStoppingCallbackCustom()],
        )

        best_run = self.trainer.hyperparameter_search(
            direction="minimize",
            backend="optuna",
            hp_space=optuna_hp_space,
            n_trials=self.hpo_n_trials,
            compute_objective=lambda metrics: metrics["eval_loss"],
        )

        # Set best hyperparameters
        for name, value in best_run.hyperparameters.items():
            setattr(self.trainer.args, name, value)

        # Final fit with the best hyperparameters. A fresh early-stopping callback
        # is attached so the run is not judged against losses seen during the search.
        # train() builds its own optimizer, so no separate create_optimizer() call
        # is needed here.
        self.trainer.remove_callback(EarlyStoppingCallbackCustom)
        self.trainer.add_callback(EarlyStoppingCallbackCustom())
        self.trainer.train()

        return best_run

    def compute_metrics(self, eval_pred):
        """
        Turn raw evaluation output into accuracy and weighted precision/recall/F1.

        Args:
            eval_pred: Pair of ``(logits, labels)``; the predicted class is the
                argmax over the last axis of the logits.

        Returns:
            dict: Values for 'accuracy', 'f1', 'precision' and 'recall'.
        """
        logits, labels = eval_pred
        predicted = np.argmax(logits, axis=-1)
        # Tuple layout: (precision, recall, f1, support)
        scores = precision_recall_fscore_support(
            labels, predicted, average='weighted', zero_division=1
        )
        return {
            'accuracy': accuracy_score(labels, predicted),
            'f1': scores[2],
            'precision': scores[0],
            'recall': scores[1],
        }

    def train_model(self, train_dataset, val_dataset):
        """
        Train with the existing trainer, creating one first if none is set.

        Args:
            train_dataset: Dataset used for training when a trainer has to be created.
            val_dataset: Dataset used for evaluation when a trainer has to be created.
        """
        trainer = self.trainer
        if not trainer:
            # No trainer yet: build one around the current model and arguments
            trainer = CustomTrainer(
                model=self.model,
                args=self.training_args,
                train_dataset=train_dataset,
                eval_dataset=val_dataset,
                compute_metrics=self.compute_metrics,
                callbacks=[EarlyStoppingCallbackCustom()]
            )
            self.trainer = trainer
        trainer.train()

    def run(self):
        """
        Run the hyperparameter search for every configured model and persist the results.

        For each entry in ``self.model_names`` the tokenizer is loaded, the train
        and validation datasets are built and ``perform_hyperparameter_search`` is
        executed. A failure in any of these steps is printed and the model is
        skipped. Afterwards the model held by ``self.trainer``, the tokenizer and
        a ``best_hyperparams.json`` file are written to a per-model folder below
        ``self.save_base_dir``. A summary of all processed models is written to
        ``optuna_hpo_summary.csv`` in the same base directory.

        The search objective is the validation loss (``eval_loss``, minimized),
        not an accuracy. The value is stored under ``best_eval_loss``; the legacy
        key ``best_accuracy`` carries the same number and is kept only so that
        existing readers of the JSON/CSV keep working.
        """
        train_texts, val_texts, train_labels, val_labels = self.load_data()
        results_list = []
        os.makedirs(self.save_base_dir, exist_ok=True)

        for model_name in self.model_names:
            print(f"\n===== Starte HPO für Modell: {model_name} =====")
            # '/' cannot be part of a single folder name
            safe_name = model_name.replace('/', '_')

            # Load tokenizer
            try:
                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            except Exception as e:
                print(f"Fehler beim Laden des Tokenizers für {model_name}: {e}")
                continue

            # Prepare datasets
            try:
                train_dataset = self.prepare_dataset(train_texts, train_labels)
                val_dataset = self.prepare_dataset(val_texts, val_labels)
            except Exception as e:
                print(f"Fehler beim Erstellen der Datensätze für {model_name}: {e}")
                continue

            # Perform hyperparameter optimization
            try:
                self.set_training_arguments(output_dir=f'./results/{safe_name}')
                best_run = self.perform_hyperparameter_search(model_name, train_dataset, val_dataset)
            except Exception as e:
                print(f"Fehler bei der Hyperparameter-Optimierung für {model_name}: {e}")
                continue

            # Collect the result; the same record goes into the JSON file and the summary
            result = {
                'model_name': model_name,
                'best_params': best_run.hyperparameters,
                'best_accuracy': best_run.objective,  # legacy name, value is eval_loss
                'best_eval_loss': best_run.objective,
            }
            results_list.append(result)

            print(f"Beste Hyperparameter für {model_name}: {best_run.hyperparameters}")
            print(f"Bester Validierungsverlust (eval_loss): {best_run.objective:.4f}")

            model_save_path = os.path.join(self.save_base_dir, safe_name)
            try:
                self.trainer.save_model(model_save_path)
                self.tokenizer.save_pretrained(model_save_path)
                hyperparams_path = os.path.join(model_save_path, 'best_hyperparams.json')
                with open(hyperparams_path, 'w', encoding='utf-8') as f:
                    json.dump(result, f, indent=4)
                print(f"Modell und Hyperparameter für {model_name} gespeichert unter: {model_save_path}")
            except Exception as e:
                print(f"Fehler beim Speichern des Modells für {model_name}: {e}")

        # Save results summary
        results_df = pd.DataFrame(results_list)
        results_csv_path = os.path.join(self.save_base_dir, 'optuna_hpo_summary.csv')
        results_df.to_csv(results_csv_path, index=False)
        print(f"Hyperparameter-Optimierungsergebnisse gespeichert unter: {results_csv_path}")

    def generate_learning_curves(self):
        """
        Train each saved configuration on growing training subsets and plot the accuracy.

        Every sub-directory of ``self.save_base_dir`` that contains a readable
        ``best_hyperparams.json`` is processed. For five training sizes between
        10% and 100% of the training split, a freshly loaded model is trained with
        the stored best hyperparameters and evaluated on the training subset and
        on the validation split. One figure per model is shown at the end.

        Notes:
            * A new model is loaded for every training size. Reusing one model
              would accumulate training across sizes and distort the curve. The
              original checkpoint name is taken from the JSON file or the HPO
              summary CSV. If it cannot be recovered, the saved checkpoint in the
              model directory is reloaded for every size instead.
            * Subsets are nested prefixes of one seeded shuffle, so they are
              reproducible and identical across models.
            * A failed evaluation is recorded as NaN (a gap in the plot) instead of
              a misleading accuracy of 0.
        """
        import shutil  # only needed for removing the temporary trainer output

        saved_models_dir = self.save_base_dir  # Use the directory where models are saved
        train_fractions = np.linspace(0.1, 1.0, 5)  # Training sizes from 10% to 100%
        learning_curves = {}

        # Sorted so the processing order does not depend on the file system
        model_dirs = sorted(
            os.path.join(saved_models_dir, d)
            for d in os.listdir(saved_models_dir)
            if os.path.isdir(os.path.join(saved_models_dir, d))
        )

        # load_data() uses a fixed seed, so one read serves all models
        train_texts, val_texts, train_labels, val_labels = self.load_data()
        known_names = self._original_model_names()

        for model_dir in model_dirs:
            dir_name = os.path.basename(model_dir)

            # Load the best hyperparameters
            hyperparams_path = os.path.join(model_dir, 'best_hyperparams.json')
            try:
                with open(hyperparams_path, 'r') as f:
                    hyperparams_data = json.load(f)
                best_params = hyperparams_data['best_params']
            except Exception as e:
                print(f"Fehler beim Laden der Hyperparameter für {dir_name}: {e}")
                continue

            # The directory name is lossy ('/' became '_'), so it is never converted back
            base_model_name = hyperparams_data.get('model_name') or known_names.get(dir_name)
            model_name = base_model_name or dir_name
            print(f"\n===== Verarbeite Modell: {model_name} =====")
            if base_model_name is None:
                print("  Hinweis: Ursprünglicher Modellname unbekannt; "
                      "jede Trainingsgröße startet vom gespeicherten Checkpoint.")

            # Load tokenizer
            try:
                self.tokenizer = AutoTokenizer.from_pretrained(model_dir)
            except Exception as e:
                print(f"Fehler beim Laden des Modells oder Tokenizers für {model_name}: {e}")
                continue

            # Prepare validation dataset
            try:
                val_dataset = self.prepare_dataset(val_texts, val_labels)
            except Exception as e:
                print(f"Fehler bei der Vorbereitung des Validierungsdatensatzes für {model_name}: {e}")
                continue

            # Prepare training dataset
            try:
                full_train_dataset = self.prepare_dataset(train_texts, train_labels)
            except Exception as e:
                print(f"Fehler bei der Tokenisierung der Trainingsdaten für {model_name}: {e}")
                continue

            # Initialize learning curve data for the model
            curve = {'train_sizes': [], 'train_scores': [], 'val_scores': []}
            learning_curves[model_name] = curve

            n_train = len(full_train_dataset)
            # One seeded shuffle per model; each size uses a prefix of it
            shuffled = np.random.default_rng(42).permutation(n_train).tolist()

            for size_fraction in train_fractions:
                subset_size = max(1, int(n_train * size_fraction))  # at least one example
                print(f"  Trainingsgröße: {subset_size} ({size_fraction*100:.0f}%)")

                train_subset = torch.utils.data.Subset(full_train_dataset, shuffled[:subset_size])

                training_args = CustomTrainingArguments(
                    output_dir='./temp_trainer',
                    eval_strategy='no',
                    save_strategy='no',
                    logging_dir='./logs',
                    logging_steps=10,
                    per_device_train_batch_size=best_params['per_device_train_batch_size'],
                    learning_rate=best_params['learning_rate'],
                    num_train_epochs=best_params['num_train_epochs'],
                    weight_decay=best_params['weight_decay'],
                    warmup_steps=best_params['warmup_steps'],
                    disable_tqdm=True,  # Avoid excessive output
                    optimizer=best_params['optimizer'],
                )

                try:
                    scores = self._fit_and_score(
                        base_model_name, model_dir, model_name,
                        training_args, train_subset, val_dataset,
                    )
                finally:
                    # Release the model and temporary output whether or not training worked
                    self.model = None
                    torch.cuda.empty_cache()
                    for tmp_dir in ('./temp_trainer', './logs'):
                        shutil.rmtree(tmp_dir, ignore_errors=True)

                if scores is None:
                    continue

                # Store results
                train_acc, val_acc = scores
                curve['train_sizes'].append(subset_size)
                curve['train_scores'].append(train_acc)
                curve['val_scores'].append(val_acc)

        # Plot learning curves for each model
        for model_name, curves in learning_curves.items():
            if not curves['train_sizes']:
                # Every training size failed for this model; nothing to draw
                continue

            plt.figure(figsize=(8, 6))

            plt.plot(curves['train_sizes'], curves['train_scores'], 'o-', label='Training Score')
            plt.plot(curves['train_sizes'], curves['val_scores'], 's-', label='Validation Score')

            plt.xlabel('Trainingsgröße')
            plt.ylabel('Genauigkeit')
            plt.title(f'Lernkurve für {model_name}')
            plt.legend(loc='best')
            plt.grid(True)
            plt.tight_layout()
            plt.show()

    def _original_model_names(self):
        """Map saved directory names to the model names recorded in the HPO summary CSV."""
        summary_path = os.path.join(self.save_base_dir, 'optuna_hpo_summary.csv')
        try:
            names = pd.read_csv(summary_path)['model_name'].dropna().astype(str)
        except Exception:
            # Best effort: without a readable summary the caller falls back to directory names
            return {}
        return {name.replace('/', '_'): name for name in names}

    def _fit_and_score(self, base_model_name, model_dir, model_name, training_args, train_subset, val_dataset):
        """Train a freshly loaded model on one subset; return (train_acc, val_acc) or None on failure."""
        try:
            # Seed before loading so a re-initialized classification head is reproducible
            torch.manual_seed(training_args.seed)
            if base_model_name is not None:
                model = self.model_init(base_model_name)
            else:
                model = AutoModelForSequenceClassification.from_pretrained(model_dir)
            trainer = CustomTrainer(
                model=model,
                args=training_args,
                train_dataset=train_subset,
                eval_dataset=val_dataset,
                compute_metrics=self.compute_metrics
            )
            trainer.train()
        except Exception as e:
            print(f"Fehler beim Training für {model_name} mit Größe {len(train_subset)}: {e}")
            return None

        return (
            self._evaluate_accuracy(trainer, train_subset, "Trainingssets", model_name),
            self._evaluate_accuracy(trainer, val_dataset, "Validierungssets", model_name),
        )

    def _evaluate_accuracy(self, trainer, dataset, split_label, model_name):
        """Return the 'eval_accuracy' of ``trainer`` on ``dataset``, or NaN if it cannot be computed."""
        try:
            return trainer.evaluate(eval_dataset=dataset).get('eval_accuracy', float('nan'))
        except Exception as e:
            print(f"Fehler bei der Evaluierung des {split_label} für {model_name}: {e}")
            return float('nan')

In [None]:
# Checkpoints to compare; they are processed in the order listed here
model_names = [
    "deepset/gbert-base",
    "aari1995/German_Sentiment",
    "oliverguhr/german-sentiment-bert",
    "lxyuan/distilbert-base-multilingual-cased-sentiments-student",
    "nlptown/bert-base-multilingual-uncased-sentiment",
    "distilbert-base-german-cased",
    "xlm-roberta-base",
    "ssary/XLM-RoBERTa-German-sentiment",
]

In [None]:
# Hyperparameter search for every candidate model: local paths, 20 trials per model
maissen = Maissen(
    model_names=model_names,
    use_drive=False,
    hpo_n_trials=20,
)
maissen.run()

In [None]:
# Train on growing subsets and plot one learning curve per model directory
# found under maissen.save_base_dir
maissen.generate_learning_curves()