In [1]:
# Import libraries

import numpy as np
from numpy import quantile, where, random

import pandas as pd
import random

import matplotlib.pyplot as plt
%matplotlib inline

from scipy.stats import multivariate_normal

from sklearn.model_selection import KFold
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import max_error, mean_squared_error, accuracy_score, precision_score, recall_score, f1_score
from sklearn.neighbors import LocalOutlierFactor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

from imblearn.over_sampling import SMOTE

from itertools import combinations

import warnings

import sys

sys.path.insert(0, 'C:/Users/gioca/GitHub/projects/maintenance_industry_4_2024/supporting_scripts/WP_4_20240528/')
from utility import read_all_test_data_from_path, extract_selected_feature, prepare_sliding_window, FaultDetectReg, read_all_csvs_one_test, run_cv_one_motor, show_reg_result, show_clf_result

In [2]:
# Data preprocessing definition (training)

from scipy.signal import butter, filtfilt

def compensate_seq_bias(df: pd.DataFrame):
    ''' # Description
    Adjust for the sequence-to-sequence bias.
    '''
    # Tranform the features relative to the first data point.
    df['temperature'] = df['temperature'] - df['temperature'].iloc[0]
    df['voltage'] = df['voltage'] - df['voltage'].iloc[0]
    df['position'] = df['position'] - df['position'].iloc[0]

# Function to design a Butterworth low-pass filter
def butter_lowpass(cutoff, fs, order=5):
    nyquist = 0.5 * fs
    normal_cutoff = cutoff / nyquist
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return b, a

# Function to apply the Butterworth low-pass filter
def lowpass_filter(data, cutoff_freq, sampling_freq, order=5):
    b, a = butter_lowpass(cutoff_freq, sampling_freq, order=order)
    filtered_data = filtfilt(b, a, data)
    return filtered_data

# Set parameters for the low-pass filter
cutoff_frequency = .5  # Adjust as needed
sampling_frequency = 10  # Assuming your data is evenly spaced in time

def customized_outlier_removal(df: pd.DataFrame):
    ''' # Description
    Remove outliers from the dataframe based on defined valid ranges. 
    Define a valid range of temperature and voltage. 
    Use ffil function to replace the invalid measurement with the previous value.
    '''
    df['position'] = df['position'].where(df['position'] <= 1000, np.nan)
    df['position'] = df['position'].where(df['position'] >= 0, np.nan)
    df['position'] = df['position'].ffill()
    df['position'] = lowpass_filter(df['position'], cutoff_frequency, sampling_frequency)
    df['position'] = df['position'].rolling(window=10, min_periods=1).mean()
    df['position'] = df['position'].round()

    df['temperature'] = df['temperature'].where(df['temperature'] <= 100, np.nan)
    df['temperature'] = df['temperature'].where(df['temperature'] >= 0, np.nan)
    df['temperature'] = df['temperature'].rolling(window=10, min_periods=1).mean()

    # Make sure that the difference between the current and previous temperature cannot be too large.
    # Define your threshold
    threshold = 10
    # Shift the 'temperature' column by one row to get the previous temperature
    prev_tmp = df['temperature'].shift(1)
    # Calculate the absolute difference between current and previous temperature
    temp_diff = np.abs(df['temperature'] - prev_tmp)
    # Set the temperature to NaN where the difference is larger than the threshold
    df.loc[temp_diff > threshold, 'temperature'] = np.nan
    df['temperature'] = df['temperature'].ffill()

    df['voltage'] = df['voltage'].where(df['voltage'] >= 6000, np.nan)
    df['voltage'] = df['voltage'].where(df['voltage'] <= 8000, np.nan)
    df['voltage'] = df['voltage'].ffill()
    df['voltage'] = lowpass_filter(df['voltage'], cutoff_frequency, sampling_frequency)
    df['voltage'] = df['voltage'].rolling(window=5, min_periods=1).mean()

