### Import used libraries

**Dataset**
Labeled dataset collected from Twitter (Lab 1 - Hate Speech.tsv)

**Objective**
Classify each tweet as containing hate speech or not. <br>
0 -> no hate speech <br>
1 -> contains hate speech <br>


**Evaluation metric**
macro f1 score

In [None]:
import pandas as pd
import numpy as np
import random
import re, html, emoji
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
import gensim.downloader as api
from sklearn.metrics import  f1_score

from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.base import TransformerMixin
from scipy.sparse import issparse
from sklearn.model_selection import GridSearchCV

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Conv1D, GlobalMaxPooling1D, Dense

import matplotlib.pyplot as plt
import seaborn as sns



# Display configuration: allow up to 500 characters per cell so long tweets
# are not truncated when a DataFrame is rendered. The row-limit override
# below is intentionally left disabled.
#pd.set_option('display.max_rows', 500)
pd.set_option('display.max_colwidth', 500)

# Silence all warnings for the rest of the session.
import warnings
warnings.filterwarnings(action="ignore")

### Load Dataset

###### Note: search how to load the data from tsv file

In [None]:
# Read the tab-separated dataset, using its 'id' column as the row index.
data = pd.read_csv("Hate Speech.tsv", sep= "\t", index_col='id')
# Preview the first 100 rows (bare expression so the notebook renders it).
data.head(100)

In [None]:
# Number of non-null entries in each column.
data.count()

In [None]:
# Tally the rows per label once, then report the tally followed by the ratio
# of label-1 rows to label-0 rows.
overall_counts = data['label'].value_counts()
print(overall_counts)
print(f"\nclass distribution {overall_counts[1]/overall_counts[0]}")

### Data splitting

It is good practice to split the data before EDA: doing so helps maintain the integrity of the machine learning process, prevents data leakage, simulates real-world scenarios more accurately, and ensures reliable model performance evaluation on unseen data.

In [None]:
# Split sizes as row counts: 70% for training and 15% each for validation
# and test, all truncated to whole rows.
n_rows = len(data)
train_size = int(0.7 * n_rows)
val_size = int(0.15 * n_rows)
test_size = val_size
print(f"train_size: {train_size}\ntest_size: {test_size}\nval_size: {val_size}")

In [None]:
# Carve the frame into three consecutive row ranges: the first `train_size`
# rows for training, the next `val_size` rows for validation, and whatever
# remains for testing.
val_end = train_size + val_size
train = data[:train_size]
val = data[train_size:val_end]
test = data[val_end:]

# For each split: rows per label, then the ratio of label 1 to label 0.
train_counts = train['label'].value_counts()
print(f"train class distribution\n{train_counts}")
print(f"class distribution {train_counts[1]/train_counts[0]}\n")

val_counts = val['label'].value_counts()
print(f"val class distribution\n{val_counts}")
print(f"class distribution {val_counts[1]/val_counts[0]}\n")

test_counts = test['label'].value_counts()
print(f"test class distribution\n{test_counts}")
print(f"class distribution {test_counts[1]/test_counts[0]}")

The class distribution in each split stays close to that of the original dataset.

### EDA on training data

- check NaNs

In [None]:
# Count the missing values in each column of the training split.
train.isna().sum()

- check duplicates

In [None]:
# Number of training rows that repeat an earlier row (first occurrences are
# not counted as duplicates).
train.duplicated(keep ="first").sum()

- show a representative sample of data texts to find out required preprocessing steps

In [None]:
def get_random_sample(data = None):
    """Print the text of one randomly chosen row of ``data``.

    Args:
        data: DataFrame whose second column (position 1) holds the text to
            display.

    Raises:
        ValueError: if ``data`` is ``None`` or has no rows.
    """
    if data is None or len(data) == 0:
        raise ValueError("get_random_sample needs a non-empty DataFrame")
    # Draw the row position from the frame's actual length so the function
    # works for a split of any size and can never index past the last row.
    n = random.randrange(len(data))
    print(data.iloc[n, 1])

