In [None]:
from typing import Union, List

import numpy as np
from scipy.sparse import csr_matrix
from sklearn.base import BaseEstimator
from sklearn.exceptions import NotFittedError
from sklearn.metrics import f1_score
from tqdm import tqdm


class PeriodEstimatorWrapper(BaseEstimator):
    """
    Wraps one estimator that is fitted once and then applied to several periods.

    ``fit`` takes a single feature matrix and label vector. Every prediction method
    takes a list of feature matrices (one per period) and returns a list holding one
    result per period, in the same order.
    """

    def __init__(self, clf: type, **params):
        """
        Instantiates the wrapped estimator.

        :param clf: estimator class (not an instance); it is called as ``clf(**params)``
        :param params: keyword arguments forwarded to ``clf``; a truthy ``verbose`` entry
                       additionally enables the progress bars of this wrapper
        """
        self.clf = clf(**params)
        # Always define the flag: the prediction methods read it unconditionally, so leaving
        # it unset when ``verbose`` is absent or falsy would raise an AttributeError there.
        self.verbose = params.get('verbose', False)

    def _check_is_fitted(self):
        """Raises NotFittedError unless ``fit`` has been called on this instance."""
        # ``fitted_`` only exists after ``fit``; getattr keeps the pre-fit case from
        # surfacing as an AttributeError instead of the intended NotFittedError.
        if not getattr(self, 'fitted_', False):
            raise NotFittedError(
                f"This {self.__class__.__name__} instance is not fitted yet. "
                "Call 'fit' before using this method.")

    def _check_supported(self, method_name: str):
        """Raises NotImplementedError if the wrapped estimator lacks ``method_name``."""
        if not hasattr(self.clf, method_name):
            raise NotImplementedError(
                f"Method {method_name} is not implemented in {self.clf.__class__.__name__}")

    def _apply_per_period(self, method_name: str, X_test, desc: str) -> list:
        """Calls ``method_name`` of the wrapped estimator on every period matrix."""
        method = getattr(self.clf, method_name)
        periods = tqdm(X_test, desc=desc) if self.verbose else X_test
        return [method(X) for X in periods]

    def fit(self, X_train: Union[csr_matrix, np.ndarray], y_train: np.array):
        """
        Fits the estimator.

        :param X_train: normal feature matrix e.g. shape (n_samples, n_features)
        :param y_train: label vector shape (n_samples,)
        :return: fitted instance of itself
        """
        self.clf.fit(X_train, y_train)
        self.fitted_ = True

        return self

    def predict(self, X_test: List[Union[csr_matrix, np.ndarray]]):
        """
        Predicts classes for n periods

        :param X_test: list of feature matrices (n_samples, n_features) to predict (one for each period)
        :return: list of predicted label vectors
        :raises NotFittedError: if ``fit`` has not been called yet
        """
        self._check_is_fitted()
        return self._apply_per_period('predict', X_test, 'Predicting classes for periods')

    def predict_proba(self, X_test: List[Union[csr_matrix, np.ndarray]]):
        """
        Predicts probabilities for n periods

        :param X_test: list of feature matrices (n_samples, n_features) to predict (one for each period)
        :return: list with the ``predict_proba`` output of the wrapped estimator for each period
        :raises NotImplementedError: if the wrapped estimator has no ``predict_proba``
        :raises NotFittedError: if ``fit`` has not been called yet
        """
        self._check_supported('predict_proba')
        self._check_is_fitted()
        return self._apply_per_period('predict_proba', X_test, 'Predicting probabilities for periods')

    def decision_function(self, X_test: List[Union[csr_matrix, np.ndarray]]):
        """
        Predicts decision scores for n periods

        :param X_test: list of feature matrices (n_samples, n_features) to predict (one for each period)
        :return: list with the ``decision_function`` output of the wrapped estimator for each period
        :raises NotImplementedError: if the wrapped estimator has no ``decision_function``
        :raises NotFittedError: if ``fit`` has not been called yet
        """
        self._check_supported('decision_function')
        self._check_is_fitted()
        # Must delegate to decision_function itself: estimators such as LinearSVC offer
        # decision_function but no predict_proba.
        return self._apply_per_period('decision_function', X_test, 'Computing decision scores for periods')

    def score(self,
              X_test: List[Union[csr_matrix, np.ndarray]],
              y_true: List[np.array],
              scoring_func: callable = lambda y_true, y_pred: f1_score(y_true, y_pred, average='macro'),
              pooling_func: callable = np.mean):
        """
        Scores the predictions for n periods and pools the per-period scores into one value.

        :param X_test: list of feature matrices (n_samples, n_features), one for each period
        :param y_true: list of true label vectors, one for each period (paired with ``X_test``
                       by position; surplus entries of the longer list are ignored)
        :param scoring_func: called as ``scoring_func(y_true, y_pred)`` per period; defaults to
                             the macro-averaged F1 score
        :param pooling_func: called with the list of per-period scores; defaults to ``np.mean``
        :return: result of ``pooling_func`` applied to the per-period scores
        :raises NotFittedError: if ``fit`` has not been called yet
        """
        self._check_is_fitted()

        scores = [scoring_func(y, self.clf.predict(X)) for X, y in zip(X_test, y_true)]

        return pooling_func(scores)


