# Business Analytics und Künstliche Intelligenz

Prof. Dr. Jürgen Bock & Maximilian-Peter Radtke

---

In [None]:
%matplotlib inline

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm

from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

## Modellbewertung auf Basis eines Validierungssets

Für die Modellbewertung auf Basis eines Validierungssets, benutzen wir die Funktion `train_test_split` von Scikit-Learn.

In [None]:
from sklearn.model_selection import train_test_split

Um zu zeigen wie dies funktioniert und welche Nachteile dieser Ansatz haben kann, führen wir eine Klassifikation auf dem Auto-Datensatz aus bezüglich der `Origin` Spalte aus.

In [None]:
auto = pd.read_csv('Auto_clean.csv')
auto['intercept'] = 1

In [None]:
auto.head()

Mithilfe von `train_test_split` können wir die Daten in ein Trainingsset und ein Validierungsset / Testset unterteilen

In [None]:
X = auto.drop(['year', 'origin', 'name'], axis=1)
y = auto.origin
XTrain, XTest, yTrain, yTest = train_test_split(X, y, test_size=0.2)

Wir nutzen den LDA Klassifikator, um die Herkunft der Autos auf Basis der Daten vorherzusagen.

In [None]:
ldaAuto = LinearDiscriminantAnalysis()
ldaAuto.fit(XTrain, yTrain)
print('Trainingsfehler ', 1-ldaAuto.score(XTrain, yTrain))
print('Testfehler: ', 1-ldaAuto.score(XTest, yTest))

Der Testfehler ist nur minimal größer als der Trainingsfehler. Allerdings kann diese Ansicht irreführend sein, da die Aufteilung zwischen Trainingsset und Validierungsset zufällig ist. Mit dem  `random_state`-Parameter lässt sich festlegen welche Mischung der Daten jeweils vorgenommen wird. Wenn uns über verschiedene Random States die Fehler ausgeben lassen, bekommen wir verschiedenen Fehler.

In [None]:
for i in range(10):
    XTrain, XTest, yTrain, yTest = train_test_split(X, y, test_size=0.2, random_state=i)
    ldaAuto = LinearDiscriminantAnalysis()
    ldaAuto.fit(XTrain, yTrain)
    print('Random State:', i)
    print('Trainingsfehler:', 1-ldaAuto.score(XTrain, yTrain))
    print('Testfehler:', 1-ldaAuto.score(XTest, yTest))
    print('\n')

## K-Fold Cross-Validation

Ein Weg diese Streuung zu umgehen, ist der in der Vorlesung besprochenen Cross-Validation Ansatz. Hierzu importieren wir `KFold` aus scikit-learn.

In [None]:
from sklearn.model_selection import KFold

In [None]:
n_splits = 5
kf = KFold(n_splits, random_state=1, shuffle=True)
CVFoldTestErrors = []
CVFoldTrainErrors = []
for trainIndex, testIndex in kf.split(X):
    XTrain, XTest = X.iloc[trainIndex], X.iloc[testIndex]
    yTrain, yTest = y.iloc[trainIndex], y.iloc[testIndex]
    ldaAuto = LinearDiscriminantAnalysis()
    ldaAuto.fit(XTrain, yTrain)
    CVFoldTrainErrors.append(1-ldaAuto.score(XTrain, yTrain))
    CVFoldTestErrors.append(1-ldaAuto.score(XTest, yTest))
print('CV Trainingsfehler:\n', CVFoldTrainErrors)
print('CV Durchschnittlicher Trainingsfehler:', np.mean(CVFoldTrainErrors))
print('CV Testfehler:\n', CVFoldTestErrors)
print('CV Durchschnittlicher Testfehler:', np.mean(CVFoldTestErrors))

Trotzdem gibt es eine gewisse Streuung zwischen den Ergebnissen, da auch hier zufällig die Splits ausgewählt werden. Wenn wir uns wieder über verschiedene Random States die Ergebnisse ausgeben lassen, erhalten wir das folgende Ergebnis:

