In [None]:
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
import json
import seaborn as sns


In [None]:
# Conformal-prediction wrappers that are evaluated for every regressor below.
_CP_METHODS = ('cv_plus', 'jackknife_plus')

# Regressors whose result files are loaded.
# Currently disabled: 'lin' and 'svr' (add them to the tuple to re-enable), and
# the CQR variants ['cqr', 'lin_quant', None] / ['cqr', 'lgbm_quant', None],
# which do not fit this cross product and would have to be appended by hand.
_MODELS = ('lin_lasso', 'cart', 'rf', 'kr')

# Each entry is [method, model, third]; the third slot is always None here.
# All three parts are formatted into the result-file name by the loading loop.
METHODS = [
    [cp_method, model, None]
    for model in _MODELS
    for cp_method in _CP_METHODS
]

In [None]:
def get_folder_names(path):
    """Return the names of the sub-directories of ``path`` in sorted order.

    Regular files are skipped, as are the Windows system folders
    ``$RECYCLE.BIN`` and ``System Volume Information``.

    ``os.listdir`` yields entries in arbitrary, filesystem-dependent order.
    The result is sorted so that code which addresses the folders by position
    behaves the same on every run and machine.

    Parameters:
    path (str): Directory whose immediate children are inspected.

    Returns:
    list[str]: Sorted names (not full paths) of the sub-directories.
    """
    ignored = {'$RECYCLE.BIN', 'System Volume Information'}
    return sorted(
        entry
        for entry in os.listdir(path)
        if entry not in ignored and os.path.isdir(os.path.join(path, entry))
    )

In [None]:
def set_column_values_above_alpha_to_alpha(df_func, column_name, alpha_func):
    """
    Cap the entries of one DataFrame column at ``alpha_func``.

    The column is overwritten on ``df_func`` itself; no copy is made. Entries
    that are falsy or not greater than ``alpha_func`` are left unchanged.

    Parameters:
    df_func (pd.DataFrame): The DataFrame to process (modified in place).
    column_name (str): The name of the column to process.
    alpha_func (int or float): The threshold value.

    Returns:
    pd.DataFrame: The same ``df_func`` object, with values above the threshold
    in the given column replaced by the threshold.
    """
    def _cap(value):
        # Same truthiness guard as a plain `value and value > alpha_func`.
        if value and value > alpha_func:
            return alpha_func
        return value

    capped = df_func[column_name].apply(_cap)
    df_func[column_name] = capped
    return df_func

In [None]:
# Root folder of the experiment; its sub-folders are treated as sampling strategies.
exp_path = "/mnt/e/Experiment_x264_energy"

# The "old" folder is excluded. Filtering (instead of list.remove) keeps the
# cell from raising ValueError when no such folder exists.
sampling_strategies = [name for name in get_folder_names(exp_path) if name != "old"]
print(sampling_strategies)

In [None]:
# Sampling strategies that are evaluated below. The alternatives are kept as
# comments for quick switching:
# auswahl_test_strategien = ["OW", "T-wise_2", "T-wise_3"]
# auswahl_test_strategien = ["Distance_1", "Distance_2", "Distance_3", "Random_1", "Random_2", "Random_3"]
# auswahl_test_strategien = ["Random_1", "Random_2", "Random_3"]
auswahl_test_strategien = ["Distance_1", "Distance_2", "Distance_3"]

# runs[k] holds the run-folder names found for sampling_strategies[k].
runs = np.empty(len(sampling_strategies), dtype=object)
for idx, strategy in enumerate(sampling_strategies):
    runs[idx] = get_folder_names(os.path.join(exp_path, strategy))
    print(runs[idx])

In [None]:
# Filled by the loading loop: maps the index of a METHODS entry to its
# DataFrame of coverage errors (one column per selected sampling strategy).
result_dfs = {}
# The measured coverage is compared against the reference coverage (1 - alpha).
alpha = 0.1

