### German BERT model for Emotion-Stimulus (see Section 3.3. of thesis)

This notebook focuses on training and testing the BERT model that was proposed in this master's thesis. The model was implemented using TensorFlow and HuggingFace.

Please keep in mind that these notebooks are primarily used for conducting experiments, live coding, and implementing and evaluating the approaches presented in the thesis. As a result, the code in this notebook may not strictly adhere to best practice coding standards.


*Here, training with a GPU is required. Thus, either use Google Colab or set up your GPU properly.*

In [None]:
# ONLY IF USED ON LOCAL VIEW
# only execute once (every run moves the working directory further up)
import os

# Go four directory levels up in a single step
# (presumably to the project root, so that relative paths resolve -- confirm)
os.chdir(os.path.join(os.pardir, os.pardir, os.pardir, os.pardir))

In [None]:
!pip install transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
#import re

#from nltk.corpus import stopwords
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.utils import shuffle
from sklearn.preprocessing import LabelEncoder

def import_test_train(local):
  """
  Load the full dataset and the trigger set as DataFrames.

  :param local: If True, both CSV files are read from the local ``./Experiment`` folder
  (relative to the current working directory). If False, Google Drive is mounted
  (only possible inside Google Colab) and the files are read from ``MyDrive/Experiment``.
  :return: tuple ``(df, df_trigger)`` holding ``translated_fullset.csv`` and ``triggerset.csv``.
  :raises AssertionError: if ``local`` is not a bool.
  """

  assert isinstance(local, bool), f"Type is not valid. Expected boolean, recieved: {type(local)}"

  if local:
    base_dir = './Experiment'
  else:
    # google.colab only exists inside Colab, so it is imported lazily
    from google.colab import drive
    drive.mount('/content/gdrive')
    base_dir = '/content/gdrive/MyDrive/Experiment'

  df = pd.read_csv(f'{base_dir}/translated_fullset.csv')
  df_trigger = pd.read_csv(f'{base_dir}/triggerset.csv')

  return df, df_trigger

# importing test and trainset
# local=False -> read from Google Drive (Colab). The flag used to be inverted, so this
# loads from the same place as the former call `import_test_train(True)`.
# Set local=True to read from ./Experiment instead.
df, df_trigger = import_test_train(local=False)

In [None]:
## Dataset cleaning
# Rows whose label / source contains one of these substrings are removed.
# The filters are applied one after another, in the same order as before.
for unwanted_label in ['surprise', 'disgust']:
    df = df[~df['label'].str.contains(unwanted_label)]

for unwanted_source in ['GoodNews', 'DailyDialog', 'GoEmotions', 'Isear', 'crowdflower']:
    df = df[~df['source'].str.contains(unwanted_source)]

# Only 'disgust' is removed from the trigger set
df_trigger = df_trigger[~df_trigger['label'].str.contains('disgust')]

# Renumber rows from 0. `drop` must be the boolean True: the string 'True' used before
# only worked because any non-empty string is truthy.
df = df.reset_index(drop=True)

# The German translation is used as the text column from here on
df['content'] = df['content_de']

In [None]:
# Map each emotion label to an integer class id, stored in the new column "label_id"
lb_make = LabelEncoder()
lb_make.fit(df["label"])
df["label_id"] = lb_make.transform(df["label"])

In [None]:
from sklearn.model_selection  import train_test_split

# Hold out 20% of the cleaned data for testing; the fixed seed keeps the split reproducible
df_train, df_test = train_test_split(df, random_state=42, test_size=0.20)

# Renumber the rows of both splits from 0; the previous index is kept as an "index" column
df_train = df_train.reset_index()
df_test = df_test.reset_index()

In [None]:
# get label encoding of triggerset
# One (label, label_id) row per distinct label of the encoded full set
labels = df.drop_duplicates(subset=['label'])[['label','label_id']]

# Build the label -> id lookup once instead of converting `labels` to a dict for every row
label_to_id = dict(zip(labels['label'], labels['label_id']))

# Fail early (ValueError, as the former list lookup did) if the trigger set
# contains a label that has no id in the full set
unknown_labels = [label for label in df_trigger['label'].unique() if label not in label_to_id]
if unknown_labels:
  raise ValueError(f"Labels of the triggerset without a label_id in the full set: {unknown_labels}")

label_ids_trigger = df_trigger['label'].map(label_to_id).tolist()

# assign labels
df_trigger["label_id"] = label_ids_trigger

In [None]:
# Number of training rows per emotion label (class balance check)
df_train.label.value_counts()

sadness    379
joy        356
anger      341
fear       320
Name: label, dtype: int64

