In [None]:
import glob
import time
import numpy as np
import pandas as pd
from typing import List
from Review import Review
from sklearn.svm import SVC
from train_model import train
import matplotlib.pyplot as plt
from split_data import split_data
from sklearn.dummy import DummyClassifier
from preprocess_data import preprocess_data
from extract_features import extract_features
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import (
    GridSearchCV,
    RandomizedSearchCV,
    HalvingGridSearchCV,
    HalvingRandomSearchCV,
)


class CustomClassifier(BaseEstimator, TransformerMixin):
    _seed = 42

    def __init__(
        self,
        target_feature: str,
        files: List[str],
        vectorizer_method: str,
        vectorizer_length: int,
        best_features: int,
        pca_comps: int,
        classifier_name: str,
    ):
        self.files = files
        self.target_feature = target_feature
        self.classifier_name = classifier_name
        self.vectorizer_method = vectorizer_method
        self.vectorizer_length = vectorizer_length
        self.best_features = best_features
        self.pca_comps = pca_comps

        self._train_df = None
        self._test_df = None
        self._y_pred_train = None
        self._y_pred_test = None
        self._y_train = None
        self._y_test = None

    def fit(self, X, y=None):
        train_df, test_df = split_data(X, self.target_feature)
        train_df, test_df = extract_features(
            train_df,
            test_df,
            self.target_feature,
            vectorizer_method=self.vectorizer_method,
            vectorizer_length=self.vectorizer_length,
        )

        y_pred_train, y_pred_test, y_train, y_test = train(
            self._get_classifier(),
            train_df,
            test_df,
            self.target_feature,
            n_best_features=self.best_features,
            n_pca_comps=self.pca_comps,
        )
        self._train_df = train_df
        self._test_df = test_df
        self._y_pred_train = y_pred_train
        self._y_pred_test = y_pred_test
        self._y_train = y_train
        self._y_test = y_test

        return self

    def fit_transform(self, X, y=None):
        return self.fit(X, y).transform(X, y)

    def transform(self, X, y=None):
        return self._test_df

    def predict(self, X=None):
        return self._y_pred_test

    def score(self, X, y=None):
        return accuracy_score(self._y_test, self._y_pred_test)

    def _get_classifier(self):
        if self.classifier_name == "dummy":
            return DummyClassifier(strategy="uniform", random_state=self._seed)
        elif self.classifier_name == "SVC":
            return SVC(random_state=self._seed)
        elif self.classifier_name == "RandomForest":
            return RandomForestClassifier(random_state=self._seed)
        else:
            raise ValueError(f"{self.classifier_name} not supported as classifier")


files = glob.glob("../data/input/*.json")
seed = 42

classifiers = ["SVC", "RandomForest"]
vectorizers = ["bag-of-words", "tf-idf", "word2vec"]
vectorizers_lenghts = [500, 750, 1000]
n_best_features = [200]
n_pca_comps = [50]


# classifiers = ["SVC"]
# vectorizers = ["bag-of-words"]
# vectorizers_lenghts = [500]
# n_best_features = [50]
# n_pca_comps = [10, 30]

param_grid = {
    "vectorizer_method": vectorizers,
    "vectorizer_length": vectorizers_lenghts,
    "best_features": n_best_features,
    "pca_comps": n_pca_comps,
    "classifier_name": classifiers,
}


def count_grid_combinations(param_grid):
    n_combinations = 1
    for key, values in param_grid.items():
        n_combinations *= len(values)
    return n_combinations


def estimate_iterations(search_class, search, param_grid, n_iter=None):
    if search_class == GridSearchCV:
        return count_grid_combinations(param_grid)
    elif search_class == RandomizedSearchCV:
        if n_iter is not None:
            return n_iter
        else:
            return min(10, count_grid_combinations(param_grid))  # 10 is default value
    elif getattr(search, "n_iterations_", None) is not None:
        return getattr(search, "n_iterations_", None)
    else:
        raise ValueError(f"could not estimate iterations {search_class}")


