<a href="https://colab.research.google.com/github/blue-create/langlens/blob/main/to_publish/general_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Allgemeines Model
In diesem Notebook wird ein Bert-Model erstellt, welches zwischen den folgenden Kategorien unterscheidet: Unproblematischer Text, Text mit graphischen/ verstörenden Inhalten, Text mir sensationalistischer Sprache.

Schritte:

*   Vorbereiten der Daten
*   Modellieren
*   Evaluieren des Modelles




### Imports, Konstanten, Paths

In [None]:
%%capture
!pip install transformers==4.20.0

In [None]:
%%capture
!pip install datasets

In [None]:
# Allgemein
import numpy as np
import pandas as pd
import os
import json
from google.colab import drive

# Vorbereiten
import datasets
from datasets import Dataset

# Modelling

from transformers import TrainingArguments, Trainer
from transformers import AutoTokenizer,EarlyStoppingCallback
from transformers import AutoModelForSequenceClassification

# Evaluation
import torch
from torch import nn
from sklearn.metrics import  classification_report, multilabel_confusion_matrix
from transformers import EvalPrediction

In [None]:
# Verbinden mit GDrive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/data

Mounted at /content/drive


In [None]:
# Import des Skripts das die Annotierten Daten liest
from scripts import annotations

## Vorbereiten der Daten

### Einlesen aller Daten

In [None]:
# Lesen und zusammenfügen aller annotierten Dateien
dfs={}
for doc in os.listdir("3_Annotated/new_ontology"):
  # Lesen der Json-Datei und als Pandas Dataframe speichern
  if doc.endswith(".json"):
    json_data=json.load(open("3_Annotated/new_ontology/"+doc, encoding="utf-8"))
    data=pd.DataFrame(json_data["documents"])
    data.loc[:,"file"]=doc
    dfs[doc]=data

# Mergen aller Files
data=pd.concat(dfs,ignore_index=True)
data=data[data.annotations.apply(len)!=0]

In [None]:
# Extrahieren und Vorbereiten der annotierten Daten
data.loc[:,"artikel_id"]=data.attributes_flat.apply(lambda x: x["artikel_id"])
data.loc[:,"name"]=data.attributes_flat.apply(lambda x: x["name"])
data.loc[:,"titel"]=data.attributes_flat.apply(lambda x: x["titel"])
data.loc[:,"ressort"]=data.attributes_flat.apply(lambda x: x["ressort"])
data.loc[:,"annotations"]=data.annotations.apply(annotations.extract_annotations)
data.loc[:,"dice"]=data.annotations.apply(annotations.calculate_similarity,sim="dice")
data.loc[:,"annotations"]=data.apply(annotations.ground_truth_filter,min_coannotation=1,min_similarity=-1, similarity="dice",axis=1)
data.loc[:,"annotations"]=data.annotations.apply(list)
data.loc[:,"annotations"]=["*".join(i) for i in data.annotations]

In [None]:
# Hinzufügen der automatisch annotierten Daten
auto_data=pd.read_csv("3_Annotated/automatically_annotated/230822_automatic_label_full_cleaned.csv", parse_dates=True, index_col=0)
auto_data=auto_data.rename(columns={"revised":"annotations"})
auto_data=auto_data.drop("pred_label_lvl3",axis=1)
auto_data=auto_data.drop("pred_label_lvl2",axis=1)

In [None]:
# Formatieren der automatisch annotierten Daten
auto_data.iloc[:,-2:]=auto_data.iloc[:,-2:].T.ffill().T
auto_data=auto_data[~auto_data.annotations.isna()]
auto_data=auto_data[auto_data.annotations!="Others"]
auto_data.loc[:,"annotations"]=auto_data.annotations.str.split(", ")
auto_data.loc[:,"annotations"]=["*".join(i) for i in auto_data.annotations]

In [None]:
# Zusammenfügen beider Datensets
data=pd.concat([auto_data,data])

