In [None]:
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout

from keras.utils import to_categorical
from keras import backend as K

from sklearn.base import TransformerMixin

from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier 
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import StackingClassifier
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.neural_network import MLPClassifier
from itertools import product
import matplotlib.pyplot as plt
import re

In [None]:
class CustomTokenizer(TransformerMixin):
    """Transformer that converts the ``'tweet'`` column into padded token-id sequences.

    ``fit`` learns the vocabulary from the ``'tweet'`` column and records the
    length of the longest tokenized tweet; ``transform`` maps tweets to integer
    sequences and passes them through ``pad_sequences`` with that length.

    Attributes set by ``fit``:
        tokenizer:  the fitted Keras ``Tokenizer``.
        data:       list of the raw tweets used for fitting.
        data_tok:   the fitting tweets as token-id sequences (unpadded).
        vocab_size: ``len(word_index) + 1``.
        max_length: length of the longest sequence in ``data_tok``.
    """

    def __init__(self) -> None:
        self.tokenizer = Tokenizer()

    def tokenize(self, data: list) -> np.ndarray:
        """Convert a list of texts into an array of padded token-id sequences.

        Requires a prior call to ``fit`` (reads ``self.max_length``).
        """
        X = self.tokenizer.texts_to_sequences(data)
        X = pad_sequences(X, maxlen=self.max_length)
        return X

    def tokenize_df(self, df: pd.DataFrame) -> np.ndarray:
        """Tokenize the ``'tweet'`` column of ``df``."""
        return self.tokenize(df['tweet'].tolist())

    def fit(self, X, y=None):
        """Learn the vocabulary and maximum sequence length from ``X['tweet']``.

        ``y`` is accepted for scikit-learn API compatibility and ignored.
        Returns ``self``.
        """
        # Tokenizer.fit_on_texts updates its internal counts incrementally, so
        # start from a fresh tokenizer: refitting the same instance must not
        # blend in the vocabulary of a previous fit.
        self.tokenizer = Tokenizer()
        self.data = X['tweet'].tolist()
        self.tokenizer.fit_on_texts(self.data)
        # +1 because word_index starts at 1 (index 0 is left for padding).
        self.vocab_size = len(self.tokenizer.word_index) + 1
        self.data_tok = self.tokenizer.texts_to_sequences(self.data)
        self.max_length = max(len(seq) for seq in self.data_tok)
        return self

    def transform(self, X):
        """Return the padded token-id array for ``X['tweet']``."""
        return self.tokenize_df(X)

In [None]:
def read_dataset(filename: str, target: str = 'class') -> tuple[pd.DataFrame, pd.Series]:
    """Load a CSV file and split it into features and target.

    Args:
        filename: path of the CSV file to read.
        target: name of the label column (defaults to ``'class'``).

    Returns:
        ``(X, y)`` where ``X`` is the frame without the target column and
        ``y`` is the target column as a Series.

    Raises:
        KeyError: if ``target`` is not a column of the file.
    """
    df = pd.read_csv(filename)
    y = df.loc[:, target]
    X = df.drop(columns=[target])
    return X, y

def save_data_to_csv(data, path):
    """Write ``data`` to ``path`` as CSV without the index column.

    The path is handed to ``to_csv`` directly so pandas opens and closes the
    file itself; this avoids leaking the handle if writing fails and avoids
    the blank-line problem of text-mode handles opened without ``newline=''``.

    Args:
        data: object exposing ``to_csv`` (e.g. a DataFrame).
        path: destination file path; an existing file is overwritten.
    """
    data.to_csv(path, index=False)

def searchMaxRowByColumn(dataframe, columnName):
    """Return the first row of ``dataframe`` whose ``columnName`` equals the column maximum."""
    column = dataframe[columnName]
    highest = column.max()
    is_highest = column == highest
    # Ties are resolved by position: the earliest matching row wins.
    return dataframe[is_highest].iloc[0]

In [None]:
# Load the train and test splits; each is returned as (features, labels).
train_csv = "../Data/train_data.csv"
test_csv = "../Data/test_data.csv"

X_train, y_train = read_dataset(train_csv)
X_test, y_test = read_dataset(test_csv)

In [None]:
# Fit a standalone CustomTokenizer on the training tweets; this populates
# tokenizer.vocab_size and tokenizer.max_length. The pipelines in the cells
# below construct their own CustomTokenizer instances instead of reusing it.
tokenizer = CustomTokenizer()
tokenizer.fit(X_train)
# Disabled one-hot encoding of the labels; the cells below pass y_train to the
# estimators directly.
# y_train_cat = to_categorical(y_train, num_classes=2)

