<a href="https://colab.research.google.com/github/filippomenegatti/Hate_Detection/blob/main/Code_Sentiment_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Hate Speech Analysis

## Librerie

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import requests
import string
import re
from wordcloud import WordCloud

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import *
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')

from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import KFold, StratifiedKFold, train_test_split, GridSearchCV, cross_val_score
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn import linear_model, datasets, metrics
from sklearn.metrics import plot_confusion_matrix

import warnings
import numpy as np
from xgboost.sklearn import XGBClassifier
warnings.filterwarnings('ignore')

## Set Up e Funzioni

In [None]:
# NLTK's English stop-word list, extended with the Twitter retweet marker "rt".
nltk_stopwords = stopwords.words('english') + ['rt']

In [None]:
def preprocessing(text, tokenize = False, stopwords = None, alphabetic=True, stem=False, lem=False, proc_type=None):
  '''
  Lower-case, tokenize and optionally filter/normalize a single text.

  Parameters:
    - text: the raw string to process.
    - tokenize: if True return the list of tokens, otherwise return the
      tokens joined by single spaces.
    - stopwords: iterable of tokens to discard (none are removed by default).
    - alphabetic: if True keep only tokens for which str.isalpha() holds.
    - stem: if True apply proc_type.stem to every token.
    - lem: if True (and stem is False) apply proc_type.lemmatize to every token.
    - proc_type: the stemmer/lemmatizer instance; required when stem or lem is set.

  Returns:
    A list of tokens when tokenize is True, otherwise a space-joined string.

  Raises:
    ValueError: if stem or lem is requested without a proc_type.
  '''

  # Lower-case before tokenizing so the stop-word comparison is case-insensitive.
  tokens = word_tokenize(text.lower())

  if alphabetic:
    # Drop punctuation, numbers and any other non-alphabetic token.
    tokens = [word for word in tokens if word.isalpha()]

  if stopwords:
    # A set gives constant-time membership tests regardless of list length.
    stop_set = set(stopwords)
    tokens = [word for word in tokens if word not in stop_set]

  if stem or lem:
    if proc_type is None:
      raise ValueError("proc_type must be provided when stem or lem is True")
    # Stemming takes precedence over lemmatization when both flags are set.
    normalize = proc_type.stem if stem else proc_type.lemmatize
    tokens = [normalize(token) for token in tokens]

  return tokens if tokenize else ' '.join(tokens)

In [None]:
def tfidf_vec(input, ngrams):
  '''
  Fit a TF-IDF vectorizer on `input` and return the dense feature matrix.

  `ngrams` is forwarded as the vectorizer's ngram_range. The fitted
  vectorizer itself is discarded; only the transformed array is returned.
  '''
  settings = dict(
      ngram_range=ngrams,
      use_idf=True,
      smooth_idf=False,
      lowercase=False,
      decode_error='replace',
      # vocabulary pruning
      max_features=10000,
      min_df=5,
      max_df=0.501,
  )
  sparse_matrix = TfidfVectorizer(**settings).fit_transform(input)
  return sparse_matrix.toarray()

## Caricamento del data set classificato

Download of labeled data.

In [None]:
# Show full cell contents (tweets are long) when frames are rendered.
pd.set_option('display.max_colwidth', None)

# Fetch the labelled corpus and drop the 'Unnamed: 0' column
# (presumably the CSV's saved index -- confirm against the file).
labeled_data = pd.read_csv(
    'https://raw.githubusercontent.com/t-davidson/hate-speech-and-offensive-language/master/data/labeled_data.csv'
).drop(columns=['Unnamed: 0'])
display(labeled_data)

In [None]:
# Clean every tweet: lower-case, keep alphabetic tokens, drop "rt", Porter-stem,
# and store the result as a space-joined string.
# The stemmer is created once and shared, instead of once per row, and only the
# 'tweet' column is iterated rather than whole rows.
porter_stemmer = PorterStemmer()
labeled_data['clean_sents'] = labeled_data['tweet'].apply(
    lambda tweet: preprocessing(tweet, stopwords=['rt'], stem=True, lem=False,
                                proc_type=porter_stemmer, tokenize=False))