### Problem: Wie tunen wir die Hyperparameter?

Problem: Unsere Idee sieht vor, ein Modell auf allen Genres innerhalb einer "Periode" zu trainieren und auf alle anderen Perioden anzuwenden, um abzuschätzen, wie sehr sich die Genres über die Zeit verändern. Hierbei stellt sich die Frage, wie man die Hyperparameter der Modelle valide und gleichzeitig effektiv optimieren kann.

* Möglichkeit 1:
    * Gridsearch auf Ausgangsperiode
    * Vorteile:
        * Wahrscheinlich am ehesten valide
    * Nachteile:
        * Unsere Datengrundlage ist zu klein, um das für einzelne Epochen sinnvoll durchzuführen
* Möglichkeit 2:
    * Gridsearch auf allen Daten
    * Vorteile:
        * Große Datenmenge
        * Modell würde auf alle Eigenheiten der Perioden getuned werden (wobei das eher ein Nachteil ist)
    * Nachteil:
        * Spätere Testdaten würden fürs Optimieren verwendet werden
* Möglichkeit 3:
    * ParamDict verwenden, um den eigentlichen Lauf (das Trainieren auf einer Epoche und Testen auf allen anderen) mit allen möglichen Hyperparametern zu testen. Eigene Evaluation (bspw. Mittelwert der F1-Scores für die verschiedenen Epochen)
    * Vorteile:
        * Klare Trennung von Test und Trainingsdaten
        * Mehr Daten für die Optimierung als bei Möglichkeit 1
    * Nachteile:
        * keine cross-validation

In [None]:
import pandas as pd

In [None]:
# Load the dataset from 'full_dataset.csv' (path relative to the working directory)
# into a DataFrame; later cells read its 'genre', 'period' and 'text' columns.
df = pd.read_csv('full_dataset.csv')

In [None]:
# Keep only the rows whose genre is not 'NEWS'
df = df.loc[df['genre'].ne('NEWS')]

In [None]:
# Split by period: the rows of period 'P1' on one side, all remaining periods on the other
df_p1, df_rest = (
    df.loc[df['period'].eq('P1')],
    df.loc[df['period'].ne('P1')],
)

# Feature Extraction

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from stop_words import get_stop_words

# TF-IDF vectorizer created with default settings; it is used as the 'tfidf' step of the
# pipeline, and the parameter grid varies its 'tfidf__*' settings during the search.
tfidf = TfidfVectorizer()

# Bauen der Pipeline

In [None]:
from sklearn.pipeline import make_pipeline, make_union, Pipeline

In [None]:
from sklearn.svm import LinearSVC
from sklearn.linear_model import SGDClassifier

In [None]:
# Two-step pipeline: TF-IDF features ('tfidf') feeding a linear SVM with hinge loss
# ('linearsvc'). The step names are the prefixes used by the parameter grid keys.
pipe_svm = Pipeline(steps=[
    ('tfidf', tfidf),
    ('linearsvc', LinearSVC(loss='hinge')),
])
pipe_svm

In [None]:
# Search space for the TF-IDF + LinearSVC pipeline. Keys follow the
# '<step name>__<parameter>' convention for addressing pipeline steps.
pipe_svm_params = {
    'tfidf__max_features': [100, 500, 1000, 5000, 10000, 15000, 20000],
    # NOTE(review): scikit-learn documents stop_words as applying only when
    # analyzer == 'word', so for the char analyzers both options presumably yield
    # the same features - confirm whether that redundancy is acceptable.
    'tfidf__stop_words': [None, get_stop_words('de')],
    'tfidf__analyzer': ['word', 'char', 'char_wb'],
    # Each range is listed exactly once: a duplicated entry makes the grid search
    # refit identical candidates without adding information.
    # NOTE(review): (1, 4) may have been intended as an additional value - confirm.
    'tfidf__ngram_range': [(1, 1), (1, 2), (1, 3), (1, 5)],
    'linearsvc__C': list(range(1, 21)),
    'linearsvc__penalty': ['l2'],
}

# 1. Möglichkeit: Gridsearch auf Trainingsperiode

In [None]:
from sklearn.model_selection import GridSearchCV

# Exhaustive search over pipe_svm_params for the SVM pipeline, ranked by macro-averaged F1.
# n_jobs=-1 requests all available processors; verbose=1 prints progress messages.
gridsearch = GridSearchCV(
    estimator=pipe_svm,
    param_grid=pipe_svm_params,
    scoring='f1_macro',
    n_jobs=-1,
    verbose=1,
)

In [None]:
# Run the grid search on the P1 subset only: raw texts as input, genre labels as targets
gridsearch.fit(
    df_p1['text'],
    df_p1['genre'].to_numpy(),
)

In [None]:
# Display the best parameter combination found by the search together with its score
gridsearch.best_params_, gridsearch.best_score_

In [None]:
# Collect the cross-validation results of the grid search in a DataFrame
# (one column per entry of cv_results_) and display it
svm_results = pd.DataFrame(gridsearch.cv_results_)
svm_results