In [None]:
n_splits = 5
for i in range(10):
    kf = KFold(n_splits, random_state=i, shuffle=True)
    CVFoldTestErrors = []
    CVFoldTrainErrors = []
    for trainIndex, testIndex in kf.split(X):
        XTrain, XTest = X.iloc[trainIndex], X.iloc[testIndex]
        yTrain, yTest = y.iloc[trainIndex], y.iloc[testIndex]
        ldaAuto = LinearDiscriminantAnalysis()
        ldaAuto.fit(XTrain, yTrain)
        CVFoldTrainErrors.append(1-ldaAuto.score(XTrain, yTrain))
        CVFoldTestErrors.append(1-ldaAuto.score(XTest, yTest))
    print('Random State:', i)
    print('CV Durchschnittlicher Trainingsfehler:', np.mean(CVFoldTrainErrors))
    print('CV Durchschnittlicher Testfehler:', np.mean(CVFoldTestErrors))
    print('\n')

Um überhaupt keine Streuung in den Ergebnissen zu haben, können wir Leave One Out Cross Validation nutzen. Dazu setzen wir die Anazhl der Splits auf die Anzahl der Zeilen in dem Datensatz.

In [None]:
n_splits = X.shape[0]
kf = KFold(n_splits, random_state=1, shuffle=True)
CVFoldTestErrors = []
CVFoldTrainErrors = []
for trainIndex, testIndex in kf.split(X):
    XTrain, XTest = X.iloc[trainIndex], X.iloc[testIndex]
    yTrain, yTest = y.iloc[trainIndex], y.iloc[testIndex]
    ldaAuto = LinearDiscriminantAnalysis()
    ldaAuto.fit(XTrain, yTrain)
    CVFoldTrainErrors.append(1-ldaAuto.score(XTrain, yTrain))
    CVFoldTestErrors.append(1-ldaAuto.score(XTest, yTest))
print('LOOCV Durchschnittlicher Trainingsfehler:', np.mean(CVFoldTrainErrors))
print('LOOCV Durchschnittlicher Testfehler:', np.mean(CVFoldTestErrors))

In [None]:
n_splits

Mit der `KFold`-Methode haben wir die Möglichkeit jegliche Art von Modell zu nutzen und genau zu bestimmen was mit den einzelnen Splits passiert. Falls wir aber einen Algorithmus von scikit-learn direkt nutzen, lässt sich Cross-Validation auch einfacher umsetzen. Hierzu müssen wir nur `cross_validate` importieren und anwenden.

In [None]:
from sklearn.model_selection import cross_validate

In [None]:
ldaAuto = LinearDiscriminantAnalysis()

In [None]:
scores = cross_validate(ldaAuto, X, y, cv=5, return_train_score=True)
print('Trainingsfehler:', 1-scores['train_score'])
print('Testfehler:', 1-scores['test_score'])

# Data leakage

Data leakage tritt auf, wenn Informationen aus dem Testdatensatz unbeabsichtigt den Trainingsprozess beeinflussen, was zu überoptimistischen Leistungsabschätzungen führen kann.
Dies kann durch unsachgemäße Verwendung von Informationen aus dem gesamten Datensatz während des Trainings oder der Validierung verursacht werden.

Im folgenden wollen wir das Beispiel aus der Vorlesung nachbauen, um dies zu veranschaulichen.

Zunächst simulieren wir uns hierzu einen Datensatz für ein binäres Klassifikationsprobelm.

In [None]:
# Setze Parameter
num_obs = 50 # Anzahl Observationen
num_predictors = 5000 # Anzahl Prädiktoren
num_selected = 100 # Anzahl an Prädiktoren die genutzt werden sollen
num_splits = 5 # Anzahl CV Splits
random_seed = 42

# Setze random seed um Wiederholbarkeit zu gewährleisten
np.random.seed(random_seed)

# Simuliere Daten für binäres Klassifikationsproblem
X = np.random.randn(num_obs, num_predictors)
y = np.random.randint(0, 2, num_obs)

Die Daten sind vollkommen zufällig. Entsprechend gibt es keinen Zusammenhang zwischen $X$ und $y$, d.h. viel besser als der Zufall (Genaugikeit von 50%) sollte unser Modell eigentlich nicht werden. Trotzdem wollen wir es versuchen. Dazu wählen wir die 50 Prädiktoren aus, die am stärksten mit der Zielvariable korrelieren und schätzen basierend auf diesen ein Modell.

Im ersten Versuch wählen wir die Prädiktoren vor dem Train/Test Split aus $\rightarrow$ Data Leakage!