labeled_data.head(10)

In [None]:
# Keep only the columns still needed downstream; the listed ones are removed in place.
labeled_data.drop(
    columns=['count', 'hate_speech', 'offensive_language', 'neither', 'tweet'],
    inplace=True,
)

In [None]:
# Only needed when preprocessing ran with tokenize=False (its default):
# split the cleaned sentences back into token lists.
labeled_data['tokens'] = labeled_data['clean_sents'].map(word_tokenize)

labeled_data.head()

In [None]:
# POS-tag every tweet and keep only the tag sequence, joined into one string per tweet.
tweet_tags = [
    " ".join(tag for _, tag in nltk.pos_tag(token_list))
    for token_list in labeled_data['tokens']
]

In [None]:
# Reuse the TF-IDF vectorizer as an n-gram extractor over the POS-tag strings.
# IDF weighting and normalisation are switched off (use_idf=False, norm=None).
pos_vectorizer = TfidfVectorizer(
    # vocabulary selection
    ngram_range=(1, 3),
    max_features=5000,
    min_df=5,
    max_df=0.501,
    # weighting
    use_idf=False,
    smooth_idf=False,
    norm=None,
    # text handling
    tokenizer=None,
    preprocessor=None,
    stop_words=None,
    lowercase=False,
    decode_error='replace',
)

pos = pos_vectorizer.fit_transform(pd.Series(tweet_tags)).toarray()

In [None]:
# Word-level TF-IDF features (uni- to tri-grams) and the target labels.
y = labeled_data['class']
X = tfidf_vec(labeled_data['clean_sents'], (1, 3))

In [None]:
# Append the POS-tag features column-wise to the word TF-IDF features.
X_pos = np.hstack((X, pos))

In [None]:
# Hold out 25% of the rows for testing; the seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X_pos,
    y,
    test_size=0.25,
    random_state=42,
)

## Class Rebalancing (Random Undersampling)

In [None]:
# Class frequencies in the training split.
# NOTE(review): the top-level pd.value_counts helper is reported as deprecated in
# recent pandas releases -- TODO confirm and switch to Series.value_counts if so.
pd.value_counts(y_train)

In [None]:
# Bar chart of how many samples each of the three classes has in the full label vector.
class_counts = np.bincount(y)
class_ids = np.arange(0, 3)
plt.bar(class_ids, class_counts)
plt.xlabel('classes')
plt.ylabel('frequency')
plt.title('Frequency of the classes')
plt.show()

In [None]:
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

In [None]:
# Rebalance the training set by randomly undersampling class 1 down to 8638 rows.
# Despite the "_smote" suffix of the output names, no synthetic samples are created.
# random_state is fixed so the resampled set is reproducible, matching the seed
# used for the train/test split.
strategy = {1:8638}
undersample = RandomUnderSampler(sampling_strategy=strategy, random_state=42)
X_train_smote, y_train_smote = undersample.fit_resample(X_train, y_train)

In [None]:
# Dimensions of the resampled training matrix.
X_train_smote.shape

In [None]:
# Class frequencies after resampling.
# NOTE(review): the top-level pd.value_counts helper is reported as deprecated in
# recent pandas releases -- TODO confirm and switch to Series.value_counts if so.
pd.value_counts(y_train_smote)

## save/load dataset

In [None]:
# Colab-only: mount Google Drive and change into the folder that holds the pickled datasets.
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/My\ Drive/Sentiment_data

In [None]:
import pickle

In [None]:
# with open('portstem_data.pickle', 'wb') as output:
#     pickle.dump(X_train_smote, output)
# with open('portstem_target.pickle', 'wb') as output:
#     pickle.dump(y_train_smote, output)
# with open('portstem_test.pickle', 'wb') as output:
#     pickle.dump(X_test, output)
# with open('portstem_test_target.pickle', 'wb') as output:
#     pickle.dump(y_test, output)

