# SBM

In [180]:
# import all the necessary libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy
from scipy.stats import skew, kurtosis
from scipy import integrate
import time

In [181]:
def get_fft(signal, window_size=8192, overlap_percentage=75, sample_rate=50000):
    # generate a Hanning window of the window size
    hanning_window = np.hanning(window_size)
    overlap_samples = int((window_size * overlap_percentage) / 100)

    # calculate the number of frames
    num_frames = (len(signal) - window_size) // overlap_samples + 1

    # initialize arrays to store FFT results and frequencies
    fft_results = np.zeros((num_frames, window_size//2+1), dtype=complex)
    frequencies = np.fft.rfftfreq(window_size, 1/sample_rate)

    # apply Hanning window, compute FFT, and store the results
    for i in range(num_frames):
        start = i * overlap_samples
        end = start + window_size
        frame = signal[start:end] * hanning_window
        fft_results[i, :] = np.fft.rfft(frame, window_size)

    # calculate the magnitude spectrum
    magnitude_spectrum = (np.abs(fft_results))/100

    # calculate the average magnitude spectrum
    avg_magnitude_spectrum = np.mean(magnitude_spectrum, axis=0)
    
    return avg_magnitude_spectrum, frequencies

In [182]:
database_path = 'mafaulda' # load the database path
N = 250000 # the number of points in the time series in each column
sample_rate = 50000 # sampling rate
Nfft = 8192 # no of FFT points
window_size = 8192
overlap_percentage = 75
cutoff_frequency_high = 10
cutoff_frequency_low = 10000

# function that takes a csv file path and gives features corresponding to that file
def calculate_features(file_path):
    df = pd.read_csv(file_path, header=None) # load the dataframe
    df.columns = [str(i) for i in range(len(df.columns))] # name the columns
    for column in df.columns:
        df[column] = df[column]/(np.std(df[column].values))
        # design filters
        b_high, a_high = scipy.signal.butter(N=4, Wn=cutoff_frequency_high, btype='high', fs=sample_rate)
        b_low, a_low = scipy.signal.butter(N=4, Wn=cutoff_frequency_low, btype='low', fs=sample_rate)
        # apply the filter to your signal
        df[column] = scipy.signal.filtfilt(b_high, a_high, df[column].values)
        df[column] = scipy.signal.filtfilt(b_low, a_low, df[column].values)
    
    # calculate a rotation frequency
    tachometer_signal = df['0'].values # load the tachometer signal
    tacho_fft, freqs = get_fft(tachometer_signal, window_size, overlap_percentage, sample_rate)
    
    freq_max = [] # list of the top four frequencies which give peak
    freq_max_idx = [] # list of the top four frequency indices which give peak
    fft_dummy = tacho_fft # copy the fft

    for i in range(4):
        max_idx = np.argmax(fft_dummy) # calculate the index corresponding to the peak
        f = max_idx * (sample_rate/Nfft) # calculate the corresponding frequency
        freq_max.append(f) # append the frequency to the list
        freq_max_idx.append(max_idx) # append the index to the list
        for j in range(-3, 4):
            fft_dummy[max_idx + j] = 0 # make the fft value zero at indices from max_idx-3 to max_idx+3 and iterate the process
    
    fr = np.min(freq_max) # rotating freq will be the minimum of all four frequencies
    fr_index = freq_max_idx[np.argmin(freq_max)] # extract the index corresponding to the rotating frequency
        
    # calculate spectrum features at fr, 2fr and 3fr for all the remaining signals
    acc_spectrum_features = [] # create an empty list for acceleration spectrum features
    vel_spectrum_features = [] # create an empty list for velocity spectrum features
    for i in range(1, 4):
        freq_index = i * fr_index # for multiples of fr
        freqs_adjacent_idx = [freq_index - i for i in range(5, 0, -1)] + [freq_index] + [freq_index + i for i in range(1, 6)]

        for column in df.columns[1:]:
            if (int(column) != 0 and int(column) != 7):
                vel_signal = integrate.cumtrapz(df[column], initial=0, dx=1/sample_rate)
                vel_fft_signal, vel_freqs = get_fft(vel_signal, window_size, overlap_percentage, sample_rate)
                vel_spectrum_feature = np.mean([vel_fft_signal[idx] for idx in freqs_adjacent_idx]) 
                vel_spectrum_features.append(vel_spectrum_feature)
            acc_signal = df[column]
            acc_fft_signal, acc_freqs = get_fft(acc_signal, window_size, overlap_percentage, sample_rate)
            acc_spectrum_feature = np.mean([acc_fft_signal[idx] for idx in freqs_adjacent_idx]) 
            acc_spectrum_features.append(acc_spectrum_feature)
    
    # calculate statistical features for all the signals
    statistical_features_acc = [] # create an empty list for statistical features of acceleration
    statistical_features_vel = [] # create an empty list for statistical features of velocity
    for column in df.columns:
        if (int(column) != 0 and int(column) != 7):
            vel = integrate.cumtrapz(df[column], initial=0, dx=1/sample_rate)
            vel_mean_val = vel.mean()
            vel_median_val = np.median(vel)
            vel_std_val = vel.std()
            vel_rms_val = np.sqrt(np.mean(vel**2))
            statistical_features_vel.extend([vel_mean_val, vel_median_val, vel_std_val, vel_rms_val])
        acc_mean_val = df[column].mean() # calculate mean value
        acc_median_val = df[column].median() # calculate median value
        acc_std_val = df[column].std() # calculate standard deviation value
        acc_rms_val = np.sqrt(np.mean(df[column]**2)) # calculate rms value
#         skew_val = skew(df[column]) # calculate skewness value
#         kurtosis_val = kurtosis(df[column]) # calculate kurtosis value
        statistical_features_acc.extend([acc_mean_val, acc_median_val, acc_std_val, acc_rms_val])
        
    # combine all the features to form a feature vector for the file
    all_features = [fr] + acc_spectrum_features + vel_spectrum_features + statistical_features_acc + statistical_features_vel
    return all_features

In [183]:
start_time = time.time() # start time for extracting features

# normal class features
normal_features_list = [] # create an empty list for normal features
normal_class_path = os.path.join(database_path, 'normal')

if os.path.isdir(normal_class_path): # check if it's a directory
    for file_name in os.listdir(normal_class_path): # iterate through each CSV file in the folder
        file_path = os.path.join(normal_class_path, file_name)
        
        # check if it's a file
        if os.path.isfile(file_path) and file_name.endswith(".csv"): 
            features = calculate_features(file_path)
            normal_features_list.append(features)

# create a dataframe from the list of all features
result_df = pd.DataFrame(normal_features_list)

# save the result dataframe to a new CSV file
result_df.to_csv("normal_features.csv", index=False)

end_time = time.time() # end time for extracting features

print('Time needed to extract normal features: ', (end_time - start_time)/60, 'min')

Time needed to extract normal features:  1.758551788330078 min


In [184]:
start_time = time.time() # start time for extracting features

# imbalance class features
imbalance_features_list = [] # create an empty list for imbalance features
imbalance_class_path = os.path.join(database_path, 'imbalance')

for folder_name in os.listdir(imbalance_class_path): # go to each of the folders in it
    folder_path = os.path.join(imbalance_class_path, folder_name)
    
    # check if it's a directory
    if os.path.isdir(folder_path):
        # iterate through each CSV file in the folder
        for file_name in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file_name)

            # check if it's a file
            if os.path.isfile(file_path) and file_name.endswith(".csv"):
                features = calculate_features(file_path)
                imbalance_features_list.append(features)

# create a dataframe from the list of all features
result_df = pd.DataFrame(imbalance_features_list)

# save the result dataframe to a new CSV file
result_df.to_csv("imbalance_features.csv", index=False)

end_time = time.time() # end time for extracting features

print('Time needed to extract imbalance features: ', (end_time - start_time)/60, 'min')

Time needed to extract imbalance features:  11.730530261993408 min


In [185]:
start_time = time.time() # start time for extracting features

# horizontal parallel misalignment class features
horizontal_features_list = [] # create an empty list for horizontal parallel misalignment features
horizontal_class_path = os.path.join(database_path, 'horizontal-misalignment')

for folder_name in os.listdir(horizontal_class_path): # go to each of the folders in it
    folder_path = os.path.join(horizontal_class_path, folder_name)
    
    # check if it's a directory
    if os.path.isdir(folder_path):
        # iterate through each CSV file in the folder
        for file_name in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file_name)

            # check if it's a file
            if os.path.isfile(file_path) and file_name.endswith(".csv"):
                features = calculate_features(file_path)
                horizontal_features_list.append(features)

