In [1]:
targetFolder = r"G:\Semester Arbeit\Programming\masterarbeit\HBK_20kHz_healthy_and_wear\normalization\z-score\frequency"

# Environment Setup & Imports

In [2]:
import os
import pandas as pd
import glob
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import pickle
import numpy as np
import mlflow
from pytorch_lightning.loggers import MLFlowLogger
from scipy import stats
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import accuracy_score, balanced_accuracy_score, hamming_loss, hinge_loss, jaccard_score, log_loss, precision_score, recall_score, f1_score, make_scorer
from pathlib import Path
from pycaret.classification import * 
from torch import tensor
from torchmetrics.classification import BinaryAccuracy, MulticlassAccuracy
import optuna
import torch
import omegaconf.dictconfig
from sklearn.model_selection import train_test_split
from pytorch_tabular import TabularModel
from pytorch_tabular.models import GANDALFConfig, CategoryEmbeddingModel,GatedAdditiveTreeEnsembleConfig, NodeConfig, FTTransformerConfig, TabNetModelConfig
from pytorch_tabular.config import (
    DataConfig,
    OptimizerConfig,
    ModelConfig,
    TrainerConfig,
    ExperimentConfig,
)
from collections import Counter

## Import Dataset

In [3]:
p = Path(targetFolder)
print(p)

G:\Semester Arbeit\Programming\masterarbeit\HBK_20kHz_healthy_and_wear\normalization\z-score\frequency


In [4]:
domain = p.name
print(domain)
normalization = p.parent.name
print(normalization)
method = p.parents[1].name
print(method)
experiment = p.parents[2].name
experiment_name = f"{experiment}_{method}_{normalization}_{domain}"
normalization_dir = p.parent.parent

frequency
z-score
normalization


In [5]:
hbk_training_data = glob.glob(targetFolder + "/*.csv")
#siza_test_data = glob.glob(targetFolder + "/SIZA_test_data/*.csv")
siza_test_data = glob.glob(str(normalization_dir / "SIZA_test_data" / "*.csv"))

In [6]:
dfs_hbk = [pd.read_csv(file) for file in hbk_training_data]
combined_df_hbk = pd.concat(dfs_hbk, ignore_index=True)
dfs_siza = [pd.read_csv(file) for file in siza_test_data]
combined_df_siza = pd.concat(dfs_siza, ignore_index=True)

ValueError: No objects to concatenate

In [None]:
features_df_training = combined_df_hbk.drop(columns=['Speed', 'Torque'])
features_df_testing = combined_df_siza.drop(columns=['Speed', 'Torque'])

In [None]:
def plotHistograms(dataframe, domain, normalization):
    for col in dataframe.columns:
        if col == "Label":
            continue
        plt.figure(figsize=(8, 4))
        sns.histplot(data = dataframe, x=col, hue='Label', kde=True, multiple='layer', element='step', alpha=0.5)
        plt.title(f"Distribution of {col} in the {domain} domain with {normalization}")
        plt.show()

In [None]:
def normalizeDataframe(dataframe, normalization_method):
    y = dataframe['Label']                  
    X = dataframe.drop(columns=['Label']) 

    if normalization_method == "min_max":
        scaler = MinMaxScaler()
    elif normalization_method == "z_score":
        scaler = StandardScaler()
    
    X_scaled = pd.DataFrame(
        scaler.fit_transform(X),
        columns=X.columns,
        index=X.index
    )
    df_scaled = X_scaled.join(y)
    return df_scaled    

In [None]:
def plotPredictionHistograms(df, domain, normalization):
    # 1) mark correct vs incorrect
    df = df.copy()
    df['prediction_quality'] = np.where(
        df['Label'] == df['prediction_label'],
        'correct',
        'incorrect'
    )
    
    # 2) choose a palette (you can override these colors if you like)
    pal = dict(zip(
        ['correct','incorrect'],
        sns.color_palette(n_colors=2)
    ))
    
    skip = {'Label','prediction_label','prediction_score','prediction_quality'}
    for col in df.columns:
        if col in skip:
            continue
        
        fig, ax = plt.subplots(figsize=(8,4))
        sns.histplot(
            data=df, x=col, hue='prediction_quality',
            palette=pal,
            kde=True, multiple='layer', element='step',
            alpha=0.5,
            ax=ax
        )
        
        # 3) build a manual legend using the same palette
        handles = [
            mpatches.Patch(color=pal[k], label=k)
            for k in ['correct','incorrect']
        ]
        ax.legend(
            handles=handles,
            title='Prediction Quality'
        )
        
        ax.set_title(
            f"Distribution of {col} in the '{domain}' domain\n"
            f"(normalization = '{normalization}')"
        )
        ax.set_xlabel(col)
        ax.set_ylabel("Count")
        plt.tight_layout()
        plt.show()