In [None]:
#load data

# Restore the previously pickled train/test features and targets.
# Only load pickles you created yourself: unpickling executes arbitrary code.
with open('portstem_data.pickle', 'rb') as data:
    X_train = pickle.load(data)
with open('portstem_target.pickle', 'rb') as data:
    y_train = pickle.load(data)

with open('portstem_test.pickle', 'rb') as data:
    X_test = pickle.load(data)
with open('portstem_test_target.pickle', 'rb') as data:
    y_test = pickle.load(data)


## Modelli base

### Random Forest

In [None]:
# Baseline random forest: 300 trees, class-balanced weights, fixed seed, verbose progress output.
classifier = RandomForestClassifier(
    n_estimators=300,
    class_weight='balanced',
    random_state=42,
    verbose=40,
).fit(X_train, y_train)
y_pred = classifier.predict(X_test)

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score

# Evaluate the baseline predictions on the held-out set.
rf_confusion = confusion_matrix(y_test, y_pred)
rf_report = classification_report(y_test, y_pred)
print(rf_confusion)
print(rf_report)
print('The Accuracy Score is:', accuracy_score(y_test, y_pred))
print('The Weighted F1 Score is:', f1_score(y_test, y_pred, average='weighted'))

In [None]:
import tensorflow as tf
from sklearn import linear_model, datasets, metrics

In [None]:
# Confusion matrix computed with TensorFlow, then normalised per true class (row)
# and rounded to two decimals before being shown as a heat map.
conf_matrix = tf.math.confusion_matrix(y_test, y_pred, num_classes=3).numpy()
row_totals = conf_matrix.sum(axis=1)[:, np.newaxis]
con_mat_norm = np.around(conf_matrix.astype('float') / row_totals, decimals=2)
print(con_mat_norm)
plt.matshow(con_mat_norm, cmap=plt.cm.hot)
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
from matplotlib import pyplot as plt
import seaborn as sns

def plot_confusion_matrix(y_test,y_scores, classNames):
    """Print evaluation summaries and save a row-normalised confusion heat map.

    Prints the raw confusion matrix and the classification report for the
    given true labels (`y_test`) and predicted labels (`y_scores`), then draws
    a seaborn heat map in which every row is divided by its total and writes
    the figure to "image2.png". `classNames` supplies the report names and
    the axis tick labels.

    NOTE: this definition reuses the name of the `plot_confusion_matrix`
    function imported from sklearn.metrics at the top of the notebook, so
    after this cell runs the bare name refers to this helper.
    """
    n_classes = len(classNames)
    cm = confusion_matrix(y_test, y_scores)
    print("**** Confusion Matrix ****")
    print(cm)
    print("**** Classification Report ****")
    print(classification_report(y_test, y_scores, target_names=classNames))

    # Divide every cell by the total of its row (the true-class count).
    row_normalised = np.zeros((n_classes, n_classes))
    for row in range(n_classes):
        row_total = np.sum(cm[row, :])
        row_normalised[row, :] = [cm[row, col] / row_total for col in range(n_classes)]

    plt.figure(figsize=(5,5))
    sns.set(font_scale=1.0)  # label size
    heatmap_axes = sns.heatmap(
        row_normalised,
        annot=True,
        fmt='.2',
        xticklabels=classNames,
        yticklabels=classNames,
    )
    heatmap_axes.figure.savefig("image2.png")

# Report and plot the baseline predictions using the three class ids as display names.
classNames = ['0', '1', '2'] 
plot_confusion_matrix(y_test,y_pred, classNames) 

### Naive Bayes

In [None]:
# Multinomial Naive Bayes: grid-search the smoothing parameter alpha over
# 100 evenly spaced values in [1, 100], scored by weighted F1 with 5-fold CV.
alpha = np.linspace(1, 100, 100)
nb_grid = GridSearchCV(
    estimator=MultinomialNB(),
    param_grid={'alpha': alpha},
    scoring="f1_weighted",
    cv=5,
)
nb_grid.fit(X_train_smote, y_train_smote)