# create a dataframe from the list of all features
result_df = pd.DataFrame(horizontal_features_list)

# save the result dataframe to a new CSV file
result_df.to_csv("horizontal_features.csv", index=False)

end_time = time.time() # end time for extracting features

print('Time needed to extract horizontal features: ', (end_time - start_time)/60, 'min')

Time needed to extract horizontal features:  6.952353421847025 min


In [186]:
start_time = time.time() # start time for extracting features

# vertical parallel misalignment class features
vertical_features_list = [] # create an empty list for vertical parallel misalignment features
vertical_class_path = os.path.join(database_path, 'vertical-misalignment')

for folder_name in os.listdir(vertical_class_path): # go to each of the folders in it
    folder_path = os.path.join(vertical_class_path, folder_name)
    
    # check if it's a directory
    if os.path.isdir(folder_path):
        # iterate through each CSV file in the folder
        for file_name in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file_name)

            # check if it's a file
            if os.path.isfile(file_path) and file_name.endswith(".csv"):
                features = calculate_features(file_path)
                vertical_features_list.append(features)

# create a dataframe from the list of all features
result_df = pd.DataFrame(vertical_features_list)

# save the result dataframe to a new CSV file
result_df.to_csv("vertical_features.csv", index=False)

end_time = time.time() # end time for extracting features