def pre_processing(df: pd.DataFrame):
    ''' ### Description
    Preprocess the data:
    - remove outliers
    - Adjust for the sequence-to-sequence bias.
    - add new features about the difference between the current and previous n data point.
    '''     
    # Start processing.
    customized_outlier_removal(df)
    compensate_seq_bias(df)

from utility import read_all_csvs_one_test
import matplotlib.pyplot as plt
import os

base_dictionary = 'C:/Users/gioca/OneDrive - Politecnico di Milano/POLISSS/CentraleSupelec/Maintenance&I4.0/training_data/'

# Get all the folders in the base_dictionary
path_list = os.listdir(base_dictionary)
# Only keep the folders, not the excel file.
path_list = path_list[:-1]

# Read the data.
df_data_smooth = pd.DataFrame()
for tmp_path in path_list:
    path = base_dictionary + tmp_path
    # Read the data with the customized outlier removal function.
    tmp_df = read_all_csvs_one_test(path, tmp_path, pre_processing)
    df_data_smooth = pd.concat([df_data_smooth, tmp_df])
    df_data_smooth = df_data_smooth.reset_index(drop=True)

# df_data_smooth_tr = df_data_smooth[df_data_smooth['test_condition'].isin(normal_test_id)]

In [3]:
# Data reading and pre-processing (training)

# Read the data.
df_tr = pd.DataFrame()
for tmp_path in path_list:
    path = base_dictionary + tmp_path
    # Read the data with the customized outlier removal function.
    tmp_df = read_all_csvs_one_test(path, tmp_path, customized_outlier_removal)
    df_tr = pd.concat([df_tr, tmp_df])
    df_tr = df_tr.reset_index(drop=True)

conditions_path = r'C:/Users/gioca/OneDrive - Politecnico di Milano/POLISSS/CentraleSupelec/Maintenance&I4.0/training_data/Test conditions.xlsx'
conditions = pd.read_excel(conditions_path)
df_merged = df_tr.merge(conditions, how='left', left_on='test_condition', right_on='Test id')
df_tr['operation'] = df_merged['Description']
list_operations = pd.unique(conditions.Description).tolist()

In [4]:
# Data preprocessing definition (testing)

from scipy.signal import butter, filtfilt

def compensate_seq_bias(df: pd.DataFrame):
    ''' # Description
    Adjust for the sequence-to-sequence bias.
    '''
    # Tranform the features relative to the first data point.
    df['temperature'] = df['temperature'] - df['temperature'].iloc[0]
    df['voltage'] = df['voltage'] - df['voltage'].iloc[0]
    df['position'] = df['position'] - df['position'].iloc[0]

# Function to design a Butterworth low-pass filter
def butter_lowpass(cutoff, fs, order=5):
    nyquist = 0.5 * fs
    normal_cutoff = cutoff / nyquist
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return b, a

# Function to apply the Butterworth low-pass filter
def lowpass_filter(data, cutoff_freq, sampling_freq, order=5):
    b, a = butter_lowpass(cutoff_freq, sampling_freq, order=order)
    filtered_data = filtfilt(b, a, data)
    return filtered_data

# Set parameters for the low-pass filter
cutoff_frequency = .5  # Adjust as needed
sampling_frequency = 10  # Assuming your data is evenly spaced in time