nb_grid_best_hyperparameter = nb_grid.best_estimator_.alpha

# The search runs on the resampled data, while the final model is fit on X_train / y_train.
nb_best_hyper = MultinomialNB(alpha=nb_grid_best_hyperparameter)
nb_best_hyper.fit(X_train, y_train)
np_y_preds = nb_best_hyper.predict(X_test)

In [None]:
# Summarise the Naive Bayes search and its performance on the test set.
nb_report = classification_report(y_test, np_y_preds)
nb_weighted_f1 = f1_score(y_test, np_y_preds, average='weighted')
print('The best score is:', nb_grid.best_score_, '\n')
print('The best hyperparameter found is:', nb_grid_best_hyperparameter, '\n')
print('The classification report is: \n', nb_report)
print('The Weighted F1 Score is:', nb_weighted_f1)

### Support Vector Machine

In [None]:
# Baseline SVM with library-default hyperparameters.
svm = SVC().fit(X_train, y_train)
y_pred = svm.predict(X_test)
for summary in (
    confusion_matrix(y_test, y_pred),
    classification_report(y_test, y_pred),
    accuracy_score(y_test, y_pred),
):
    print(summary)

In [None]:
# Search space for the SVM grid search; every combination of the four lists is tried.
param_grid = dict(
    C=[0.1, 1, 100],
    kernel=['rbf', 'poly', 'sigmoid', 'linear'],
    degree=[1, 2, 3],
    gamma=[1, 0.1, 0.01, 0.001],
)

In [None]:
# Exhaustive 5-fold search over param_grid on all available cores;
# refit=True retrains the best configuration on the full resampled training set.
grid = GridSearchCV(
    estimator=SVC(),
    param_grid=param_grid,
    cv=5,
    refit=True,
    n_jobs=-1,
    verbose=40,
)

grid.fit(X_train_smote, y_train_smote)

In [None]:
# Show the SVC configuration selected by the grid search.
print(grid.best_estimator_)

In [None]:
# Predict the test set with the refit best estimator and report its performance.
grid_predictions = grid.predict(X_test)
print(confusion_matrix(y_test,grid_predictions))
print(classification_report(y_test,grid_predictions))  # per-class precision / recall / F1

## Sparse Tensor Classifier

In [None]:
!pip install stc

In [None]:
from stc import SparseTensorClassifier

In [None]:
# Reference classifiers (library defaults) to compare against the Sparse Tensor Classifier.
# The keys are the display names used in the final comparison table.
# NOTE(review): 'Multinomial NV' presumably means "Multinomial NB" -- confirm before renaming.
models = {
    'Multinomial NV': MultinomialNB(),
    'Random Forest': RandomForestClassifier(),
    'XGBoost' : XGBClassifier()
}

In [None]:
# Split the cleaned data frame 75/25 and renumber both parts from zero,
# so that positional and label-based indexing coincide afterwards.
train, test = (
    part.reset_index(drop=True)
    for part in train_test_split(labeled_data, test_size=0.25, random_state=42)
)
for part in (train, test):
    print(part.shape)

In [None]:
# TF-IDF features for the model comparison: the vocabulary is learnt on the
# training sentences only and then applied unchanged to the test sentences.
vectorizer = TfidfVectorizer(tokenizer=nltk.word_tokenize)
X_train = vectorizer.fit_transform(train.clean_sents)
X_test = vectorizer.transform(test.clean_sents)
y_train = train['class']
y_test = test['class']

In [None]:
# Rebalance the training set by randomly undersampling class 1 down to 8638 rows.
# Despite the "_smote" suffix of the output names, no synthetic samples are created.
# random_state is fixed so the resampled set is reproducible, matching the seed
# used for the train/test split.
strategy = {1:8638}
undersample = RandomUnderSampler(sampling_strategy=strategy, random_state=42)
X_train_smote, y_train_smote = undersample.fit_resample(X_train, y_train)

