In [None]:
import os
from re import search
import pandas as pd
import numpy as np
import scipy.io

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix,classification_report, accuracy_score
from sklearn.linear_model import LogisticRegression

import seaborn as sns
import matplotlib.pyplot as plt

# Sampling rate used to turn sample indices into a time axis for plotting
# (presumably in Hz, since the resulting axis is labelled 'Time [s]').
SAMPLE_FREQUENCY = 48_000

In [None]:
# Create the output folder for saved figures. `exist_ok=True` keeps the cell
# safe to re-run and avoids the check-then-create race of exists() + mkdir().
os.makedirs("images", exist_ok=True)

In [None]:
# Folder holding the .mat recordings. It can be overridden through the
# BEARING_DATA_DIR environment variable; the default is the original location.
path = os.environ.get('BEARING_DATA_DIR', '/home/kenny/Area_2_Trabalho_Final/48k_DE/')

# Number of consecutive readings per window (one DataFrame row per window).
size_sample = 2048

frames = []
# Process files in name order: os.scandir yields entries in arbitrary order,
# which would make the row order (and the seeded splits built on it) vary
# from machine to machine.
for entry in sorted(os.scandir(path), key=lambda e: e.name):
    if not entry.is_file():
        continue
    mat = scipy.io.loadmat(entry.path)

    # Use the variable whose name contains 'DE' (the last match wins, as before).
    de_keys = [name for name in mat.keys() if search('DE', name)]
    if not de_keys:
        # Previously the key found for an earlier file was silently reused here.
        print(f"Skipping {entry.name}: no variable containing 'DE'")
        continue
    key = de_keys[-1]

    # Flatten the stored array and cut it into non-overlapping windows;
    # a trailing remainder shorter than one window is discarded.
    raw_data = np.ravel(mat[key]).tolist()
    n_windows = len(raw_data) // size_sample
    samples = [raw_data[i*size_sample:(i+1)*size_sample] for i in range(n_windows)]

    df_original_raw = pd.DataFrame({'Samples': samples})
    # The label is the file name up to the first '.' and then up to the first '_'.
    df_original_raw['Fault'] = entry.name.split('.')[0].split('_')[0]
    frames.append(df_original_raw)

# Concatenate once instead of growing the frame inside the loop.
df_original = pd.concat(frames) if frames else pd.DataFrame(columns=['Samples', 'Fault'])

In [None]:
# Work on an independent copy with a fresh 0..n-1 index so the loaded frame
# itself is left untouched.
df = df_original.copy(deep=True).reset_index(drop=True)

In [None]:
import plotly.express as px

# Row index of the window to preview.
sample = 100
y = df['Samples'].iloc[sample]
# Time axis: reading index divided by the sampling rate.
x = np.arange(len(y)) / SAMPLE_FREQUENCY

axis_labels = {'y': 'Acceleration [g]', 'x': 'Time [s]'}
fig = px.line(x=x, y=y, title=f'Sample {sample}', labels=axis_labels)

fig.show()

In [None]:
# Per-window time-domain features, computed for all windows at once.
# The windows are stacked into a single 2-D array (one row per window) rather
# than calling DataFrame.apply nine times with `x[0]`: integer-position access
# on a label-indexed row is deprecated in pandas, and every lambda re-converted
# the same list into an array several times.
signals = np.array(df['Samples'].tolist(), dtype=float)
abs_signals = np.abs(signals)

peak = signals.max(axis=1)                     # signed maximum, as before
rms = np.sqrt(np.mean(signals**2, axis=1))
mean_abs = abs_signals.mean(axis=1)

df['Max'] = peak
df['Min'] = signals.min(axis=1)
df['Mean'] = signals.mean(axis=1)
df['RMS'] = rms
df['Var'] = signals.var(axis=1)
df['Crest'] = peak / rms
df['Form'] = rms / mean_abs
# Impulse factor: peak over the mean absolute value. The previous expression
# divided by the square root of that mean, which is not a dimensionless ratio.
df['Impu'] = peak / mean_abs
# Clearance factor: peak over the squared mean of the square-rooted absolute
# values. The previous expression omitted the square.
df['Clear'] = peak / np.mean(np.sqrt(abs_signals), axis=1)**2

In [None]:
# The raw windows are not needed once the features have been derived.
df = df.drop(columns='Samples')
# Independent snapshot of the feature table that still carries the 'Fault' label.
df_full = df.copy()

In [None]:
# Correlation between the numeric features (the label column is excluded).
# `.corr()` is applied exactly once: the previous version computed the
# correlation of the correlation matrix and also overwrote `df` with it.
corr_matrix = df_full.drop(columns=['Fault']).corr()

