In this notebook we try to use a neural network to predict which party a tweet belongs to.

In [None]:
from keras.models import Sequential
from keras.layers import Dense, Dropout
from tensorflow.keras.utils import to_categorical
from keras.losses import SparseCategoricalCrossentropy
import tensorflow as tf
import keras.backend as K
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
import numpy as np
import spacy
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from tqdm import tqdm
# Exclude every pipeline component we don't need, for faster processing.
# Only `.vector` of the processed tweets is read later in this notebook.
nlp = spacy.load("de_core_news_lg", exclude=['tagger', 'morphologizer', 'parser', 'senter', 'ner', 'attribute_ruler', 'lemmatizer'])

## Reading cleaned data
The following block creates a DataFrame in which each tweet is labeled with its corresponding party.

In [None]:
def _load_party(name, label):
    """Read ../cleaned-data/<name>.csv and pair every tweet text with `label`.

    Returns a DataFrame with the columns 'tweet' and 'party'.
    """
    texts = pd.read_csv(f'../cleaned-data/{name}.csv')['text']
    return pd.DataFrame({'tweet': texts, 'party': label})


# Numeric class labels: 0 = AfD, 1 = CDU/CSU (both share one label), 2 = FDP,
# 3 = GRÜNE, 4 = LINKE, 5 = SPD.
afd = _load_party('AfD', 0)
cdu = _load_party('CDU', 1)
csu = _load_party('CSU', 1)
fdp = _load_party('FDP', 2)
gru = _load_party('GRÜNE', 3)
lin = _load_party('LINKE', 4)
spd = _load_party('SPD', 5)

# DataFrame.append was removed in pandas 2.0; a single concat builds the same
# frame (same row order, fresh 0..n-1 index) without repeatedly copying it.
data = pd.concat([afd, cdu, csu, fdp, gru, lin, spd], ignore_index=True)

# Removing NaN. Those were probably tweets with only a link or emojis, which
# don't contain anything after cleaning.
data = data.dropna()

## Transforming tweets to vectors
For this step we are using the pre-trained de_core_news_lg spacy model.

In [None]:
# Run every tweet through the spaCy pipeline and keep its `.vector`.
X = [nlp(tweet).vector for tweet in tqdm(data['tweet'].to_numpy())]
# We save the vectors to a file since it is faster to read them back in if we want to use them somewhere else.
# The file is ~2.2GB
np.savetxt('vector_tweets.out', X, delimiter=',')

In [None]:
# Load the vector representation (300d).
# NOTE(review): assumes the file was written from the same `data` frame, so that rows of X line up with rows of `data` - confirm.
X = np.loadtxt('vector_tweets.out', delimiter=',')
# Load the label matrix. The function to_categorical will transform our labels, which are numbers from 0 to 5,
# to one-hot encoded vectors. So 0 -> [1, 0, 0, 0, 0, 0], 1 -> [0, 1, 0, 0, 0, 0], 2 -> ....
y = to_categorical(data['party'].to_numpy())

In [None]:
# We only use tweets that split into at least 7 space-separated tokens
# (the threshold in the code below is 7, not 10).
# Shorter tweets presumably carry too little information to identify a party.
filter_tweets = np.array([len(tweet.split(" ")) >= 7 for tweet in data['tweet']], dtype=np.bool_)

# The mask is positional: entry i refers to the i-th row of `data`, X and y.
X_wo_short = X[filter_tweets]
y_wo_short = y[filter_tweets]

In [None]:
# Hold out 30% of the filtered samples as test data. No random_state is passed,
# so the split (and therefore every score below) differs between runs.
X_train, X_test, y_train, y_test = train_test_split(X_wo_short, y_wo_short, test_size=0.3)