In [None]:
# Fit every reference classifier on the resampled training data.
for model_name, model in models.items():
    print("Training: {}".format(model_name))
    model.fit(X_train_smote, y_train_smote)

In [None]:
# Collect the test-set predictions of each fitted classifier, keyed by display name.
predictions = dict()
for model_name, model in models.items():
    print("Predicting: {}".format(model_name))
    test_predictions = model.predict(X_test)
    predictions[model_name] = test_predictions

In [None]:
# Build the record lists consumed by the Sparse Tensor Classifier: each record
# holds a sentence's tokens, and training records also carry the class label.
# Indexing train['class'] by position works because the frames were re-indexed from zero.
json_train = [
    {'words': nltk.word_tokenize(doc), 'target': [train['class'][i]]}
    for i, doc in enumerate(train.clean_sents)
]
json_test = [{'words': nltk.word_tokenize(doc)} for doc in test.clean_sents]

In [None]:
# Fit the Sparse Tensor Classifier using the 'words' field as features and 'target' as label.
STC = SparseTensorClassifier(features=['words'], targets=['target'])
STC.fit(json_train)

In [None]:
# Explanation table from the fitted classifier; the cells below index it by target class.
expl_words = STC.explain()

In [None]:
# Top 10 features per target.
# Strip the "words: " prefix from the feature names, then select the rows for target "0".
expl_words['features'] = expl_words['features'].map(
    lambda feature: feature.replace("words: ", "")
)
is_hate = expl_words.index == "0"
hate = expl_words[is_hate]
hate.head(10)

In [None]:
# Explanation rows for target "1" and its first ten features.
offensive = expl_words[expl_words.index == "1"]
offensive.head(10)

In [None]:
# Explanation rows for target "2" and its first ten features.
neutral = expl_words[expl_words.index == "2"]
neutral.head(10)

In [None]:
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
from PIL import Image

In [None]:
# Word cloud for target "0": the two columns of `hate` become a feature -> weight mapping.
text_dict = dict(hate.values)

# Build the cloud object and size the words from the mapping.
wordcloud = WordCloud(background_color='white', colormap= "Dark2")
wordcloud.generate_from_frequencies(text_dict)

# Render without axes.
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.show()

In [None]:
# Word cloud for target "1": the two columns of `offensive` become a feature -> weight mapping.
text_dict = dict(offensive.values)

# Build the cloud object and size the words from the mapping.
wordcloud = WordCloud(background_color='white', colormap= "inferno")
wordcloud.generate_from_frequencies(text_dict)

# Render without axes.
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.show()

In [None]:
# Word cloud for target "2": the two columns of `neutral` become a feature -> weight mapping.
text_dict = dict(neutral.values)

# Build the cloud object (default colour map) and size the words from the mapping.
wordcloud = WordCloud(background_color='white')
wordcloud.generate_from_frequencies(text_dict)

# Render without axes.
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.show()

In [None]:
# Predict the test records with the Sparse Tensor Classifier (explanations disabled).
labels, _, _ = STC.predict(json_test, explain=False)
# Missing predictions are replaced by 0 before casting to int.
# NOTE(review): this assigns class 0 to every record without a prediction -- confirm this is intended.
labels = labels.fillna(0)
predictions['Sparse Tensor Classifier'] = labels.target.values.astype(int)

In [None]:
import sklearn.metrics as mtr

In [None]:
# Build one comparison row per model: accuracy plus macro- and weighted-averaged
# precision / recall / F1, all taken from sklearn's classification report.
E = []
for estimator, y_pred in predictions.items():
    report = mtr.classification_report(y_test, y_pred, output_dict=True, zero_division=0)
    macro = report['macro avg']
    weighted = report['weighted avg']
    row = {'Model': estimator, 'Accuracy': report['accuracy']}
    row['Avg Precision (macro)'] = macro['precision']
    row['Avg Recall (macro)'] = macro['recall']
    row['Avg F1-score (macro)'] = macro['f1-score']
    row['Avg Precision (weighted)'] = weighted['precision']
    row['Avg Recall (weighted)'] = weighted['recall']
    row['Avg F1-score (weighted)'] = weighted['f1-score']
    E.append(row)