def customized_outlier_removal(df: pd.DataFrame):
    ''' # Description
    Remove outliers from the dataframe based on defined valid ranges. 
    Define a valid range of temperature and voltage. 
    Use ffil function to replace the invalid measurement with the previous value.
    '''
    df['position'] = df['position'].where(df['position'] <= 1000, np.nan)
    df['position'] = df['position'].where(df['position'] >= 0, np.nan)
    df['position'] = df['position'].ffill()
    df['position'] = lowpass_filter(df['position'], cutoff_frequency, sampling_frequency)
    df['position'] = df['position'].rolling(window=10, min_periods=1).mean()
    df['position'] = df['position'].round()

    df['temperature'] = df['temperature'].where(df['temperature'] <= 100, np.nan)
    df['temperature'] = df['temperature'].where(df['temperature'] >= 0, np.nan)
    df['temperature'] = df['temperature'].rolling(window=10, min_periods=1).mean()

    # Make sure that the difference between the current and previous temperature cannot be too large.
    # Define your threshold
    threshold = 10
    # Shift the 'temperature' column by one row to get the previous temperature
    prev_tmp = df['temperature'].shift(1)
    # Calculate the absolute difference between current and previous temperature
    temp_diff = np.abs(df['temperature'] - prev_tmp)
    # Set the temperature to NaN where the difference is larger than the threshold
    df.loc[temp_diff > threshold, 'temperature'] = np.nan
    df['temperature'] = df['temperature'].ffill()

    df['voltage'] = df['voltage'].where(df['voltage'] >= 6000, np.nan)
    df['voltage'] = df['voltage'].where(df['voltage'] <= 8000, np.nan)
    df['voltage'] = df['voltage'].ffill()
    df['voltage'] = lowpass_filter(df['voltage'], cutoff_frequency, sampling_frequency)
    df['voltage'] = df['voltage'].rolling(window=5, min_periods=1).mean()

def pre_processing(df: pd.DataFrame):
    ''' ### Description
    Preprocess the data:
    - remove outliers
    - Adjust for the sequence-to-sequence bias.
    - add new features about the difference between the current and previous n data point.
    '''     
    # Start processing.
    customized_outlier_removal(df)
    compensate_seq_bias(df)

from utility import read_all_csvs_one_test
import matplotlib.pyplot as plt
import os

base_dictionary = 'C:/Users/gioca/OneDrive - Politecnico di Milano/POLISSS/CentraleSupelec/Maintenance&I4.0/testing_data/'

# Get all the folders in the base_dictionary
path_list = os.listdir(base_dictionary)
# Only keep the folders, not the excel file.
path_list = path_list[:-1]

# Read the data.
df_data_smooth = pd.DataFrame()
for tmp_path in path_list:
    path = base_dictionary + tmp_path
    # Read the data with the customized outlier removal function.
    tmp_df = read_all_csvs_one_test(path, tmp_path, pre_processing)
    df_data_smooth = pd.concat([df_data_smooth, tmp_df])
    df_data_smooth = df_data_smooth.reset_index(drop=True)

# df_data_smooth_tr = df_data_smooth[df_data_smooth['test_condition'].isin(normal_test_id)]

In [5]:
# Data reading and pre-processing (test)

# Read the data.

df_te = pd.DataFrame()
for tmp_path in path_list:
    path = base_dictionary + tmp_path
    # Read the data with the customized outlier removal function.
    tmp_df_test = read_all_csvs_one_test(path, tmp_path, customized_outlier_removal)
    df_te = pd.concat([df_te, tmp_df_test])
    df_te = df_te.reset_index(drop=True)

conditions_path = r'C:/Users/gioca/OneDrive - Politecnico di Milano/POLISSS/CentraleSupelec/Maintenance&I4.0/testing_data/Test conditions.xlsx'
conditions = pd.read_excel(conditions_path)
df_merged = df_te.merge(conditions, how='left', left_on='test_condition', right_on='Test id')
df_te['operation'] = df_merged['Description']
list_operations = pd.unique(conditions.Description).tolist()