In [None]:
# Formatieren der Annotationen, Löschen von Duplikaten
df=pd.merge(data[["text","artikel_id","annotations"]],data.annotations.str.get_dummies(sep="*"), left_index=True, right_index=True)
df=df.drop_duplicates(keep="last")

In [None]:
# Liste aller Annotationskategorien, Mapping von Index zu Label und vice versa
labels=df.columns.to_list()[-4:]
id2label = {idx:label for idx, label in enumerate(labels)}
label2id = {label:idx for idx, label in enumerate(labels)}

### Downsampling

Die überrepräsentieren Annotationsgruppen werden in diesem Schritt an die Größe der anderen Kategorien angeglichen.

In [None]:
# Definieren der Unter- und Überrepräsentierten Annotationskategorien
underrep_groups=df[df.annotations!="Domestic Violence"]
sample_overrep_group=df[df.annotations!="Problematic"].sample(int(underrep_groups.shape[0]))

In [None]:
# Erstellen eines ausgeglichenen Datensets
downsampled=pd.concat([sample_overrep_group,underrep_groups])
downsampled = downsampled.sample(frac = 1).reset_index(drop=True)

In [None]:
# Teilen der Daten in Trainings und Testsets
df_train, df_val, df_test = np.split(downsampled.sample(frac=1, random_state=42), [int(.8 * len(downsampled)), int(.9 * len(downsampled))])

In [None]:
# Formatieren der Daten
ds=datasets.DatasetDict({"train":Dataset.from_dict(df_train),"val":Dataset.from_dict(df_val),"test":Dataset.from_dict(df_test)})

## Modellieren

In [None]:
# Definieren der Labelkategorien
id2label = {idx:label for idx, label in enumerate(labels)}
label2id = {label:idx for idx, label in enumerate(labels)}

In [None]:
# Vortrainierte Modelle, die getestet wurden

#model_id="oliverguhr/german-sentiment-bert"
#model_id="krupper/text-complexity-classification"
#model_id="classla/xlm-roberta-base-multilingual-text-genre-classifier"
#model_id="deepset/bert-base-german-cased-hatespeech-GermEval18Coarse"
#model_id="nlptown/bert-base-multilingual-uncased-sentiment"
model_id="bert-base-german-cased"
#model_id="dbmdz/bert-base-german-cased"

In [None]:
# laden des vortrainierten Tokenizers
tokenizer = AutoTokenizer.from_pretrained(model_id)

Downloading:   0%|          | 0.00/29.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/433 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/249k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/474k [00:00<?, ?B/s]

In [None]:
# Funktion zum Tokenisieren und Formatieren des Input-Texts
def preprocess_data(df):
  """ Funktion zum Toekenisieren und Formatieren eines Input-Texts
  Parameters:
    - df (DataFrame): Pandas DataFrame mit Variable "Text"
  Returns:
    - encoding (DataFrame): Pandas DataFrame mit dem Tokenisierten Text und den entsprechenden Labels
  """
  text = df["text"]
  encoding = tokenizer(text, padding="max_length",)
  labels_batch = {k: df[k] for k in df.keys() if k in labels}
  labels_matrix = np.zeros((len(text), len(labels)))
  for idx, label in enumerate(labels):
    labels_matrix[:, idx] = labels_batch[label]
  encoding["labels"] = labels_matrix.tolist()
  return encoding

In [None]:
# Encoding der Input-Texte
ds_encoded=ds.map(preprocess_data, batched=True, remove_columns=ds['train'].column_names)
ds_encoded.set_format("torch")

Map:   0%|          | 0/1348 [00:00<?, ? examples/s]

Map:   0%|          | 0/169 [00:00<?, ? examples/s]

Map:   0%|          | 0/169 [00:00<?, ? examples/s]

In [None]:
# Erstellen der Vortrainierten Modells
model = AutoModelForSequenceClassification.from_pretrained(model_id,
                                   problem_type="multi_label_classification",
                                   num_labels=len(labels),
                                   id2label=id2label,
                                   label2id=label2id)

