# IBM stance detection with only arguments

Stance detection on the IBM dataset using only the argument text as model input, without considering the topic each argument belongs to.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import shap

shap.initjs()

In [None]:
#nltk.download('punkt')
#nltk.download('stopwords')
#nltk.download('wordnet')
#nltk.download('omw-1.4')

## 1. Import datasets

In [None]:
# Input CSV files read below with pd.read_csv.
train_path = '../data/ibm/ibm_train.csv'
test_path = '../data/ibm/ibm_test.csv'

# Output locations, used below as string prefixes when saving figures and models.
plots_path = '../plots/only arguments/'
models_path = '../models/ibm/'

### Training set

In [None]:
train = pd.read_csv(train_path) 

In [None]:
train.head()

In [None]:
train['topic'].nunique()

In [None]:
train_stats = train.groupby(by=['topic', 'stance']).agg({'argument': 'count'}).reset_index()
train_stats.pivot(index='topic', columns='stance', values='argument').rename(columns={-1: 'cons', 1: 'pros'}).plot(kind='bar', figsize=(10,4))
plt.title('Arguments pros/cons for each topic (training set)')
plt.ylabel('#arguments')
plt.xlabel('Topics')
plt.xticks(fontsize=10, rotation=90)
plt.legend(loc='upper right')
#plt.savefig(plots_path+'ibm_train_topic_stats.png', bbox_inches ="tight")
plt.show()

### Test set

In [None]:
test = pd.read_csv(test_path) 

In [None]:
test.head()

In [None]:
test['topic'].nunique()

In [None]:
test_stats = test.groupby(by=['topic', 'stance']).agg({'argument': 'count'}).reset_index()
test_stats.pivot(index='topic', columns='stance', values='argument').rename(columns={-1: 'cons', 1: 'pros'}).plot(kind='bar', figsize=(10,4))
plt.title('Arguments pros/cons for each topic (test set)')
plt.ylabel('#arguments')
plt.xlabel('Topics')
plt.xticks(fontsize=10, rotation=90)
plt.legend(loc='upper right')
#plt.savefig(plots_path+'ibm_test_topic_stats.png', bbox_inches ="tight")
plt.show()

## 2. Preprocessing data

In [None]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from nltk.stem import WordNetLemmatizer
from nltk.stem.porter import PorterStemmer
from autocorrect import Speller
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer

In [None]:
# lower
# remove extra whitespace
# tokenize
# spelling corrections
# remove stopwords (to be checked: does this improve or worsen the results?)
# remove punctuation
# lemmatization
# stemming 
# remove urls
# remove tags

In [None]:
class PreprocessArguments:
    """Turn a raw argument string into a list of normalised word stems.

    The helper objects (spell checker, stop-word set, tokenizer, stemmer,
    lemmatizer) are created once in the constructor and reused by every call
    to :meth:`preprocess`.
    """

    def __init__(self):
        self.spell = Speller(lang='en')
        self.stopwords_set = set(stopwords.words('english'))
        self.punct_remover = RegexpTokenizer(r'\w+')
        self.porter = PorterStemmer()
        # Not used by preprocess(): lemmatization is currently disabled in favour of stemming.
        self.wnl = WordNetLemmatizer()

    def preprocess(self, s):
        """Return the processed tokens of the string ``s`` as a list.

        Steps, in order: lowercase, collapse runs of whitespace, tokenize,
        spell-correct each token, drop punctuation, drop stop words, stem.
        """
        normalised = ' '.join(s.lower().split())
        corrected = [self.spell(token) for token in word_tokenize(normalised)]
        # Re-join and keep only word-character runs, which removes punctuation.
        words = self.punct_remover.tokenize(' '.join(corrected))
        kept = (word for word in words if word not in self.stopwords_set)
        return [self.porter.stem(word) for word in kept]

In [None]:
preproc = PreprocessArguments()

In [None]:
train['arg_tok'] = [preproc.preprocess(row['argument']) for idx, row in train.iterrows()]

In [None]:
test['arg_tok'] = [preproc.preprocess(row['argument']) for idx, row in test.iterrows()]

In [None]:
train.head()

In [None]:
count_vect = CountVectorizer(tokenizer=preproc.preprocess, lowercase=False, token_pattern=None)
count_fit = count_vect.fit_transform(train['argument'])

In [None]:
n = 50
count_sum = sorted(list(zip(count_fit.toarray().sum(axis=0), count_vect.get_feature_names_out())), reverse=True)
count_sum = count_sum[:n]
fig, ax = plt.subplots(figsize=(10,4))
plt.bar([p[1] for p in count_sum], [p[0] for p in count_sum])
#plt.bar(range(len(count_sum)), [p[0] for p in count_sum])
plt.title('Frequencies of words')
plt.ylabel('Frequence')
plt.xlabel('Word')
plt.xticks(fontsize=10, rotation=90)
#plt.yticks(np.arange(0,1200,100))
#plt.savefig(plots_path+str(n)+'_token_frequencies.png', bbox_inches ="tight")
plt.show()

## 3. Classification

In [None]:
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, cross_validate, GridSearchCV
from sklearn.preprocessing import LabelBinarizer, OneHotEncoder

### 3.1 Baseline: Naive Bayes

#### Grid search

In [None]:
def dummy_tokenizer(sentence):
    """Return ``sentence`` unchanged.

    Used in the parameter grids as both the vectorizer's tokenizer and its
    preprocessor, so the already-tokenized input is passed through as-is.
    """
    return sentence

In [None]:
# Encode the stance labels; the result is flattened with ravel() before it is
# passed to fit() below.
lb = LabelBinarizer()
y_train = lb.fit_transform(train['stance'])