# Draw on a dedicated figure so the heatmap never lands on leftover axes.
fig, ax = plt.subplots()
plot = sns.heatmap(corr_matrix, annot=True, ax=ax)
fig.savefig("images/base_corr.png")

In [None]:
# Feature table used for modelling: the label and the 'Impu' and 'Min'
# columns are removed.
df = df_full.drop(columns=['Fault','Impu','Min'])
corr_matrix = df.corr()

# Draw on a dedicated figure so the heatmap never lands on axes left over
# from an earlier plot.
fig, ax = plt.subplots()
plot = sns.heatmap(corr_matrix, annot=True, ax=ax)
fig.savefig("images/base_corr_drop.png")
# Figure.show() only emits a warning on the non-GUI backends notebooks use;
# pyplot's show() displays the figure regardless of backend.
plt.show()

In [None]:
# Standardise every feature column (fit and transform run as separate steps).
# NOTE(review): the scaler is fitted on all rows, i.e. before any train/test
# split is made.
scaler = StandardScaler().fit(df)
data_time_scaled = scaler.transform(df)
df_scaled = pd.DataFrame(data=data_time_scaled, columns=df.columns)
df_scaled

In [None]:
# Hold-out strategy: 20 % of the rows become the test set, then the remainder
# is halved into training and validation parts. Both splits are stratified on
# the label (the second one previously was not) and keep the original seed.
labels = df_full['Fault']
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    df, labels, test_size=0.2, stratify=labels, random_state=1)
X_train_raw, X_val_raw, y_train, y_val = train_test_split(
    X_train_raw, y_train, test_size=0.5, stratify=y_train, random_state=1)

# Fit the scaler on the training rows only and reuse its statistics for the
# other parts, so nothing about the validation/test rows leaks into training.
split_scaler = StandardScaler().fit(X_train_raw)


def _scale(part):
    """Return `part` standardised with the training statistics, keeping its index and columns."""
    return pd.DataFrame(split_scaler.transform(part), index=part.index, columns=part.columns)


X_train, X_val, X_test = _scale(X_train_raw), _scale(X_val_raw), _scale(X_test_raw)

In [None]:
# Baseline: support-vector classifier with scikit-learn's default settings,
# fitted on the training split.
svc_model = SVC().fit(X_train, y_train)
svc_model

In [None]:
# Baseline SVM predictions for the training rows and for the test rows.
train_predictions, test_predictions = (
    svc_model.predict(features) for features in (X_train, X_test)
)

In [None]:
# Confusion matrices of the baseline SVM on the training and test rows.
train_confu_matrix = confusion_matrix(y_true=y_train, y_pred=train_predictions)
test_confu_matrix = confusion_matrix(y_true=y_test, y_pred=test_predictions)

In [None]:
# confusion_matrix orders its rows/columns by sorted label, whereas unique()
# returns the labels in order of appearance. Sort them so the tick labels
# line up with the matrix cells.
fault_type = np.sort(df_full.Fault.unique())

# One figure with explicit axes for the two panels (the second panel was
# previously selected twice through the pyplot state machine).
fig, (ax_train, ax_test) = plt.subplots(1, 2, figsize=(18,8))

sns.heatmap(train_confu_matrix, annot=True, fmt="d",
            xticklabels=fault_type, yticklabels=fault_type,
            cmap="Blues", cbar=False, ax=ax_train)
ax_train.set_title('Training Confusion Matrix')
ax_train.set_xlabel('Predicted')
ax_train.set_ylabel('True')

sns.heatmap(test_confu_matrix, annot=True, fmt="d",
            xticklabels=fault_type, yticklabels=fault_type,
            cmap="Blues", cbar=False, ax=ax_test)
ax_test.set_title('Test Confusion Matrix')
ax_test.set_xlabel('Predicted')
ax_test.set_ylabel('True')

fig.savefig('images/output_SVM.png')

In [None]:
# Text summary of the baseline SVM's per-class metrics on the test rows.
class_report = classification_report(y_test, test_predictions)
print(class_report)

In [None]:
# Hyper-parameter grid. `gamma` has no effect on the linear kernel, so it is
# only varied for 'rbf'; this avoids refitting identical linear models.
c_values = [1, 10, 45, 47, 49, 50, 51, 55, 100, 300, 500]
parameters = [
    {"kernel": ["rbf"], "C": c_values, "gamma": [0.01, 0.05, 0.1, 0.5, 1, 5]},
    {"kernel": ["linear"], "C": c_values},
]