In [None]:
# For every METHODS entry, read the result file of each run of each selected
# sampling strategy and store the absolute deviation of the measured coverage
# from the reference coverage (1 - alpha).
for m, ml_models in enumerate(METHODS):
    # The file name only depends on the method, so build it once per method.
    json_name = f"{ml_models[0]}_{ml_models[1]}_{ml_models[2]}_result.txt"
    cov_dif_by_strategy = {}
    for strategy in auswahl_test_strategien:
        # `runs` is ordered like `sampling_strategies`, not like the selection
        # in `auswahl_test_strategien`, so the run list has to be looked up by
        # name. A strategy that is not on disk raises ValueError here.
        strategy_runs = runs[sampling_strategies.index(strategy)]
        cov_dif = []
        for r in strategy_runs:
            json_path = os.path.join(exp_path, strategy, r, json_name)
            try:
                with open(json_path) as json_file:
                    data = json.load(json_file)
            except FileNotFoundError:
                print(f"{strategy}: {r} has no Data for this ML-Model({ml_models})")
                cov_dif.append(None)
            else:
                cov_dif.append(abs(data['coverage'] - (1 - alpha)))
        # A float Series turns the None placeholders into NaN.
        cov_dif_by_strategy[strategy] = pd.Series(cov_dif, dtype=float)
    # Building the frame from Series pads strategies with fewer runs with NaN
    # instead of failing on a length mismatch.
    result_dfs[m] = pd.DataFrame(cov_dif_by_strategy)


In [None]:
# Key each result table by the string form of its METHODS entry.
# Copies are stored so that in-place changes to the frames held in result_dfs
# cannot alter the tables in data_dict.
data_dict = {
    f"{method}": result_dfs[m].copy()
    for m, method in enumerate(METHODS)
}

In [None]:
# Display the coverage-error tables, keyed by the string form of each METHODS entry.
data_dict

In [None]:
# Human-readable labels for the method / model identifiers used in METHODS.
# Identifiers without an entry are labelled with an empty string.
method_labels = {
    "cv_plus": "CrossValidation+",
    "jackknife_plus": "Jackknife+",
}
model_labels = {
    "rf": "RandomForest",
    "kr": "KernelRidge",
    "cart": "DecisionTree",
    "lin_lasso": "LinearLasso",
}

# Attach the labels to a labelled copy of every result table. `.assign`
# returns a new frame, so the tables in result_dfs keep only their
# sampling-strategy columns.
combined_dfs = [
    result_dfs[idx].assign(
        Method=method_labels.get(method[0], ""),
        Model=model_labels.get(method[1], ""),
    )
    for idx, method in enumerate(METHODS)
]

# Stack all labelled tables into one DataFrame.
final_df = pd.concat(combined_dfs, ignore_index=True)
final_df

In [None]:
# Draw a grid of KDE plots of the coverage error: one row per Method, one
# column per Model, one curve per sampling strategy. The figure is saved under
# exp_path and then shown.
sns.set(font_scale=1.8)

# Columns that are plotted
#columns_to_plot = ['Random_1', 'Random_2', 'Random_3']
columns_to_plot = auswahl_test_strategien

# Convert the DataFrame to long format so it can be used with seaborn's FacetGrid.
# The strategy name ends up in the column 'Random', whatever strategies are selected.
df_long = final_df.melt(id_vars=['Method', 'Model'], value_vars=columns_to_plot, var_name='Random', value_name='Coverage Fehler')

# Sort by Method and Model so the facets are arranged correctly
df_long = df_long.sort_values(by=['Method', 'Model'])

# Create the FacetGrid
g = sns.FacetGrid(df_long, row='Method', col='Model', hue='Random', sharex=True, sharey=True, height=3,  aspect=1.5)

# Add the KDE plots to the FacetGrid
g.map(sns.kdeplot, 'Coverage Fehler', fill=True, cut=2)

# Fixed axis limits for all facets
g.set(xlim=(0, 0.4), ylim=(0, 30))

# Add legend and title
#g.add_legend()

