# Features extracted from Wavelet coefficients
- Compare scores for wavelet coefficient features assigned by the feature selection method
- Choose the feature selection metric `METRIC` as one of three options: `'C'` (correlation), `'F'` (F statistic) or `'MI'` (mutual information)
- Recalculate wavelet features by enabling `GENERATE`

In [None]:
# Codes of the supported feature selection metrics:
# 'C' = correlation, 'F' = F statistic, 'MI' = mutual information.
METRICS = ['C', 'F', 'MI']
# Metric used by the scoring cells below (index 2 selects 'MI').
METRIC = METRICS[2]
# True: recompute the wavelet features from the dataset archive and save them.
# False: read the previously saved wavelet feature CSV instead.
GENERATE = False

In [None]:
from zipfile import ZipFile
from typing import List, Tuple, Callable

import numpy as np
import pandas as pd
import matplotlib.pylab as plt

import pywt
from scipy.stats import kurtosis, entropy
from sklearn.feature_selection import mutual_info_classif, f_classif
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from tsfel import feature_extraction as ft

import os
import sys
sys.path.append('../')
from vibrodiagnostics import (
    mafaulda,
    extraction,
    selection
)

# Columns of the wavelet feature table that are removed from the column set
# before the remaining columns are passed to the scoring function.
WT_COLUMNS_EXCLUDE = {'fault', 'severity', 'seq', 'rpm', 'axis', 'feature'}

In [None]:
# Root folder holding the dataset archive and the saved feature tables.
PATH = '../datasets'
DATASET_PATH = os.path.join(PATH, 'MAFAULDA.zip')
FEATURES_PATH = os.path.join(PATH, 'features')
# One CSV file per feature-table key ('TD', 'FD', 'WT').
FEATURES = {
    domain: os.path.join(FEATURES_PATH, f'MAFAULDA_{domain}.csv')
    for domain in ('TD', 'FD', 'WT')
}
# Default for the `parts` argument of `features_wavelet_domain`.
PARTS = 1

In [None]:
def calc_feature_selection_metric(
        fmetric: Callable,
        dataset: pd.DataFrame,
        columns: List[str]) -> pd.DataFrame:
    """Score the given columns against the 'fault' column and rank them.

    Parameters
    ----------
    fmetric : Callable
        Called as ``fmetric(dataset[columns], dataset['fault'])``. It may
        return the scores directly or a tuple whose first element holds them.
    dataset : pd.DataFrame
        Table containing `columns` and a 'fault' column.
    columns : List[str]
        Names of the columns to score.

    Returns
    -------
    pd.DataFrame
        Frame indexed by 'feature' with a single 'stat' column, sorted by
        'stat' in descending order.
    """
    scores = fmetric(dataset[columns], dataset['fault'])
    # Tuple results keep the scores in the first position; the rest is dropped.
    if isinstance(scores, tuple):
        scores = scores[0]

    ranking = pd.DataFrame(list(zip(columns, scores)), columns=['feature', 'stat'])
    ranking = ranking.set_index('feature')
    return ranking.sort_values(by='stat', ascending=False)
    

def calc_corr_stat(dataset, columns):
    """Rank `columns` of `dataset` using `selection.corr_classif` as the scorer."""
    return calc_feature_selection_metric(
        fmetric=selection.corr_classif,
        dataset=dataset,
        columns=columns,
    )


def calc_f_stat(dataset, columns):
    """Rank `columns` of `dataset` using `f_classif` as the scorer."""
    return calc_feature_selection_metric(
        fmetric=f_classif,
        dataset=dataset,
        columns=columns,
    )


def calc_mutual_information(dataset, columns):
    """Rank `columns` of `dataset` using `mutual_info_classif` as the scorer."""
    return calc_feature_selection_metric(
        fmetric=mutual_info_classif,
        dataset=dataset,
        columns=columns,
    )