In [None]:
def get_incorrect_predictions(df):
    return df[
        ((df['Label'] == 'damaged')   & (df['prediction_label'] == 'healthy'))
      | ((df['Label'] == 'healthy')  & (df['prediction_label'] == 'damaged'))
    ].copy()

In [None]:
def get_feature_importance_df(model, df):
    importance = model.feature_importances_
    n = len(importance)
    features = df.columns[:n]
    fi_df = pd.DataFrame({
        'Features': features,
        'importance': importance
    })
    return fi_df.sort_values(by='importance', ascending=False).reset_index(drop=True)

In [None]:
def get_svm_feature_importance_df(model, df):
    if not hasattr(model, 'coef_'):
        raise ValueError("This SVM model has no coefficients. Use a linear kernel.")
    
    importance = model.coef_.ravel()  # Flatten in case of binary classification
    n = len(importance)
    features = df.columns[:n]
    fi_df = pd.DataFrame({
        'Features': features,
        'importance': abs(importance)
    })
    return fi_df.sort_values(by='importance', ascending=False).reset_index(drop=True)


In [None]:
feature_counter = Counter()
def add_top_features(feature_df: pd.DataFrame, top_n: int):
    top_features = feature_df.nlargest(top_n, 'importance')['Features']
    feature_counter.update(top_features)
    
def plot_feature_importance():
    feature_freq = pd.DataFrame(feature_counter.items(), columns=['Feature', 'Count'])
    plt.figure(figsize=(10, 5))
    sns.barplot(data=feature_freq.sort_values(by='Count', ascending=False),
                x='Feature', y='Count')
    plt.xticks(rotation=45)
    plt.title('Feature Frequency Across Experiments')
    plt.tight_layout()
    plt.show()

In [None]:
def save_feature_counter(exp_name: str, folder: str = f"top_features_{domain}"):
    os.makedirs(folder, exist_ok=True)
    filepath = os.path.join(folder, f"{exp_name}_features.csv")
    df = pd.DataFrame(feature_counter.items(), columns=['Feature', 'Count'])
    df.to_csv(filepath, index=False)

In [None]:
#add_safe_globals([omegaconf.dictconfig.DictConfig])

_original_torch_load = torch.load
def safe_torch_load(*args, **kwargs):
    kwargs['weights_only'] = False  # trust the checkpoint
    return _original_torch_load(*args, **kwargs)

torch.load = safe_torch_load

In [None]:
features_df_training_normalized = normalizeDataframe(features_df_training, "z_score")
features_df_testing_normalized = normalizeDataframe(features_df_testing, "z_score")

## Histograms Training (HBK) and Testing (SIZA) [NO-Normalization]

In [None]:
plotHistograms(features_df_training, domain, normalization)

In [None]:
plotHistograms(features_df_testing, domain, normalization)

## Histograms Training (HBK) and Testing (SIZA) [Z-Score Normalization]

In [None]:
plotHistograms(features_df_training_normalized, domain, normalization)

In [None]:
plotHistograms(features_df_testing_normalized, domain, normalization)

# Experiment Setup (ML)

## Setup Hyperparameters

In [None]:
experiment = setup(features_df_training_normalized, target='Label', log_experiment = True, experiment_name = experiment_name)

## Add aditional metrics

In [None]:
add_metric('balanced_acc', 'Balance Acc', balanced_accuracy_score, target='pred', greater_is_better=True)
add_metric('hamming_loss', 'Hamming Loss', hamming_loss, target='pred', greater_is_better=False)
add_metric('jaccard_score', 'Jaccard Score', jaccard_score, target='pred', greater_is_better=True)
add_metric('log_loss', 'Log Loss', log_loss, target='pred_proba', greater_is_better=False)

In [None]:
all_metrics = get_metrics()
all_metrics

## Best Models

In [None]:
best_models = compare_models(n_select=3)

## Extra Trees Classifier

In [None]:
et = create_model('et')

In [None]:
evaluate_model(et)

In [None]:
et_top_features = get_feature_importance_df(et, features_df_training_normalized)
et_top_features

In [None]:
add_top_features(et_top_features, top_n=3)

In [None]:
plot_model(et, plot='feature')

predictions_et = predict_model(et, data = features_df_testing_normalized)

predictions_et

## Light Gradient Boosting Machine

In [None]:
lightgbm = create_model('lightgbm')

In [None]:
evaluate_model(lightgbm)

In [None]:
lightgbm_top_features = get_feature_importance_df(lightgbm, features_df_training_normalized)
lightgbm_top_features

In [None]:
add_top_features(lightgbm_top_features, top_n=3)

In [None]:
plot_model(lightgbm, plot='feature')

