In [1]:
import sklearn
import pandas as pd
import numpy as np

### Datensatz laden

In [3]:
# Datensatz laden

daten = pd.read_json("..\Korpora\Referenzdatensatz_HateSpeech_Deutsch\HateSpeechDe_HATE_Gruppe.json")
#daten = daten.drop(axis=1, labels=["corpus_id"])
daten = daten.set_index(keys="corpus_id")
print(daten)

                 label                                              tweet
corpus_id                                                                
1112521    KeineGruppe  @user @user @user Weitaus schlimmer. Heute ist...
1114995    KeineGruppe   Das Deutsche Kaiserreich soll wieder auferstehen
1110545    KeineGruppe                   Die BRD ist eine einzige Schande
1114326         Gruppe  @user @user Die Grünen....besser kann man das ...
4112169    KeineGruppe  @user @user Scheiss deutsche Politiker! Mehr g...
...                ...                                                ...
1223336    KeineGruppe  Unbequeme Wahrheit:   Sexuelle Belästigung ist...
1221963    KeineGruppe  Vor was habt ihr Angst Liebe Bürger !!!???? St...
2220834         Gruppe  @user Schön den Dummkopf #Oppermann von der Vo...
1220580         Gruppe  @user   Hoffentlich sind die Nationalisten dan...
2222357    KeineGruppe  @user Klar, was sollen Sie denn auch anderes s...

[3050 rows x 2 columns]


### Annotationslabels encodieren

In [27]:
def encode(labels):
    """
    Input: labels = Liste der Annotationslabels für den Datensatz,
            z.B. ["KEINE", "KEINE", "VVH", "KEINE", ...] oder
                 [["KeineGruppe"], ["Politische Einstellung", "Geschlecht"], ["KeineGruppe"], ...]
    Output:
            namen = Liste der Klassenlabels in korrekter Reihenfolge;
                    eine der Listen in label_namen
            labels_encoded = Liste der Annotationslabels im binären Format
                    die Probleme stellen immer die Frage "Ist dieses Phänomen vorhanden?" - Ja/Nein
                    dann im binären Format: Ja = 1, Nein = 0
    """

    # Mögliche Klassen
    labels_vvh = ["KEINE", "VVH"]
    labels_gruppe = ["KeineGruppe", "Gruppe"]
    labels_handlung = ["KeineHandlung", "Handlung"]
    labels_gruppe_det = ["KeineGruppe", "Nationalität", 'ethnische Herkunft / "Rasse"', "Religion / Weltanschauung",
        "Politische Einstellung", "Geschlecht", "Anderes Merkmal"]
    labels_handlung_det = ["KeineHandlung", "Aufstachelung zu Hass", "Aufforderung zu Gewalt- oder Willkürmaßnahmen", "Angriff der Menschenwürde"]
    label_namen = [labels_vvh, labels_gruppe, labels_handlung, labels_gruppe_det, labels_handlung_det]


    # Klassifizierungsproblem, also das Set der vorhandenen Labels, ermitteln

    # Fall 1: Strings (binäre Klassen)
    namen = []
    if type(labels[0]) == str:
        labels_flat = set(labels)
    # Fall 2: Listen (mehrere Klassen)
    else:
        # Summe über die Sets aller Listen
        labels_flat = set([label for entry in labels for label in entry])
    for i in label_namen:
        if set(i) == labels_flat: namen = i


    # Labels transformieren

    # Fall 1: 2 Klassen
    eins = ["NEG", "VVH", "Gruppe", "Handlung"]
    if len(namen) == 2:
        labels_eins = np.array(list(map(lambda x: 1 if x in eins else 0, list(labels))))
        return (labels_eins, namen)

    # Fall 2: mehrere Klassen
    else:
        # leere Matrix der Form Einträge x Labelnamen
        label_array = [[0 for i in range(len(namen))] for j in range(len(labels))]
        # Matrix befüllen
        for i, label_liste in enumerate(labels):
            labels_bin = []
            for j, name in enumerate(namen):
                if name in label_liste:
                    label_array[i][j] = 1
        # die i-te Spalte Ndarrays label_array ist jeweils die Annotation für die i-te Klasse
        # also bei namen = handlung_det, ist label_array[:, 0] die Annotation für "KeineHandlung"
        label_array = [np.array(liste) for liste in label_array]
        label_array = np.array(label_array)
        return (label_array, namen)

# TODO
def decode(namen, labels):
    """ # ausgehend von den vorhergesagten Labels und der Liste der Labelname
    # 1. die Labels wieder von 0,1 in ihre Strings verwandeln

    """
    return True

### Tokenizer für die Klassifikation mit sklearn laden

In [11]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained('deepset/gelectra-large', truncation=True, padding=False)

def bert_tokenize(inputs):
    """Einen String mit BERT tokenisieren
    Output: Liste von Tokens"""
    token_ids = tokenizer(inputs)
    tokens = tokenizer.convert_ids_to_tokens(token_ids["input_ids"])
    return tokens

INFO:tensorflow:Enabling eager execution
INFO:tensorflow:Enabling v2 tensorshape
INFO:tensorflow:Enabling resource variables
INFO:tensorflow:Enabling tensor equality
INFO:tensorflow:Enabling control flow v2


### Klassifizierungsvorgehen