# Show 20 randomly drawn training tweets, each under a numbered header.
for sample_idx in range(20):
    print(f"sample {sample_idx}")
    get_random_sample(data=train)

- check dataset balancing

In [None]:
# Class balance of the training split (the EDA in this section is restricted
# to the training data): rows per label, then the ratio of label 1 to label 0.
train_label_counts = train['label'].value_counts()
print(train_label_counts)
print(f"\nclass distribution {train_label_counts[1]/train_label_counts[0]}")

In [None]:
# Turn the per-label tally of the training split into a two-column frame
# ('label', 'count') and draw it as a bar chart, one colour per label.
label_counts = (
    train['label']
    .value_counts()
    .rename_axis('label')
    .reset_index(name='count')
)

sns.barplot(x='label', y='count', hue='label', data=label_counts)
plt.show()

### Cleaning and Preprocessing

#### Extra: use custom scikit-learn Transformers

Using custom transformers in scikit-learn provides flexibility, reusability, and control over the data transformation process, allowing you to seamlessly integrate with scikit-learn's pipelines, enabling you to combine multiple preprocessing steps and modeling into a single workflow. This makes your code more modular, readable, and easier to maintain.

##### link: https://www.andrewvillazon.com/custom-scikit-learn-transformers/

#### My custom_transformer

In [None]:
class TextCleaner(TransformerMixin, BaseEstimator):
    """Scikit-learn transformer that normalises the tweet text of a DataFrame.

    The transformer is stateless: ``fit`` learns nothing and ``transform``
    returns a copy of the input frame whose ``text_column`` has been passed
    through :meth:`clean_text`.

    Args:
        text_column: name of the column that holds the raw text.
    """

    # Patterns used by clean_text, named and compiled once for readability.
    _MENTION_RE = re.compile(r'@\w+')
    _URL_RE = re.compile(r'http\S+|www\S+')
    _HASHTAG_RE = re.compile(r'#(\w+)')
    _CAMEL_BOUNDARY_RE = re.compile(r'(?<=[a-z])(?=[A-Z])')
    _EMOJI_NAME_RE = re.compile(r':([a-zA-Z_]+):')
    _DIGITS_RE = re.compile(r'\d+')
    _NON_ASCII_RE = re.compile(r'[^\x00-\x7F]+')
    _PUNCTUATION_RE = re.compile(r'[^\w\s<>]')
    _UNDERSCORES_RE = re.compile(r'_+')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, text_column='tweet'):
        self.text_column = text_column

    def fit(self, X, y=None):
        """Do nothing and return ``self`` (there is no state to learn)."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the text column cleaned.

        Args:
            X: DataFrame containing ``self.text_column``.

        Returns:
            A new DataFrame; the input frame is not modified.
        """
        X = X.copy()
        X[self.text_column] = X[self.text_column].apply(self.clean_text)
        return X

    def clean_text(self, text):
        """Normalise a single tweet.

        Steps, in order: decode HTML entities, drop ``@mentions``, drop URLs,
        replace hashtags with their words (splitting camelCase), turn emojis
        into their names, lowercase, replace digit runs with ``<NUM>``, drop
        non-ASCII characters and punctuation (``<`` and ``>`` are kept),
        turn underscores into spaces and collapse whitespace.

        Args:
            text: raw tweet text. Anything that is not a ``str`` (for
                example a missing value) is treated as empty text.

        Returns:
            The cleaned text, stripped of leading/trailing whitespace.
        """
        if not isinstance(text, str):
            return ""

        text = html.unescape(text)
        text = self._MENTION_RE.sub('', text)
        # URLs go before hashtag expansion: expanding "#FragmentName" inside
        # a URL would insert a space and leave part of the URL behind.
        text = self._URL_RE.sub('', text)
        text = self._HASHTAG_RE.sub(self._split_hashtag, text)
        text = emoji.demojize(text)
        # Pad emoji names with spaces so that adjacent emojis stay separate
        # tokens instead of being glued into one word.
        text = self._EMOJI_NAME_RE.sub(r' \1 ', text)
        text = text.lower()
        # Inserted after lowercasing so the placeholder keeps its casing.
        text = self._DIGITS_RE.sub('<NUM>', text)
        text = self._NON_ASCII_RE.sub('', text)
        text = self._PUNCTUATION_RE.sub('', text)
        text = self._UNDERSCORES_RE.sub(' ', text)
        return self._WHITESPACE_RE.sub(' ', text).strip()

    def _split_hashtag(self, match):
        """Return a hashtag's body with a space at each lower-to-upper case boundary."""
        return self._CAMEL_BOUNDARY_RE.sub(' ', match.group(1))