In [None]:
# Wähle die 100 Prädiktoren mit der stärksten Korrelation mit der Zielvariable aus
# Dieser Schritt passiert vor dem Train/Test Split! -> Data leakage
selected_predictors = np.argsort(
    np.abs(np.corrcoef(X.T, y)[num_predictors][:num_predictors])
)[-num_selected:]
X_selected = X[:, selected_predictors]

# Initialisiere Klassifikationsmodell
model = LinearDiscriminantAnalysis()

# Evaluiere "leaky" Modell mit cross validation
scores_leaky = cross_validate(model, X_selected, y, cv=num_splits)['test_score']

Im zweiten Versuch nehmen wir die Auswahl der besten Prädiktoren mit in unseren Cross Validation Loop $\rightarrow$ kein Data Leakage!

In [None]:
kf = KFold(num_splits, shuffle=True)
CVFoldTestErrors = []
# Loop für cross validation
for trainIndex, testIndex in kf.split(X):
    # Train/Test Split der Daten
    X_train, X_test = X[trainIndex, :], X[testIndex, :]
    y_train, y_test = y[trainIndex], y[testIndex]

    # Wähle die 100 Prädiktoren mit der stärksten Korrelation mit der Zielvariable
    # erst nach dem Train/Test split aus
    # -> Kein Data leakage
    selected_predictors = np.argsort(
        np.abs(np.corrcoef(X_train.T, y_train)[num_predictors][:num_predictors])
    )[-num_selected:]
    X_train_selected = X_train[:, selected_predictors]
    X_test_selected = X_test[:, selected_predictors]

    # Initialisiere Modell
    model = LinearDiscriminantAnalysis()
    
    # Trainiere Modell auf Basis der Trainingsdaten
    model.fit(X_train_selected, y_train)
    
    # Evaluaiere das Modell auf Basis der Testdaten für spezifischen Fold
    CVFoldTestErrors.append(model.score(X_test_selected, y_test))

In [None]:
# Ausgabe der Testgenauigkeiten
print(f'Testgenauigkeit mit data leakage: {np.mean(scores_leaky)}')
print(f'Testgenauigkeit ohne data leakage: {np.mean(CVFoldTestErrors)}')

# Übungsaufgaben

## Aufgabe 1

Nutzen Sie auch die logistische Regression und KNN um das Klassifikationsproblem aus der Übung zu lösen. Vergleichen Sie ihre Ergebnisse. Welcher Algorithmus schneidet am besten ab?

In [None]:
import statsmodels.api as sm

from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import cross_validate, KFold

In [None]:
auto = pd.read_csv('Auto_clean.csv')
auto['intercept'] = 1
X = auto.drop(['year', 'origin', 'name'], axis=1)
y = auto.origin

In [None]:
ldaAuto = LinearDiscriminantAnalysis()
LDAscores = cross_validate(ldaAuto, X, y, cv=5, return_train_score=True)
print('Trainingsfehler:', 1-LDAscores['train_score'].mean())
print('Testfehler:', 1-LDAscores['test_score'].mean())

In [None]:
KNNAuto = KNeighborsClassifier(n_neighbors=5)
KNNscores = cross_validate(KNNAuto, X.values, y.values, cv=5, return_train_score=True)
print('Trainingsfehler:', 1-KNNscores['train_score'].mean())
print('Testfehler:', 1-KNNscores['test_score'].mean())

In [None]:
from sklearn.metrics import accuracy_score

In [None]:
n_splits = 5
kf = KFold(n_splits, random_state=1, shuffle=True)
CVFoldTestErrors = []
CVFoldTrainErrors = []
for trainIndex, testIndex in kf.split(X):
    XTrain, XTest = X.iloc[trainIndex], X.iloc[testIndex]
    yTrain, yTest = y.iloc[trainIndex], y.iloc[testIndex]
    LogAuto = sm.MNLogit(yTrain, XTrain)
    LogAutoRes = LogAuto.fit()
    testError = 1 - accuracy_score(yTest, LogAutoRes.predict(XTest).idxmax(axis=1) + 1)
    trainError = 1 - accuracy_score(yTrain, LogAutoRes.predict(XTrain).idxmax(axis=1) + 1)
    CVFoldTrainErrors.append(trainError)
    CVFoldTestErrors.append(testError)