In [None]:
# Map every supported METRIC code to its scoring function and to the label
# shown on the plots.
_METRIC_DISPATCH = {
    'C': (calc_corr_stat, 'Correlation'),
    'F': (calc_f_stat, 'F statistic'),
    'MI': (calc_mutual_information, 'Mutual information'),
}

# Fail right here on an unknown code; otherwise `calc_func` and `title` would
# stay undefined and a later cell would stop with an unrelated NameError.
if METRIC not in _METRIC_DISPATCH:
    raise ValueError(
        f'Unknown METRIC {METRIC!r}; expected one of {sorted(_METRIC_DISPATCH)}'
    )

calc_func, title = _METRIC_DISPATCH[METRIC]

In [None]:
def normalize_features(features, columns):
    """Standardize the selected columns and pass all other columns through.

    Parameters
    ----------
    features : pd.DataFrame
        Table containing `columns` plus any additional columns.
    columns : list of str
        Columns transformed with `StandardScaler`.

    Returns
    -------
    pd.DataFrame
        New frame with the same row index as `features`. Column names and
        their order are the ones reported by
        `ColumnTransformer.get_feature_names_out()`.
    """
    preprocessor = ColumnTransformer(
        transformers=[('std', StandardScaler(), columns)],
        remainder='passthrough',
        # Keep the plain column names instead of '<transformer>__<column>'.
        verbose_feature_names_out=False,
    )
    normalized = preprocessor.fit_transform(features)
    return pd.DataFrame(
        normalized,
        columns=preprocessor.get_feature_names_out(),
        # Keep the input row labels so the result stays aligned with `features`.
        index=features.index,
    )


def calc_score_in_wpd_features(src, func):
    """Score the feature columns separately for each value of 'feature'.

    Parameters
    ----------
    src : pd.DataFrame
        Table with a 'feature' column used for grouping.
    func : callable
        Called as ``func(group, columns)`` for every group; must return a
        DataFrame.

    Returns
    -------
    pd.DataFrame
        The frames returned by `func`, stacked in group order, with an extra
        categorical 'metric' column holding the group key. If `src` yields no
        groups, an empty frame with only the 'metric' column is returned.
    """
    scored = []

    for metric, group in src.groupby(by='feature', observed=True):
        # Filter in column order: a set difference would hand the columns to
        # `func` in an order that can change between interpreter runs.
        columns = [name for name in group.columns if name not in WT_COLUMNS_EXCLUDE]
        df = func(group, columns)
        df['metric'] = metric
        scored.append(df)

    if not scored:
        return pd.DataFrame({'metric': pd.Categorical([])})

    # Concatenate once instead of growing a frame inside the loop.
    result = pd.concat(scored)
    result['metric'] = result['metric'].astype('category')
    return result


def plot_fscore_part(df: pd.DataFrame, part: str, title: str, n=None):
    """Draw one bar chart per observed value of the `part` column, side by side.

    Parameters
    ----------
    df : pd.DataFrame
        Scores to plot; grouped by the `part` column.
    part : str
        Name of the grouping column.
    title : str
        Label of the y axis of every chart.
    n : int, optional
        If given, only the first `n` rows of each group are drawn.

    Nothing is drawn when `df` contains no groups.
    """
    groups = list(df.groupby(by=part, observed=True))
    if not groups:
        return

    # squeeze=False always yields a 2-D array of axes, so a single group does
    # not turn `axes` into a bare Axes object that cannot be indexed.
    fig, axes = plt.subplots(1, len(groups), figsize=(20, 4), squeeze=False)

    for axis, (name, group) in zip(axes[0], groups):
        if n is not None:
            group = group.iloc[:n]
        group.plot.bar(grid=True, xlabel='Feature', ylabel=title, legend=False, title=name, ax=axis)

    fig.tight_layout()
    plt.show()


