In [1]:
# Refer to path of package
# NOTE(review): this appends an absolute, machine-specific site-packages
# directory to the import path, so the notebook only resolves its packages on
# the original author's machine. Prefer starting the kernel from the intended
# environment so this line is not needed.
import sys
sys.path.append('/Users/watcharapongwongrattanasirikul/Documents/Git/Jupyter/my_env/lib/python3.8/site-packages')

In [2]:
# Import Lib
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
from sklearn.ensemble import IsolationForest
from sklearn.metrics import confusion_matrix, accuracy_score, roc_auc_score, classification_report
import re
from random import choices, randint, randrange, uniform

In [3]:
from ml_helper import MlHelper
from eda_helper import EdaHelper
from sampling_helper import SamplingHelper
from impute_helper import ImputeHelper
from impute_helper import imputation_strategy
from ml_helper import model_type
from ml_helper import resampler_type
from ml_helper import scaler_type
import cleansing_helper as CleansingHelper

In [4]:
# Location of the training time-series CSV.
# NOTE(review): absolute, machine-specific path; consider a path relative to a
# configurable data directory so the notebook runs elsewhere.
file_path = '/Users/watcharapongwongrattanasirikul/Documents/Git/predictive-maintenance/train_timeseries.csv'

# Load the raw data and preview the first two rows.
df = pd.read_csv(file_path)
df.head(2)

Unnamed: 0,UDI,Product ID,Type,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min],Machine failure,TWF,HDF,PWF,OSF,RNF
0,1,M14860,M,298.1,308.6,1551,42.8,0,0,0,0,0,0,0
1,2,L47181,L,298.2,308.7,1408,46.3,3,0,0,0,0,0,0


In [5]:
# Remove the RNF column before modelling.
# The frame is reassigned (no inplace=True) and a missing column is ignored, so
# re-running this cell is harmless instead of raising KeyError.
df = df.drop(columns=['RNF'], errors='ignore')
df.head(2)

Unnamed: 0,UDI,Product ID,Type,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min],Machine failure,TWF,HDF,PWF,OSF
0,1,M14860,M,298.1,308.6,1551,42.8,0,0,0,0,0,0
1,2,L47181,L,298.2,308.7,1408,46.3,3,0,0,0,0,0


In [6]:
# Replace the verbose, unit-suffixed column headers with short identifiers
df = df.rename(
    mapper={
        'Machine failure': 'MF',
        'Tool wear [min]': 'ToolWear',
        'Torque [Nm]': 'Torque',
        'Rotational speed [rpm]': 'RotationalSpeed',
        'Process temperature [K]': 'ProcessTemp',
        'Air temperature [K]': 'AirTemp',
    },
    axis='columns',
)
df.head(2)

Unnamed: 0,UDI,Product ID,Type,AirTemp,ProcessTemp,RotationalSpeed,Torque,ToolWear,MF,TWF,HDF,PWF,OSF
0,1,M14860,M,298.1,308.6,1551,42.8,0,0,0,0,0,0
1,2,L47181,L,298.2,308.7,1408,46.3,3,0,0,0,0,0


In [7]:
# Power is the key feature for power failure, so derive a Power column.
# Note: Torque [Nm] x RotationalSpeed [rpm] is only proportional to mechanical
# power (multiply by 2*pi/60 to obtain watts); the raw product is not in kW.

df['Power'] = df['Torque'].mul(df['RotationalSpeed'])
df.head(2)

Unnamed: 0,UDI,Product ID,Type,AirTemp,ProcessTemp,RotationalSpeed,Torque,ToolWear,MF,TWF,HDF,PWF,OSF,Power
0,1,M14860,M,298.1,308.6,1551,42.8,0,0,0,0,0,0,66382.8
1,2,L47181,L,298.2,308.7,1408,46.3,3,0,0,0,0,0,65190.4


In [8]:
# Temperature is the key feature for heat dissipation failure: store how far
# the process temperature sits above the air temperature.
df['AirDiff'] = df['ProcessTemp'].sub(df['AirTemp'])
df.head(2)