In [None]:
# Metrics computed for every grid-search candidate.
scoring = ['accuracy', 'f1_macro', 'precision', 'recall']

In [None]:
# Two-step pipeline: TF-IDF features followed by multinomial Naive Bayes.
pipe = Pipeline([('preproc', TfidfVectorizer()), ('nb', MultinomialNB())])

In [None]:
# Search space. The vectorizer receives pre-tokenized input (train['arg_tok']),
# so its tokenizer and preprocessor are both the identity function.
params = [
    {'preproc': [TfidfVectorizer()],
     'preproc__tokenizer': [dummy_tokenizer],
     'preproc__preprocessor': [dummy_tokenizer],
     'preproc__token_pattern': [None],
     #'preproc__min_df': [1, 10, 20, 50, 100, 200],
     'preproc__min_df': np.arange(1,6,1),
     #'preproc__max_features': [None, 100, 200, 500],
     'preproc__ngram_range': [(1,1), (1,2), (1,3)],
     #'preproc__ngram_range': [(1,1)],
     
     'nb': [MultinomialNB()],
     #'nb__alpha': [0.0001, 0.001, 0.01, 0.1, 1, 10, 100]
     #'nb__alpha': [0.01, 0.05, 0.08, 0.1, 0.5, 0.8, 1, 5, 8]
     'nb__alpha': np.arange(0.1,0.6,0.01)
    }
]

In [None]:
# 3-fold cross-validated search; the best candidate by macro F1 is refitted
# on the whole training set (refit='f1_macro').
clf = GridSearchCV(estimator=pipe, param_grid=params, scoring=scoring, refit='f1_macro',
                   cv=3, return_train_score=True, n_jobs=-1, verbose=0)

In [None]:
clf.fit(train['arg_tok'], y_train.ravel())

In [None]:
clf.best_params_

In [None]:
#pd.DataFrame(clf.cv_results_).sort_values(by='rank_test_f1_macro')[['mean_test_f1_macro', 'param_nb', 'param_preproc__ngram_range', 'param_preproc__min_df']][:50]
#tmp = pd.DataFrame(clf.cv_results_).sort_values(by='rank_test_f1_macro')[['mean_test_f1_macro', 'param_nb', 'param_preproc__ngram_range', 'param_preproc__min_df']][:600]
#tmp.groupby(by='param_preproc__min_df').count()


In [None]:
#pd.DataFrame(clf.cv_results_).to_csv(models_path+'nb_gridsearch.csv')

#### Evaluation

In [None]:
best_clf = clf.best_estimator_
best_clf.fit(train['arg_tok'], y_train.ravel())
pred_test = best_clf.predict(test['arg_tok'])
y_test = lb.transform(test['stance'])

In [None]:
cm = confusion_matrix(y_test, pred_test)
fig, ax = plt.subplots(figsize=(5,5))
ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=lb.inverse_transform(clf.classes_)).plot(ax=ax)
plt.savefig(plots_path+'nb_cm.png', bbox_inches ="tight")

In [None]:
print(classification_report(y_test, pred_test))

In [None]:
tmp = test.copy()
tmp['pred'] = lb.inverse_transform(pred_test)

In [None]:
tmp['TP'] = tmp.apply(lambda row: row['stance'] == row['pred'] and row['stance'] == 'PRO', axis=1)
tmp['TN'] = tmp.apply(lambda row: row['stance'] == row['pred'] and row['stance'] == 'CON', axis=1)
tmp['FP'] = tmp.apply(lambda row: row['stance'] != row['pred'] and row['stance'] == 'CON', axis=1)
tmp['FN'] = tmp.apply(lambda row: row['stance'] != row['pred'] and row['stance'] == 'PRO', axis=1)
tmp['T'] = tmp.apply(lambda row: row['stance'] == row['pred'], axis=1)
tmp['F'] = tmp.apply(lambda row: row['stance'] != row['pred'], axis=1)
tmp = tmp.groupby(by='topic').agg({'TP': 'sum',
                                   'TN': 'sum',
                                   'FP': 'sum',
                                   'FN': 'sum',
                                   'T': 'sum',
                                   'F': 'sum'}).reset_index()
tmp.sort_values(by='topic', inplace=True)

In [None]:
plt.bar(tmp['topic'], tmp['T']/(tmp['T']+tmp['F'])*100, label='Correctly predicted')
plt.bar(tmp['topic'], tmp['F']/(tmp['T']+tmp['F'])*100, bottom=tmp['T']/(tmp['T']+tmp['F'])*100, label='Incorrectly predicted')
plt.title('Percentage of correctly and incorrectly predicted arguments by categories')
plt.ylabel('Percentage of arguments')
plt.yticks(np.arange(0,110,10))
plt.xticks(rotation=90)
plt.grid(axis='y', alpha=0.3)
plt.legend()
plt.savefig(plots_path+'nb_prediction_percentage.png', bbox_inches ='tight')
plt.show()

In [None]:
plt.bar(tmp['topic'], tmp['TP'], label='TP')
plt.bar(tmp['topic'], tmp['TN'], bottom=tmp['TP'], label='TN')
plt.bar(tmp['topic'], tmp['FP'], bottom=tmp['TP']+tmp['TN'], label='FP')
plt.bar(tmp['topic'], tmp['FN'], bottom=tmp['TP']+tmp['TN']+tmp['FP'], label='FN')
plt.title('Confusion matrix by categories')
plt.ylabel('# of arguments')
plt.yticks(np.arange(0,65,5))
plt.xticks(rotation=90)
plt.grid(axis='y', alpha=0.3)
plt.legend()
plt.savefig(plots_path+'nb_cm_categories.png', bbox_inches ='tight')
plt.show()