def plot_wpd_energy_ratio_per_level(features: pd.DataFrame, wpd_axis: List[str], levels: int = 6):
    """Plot the score of the 'energy_ratio' features, one chart per level.

    Rows are restricted to the axes listed in `wpd_axis` and to
    ``feature == 'energy_ratio'``. For each level from 1 to `levels`, the
    columns whose name starts with ``'L<level>'`` are scored with the
    notebook-level `calc_func` and drawn as a bar chart labelled with the
    notebook-level `title`.

    Parameters
    ----------
    features : pd.DataFrame
        Wavelet feature table with 'axis' and 'feature' columns.
    wpd_axis : List[str]
        Values of the 'axis' column to keep.
    levels : int, optional
        Number of levels (and stacked charts) to draw; defaults to 6.
    """
    selected = features[features['axis'].isin(wpd_axis)]
    energy_ratio = selected[selected['feature'] == 'energy_ratio']
    # Take the candidate columns from the table itself rather than from a
    # global `columns` variable left behind by whichever cell ran last.
    feature_columns = [
        name for name in energy_ratio.columns if name not in WT_COLUMNS_EXCLUDE
    ]

    # squeeze=False keeps `axes` two-dimensional even when levels == 1.
    fig, axes = plt.subplots(levels, 1, figsize=(15, 20), squeeze=False)

    for level, panel in enumerate(axes[:, 0], start=1):
        prefix = f'L{level}'
        level_columns = [name for name in feature_columns if name.startswith(prefix)]
        if not level_columns:
            # Nothing to score for this level; leave its chart empty.
            continue
        scores = calc_func(energy_ratio, level_columns)

        panel.bar(scores.index, scores.values.T[0])
        panel.grid(True)
        panel.set_xlabel('Feature')
        # Label with the selected metric instead of a fixed 'MI'.
        panel.set_ylabel(title)

        # Pin the tick positions before restyling their labels.
        panel.set_xticks(panel.get_xticks())
        panel.set_xticklabels(panel.get_xticklabels(), rotation=45, ha='right')

    fig.suptitle(f'WPD energy ratio: Axis "{wpd_axis}"', fontsize=16, y=0.9)
    plt.show()


def level_to_frequency_bands(level, fs):
    """Print the equal-width frequency bands for the given level.

    The range from 0 to ``fs / 2`` is divided into ``2 ** level`` bands, each
    printed on its own line as ``L<level>_<index> = [<low>; <high>] Hz``.
    """
    n_bands = 2 ** level
    width = (fs / 2) / n_bands
    for index in range(n_bands):
        low = index * width
        high = low + width
        print(f'L{level}_{index} = [{low}; {high}] Hz')

# Print the 2 ** 4 = 16 bands of level 4 for fs = 50000;
# each band is (50000 / 2) / 16 = 1562.5 wide.
level_to_frequency_bands(level=4, fs=50000)

Time domain

Unnormalized vs. Normalized features
- Result found: F score is independent of scaling

In [None]:
# Load the 'TD' feature table, attach labels, and use the label as the
# 'fault' target expected by the scoring functions.
features = extraction.load_features(FEATURES['TD'], ['az'], mafaulda.LABEL_COLUMNS)
features = mafaulda.assign_labels(features, 'A')
features['fault'] = features['label']
columns = [name for name in features.columns if name not in ('label', 'fault')]

# Score the raw features first, then the standardized ones.
fscore = calc_func(features, columns)
features_normalized = normalize_features(features, columns)
fscore_norm = calc_func(features_normalized, columns)

# NOTE(review): a figsize is given both to subplots() and to plot.bar();
# confirm which of the two sizes is the intended one.
bar_options = dict(figsize=(10, 4), grid=True, xlabel='Feature', ylabel=title, legend=False)
fig, ax = plt.subplots(1, 2, figsize=(20, 5))
fscore.plot.bar(title='Unnormalized', ax=ax[0], **bar_options)
fscore_norm.plot.bar(title='Normalized', ax=ax[1], **bar_options)
plt.show()

Wavelet packet transform

