### Import Modules and Libraries

In [None]:
%pip install tensorflow scikit-learn matplotlib seaborn numpy pandas

import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras.layers import Conv1D, BatchNormalization, Activation, Dropout, GRU, Dense
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import BinaryAccuracy, AUC
from tensorflow.keras.models import load_model

from sklearn.model_selection import train_test_split

### Dataset Processing

In [None]:
# List all CSV files in the dataset folder
dataset_folder = '../dataset'
csv_files = [f for f in os.listdir(dataset_folder) if f.endswith('.csv')]

In [None]:
# Load all CSV files into a single pandas DataFrame
dataset = []

for episode_num, file in enumerate(csv_files):
    episode = pd.read_csv(os.path.join(dataset_folder, file))
    episode['episode'] = episode_num
    dataset.append(episode)

dataset = pd.concat(dataset)
dataset = dataset.drop(columns=['timestamp'])

# Get the target marix
for i in range (1, 5):
    dataset[f'motor_{i}'] = dataset['motor_failure'] == i
dataset['motor_failure'] = dataset['motor_failure'] == 0

# Squencing the dataset
sequence_length = 10
X_dataset = []
y_dataset = []

for episode in dataset['episode'].unique():
    episode_dataset = dataset[dataset['episode'] == episode]
    episode_dataset = episode_dataset.drop(columns=['episode'])
    
    for idx in range(episode_dataset.shape[0] - sequence_length):
        X_episode = episode_dataset.drop(columns=['motor_failure', 'motor_1', 'motor_2', 'motor_3', 'motor_4']).iloc[idx : idx + sequence_length]
        y_episode = episode_dataset[['motor_failure', 'motor_1', 'motor_2', 'motor_3', 'motor_4']].iloc[idx + sequence_length]
        X_dataset.append(X_episode)
        y_dataset.append(y_dataset)

# Split the dataset into train and test
X_dataset = np.array(X_dataset)
y_dataset = np.array(y_dataset)
X_train, X_test, y_train, y_test = train_test_split(X_dataset, y_dataset, test_size=0.3)

# Output shapes for verification
print(f'Training dataset shape: {X_train.shape}')
print(f'Test dataset shape: {X_test.shape}')

### Drone Failure Detection Model

In [None]:
class MotorFailureDetectionModel(tf.keras.Model):
    def __init__(self, input_shape):
        super(MotorFailureDetectionModel, self).__init__()
        
        # Step 1: CONV layer
        self.conv1d = Conv1D(filters=196, kernel_size=15, strides=4, input_shape=input_shape)
        self.batch_norm1 = BatchNormalization()
        self.relu = Activation("relu")
        self.dropout1 = Dropout(rate=0.8)
        
        # Step 2: First GRU Layer
        self.gru1 = GRU(units=128, return_sequences=True)
        self.dropout2 = Dropout(rate=0.8)
        self.batch_norm2 = BatchNormalization()
        
        # Step 3: Second GRU Layer
        self.gru2 = GRU(units=128, return_sequences=True)
        self.dropout3 = Dropout(rate=0.8)
        self.batch_norm3 = BatchNormalization()
        self.dropout4 = Dropout(rate=0.8)
        
        # Step 4: Dense output layer
        self.output_layer = Dense(5, activation="sigmoid")
        
    def call(self, inputs):
        # Step 1: CONV layer
        x = self.conv1d(inputs)
        x = self.batch_norm1(x)
        x = self.relu(x)
        x = self.dropout1(x)
        
        # Step 2: First GRU Layer
        x = self.gru1(x)
        x = self.dropout2(x)
        x = self.batch_norm2(x)
        
        # Step 3: Second GRU Layer
        x = self.gru2(x)
        x = self.dropout3(x)
        x = self.batch_norm3(x)
        x = self.dropout4(x)
        
        # Step 4: Dense output layer
        x = self.output_layer(x)
        
        return x

In [None]:
# Model initialization
input_shape = (sequence_length, X_dataset.shape[2])
model = MotorFailureDetectionModel(input_shape=input_shape)
model.build(input_shape=(None, sequence_length, X_dataset.shape[2]))
model.summary()

### Training the Model

In [None]:
# Loss function and optimizer
loss_fn = BinaryCrossentropy()
optimizer = Adam(learning_rate=0.001)
metrics = [BinaryAccuracy(), AUC()]

# Compile the model
model.compile(optimizer=optimizer, loss=loss_fn, metrics=metrics)

In [None]:
# Define the training parameters
model_name = 'Failure-Detection-0.h5'
epochs_num = 100
batch_size = 4
val_split = 0.2

# Callbacks
callbacks = [
    EarlyStopping(monitor='val_loss', patience=5),
    ModelCheckpoint(filepath=f'../models/{model_name}', monitor='val_loss', save_best_only=True)
]

# Training
history = model.fit(X_train, y_train, epochs=epochs_num, batch_size=batch_size, validation_split=val_split, callbacks=callbacks)

### Evaluate & Test the Model

In [None]:
# Evaluate the Model
results = model.evaluate(X_test, y_test)

print(f'Loss: {results[0]:.4f}')
for metric_name, metric_value in zip(model.metrics_names[1:], results[1:]):
    print(f'{metric_name}: {metric_value:.4f}')

### Save the Model

In [None]:
# Save the model
model_name = 'Failure-Detection-0.h5'
model.save(f'../models/{model_name}')

### Load the Model

In [None]:
# Load the model
model_name = 'Failure-Detection-0.h5'
model = load_model(f'../models/{model_name}')
model.summary()