In [None]:
class Vectorizer(TransformerMixin, BaseEstimator):
    """Turn the text column of a DataFrame into a numeric feature matrix.

    Args:
        method: case-insensitive name of the technique. One of ``"BOW"``
            (token counts), ``"TFIDF"`` / ``"NGRAM"`` (TF-IDF), ``"WORD2VEC"``,
            ``"GLOVE"``, ``"FASTTEXT"`` (average of pre-trained word vectors)
            or ``"CNN"`` (output of a small convolutional network).
        ngram_range: n-gram range handed to the count / TF-IDF vectorizer.
        vector_size: fallback dimensionality of the all-zero vector, used
            only if the loaded embedding does not expose ``vector_size``.
        text_column: name of the column that holds the text.
        max_len: padded sequence length for the CNN method.
        num_words: vocabulary size for the CNN method's tokenizer and
            embedding layer.

    Raises:
        ValueError: from ``fit`` / ``transform`` when ``method`` is unknown.
    """

    # gensim-data identifiers of the pre-trained embeddings behind each method.
    _EMBEDDING_MODELS = {
        "WORD2VEC": "word2vec-google-news-300",
        "GLOVE": "glove-wiki-gigaword-300",
        "FASTTEXT": "fasttext-wiki-news-subwords-300",
    }

    def __init__(self, method="BOW", ngram_range=(1, 1), vector_size=300, text_column="tweet", max_len=100, num_words=1000):
        self.method = method
        self.ngram_range = ngram_range
        self.vector_size = vector_size
        self.text_column = text_column
        self.max_len = max_len
        self.num_words = num_words

    def fit(self, X, y=None):
        """Fit (or load) whatever the selected method needs and return ``self``."""
        texts = X[self.text_column].values
        method = self.method.upper()

        if method == "BOW":
            # Honour ngram_range here too; its default (1, 1) matches
            # CountVectorizer's own default.
            self.vectorizer_ = CountVectorizer(ngram_range=self.ngram_range)
            self.vectorizer_.fit(texts)

        elif method in ("TFIDF", "NGRAM"):
            self.vectorizer_ = TfidfVectorizer(ngram_range=self.ngram_range)
            self.vectorizer_.fit(texts)

        elif method in self._EMBEDDING_MODELS:
            self.embeddings_ = api.load(self._EMBEDDING_MODELS[method])

        elif method == "CNN":
            self.tokenizer_ = Tokenizer(num_words=self.num_words)
            self.tokenizer_.fit_on_texts(texts)
            self._build_cnn()

        else:
            raise ValueError(f"Unknown vectorizer method: {self.method}")

        return self

    def transform(self, X):
        """Return the feature matrix for the text column of ``X``.

        Returns:
            The count / TF-IDF vectorizer's output for ``BOW`` / ``TFIDF`` /
            ``NGRAM``; a 2-D array with one averaged word vector per row for
            the embedding methods; the CNN's predictions for ``CNN``.
        """
        texts = X[self.text_column].values
        method = self.method.upper()

        if method in ("BOW", "TFIDF", "NGRAM"):
            return self.vectorizer_.transform(texts)

        elif method in self._EMBEDDING_MODELS:
            if len(texts) == 0:
                # np.vstack rejects an empty list; return an empty matrix
                # that still has the right number of columns.
                return np.empty((0, self._embedding_dim()))
            return np.vstack([self._avgvec(text) for text in texts])

        elif method == "CNN":
            sequences = self.tokenizer_.texts_to_sequences(texts)
            padded = pad_sequences(sequences, maxlen=self.max_len)
            return self.cnn_model_.predict(padded, verbose=0)

        else:
            raise ValueError(f"Unknown vectorizer method: {self.method}")

    def _embedding_dim(self):
        """Return the loaded embedding's dimensionality, else ``vector_size``."""
        return getattr(self.embeddings_, "vector_size", self.vector_size)

    def _avgvec(self, text):
        """Average the vectors of the whitespace-separated tokens of ``text``.

        Tokens missing from the embedding are skipped; if none is known an
        all-zero vector is returned.
        """
        tokens = text.split()
        vectors = [self.embeddings_[word] for word in tokens if word in self.embeddings_]
        if not vectors:
            # Size the fallback from the embedding itself so every row has
            # the same width even when `vector_size` was set differently.
            return np.zeros(self._embedding_dim())
        return np.mean(vectors, axis=0)

    def _build_cnn(self):
        """Create the embedding -> conv -> max-pool -> dense network.

        NOTE(review): this class never calls ``fit`` on the network, so
        ``transform`` uses it with its initial weights - confirm that this
        is intended.
        """
        self.cnn_model_ = Sequential([
            Embedding(input_dim=self.num_words, output_dim=128, input_length=self.max_len),
            Conv1D(filters=64, kernel_size=5, activation='relu'),
            GlobalMaxPooling1D(),
            Dense(100, activation='relu')
        ])
        self.cnn_model_.compile(optimizer='adam', loss='binary_crossentropy')
    