print('Time needed to extract vertical features: ', (end_time - start_time)/60, 'min')

Time needed to extract vertical features:  10.553336644172669 min


In [187]:
start_time = time.time() # start time for extracting features

# underhang class features
underhang_features_list = [] # create an empty list for underhang features
underhang_class_path = os.path.join(database_path, 'underhang')

for folder_name in os.listdir(underhang_class_path): # go to each of the folders in it
    folder_path = os.path.join(underhang_class_path, folder_name)
    
    if os.path.isdir(folder_path):
        for subfolder_name in os.listdir(folder_path): # go to each of the subfolders in it
            subfolder_path = os.path.join(folder_path, subfolder_name)
            
            # check if it's a directory
            if os.path.isdir(subfolder_path):
                # iterate through each CSV file in the subfolder
                for file_name in os.listdir(subfolder_path):
                    file_path = os.path.join(subfolder_path, file_name)
                    
                    # check if it's a file
                    if os.path.isfile(file_path) and file_name.endswith(".csv"):
                        features = calculate_features(file_path)
                        underhang_features_list.append(features)
        
        
        
# create a dataframe from the list of all features
result_df = pd.DataFrame(underhang_features_list)

# save the result dataframe to a new CSV file
result_df.to_csv("underhang_features.csv", index=False)

end_time = time.time() # end time for extracting features

print('Time needed to extract underhang features: ', (end_time - start_time)/60, 'min')

Time needed to extract underhang features:  19.652845307191214 min


In [188]:
start_time = time.time() # start time for extracting features

# overhang class features
overhang_features_list = [] # create an empty list for overhang features
overhang_class_path = os.path.join(database_path, 'overhang')

for folder_name in os.listdir(overhang_class_path): # go to each of the folders in it
    folder_path = os.path.join(overhang_class_path, folder_name)
    
    if os.path.isdir(folder_path):
        for subfolder_name in os.listdir(folder_path): # go to each of the subfolders in it
            subfolder_path = os.path.join(folder_path, subfolder_name)
            
            # check if it's a directory
            if os.path.isdir(subfolder_path):
                # iterate through each CSV file in the subfolder
                for file_name in os.listdir(subfolder_path):
                    file_path = os.path.join(subfolder_path, file_name)
                    
                    # check if it's a file
                    if os.path.isfile(file_path) and file_name.endswith(".csv"):
                        features = calculate_features(file_path)
                        overhang_features_list.append(features)
        
        
        
# create a dataframe from the list of all features
result_df = pd.DataFrame(overhang_features_list)

# save the result dataframe to a new CSV file
result_df.to_csv("overhang_features.csv", index=False)

end_time = time.time() # end time for extracting features