predictions_lightgbm = predict_model(lightgbm, data = features_df_testing_normalized)

predictions_lightgbm

get_incorrect_predictions(predictions_lightgbm)

plotPredictionHistograms(predictions_lightgbm, domain, normalization)

## Random Forest Classifier

In [None]:
rf = create_model('rf')

In [None]:
evaluate_model(rf)

In [None]:
rf_top_features = get_feature_importance_df(rf, features_df_training_normalized)
rf_top_features

In [None]:
add_top_features(rf_top_features, top_n=3)

In [None]:
plot_model(rf, plot='feature')

predictions_rf = predict_model(rf, data = features_df_testing_normalized)

get_incorrect_predictions(predictions_rf)

plotPredictionHistograms(predictions_rf, domain, normalization)

## KNN

In [None]:
knn = create_model('knn')

In [None]:
evaluate_model(knn)

predictions_knn = predict_model(knn, data=features_df_testing_normalized)

get_incorrect_predictions(predictions_knn)

plotPredictionHistograms(predictions_knn, domain, normalization)

## SVM

In [None]:
svm = create_model('svm')

In [None]:
evaluate_model(svm)

In [None]:
svm_top_features = get_svm_feature_importance_df(svm, features_df_training_normalized)
svm_top_features

In [None]:
add_top_features(svm_top_features, top_n=3)

In [None]:
plot_model(svm, plot='feature')

predictions_svm = predict_model(svm, data=features_df_testing_normalized)

get_incorrect_predictions(predictions_svm)

plotPredictionHistograms(predictions_svm, domain, normalization)

# Experiment Setup (DL)

## Configure Data

In [None]:
train, test = train_test_split(features_df_training_normalized, test_size=0.2, random_state=42)
train, val = train_test_split(train, test_size=0.2, random_state=42)
print(f"Train Shape: {train.shape} | Val Shape: {val.shape} | Test Shape: {test.shape}")

In [None]:
target = "Label"

categorical_cols = [
    col
    for col in features_df_training_normalized.select_dtypes(include=["object","category"]).columns
    if col != target
]

continuous_cols = features_df_training_normalized.select_dtypes(include=["number"]).columns.tolist()

In [None]:
print("Target:", target)
print("Categorical inputs:", categorical_cols)  
print("Continuous inputs:", continuous_cols)    

In [None]:
data_config = DataConfig(
    target=[target],
    continuous_cols=continuous_cols,
    categorical_cols=categorical_cols,
)

In [None]:
available_gpu=1 if torch.cuda.is_available() else 0
print(f"Available GPU: {'Yes' if available_gpu else 'No'}")

In [None]:
trainer_config = TrainerConfig(
    auto_lr_find=True,
    max_epochs=10,
    accelerator='gpu' if torch.cuda.is_available() else 'cpu',
    batch_size=256,
)
optimizer_config = OptimizerConfig()

## TabNet

In [None]:
tabnet_config = TabNetModelConfig(
        task="classification",
        n_d=63,
        n_a=10,
        n_steps=6,
        gamma=1.026540650882887,
        embedding_dropout=0.307433096979297,
        learning_rate=1.1025281120081813e-05,
        n_independent=2,
        metrics=[
            "accuracy", 
            "auroc",
            "recall",
            "precision",
            "f1_score",
            "cohen_kappa",
            "matthews_corrcoef",
            "hamming_distance",
            "jaccard_index",
        ],
        metrics_prob_input=[
            False,  #acc
            True,   # auroc
            False,  # recall
            False,  # precision
            False,  # f1_score
            False,  # cohen_kappa
            False,  # matthews_corrcoef
            False,  # hamming_distance
            False,  # jaccard_index
        ],
        metrics_params=[
            {"average": "macro"},
            {"average": "macro", "num_classes": 2},  # auroc
            {"average": "macro", "num_classes": 2},  # recall
            {"average": "macro", "num_classes": 2},  # precision
            {"average": "macro", "num_classes": 2},  # f1_score
            {"num_classes": 2},                      # cohen_kappa
            {},                                      # matthews_corrcoef
            {},                                      # hamming_distance
            {"average": "macro", "num_classes": 2},  # jaccard_index
        ]
    )

In [None]:
tabnet_model = TabularModel(
        data_config=data_config,
        model_config=tabnet_config,
        optimizer_config=optimizer_config,
        trainer_config=trainer_config,
        verbose=True
    )

In [None]:
tabnet_model.fit(train=train, validation=val)

In [None]:
tabnet_pred_df = tabnet_model.predict(test)
tabnet_pred_df.head(10)
tabnet_result = tabnet_model.evaluate(test)

In [None]:
tabnet_result_siza = tabnet_model.evaluate(features_df_testing_normalized)

## Gandalf

