**Case Study:** *A comprehensive industrial component degradation modelling study for predictive maintenance.*
    
[Dataset](https://www.kaggle.com/datasets/inIT-OWL/one-year-industrial-component-degradation)

**Data Ingestion**

In [None]:
# Fetch the zipped dataset from Google Drive and unpack it next to the notebook.

# gdown is the helper used below to download a file from Google Drive.
! pip install -q gdown

import gdown

# Google Drive id of the file to download.
file_id = "1B8lP4fY09ohS6-Z9uIrjIbP7cqp8NPcO"

# Construct download URL
url = f"https://drive.google.com/uc?id={file_id}"

# Download ZIP (quiet=False keeps gdown's progress output visible)
output = "Dataset_One_Year_Component_Degradation.zip"
gdown.download(url, output, quiet=False)

# Unzip to current directory; -o overwrites existing files without prompting.
# NOTE: the archive name is repeated literally here and must match `output`.
!unzip -o Dataset_One_Year_Component_Degradation.zip

**Optional: Download Data Directly from Kaggle (Upstream Source)**

*Works only in the Colab environment.*

This section downloads the dataset directly from the upstream source (Kaggle) so that you always get the latest version. It requires your own kaggle.json credentials.

In [None]:
# Install the Kaggle command-line client used by the download step.
! pip install kaggle

Generate an API token from your kaggle account:
Account > API > Create New Token.

This generates a kaggle.json file for authentication required for loading data directly.

Upload your kaggle.json to this Colab runtime instance.

In [None]:
# Upload a file from the local machine into the Colab runtime; select the
# kaggle.json token here. The result of the upload call is kept in `uploaded`.
from google.colab import files
uploaded = files.upload()

In [None]:
# Move the uploaded token into ~/.kaggle/ and restrict it to owner read/write
# (mode 600), since the file contains credentials.
! mkdir -p ~/.kaggle
! mv kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json

Ingesting data from kaggle

In [None]:
# Download the dataset with the Kaggle CLI (needs the kaggle.json token set up above).
! kaggle datasets download inIT-OWL/one-year-industrial-component-degradation

**Data Preprocessing**

1) Redundant Features and Multicollinearity

In [None]:
# Pairwise correlation of the columns of the raw data, used to spot redundant
# (highly correlated) features.
raw_df_corr_matric = raw_df.corr()

# Open a dedicated figure for each heatmap. Without it the second heatmap is
# drawn into the axes that already hold the first one.
plt.subplots(figsize=(16, 10))
sns.heatmap(
    raw_df_corr_matric,
    xticklabels=raw_df_corr_matric.columns,
    yticklabels=raw_df_corr_matric.columns,
)

# Columns that remain after dropping the correlated ones.
AFTER_DROPPING_CORRELATED_COLUNMS = [
    'pCut::Motor_Torque',
    'pCut::CTRL_Position_controller::Actual_position',
    'pSvolFilm::CTRL_Position_controller::Actual_position',
    'pSvolFilm::CTRL_Position_controller::Lag_error',
    'pSpintor::VAX_speed',
]

# Correlation matrix of the reduced feature set, plotted the same way so the
# two heatmaps can be compared.
dropped_colunm_df = raw_df[AFTER_DROPPING_CORRELATED_COLUNMS]
drop_df_corr_matric = dropped_colunm_df.corr()
plt.subplots(figsize=(16, 10))
sns.heatmap(
    drop_df_corr_matric,
    xticklabels=drop_df_corr_matric.columns,
    yticklabels=drop_df_corr_matric.columns,
)

2) Feature Scaling

In [None]:
from sklearn import preprocessing

# Min-max scaler: rescales each selected column to the [0, 1] range.
scaler = preprocessing.MinMaxScaler()

# Start from a copy of the raw data so that raw_df stays untouched and the
# target frame exists before columns are assigned into it (mirrors how
# standardized_df is created in the standardization step).
scaled_df = raw_df.copy()
scaled_df[COLUNMS_TO_BE_SCALED] = scaler.fit_transform(
    raw_df[COLUNMS_TO_BE_SCALED]
)

3) Feature Standardization

In [None]:
# Standardize the selected columns with scikit-learn's StandardScaler.
standard_scalar = preprocessing.StandardScaler()

# Work on a copy so raw_df keeps the original values.
standardized_df = raw_df.copy()

# Fit the scaler on the raw columns, then write the transformed values into
# the copy (the assignment was previously split across two lines, which is a
# syntax error).
standard_scalar.fit(raw_df[COLUNMS_TO_BE_SCALED])
standardized_df[COLUNMS_TO_BE_SCALED] = standard_scalar.transform(
    raw_df[COLUNMS_TO_BE_SCALED]
)
standardized_df.head()