In [None]:
def features_wavelet_domain(dataset: ZipFile, filename: str, parts: int = PARTS) -> pd.DataFrame:
    """Run `extraction.wavelet_features_calc` on one file of the archive.

    Delegates to `mafaulda.features_by_domain`, forwarding `dataset`,
    `filename` and `parts`, with `multirow=True`.
    """
    return mafaulda.features_by_domain(
        extraction.wavelet_features_calc,
        dataset,
        filename,
        parts=parts,
        multirow=True,
    )

In [None]:
if GENERATE:
    # Open the archive in a context manager so the file handle is closed once
    # the features have been extracted and saved.
    # NOTE(review): assumes load_files_split has read everything it needs
    # from the archive by the time to_csv finishes; confirm.
    with ZipFile(DATASET_PATH) as archive:
        features = extraction.load_files_split(archive, features_wavelet_domain)
        features.to_csv(FEATURES['WT'])
else:
    # Reuse the previously saved table; its first CSV column is the index.
    features = pd.read_csv(FEATURES['WT'], low_memory=False, index_col=0)

# Keep only the rows whose 'axis' is one of 'ax', 'ay', 'az'.
features = features[features['axis'].isin(('ax', 'ay', 'az'))]
features

In [None]:
# Score the wavelet features per 'feature' group and plot the first 20 rows
# of every group.
df = calc_score_in_wpd_features(src=features, func=calc_func)
plot_fscore_part(df, part='metric', title=title, n=20)

WPD features in one layer

In [None]:
# Plot the scores of the features that belong to a single layer.
level = 3
df = calc_score_in_wpd_features(src=features, func=calc_func)
# Features of the layer are selected by the 'L<level>' prefix of their name.
layer = df.loc[df.index.str.startswith(f'L{level}')]
plot_fscore_part(layer, part='metric', title=title)

In [None]:
# Plot the scores of the features that belong to a single layer.
level = 4
df = calc_score_in_wpd_features(src=features, func=calc_func)
# Features of the layer are selected by the 'L<level>' prefix of their name.
layer = df.loc[df.index.str.startswith(f'L{level}')]
plot_fscore_part(layer, part='metric', title=title)

In [None]:
# Score the rows holding the 'energy' feature and plot the 30 best columns.
features_energy = features[features['feature'] == 'energy']
# Filter in column order: a set difference would produce a column order that
# can change between interpreter runs.
columns = [name for name in features_energy.columns if name not in WT_COLUMNS_EXCLUDE]

print(len(features_energy))
mi = calc_func(features_energy, columns)
mi.iloc[:30].plot.bar(figsize=(20, 4), grid=True, ylabel=title, title='WPD Energy')
plt.show()

In [None]:
# Plot the per-level energy ratio scores using the rows of all three axes.
plot_wpd_energy_ratio_per_level(features, wpd_axis=['ax', 'ay', 'az'])

In [None]:
# Score the rows holding the 'negentropy' feature and plot the 30 best columns.
features_entropy = features[features['feature'] == 'negentropy']
# Filter in column order: a set difference would produce a column order that
# can change between interpreter runs.
columns = [name for name in features_entropy.columns if name not in WT_COLUMNS_EXCLUDE]
print(len(features_entropy))


mi = calc_func(features_entropy, columns)
mi.iloc[:30].plot.bar(figsize=(20, 4), grid=True, ylabel=title, title='WPD Negentropy')
plt.show()

In [None]:
# Score the rows holding the 'kurtosis' feature and plot the 30 best columns.
features_kurtosis = features[features['feature'] == 'kurtosis']
# Take the columns from the kurtosis rows themselves, in column order.
columns = [name for name in features_kurtosis.columns if name not in WT_COLUMNS_EXCLUDE]
print(len(features_kurtosis))

# Score the kurtosis rows, so the chart matches its 'WPD Kurtosis' title.
mi = calc_func(features_kurtosis, columns)
mi.iloc[:30].plot.bar(figsize=(20, 4), grid=True, ylabel=title, title='WPD Kurtosis')
plt.show()