class ToDense(TransformerMixin, BaseEstimator):
    """Pipeline step that converts a SciPy sparse matrix to a dense array.

    Inputs that are not sparse are passed through unchanged. Inheriting from
    ``BaseEstimator`` as well gives the step ``get_params`` / ``set_params``,
    so it can be cloned and used in parameter searches like the other
    transformers in this file.
    """

    def fit(self, X, y=None):
        """Do nothing and return ``self`` (there is no state to learn)."""
        return self

    def transform(self, X):
        """Return ``X.toarray()`` for sparse input, otherwise ``X`` itself."""
        if issparse(X):
            return X.toarray()
        return X

### Modelling

In [None]:
# Baseline: clean the tweets, build bag-of-words counts, fit a random forest.
model = RandomForestClassifier()
baseline_steps = [
    ('preprocessing', TextCleaner()),
    ('vectorizing', Vectorizer(method='bow')),
    ('model', model),
]
pipeline = Pipeline(steps=baseline_steps)

# The whole frame is passed as X; the vectorizing step extracts the text
# column, so the classifier only receives the vectorized text.
pipeline.fit(train, train['label'])

#### Evaluation

**Evaluation metric:**
macro f1 score

Macro F1 score is a useful metric in scenarios where you want to evaluate the overall performance of a multi-class classification model, **particularly when the classes are imbalanced**

In [None]:
# Score the fitted baseline pipeline on the test split with macro-averaged F1.
pred = pipeline.predict(test)
macro_f1 = f1_score(y_true=test['label'], y_pred=pred, average='macro')
print(f"Macro F1: {macro_f1}")

### Enhancement

- Using different vectorizers with different hyperparameters
- Trying different ML models and doing hyperparameter tuning

In [None]:
# Vectorization methods to compare, and the results table that the following
# cells fill with "<model>-<method>" -> macro F1 entries.
vectorizes = [
    "BOW",
    "TFIDF",
    "WORD2VEC",
    "GLOVE",
    "FASTTEXT",
    "CNN",
]
scores = dict()

#### Random Forest

In [None]:
# Fit one random-forest pipeline per vectorization method and store its
# macro F1 on the test split under the key "RandomForest-<method>".
model = RandomForestClassifier()