print('Time needed to extract overhang features: ', (end_time - start_time)/60, 'min')

Time needed to extract overhang features:  18.44502481619517 min


In [189]:
# function to calculate similarity using cauchy kernel similarity
def calculate_similarity(xi, xj, gamma=0.005):
    dij = np.linalg.norm(xi - xj, ord=2) # calculate the distance
    similarity = 1 / np.sqrt(1 + (gamma**2) * (dij**2)) # based on the distance, calculate the similarity
    return similarity

In [190]:
# function which takes features file to create memory matrix
def memory_matrix(features_file, t=11):
    df = pd.read_csv(features_file) # read the dataframe
    X = df.values # extract 2D array from the dataframe
    
    # first set of vectors for the memory matrix
    extrema_indices = [] # create an empty list to accomodate the indices corresponding to extremas
    for m in range(X.shape[1]):
        min_index = np.argmin(X[:, m]) # extract index corresponding to the minimum
        max_index = np.argmax(X[:, m]) # extract index corresponding to the maximum
        extrema_indices.extend([min_index, max_index]) # add them to the list

    unique_extrema_indices = np.unique(extrema_indices) # just keep unique indices
    representatives_from_extrema = X[unique_extrema_indices] # extract the required first set of vectors

    # second set of vectors for the memory matrix
    remaining_indices = [i for i in range(X.shape[0]) if i not in unique_extrema_indices] # array of remaining indices
    sorted_remaining_indices = sorted(remaining_indices, key=lambda i: np.linalg.norm(X[i], ord=2), reverse=True) # sort them in the decreasing order of the l2 norm values
    decimated_remaining_indices = sorted_remaining_indices[::t] # decimate them by a factor of t
    representatives_from_decimated = X[decimated_remaining_indices] # extract the required second set of vectors

    # Combine representative samples from extrema and decimated remaining samples
    model_matrix = np.concatenate([representatives_from_extrema, representatives_from_decimated])
    
    testing_matrix = np.delete(X, np.concatenate([unique_extrema_indices, decimated_remaining_indices]), axis=0)

    return model_matrix, testing_matrix

In [191]:
# memory matrices for all the classes

# memory matrix for normal class
normal_matrix, normal_test_matrix = memory_matrix(features_file='normal_features.csv')
print("normal matrix dimension: ", normal_matrix.shape, normal_test_matrix.shape)

# memory matrix for imbalance class
imbalance_matrix, imbalance_test_matrix = memory_matrix(features_file='imbalance_features.csv')
print("imbalance matrix dimensions: ", imbalance_matrix.shape, imbalance_test_matrix.shape)

# memory matrix for horizontal misalignment class
horizontal_matrix, horizontal_test_matrix = memory_matrix(features_file='horizontal_features.csv')
print("horizontal matrix dimensions: ", horizontal_matrix.shape, horizontal_test_matrix.shape)

# memory matrix for vertical misalignment class
vertical_matrix, vertical_test_matrix = memory_matrix(features_file='vertical_features.csv')
print("vertical matrix dimensions: ", vertical_matrix.shape, vertical_test_matrix.shape)

# memory matrix for underhang class
underhang_matrix, underhang_test_matrix = memory_matrix(features_file='underhang_features.csv')
print("underhang matrix dimensions: ", underhang_matrix.shape, underhang_test_matrix.shape)

# memory matrix for overhang class
overhang_matrix, overhang_test_matrix = memory_matrix(features_file='overhang_features.csv')
print("overhang matrix dimensions: ", overhang_matrix.shape, overhang_test_matrix.shape)


normal matrix dimension:  (39, 96) (10, 96)
imbalance matrix dimensions:  (103, 96) (230, 96)
horizontal matrix dimensions:  (82, 96) (115, 96)
vertical matrix dimensions:  (94, 96) (207, 96)
underhang matrix dimensions:  (128, 96) (430, 96)
overhang matrix dimensions:  (126, 96) (387, 96)