E = pd.DataFrame(E).set_index('Model')

print(E)

## Analysis and tuning

In [None]:
!pip install optuna

In [None]:
import optuna
from optuna.trial import TrialState

### Random Forest

In [None]:
def objective(trial):
  '''
  Optuna objective for the random forest.

  Samples criterion, bootstrap, max_depth and n_estimators from the trial and
  returns the mean weighted F1 score of a class-balanced random forest over
  5-fold cross-validation on X_train / y_train.
  '''
  criterion = trial.suggest_categorical('criterion', ['gini', 'entropy'])
  # Sample real booleans: the strings 'True' and 'False' are both truthy, so
  # bootstrapping could never actually be switched off (and recent scikit-learn
  # versions reject non-boolean values for this parameter).
  bootstrap = trial.suggest_categorical('bootstrap', [True, False])
  max_depth = trial.suggest_int('max_depth', 20, 35)
  n_estimators =  trial.suggest_int('n_estimators', 50, 300, step=50)

  clf = RandomForestClassifier(bootstrap = bootstrap, criterion = criterion,
                                 max_depth = max_depth,
                                 n_estimators = n_estimators,n_jobs=-1, class_weight='balanced')
  score = cross_val_score(clf, X_train, y_train, cv=5, scoring="f1_weighted").mean()
  return score

In [None]:
# Run 30 trials, maximising the value returned by `objective`.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=30)

In [None]:
# Report the best trial's objective value and the hyperparameters that produced it.
trial = study.best_trial
print('F1 Score: {}'.format(trial.value))
print("Best hyperparameters: {}".format(trial.params))

In [None]:
# Retrain a random forest with the best hyperparameters found by the study
# and evaluate it on the test set.
optimised_rf = RandomForestClassifier(bootstrap = study.best_params['bootstrap'], criterion = study.best_params['criterion'],
                                    max_depth = study.best_params['max_depth'], n_estimators = study.best_params['n_estimators'], n_jobs=-1, class_weight='balanced')


optimised_rf.fit(X_train ,y_train)

y_pred = optimised_rf.predict(X_test)

print(classification_report(y_test,y_pred))
print('The Accuracy Score is:',accuracy_score(y_test, y_pred))
print('The Weighted F1 Score is:',f1_score(y_test, y_pred, average='weighted'))

# Confusion Matrix
# Call scikit-learn's plotting function through the `metrics` module: the bare
# name `plot_confusion_matrix` is rebound earlier in this notebook to a local
# helper taking (y_test, y_scores, classNames), which rejects these arguments.
# (On scikit-learn releases that no longer ship this function, the replacement
# is metrics.ConfusionMatrixDisplay.from_estimator.)
metrics.plot_confusion_matrix(optimised_rf, X_test, y_test, normalize= 'true', cmap= 'magma')
plt.show()

### Multinomial Naive Bayes

In [None]:
def objective(trial):
  '''
  Optuna objective for Multinomial Naive Bayes.

  Samples the smoothing parameter alpha log-uniformly from [0.01, 10] and
  returns the mean weighted F1 score over 5-fold cross-validation on
  X_train / y_train.
  '''
  alpha = trial.suggest_float('alpha', 0.01, 10, log=True)
  fold_scores = cross_val_score(
      MultinomialNB(alpha=alpha),
      X_train,
      y_train,
      cv=5,
      scoring="f1_weighted",
  )
  return fold_scores.mean()

# Run 100 trials, maximising the value returned by `objective`.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100)

In [None]:
# Report the best trial's objective value and the hyperparameters that produced it.
trial = study.best_trial
print('F1 - Score: {}'.format(trial.value))
print("Best hyperparameters: {}".format(trial.params))

In [None]:
# Retrain Multinomial Naive Bayes with the best alpha found by the study
# and evaluate it on the test set.
optimised_mnb = MultinomialNB(alpha = study.best_params['alpha'])