#### Shap analysis

In [None]:
# Explain the classifier step alone: it is given the TF-IDF features of the
# training set (densified) as background data, and the vocabulary as feature names.
# The explained function is predict, not a probability or score function.
explainer = shap.Explainer(best_clf.named_steps['nb'].predict,
                           best_clf.named_steps['preproc'].transform(train['arg_tok']).toarray(),
                           feature_names=best_clf.named_steps['preproc'].get_feature_names_out())

In [None]:
# Author's note: takes about 15 minutes. Only the first 20 test arguments are explained.
shap_values = explainer(best_clf.named_steps['preproc'].transform(test['arg_tok'][:20]).toarray(), max_evals='auto')

In [None]:
shap.plots.beeswarm(shap_values, max_display=10, order=shap_values.abs.max(0), show=False)
plt.savefig(plots_path+'nb_shap_beeswarm.png', bbox_inches ='tight')
plt.show()

In [None]:
# Show the true stances of the first test rows (presumably to pick the examples below).
test['stance'][:10]

In [None]:
#shap.force_plot(shap_values[0])
# NOTE(review): link='logit' is applied although the explainer wraps predict;
# confirm that a logit link is meaningful for this output.
shap.force_plot(shap_values[5], link='logit', matplotlib=True, show=False) 
plt.savefig(plots_path+'nb_shap_force_PRO.png', bbox_inches ='tight')
plt.show()

In [None]:
shap.force_plot(shap_values[18], link='logit', matplotlib=True, show=False) 
plt.savefig(plots_path+'nb_shap_force_CON.png', bbox_inches ='tight')
plt.show()

In [None]:
# Heatmap with instances ordered by the sum of their SHAP values.
shap.plots.heatmap(shap_values, instance_order=shap_values.sum(1), max_display=10, show=False)
plt.savefig(plots_path+'nb_shap_heatmap.png', bbox_inches ='tight')
plt.show()

In [None]:
# Waterfall plots for the same two instances used in the force plots.
shap.plots.waterfall(shap_values[5], max_display=10, show=False)
plt.savefig(plots_path+'nb_shap_waterfall_PRO.png', bbox_inches ='tight')
plt.show()

In [None]:
shap.plots.waterfall(shap_values[18], max_display=10, show=False)
plt.savefig(plots_path+'nb_shap_waterfall_CON.png', bbox_inches ='tight')
plt.show()

### 3.2 SVM

#### Grid search

In [None]:
def dummy_tokenizer(sentence):
    """Return ``sentence`` unchanged.

    Same identity helper as in the Naive Bayes section, redefined here; it is
    used as the vectorizer's tokenizer and preprocessor in the SVM grid.
    """
    return sentence

In [None]:
# Metrics computed for every grid-search candidate.
scoring = ['accuracy', 'f1_macro', 'precision', 'recall']

In [None]:
# Two-step pipeline: TF-IDF features followed by a support vector classifier.
pipe = Pipeline([('preproc', TfidfVectorizer()), ('svm', SVC())])

In [None]:
params = [
    # Disabled alternative grid (polynomial kernel), kept for reference.
    #{'preproc': [TfidfVectorizer()],
     #'preproc__tokenizer': [dummy_tokenizer],
     #'preproc__preprocessor': [dummy_tokenizer],
     #'preproc__token_pattern': [None],
     #'preproc__min_df': [1, 10, 20, 50, 100],
     #'preproc__min_df': np.arange(1,11,1),
     #'preproc__max_features': [None, 100, 200, 300, 400, 500, 600],
     #'preproc__ngram_range': [(1,1), (1,2), (1,3), (2,3), (1,4)],
     #'preproc__ngram_range': [(1,1), (1,2), (1,3)],
     #'preproc__ngram_range': [(1,1)],
     
     #'svm': [SVC()],
     #'svm__C': [0.1, 1, 2, 5, 10, 50],
     #'svm__C': np.arange(1,20,1),
     #'svm__C': np.arange(1,6,1),
     #'svm__kernel': ['poly'],
     #'svm__degree': [2, 3, 4, 5],
     #'svm__degree': np.arange(2,10,1),
     #'svm__degree': np.arange(2,6,1),
     #'svm__gamma': ['scale'],
     #'svm__shrinking': [True, False],     
    #},
    
    # Active grid: RBF kernel. The vectorizer receives pre-tokenized input, so its
    # tokenizer and preprocessor are both the identity function.
    {'preproc': [TfidfVectorizer()],
     'preproc__tokenizer': [dummy_tokenizer],
     'preproc__preprocessor': [dummy_tokenizer],
     'preproc__token_pattern': [None],
     #'preproc__min_df': [1, 10, 20, 50, 100],
     #'preproc__min_df': np.arange(1,10,1),
     'preproc__min_df': np.arange(1,6,1),
     #'preproc__max_features': [None, 100, 200, 300, 400, 500, 600],
     #'preproc__ngram_range': [(1,1), (1,2), (1,3), (2,3), (1,4)],
     'preproc__ngram_range': [(1,1), (1,2), (1,3)],
     #'preproc__ngram_range': [(1,1)],
     
     'svm': [SVC()],
     #'svm__C': [0.1, 1, 2, 5, 10, 50],
     'svm__C': np.arange(0.1,2.5,0.1),
     #'svm__C': [1, 5, 10],
     #'svm__kernel': ['rbf', 'sigmoid'],
     'svm__kernel': ['rbf'],
     'svm__gamma': ['scale'],
     #'svm__shrinking': [True, False],     
    },
]

