In [19]:
import pandas as pd
from tqdm import tqdm

from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import train_test_split
from torchmetrics.functional.classification import accuracy, f1_score, precision, recall

import numpy as np

import torch
from torch import nn

from sentence_transformers import SentenceTransformer

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using CUDA!' if device.type == 'cuda' else "Using CPU!")

In [None]:
# NOTE(review): unpickling can execute arbitrary code - only load this file
# if it comes from a trusted source.
df = pd.read_pickle('df_spendings_final_short_prec.pkl')
sentences = df["text"].tolist()
labels = df["label"].values

# Label encoding, for example: 0: "austerity", 1: "expansion", 2: "kot".
# Any other assignment works too, e.g. 0: "kot", 1: "expansion", 2: "austerity".
# NOTE(review): the next line replaces the labels read from the data frame with
# unseeded random placeholders (kept as in the original example) - remove it to
# train on the real labels.
labels = np.random.randint(0, 3, len(sentences))  # random labels as an example

# Multilingual sentence encoder, moved to the selected device.
model = SentenceTransformer('sentence-transformers/distiluse-base-multilingual-cased-v2')
model = model.to(device)

In [None]:
# Encode every sentence once (no gradients needed) and bring the result back to
# host memory as a NumPy array.
with torch.no_grad():
    embeddings = model.encode(sentences, convert_to_tensor=True)
    embeddings = embeddings.cpu().numpy()

# Cache embeddings and labels on disk so that later cells can reload them
# without re-running the encoder.
np.save("embeddings.npy", embeddings)
np.save("labels.npy", labels)

In [None]:
# Reload the cached arrays written by the embedding cell.
embeddings = np.load(file="embeddings.npy")
labels = np.load(file="labels.npy")

In [12]:
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing precomputed embeddings with integer labels.

    Args:
        embeddings: Indexable collection of feature vectors (one per sample).
        labels: Indexable collection of integer class ids, same length as
            ``embeddings``.

    Raises:
        ValueError: If ``embeddings`` and ``labels`` differ in length.
    """

    def __init__(self, embeddings, labels):
        # __len__ is driven by the labels alone, so a length mismatch would
        # otherwise go unnoticed (or only surface as an IndexError mid-epoch).
        if len(embeddings) != len(labels):
            raise ValueError(
                f"embeddings and labels must have the same length, "
                f"got {len(embeddings)} and {len(labels)}"
            )
        self.embeddings = embeddings
        self.labels = labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(embedding, label)`` for sample ``idx`` as float32 / int64 tensors."""
        embeddings = torch.tensor(self.embeddings[idx], dtype=torch.float32)
        labels = torch.tensor(self.labels[idx], dtype=torch.int64)
        return embeddings, labels

class ClassifierHead(nn.Module):
    """Small MLP that maps an embedding vector to one logit per class.

    Architecture: Linear(input_dim, 1024) -> ReLU -> Dropout(0.2)
    -> Linear(1024, num_classes).
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()
        hidden_dim = 1024
        layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            # Output width follows num_classes rather than a fixed 2.
            nn.Linear(hidden_dim, num_classes),
        ]
        self.classifier = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw (unnormalised) class logits for ``x``."""
        return self.classifier(x)

In [23]:
def model_pass(num_classes, model, criterion, loader, optimizer=None, train=True, average="micro"):
    """Run one full pass over ``loader`` and return aggregated metrics.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        num_classes: Number of target classes (forwarded to the metrics).
        model: Module mapping a feature batch to per-class logits.
        criterion: Loss called as ``criterion(outputs, labels)``.
        loader: Iterable yielding ``(features, labels)`` batches.
        optimizer: Optimizer stepped after every batch; required when
            ``train`` is true, ignored otherwise.
        train: If true, put the model in training mode and update its weights;
            otherwise put it in eval mode and compute no gradients.
        average: Averaging mode forwarded to all four metrics. The default
            ``"micro"`` keeps the previous results; note that for single-label
            multiclass data micro-averaged accuracy, F1, precision and recall
            all coincide, so pass ``"macro"`` for class-balanced scores.

    Returns:
        dict with keys ``"loss"`` (mean of the per-batch losses), ``"accuracy"``,
        ``"f1"``, ``"precision"`` and ``"recall"``.

    Raises:
        ValueError: If ``train`` is true but no optimizer was given, or if
            ``loader`` yields no batches.
    """
    # Fail before any work is done instead of with an AttributeError after the
    # first forward pass.
    if train and optimizer is None:
        raise ValueError("model_pass(train=True) requires an optimizer")

    if train:
        model.train()
    else:
        model.eval()

    losses = []

    preds = []
    targs = []

    for features, labels in loader:
        features = features.to(device)
        labels = labels.to(device)

        # Only build the autograd graph when the weights will be updated.
        with torch.set_grad_enabled(train):
            outputs = model(features)
            loss = criterion(outputs, labels)

        if train:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        pred = torch.argmax(outputs, dim=1)

        preds.append(pred.detach())
        targs.append(labels.detach())

        losses.append(loss.item())

    if not preds:
        raise ValueError("loader yielded no batches")

    # Metrics are computed once over the whole pass, not averaged per batch.
    preds = torch.cat(preds)
    targs = torch.cat(targs)

    metric_kwargs = {"task": "multiclass", "num_classes": num_classes, "average": average}
    acc = accuracy(preds, targs, **metric_kwargs)
    f1 = f1_score(preds, targs, **metric_kwargs)
    prec = precision(preds, targs, **metric_kwargs)
    rec = recall(preds, targs, **metric_kwargs)

    return {
        "loss": np.mean(losses),
        "accuracy": acc.item(),
        "f1": f1.item(),
        "precision": prec.item(),
        "recall": rec.item(),
    }