In [6]:
def find_test_conditions_for_motor_x(dataset, motor_number):
    """
    Trova le condizioni di test per le quali, riducendo il dataset, compare almeno un data_motor_x_label = 1.
    
    Args:
    - dataset (pd.DataFrame): Il dataset da analizzare.
    - motor_number (int): Il numero del motore da considerare.
    
    Returns:
    - List: Lista delle test_condition che soddisfano il criterio.
    """
    # Nome della colonna da cercare
    target_column = f'data_motor_{motor_number}_label'
    
    # Trova tutte le condizioni di test uniche
    test_conditions = dataset['test_condition'].unique()
    
    # Lista per memorizzare le condizioni di test valide
    valid_conditions = []
    
    # Verifica ogni condizione di test
    for condition in test_conditions:
        # Filtra il dataset per la condizione di test corrente
        filtered_dataset = dataset[dataset['test_condition'] == condition]
        
        # Verifica se esiste almeno una riga con data_motor_x_label = 1
        if (filtered_dataset[target_column] == 1).any():
            valid_conditions.append(condition)
    
    return valid_conditions

In [7]:
from utility import extract_selected_feature, prepare_sliding_window
from sklearn.model_selection import GridSearchCV

# Define the motor index.
motor_idx = 3

valid_conditions = find_test_conditions_for_motor_x(df_tr, motor_idx)
    
# Filtra il DataFrame originale usando le condizioni di test valide
df_train = df_tr[df_tr['test_condition'].isin(valid_conditions)]

# Define the features.
feature_list_all = ['time', 'data_motor_1_position', 'data_motor_1_temperature', 'data_motor_1_voltage',
                    'data_motor_2_position', 'data_motor_2_temperature', 'data_motor_2_voltage',
                    'data_motor_3_position', 'data_motor_3_temperature', 'data_motor_3_voltage',
                    'data_motor_4_position', 'data_motor_4_temperature', 'data_motor_4_voltage',
                    'data_motor_5_position', 'data_motor_5_temperature', 'data_motor_5_voltage',
                    'data_motor_6_position', 'data_motor_6_temperature', 'data_motor_6_voltage']
# Extract the features.
df_tr_x, df_tr_y = extract_selected_feature(df_train, feature_list_all, motor_idx, mdl_type='clf')

# Prepare the training data based on the defined sliding window.
window_size = 1
sample_step = 1
X_train, y_train = prepare_sliding_window(df_x=df_tr_x, y=df_tr_y, window_size=window_size, sample_step=sample_step, mdl_type='clf')

# Define the classification model.
# Define the steps of the pipeline
steps = [
    ('standardizer', StandardScaler()),  # Step 1: StandardScaler
    ('mdl', LogisticRegression(class_weight='balanced'))    # Step 2: Linear Regression
]

# Create the pipeline
pipeline = Pipeline(steps)

# Define hyperparameters to search
param_grid = {
    'mdl__C': [0.001, 0.01, 0.1, 1, 10, 100]  # Inverse of regularization strength
}

# Initialize GridSearchCV
grid_search = GridSearchCV(estimator=pipeline, param_grid=param_grid, scoring='f1', cv=5)

# Train the model.
mdl = grid_search.fit(X_train, y_train)

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver opt

In [8]:
# Prepare for the testing dataset.
# Define the features.
feature_list_all = ['time', 'data_motor_1_position', 'data_motor_1_temperature', 'data_motor_1_voltage',
                    'data_motor_2_position', 'data_motor_2_temperature', 'data_motor_2_voltage',
                    'data_motor_3_position', 'data_motor_3_temperature', 'data_motor_3_voltage',
                    'data_motor_4_position', 'data_motor_4_temperature', 'data_motor_4_voltage',
                    'data_motor_5_position', 'data_motor_5_temperature', 'data_motor_5_voltage',
                    'data_motor_6_position', 'data_motor_6_temperature', 'data_motor_6_voltage']
# Add test_condition for extracting different sequences.
feature_list_all.append('test_condition')
# Get the features.
df_test_x = df_te[feature_list_all]
# Augument the features in the same way as the training data.
X_test = prepare_sliding_window(df_x=df_test_x, window_size=window_size, sample_step=sample_step, mdl_type='clf')
# Make prediction.
y_pred_3 = grid_search.predict(X_test)

