In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split
from data_preprocess import Preprocessor

In [None]:
df_f0 = pd.read_csv("data/final/faulty/FaultyMotor1_20.csv", header=None)
df_n0 = pd.read_csv("data/final/normal/NormalMotor150mm_20.csv", header=None)

In [None]:
# data, channels, sr, hz, type
# Edit the channels, hz, and type for each dataset
# channels: is given as a tuple of the channels to be used
# sr: the sampling rate of the data
# hz: the frequency of the data, given as a tuple (only one value if the hz is the same for all channels)
# type: the type of the data, given as a string of n, f, or i for each channel. f is Faulty, n is Normal, and i is Imbalanced)
data_df = [
    [df_f0, (1,), 20000, (20,), "f"],
    [df_n0, (0, 1, 2, 3), 20000, (30,), "nnnn"],
]

In [None]:
data_interval = 4
preprocessor = Preprocessor(max_freq=600, kaiser_beta=10)
dataset = preprocessor.compute_preprocess(data_df, data_interval, shuffle=True, keep_channels_separate=False)

In [None]:
X_with_hz = dataset[:, :-1]
y = dataset[:, -1]

y = preprocessor.one_hot_encode(y)

In [None]:
trainXhz, testXhz, y_train, y_test = train_test_split(X_with_hz, y, test_size=0.12)
trainXhz, valXhz, y_train, y_val = train_test_split(trainXhz, y_train, test_size=0.12)

X_train = trainXhz[:, :-1]
X_val = valXhz[:, :-1]
X_test = testXhz[:, :-1]

train_hz = trainXhz[:, -1]
val_hz = valXhz[:, -1]
test_hz = testXhz[:, -1]

print(X_train.shape)
print(X_val.shape)
print(X_test.shape)
print(y_train.shape)
print(y_val.shape)
print(y_test.shape)
print(train_hz.shape)
print(val_hz.shape)
print(test_hz.shape)

In [None]:
# Function to reset keras so that a new model can be created without having to restart the kernel and reload all datasets
def reset():
    keras.backend.clear_session()
    tf.compat.v1.reset_default_graph()
    try:
        del x
        del model
    except:
        pass

In [None]:
reset()
input_shape = (X_train.shape[1], 1)
fft_input_layer = keras.Input(shape=input_shape)

x = keras.layers.Conv1D(32, kernel_size=3, activation='relu', padding='same', kernel_initializer="he_normal", use_bias=True)(fft_input_layer)
x = keras.layers.MaxPooling1D(pool_size=2)(x)
x = keras.layers.Dropout(0.4)(x)
x = keras.layers.Conv1D(64, kernel_size=3, activation='relu', padding='same', kernel_initializer="he_normal", use_bias=True)(x)
x = keras.layers.MaxPooling1D(pool_size=2)(x)
x = keras.layers.Dropout(0.2)(x)
x = keras.layers.Flatten()(x)

hz_input_layer = keras.Input(shape=(1,))
combined = keras.layers.concatenate([x, hz_input_layer])

x = keras.layers.Dense(64, activation='relu')(combined)
x = keras.layers.Dropout(0.1)(x)
x = keras.layers.Dense(32, activation='relu')(x)

output_layer = keras.layers.Dense(3, activation='softmax')(x)

In [None]:
model = keras.Model(inputs=[fft_input_layer, hz_input_layer], outputs=output_layer)
print(model.summary())

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
loss = tf.keras.losses.CategoricalCrossentropy()
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

In [None]:
callback = keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)
history = model.fit([X_train, train_hz], y_train, epochs=40, batch_size=64, validation_data=([X_val, val_hz], y_val), shuffle=True, callbacks=[callback])

In [None]:
plt.figure(figsize=(12, 6))
plt.plot(history.history['accuracy'], label='Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Model Accuracy')
plt.legend()
plt.show()

In [None]:
plt.figure(figsize=(12, 6))
plt.plot(history.history['loss'], label='Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Model Loss')
plt.legend()
plt.show()

In [None]:
model.evaluate([X_test, test_hz], y_test)

In [None]:
z = model.predict([X_test, test_hz])
side_by_side = np.concatenate((z, y_test), axis=1)
side_by_side = np.round(side_by_side, 2)
print(side_by_side)

In [None]:
model.save('models/cnn_mapping_model_test6_dropout_5.keras')