In [None]:
# 3-fold cross-validated search; the best candidate by macro F1 is refitted
# on the whole training set (refit='f1_macro').
clf = GridSearchCV(estimator=pipe, param_grid=params, scoring=scoring, refit='f1_macro',
                   cv=3, return_train_score=True, n_jobs=-1, verbose=0)

In [None]:
# y_train comes from the LabelBinarizer fitted in the Naive Bayes section.
clf.fit(train['arg_tok'], y_train.ravel())

In [None]:
clf.best_params_

In [None]:
#pd.DataFrame(clf.cv_results_).sort_values(by='rank_test_f1_macro')[['mean_test_f1_macro', 'param_svm__kernel', 'param_svm__C', 'param_preproc__ngram_range', 'param_preproc__min_df']][:60]
#tmp = pd.DataFrame(clf.cv_results_).sort_values(by='rank_test_f1_macro')[['mean_test_f1_macro', 'param_svm__C', 'param_svm__degree', 'param_preproc__min_df', 'param_preproc__ngram_range']][:120]
#tmp.groupby(by=['param_preproc__min_df', 'param_preproc__ngram_range']).count()


In [None]:
#pd.DataFrame(clf.cv_results_).to_csv(models_path+'svc_gridsearch3.csv')

#### Evaluation

In [None]:
best_clf = clf.best_estimator_
best_clf.fit(train['arg_tok'], y_train.ravel())
pred_test = best_clf.predict(test['arg_tok'])
y_test = lb.transform(test['stance'])

In [None]:
cm = confusion_matrix(y_test, pred_test)
fig, ax = plt.subplots(figsize=(5,5))
ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=lb.inverse_transform(clf.classes_)).plot(ax=ax)
plt.savefig(plots_path+'svc_cm.png', bbox_inches ="tight")

In [None]:
print(classification_report(y_test, pred_test))

In [None]:
tmp = test.copy()
tmp['pred'] = lb.inverse_transform(pred_test)

In [None]:
tmp['TP'] = tmp.apply(lambda row: row['stance'] == row['pred'] and row['stance'] == 'PRO', axis=1)
tmp['TN'] = tmp.apply(lambda row: row['stance'] == row['pred'] and row['stance'] == 'CON', axis=1)
tmp['FP'] = tmp.apply(lambda row: row['stance'] != row['pred'] and row['stance'] == 'CON', axis=1)
tmp['FN'] = tmp.apply(lambda row: row['stance'] != row['pred'] and row['stance'] == 'PRO', axis=1)
tmp['T'] = tmp.apply(lambda row: row['stance'] == row['pred'], axis=1)
tmp['F'] = tmp.apply(lambda row: row['stance'] != row['pred'], axis=1)
tmp = tmp.groupby(by='topic').agg({'TP': 'sum',
                                   'TN': 'sum',
                                   'FP': 'sum',
                                   'FN': 'sum',
                                   'T': 'sum',
                                   'F': 'sum'}).reset_index()
tmp.sort_values(by='topic', inplace=True)

In [None]:
plt.bar(tmp['topic'], tmp['T']/(tmp['T']+tmp['F'])*100, label='Correctly predicted')
plt.bar(tmp['topic'], tmp['F']/(tmp['T']+tmp['F'])*100, bottom=tmp['T']/(tmp['T']+tmp['F'])*100, label='Incorrectly predicted')
plt.title('Percentage of correctly and incorrectly predicted arguments by categories')
plt.ylabel('Percentage of arguments')
plt.yticks(np.arange(0,110,10))
plt.xticks(rotation=90)
plt.grid(axis='y', alpha=0.3)
plt.legend()
plt.savefig(plots_path+'svc_prediction_percentage.png', bbox_inches ='tight')
plt.show()

In [None]:
plt.bar(tmp['topic'], tmp['TP'], label='TP')
plt.bar(tmp['topic'], tmp['TN'], bottom=tmp['TP'], label='TN')
plt.bar(tmp['topic'], tmp['FP'], bottom=tmp['TP']+tmp['TN'], label='FP')
plt.bar(tmp['topic'], tmp['FN'], bottom=tmp['TP']+tmp['TN']+tmp['FP'], label='FN')
plt.title('Confusion matrix by categories')
plt.ylabel('# of arguments')
plt.yticks(np.arange(0,65,5))
plt.xticks(rotation=90)
plt.grid(axis='y', alpha=0.3)
plt.legend()
plt.savefig(plots_path+'svc_cm_categories.png', bbox_inches ='tight')
plt.show()

#### Shap analysis

In [None]:
# Explain the classifier step alone: it is given the TF-IDF features of the
# training set (densified) as background data, and the vocabulary as feature names.
# The explained function is predict, not a probability or score function.
explainer = shap.Explainer(best_clf.named_steps['svm'].predict,
                           best_clf.named_steps['preproc'].transform(train['arg_tok']).toarray(),
                           feature_names=best_clf.named_steps['preproc'].get_feature_names_out())

In [None]:
# Only the first 20 test arguments are explained.
shap_values = explainer(best_clf.named_steps['preproc'].transform(test['arg_tok'][:20]).toarray(),
                        max_evals='auto')

In [None]:
shap.plots.beeswarm(shap_values, max_display=10, order=shap_values.abs.max(0), show=False)
plt.savefig(plots_path+'svc_shap_beeswarm.png', bbox_inches ='tight')
plt.show()

In [None]:
# Show the true stances of the first test rows (presumably to pick the examples below).
test['stance'][:10]

In [None]:
#shap.force_plot(shap_values[0])
# NOTE(review): link='logit' is applied although the explainer wraps predict;
# confirm that a logit link is meaningful for this output.
shap.force_plot(shap_values[5], link='logit', matplotlib=True, show=False) 
plt.savefig(plots_path+'svc_shap_force_PRO.png', bbox_inches ='tight')
plt.show()