def messure_params_search_method(search_class, n_iter=None):
    clf = CustomClassifier(
        target_feature=Review.target_feature,
        files=files,
        vectorizer_method=vectorizers[0],
        vectorizer_length=vectorizers_lenghts[0],
        best_features=n_best_features[0],
        pca_comps=n_pca_comps[0],
        classifier_name=classifiers[0],
    )
    reviews_df = preprocess_data(files)
    search = None
    if n_iter is not None and search_class == RandomizedSearchCV:
        search = search_class(clf, param_grid, n_jobs=1, verbose=1, n_iter=n_iter)
    else:
        search = search_class(clf, param_grid, n_jobs=1, verbose=1)
    start_time = time.time()
    search.fit(reviews_df)
    elapsed_time = time.time() - start_time

    score = search.best_score_
    params = search.best_params_
    num_iterations = estimate_iterations(search_class, search, param_grid)
    max_iterations = getattr(search, "n_possible_iterations_", num_iterations)

    print(f"{search_class.__name__}:")
    print("Best score:", score)
    print("Time taken(s):", elapsed_time)
    print("Iterations:", num_iterations)
    print("Max possible iterations", max_iterations)
    print("Best params", params)
    print("\n")

    return search_class.__name__, score, elapsed_time

RandomSearch - metoda polegająca na losowym przeszukiwaniu przestrzeni hiperparametrów. W każdej iteracji losowane są wartości hiperparametrów z określonego przedziału i tworzony jest model z takimi parametrami. Możliwości: losowy charakter pozwala na szybkie przeszukanie dużej przestrzeni hiperparametrów. Ograniczenia: może przegapić najlepsze ustawienia hiperparametrów.

GridSearch - metoda polegająca na przeszukaniu siatki punktów w przestrzeni hiperparametrów. Tworzone są modele dla każdej kombinacji hiperparametrów zdefiniowanych w siatce. Możliwości: dokładne przeszukanie wszystkich kombinacji hiperparametrów. Ograniczenia: może być bardzo kosztowne obliczeniowo, szczególnie dla dużej przestrzeni hiperparametrów.

HalvingGridSearch - metoda polegająca na przeszukaniu siatki punktów w przestrzeni hiperparametrów, ale z wykorzystaniem strategii dzielenia i podbierania próbek (ang. halving). W każdej iteracji losowana jest połowa punktów z siatki, a następnie tworzone są modele z użyciem tych punktów. Następnie wybierana jest najlepsza połowa z tych modeli i proces jest powtarzany aż do uzyskania najlepszego zestawu hiperparametrów. Możliwości: szybkie przeszukanie przestrzeni hiperparametrów w porównaniu do GridSearch. Ograniczenia: może przegapić najlepsze ustawienia hiperparametrów.

HalvingRandomSearch - metoda polegająca na losowym przeszukiwaniu przestrzeni hiperparametrów z wykorzystaniem strategii dzielenia i podbierania próbek. W każdej iteracji losowana jest połowa punktów z przestrzeni hiperparametrów, a następnie tworzone są modele z użyciem tych punktów. Następnie wybierana jest najlepsza połowa z tych modeli i proces jest powtarzany aż do uzyskania najlepszego zestawu hiperparametrów. Możliwości: szybkie przeszukanie przestrzeni hiperparametrów w porównaniu do RandomSearch. Ograniczenia: może przegapić najlepsze ustawienia hiperparametrów.

In [None]:
methods = []
best_scores = []
elapsed_times = []

In [None]:
name, best_score, elapsed_time = messure_params_search_method(RandomizedSearchCV)
methods.append(name)
best_scores.append(best_score)
elapsed_times.append(elapsed_time)

In [None]:
name, best_score, elapsed_time = messure_params_search_method(GridSearchCV)
methods.append(name)
best_scores.append(best_score)
elapsed_times.append(elapsed_time)

In [None]:
name, best_score, elapsed_time = messure_params_search_method(HalvingGridSearchCV)
methods.append(name)
best_scores.append(best_score)
elapsed_times.append(elapsed_time)

In [None]:
# name, best_score, elapsed_time = messure_params_search_method(HalvingRandomSearchCV)
# methods.append(name)
# best_scores.append(best_score)
# elapsed_times.append(elapsed_time)

In [None]:
plt.figure(figsize=(10, 5))

plt.subplot(1, 2, 1)
plt.bar(methods, best_scores)
plt.xlabel("Methods of hyperparameter optimization")
plt.ylabel("Best result (F1 score)")
plt.title("Quality of the best model")

plt.subplot(1, 2, 2)
plt.bar(methods, elapsed_times)
plt.xlabel("Methods of hyperparameter optimization")
plt.ylabel("Execution time (s)")
plt.title("Execution time")

plt.tight_layout()
plt.show()