In [33]:
from sklearn.model_selection import train_test_split
from skmultilearn.model_selection import iterative_train_test_split,IterativeStratification
from collections import Counter
from skmultilearn.model_selection.measures import get_combination_wise_output_matrix
from sklearn.datasets import make_multilabel_classification
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn import preprocessing
from sklearn.naive_bayes import MultinomialNB, GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn.metrics import PrecisionRecallDisplay, precision_recall_curve, multilabel_confusion_matrix

def iterative_train_test_split_ran(X, y, test_size, random_state=42):
    """Leichte Abänderung der Funktion iterativ_train_test_split 
    (skmultilearn.model_selection.iterative_stratification.iterative_train_test_split),
    um die Übergabe eines Random-States an das interne Objekt IterativeStratification zu ermöglichen
    """
    stratifier = IterativeStratification(n_splits=2, order=2, sample_distribution_per_fold=[test_size, 1.0-test_size], random_state=random_state)
    train_indexes, test_indexes = next(stratifier.split(X, y))

    X_train, y_train = X[train_indexes, :], y[train_indexes, :]
    X_test, y_test = X[test_indexes, :], y[test_indexes, :]

    return X_train, y_train, X_test, y_test

def train_test_split_multilabel(daten, ziele, test_size=0.3, random_state=42):
    """TODO
    """

    # Input daten als Input für den Multilabel-Stratifizierer (http://scikit.ml/stratification.html) vorbereiten:
    # Format ndarray (beispiele) x ndarray (features)
    # da allerdings die Features erst nach dem Train/Test-Split berechnet werden,
    # werden stattdessen die Korpus-IDs übergeben, anhand derer dann die Tweets zugeordnet werden
    #X, y  = daten.index, ziele
    X, y = np.array([np.array([entry, ]) for entry in daten.index]), ziele
    #X = np.array(X)

    # Datenanalyse: Anzahl der Einträge pro Klasse / Klassenkombination
    # Counter(combination for row in get_combination_wise_output_matrix(y, order=1) for combination in row))

    X_train, y_train, X_test, y_test = iterative_train_test_split_ran(X, y, test_size = test_size, random_state=random_state) # Multilabel

    # Tweets anhand der IDs dem Train/Test-Split zuordnen
    X_train_tweets = [daten.loc[index[0]]["tweet"] for index in X_train]
    X_test_tweets =  [daten.loc[index[0]]["tweet"] for index in X_test]

    return X_train_tweets, X_test_tweets, y_train, y_test


def split_feature(daten, ziele, labels):
    """Train/Test-Split Preprocessing und Merkmalsauswahl
    Input: Datensatz (Pandas Dataframe mit Index und Spalte "tweets"),
           ziele (Zielannotation im binären Format)
    Output: X_train, y_train, X_test, y_test
            nach stratifiziertem Train/Test-Split, Preprocessing, Merkmalsauswahl und Normalisierung
    """

    # 1. Stratifizierter Train/Test-Split
    if len(labels) == 2:
        X_train, X_test, y_train, y_test = train_test_split(daten["tweet"], ziele, test_size=0.33, random_state=36, stratify=ziele)
    else:
        X_train, X_test, y_train, y_test = train_test_split_multilabel(daten, ziele, test_size=0.3, random_state=36)


    # 2. Preprocessing und Merkmalsauswahl
    vectorizer = TfidfVectorizer(analyzer='word', ngram_range=(1, 2), lowercase=True, tokenizer=bert_tokenize)
    X_train_feat = vectorizer.fit_transform(X_train)
    X_test_feat = vectorizer.transform(X_test)

    # 3. Normalisierung
    max_abs_scaler = preprocessing.MaxAbsScaler()
    X_train_maxabs = max_abs_scaler.fit_transform(X_train_feat)
    X_test_maxabs = max_abs_scaler.transform(X_test_feat)

    return (X_train_maxabs,y_train), (X_test_maxabs, y_test)


def train_eval(train, test, labels):
    """TODO
    Training und Evaluation
    Input: train(daten, annotation), test(daten, annotation), label_set
    Output: Ergebnisse der Evaluation
    """
    X_train, y_train = train
    X_test, y_test = test

    # Training
    model = LogisticRegression()
    model.fit(X_train, y_train)    

    # Evaluation
    # TODO: restliche Metriken ergänzen
    evaluation = dict() # Precision, Recall, Accuracy, F1, MCC, Confusion Matrix

    predicted = model.predict(X_test)
    # metrics.classification_report(y_test, predicted, labels)
    evaluation["confusion"] = metrics.confusion_matrix(y_test, predicted)

    return evaluation

def pipeline(daten, ziele, labels):
    '''TODO
    für zwei Klassen
    '''
    train, test = split_feature(daten, ziele, labels)
    evaluation = train_eval(train, test, labels)
    return evaluation

def multilabel_pipeline(train, test, labels):
    """TODO
    """
    # 1. Klassifikationspipeline (train_eval) für jede Klasse einmal laufen lassen,
    #    Evaluationsergebnisse sammeln

    # 2. Gesammelte Ergebnisse evaluieren

    return True

### Pipeline für eine oder mehrere Klassen laufen lassen

In [34]:
#ergebnisse = []

labels_encoded, label_namen = encode(list(daten["label"]))

print(pipeline(daten, labels_encoded, label_namen))

#print(metrics.multilabel_confusion_matrix(y_test, predicted, labels=label_namen))


#for i in range(len(label_namen)-1):
#    ergebnisse.append(pipeline(daten, labels_encoded[i], [label_namen[0], label_namen[i+1]]))


# globale Evaluation
#print(ergebnisse)

{'confusion': array([[515,  79],
       [162, 251]], dtype=int64)}