# Adjust the axis titles: zip stops after the last unique Model, so only the
# first axes in flat order receive a title
for ax, title in zip(g.axes.flat, df_long['Model'].unique()):
    ax.set_title(title)

# Adjust the labels of the left-most column
for ax, row_val in zip(g.axes[:, 0], df_long['Method'].unique()):
    ax.set_ylabel(f"{row_val}\n\n Dichte")

# Remove titles for all but the first row
for ax in g.axes[1:, :].flatten():
    ax.set_title('')

# Clear the y-labels of every facet that is not in the left-most column
for ax in g.axes[0, 1:]:
    ax.set_ylabel("")

# NOTE(review): indexing row 1 assumes the grid has at least two Method rows;
# presumably this fails with IndexError if only one method is loaded - confirm.
for ax in g.axes[1, 1:]:
    ax.set_ylabel("")

plt.subplots_adjust(top=0.9)
#g.fig.suptitle('KDE Plots for Different Methods and Models')

# Apply tight_layout
plt.tight_layout(rect=[0, 0, 1, 0.95])

# NOTE(review): plt.legend presumably attaches the legend to the current axes;
# the bbox_to_anchor values are fixed offsets relative to it - confirm the
# placement whenever the grid shape changes.
plt.legend(title='Sampling', bbox_to_anchor=(0.43, 2.34), loc='upper left', borderaxespad=0., frameon=True, fontsize=15)

# Save the plot without cutting off the labels
# NOTE(review): the file name has no extension, so the output format is
# presumably matplotlib's default - confirm.
g.savefig(os.path.join(exp_path, "distance-com-grid"), bbox_inches='tight')

# Show the plot
plt.show()

# Statistische Tests

In [None]:
# Display the combined table of coverage errors with its Method and Model labels.
final_df

In [None]:
import scipy.stats as stats

# Columns that are checked for normality.
# NOTE(review): these columns only exist in final_df when the Random_*
# strategies are part of auswahl_test_strategien - confirm the selection.
shapiro_columns = ['Random_1', 'Random_2', 'Random_3']

grouped = final_df.groupby(['Method', 'Model'])

# Counter for the individual (group, column) samples that pass the test
normal_count = 0

# Number of tests performed: one per column and (Method, Model) group.
# Derived from the data so it stays correct when METHODS changes.
num_tests = grouped.ngroups * len(shapiro_columns)

# Significance level. It has its own name so that the global `alpha` used for
# the reference coverage (1 - alpha) is not overwritten when this cell runs.
significance_level = 0.05

# Bonferroni correction of the significance level (guarded against zero tests)
adjusted_alpha = significance_level / num_tests if num_tests else significance_level

# Check normality for every group
for name, group in grouped:
    shapiro_results = []
    for col in shapiro_columns:
        # Runs without a result file are NaN and are left out of the test.
        sample = group[col].dropna()
        # Shapiro-Wilk needs at least three observations; report NaN otherwise.
        p_value = stats.shapiro(sample)[1] if len(sample) >= 3 else float('nan')
        shapiro_results.append(p_value)
    print(shapiro_results)
    # Count every sample whose p-value exceeds the adjusted level. A NaN
    # p-value never compares greater and is therefore not counted.
    normal_count += sum(1 for p in shapiro_results if p > adjusted_alpha)

# Number of samples for which normality was not rejected
normal_count

In [None]:
from scipy.stats import kruskal

# Kruskal-Wallis test of Random_k against Distance_k (k = 1, 2, 3) within
# every (Method, Model) group; one result row per group.
# NOTE(review): requires both the Random_* and the Distance_* columns in
# final_df - confirm the strategy selection.
results = []

grouped = final_df.groupby(['Method', 'Model'])

for (method, model), group in grouped:
    row = {'Method': method, 'Model': model}
    for level in (1, 2, 3):
        stat, p_value = kruskal(group[f'Random_{level}'], group[f'Distance_{level}'])
        row[f'Stat_R{level}_vs_D{level}'] = stat
        row[f'p_R{level}_vs_D{level}'] = p_value
    results.append(row)