In [None]:
shap.force_plot(shap_values[2], link='logit', matplotlib=True, show=False) 
plt.savefig(plots_path+'svc_shap_force_CON.png', bbox_inches ='tight')
plt.show()

In [None]:
# Heatmap with instances ordered by the sum of their SHAP values.
shap.plots.heatmap(shap_values, instance_order=shap_values.sum(1), max_display=10, show=False)
plt.savefig(plots_path+'svc_shap_heatmap.png', bbox_inches ='tight')
plt.show()

In [None]:
# Waterfall plots for the same two instances used in the force plots.
shap.plots.waterfall(shap_values[5], max_display=10, show=False)
plt.savefig(plots_path+'svc_shap_waterfall_PRO.png', bbox_inches ='tight')
plt.show()

In [None]:
shap.plots.waterfall(shap_values[2], max_display=10, show=False)
plt.savefig(plots_path+'svc_shap_waterfall_CON.png', bbox_inches ='tight')
plt.show()

### 3.3 BERT

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback, pipeline
import torch
from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split
import evaluate
import json

#### Load and encode the dataset

In [None]:
# Hold out 20% of the training data as a validation set (fixed seed for repeatability).
train_bert, val_bert = train_test_split(train, test_size=0.2, random_state=42)

In [None]:
# Keep only the text and the label; the pandas index is not carried over.
train_bert = Dataset.from_pandas(train_bert[['argument', 'stance']], split='train', preserve_index=False)
val_bert = Dataset.from_pandas(val_bert[['argument', 'stance']], split='validation', preserve_index=False)
test_bert = Dataset.from_pandas(test[['argument', 'stance']], split='test', preserve_index=False)

In [None]:
ibm_dataset = DatasetDict(train=train_bert, val=val_bert, test=test_bert)

In [None]:
# Mapping between the stance strings and the integer class ids used by the model.
id2label = {0: "CON", 1: "PRO"}
label2id = {"CON": 0, "PRO": 1}
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

In [None]:
def preprocess_data(data):
    """Tokenize a batch of arguments and attach integer stance labels.

    ``data`` provides an 'argument' and a 'stance' column; the result is the
    tokenizer output (padded and truncated) with an added 'labels' entry
    holding the class id of each stance.
    """
    batch = tokenizer(data['argument'], truncation=True, padding=True)
    batch['labels'] = list(map(label2id.__getitem__, data['stance']))
    return batch

In [None]:
# Encode every split in batches of 16 and drop the raw text/label columns.
tokenized_dataset = ibm_dataset.map(preprocess_data, batched=True, batch_size=16, remove_columns=['argument', 'stance'])

In [None]:
# Return torch tensors when the dataset is indexed.
tokenized_dataset.set_format('torch')

#### Finetune the model

In [None]:
# Pretrained BERT with a 2-class classification head labelled CON / PRO.
model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2, id2label=id2label, label2id=label2id)

In [None]:
# Evaluate and checkpoint once per epoch; the checkpoint with the best 'f1'
# is reloaded when training ends.
arguments = TrainingArguments(
    output_dir=models_path+'bert',
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=5,
    evaluation_strategy='epoch',
    save_strategy='epoch',
    learning_rate=2e-5,
    weight_decay=0.01,
    metric_for_best_model='f1',
    load_best_model_at_end=True,
)

In [None]:
metrics = evaluate.combine(["accuracy", "f1", "precision", "recall"])

def compute_metrics(eval_pred):
    """Score one evaluation pass for the Trainer.

    ``eval_pred`` unpacks into (logits, labels). The predicted class is the
    index of the largest logit along the last axis, and the combined metric
    is computed on those predictions.
    """
    # NOTE(review): confirm that the combined metric actually forwards
    # ``average`` to f1/precision/recall in the installed ``evaluate`` version.
    scores, references = eval_pred
    predicted = np.argmax(scores, axis=-1)
    return metrics.compute(predictions=predicted, references=references, average='macro')

In [None]:
# Fine-tune on the training split and evaluate on the validation split.
trainer = Trainer(
    model=model,
    args=arguments,
    train_dataset=tokenized_dataset['train'],
    eval_dataset=tokenized_dataset['val'],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

# Early stopping with the callback's default settings (no arguments passed).
trainer.add_callback(EarlyStoppingCallback())

In [None]:
trainer.train()

In [None]:
# Persist the trainer state, then the evaluation metrics of each split, as JSON.
trainer.state.save_to_json(models_path+'bert/training_state.json')

In [None]:
with open(models_path+'bert/train_metrics.json', 'w') as fp:
    json.dump(trainer.evaluate(tokenized_dataset['train']), fp)

In [None]:
with open(models_path+'bert/val_metrics.json', 'w') as fp:
    json.dump(trainer.evaluate(tokenized_dataset['val']), fp)

In [None]:
with open(models_path+'bert/test_metrics.json', 'w') as fp:
    json.dump(trainer.evaluate(tokenized_dataset['test']), fp)

In [None]:
#test_pred = trainer.predict(tokenized_dataset['test'])
#y_pred = test_pred.predictions.argmax(axis=-1)

#### Evaluate the model

In [None]:
# NOTE(review): the checkpoint directory is hard-coded; presumably it is the best
# checkpoint of the run above. Verify before reusing after a new training run.
finetuned_model = AutoModelForSequenceClassification.from_pretrained(models_path+'bert/checkpoint-240')
id2label = {0: "CON", 1: "PRO"}
label2id = {"CON": 0, "PRO": 1}
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

In [None]:
# The whole test set is tokenized into one padded batch of PyTorch tensors.
model_inputs = tokenizer(test['argument'].to_list(), return_tensors='pt', padding=True, truncation=True)

In [None]:
# Inference only: no gradients are tracked.
with torch.no_grad():
    pred = finetuned_model(**model_inputs)

In [None]:
# Predicted class id = index of the largest logit.
y_pred = torch.argmax(pred.logits, axis=-1).numpy()

In [None]:
# True class ids, using the same mapping as for training.
y_test = [label2id[l] for l in test['stance']]

In [None]:
cm = confusion_matrix(y_test, y_pred, labels=[0,1])
fig, ax = plt.subplots(figsize=(5,5))
ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['CON', 'PRO']).plot(ax=ax)
plt.savefig(plots_path+'bert_cm.png', bbox_inches ="tight")