Ze względu na użycie selekcji cech oraz PCA które w dużym stopniu redukują potrzebe ustawiania dużych wartośći vectorizers_lenghts = [500, 750, 1000] uznaje ,że HalvingGridSearchCV jest najlepszym wyborem w moim przypadku. 

In [None]:
methods = []
best_scores = []
elapsed_times = []

In [None]:
classifiers = [SVC, RandomForestClassifier]
vectorizers = ["bag-of-words", "tf-idf", "word2vec"]

for classifier_class in classifiers:
    for vectorizer in vectorizers:
        reviews_df = preprocess_data(files)
        train_df, test_df = split_data(reviews_df, Review.target_feature)
        start_time = time.time()
        train_df, test_df = extract_features(
            train_df, test_df, Review.target_feature, vectorizer_method=vectorizer
        )
        classifier = classifier_class(random_state=seed)
        y_pred_train, y_pred_test, y_train, y_test = train(
            classifier,
            train_df,
            test_df,
            Review.target_feature,
        )

        f1_score_test = f1_score(y_test, y_pred_test, average="weighted")
        elapsed_time = time.time() - start_time
        methods.append(f"{classifier_class.__name__}-{vectorizer}")
        best_scores.append(f1_score_test)
        elapsed_times.append(elapsed_time)

In [None]:
plt.figure(figsize=(15, 5))

bar_width = 0.4
index = np.arange(len(methods))

plt.subplot(1, 2, 1)
plt.bar(index, best_scores, width=bar_width)
plt.xticks(index, methods, rotation=45)
plt.xlabel("Methods")
plt.ylabel("Best result (F1 score)")
plt.title("Quality of the best model")

plt.subplot(1, 2, 2)
plt.bar(index, elapsed_times, width=bar_width)
plt.xticks(index, methods, rotation=45)
plt.xlabel("Methods")
plt.ylabel("Execution time (s)")
plt.title("Execution time")

plt.tight_layout()
plt.show()

W eksperymencie porównaliśmy dwa klasyfikatory, maszynę wektorów nośnych (SVM) oraz las losowy, z trzema różnymi wektoryzatorami: Bag-of-Words, TF-IDF i Word2Vec.

Wyniki pokazały, że różnice w wynikach F1 między klasyfikatorami nie były znaczące, ale były zauważalne różnice przy porównaniu wektoryzatorów. W szczególności, SVM osiągnął lepsze wyniki z wektoryzatorem TF-IDF niż las losowy z tym samym wektoryzatorem. Ponadto, las losowy osiągnął lepsze wyniki z wektoryzatorem Bag-of-Words niż SVM z tą samą metodą wektoryzacji.

Wektoryzator Word2Vec natomiast, potrzebował najwięcej czasu na przetworzenie i nie dał najlepszych wyników dla żadnego z klasyfikatorów. Sugeruje to, że dla tego konkretnego problemu wektoryzatory Bag-of-Words i TF-IDF mogą być bardziej odpowiednie, ponieważ zapewniają lepszą wydajność i wymagają mniej zasobów obliczeniowych.

SVM z wektoryzatorem TF-IDF oraz las losowy z wektoryzatorem Bag-of-Words zdają się dawać lepsze rezultaty w tym konkretnym przypadku

In [None]:
# reviews_df = preprocess_data(files)
# train_df, test_df = split_data(reviews_df, Review.target_feature)

# train_df, test_df = extract_features(
#     train_df,
#     test_df,
#     Review.target_feature,
#     vectorizer_length = 6
# )
# classifier = RandomForestClassifier(random_state=seed)
# y_pred_train, y_pred_test, y_train, y_test = train(
#     classifier,
#     train_df,
#     test_df,
#     Review.target_feature,
# )

In [None]:
# import shap

# explainer = shap.Explainer(classifier)
# shap_values = explainer(test_df.drop(Review.target_feature, axis=1))
# shap.summary_plot(shap_values, test_df.drop(Review.target_feature, axis=1))

# incorrect_indices = y_test != y_pred_test
# incorrect_df = test_df[incorrect_indices]
# incorrect_shap_values = shap_values[incorrect_indices]

# index_to_analyze = 0
# expected_value = explainer.model_output(incorrect_shap_values.base_values[index_to_analyze])

# shap.force_plot(
#     explainer.expected_value[0],
#     incorrect_shap_values[index_to_analyze, :],
#     incorrect_df.iloc[index_to_analyze].drop(Review.target_feature),
#     feature_names=incorrect_df.drop(Review.target_feature, axis=1).columns,
# )