In [None]:
# Number of target classes; must match the label encoding used in `labels`.
num_classes = 3

# Take the input width from the embedding matrix instead of hard-coding 512, so
# that switching to another sentence encoder does not break the first layer.
classifier = ClassifierHead(input_dim=embeddings.shape[1], num_classes=num_classes).to(device)

# Stratified 90/10 train/validation split with a fixed seed.
train_features, valid_features, train_labels, valid_labels = train_test_split(
    embeddings, labels, test_size=0.1, random_state=42, shuffle=True, stratify=labels
)

# One "balanced" weight per class that occurs in the training labels.
train_classes = np.unique(train_labels)
class_weights = compute_class_weight("balanced", classes=train_classes, y=train_labels)
# Look each weight up by class value rather than using the label as a position
# in `class_weights`: positional indexing is only correct when the labels are
# exactly 0..K-1 and every class is present in the training split.
weight_by_class = dict(zip(train_classes, class_weights))
train_sample_weights = np.array([weight_by_class[label] for label in train_labels])

train_dataset = Dataset(train_features, train_labels)
valid_dataset = Dataset(valid_features, valid_labels)

# Each training epoch draws this many samples (with replacement), weighted by
# class so that every class is sampled roughly equally often.
samples_per_epoch = 500

train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=64,
    sampler=torch.utils.data.WeightedRandomSampler(
        train_sample_weights,
        num_samples=samples_per_epoch,
        replacement=True,
    ),
)
# Validation needs no shuffling; a fixed order keeps the batch composition, and
# with it the mean of the per-batch losses, deterministic.
valid_loader = torch.utils.data.DataLoader(
    valid_dataset,
    batch_size=64,
    shuffle=False,
)

epochs = 250

criterion = nn.CrossEntropyLoss()

# Cosine-annealed learning rate from 1e-4 down to 1e-6 over all epochs.
optimizer = torch.optim.AdamW(classifier.parameters(), lr=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs, eta_min=1e-6)

In [None]:
# Train for `epochs` epochs, checkpointing the weights that reach the best
# validation F1 and restoring them once training has finished.
pbar = tqdm(range(epochs))

# Start below every reachable score so that the first epoch always writes a
# checkpoint; starting at 0 with a strict ">" would leave no "classifier.pt"
# behind if the validation F1 never rose above 0.
best_f1 = float("-inf")
checkpoint_saved = False
for e in pbar:

    train_metrics = model_pass(num_classes, classifier, criterion, train_loader, optimizer, train=True)
    valid_metrics = model_pass(num_classes, classifier, criterion, valid_loader, optimizer=None, train=False)
    scheduler.step()

    if valid_metrics["f1"] > best_f1:
        best_f1 = valid_metrics["f1"]
        torch.save(classifier.state_dict(), "classifier.pt")
        checkpoint_saved = True

    # The "Valid F1" shown here is the best validation F1 seen so far.
    pbar.set_description(f"Train F1 {train_metrics['f1']:.4f} - Valid F1 {best_f1:.4f}")

# Continue with the best checkpoint instead of the last-epoch weights, which
# would otherwise be what the following cells use. Only a checkpoint written
# during this run is loaded, never a stale file from an earlier one.
if checkpoint_saved:
    classifier.load_state_dict(torch.load("classifier.pt", map_location=device))

In [None]:
def predict_policy_stance(sentences):
    """Predict the policy stance of one or more sentences.

    Uses the module-level sentence encoder ``model``, the trained
    ``classifier`` and ``device``.

    Args:
        sentences: A single sentence or a list of sentences.

    Returns:
        For a single sentence (a bare string or a one-element list): its label
        name, as before. For several sentences: a list of label names in input
        order. For an empty list: an empty list. A label name is ``None`` when
        the predicted class id has no entry in the mapping below.
    """
    if not isinstance(sentences, list):
        sentences = [sentences]
    if not sentences:
        return []

    # Class id -> name; must match the label encoding used for training.
    label_names = {0: "kot", 1: "expansion", 2: "austerity"}

    # Force eval mode so dropout cannot perturb the prediction, and restore the
    # previous mode afterwards so a running training setup is not affected.
    was_training = classifier.training
    classifier.eval()
    try:
        with torch.no_grad():
            embeddings = model.encode(sentences, convert_to_tensor=True).to(device)
            logits = classifier(embeddings)
    finally:
        classifier.train(was_training)

    stances = []
    # Convert per row; calling .item() on the whole argmax tensor only works
    # for exactly one sentence.
    for label in torch.argmax(logits, dim=1).tolist():
        if label not in label_names:
            print("Irgendwas ist ganz schief gelaufen!!!")
        stances.append(label_names.get(label))

    return stances[0] if len(stances) == 1 else stances

# Demo: classify a handful of hand-written German sentences and print each one
# next to its predicted stance.
new_sentences = [
    "wir wollen die Schuldenbremse abschaffen.",
    "wir wollen mehr geld für den sozialstaat ausgeben",
    "wir wollen dir Schuldenbremse ausbauen.",
    "schuldenbremse gut",
    "schuldenbremse schlecht",
    "wir wollen mehr Geld für Schulen ausgeben.",
    "Wir wollen den Haushalt konsolidieren.",
    "Wir wollen zukünftigen Generationen keinen Schuldenberg hinterlassen.",
]

for sentence in new_sentences:
    stance = predict_policy_stance(sentence)
    print(f"'{sentence}' → {stance}")