In [None]:
print(classification_report(y_test, y_pred))

In [None]:
tmp = test.copy()
tmp['pred'] = [id2label[i] for i in y_pred]

In [None]:
tmp['TP'] = tmp.apply(lambda row: row['stance'] == row['pred'] and row['stance'] == 'PRO', axis=1)
tmp['TN'] = tmp.apply(lambda row: row['stance'] == row['pred'] and row['stance'] == 'CON', axis=1)
tmp['FP'] = tmp.apply(lambda row: row['stance'] != row['pred'] and row['stance'] == 'CON', axis=1)
tmp['FN'] = tmp.apply(lambda row: row['stance'] != row['pred'] and row['stance'] == 'PRO', axis=1)
tmp['T'] = tmp.apply(lambda row: row['stance'] == row['pred'], axis=1)
tmp['F'] = tmp.apply(lambda row: row['stance'] != row['pred'], axis=1)
tmp = tmp.groupby(by='topic').agg({'TP': 'sum',
                                   'TN': 'sum',
                                   'FP': 'sum',
                                   'FN': 'sum',
                                   'T': 'sum',
                                   'F': 'sum'}).reset_index()
tmp.sort_values(by='topic', inplace=True)

In [None]:
plt.bar(tmp['topic'], tmp['T']/(tmp['T']+tmp['F'])*100, label='Correctly predicted')
plt.bar(tmp['topic'], tmp['F']/(tmp['T']+tmp['F'])*100, bottom=tmp['T']/(tmp['T']+tmp['F'])*100, label='Incorrectly predicted')
plt.title('Percentage of correctly and incorrectly predicted arguments by categories')
plt.ylabel('Percentage of arguments')
plt.yticks(np.arange(0,110,10))
plt.xticks(rotation=90)
plt.grid(axis='y', alpha=0.3)
plt.legend()
plt.savefig(plots_path+'bert_prediction_percentage.png', bbox_inches ='tight')
plt.show()

In [None]:
plt.bar(tmp['topic'], tmp['TP'], label='TP')
plt.bar(tmp['topic'], tmp['TN'], bottom=tmp['TP'], label='TN')
plt.bar(tmp['topic'], tmp['FP'], bottom=tmp['TP']+tmp['TN'], label='FP')
plt.bar(tmp['topic'], tmp['FN'], bottom=tmp['TP']+tmp['TN']+tmp['FP'], label='FN')
plt.title('Confusion matrix by categories')
plt.ylabel('# of arguments')
plt.yticks(np.arange(0,65,5))
plt.xticks(rotation=90)
plt.grid(axis='y', alpha=0.3)
plt.legend()
plt.savefig(plots_path+'bert_cm_categories.png', bbox_inches ='tight')
plt.show()

#### Shap analysis

In [None]:
pred = pipeline("text-classification", model=finetuned_model, tokenizer=tokenizer)

In [None]:
explainer = shap.Explainer(pred)

In [None]:
shap_values = explainer(test['argument'][:20])

In [None]:
# Mean SHAP value per token for output index 1 (PRO in id2label).
shap.plots.bar(shap_values[:,:,1].mean(0), max_display=10, show=False)
plt.savefig(plots_path+'bert_shap_PRO.png', bbox_inches ='tight')
plt.show()

In [None]:
# Mean SHAP value per token for output index 0 (CON in id2label).
shap.plots.bar(shap_values[:,:,0].mean(0), max_display=10, show=False)
plt.savefig(plots_path+'bert_shap_CON.png', bbox_inches ='tight')
plt.show()

In [None]:
# Show the true stances of the first test rows (presumably to pick the examples below).
test['stance'][0:10]

In [None]:
# Single-instance explanations: instance 0 for PRO, instance 11 for CON.
shap.plots.waterfall(shap_values[0,:,1], max_display=10, show=False)
plt.savefig(plots_path+'bert_shap_waterfall_PRO.png', bbox_inches ='tight')
plt.show()

In [None]:
shap.plots.waterfall(shap_values[11,:,0], max_display=10, show=False)
plt.savefig(plots_path+'bert_shap_waterfall_CON.png', bbox_inches ='tight')
plt.show()

In [None]:
# Token-level text view of the same CON example.
shap.plots.text(shap_values[11,:,0])

### 3.4 Prompt tuning GPT2

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback, pipeline
from peft import PromptTuningConfig, PromptTuningInit, PeftType, TaskType, get_peft_model, PromptEncoderConfig, PeftConfig, PeftModel
import torch
from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split
import evaluate
import json

#### Load and encode the dataset

In [None]:
# Same 80/20 train/validation split as in the BERT section (same seed).
train_bert, val_bert = train_test_split(train, test_size=0.2, random_state=42)