In [None]:
# Verlustfunktion zur Evaluierung des Models
def compute_metrics(p: EvalPrediction):
    """ Funktion zum Evaluieren der Performance eines Text-Modells
    Parameters:
      - p (EvalPrediction): Vorhersagen des Textmodells
    Returns:
      - result (DataFrame): Performance des Textmodells: Verlustfunktion, F1, Accuracy
    """
    preds = p.predictions[0] if isinstance(p.predictions,
            tuple) else p.predictions
    result = multi_label_metrics(
        predictions=preds,
        labels=p.label_ids)
    return result


# Klasse mit einem Lossfunktion, die eine Gewichtung der Kategorien erlaubt
class CustomTrainer(Trainer):

    def compute_loss(self, model, inputs, return_outputs=False):
      """ Funktion zum Evaluieren der Performance eines Text-Modells
          - p (EvalPrediction): Vorhersagen des Textmodells
        Returns:
          - result (DataFrame): Performance des Textmodells mit Gewichtung der Labels: Verlustfunktion, F1, Accuracy
      """
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        class_indices = torch.argmax(labels, dim=1)
        loss_fct = nn.CrossEntropyLoss(weight=torch.tensor([1.0,3.0,3.0,3.0], device=model.device))
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), class_indices)
        return (loss, outputs) if return_outputs else loss

In [None]:
# Definieren der Trainingsparametern
args = TrainingArguments(
    f"general-model-finetuned",
    evaluation_strategy = "epoch",
    save_strategy = "epoch",
    learning_rate=5e-6,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    num_train_epochs=10,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    logging_steps=20,
)

In [None]:
# Definieren der Modelparametern mit einer der beiden Verlustfunktionen

#trainer = CustomTrainer(
trainer = Trainer(
    model,
    args,
    train_dataset=ds_encoded["train"],
    eval_dataset=ds_encoded["val"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],
)

In [None]:
## Trainieren des Models
trainer.train()

## Model Evaluation
Vohersagen der Kategorien und Erstellen einer Konfusionsmatrix zur Evaluierung des Models

### Vorhersagen

In [None]:
sigmoid = torch.nn.Sigmoid()

In [None]:
# Zu testende Texte
test_pred=pd.DataFrame(ds["test"])

In [None]:
# Funktion zur Vorhersage der Textkategorien
def predict_text(text):
    """ Funktion zur Vorhersage von Text-Kategorien
        - text (str): Text, dessen Kategorie vorhergesagt werden soll
      Returns:
        - predictions (list of int): Liste mit Integern mit Länge der Anzahl an Annotationskategorien, mit 1 Kategorie ist zutreffend und 0 nicht zutreffend
    """
  encoding = tokenizer(text, return_tensors="pt")
  encoding = {k: v.to(trainer.model.device) for k,v in encoding.items()}
  outputs = trainer.model(**encoding)
  logits = outputs.logits
  probs = sigmoid(logits.squeeze().cpu())
  predictions = np.zeros(probs.shape)
  predictions[np.where(probs >= 0.5)] = 1
  return predictions


In [None]:
# Vorhersage von Text-Kategorien
pred=[]
for t in ds["test"]["text"]:
  pred.append(predict_text(t))
test_pred["prediction"]=pred

### Evaluierung

In [None]:
# Vergleich der Vorhergesagten und tatsächlichen Kategorien
temp=pd.DataFrame(columns=["Domestic Violence",	"Graphic",	"Sensationalist",	"Statement of responsibility"])
actual_labels=test_pred.iloc[:,-5:-1]
predicted_labels=pd.DataFrame(pred).rename(columns={0: "Domestic Violence", 1: "Graphic",2:"Sensationalist",3:"Statement of responsibility"}).astype(int)

In [None]:
# Darstellen der Modelperformance in einer Konfusionsmatrix
print(multilabel_confusion_matrix(actual_labels, predicted_labels))
print(classification_report(actual_labels, predicted_labels))

Exportieren des Models

In [None]:
# Speichern des trainierten Models
trainer.save_model("4_Models/best_model_general")