**Modelling Using Auto Encoders**

1) Network Architecture

In [None]:
class Autoencoder32x(tf.keras.Model):
    """Fully connected autoencoder with a 32-16-8 encoder and a mirrored decoder.

    The encoder maps ``input_dim`` features through Dense layers of 32, 16 and
    8 units down to ``latent_dim`` units (all ELU). The decoder mirrors it
    (8, 16, 32 units, ELU) and ends in a linear Dense layer of ``input_dim``
    units.

    Args:
        latent_dim: Number of units in the bottleneck (latent) layer.
        input_dim: Number of input features; also the size of the output.
    """

    def __init__(self, latent_dim, input_dim):
        super().__init__()
        self.latent_dim = latent_dim
        self.input_dim = input_dim
        # NOTE(review): stored but not used by any layer below (the network
        # contains no Dropout layers).
        self.dropout_factor = 0.3
        self.encoder = Sequential([
            Dense(32, activation='elu', input_shape=(self.input_dim,)),
            Dense(16, activation='elu'),
            Dense(8, activation='elu'),
            Dense(self.latent_dim, activation='elu'),
        ])
        self.decoder = Sequential([
            Dense(8, activation='elu', input_shape=(self.latent_dim,)),
            Dense(16, activation='elu'),
            Dense(32, activation='elu'),
            # Linear output layer: no activation on the reconstruction.
            Dense(self.input_dim, activation=None),
        ])

    def call(self, inputs):
        """Encode ``inputs`` to the latent space and return the decoder output."""
        encoder_out = self.encoder(inputs)
        return self.decoder(encoder_out)


# This is the dimension of the latent space (encoding space)
latent_dim = 2

# One input/output unit per autoencoder feature column.
autoencoder_10_32x = Autoencoder32x(
    latent_dim=latent_dim,
    input_dim=len(COLUNMS_FOR_AUTOENCODER),
)

# Mean squared reconstruction error is the training loss.
# NOTE(review): 'accuracy' is not an informative metric for a regression-style
# reconstruction loss; it is kept because the training-history plot reads the
# 'accuracy' / 'val_accuracy' entries.
autoencoder_10_32x.compile(loss='mse', optimizer='adam', metrics=['accuracy'])

2) Model Training

In [None]:
# train test split
# The same feature columns are passed as inputs and as targets: the network is
# trained to reproduce its input. 33% of the rows are held out for validation.
ae_train_x, ae_test_x, ae_train_y, ae_test_y = train_test_split(
    standardized_df[COLUNMS_FOR_AUTOENCODER],
    standardized_df[COLUNMS_FOR_AUTOENCODER],
    test_size=0.33,
)

# training
ae_history = autoencoder_10_32x.fit(
    ae_train_x,
    ae_train_y,
    validation_data=(ae_test_x, ae_test_y),
    epochs=100,
)

# Plot the per-epoch 'accuracy' metric recorded for the training and the
# held-out split.
plt.subplots(figsize=(16,10))
plt.plot(ae_history.history.get('accuracy'), label='Train Accuracy')
plt.plot(ae_history.history.get('val_accuracy'), label="Test Accuracy")
plt.legend()

3) Prediction Error

In [None]:
def find_prediction_error(x, y):
    """Return the row-wise root-mean-square difference between ``x`` and ``y``.

    Args:
        x: Two-dimensional array-like of reference values.
        y: Two-dimensional array-like of predictions, compatible with ``x``
            under element-wise subtraction.

    Returns:
        One value per row: the square root of the mean (over axis 1) of the
        squared element-wise differences.
    """
    residual = x - y
    mean_squared = (residual ** 2).mean(axis=1)
    return mean_squared ** 0.5

# Reconstruct every row with the trained autoencoder.
predicted_32x_ae_output = autoencoder_10_32x.predict(
    standardized_df[COLUNMS_FOR_AUTOENCODER]
)

# Per-row reconstruction error (RMS difference between the input features and
# their reconstruction), stored as a new column. These statements were
# previously split across lines - one of them in the middle of the identifier
# `predicted_32x_ae_output` - and could not be parsed.
standardized_df['ae_error_32x_10'] = find_prediction_error(
    standardized_df[COLUNMS_FOR_AUTOENCODER],
    predicted_32x_ae_output,
)

plt.subplots(figsize=(16,10))
plt.plot(standardized_df['ae_error_32x_10'], label='Prediction Error')
plt.legend()

4) Low Pass Filter