# 10-fold cross-validated search on the training split. The previous version
# referred to `train_data_scaled`, `train_data`, `test_data_scaled` and
# `test_data`, none of which is defined in any visible cell; the split
# variables created earlier are used instead.
tuned_svm_clf = GridSearchCV(SVC(), parameters, n_jobs=-1, cv=10)
tuned_svm_clf.fit(X_train, y_train)

print(tuned_svm_clf.best_params_)
print(tuned_svm_clf.best_estimator_)

train_predictions_best = tuned_svm_clf.best_estimator_.predict(X_train)
test_predictions_best = tuned_svm_clf.best_estimator_.predict(X_test)

train_confu_matrix_best = confusion_matrix(y_train, train_predictions_best)
test_confu_matrix_best = confusion_matrix(y_test, test_predictions_best)

In [None]:
# Tick labels must follow confusion_matrix's sorted label order.
class_labels = np.sort(fault_type)

# Fresh figure with explicit axes instead of re-using pyplot figure number 1.
fig_best, (ax_train, ax_test) = plt.subplots(1, 2, figsize=(18,8))

sns.heatmap(train_confu_matrix_best, annot=True, fmt="d",
            xticklabels=class_labels, yticklabels=class_labels,
            cmap="Blues", cbar=False, ax=ax_train)
ax_train.set_title('Training Confusion Matrix (best model)')
ax_train.set_xlabel('Predicted')
ax_train.set_ylabel('True')

# fmt="d" added so the counts are printed as integers, like the other panels.
sns.heatmap(test_confu_matrix_best, annot=True, fmt="d",
            xticklabels=class_labels, yticklabels=class_labels,
            cmap="Blues", cbar=False, ax=ax_test)
ax_test.set_title('Test Confusion Matrix (best model)')
ax_test.set_xlabel('Predicted')
ax_test.set_ylabel('True')
plt.show()

# `test_data` is not defined in any visible cell; the test labels are in `y_test`.
class_report_best = classification_report(y_pred = test_predictions_best, y_true = y_test)
print(class_report_best)

In [None]:
# Setup the model. With the 'lbfgs' solver scikit-learn already fits a
# multinomial model for multi-class targets, so the deprecated `multi_class`
# argument is no longer passed.
logis_model = LogisticRegression(solver='lbfgs', max_iter=1000)

# Train the model
logis_model.fit(X_train, y_train)

In [None]:
# Logistic-regression predictions for the test rows.
test_predictions_lr = logis_model.predict(X=X_test)

In [None]:
# Confusion matrix of the logistic regression on the test rows.
test_confu_matrix_lr = confusion_matrix(y_true=y_test, y_pred=test_predictions_lr)


In [None]:
# Text summary of the logistic regression's per-class metrics on the test rows.
class_report_lr = classification_report(y_test, test_predictions_lr)
print(class_report_lr)

In [None]:
# Logistic-regression predictions for the training rows and for the test rows.
train_predictions_logis, test_predictions_logis = (
    logis_model.predict(features) for features in (X_train, X_test)
)

In [None]:
# Per-class metrics of the logistic regression, evaluated on the test rows.
class_report_logis = classification_report(y_test, test_predictions_logis)
print(class_report_logis)

In [None]:
# Tick labels must follow confusion_matrix's sorted label order.
class_labels = np.sort(fault_type)

# Fresh figure with explicit axes instead of re-using pyplot figure number 1.
fig_lr, ax_lr = plt.subplots(figsize=(8,6))

sns.heatmap(test_confu_matrix_lr, annot=True, fmt="d",
            xticklabels=class_labels, yticklabels=class_labels,
            cmap="Blues", cbar=False, ax=ax_lr)
ax_lr.set_title('Test Confusion Matrix (logistic regression)')
ax_lr.set_xlabel('Predicted')
ax_lr.set_ylabel('True')

plt.show()

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Activation, Dense, Dropout
from tensorflow.keras.models import Sequential

In [None]:
# Small fully connected classifier: two hidden ReLU layers of 10 units, each
# followed by 10 % dropout, and a softmax output with one unit per fault class.
n_features = len(X_train.columns)
# `df` no longer has a 'Fault' column at this point (it was dropped when the
# feature table was built), so the classes are counted from `df_full`.
n_classes = len(df_full['Fault'].unique())

model = Sequential()
model.add(Dense(units=10, activation='relu', input_shape=(n_features,)))
model.add(Dropout(0.1))
model.add(Dense(units=10, activation='relu'))
model.add(Dropout(0.1))
model.add(Dense(units=n_classes, activation='softmax'))
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=["accuracy"])
model.summary()