In [192]:
# function to calculate the similarity of any incoming vector with respect to a memory matrix
def similarity_using_estimate(inc_vec, mem_mat):
    # assuming memory matrix to be of the size L x M
    L, M = mem_mat.shape
    
    # initialize a matrix G
    G = np.zeros((L, L))
    
    # calculate elements of G using the similarity function
    for i in range(L):
        for j in range(L):
            G[i, j] = calculate_similarity(mem_mat[i, :], mem_mat.T[:, j])
    
    # calculate inverse of the G matrix
    inv_G = np.linalg.inv(G)
    
    # calculate the vector a_n
    a_n = [calculate_similarity(mem_mat[i, :], inc_vec) for i in range(L)]
    
    # calculate the weight vector w_n
    w_n = np.dot(inv_G, a_n)
    
    # calculate the normalized weight vector
    normalized_w_n = w_n / np.linalg.norm(w_n, ord=1)
    
    # calculate the estimate
    estimate = np.dot(mem_mat.T, normalized_w_n)
    
    similarity = calculate_similarity(inc_vec, estimate)
    
    return similarity

In [193]:
# function to estimate the class of an incoming vector based on the similarity measure
def estimate_class(inc_vec):
    similarity_list = []
    
    # similarity with the normal class
    normal_similarity = similarity_using_estimate(inc_vec, normal_matrix)
    similarity_list.append(normal_similarity)
    
    # similarity with the imbalance class
    imbalance_similarity = similarity_using_estimate(inc_vec, imbalance_matrix)
    similarity_list.append(imbalance_similarity)
    
    # similarity with the horizontal misalignment class
    horizontal_similarity = similarity_using_estimate(inc_vec, horizontal_matrix)
    similarity_list.append(horizontal_similarity)
    
    # similarity with the vertical misalignment class
    vertical_similarity = similarity_using_estimate(inc_vec, vertical_matrix)
    similarity_list.append(vertical_similarity)
    
    # similarity with the underhang class
    underhang_similarity = similarity_using_estimate(inc_vec, underhang_matrix)
    similarity_list.append(underhang_similarity)
    
    # similarity with the overhang class
    overhang_similarity = similarity_using_estimate(inc_vec, overhang_matrix)
    similarity_list.append(overhang_similarity)
    
    # estimate the class index based on maximum similarity
    class_estimate_index = np.argmax(similarity_list)
    
    # mapping showing the class label names
    label_mapping = {0: 'Normal', 1: 'Imbalance Fault', 2: 'Horizontal Parallel Misalignment Fault', 
                 3: 'Vertical Parallel Misalignment Fault', 4: 'Underhang Bearing Fault', 5: 'Overhang Bearing Fault'}
    
    class_estimate = label_mapping[class_estimate_index]
    
    return class_estimate

In [194]:
# testing of individual vectors
start_time = time.time()
X = underhang_test_matrix
vec = X[5]
end_time = time.time()
print("Estimated class:", estimate_class(vec), "| Time taken for prediction:", (end_time - start_time)*1e6, "microsec")

Estimated class: Underhang Bearing Fault | Time taken for prediction: 0.0 microsec


In [195]:
# testing of each class
start_time = time.time()
X = normal_test_matrix
accumulate = 0
num_vec = X.shape[0]
for i in range(num_vec):
    class_type = estimate_class(X[i])
    if class_type == 'Normal':
        accumulate += 1

accuracy = (accumulate/num_vec)*100
end_time = time.time()
print("Accuracy:", accuracy, "| Time taken:", (end_time - start_time)/60, "min")

Accuracy: 100.0 | Time taken: 0.24444880882898967 min


In [196]:
# testing of each class
start_time = time.time()
X = imbalance_test_matrix
accumulate = 0
num_vec = X.shape[0]
for i in range(num_vec):
    class_type = estimate_class(X[i])
    if class_type == 'Imbalance Fault':
        accumulate += 1

accuracy = (accumulate/num_vec)*100
end_time = time.time()
print("Accuracy:", accuracy, "| Time taken:", (end_time - start_time)/60, "min")

Accuracy: 98.69565217391305 | Time taken: 5.516671466827392 min


In [197]:
# testing of each class
start_time = time.time()
X = horizontal_test_matrix
accumulate = 0
num_vec = X.shape[0]
for i in range(num_vec):
    class_type = estimate_class(X[i])
    if class_type == 'Horizontal Parallel Misalignment Fault':
        accumulate += 1