Unnamed: 0,UDI,Product ID,Type,AirTemp,ProcessTemp,RotationalSpeed,Torque,ToolWear,MF,TWF,HDF,PWF,OSF,Power,AirDiff
0,1,M14860,M,298.1,308.6,1551,42.8,0,0,0,0,0,0,66382.8,10.5
1,2,L47181,L,298.2,308.7,1408,46.3,3,0,0,0,0,0,65190.4,10.5


### Utility Function

In [9]:
# Encode every data point as a class label according to where it falls relative
# to the mean of the series, in bands that are one standard deviation wide:
# a = value <= mean - 3*std
# b = value <= mean - 2*std
# c = value <= mean - 1*std
# d = value <= mean
# e = value <= mean + 1*std
# f = value <= mean + 2*std
# g = value <= mean + 3*std
# h = value >  mean + 3*std

def encode_feature(df, is_print_info=False):
    """Label each value of a 1-D series with a letter from 'a' to 'h'.

    Parameters
    ----------
    df : iterable of numbers
        The values to encode (despite the name, a single column / sequence).
    is_print_info : bool, default False
        When True, print the mean, the standard deviation, the seven band
        boundaries and then every data point as it is processed.

    Returns
    -------
    list of str
        One label per input value, in input order. Each value receives the
        label of the first band whose upper bound it does not exceed, or 'h'
        when it exceeds all of them.
    """
    mean = np.mean(df)
    std = np.std(df)

    # Upper bound of each band, ordered from the lowest band to the highest.
    upper_bounds = (
        ('a', mean - (3*std)),
        ('b', mean - (2*std)),
        ('c', mean - (1*std)),
        ('d', mean),
        ('e', mean + (1*std)),
        ('f', mean + (2*std)),
        ('g', mean + (3*std)),
    )

    if is_print_info:
        print(f'mean: {mean}')
        print(f'std: {std}')
        for label, bound in upper_bounds:
            print(f'{label}: {bound}')

    encoder_list = []

    for data_point in df:
        if is_print_info:
            print(data_point)

        # First band that contains the point; anything above every bound is 'h'.
        matched = next(
            (label for label, bound in upper_bounds if data_point <= bound),
            'h',
        )
        encoder_list.append(matched)

    return encoder_list

In [10]:
def get_expression(data, num_pattern=1):
    """Translate token patterns into regular-expression strings.

    Each pattern is a sequence of tokens. A token is one of the labels
    'a'..'h', the wildcard '.*', or the separator '|', which makes the token
    before it and the token after it alternatives for the same position.
    Alternatives may be chained ('a', '|', 'b', '|', 'c').

    Alternatives are emitted as a non-capturing group, e.g. ``(?:f|g)``. The
    previous implementation emitted a character class (``[f|g]``); that matches
    the same label strings for single letters but treated '.*' as the literal
    characters '.' and '*' and produced a corrupted pattern for chained
    alternatives.

    Parameters
    ----------
    data : sequence of token sequences
        The patterns to translate.
    num_pattern : int, default 1
        How many patterns, taken from the start of ``data``, to translate.

    Returns
    -------
    list of str
        One expression per translated pattern, each wrapped in a leading and
        trailing '.*'.

    Raises
    ------
    ValueError
        If '|' is the first effective token or the last token of a pattern, or
        if the token(s) after the final '|' were all ignored so the alternation
        is left without a right-hand side.
    IndexError
        If ``num_pattern`` exceeds ``len(data)``.
    """
    labels = ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', '.*')
    expression_group = []

    for j in range(num_pattern):
        tokens = data[j]
        groups = []          # one list of alternatives per matched position
        pending_or = False   # parser state is reset for every pattern

        for idx, token in enumerate(tokens):

            if token == '|':
                if not groups:
                    raise ValueError(f"The symbol '|' can't be the first parameter: {data}")

                if (idx + 1) == len(tokens):
                    raise ValueError(f"The symbol '|' can't be the last parameter: {data}")

                if pending_or:
                    # Two consecutive '|': the empty alternative between them
                    # is filled with 'd', as the previous implementation did.
                    groups[-1].append('d')

                pending_or = True
                continue

            if token not in labels:
                # Unknown tokens are skipped, as before.
                continue

            if pending_or:
                groups[-1].append(token)
                pending_or = False
            else:
                groups.append([token])

        if pending_or:
            # Only reachable when everything after the last '|' was skipped;
            # previously this produced an unterminated, invalid expression.
            raise ValueError(f"The symbol '|' must be followed by a label: {data}")

        reg_result = ''.join(
            alternatives[0] if len(alternatives) == 1
            else '(?:' + '|'.join(alternatives) + ')'
            for alternatives in groups
        )
        expression_group.append('.*' + reg_result + '.*')

    return expression_group