#print('CV Trainingsfehler:\n', CVFoldTrainErrors)
print('CV Durchschnittlicher Trainingsfehler:', np.mean(CVFoldTrainErrors))
#print('CV Testfehler:\n', CVFoldTestErrors)
print('CV Durchschnittlicher Testfehler:', np.mean(CVFoldTestErrors))

In [None]:
LogAutoRes.summary()

## Aufgabe 2

Sehen Sie sich die Unterschiede zwischen den Ergebnissen mit LDA und dem Validierungsdatensatz etwas genauer an. Wieso kommt es zu den Unterschieden? (Tipp: Eine Ansatz wäre es sich die Confusion-Matrix für die einzelnen Splits anzusehen)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split

In [None]:
# Initialisiere Dataframe mit Vorhersagen
yPred = pd.DataFrame({'y': y, 'yPredTr0': 0})
# Wiederhole Berechnung auf 10 verschiedenen Random Seeds
for i in range(10):
    # Split in Train und Validierungsset
    XTrain, XTest, yTrain, yTest = train_test_split(X, y, test_size=0.2, random_state=i)
    # Initalisiere LDA Klassifikator
    ldaAuto = LinearDiscriminantAnalysis()
    # Trainier LDA Klassifikator
    ldaAuto.fit(XTrain, yTrain)
    # Berechne Conusion Matrix
    confMat = confusion_matrix(yTest, ldaAuto.predict(XTest))
    # Gebe Random State aus
    print('Random State:', i)
    # Gebe Trainingsfehler aus
    print('Trainingsfehler:', 1-ldaAuto.score(XTrain, yTrain))
    # Gebe Testfehler aus
    print('Testfehler:', 1-ldaAuto.score(XTest, yTest))
    # Plotte absolute Häufigkeiten der Observationen im Testset
    yTest.value_counts().sort_index().plot.bar()
    plt.title('Observationen pro Klasse für Testset')
    plt.show()
    # Berechne Confusionmatrix
    disp = ConfusionMatrixDisplay(confusion_matrix=confMat,
                                  display_labels=ldaAuto.classes_)
    # Plotte Confusionmatrix
    disp.plot()
    plt.title('Confusion Matrix für Testset')
    plt.show()
    # Neue Zeile in Ausagabe
    print('\n')
    # Fülle Dataframe mit Vorhersagen
    colTrain = 'yPredTr' + str(i)
    yPred.loc[XTrain.index, colTrain] = ldaAuto.predict(XTrain)
    colTest = 'yPredTe' + str(i)
    yPred.loc[XTest.index, colTest] = ldaAuto.predict(XTest)

In [None]:
# Prüfe welche Observationen im Training bzw. Test richtig zugeordnet wurden
yPredBool = yPred.drop('y', axis=1) == np.resize(y, (yPred.shape[0],1))
# Berechne wie oft die Observation in den 10 Versuchen richtig klassifiziert wurde
yPredTrue = yPredBool.sum(axis=1)

In [None]:
# Anteil der Observationen, welche nie richtig klassifiziert wurden, am aktuellen Testset - "schwierige Observationen"
len(yPredTrue[yPredTrue == 0].index.intersection(yTest.index)) / yTest.shape[0]

In [None]:
for i in range(10):
    # Split in Train und Validierungsset
    XTrain, XTest, yTrain, yTest = train_test_split(X, y, test_size=0.2, random_state=i)
    # Initalisiere LDA Klassifikator
    ldaAuto = LinearDiscriminantAnalysis()
    # Trainier LDA Klassifikator
    ldaAuto.fit(XTrain, yTrain)
    # Berechne Conusion Matrix
    confMat = confusion_matrix(yTest, ldaAuto.predict(XTest))
    # Gebe Random State aus
    print('Random State:', i)
    # Gebe Trainingsfehler aus
    print('Trainingsfehler:', 1-ldaAuto.score(XTrain, yTrain))
    # Gebe Testfehler aus
    print('Testfehler:', 1-ldaAuto.score(XTest, yTest))
    # Gebe Anteil der schiwerig zu klassifierenden Observationen aus
    print('Anteil schwierige Observationen:', len(yPredTrue[yPredTrue == 0].index.intersection(yTest.index)) / yTest.shape[0])
    # Neue Zeile in Ausagabe
    print('\n')