In [None]:
# Candidate base learners for the first slot of the stacking ensemble.
model_list_1 = [
    ('rf1', RandomForestClassifier(n_estimators=15, max_depth=2, random_state=42)),
    (
        'gb1',
        GradientBoostingClassifier(
            n_estimators=15, max_depth=2, learning_rate=0.1, random_state=42
        ),
    ),
    ('sv1', SVC(C=1.0, kernel='rbf', gamma='scale', random_state=42)),
    ('kn1', KNeighborsClassifier(n_neighbors=5)),
    ('dt1', DecisionTreeClassifier(max_depth=2, random_state=42)),
    (
        'mlp1',
        MLPClassifier(
            solver='lbfgs', alpha=1e-5, hidden_layer_sizes=(50,), random_state=42
        ),
    ),
]

# Same candidates under different names for the second slot, so every pair
# (including two models of the same kind) has unique estimator names.
model_list_2 = [
    ('rf2', RandomForestClassifier(n_estimators=15, max_depth=2, random_state=42)),
    ('gb2', GradientBoostingClassifier(n_estimators=15, max_depth=2, random_state=42)),
    ('sv2', SVC(C=1.0, kernel='rbf', gamma='scale', random_state=42)),
    ('kn2', KNeighborsClassifier(n_neighbors=5)),
    ('dt2', DecisionTreeClassifier(max_depth=2, random_state=42)),
    (
        'mlp2',
        MLPClassifier(
            solver='lbfgs', alpha=1e-5, hidden_layer_sizes=(50,), random_state=42
        ),
    ),
]

# Every (first slot, second slot) pairing, reshaped into the list-of-tuples
# form expected by StackingClassifier(estimators=...).
all_model_combinations = list(product(model_list_1, model_list_2))
formatted_model_combinations = [list(pair) for pair in all_model_combinations]

# The estimator list starts empty; the grid search fills it per candidate.
stack = StackingClassifier(estimators=[], final_estimator=LogisticRegression())
pipeline = Pipeline(
    steps=[
        ('tokenizer', CustomTokenizer()),
        ('stack', stack),
    ]
)

param_grid = {'stack__estimators': formatted_model_combinations}

grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    scoring='balanced_accuracy',
    cv=2,
    n_jobs=-1,
    verbose=1,
)
grid_search.fit(X_train, y_train)

# Persist the cross-validation table, then reload it from disk for reporting.
cs_results = pd.DataFrame(grid_search.cv_results_)
save_data_to_csv(cs_results, "stack_test.csv")

analisys_final_best = pd.read_csv('stack_test.csv')

print("\nNajlepszy znaleziony model:\n")
print(searchMaxRowByColumn(analisys_final_best, 'mean_test_score'))
# best pair found: gb1 and dt2

Wykonajmy tuning hiperparametrów:

In [None]:
# Settings for the randomized hyperparameter search.
n_jobs = -1
n_iter_randomSearch = 250
verbose = 1
random_state = 42
cv = 3
# Number of grid points for the float-valued ranges (learning rate, subsample).
numberOfPoints = 10

# Search bounds for the gradient boosting base learner.
n_estimators_lower_gb = 5
n_estimators_upper_gb = 50
max_depth_lower_gb = 1
max_depth_upper_gb = 8
# An integer min_samples_split must be >= 2 in scikit-learn; a lower bound of 1
# makes every sampled candidate with that value fail to fit.
min_samples_split_lower_gb = 2
min_samples_split_upper_gb = 10
min_samples_leaf_lower_gb = 1
min_samples_leaf_upper_gb = 5
learning_rate_lower_gb = 0.05
learning_rate_upper_gb = 0.5
subsample_lower_gb = 0.75
subsample_upper_gb = 1.0

# Search bounds for the decision tree base learner.
max_depth_lower_dt = 1
max_depth_upper_dt = 10
# Same constraint as above: an integer min_samples_split below 2 is invalid.
min_samples_split_lower_dt = 2
min_samples_split_upper_dt = 5
min_samples_leaf_lower_dt = 1
min_samples_leaf_upper_dt = 10
max_features_dt = ['sqrt', 'log2']

In [None]:
# Stacking ensemble of a gradient boosting model and a decision tree, tuned
# with a randomized search over both base learners' hyperparameters.
stack = StackingClassifier(
    estimators=[
        ('gb1', GradientBoostingClassifier(random_state=42)),
        ('dt1', DecisionTreeClassifier(random_state=42)),
    ],
    final_estimator=LogisticRegression(),
)
pipeline = Pipeline([
    ('tokenizer', CustomTokenizer()),
    ('model', stack),
])