### Model

In [11]:
# Custom Ensemble Classification Model

from sklearn.base import BaseEstimator

class MachineFailureClassification(BaseEstimator):
    """Rule-based failure detector working on label-encoded feature windows.

    Every failure mode (TWF, HDF, PWF, OSF) has a token pattern. The pattern is
    turned into a regular expression with ``get_expression`` and searched for
    in the encoded sequence of one feature; a hit flags that failure mode.
    """

    # Upper limit on the mean AirDiff for HDF to be considered.
    # Per the original note this is mean - (0.5 x sd).
    # Kept on the class so that predict() can actually read it (it used to be a
    # local variable of __init__, which made predict() raise NameError).
    AIRDIFF_THRESHOLD = 9.27

    def __init__(self):
        # Token patterns, one per failure mode ('|' separates alternatives).
        self.TWF = ['e', 'f', 'f', '|', 'g']
        self.HDF = ['e', 'd', 'f', '|', 'd']
        self.PWF = ['e', '|', 'c', 'f', 'f']
        self.OSF = ['e', 'e', 'e', '|', 'c']

    def fit(self, X=None, y=None):
        """No-op: the rules are fixed, nothing is learned.

        ``X`` and ``y`` are accepted (and ignored) so the method can also be
        called the way estimators usually are; calling ``fit()`` with no
        arguments keeps working.
        """
        return self

    @staticmethod
    def _matches(pattern, encoded):
        """Return True when ``pattern`` occurs in the encoded sequence."""
        expression = get_expression([pattern])[0]
        # re.search needs a string; encoded windows arrive as lists of
        # one-character labels, so join them (a str is passed through).
        sequence = encoded if isinstance(encoded, str) else ''.join(encoded)
        return re.search(expression, sequence) is not None

    def predict(self, df, encoded_power, encoded_torque, encoded_toolwear):
        """Flag the failure modes whose pattern occurs in the given window.

        Parameters
        ----------
        df : DataFrame
            Rows of the window; only the mean of its 'AirDiff' column is used.
        encoded_power, encoded_torque, encoded_toolwear : str or sequence of str
            Label-encoded values of the corresponding feature.

        Returns
        -------
        (bool, list of str)
            Whether any failure mode was flagged, and the flagged modes in the
            order PWF, HDF, OSF, TWF.
        """
        fails_type = []

        # ------------------------- Predict PWF -------------------------
        # Uses the PWF pattern (the TWF pattern was used here by mistake).
        predict_pwf = self._matches(self.PWF, encoded_power)
        if predict_pwf:
            fails_type.append('PWF')

        # ------------------------- Predict HDF -------------------------
        # HDF is only considered while the mean AirDiff is below the threshold.
        # NOTE(review): the HDF pattern is searched in the torque encoding, as
        # in the original code - confirm this is the intended feature.
        airdiff_is_low = df['AirDiff'].mean() < self.AIRDIFF_THRESHOLD
        predict_hdf = bool(airdiff_is_low and self._matches(self.HDF, encoded_torque))
        if predict_hdf:
            fails_type.append('HDF')

        # ------------------------- Predict OSF -------------------------
        # NOTE(review): searched in the tool-wear encoding, as in the original.
        predict_osf = self._matches(self.OSF, encoded_toolwear)
        if predict_osf:
            fails_type.append('OSF')

        # ------------------------- Predict TWF -------------------------
        predict_twf = self._matches(self.TWF, encoded_toolwear)
        if predict_twf:
            fails_type.append('TWF')

        # ------------------------- Summary -------------------------
        predict_fail = predict_pwf or predict_hdf or predict_osf or predict_twf

        return predict_fail, fails_type
    