accuracy = (accumulate/num_vec)*100
end_time = time.time()
print("Accuracy:", accuracy, "| Time taken:", (end_time - start_time)/60, "min")

Accuracy: 96.52173913043478 | Time taken: 2.692330479621887 min


In [198]:
# testing of each class
start_time = time.time()
X = vertical_test_matrix
accumulate = 0
num_vec = X.shape[0]
for i in range(num_vec):
    class_type = estimate_class(X[i])
    if class_type == 'Vertical Parallel Misalignment Fault':
        accumulate += 1

accuracy = (accumulate/num_vec)*100
end_time = time.time()
print("Accuracy:", accuracy, "| Time taken:", (end_time - start_time)/60, "min")

Accuracy: 99.03381642512076 | Time taken: 4.877285444736481 min


In [199]:
# testing of each class
start_time = time.time()
X = underhang_test_matrix
accumulate = 0
num_vec = X.shape[0]
for i in range(num_vec):
    class_type = estimate_class(X[i])
    if class_type == 'Underhang Bearing Fault':
        accumulate += 1

accuracy = (accumulate/num_vec)*100
end_time = time.time()
print("Accuracy:", accuracy, "| Time taken:", (end_time - start_time)/60, "min")

Accuracy: 93.72093023255815 | Time taken: 10.051030913988749 min


In [200]:
# testing of each class
start_time = time.time()
X = overhang_test_matrix
accumulate = 0
num_vec = X.shape[0]
for i in range(num_vec):
    class_type = estimate_class(X[i])
    if class_type == 'Overhang Bearing Fault':
        accumulate += 1

accuracy = (accumulate/num_vec)*100
end_time = time.time()
print("Accuracy:", accuracy, "| Time taken:", (end_time - start_time)/60, "min")

Accuracy: 98.44961240310077 | Time taken: 9.25152721007665 min


In [201]:
# testing of overall SBM model
start_time = time.time()
test_matrices = [normal_test_matrix, imbalance_test_matrix, horizontal_test_matrix, vertical_test_matrix, underhang_test_matrix, overhang_test_matrix]
accumulate = 0
num_vec_total = 0
for i in range(len(test_matrices)):
    X = test_matrices[i]
    num_vec_total += X.shape[0]

# mapping showing the class label names
label_mapping = {0: 'Normal', 1: 'Imbalance Fault', 2: 'Horizontal Parallel Misalignment Fault', 
                 3: 'Vertical Parallel Misalignment Fault', 4: 'Underhang Bearing Fault', 5: 'Overhang Bearing Fault'}

for idx, test_matrix in enumerate(test_matrices):
    X = test_matrix
    num_vec_in_this = X.shape[0]
    for i in range(num_vec_in_this):
        class_type = estimate_class(X[i])
        if class_type == label_mapping[idx]:
            accumulate += 1

accuracy = (accumulate/num_vec_total)*100
end_time = time.time()
print("Accuracy:", accuracy, "| Time taken:", (end_time - start_time)/60, "min")

Accuracy: 96.95431472081218 | Time taken: 13.296646293004354 min


In [202]:
# function to estimate similarity measures and the class of an incoming vector based on the similarity measure
def estimate_similarity_class(inc_vec):
    similarity_list = []
    
    # similarity with the normal class
    normal_similarity = similarity_using_estimate(inc_vec, normal_matrix)
    similarity_list.append(normal_similarity)
    
    # similarity with the imbalance class
    imbalance_similarity = similarity_using_estimate(inc_vec, imbalance_matrix)
    similarity_list.append(imbalance_similarity)
    
    # similarity with the horizontal misalignment class
    horizontal_similarity = similarity_using_estimate(inc_vec, horizontal_matrix)
    similarity_list.append(horizontal_similarity)
    
    # similarity with the vertical misalignment class
    vertical_similarity = similarity_using_estimate(inc_vec, vertical_matrix)
    similarity_list.append(vertical_similarity)
    
    # similarity with the underhang class
    underhang_similarity = similarity_using_estimate(inc_vec, underhang_matrix)
    similarity_list.append(underhang_similarity)
    
    # similarity with the overhang class
    overhang_similarity = similarity_using_estimate(inc_vec, overhang_matrix)
    similarity_list.append(overhang_similarity)
    
    # estimate the class index based on maximum similarity
    class_estimate_index = np.argmax(similarity_list)
    
    # mapping showing the class label names
    label_mapping = {0: 'Normal', 1: 'Imbalance Fault', 2: 'Horizontal Parallel Misalignment Fault', 
                 3: 'Vertical Parallel Misalignment Fault', 4: 'Underhang Bearing Fault', 5: 'Overhang Bearing Fault'}
    
    class_estimate = label_mapping[class_estimate_index]
    
    return class_estimate, similarity_list