optimised_mnb.fit(X_train, y_train)

y_pred = optimised_mnb.predict(X_test)

print(classification_report(y_test,y_pred))
print('The Accuracy Score is:',accuracy_score(y_test, y_pred))
print('The Weighted F1 Score is:',f1_score(y_test, y_pred, average='weighted'))

# Confusion matrix
# Call scikit-learn's plotting function through the `metrics` module: the bare
# name `plot_confusion_matrix` is rebound earlier in this notebook to a local
# helper taking (y_test, y_scores, classNames), which rejects these arguments.
# (On scikit-learn releases that no longer ship this function, the replacement
# is metrics.ConfusionMatrixDisplay.from_estimator.)
metrics.plot_confusion_matrix(optimised_mnb, X_test, y_test, normalize= 'true', cmap= 'magma')
plt.show()

In [None]:
# Row-normalised confusion matrix of the tuned Naive Bayes model.
# Use scikit-learn's function via the `metrics` module: the bare name
# `plot_confusion_matrix` is rebound earlier in this notebook to a local helper
# with a different signature, which rejects these arguments.
metrics.plot_confusion_matrix(optimised_mnb, X_test, y_test, normalize= 'true', cmap= 'magma')
plt.show()

### XGBoost

In [None]:
from xgboost import XGBClassifier

In [None]:
# optuna's objective function
def objective(trial):
  '''
  Optuna objective for XGBoost.

  Samples learning_rate (log-uniform in [0.01, 0.5]), max_depth (2..10, step 2)
  and n_estimators (100..300, step 100) from the trial and returns the mean
  weighted F1 score over 5-fold cross-validation on X_train / y_train.
  '''
  learning_rate = trial.suggest_float("learning_rate", 0.01, 0.5, log=True)
  max_depth = trial.suggest_int("max_depth", 2, 10,step=2, log=False)
  n_estimators = trial.suggest_int("n_estimators", 100, 300,step=100, log=False)

  model = XGBClassifier(objective= 'multi:softmax',
                        learning_rate = learning_rate,
                        n_estimators = n_estimators,
                        max_depth = max_depth,
                        seed=42)

  # No intermediate value is ever reported with trial.report(), so a
  # trial.should_prune() check has nothing to act on and could never stop a
  # trial; the former check was dead code and has been removed.
  score = cross_val_score(model, X_train, y_train, cv=5, scoring="f1_weighted").mean()
  return score

In [None]:
# Study that maximises the objective's return value (mean weighted F1 from cross-validation); 5 trials.
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=5)

In [None]:
# Report the best trial's objective value and the hyperparameters that produced it.
trial = study.best_trial
print('F1 Score: {}'.format(trial.value))
print("Best hyperparameters: {}".format(trial.params))

In [None]:
# Retrain XGBoost with the best hyperparameters found by the study
# and evaluate it on the test set.
optimised_xgb = XGBClassifier(objective= 'multi:softmax',
                        learning_rate = study.best_params['learning_rate'],
                        n_estimators = study.best_params['n_estimators'],
                        max_depth = study.best_params['max_depth'],
                        seed=42)

optimised_xgb.fit(X_train, y_train)

y_pred = optimised_xgb.predict(X_test)

print(classification_report(y_test,y_pred))
print('The Accuracy Score is:',accuracy_score(y_test, y_pred))
print('The Weighted F1 Score is:',f1_score(y_test, y_pred, average='weighted'))

# Confusion Matrix
# Call scikit-learn's plotting function through the `metrics` module: the bare
# name `plot_confusion_matrix` is rebound earlier in this notebook to a local
# helper taking (y_test, y_scores, classNames), which rejects these arguments.
# (On scikit-learn releases that no longer ship this function, the replacement
# is metrics.ConfusionMatrixDisplay.from_estimator.)
metrics.plot_confusion_matrix(optimised_xgb, X_test, y_test, normalize= 'true', cmap= 'magma')
plt.show()