### Prediction

In [20]:
# Prepare data: label-encode each numeric feature, in the order
# torque -> power -> tool wear

encoded_torque, encoded_power, encoded_toolwear = (
    encode_feature(df[column]) for column in ('Torque', 'Power', 'ToolWear')
)

In [12]:
# Initial input

# Classifier instance used by the prediction cells below.
ml_classification = MachineFailureClassification()
# NOTE(review): encoded_torque is computed with the same call in the
# "Prepare data" cell; this recomputation looks redundant.
encoded_torque = encode_feature(df['Torque'])

In [21]:
def prediction_period(df, window_size:int, encoded_torque,
                      encoded_power=None, encoded_toolwear=None,
                      classifier=None, target_col='MF'):
    """Evaluate the classifier over consecutive, non-overlapping windows.

    For every window the classifier predicts from that window's rows and
    encoded features; the prediction is scored against whether a failure is
    recorded in the window that follows. The last window has no successor and
    is therefore not scored.

    Parameters
    ----------
    df : DataFrame
        Source rows; must contain ``target_col``.
    window_size : int
        Number of rows per window; must be positive.
    encoded_torque : str or sequence of str
        Label-encoded torque, one label per row of ``df``.
    encoded_power, encoded_toolwear : str or sequence of str, optional
        Label-encoded power / tool wear. When omitted they are computed from
        ``df['Power']`` / ``df['ToolWear']`` with ``encode_feature`` rather than
        read from notebook globals.
    classifier : object with ``predict(df, power, torque, toolwear)``, optional
        Defaults to a new ``MachineFailureClassification``.
    target_col : str, default 'MF'
        Column holding the failure flag.
        NOTE(review): "fail next period" is taken to mean "any truthy value of
        this column in the following window" - confirm this is the intended
        label definition.

    Returns
    -------
    dict
        Counts under the keys 'TP', 'FP', 'TN' and 'FN'.

    Raises
    ------
    ValueError
        If ``window_size`` is not positive.
    """
    if window_size <= 0:
        raise ValueError("window_size must be a positive integer")

    if encoded_power is None:
        encoded_power = encode_feature(df['Power'])
    if encoded_toolwear is None:
        encoded_toolwear = encode_feature(df['ToolWear'])
    if classifier is None:
        classifier = MachineFailureClassification()

    # Calculate the number of scored windows: every full window except the
    # last one, which has no following period to compare against.
    number_rolling = max(len(df) // window_size - 1, 0)

    counts = {'TP': 0, 'FP': 0, 'TN': 0, 'FN': 0}

    # Start rolling
    for idx in range(number_rolling):
        start = idx * window_size
        end = start + window_size

        # The classifier matches regular expressions, so hand it strings.
        torque = ''.join(encoded_torque[start:end])
        power = ''.join(encoded_power[start:end])
        toolwear = ''.join(encoded_toolwear[start:end])

        # Pass only the current window so row-based rules see this window's
        # values, not those of the whole data set.
        window = df.iloc[start:end]
        predict_fail, fails_type = classifier.predict(window, power, torque, toolwear)

        is_fail_next_period = bool(df[target_col].iloc[end:end + window_size].any())

        # Calculate Fitness Score
        if predict_fail:
            outcome = 'TP' if is_fail_next_period else 'FP'
        else:
            outcome = 'FN' if is_fail_next_period else 'TN'
        counts[outcome] += 1

    return counts
    
    

In [22]:
# Run the rolling evaluation over windows of 20 encoded points each.
prediction_period(
    df=df,
    window_size=20,
    encoded_torque=encoded_torque
)