df_pred = pd.DataFrame({
    'time': df_te['time'],
    'test_condition': df_te['test_condition']
})

for x in range(1, 7):
    df_pred[f'data_motor_{x}_label'] = -1

# Inserimento dei valori di y_pred nella colonna data_motor_3_label
df_pred['data_motor_3_label'] = y_pred_3
df_pred.reset_index(inplace=True)
df_pred.rename(columns={'index': 'idx'}, inplace=True)

df_pred.to_csv('df_pred_new.csv', index=False)

In [5]:
# Random Forest on the training dataset

models = []

for motor_number in range(1, 6+1):

    print("Motor ", motor_number)

    # Trova le condizioni di test valide per il motore corrente
    valid_conditions = find_test_conditions_for_motor_x(df, motor_number)
    
    # Filtra il DataFrame originale usando le condizioni di test valide
    df_tr_x = df[df['test_condition'].isin(valid_conditions)]

    # Initialize rf model

    rf_model = LogisticRegression()
    results = []

    for condition in valid_conditions:
        # Dividi il dataset in test e training
        df_train = df_tr_x[df_tr_x['test_condition'] != condition]
        df_test = df_tr_x[df_tr_x['test_condition'] == condition]
    
        X_train = df_train[[f'data_motor_{motor_number}_position', f'data_motor_{motor_number}_temperature', f'data_motor_{motor_number}_voltage']]
        y_train = df_train[[f'data_motor_{motor_number}_label']]
        y_train = y_train[f'data_motor_{motor_number}_label']
    
        X_test = df_test[[f'data_motor_{motor_number}_position', f'data_motor_{motor_number}_temperature', f'data_motor_{motor_number}_voltage']]
        y_test = df_test[[f'data_motor_{motor_number}_label']]
        y_test = y_test[f'data_motor_{motor_number}_label']
    
        # Addestra il modello sui dati di training
        rf_model.fit(X_train, y_train)
    
        # Fai previsioni sui dati di test
        y_pred = rf_model.predict(X_test)
    
        # Calcola le metriche
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred, zero_division=0)
        recall = recall_score(y_test, y_pred, zero_division=0)
        f1 = f1_score(y_test, y_pred, zero_division=0)
    
        # Memorizza i risultati
        results.append({
            'condition': condition,
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1_score': f1
        })
    
        # Stampa i risultati per ogni condizione
        print(f"Results for test condition {condition}:")
        print(f"Accuracy: {accuracy:.2f}")
        print(f"Precision: {precision:.2f}")
        print(f"Recall: {recall:.2f}")
        print(f"F1 Score: {f1:.2f}")
        print("")

Motor  1
Results for test condition 20240325_155003:
Accuracy: 0.80
Precision: 0.00
Recall: 0.00
F1 Score: 0.00

Results for test condition 20240426_140055:
Accuracy: 0.95
Precision: 0.00
Recall: 0.00
F1 Score: 0.00

Results for test condition 20240524_094877:
Accuracy: 0.67
Precision: 0.00
Recall: 0.00
F1 Score: 0.00

Results for test condition 20240524_100052:
Accuracy: 0.80
Precision: 0.00
Recall: 0.00
F1 Score: 0.00

Results for test condition 20240524_110994:
Accuracy: 0.87
Precision: 0.00
Recall: 0.00
F1 Score: 0.00

Motor  2
Results for test condition 20240325_155003:
Accuracy: 0.14
Precision: 1.00
Recall: 0.14
F1 Score: 0.25

Results for test condition 20240426_140055:
Accuracy: 0.94
Precision: 1.00
Recall: 0.23
F1 Score: 0.37

Results for test condition 20240524_110994:
Accuracy: 0.20
Precision: 0.07
Recall: 0.93
F1 Score: 0.13

Motor  3
Results for test condition 20240325_155003:
Accuracy: 0.99
Precision: 0.00
Recall: 0.00
F1 Score: 0.00

Results for test condition 20240426_1