In [None]:
# Keep only the text and the label; the pandas index is not carried over.
train_bert = Dataset.from_pandas(train_bert[['argument', 'stance']], split='train', preserve_index=False)
val_bert = Dataset.from_pandas(val_bert[['argument', 'stance']], split='validation', preserve_index=False)
test_bert = Dataset.from_pandas(test[['argument', 'stance']], split='test', preserve_index=False)
ibm_dataset = DatasetDict(train=train_bert, val=val_bert, test=test_bert)

In [None]:
id2label = {0: "CON", 1: "PRO"}
label2id = {"CON": 0, "PRO": 1}
# NOTE(review): truncation=True is passed at load time here; truncation is normally
# requested when the tokenizer is called. Confirm this keyword has any effect.
tokenizer = AutoTokenizer.from_pretrained("gpt2", truncation=True, padding_side='left')
# The end-of-sequence token is reused as the padding token.
tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
def preprocess_data(data):
    """Tokenize a batch of arguments and attach integer stance labels.

    ``data`` provides an 'argument' and a 'stance' column; the result is the
    tokenizer output with an added 'labels' entry holding the class id of
    each stance.
    """
    # Truncation is requested on the call itself, matching the BERT
    # preprocessing and the tokenizer call used for GPT-2 inference below.
    encoding = tokenizer(data['argument'], padding=True, truncation=True)
    encoding['labels'] = [label2id[l] for l in data['stance']]
    return encoding

In [None]:
# Encode every split in batches of 16, drop the raw text/label columns and
# return torch tensors when the dataset is indexed.
tokenized_dataset = ibm_dataset.map(preprocess_data, batched=True, batch_size=16, remove_columns=['argument', 'stance'])
tokenized_dataset.set_format('torch')

#### Prompt tuning of the model

In [None]:
# P-tuning (prompt encoder) with 16 virtual tokens for sequence classification.
# The commented-out arguments belong to the plain prompt-tuning alternative.
peft_config = PromptEncoderConfig(#PromptTuningConfig(
    #peft_type=PeftType.PROMPT_TUNING,
    peft_type=PeftType.P_TUNING,
    task_type=TaskType.SEQ_CLS,
    #prompt_tuning_init=PromptTuningInit.TEXT,
    num_virtual_tokens=16,
    #prompt_tuning_init_text='Detect if the stance of this tweet is PRO or CON:',
    #tokenizer_name_or_path='gpt2',
)

In [None]:
# Pretrained GPT-2 with a 2-class classification head labelled CON / PRO.
model = AutoModelForSequenceClassification.from_pretrained("gpt2",
                                                           num_labels=2,
                                                           id2label=id2label,
                                                           label2id=label2id)
# The model must know which id is padding; it is the tokenizer's pad token.
model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
# Wrap the base model with the PEFT configuration above.
model = get_peft_model(model, peft_config)

In [None]:
# Evaluate and checkpoint once per epoch; the checkpoint with the best 'f1'
# is reloaded when training ends.
arguments = TrainingArguments(
    output_dir=models_path+'gpt2',
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=5,
    evaluation_strategy='epoch',
    save_strategy='epoch',
    learning_rate=2e-5,
    weight_decay=0.01,
    metric_for_best_model='f1',
    load_best_model_at_end=True,
)

In [None]:
metrics = evaluate.combine(["accuracy", "f1", "precision", "recall"])

def compute_metrics(eval_pred):
    """Score one evaluation pass for the Trainer.

    ``eval_pred`` unpacks into (logits, labels). The predicted class is the
    index of the largest logit along axis 1, and the combined metric is
    computed on those predictions.
    """
    # NOTE(review): confirm that the combined metric actually forwards
    # ``average`` to f1/precision/recall in the installed ``evaluate`` version.
    scores, references = eval_pred
    predicted = np.argmax(scores, axis=1)
    return metrics.compute(predictions=predicted, references=references, average='macro')

In [None]:
# Train on the training split and evaluate on the validation split.
trainer = Trainer(
    model=model,
    args=arguments,
    train_dataset=tokenized_dataset['train'],
    eval_dataset=tokenized_dataset['val'],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

# Early stopping with the callback's default settings (no arguments passed).
trainer.add_callback(EarlyStoppingCallback())

In [None]:
trainer.train()

In [None]:
# Save the trained model, and write the model config next to it.
trainer.save_model(models_path+'gpt2/final_model')
trainer.model.config.to_json_file(models_path+'gpt2/final_model/config.json')

In [None]:
# Persist the trainer state, then the evaluation metrics of each split, as JSON.
trainer.state.save_to_json(models_path+'gpt2/training_state.json')

In [None]:
with open(models_path+'gpt2/train_metrics.json', 'w') as fp:
    json.dump(trainer.evaluate(tokenized_dataset['train']), fp)

In [None]:
with open(models_path+'gpt2/val_metrics.json', 'w') as fp:
    json.dump(trainer.evaluate(tokenized_dataset['val']), fp)

In [None]:
with open(models_path+'gpt2/test_metrics.json', 'w') as fp:
    json.dump(trainer.evaluate(tokenized_dataset['test']), fp)

In [None]:
#test_pred = trainer.predict(tokenized_dataset['test'])
#y_pred = test_pred.predictions.argmax(axis=1)

In [None]:
#y_test = tokenized_dataset['test']['labels']

In [None]:
#print(classification_report(y_test, test_pred.predictions.argmax(axis=1)))

#### Evaluate the model

In [None]:
# Rebuild the model for inference: load the base model named in the saved PEFT
# config, then attach the saved adapter on top of it.
config = PeftConfig.from_pretrained(models_path+'gpt2/final_model')
inference_model = AutoModelForSequenceClassification.from_pretrained(config.base_model_name_or_path)

# Same tokenizer settings as for training: left padding, EOS reused as pad token.
tokenizer = AutoTokenizer.from_pretrained(config.base_model_name_or_path, padding_side='left')
tokenizer.pad_token_id = tokenizer.eos_token_id

ptuned_model = PeftModel.from_pretrained(inference_model, models_path+'gpt2/final_model')
ptuned_model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
id2label = {0: "CON", 1: "PRO"}
label2id = {"CON": 0, "PRO": 1}

In [None]:
# The whole test set is tokenized into one padded batch of PyTorch tensors.
model_inputs = tokenizer(test['argument'].to_list(), return_tensors='pt', padding=True, truncation=True)

In [None]:
# Inference only: no gradients are tracked.
with torch.no_grad():
    pred = ptuned_model(**model_inputs)

In [None]:
# Predicted class id = index of the largest logit; true ids use the same mapping.
y_pred = torch.argmax(pred.logits, axis=1).numpy()
y_test = [label2id[l] for l in test['stance']]

In [None]:
cm = confusion_matrix(y_test, y_pred, labels=[0,1])
fig, ax = plt.subplots(figsize=(5,5))
ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['CON', 'PRO']).plot(ax=ax)
plt.savefig(plots_path+'gpt2_cm.png', bbox_inches ="tight")