In [None]:
# Split the trigger set into 60% train / 40% test with a fixed seed
df_trigger_train, df_trigger_test = train_test_split(df_trigger, test_size=0.40, random_state=42)

In [None]:
### SPLIT TESTSIZE AGAIN
# Keep only 38% of the trigger test split; the remaining part is discarded (`_`).
# NOTE: df_trigger_test is overwritten with a subset of itself, so running this cell
# a second time without re-running the previous split shrinks it again.
_, df_trigger_test = train_test_split(df_trigger_test, test_size=0.38, random_state=42)

In [None]:
# Remove rows with duplicated text inside each split and renumber the rows from 0.
# NOTE(review): the earlier comment attributed the duplicates to appending the triggerset;
# no such append is visible in this notebook -- confirm whether this step is still needed.
df_train = df_train.drop_duplicates(subset=['content']).reset_index(drop=True)
df_test = df_test.drop_duplicates(subset=['content']).reset_index(drop=True)

In [None]:
from transformers import AutoTokenizer, TFAutoModel

# Tokenizer and encoder are both loaded from the same pretrained German BERT checkpoint
# (tokenizer first, then the model)
tokenizer, bert = (
    loader.from_pretrained("dbmdz/bert-base-german-uncased")
    for loader in (AutoTokenizer, TFAutoModel)
)

Some layers from the model checkpoint at dbmdz/bert-base-german-uncased were not used when initializing TFBertModel: ['nsp___cls', 'mlm___cls']
- This IS expected if you are initializing TFBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFBertModel were initialized from the model checkpoint at dbmdz/bert-base-german-uncased.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions without further training.


In [None]:
# max length of berttokenizer  is 512
max_length=100

# Pre-allocated model inputs for the training split, one row per text.
# Token ids and attention masks are integers, so they are stored as int32
# (the dtype of the model's Input layers) instead of numpy's default float64.
Xids_train=np.zeros((df_train.shape[0],max_length),dtype='int32')
Xmask_train=np.zeros((df_train.shape[0],max_length),dtype='int32')
# class id per training row (filled and one-hot encoded in the tokenization cell)
y_train=np.zeros((df_train.shape[0],1))

# Pre-allocated model inputs for the test split
Xids_test=np.zeros((df_test.shape[0],max_length),dtype='int32')
Xmask_test=np.zeros((df_test.shape[0],max_length),dtype='int32')

In [None]:
# `array` / `argmax` are not used in this cell; the imports are kept in case other cells rely on them
from numpy import array
from numpy import argmax
from tensorflow.keras.utils import to_categorical

def encode_texts(texts):
    """
    Tokenize a collection of texts in one batch call.

    Every text is truncated or padded to exactly ``max_length`` tokens (special tokens included).

    :param texts: iterable of strings
    :return: tuple ``(input_ids, attention_mask)`` of integer arrays with shape ``(len(texts), max_length)``
    """
    texts = list(texts)
    if not texts:
        # nothing to tokenize: return empty arrays of the expected width
        empty = np.zeros((0, max_length), dtype='int32')
        return empty, empty.copy()

    encoded = tokenizer(texts, max_length=max_length, padding='max_length', add_special_tokens=True,
                        truncation=True, return_token_type_ids=False, return_attention_mask=True,
                        return_tensors='np')
    return encoded['input_ids'], encoded['attention_mask']

# Fill the pre-allocated arrays in place; a shape mismatch raises instead of passing silently
train_ids, train_mask = encode_texts(df_train['content'])
Xids_train[:, :] = train_ids
Xmask_train[:, :] = train_mask

# One-hot encode the class ids, taken positionally (row i of the inputs <-> row i of df_train)
y_train = to_categorical(df_train['label_id'].to_numpy())

test_ids, test_mask = encode_texts(df_test['content'])
Xids_test[:, :] = test_ids
Xmask_test[:, :] = test_mask

In [None]:
dataset=tf.data.Dataset.from_tensor_slices((Xids_train,Xmask_train,y_train))

def map_func(input_ids,mask,labels):
    """Pack one example as (feature dict, label); the dict keys are the names of the model's input layers."""
    return {'input_ids':input_ids,'attention_mask':mask},labels

dataset=dataset.map(map_func)

# Shuffle ONCE in a fixed order before splitting. With the default
# reshuffle_each_iteration=True the order changes every epoch, so take()/skip() would
# select different examples each time and validation examples would leak into training.
dataset=dataset.shuffle(100000,seed=42,reshuffle_each_iteration=False).batch(64)

# number of batches (no need to materialize the whole dataset to count them)
DS_size=len(dataset)
n_train_batches=round(DS_size*0.90)