In [None]:
def butter_lowpass(cutoff, fs, order=5):
    """Design a digital low-pass Butterworth filter.

    Args:
        cutoff: Cutoff frequency, in the same units as ``fs``.
        fs: Sampling frequency.
        order: Filter order.

    Returns:
        Tuple ``(b, a)`` of numerator and denominator filter coefficients.
    """
    # butter() expects the cutoff as a fraction of the Nyquist frequency.
    nyquist = 0.5 * fs
    numerator, denominator = butter(
        order, cutoff / nyquist, btype='low', analog=False
    )
    return numerator, denominator

def butter_lowpass_filter(data, cutoff, fs, order=5):
    """Apply a low-pass Butterworth filter to ``data``.

    The coefficients come from ``butter_lowpass(cutoff, fs, order=order)`` and
    are applied with ``lfilter``.

    Args:
        data: Signal to filter.
        cutoff: Cutoff frequency, in the same units as ``fs``.
        fs: Sampling frequency.
        order: Filter order.

    Returns:
        The filtered signal as returned by ``lfilter``.
    """
    coefficients = butter_lowpass(cutoff, fs, order=order)
    return lfilter(*coefficients, data)

# Filter design parameters passed to butter_lowpass / butter_lowpass_filter.
order = 6
fs = 1100
cutoff = 0.8

# Frequency response of the designed filter.
b, a = butter_lowpass(cutoff, fs, order)
w, h = freqz(b, a, worN=8000)
# Rescale freqz's angular axis to the units of fs.
plt.plot(0.5*fs*w/np.pi, np.abs(h), 'b')
# Mark the point where the gain equals 1/sqrt(2) at the cutoff frequency.
plt.plot(cutoff, 0.5*np.sqrt(2), 'ko')
plt.axvline(cutoff, color='k')
plt.xlim(0, 0.5*fs)
plt.title("Lowpass Filter Frequency Response")
plt.xlabel('Frequency [Hz]')
plt.grid()

# Time axis for the error signal.
# NOTE(review): T is presumably the total duration in seconds (the axis label
# below says 'Time [sec]') - confirm against the dataset.
T = 4152
# Derive the number of samples from the data instead of hard-coding 1062912,
# so the time axis always matches the length of the plotted series.
n = len(standardized_df['ae_error_32x_10'])
t = np.linspace(0, T, n, endpoint=False)

# Smooth the reconstruction error with the low-pass filter.
low_pass_filtered_error = butter_lowpass_filter(
    standardized_df['ae_error_32x_10'], cutoff, fs, order
)

# Raw vs. filtered error on a fresh figure. The former `plt.subplot(2, 1, 2)`
# call was dropped: it only added an empty axes to the frequency-response
# figure, because the plot below opens its own figure right away.
plt.subplots(figsize=(16,10))
plt.plot(t, standardized_df['ae_error_32x_10'], 'b-', label='data')
plt.plot(t, low_pass_filtered_error, 'g-', linewidth=2, label='filtered data')
plt.xlabel('Time [sec]')
plt.grid()
plt.legend()
plt.subplots_adjust(hspace=0.35)
plt.show()

5) Threshold

In [None]:
# Summary statistics of the low-pass filtered reconstruction error.
filtered_error_std = low_pass_filtered_error.std()
filtered_error_mean = low_pass_filtered_error.mean()

# Constant line at mean + 1 standard deviation, one value per error sample.
ae_threshold_line = np.full(
    len(low_pass_filtered_error),
    filtered_error_std + filtered_error_mean,
)
plt.subplots(figsize=(16,10))
plt.plot(low_pass_filtered_error, label='Prediction Error')
plt.plot(ae_threshold_line, label='First STD Threshold')
plt.legend()

# Constant line at mean + 2 standard deviations.
ae_threshold_line_std_2 = np.full(
    len(low_pass_filtered_error),
    2*filtered_error_std + filtered_error_mean,
)
plt.subplots(figsize=(16,10))
plt.plot(low_pass_filtered_error, label='Prediction Error')
plt.plot(ae_threshold_line_std_2, label='Second STD Threshold')
plt.legend()

**Modelling using Self Organizing Maps**

1) Data Preparation

In [None]:
#New component data as training dataset
new_blade_data_standard_scalar = preprocessing.StandardScaler()
new_blade_standardized = new_blade_raw.copy()
new_blade_data_standard_scalar.fit(new_blade_raw[COLUNMS_FOR_SOM])
new_blade_standardized[COLUNMS_FOR_SOM] =
new_blade_data_standard_scalar.transform(new_blade_raw[COLUNMS_FOR_SOM])
som_train_data =np.array(new_blade_standardized[COLUNMS_FOR_SOM])