# Integer ranges use np.arange(lower, upper + 1) so that every integer between
# the bounds (inclusive) is a candidate. The previous
# np.linspace(lower, upper, upper - lower).astype(int) silently skipped values
# because of truncation.
# min_samples_split is clamped to >= 2: scikit-learn rejects smaller integer
# values, which would make those candidates fail to fit.
param_dist = {
    'model__gb1__n_estimators': np.arange(n_estimators_lower_gb, n_estimators_upper_gb + 1),
    'model__gb1__max_depth': np.arange(max_depth_lower_gb, max_depth_upper_gb + 1),
    'model__gb1__min_samples_split': np.arange(max(2, min_samples_split_lower_gb), min_samples_split_upper_gb + 1),
    'model__gb1__min_samples_leaf': np.arange(min_samples_leaf_lower_gb, min_samples_leaf_upper_gb + 1),
    'model__gb1__learning_rate': np.linspace(learning_rate_lower_gb, learning_rate_upper_gb, numberOfPoints),
    'model__gb1__subsample': np.linspace(subsample_lower_gb, subsample_upper_gb, numberOfPoints),
    'model__dt1__max_depth': np.arange(max_depth_lower_dt, max_depth_upper_dt + 1),
    'model__dt1__min_samples_split': np.arange(max(2, min_samples_split_lower_dt), min_samples_split_upper_dt + 1),
    'model__dt1__min_samples_leaf': np.arange(min_samples_leaf_lower_dt, min_samples_leaf_upper_dt + 1),
    'model__dt1__max_features': max_features_dt,
}

random_search = RandomizedSearchCV(
    estimator=pipeline,
    param_distributions=param_dist,
    n_jobs=n_jobs,
    n_iter=n_iter_randomSearch,
    verbose=verbose,
    random_state=random_state,
    cv=cv,
    scoring='balanced_accuracy',
)

random_search.fit(X_train, y_train)

# Persist the full cross-validation table for later inspection.
cs_ran_results = pd.DataFrame(random_search.cv_results_)
save_data_to_csv(cs_ran_results, "stack_tuning.csv")

Wybór najlepszego modelu:

In [None]:
# Reload the saved tuning results and show the row with the highest mean CV score.
analisys_final_best = pd.read_csv('stack_tuning.csv')

print("\nNajlepszy znaleziony model:\n")
best_tuning_row = searchMaxRowByColumn(analisys_final_best, 'mean_test_score')
print(best_tuning_row)

In [None]:
# Final stacking model with fixed hyperparameters
# (presumably the best row reported from stack_tuning.csv - TODO confirm).
tuned_gb = GradientBoostingClassifier(
    n_estimators=43,
    max_depth=8,
    min_samples_split=8,
    min_samples_leaf=2,
    learning_rate=0.35,
    subsample=0.9444444444444444,
    random_state=42,
)
tuned_dt = DecisionTreeClassifier(
    max_depth=6,
    max_features="sqrt",
    min_samples_split=2,
    min_samples_leaf=4,
    random_state=42,
)
stack = StackingClassifier(
    estimators=[('gb1', tuned_gb), ('dt1', tuned_dt)],
    final_estimator=LogisticRegression(),
)
pipeline = Pipeline(
    steps=[
        ('tokenizer', CustomTokenizer()),
        ('stack', stack),
    ]
)
pipeline.fit(X_train, y_train)

# Evaluation on the test split.
y_pred = pipeline.predict(X_test)
y_pred_dt = pd.DataFrame(y_pred)

accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred)

print("Accuracy for test dataset:", accuracy)
print("Classification Report:", report, sep="\n")

# Evaluation on the training split, for comparison with the test scores.
y_pred_train = pipeline.predict(X_train)
y_pred_train_dt = pd.DataFrame(y_pred_train)

accuracy = accuracy_score(y_train, y_pred_train)
report = classification_report(y_train, y_pred_train)

print("Accuracy for train dataset:", accuracy)
print("Classification Report:", report, sep="\n")

### Analiza FP i FN

In [None]:
# Label encoding:
# 0 -> ok
# 1 -> hate

# Binary confusion matrix flattened to (tn, fp, fn, tp).
cm = confusion_matrix(y_test, y_pred)
tn, fp, fn, tp = cm.ravel()
print(tn, fp, fn, tp)

# Share of all misclassifications that are false positives / false negatives
# (these are proportions of the errors, not FP/FN rates).
print("FP:", fp / (fp + fn) * 100, "%")
print("FN:", fn / (fp + fn) * 100, "%")

# Select the misclassified test tweets. Take an explicit copy so that adding
# the 'len' column does not write into a slice of X_test
# (avoids pandas' SettingWithCopyWarning).
incorrect_idx = y_pred != y_test
incorrect = X_test[incorrect_idx].copy()
incorrect['len'] = incorrect['tweet'].apply(len)

# Histogram of tweet length (in characters) for the misclassified tweets.
plt.hist(incorrect['len'], bins=10, color='skyblue', edgecolor='black')
plt.xlabel('Długość tweeta')
plt.ylabel('Liczba tweetów')
plt.title('Histogram długości tweetów sklasyfikowanych błędnie\nprzez model zbudowany od podstaw')
plt.savefig('hist-incorrect-manual.png')
plt.show()