In [None]:
# Model 1: one hidden layer of 300 ReLU units with dropout, followed by a
# 6-way softmax (one output per party label).
model1 = Sequential([
    Dense(300, activation='relu'),
    Dropout(0.25),
    Dense(6, activation='softmax'),
])
model1.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Model 2: three hidden ReLU layers (150, 150, 75 units), each followed by
# dropout, and a 6-way softmax output.
model2 = Sequential([
    Dense(150, activation='relu'),
    Dropout(0.2),
    Dense(150, activation='relu'),
    Dropout(0.1),
    Dense(75, activation='relu'),
    Dropout(0.2),
    Dense(6, activation='softmax'),
])
model2.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Model 3: four narrow hidden ReLU layers (75, 50, 50, 50 units) with light
# dropout after each, and a 6-way softmax output.
model3 = Sequential([
    Dense(75, activation='relu'),
    Dropout(0.1),
    Dense(50, activation='relu'),
    Dropout(0.075),
    Dense(50, activation='relu'),
    Dropout(0.075),
    Dense(50, activation='relu'),
    Dropout(0.05),
    Dense(6, activation='softmax'),
])
model3.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Train all three models with identical settings so their histories are
# comparable. The held-out test split doubles as the validation set.
ep = 100
_fit_options = dict(
    epochs=ep,
    batch_size=512,
    verbose=2,
    validation_data=(X_test, y_test),
)
hist1 = model1.fit(X_train, y_train, **_fit_options)
hist2 = model2.fit(X_train, y_train, **_fit_options)
hist3 = model3.fit(X_train, y_train, **_fit_options)

In [None]:
# For every trained model: print its layer layout, then show the training
# curves, the test/train confusion matrices and the per-party distribution of
# the probability assigned to the true class on the test data.
# (Pass X_train, y_train to vis_classification_distribution to inspect the
# training data instead.)
for _title, _model, _hist in (
    ("Model 1: 300-300-6", model1, hist1),
    ("Model 2: 300-150-150-75-6", model2, hist2),
    ("Model 3: 300-75-50-50-50-6", model3, hist3),
):
    print(_title)
    vis_hist(_hist)
    vis_confusion_mat(_model, X_test, X_train, y_test, y_train)
    vis_classification_distribution(_model, X_test, y_test)

In [None]:
# Prediction based on a combination of several models (an ensemble).
def predict(models, X):
    """Combine the predictions of several models into one score matrix.

    Each model's `predict(X)` output is summed element-wise, the sum is
    exponentiated, and every row is divided by its own total, so each row
    of the result sums to 1.

    Args:
        models: iterable of objects exposing `predict(X)`; all outputs must
            have the same 2-D shape (samples x classes).
        X: input passed unchanged to every model.

    Returns:
        Array with one row per sample and one column per class.
    """
    summed = np.sum([m.predict(X) for m in models], axis=0)
    scores = np.exp(summed)
    # keepdims lets the row totals broadcast against the rows directly.
    return scores / np.sum(scores, axis=1, keepdims=True)

In [None]:
# Evaluating accuracy, recall, precision and F1 for each party,
# using the combined prediction of all three models on the test data.
from sklearn.metrics import classification_report

yhat_test = np.argmax(predict([model1, model2, model3], X_test), axis=1)
y_label_test = np.argmax(y_test, axis=1)

print(classification_report(y_label_test, yhat_test, digits=4))

In [None]:
def export_model(model, name):
    """Dump the parameters of every Dense layer of `model` to `<name>.txt` as JSON.

    The file contains a list with one entry per Dense layer, in layer order;
    each entry is ``{'weights': [[...], ...], 'bias': [...]}``. All other
    layer types are skipped.

    Args:
        model: object with a `layers` sequence (e.g. a Keras Sequential model).
        name: output path without extension; '.txt' is appended.

    Raises:
        IndexError: if a Dense layer has fewer than two weight arrays
            (e.g. it was built without a bias).
    """
    # Local import so the function works regardless of which cell imported json.
    import json

    result = []
    for layer in model.layers:
        # isinstance also accepts subclasses of Dense.
        if not isinstance(layer, Dense):
            continue
        # Fetch the arrays once; every get_weights() call returns fresh copies.
        params = layer.get_weights()
        result.append({'weights': params[0].tolist(), 'bias': params[1].tolist()})
    with open(name + '.txt', 'w') as outfile:
        json.dump(result, outfile)