In [None]:
from sklearn import preprocessing

# Integer-encode the fault labels for the network's sparse categorical loss.
# The labels are read from `df_full`: `df` has had its 'Fault' column dropped
# by this point, so `df['Fault']` raised a KeyError.
le = preprocessing.LabelEncoder()
le.fit(df_full['Fault'].unique())
list(le.classes_)

In [None]:
# Integer-encoded versions of the training and test labels.
y_train_transformed, y_test_transformed = (
    le.transform(target) for target in (y_train, y_test)
)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop training once the validation loss has not improved for 5 epochs.
early_stop = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=5)

# Early stopping is driven by the validation split created alongside the
# training split. The test rows were used here before, which lets them
# influence when training stops and biases any later test metric.
y_val_transformed = le.transform(y_val)
history = model.fit(x = X_train, y = y_train_transformed, epochs=400,
                    validation_data = (X_val, y_val_transformed),
                    verbose=1, callbacks=[early_stop])

# Per-epoch training curves, taken from the History object returned by fit().
model_history = pd.DataFrame(history.history)
ax = model_history.plot()
ax.set_xlabel('Época')  # Portuguese for "Epoch"

In [None]:
from sklearn.model_selection import GridSearchCV
from scikeras.wrappers import KerasClassifier, KerasRegressor

# Define the model
def create_model(unitsA=10, unitsB=10, optimizer = 'adam', learning_rate=0.01, loss='sparse_categorical_crossentropy', metrics=['accuracy']):
    """Build and compile the two-hidden-layer classifier used in the grid search.

    Args:
        unitsA: Number of units in the first hidden layer.
        unitsB: Number of units in the second hidden layer.
        optimizer: One of 'sgd', 'rmsprop', 'adagrad', 'adamax'; any other
            value selects Adam.
        learning_rate: Learning rate handed to the chosen optimizer.
        loss: Loss passed to `model.compile`.
        metrics: Metrics passed to `model.compile`.

    Returns:
        The compiled Keras `Sequential` model.

    The input width is read from the notebook-level `X_train` and the number
    of output units from the notebook-level `df_full`.
    """
    optimizer_classes = {
        'sgd': tf.keras.optimizers.SGD,
        'rmsprop': tf.keras.optimizers.RMSprop,
        'adagrad': tf.keras.optimizers.Adagrad,
        'adamax': tf.keras.optimizers.Adamax,
    }
    # Anything that is not one of the names above falls back to Adam.
    optimizer_cls = optimizer_classes.get(optimizer, tf.keras.optimizers.Adam)
    optimizer = optimizer_cls(learning_rate=learning_rate)

    # `df` has no 'Fault' column any more; the labels are kept in `df_full`.
    n_classes = len(df_full['Fault'].unique())

    model = Sequential()
    model.add(Dense(units=unitsA, activation='relu',input_shape=(len(X_train.columns),)))
    model.add(Dropout(0.1))
    model.add(Dense(units=unitsB, activation='relu'))
    model.add(Dropout(0.1))
    model.add(Dense(units=n_classes, activation='softmax'))
    # Compile once with the requested settings. The earlier extra compile with
    # a fixed 'adam' optimizer was immediately overridden, and the summary()
    # call printed the architecture again for every single fit of the search.
    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
    return model

# Define the hyperparameters to tune. The `model__` prefix is scikeras'
# routing convention for arguments of the model-building function. The second
# entry used to repeat 'unitsA', so `unitsB` was never tuned.
param_grid = {'model__optimizer': ['sgd', 'rmsprop', 'adagrad', 'adamax', 'adam'],
              'model__learning_rate': [0.1, 0.05, 0.01, 0.005, 0.001],
              'model__unitsA': list(range(1,100,10)),
              'model__unitsB': list(range(1,100,10)),
              }

# Keras training options belong to the KerasClassifier: GridSearchCV rejects
# `validation_data` and `callbacks` as unknown arguments. A fraction of each
# training fold is held out for `val_loss` instead of passing the test set,
# whose raw string labels would not be encoded and which should not steer
# model selection.
keras_clf = KerasClassifier(model=create_model, epochs=1,
                            validation_split=0.2, callbacks=[early_stop])

# Create the grid search object
grid_search = GridSearchCV(keras_clf, param_grid=param_grid, cv=3, n_jobs=-1, verbose=1)

# Fit the grid search to the data
grid_search.fit(X_train, y_train)

# Print the best hyperparameters and their corresponding score
print("Best parameters: ", grid_search.best_params_)
print("Best score: ", grid_search.best_score_)