In [None]:
# Third-party libraries: data handling, preprocessing, plotting and metrics
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder, label_binarize
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import KFold
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc, precision_recall_curve, average_precision_score, classification_report

# TensorFlow/Keras Models and Layers
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, GRU, Dense, Conv1D, MaxPooling1D, Flatten, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.python.client import device_lib

# Transformer Libraries
from transformers import TFAutoModel, AutoTokenizer


In [2]:
# Show every compute device that TensorFlow reports for this machine.
local_devices = device_lib.list_local_devices()
print(local_devices)

[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 12939630821492890327
xla_global_id: -1
]


In [3]:
# Location of the combined dataset.
# In a raw string every backslash is literal, so each Windows path separator
# is written exactly once (doubling them inside r"..." embeds two backslashes
# per separator in the resulting path).
# NOTE(review): this is a machine-specific absolute path; point it at your own
# copy of all_data.csv before running the notebook elsewhere.
file_path = r"C:\Users\Amirreza\Downloads\Documents\Project\LSTM Model\Data\all_data.csv"
df = pd.read_csv(file_path)

In [4]:
def preprocess_data(df, sequence_length):
    """Turn the raw frame into sliding-window sequences for a sequence classifier.

    Steps:
      1. Parse ``timestamp`` and replace it with one numeric feature,
         ``seconds_since_midnight``.
      2. Standard-scale the numeric columns and one-hot encode the
         object/category columns.
      3. Integer-encode the ``class`` column with ``LabelEncoder``.
      4. Build overlapping windows of ``sequence_length`` consecutive rows; the
         target of each window is the label of the row right after the window.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``timestamp`` column parseable with
        ``%Y-%m-%d %H:%M:%S.%f`` and a ``class`` column. The frame passed in
        is not modified.
    sequence_length : int
        Number of consecutive rows per window; must be at least 1.

    Returns
    -------
    X_sequences : numpy.ndarray
        Shape ``(n_rows - sequence_length, sequence_length, n_features)``.
    y_sequences : numpy.ndarray
        Shape ``(n_rows - sequence_length,)`` with integer-encoded labels.
        Both arrays hold zero windows when the frame has ``sequence_length``
        rows or fewer.

    Raises
    ------
    ValueError
        If ``sequence_length`` is smaller than 1.

    Notes
    -----
    * Rows are windowed in the order given; nothing here sorts by time or
      checks for gaps -- presumably the caller supplies chronologically
      ordered data (TODO confirm).
    * The scaler and encoders are fitted on the whole frame, so statistics of
      rows that later land in a validation/test split influence the training
      features. Fit on the training portion only if that matters.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be a positive integer")

    # Derive the time-of-day feature from a local Series so the caller's frame
    # is left untouched (re-running the cell then starts from the same input).
    timestamps = pd.to_datetime(df['timestamp'], format='%Y-%m-%d %H:%M:%S.%f')
    seconds_since_midnight = (
        timestamps.dt.hour * 3600
        + timestamps.dt.minute * 60
        + timestamps.dt.second
    )

    # separate features and target
    X = (
        df.drop(columns=['timestamp', 'class'])
        # 'hour'/'minute'/'second' were only ever scratch columns here and were
        # never exposed as features; drop them if an earlier run left them behind.
        .drop(columns=['hour', 'minute', 'second'], errors='ignore')
        .assign(seconds_since_midnight=seconds_since_midnight)
    )
    y = df['class']

    # Select numeric columns of any width: the derived time feature can come
    # out as int32 (pandas 2.x datetime fields), and an int64/float64-only
    # selector would silently leave it out of the model input.
    numeric_features = X.select_dtypes(include='number').columns
    categorical_features = X.select_dtypes(include=['object', 'category']).columns

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numeric_features),
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features),
        ],
        # Always return a dense array: the windowing below needs len() and
        # row slicing, which a scipy sparse result does not support.
        sparse_threshold=0,
    )

    # apply transformation
    X_processed = preprocessor.fit_transform(X)

    # encode class labels
    y_encoded = LabelEncoder().fit_transform(y)

    # A window starting at row i covers rows [i, i + sequence_length) and is
    # labelled with row i + sequence_length, so the last usable start index is
    # n_rows - sequence_length - 1.
    n_windows = len(X_processed) - sequence_length
    if n_windows <= 0:
        empty_X = np.empty(
            (0, sequence_length, X_processed.shape[1]), dtype=X_processed.dtype
        )
        return empty_X, y_encoded[:0].copy()

    # sliding_window_view appends the window axis last:
    # (n_rows - sequence_length + 1, n_features, sequence_length).
    windows = np.lib.stride_tricks.sliding_window_view(
        X_processed, sequence_length, axis=0
    )
    # Drop the final window (it has no following row to use as target), move
    # the time axis to the middle and materialise an independent array.
    X_sequences = np.ascontiguousarray(windows[:n_windows].transpose(0, 2, 1))
    y_sequences = y_encoded[sequence_length:].copy()

    return X_sequences, y_sequences

In [1]:
def create_bidirectional_lstm_model(input_shape, n_classes=15, lstm_units=75):
    """Build and compile a two-layer bidirectional LSTM classifier.

    Parameters
    ----------
    input_shape : tuple
        ``(sequence_length, n_features)`` of one input window.
    n_classes : int, optional
        Number of output classes (width of the softmax layer). Defaults to 15.
    lstm_units : int, optional
        Units per direction in each LSTM layer. Defaults to 75.

    Returns
    -------
    tf.keras.Model
        Model compiled with Adam and sparse categorical cross-entropy, so the
        targets are integer class ids rather than one-hot vectors.

    Notes
    -----
    Only ``accuracy`` is tracked during training. The Keras ``Precision``,
    ``Recall`` and ``AUC`` metrics compare ``y_true`` with ``y_pred``
    element-wise and need both to have the same shape, which integer labels
    and a ``(batch, n_classes)`` softmax output do not have. Per-class
    precision/recall/AUC are better computed after training from the
    predicted probabilities.
    """
    model = Sequential([
        tf.keras.Input(shape=input_shape),
        # First recurrent layer returns the full sequence to feed the second
        Bidirectional(LSTM(lstm_units, return_sequences=True)),
        Bidirectional(LSTM(lstm_units)),
        Dense(n_classes, activation='softmax'),  # one unit per class
    ])
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

In [6]:
# Lists to store results
# NOTE(review): nothing in this cell fills these two lists; they are kept only
# in case other cells rely on the names.
acc_scores = []
val_acc_scores = []

# Seed Python, NumPy and TensorFlow so weight initialisation and batch
# shuffling repeat from run to run
SEED = 42
tf.keras.utils.set_random_seed(SEED)

# Define sequence length
# 60 rows per window (one hour only if rows are one minute apart -- TODO confirm)
sequence_length = 60

# Share of the whole dataset held out for testing and for validation
TEST_FRACTION = 0.05
VAL_FRACTION = 0.05

# preprocess the data
X, y = preprocess_data(df, sequence_length)

# Split data into train, validation, and test sets
# NOTE(review): consecutive windows overlap in all but one row, so a shuffled
# split places near-duplicates of training windows in the validation and test
# sets; the reported scores are therefore likely optimistic. Consider a
# chronological split.
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X, y, test_size=TEST_FRACTION, random_state=SEED
)

# Rescale the fraction so the validation set is VAL_FRACTION of the full data,
# not of the already reduced train+validation portion
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val,
    test_size=VAL_FRACTION / (1 - TEST_FRACTION),
    random_state=SEED,
)

# Check and print available devices
print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))

# Create a MirroredStrategy.
strategy = tf.distribute.MirroredStrategy()

# Open a strategy scope.
with strategy.scope():
    # Create the model inside the strategy's scope.
    model = create_bidirectional_lstm_model(input_shape=(sequence_length, X_train.shape[2]))

    # Define the early stopping callback
    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

    # Train the model
    history = model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_val, y_val), callbacks=[early_stopping])

    # Evaluate on the test set; read the results by name so this keeps working
    # however many metrics the model was compiled with
    test_metrics = model.evaluate(X_test, y_test, return_dict=True)

test_loss = test_metrics['loss']
test_accuracy = test_metrics['accuracy']


Num GPUs Available: 0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


Epoch 1/10


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [7]:
# Report the accuracy measured on the held-out test split
print(f"Test Accuracy: {test_accuracy}")

Test Accuracy: 0.9751362204551697


In [8]:
# Write the trained model to an HDF5 file in the current working directory
model.save(filepath="Bidirectional_LSTM_final_model.h5")

  saving_api.save_model(


In [9]:

# Read the saved model back from disk; the prediction cell uses this reloaded copy
model1 = load_model(filepath="Bidirectional_LSTM_final_model.h5")

In [None]:
# styling for plots
# The seaborn "whitegrid" theme is applied first and the matplotlib 'ggplot'
# style sheet second.
# NOTE(review): both calls adjust matplotlib's global settings, so the later
# 'ggplot' call presumably overrides whatever the two have in common --
# confirm this is the intended look.
sns.set(style="whitegrid")
plt.style.use('ggplot')

In [None]:
# Confusion Matrix
def plot_confusion_matrix(y_test , y_pred, classes):
    """Draw the confusion matrix as an annotated heatmap, save it and show it.

    Parameters
    ----------
    y_test : array-like
        True integer class ids.
    y_pred : array-like
        Predicted integer class ids.
    classes : sequence of str
        Tick label for each class; entry ``k`` labels class id ``k``.
        Assumes the ids are ``0 .. len(classes) - 1``; samples with an id
        outside that range are left out of the matrix.

    The figure is written to ``Bidirectional_LSTM_confusion_matrix.png`` in
    the current working directory.
    """
    # Pin the label set so the matrix is always len(classes) x len(classes)
    # and lines up with the tick labels, even when a class never occurs in
    # y_test or y_pred.
    label_ids = np.arange(len(classes))
    cm = confusion_matrix(y_test, y_pred, labels=label_ids)
    plt.figure(figsize=(10, 7))
    # Colormap names are case-sensitive: the sequential blue map is 'Blues'
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.title('Bidirectional LSTM_confusion_matrix')
    plt.ylabel('True label')
    plt.xlabel('predicted label')
    plt.savefig('Bidirectional_LSTM_confusion_matrix.png')
    plt.show()

In [None]:
# ROC Curve and AUC
def plot_roc_curve(y_test, y_score, n_classes):
    """Plot one ROC curve per class (one-vs-rest), save the figure and show it.

    Parameters
    ----------
    y_test : numpy.ndarray
        Binarised true labels; column ``i`` is the 0/1 indicator of class ``i``.
    y_score : numpy.ndarray
        Predicted scores; column ``i`` is the score for class ``i``.
    n_classes : int
        Number of columns to plot.

    The figure is written to ``Bidirectional_LSTM_roc_curve.png`` in the
    current working directory.
    """
    fpr = dict()
    tpr = dict()
    roc_auc = dict()

    # Compute every curve before opening the figure, so a failure here does
    # not leave an empty figure behind
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test[:, i], y_score[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    plt.figure(figsize=(10,7))
    for i in range(n_classes):
        # Format this class's own AUC value; passing the whole dict to a
        # float format spec raises TypeError
        plt.plot(fpr[i], tpr[i], lw=2, label='class{0} (area = {1:0.2f})'.format(i, roc_auc[i]))

    # Diagonal reference line from (0, 0) to (1, 1)
    plt.plot([0,1], [0,1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Bidirectional LSTM - Receiver Operating Characteristic (ROC)')
    plt.legend(loc="lower right")
    plt.savefig('Bidirectional_LSTM_roc_curve.png')
    plt.show()

In [None]:
# Precision-Recall Curve
def plot_precision_recall_curve(y_test, y_score, n_classes):
    """Plot one precision-recall curve per class, save the figure and show it.

    Parameters
    ----------
    y_test : numpy.ndarray
        Binarised true labels; column ``i`` is the 0/1 indicator of class ``i``.
    y_score : numpy.ndarray
        Predicted scores; column ``i`` is the score for class ``i``.
    n_classes : int
        Number of columns to plot.

    Each legend entry shows the class index and its average precision. The
    figure is written to ``Bidirectional_LSTM_precision_recall_curve.png`` in
    the current working directory.
    """
    precision = dict()
    recall = dict()
    average_precision = dict()

    for i in range(n_classes):
        precision[i], recall[i], _ = precision_recall_curve(y_test[:, i], y_score[:, i])
        average_precision[i] = average_precision_score(y_test[:, i], y_score[:, i])

    plt.figure(figsize=(10,7))
    for i in range(n_classes):
        # '{0}' is the class-index placeholder; a literal '[0]' would label
        # every curve identically
        plt.plot(recall[i], precision[i], lw=2, label='Class{0} (area = {1:0.2f})'.format(i, average_precision[i]))

    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Bidirectional LSTM - precision-recall curve')
    plt.legend(loc="best")
    plt.savefig('Bidirectional_LSTM_precision_recall_curve.png')
    plt.show()
        

In [2]:
# Training and Validation plots
def plot_training_history(history):
    """Plot training vs. validation accuracy and loss per epoch.

    Reads the ``accuracy``, ``val_accuracy``, ``loss`` and ``val_loss`` series
    from ``history.history``, draws one figure for accuracy and one for loss,
    saves them as ``Bidirectional_LSTM_training_accuracy`` and
    ``Bidirectional_LSTM_training_loss`` (no extension given, so matplotlib
    picks its default format), then shows them.
    """
    recorded = history.history
    # Fetch all four series first so a missing key fails before any figure opens
    train_acc = recorded['accuracy']
    valid_acc = recorded['val_accuracy']
    train_loss = recorded['loss']
    valid_loss = recorded['val_loss']

    # Epochs are numbered from 1 on the x axis
    epoch_axis = range(1, len(train_acc) + 1)

    # (training series, validation series, training label, validation label,
    #  title, y-axis label, output file)
    panels = (
        (train_acc, valid_acc,
         'Training accuracy', 'Validation accuracy',
         'Bidirectional LSTM - Training and validation accuracy',
         'Accuracy', 'Bidirectional_LSTM_training_accuracy'),
        (train_loss, valid_loss,
         'Training loss', 'Validation loss',
         'Bidirectional LSTM - Training and validation Loss',
         'Loss', 'Bidirectional_LSTM_training_loss'),
    )

    for train_series, valid_series, train_label, valid_label, heading, y_label, out_file in panels:
        plt.figure(figsize=(7,5))
        plt.plot(epoch_axis, train_series, 'bo-', label=train_label)
        plt.plot(epoch_axis, valid_series, 'r*-', label=valid_label)
        plt.title(heading)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()
        plt.savefig(out_file)

    # One show() call after both figures have been saved
    plt.show()



In [None]:
# Generate classification report
def save_classification_report(y_test, y_pred, classes):
    """Print the per-class classification report and write it to a text file.

    Parameters
    ----------
    y_test : array-like
        True integer class ids.
    y_pred : array-like
        Predicted integer class ids.
    classes : sequence of str
        Display name for each class; entry ``k`` names class id ``k``.
        Assumes the ids are ``0 .. len(classes) - 1``.

    The report is written to ``Bidirectional_LSTM_classification_report.txt``
    in the current working directory.
    """
    report = classification_report(
        y_test,
        y_pred,
        # Pin the label set: without it, a class missing from both y_test and
        # y_pred makes the label count differ from len(classes) and the call
        # raises ValueError.
        labels=np.arange(len(classes)),
        target_names=classes,
        # Classes with no predicted/true samples score 0 without a warning
        zero_division=0,
    )
    print(report)
    with open("Bidirectional_LSTM_classification_report.txt", "w", encoding="utf-8") as f:
        f.write(report)

In [None]:
# Display names for the classes: the integer ids 0..14 as strings
classes = [str(i) for i in range(15)]

# Predict on test data: run inference once, then take the most probable
# class per sample from the same scores
y_score = model1.predict(X_test)
y_pred = np.argmax(y_score, axis=-1)

# Binarize the output for multiclass metrics
y_test_bin = label_binarize(y_test, classes=list(range(len(classes))))

# Plot and save all the metrics
plot_training_history(history)
plot_confusion_matrix(y_test, y_pred, classes)
plot_roc_curve(y_test_bin, y_score, len(classes))
plot_precision_recall_curve(y_test_bin, y_score, len(classes))
save_classification_report(y_test, y_pred, classes)