In [None]:
import json
# Write the Dense-layer weights and biases of model3 to 'model3.txt' (JSON content).
export_model(model3, 'model3')

In [None]:
def vis_hist(hist):
    """Plot training vs. validation loss and accuracy side by side.

    Args:
        hist: object whose `history` dict holds the per-epoch series
            'loss', 'val_loss', 'accuracy' and 'val_accuracy'.
    """
    fig, axes = plt.subplots(1, 2, figsize=(16,7))
    # (history key, panel title, y-axis label) for the left and right panel.
    panels = (
        ('loss', 'Model loss', 'Loss'),
        ('accuracy', 'Model accuracy', 'Accuracy'),
    )
    for ax, (key, title, ylabel) in zip(axes.ravel(), panels):
        ax.plot(hist.history[key])
        ax.plot(hist.history['val_' + key])
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.set_xlabel('Epoch')
        ax.legend(['Train', 'Validation'], loc='upper left')
    plt.show()

def _plot_confusion(model, X, y, ax, title, labels):
    """Draw one confusion-matrix heatmap on `ax`; return (true, predicted) class indices."""
    y_pred = np.argmax(model.predict(X), axis=1)
    y_true = np.argmax(y, axis=1)
    # Fixing labels= keeps the matrix len(labels) x len(labels) even when a
    # class is missing from both y_true and y_pred; otherwise the DataFrame
    # construction below fails on the shape mismatch.
    mat = confusion_matrix(y_true, y_pred, labels=list(range(len(labels))))
    df = pd.DataFrame(mat, index=labels, columns=labels)
    sn.heatmap(df, annot=True, cmap='Blues', fmt='g', ax=ax).set_title(title)
    return y_true, y_pred


def vis_confusion_mat(model, X_test, X_train, y_test, y_train):
    """Show test and train confusion matrices and print class counts and accuracy.

    Args:
        model: object exposing `predict(X)` that returns per-class scores.
        X_test, X_train: model inputs for the test and train split.
        y_test, y_train: one-hot label matrices matching the inputs row by row.
    """
    labels = ["AfD", "Union", "FDP", "Grüne", "Linke", "SPD"]
    fig, axes = plt.subplots(1, 2, figsize=(16,7))

    y_label_test, yhat_test = _plot_confusion(
        model, X_test, y_test, axes[0], 'Test Data', labels)
    y_label_train, yhat_train = _plot_confusion(
        model, X_train, y_train, axes[1], 'Train Data', labels)
    plt.show()

    # Number of samples per true class in each split.
    print('Party: \t Test \t Train')
    for idx, party in enumerate(["Afd", "Union", "FDP", "Grüne", "Linke", "SPD"]):
        print(party + ':\t', np.count_nonzero(y_label_test == idx), "\t",
              np.count_nonzero(y_label_train == idx))
    print('\nAcc:\t', "{:2.2f}%".format(accuracy_score(y_label_test, yhat_test)*100),
          "{:2.2f}%".format(accuracy_score(y_label_train, yhat_train)*100))
    
def vis_classification_distribution(model, X_test, y_test):
    """Plot, per party, a histogram of the score the model gave the true class.

    Args:
        model: object exposing `predict(X)` that returns per-class scores.
        X_test: model inputs.
        y_test: one-hot label matrix matching `X_test` row by row.
    """
    labels = ["AfD", "Union", "FDP", "Grüne", "Linke", "SPD"]
    fig, axes = plt.subplots(2, 3, figsize=(16,7))

    scores = model.predict(X_test)
    true_class = np.argmax(y_test, axis=1)
    # For every sample, pick the score in the column of its true class.
    true_class_score = scores[np.arange(len(true_class)), true_class]

    for party, (ax, label) in enumerate(zip(axes.ravel(), labels)):
        ax.hist(true_class_score[true_class == party], bins=20, density=True)
        ax.set_title(label)
    plt.show()