In [None]:
print(classification_report(y_test, y_pred))

In [None]:
tmp = test.copy()
tmp['pred'] = [id2label[i] for i in y_pred]

In [None]:
tmp['TP'] = tmp.apply(lambda row: row['stance'] == row['pred'] and row['stance'] == 'PRO', axis=1)
tmp['TN'] = tmp.apply(lambda row: row['stance'] == row['pred'] and row['stance'] == 'CON', axis=1)
tmp['FP'] = tmp.apply(lambda row: row['stance'] != row['pred'] and row['stance'] == 'CON', axis=1)
tmp['FN'] = tmp.apply(lambda row: row['stance'] != row['pred'] and row['stance'] == 'PRO', axis=1)
tmp['T'] = tmp.apply(lambda row: row['stance'] == row['pred'], axis=1)
tmp['F'] = tmp.apply(lambda row: row['stance'] != row['pred'], axis=1)
tmp = tmp.groupby(by='topic').agg({'TP': 'sum',
                                   'TN': 'sum',
                                   'FP': 'sum',
                                   'FN': 'sum',
                                   'T': 'sum',
                                   'F': 'sum'}).reset_index()
tmp.sort_values(by='topic', inplace=True)

In [None]:
plt.bar(tmp['topic'], tmp['T']/(tmp['T']+tmp['F'])*100, label='Correctly predicted')
plt.bar(tmp['topic'], tmp['F']/(tmp['T']+tmp['F'])*100, bottom=tmp['T']/(tmp['T']+tmp['F'])*100, label='Incorrectly predicted')
plt.title('Percentage of correctly and incorrectly predicted arguments by categories')
plt.ylabel('Percentage of arguments')
plt.yticks(np.arange(0,110,10))
plt.xticks(rotation=90)
plt.grid(axis='y', alpha=0.3)
plt.legend()
plt.savefig(plots_path+'gpt2_prediction_percentage.png', bbox_inches ='tight')
plt.show()

In [None]:
plt.bar(tmp['topic'], tmp['TP'], label='TP')
plt.bar(tmp['topic'], tmp['TN'], bottom=tmp['TP'], label='TN')
plt.bar(tmp['topic'], tmp['FP'], bottom=tmp['TP']+tmp['TN'], label='FP')
plt.bar(tmp['topic'], tmp['FN'], bottom=tmp['TP']+tmp['TN']+tmp['FP'], label='FN')
plt.title('Confusion matrix by categories')
plt.ylabel('# of arguments')
plt.yticks(np.arange(0,65,5))
plt.xticks(rotation=90)
plt.grid(axis='y', alpha=0.3)
plt.legend()
plt.savefig(plots_path+'gpt2_cm_categories.png', bbox_inches ='tight')
plt.show()

#### Shap analysis

In [None]:
pred = pipeline("text-classification", model=ptuned_model, tokenizer=tokenizer)
explainer = shap.Explainer(pred)

In [None]:
shap_values = explainer(test['argument'][:20])

In [None]:
# Mean SHAP value per token for output index 1.
# NOTE(review): the file names assume index 1 = PRO and index 0 = CON, but the base
# model above is reloaded without id2label/label2id; confirm the label order.
shap.plots.bar(shap_values[:,:,1].mean(0), max_display=10, show=False)
plt.savefig(plots_path+'gpt2_shap_PRO.png', bbox_inches ='tight')
plt.show()

In [None]:
# Mean SHAP value per token for output index 0.
shap.plots.bar(shap_values[:,:,0].mean(0), max_display=10, show=False)
plt.savefig(plots_path+'gpt2_shap_CON.png', bbox_inches ='tight')
plt.show()

In [None]:
# Show the true stances of the first test rows (presumably to pick the examples below).
test['stance'][:10]

In [None]:
# Single-instance explanations: instance 5 for output index 1, instance 2 for index 0.
shap.plots.waterfall(shap_values[5,:,1], max_display=10, show=False)
plt.savefig(plots_path+'gpt2_shap_waterfall_PRO.png', bbox_inches ='tight')
plt.show()

In [None]:
shap.plots.waterfall(shap_values[2,:,0], max_display=10, show=False)
plt.savefig(plots_path+'gpt2_shap_waterfall_CON.png', bbox_inches ='tight')
plt.show()

In [None]:
# Token-level text view of instance 5 for output index 1.
shap.plots.text(shap_values[5,:,1])