#lifecycle data as testing dataset
lc_standard_df_scalar = preprocessing.StandardScaler()
lc_standard_df = raw_df.copy()
lc_standard_df_scalar.fit(raw_df[COLUNMS_FOR_SOM])
lc_standard_df[COLUNMS_FOR_SOM] =
lc_standard_df_scalar.transform(raw_df[COLUNMS_FOR_SOM])
som_test_data =np.array(lc_standard_df[COLUNMS_FOR_SOM])
rows_data = som_train_data.shape[0]
x = int(np.sqrt(5*np.sqrt(rows_data)))
y = x
input_len = som_train_data.shape[1]

2) Model Preparation

In [None]:
def train_som(x, y, input_len, sigma, learning_rate, iterations):
    """Create a MiniSom and train it on the module-level ``som_train_data``.

    Args:
        x: Grid size passed to MiniSom as ``x``.
        y: Grid size passed to MiniSom as ``y``.
        input_len: Length of each input vector (number of features).
        sigma: Neighbourhood parameter passed to MiniSom.
        learning_rate: Learning rate passed to MiniSom.
        iterations: Number of iterations passed to ``train_batch``.

    Returns:
        The trained MiniSom instance. The elapsed training time is printed.
    """
    som = MiniSom(
        x=x,
        y=y,
        input_len=input_len,
        sigma=sigma,
        learning_rate=learning_rate,
    )
    som.random_weights_init(som_train_data)
    # training (timed); the number of iterations is the `iterations` argument
    start_time = time.time()
    som.train_batch(som_train_data, iterations)
    elapsed_time = time.time() - start_time
    print(elapsed_time, " seconds")
    return som

3) Sigma and Learning Rate Optimization

In [None]:
# Hyperopt search space: both hyperparameters are drawn from uniform
# distributions over the given ranges. 'sig' is used as the SOM sigma and
# 'learning_rate' as the SOM learning rate by the objective function.
# NOTE(review): an upper bound of 10 for the learning rate looks large - confirm.
space={
'sig' : hp.uniform("sig", 0.001, 3),
'learning_rate' : hp.uniform("learning_rate", 0.001, 10)
}

def som_fn(space):
    """Hyperopt objective: quantization error of a SOM trained with one sample.

    Args:
        space: Mapping with the sampled hyperparameters ``'sig'`` (sigma) and
            ``'learning_rate'``.

    Returns:
        Dict with the quantization error on ``som_train_data`` as ``'loss'``
        and ``STATUS_OK`` as ``'status'``.
    """
    sig = space['sig']
    lr = space['learning_rate']
    candidate = MiniSom(
        x=x, y=y, input_len=input_len, sigma=sig, learning_rate=lr
    )
    # The map has to be trained before it is scored. sigma and learning_rate
    # only influence the weights during training, so the quantization error of
    # a freshly constructed MiniSom is unrelated to the values being tuned.
    candidate.random_weights_init(som_train_data)
    # 100 iterations, matching the iteration count of the final training run.
    candidate.train_batch(som_train_data, 100)
    val = candidate.quantization_error(som_train_data)
    print(val)
    return {'loss': val, 'status': STATUS_OK}

# Run the TPE search over `space`, minimizing the loss returned by som_fn.
trials = Trials()
best = fmin(
    fn=som_fn,
    space=space,
    algo=tpe.suggest,
    max_evals=1000,
    trials=trials,
)
print('best: {}'.format(best))

#update sigma and learning
# Taken directly from the best result. The former loop over the first two
# trials never used the trial objects and assigned these same values twice.
sigma = best['sig']
learning_rate = best['learning_rate']


4) Model Training

In [None]:
# Train the SOM on the prepared grid size with the tuned sigma / learning rate.
iterations = 100
som = train_som(
    x=x,
    y=y,
    input_len=input_len,
    sigma=sigma,
    learning_rate=learning_rate,
    iterations=iterations,
)

5) SOM Grid Visualization

In [None]:
# Plot the SOM distance map (transposed) as a pseudocolor grid with a colorbar.
from pylab import plot, axis, show, pcolor, colorbar, bone
plt.figure(figsize=(16,12))
bone()  # select the 'bone' colormap for the plot below
pcolor(som.distance_map().T) #distance map as background
colorbar()
#use different color and markers for each label
# NOTE(review): markers and colors are defined here but not used in this cell.
markers = ['o', 's', 'D']
colors = ['r', 'g', 'b']

6) Degradation Visualization using life cycle data and trained SOM:

In [None]:
# Quantization error of every lifecycle sample: the Euclidean distance between
# the sample and the SOM's quantized version of it. The data is processed in
# chunks of 6000 rows.
chunk_size = 6000
n_test_samples = np.shape(som_test_data)[0]
error_chunks = []
# Start at 0 and slice forward so the final (possibly shorter) chunk is
# included; the previous range(6000, N, 6000) loop silently dropped the tail.
for start in range(0, n_test_samples, chunk_size):
    chunk = som_test_data[start:start + chunk_size]
    error_chunks.append(
        np.linalg.norm(som.quantization(chunk) - chunk, axis=1)
    )
# Concatenate once instead of calling np.append per chunk, which copies the
# whole accumulated array every time.
quantization_errors = (
    np.concatenate(error_chunks) if error_chunks else np.array([])
)

#moving average to remove noise from data (currently disabled, see below)
kernel_size = 2000
kernel = np.ones(kernel_size) / kernel_size
#quantization_errors = np.convolve(quantization_errors, kernel, mode='same')
plt.subplots(figsize=(16,10))
print(type(quantization_errors))
plt.plot(quantization_errors)
print(np.shape(som_test_data)[0])
plt.xlabel('Samples')
plt.ylabel('Quantization Error')

7) Threshold Estimation

In [None]:
# Short alias for the quantization errors used by the threshold estimation below.
z = quantization_errors

def normal_dist(x, mean, sd):
    """Return the normal (Gaussian) probability density evaluated at ``x``.

    Args:
        x: Scalar or NumPy array of points at which to evaluate the density.
        mean: Mean of the distribution.
        sd: Standard deviation of the distribution (must be non-zero).

    Returns:
        ``exp(-0.5 * ((x - mean) / sd) ** 2) / (sd * sqrt(2 * pi))``, with the
        same shape as ``x``.
    """
    # The normalisation constant is 1 / (sd * sqrt(2*pi)); the previous code
    # multiplied by (pi * sd) instead, which is not a probability density.
    prob_density = np.exp(-0.5 * ((x - mean) / sd) ** 2) / (sd * np.sqrt(2 * np.pi))
    return prob_density

#Calculate mean and Standard deviation.
mean = np.mean(z)
sd = np.std(z)
#Apply function to the data to find normal distribution.
pdf = normal_dist(z, mean, sd)
# Threshold as Error value for Τ = 0.99 from the distribution
# (computed as the empirical 0.99 quantile of the error values).
threshold = np.quantile(z, 0.99)
print("Error value for Τ = 0.99 from the distribution : ", threshold)
#Plotting the Results
# z is in sample order, so sort by error value before drawing; a line plot of
# the unsorted pairs zig-zags back and forth across the curve.
order_by_error = np.argsort(z)
plt.subplots(figsize=(16,10))
plt.plot(z[order_by_error], pdf[order_by_error], color='blue')
# Vertical marker at the threshold, scaled to the height of the curve instead
# of a fixed height of 3.
plt.plot([threshold, threshold], [0, np.max(pdf)], color='red')
plt.xlabel('Error Values')
plt.ylabel('Probability Density')
# Reset the name so the error series can be rebuilt from scratch afterwards;
# z still refers to the values used above.
quantization_errors = np.array([])

8) Visualization with Threshold

In [None]:
# Recompute the per-sample quantization error of the lifecycle data in chunks
# of 6000 rows. The result is built from scratch, so re-running this cell does
# not append a second copy of the errors to an existing array.
chunk_size = 6000
n_test_samples = np.shape(som_test_data)[0]
error_chunks = []
# Start at 0 and slice forward so the final (possibly shorter) chunk is kept.
for start in range(0, n_test_samples, chunk_size):
    chunk = som_test_data[start:start + chunk_size]
    error_chunks.append(
        np.linalg.norm(som.quantization(chunk) - chunk, axis=1)
    )
quantization_errors = (
    np.concatenate(error_chunks) if error_chunks else np.array([])
)

plt.subplots(figsize=(16,10))
plt.plot(quantization_errors)
# Horizontal threshold line across exactly the plotted index range.
plt.plot([0, len(quantization_errors)], [threshold, threshold])
plt.xlabel('Samples')
plt.ylabel('Quantization Error')
Visualization with threshold line (zoomed in):
# Zoomed view: the 500 quantization-error samples ending at index 122500,
# with the threshold drawn as a horizontal line across the window.
zoom_stop = 122500
zoom_width = 500
zoom_window = quantization_errors[zoom_stop - zoom_width:zoom_stop]
plt.subplots(figsize=(32,20))
plt.plot(zoom_window)
plt.plot([0, zoom_width], [threshold, threshold])
plt.xlabel('Samples')
plt.ylabel('Quantization Error')