In [203]:
# predicting the fault

# mapping that gives class name based on the index
label_mapping = {0: 'Normal', 1: 'Imbalance Fault', 2: 'Horizontal Parallel Misalignment Fault', 
                 3: 'Vertical Parallel Misalignment Fault', 4: 'Underhang Bearing Fault', 5: 'Overhang Bearing Fault'}

start_time = time.time()
X = underhang_test_matrix # load any matrix
vec = X[4] # extract any vector
top3_faults = [] # list to store top 3 fault types

estimated_class, similarity_measures = estimate_similarity_class(vec) # calculate the estimated class and similarities
print(similarity_measures)

if(estimated_class == "Normal"): # if the class is normal
    print("The machine is normal")
    print()
else: # if not then extract the top three fault types
    fault_classes = similarity_measures[1:]
    for i in range(3):
        max_idx = np.argmax(fault_classes)
        fault = label_mapping[max_idx + 1]
        top3_faults.append(fault)
        fault_classes[max_idx] = 0
    print("Top most probable faults:", top3_faults)
    print()

end_time = time.time()

print("Time taken for prediction:", (end_time - start_time)*1e3, "msec")

[0.9954824342417192, 0.9954065795336863, 0.9954313998164173, 0.9954220672463203, 0.9961786491793477, 0.9957563349477487]
Top most probable faults: ['Underhang Bearing Fault', 'Overhang Bearing Fault', 'Horizontal Parallel Misalignment Fault']

Time taken for prediction: 572.4105834960938 msec


# RFC

In [204]:
# function just to give the estimation error
def estimation_error(inc_vec, mem_mat):
    # assuming memory matrix to be of the size L x M
    L, M = mem_mat.shape
    
    # initialize a matrix G
    G = np.zeros((L, L))
    
    # calculate elements of G using the similarity function
    for i in range(L):
        for j in range(L):
            G[i, j] = calculate_similarity(mem_mat[i, :], mem_mat.T[:, j])
    
    # calculate inverse of the G matrix
    inv_G = np.linalg.inv(G)
    
    # calculate the vector a_n
    a_n = [calculate_similarity(mem_mat[i, :], inc_vec) for i in range(L)]
    
    # calculate the weight vector w_n
    w_n = np.dot(inv_G, a_n)
    
    # calculate the normalized weight vector
    normalized_w_n = w_n / np.linalg.norm(w_n, ord=1)
    
    # calculate the estimate
    estimate = np.dot(mem_mat.T, normalized_w_n)
    
    # calculate the estimation error and its norm
    estimate_error = inc_vec - estimate
    estimate_error_norm = np.linalg.norm(estimate_error, ord=2)
    
    return estimate_error_norm

In [205]:
# labeling each csv file

# list of all the csv files
files_list = ['normal_features.csv', 'imbalance_features.csv', 'horizontal_features.csv',
              'vertical_features.csv', 'underhang_features.csv', 'overhang_features.csv']

# list of the names of updated csv files
updated_csv_names = ['normal_class_features.csv', 'imbalance_class_features.csv', 'horizontal_class_features.csv', 
                     'vertical_class_features.csv', 'underhang_class_features.csv', 'overhang_class_features.csv']

for idx, file in enumerate(files_list):
    df = pd.read_csv(file)
    df[96] = idx
    df.columns = list(range(97))
    df.to_csv(updated_csv_names[idx],header=None,index=None)
    
print("Done")

Done


In [206]:
from sklearn.model_selection import train_test_split

# list of the csv files
csv_files = ['normal_class_features.csv', 'imbalance_class_features.csv', 'horizontal_class_features.csv', 
                     'vertical_class_features.csv', 'underhang_class_features.csv', 'overhang_class_features.csv']