for v in vectorizes:
    rf_steps = [
        ('preprocessing', TextCleaner()),
        ('Vectorizing', Vectorizer(method=v)),
        ('model', model),
    ]
    pipeline = Pipeline(steps=rf_steps)
    pipeline.fit(train, train["label"])

    pred = pipeline.predict(test)
    macro_f1 = f1_score(y_true=test['label'], y_pred=pred, average='macro')
    scores["RandomForest-"+v] = macro_f1
    print(f"RandomForestClassifier with {v} status: Done")

#### Gradient Boosting

In [None]:
# Fit one gradient-boosting pipeline per vectorization method and store its
# macro F1 on the test split under the key "GradientBoosting-<method>".
model = GradientBoostingClassifier()

for v in vectorizes:
    pipeline = Pipeline(steps=[
        ('preprocessing', TextCleaner()),
        ('Vectorizing', Vectorizer(method= v)),
        ('model', model),
    ])

    pipeline.fit(train, train["label"])

    pred = pipeline.predict(test)
    # Score this pipeline's own predictions; without this line the value
    # left in `macro_f1` by an earlier cell would be stored for every method.
    macro_f1 = f1_score(test['label'], pred, average='macro')
    scores["GradientBoosting-"+v] = macro_f1
    print(f"GradientBoostingClassifier with {v} status: Done")

#### Naive Bayes

In [None]:
# Fit one Gaussian naive Bayes pipeline per vectorization method and store
# its macro F1 on the test split under the key "NaiveBayes-<method>".
# A ToDense step sits between the vectorizer and the model to turn sparse
# vectorizer output into a dense array.
model = GaussianNB()

for v in vectorizes:
    pipeline = Pipeline(steps=[
        ('preprocessing', TextCleaner()),
        ('vectorizing', Vectorizer(method=v)),
        ('to_dense', ToDense()),
        ('model', model),
    ])

    pipeline.fit(train, train["label"])

    pred = pipeline.predict(test)
    # Score this pipeline's own predictions; without this line the value
    # left in `macro_f1` by an earlier cell would be stored for every method.
    macro_f1 = f1_score(test['label'], pred, average='macro')
    scores["NaiveBayes-"+v] = macro_f1
    print(f"GaussianNB with {v} status: Done")

#### Final Decision

In [None]:
# Bar chart with one bar per "<model>-<method>" entry of `scores`.
combo_names = list(scores.keys())
combo_scores = list(scores.values())

plt.figure(figsize=(15, 5))
plt.bar(combo_names, combo_scores)

# Titles, a dashed horizontal grid, and slanted tick labels so the long
# combination names stay readable.
plt.title('Models-Vectorizers MaCro F1 score', fontsize=14)
plt.ylabel('Macro F1 score', fontsize=12)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.xticks(rotation=40)
plt.tight_layout()
plt.show()

model : __Random Forest__<br/>vectorizer : __TFIDF__

In [None]:
# Tune the chosen combination (TF-IDF unigrams + random forest).
model = RandomForestClassifier()
tfidf_steps = [
    ('preprocessing', TextCleaner()),
    ('Vectorizing', Vectorizer(method="TFIDF", ngram_range=(1,1))),
    ('model', model),
]
pipeline = Pipeline(steps=tfidf_steps)

# 3 x 4 x 2 = 24 parameter combinations for the 'model' step.
param_grid = dict(
    model__n_estimators=[100, 200, 400],
    model__max_depth=[None, 10, 20, 30],
    model__min_samples_split=[2, 5],
)

# 3-fold cross-validation on the training split, scored by macro F1 and
# run on all available cores.
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    scoring='f1_macro',
    cv=3,
    n_jobs=-1,
    verbose=2,
)
grid_search.fit(train, train['label'])

In [None]:
# Evaluate the tuned search object on the test split and report the winning
# parameter set together with its macro F1.
pred = grid_search.predict(test)
macro_f1 = f1_score(y_true=test['label'], y_pred=pred, average='macro')
print(f"Best params: {grid_search.best_params_}")
print(f"Macro F1: {macro_f1}")

### Conclusion and final results


1. The data has a large class imbalance
2. Different vectorization techniques give different scores depending on the problem
3. TFIDF with Random Forest gives the best macro F1 score
4. ML models can handle simple NLP tasks