# first 90% of the batches for training, the remaining batches for validation
train=dataset.take(n_train_batches)
# the training batches are still visited in a new order every epoch
train=train.shuffle(max(n_train_batches,1)).prefetch(tf.data.AUTOTUNE)
val=dataset.skip(n_train_batches).prefetch(tf.data.AUTOTUNE)

In [None]:
# NOTE: this re-defines map_func (the labelled 3-argument version from the training
# pipeline cell is shadowed from here on)
def map_func(input_ids,mask):
    """Wrap one unlabeled example in the feature dict keyed by the model's input layer names."""
    return {'input_ids':input_ids,'attention_mask':mask}

# The test inputs have to be batched as well before they are passed to predict()
dataset_test=(
    tf.data.Dataset.from_tensor_slices((Xids_test,Xmask_test))
    .map(map_func)
    .batch(64)
    .prefetch(1000)
)

In [None]:

import tensorflow as tf

def emotion_model(num_classes=4):
  """
  Build and compile the emotion classifier: pretrained BERT encoder with a small dense head.

  The token-level output of the global ``bert`` model is max-pooled over the sequence,
  batch-normalized and passed through Dense(256, relu) -> Dropout(0.2) -> softmax.
  The BERT weights are frozen, so only the head is trained.

  :param num_classes: size of the softmax output (number of emotion classes). Defaults to 4.
  :return: compiled ``tf.keras.Model`` with the inputs ``input_ids`` and ``attention_mask``
  (each of length ``max_length``), categorical cross-entropy loss, Adam optimizer and an AUC metric.
  """
  input_ids=tf.keras.layers.Input(shape=(max_length,),name='input_ids',dtype='int32')
  input_mask=tf.keras.layers.Input(shape=(max_length,),name='attention_mask',dtype='int32')

  # [0] is the last hidden state: one vector per token
  embedding=bert(input_ids,attention_mask=input_mask)[0]
  x=tf.keras.layers.GlobalMaxPool1D()(embedding)
  x=tf.keras.layers.BatchNormalization()(x)
  x=tf.keras.layers.Dense(256,activation='relu')(x)
  x=tf.keras.layers.Dropout(0.2)(x)
  output=tf.keras.layers.Dense(num_classes,activation='softmax')(x)

  model=tf.keras.Model(inputs=[input_ids,input_mask],outputs=output)

  # Freeze the encoder through the layer object itself rather than through its
  # position in model.layers, which silently breaks when the architecture changes
  bert.trainable=False

  # The metric gets a fixed name: Keras otherwise names it auc, auc_1, auc_2, ... when the
  # model is rebuilt in the same session, and a callback monitoring 'val_auc' would then
  # no longer find it
  model.compile(loss=tf.keras.losses.CategoricalCrossentropy(),
              optimizer='adam',metrics=[tf.keras.metrics.AUC(name='auc')])

  return model

## Build the model

In [None]:
# Build the compiled classifier and print an overview of its layers
model = emotion_model()
model.summary()

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_ids (InputLayer)         [(None, 100)]        0           []                               
                                                                                                  
 attention_mask (InputLayer)    [(None, 100)]        0           []                               
                                                                                                  
 tf_bert_model_2 (TFBertModel)  TFBaseModelOutputWi  109927680   ['input_ids[0][0]',              
                                thPoolingAndCrossAt               'attention_mask[0][0]']         
                                tentions(last_hidde                                               
                                n_state=(None, 100,                                         

## Define early stopping and train the model

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Early stopping on the validation AUC (higher is better): an epoch only counts as an
# improvement if the metric rises by at least 0.001; patience is 2 epochs
es = EarlyStopping(
    mode='max',
    monitor='val_auc',
    min_delta=1e-3,
    patience=2,
)

In [None]:
# Train for at most 30 epochs, validating on `val`; the early-stopping callback may end training sooner
model.fit(
    x=train,
    epochs=30,
    callbacks=[es],
    validation_data=val,
)

In [None]:
# Softmax outputs of the model for the batched test inputs (one row of class scores per text)
y_pred=model.predict(dataset_test)

In [None]:
# Index of the highest-scoring class for every test row
y_pred_new = y_pred.argmax(axis=1)
# Reference class ids of the test split
y_true = df_test['label_id'].to_numpy()

In [None]:
from sklearn.metrics import accuracy_score
# Share of test rows whose predicted class id equals the reference class id
accuracy_score(y_true, y_pred_new)

In [None]:
from sklearn import metrics
# Per-class report; classes are shown by their integer label_id
print(metrics.classification_report(y_true, y_pred_new))