# split each csv file into training and testing sets
for csv_file in csv_files:
    df = pd.read_csv(csv_file, header=None)

    # split the dataframe into features and labels
    X = df.iloc[:, :-1]
    y = df.iloc[:, -1]

    # split into training (90%) and testing (10%) sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # save the training and testing sets to separate csv files
    training_file = csv_file.replace('.csv', '_train.csv')
    testing_file = csv_file.replace('.csv', '_test.csv')

    pd.concat([X_train, y_train], axis=1).to_csv(training_file, index=False, header=False)
    pd.concat([X_test, y_test], axis=1).to_csv(testing_file, index=False, header=False)

print("Done")

Done


In [207]:
# combine all training files into one training file
training_files = [f.replace('.csv', '_train.csv') for f in csv_files]
combined_training_file = 'combined_training_data.csv'

combined_training_dfs = []
for training_file in training_files:
    df = pd.read_csv(training_file)
    df.columns = list(range(97))
    combined_training_dfs.append(df)

# concatenate the data frames along rows
combined_training_df = pd.concat(combined_training_dfs, axis=0, ignore_index=True)

# save the combined training dataframe to a csv file
combined_training_df.to_csv(combined_training_file, index=False, header=False)

# combine all testing files into one testing file
testing_files = [f.replace('.csv', '_test.csv') for f in csv_files]
combined_testing_file = 'combined_testing_data.csv'

combined_testing_dfs = []
for testing_file in testing_files:
    df = pd.read_csv(testing_file)
    df.columns = list(range(97))
    combined_testing_dfs.append(df)

# concatenate the data frames along rows
combined_testing_df = pd.concat(combined_testing_dfs, axis=0, ignore_index=True)

# save the combined testing dataframe to a csv file
combined_testing_df.to_csv(combined_testing_file, index=False, header=False)

print("Done")

Done


In [208]:
# also make a file of combined data
combined_data_file = 'combined_data.csv'
combined_dfs = []

training_file_df = pd.read_csv('combined_training_data.csv')
training_file_df.columns = list(range(97))
combined_dfs.append(training_file_df)

testing_file_df = pd.read_csv('combined_testing_data.csv')
testing_file_df.columns = list(range(97))
combined_dfs.append(testing_file_df)

# concatenate the data frames along rows
combined_df = pd.concat(combined_dfs, axis=0, ignore_index=True)
combined_df = combined_df.sort_values(by=96)

# save the combined testing dataframe to a csv file
combined_df.to_csv(combined_data_file, index=False, header=False)

print("Done")

Done


In [209]:
from sklearn.model_selection import cross_validate
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import make_scorer, precision_score, recall_score, f1_score

# load the combined data
combined_data = pd.read_csv('combined_data.csv', header=None)

X = combined_data.iloc[:, :-1]
y = combined_data.iloc[:, -1]

# initialize the Random Forest Classifier
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)

# define the scoring metrics
scoring = {
    'accuracy': 'accuracy',
    'precision': make_scorer(precision_score, average='weighted'),
    'recall': make_scorer(recall_score, average='weighted'),
    'f1': make_scorer(f1_score, average='weighted')
}

# Perform cross-validation with multiple scoring metrics
cv_results = cross_validate(rf_classifier, X, y, scoring=scoring, cv=5)

# Print the results
print("Cross-Validation Results:")
print("Accuracy:", cv_results['test_accuracy'].mean())
print("Precision:", cv_results['test_precision'].mean())
print("Recall:", cv_results['test_recall'].mean())
print("F1-score:", cv_results['test_f1'].mean())

Cross-Validation Results:
Accuracy: 0.9850249074296066
Precision: 0.9857459873601092
Recall: 0.9850249074296066
F1-score: 0.9847950656554293


In [210]:
# confusion matrix for the above 
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import confusion_matrix

y_pred_cv = cross_val_predict(rf_classifier, X, y, cv=5)
cm_cv = confusion_matrix(y, y_pred_cv)
print("Confusion Matrix:")
print(cm_cv)

Confusion Matrix:
[[ 36   0   8   0   0   1]
 [  0 328   0   0   3   0]
 [  0   2 190   2   0   1]
 [  0   0   4 295   0   0]
 [  0   5   0   0 551   0]
 [  0   1   1   1   0 508]]