In [None]:
gandalf_config = GANDALFConfig(
        task="classification",
        gflu_stages=10,
        gflu_dropout=0.1849998231685943,
        gflu_feature_init_sparsity=0.10823203851577612,
        learnable_sparsity=True,
        embedding_dropout= 0.03018586684994884,
        batch_norm_continuous_input=True,
        learning_rate=2.4840389554746963e-05,
        metrics=[
            "accuracy", 
            "auroc",
            "recall",
            "precision",
            "f1_score",
            "cohen_kappa",
            "matthews_corrcoef",
            "hamming_distance",
            "jaccard_index",
        ],
        metrics_prob_input=[
            False,  #acc
            True,   # auroc
            False,  # recall
            False,  # precision
            False,  # f1_score
            False,  # cohen_kappa
            False,  # matthews_corrcoef
            False,  # hamming_distance
            False,  # jaccard_index
        ],
        metrics_params=[
            {"average": "macro"},
            {"average": "macro", "num_classes": 2},  # auroc
            {"average": "macro", "num_classes": 2},  # recall
            {"average": "macro", "num_classes": 2},  # precision
            {"average": "macro", "num_classes": 2},  # f1_score
            {"num_classes": 2},                      # cohen_kappa
            {},                                      # matthews_corrcoef
            {},                                      # hamming_distance
            {"average": "macro", "num_classes": 2},  # jaccard_index
        ]
    )

In [None]:
gandalf_model = TabularModel(
    data_config=data_config,
    model_config=gandalf_config,
    optimizer_config=optimizer_config,
    trainer_config=trainer_config,
    verbose=True
)

In [None]:
gandalf_model.fit(train=train, validation=val)

In [None]:
gandalf_result = gandalf_model.evaluate(test)

In [None]:
gandalf_result_siza = gandalf_model.evaluate(features_df_testing_normalized)

In [None]:
gandalf_top_features = gandalf_model.feature_importance().sort_values("importance", ascending=False)
gandalf_top_features

In [None]:
add_top_features(gandalf_top_features, top_n=3)

In [None]:
plot_feature_importance()

## FTTransformerModel

In [None]:
ftt_config = FTTransformerConfig(
        task="classification",
        input_embed_dim=128,
        embedding_initialization='kaiming_uniform',
        embedding_bias=True,
        share_embedding=False,
        share_embedding_strategy='add',
        shared_embedding_fraction=0.2691299923873223,
        attn_feature_importance=True,
        num_heads=2,
        num_attn_blocks=3,
        transformer_head_dim=160,
        attn_dropout=0.07407153026858466,
        add_norm_dropout=0.41081964891975553,
        ff_dropout=0.4205791050633416,
        ff_hidden_multiplier=6,
        transformer_activation='ReLU',
        embedding_dropout=0.08383814184519878,
        batch_norm_continuous_input=True,
        learning_rate=1.9746243632817873e-05,
        metrics=[
            "accuracy", 
            "auroc",
            "recall",
            "precision",
            "f1_score",
            "cohen_kappa",
            "matthews_corrcoef",
            "hamming_distance",
            "jaccard_index",
        ],
        metrics_prob_input=[
            False,  #acc
            True,   # auroc
            False,  # recall
            False,  # precision
            False,  # f1_score
            False,  # cohen_kappa
            False,  # matthews_corrcoef
            False,  # hamming_distance
            False,  # jaccard_index
        ],
        metrics_params=[
            {"average": "macro"},
            {"average": "macro", "num_classes": 2},  # auroc
            {"average": "macro", "num_classes": 2},  # recall
            {"average": "macro", "num_classes": 2},  # precision
            {"average": "macro", "num_classes": 2},  # f1_score
            {"num_classes": 2},                      # cohen_kappa
            {},                                      # matthews_corrcoef
            {},                                      # hamming_distance
            {"average": "macro", "num_classes": 2},  # jaccard_index
        ]
    )

In [None]:
ftt_model = TabularModel(
        data_config=data_config,
        model_config=ftt_config,
        optimizer_config=optimizer_config,
        trainer_config=trainer_config,
        verbose=True,
    )

In [None]:
ftt_model.fit(train=train, validation=val)

In [None]:
ftt_pred_df = ftt_model.predict(test)
ftt_pred_df.head(10)
ftt_result = ftt_model.evaluate(test)

In [None]:
ftt_result_siza = ftt_model.evaluate(features_df_testing_normalized)

In [None]:
ftt_top_features = ftt_model.feature_importance().sort_values("importance", ascending=False)
ftt_top_features

In [None]:
add_top_features(ftt_top_features, top_n=3)

In [None]:
plot_feature_importance()

In [None]:
save_feature_counter(experiment_name, f"top_features_{domain}")