# Collect the per-group results in a DataFrame
results_df = pd.DataFrame(results)

In [None]:
# Display the Kruskal-Wallis results (Random_k vs Distance_k per Method/Model group).
results_df

In [None]:
from scipy.stats import shapiro, levene

def check_assumptions(data):
    """Run Shapiro-Wilk per column and Levene's test across all columns.

    Parameters:
    data (pd.DataFrame): One sample per column.

    Returns:
    tuple: ``(normality_results, homogeneity_of_variances)``.
        ``normality_results`` maps each column name to
        ``{'Shapiro-Wilk': {'statistic', 'p-value', 'normality'}}``.
        ``homogeneity_of_variances`` is
        ``{'Levene': {'statistic', 'p-value', 'homogeneity'}}``.
        The boolean entries are ``True`` when the p-value is above 0.05.
    """
    def _summarise(result, verdict_key):
        # Unpack (statistic, p-value) and attach the 0.05-level verdict.
        statistic, p_value = result
        return {'statistic': statistic, 'p-value': p_value, verdict_key: p_value > 0.05}

    # Normality: one Shapiro-Wilk test per column
    normality_results = {
        name: {'Shapiro-Wilk': _summarise(shapiro(data[name]), 'normality')}
        for name in data.columns
    }

    # Homogeneity of variances: a single Levene test over all columns
    samples = [data[name] for name in data.columns]
    homogeneity_of_variances = {'Levene': _summarise(levene(*samples), 'homogeneity')}

    return normality_results, homogeneity_of_variances

In [None]:
def reshape_data(data):
    """Melt a wide table into long format.

    Every column of ``data`` becomes a group: its name goes into
    ``Sampling_Strategy`` and its values go into ``Prediction``.
    """
    return data.melt(var_name='Sampling_Strategy', value_name='Prediction')

from scikit_posthocs import posthoc_dunn
# Post-hoc pairwise comparisons with Dunn's test (Bonferroni-adjusted p-values)
for model, data in data_dict.items():
    print()
    # Only the sampling-strategy columns may be melted. The label columns
    # 'Method' / 'Model' are dropped if they are present on the frame, because
    # their strings would otherwise end up as values and as extra groups.
    strategy_data = data.drop(columns=['Method', 'Model'], errors='ignore')
    # Runs without a result file are NaN; leave them out of the ranking.
    long_data = reshape_data(strategy_data).dropna(subset=['Prediction'])
    dunn_results = posthoc_dunn(long_data, group_col="Sampling_Strategy", val_col="Prediction", p_adjust='bonferroni')
    print(f"\nPost-hoc Pairwise Comparisons (Dunn-Bonferroni Test) {model}:")
    print(dunn_results)

In [None]:
# Display the combined table again before comparing the two methods per model.
final_df

In [None]:
from scipy.stats import kruskal

# Kruskal-Wallis test of Jackknife+ against CrossValidation+, done separately
# for every model and every tested column.
# Assumption: final_df has the columns Random_1, Random_2, Random_3, Method and Model.

# Columns on which the test is performed
columns_to_test = ['Random_1', 'Random_2', 'Random_3']

# Models in order of first appearance
models = final_df["Model"].unique().tolist()

# One record per (model, column) pair
results = []
for m in models:
    # Select the rows of each method for this model once, then test per column.
    is_model = final_df['Model'] == m
    jackknife_rows = final_df[(final_df['Method'] == 'Jackknife+') & is_model]
    cross_val_rows = final_df[(final_df['Method'] == 'CrossValidation+') & is_model]

    for column in columns_to_test:
        stat, p_value = kruskal(jackknife_rows[column], cross_val_rows[column])

        record = {
            'Column': column,
            'Method A': 'Jackknife+',
            'Method B': 'CrossValidation+',
            'Model': m,
            'Statistic': stat,
            'P-Value': p_value,
        }
        results.append(record)

# Show the results
results_df = pd.DataFrame(results)
results_df