-------PREPARING ENVIRONMENT--------

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
from psutil import virtual_memory
# Total memory reported by psutil, converted from bytes to gigabytes (1e9 bytes).
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

# 20 GB is the threshold this notebook uses to call a runtime "high-RAM".
print('You are using a high-RAM runtime!' if ram_gb >= 20 else 'Not using a high-RAM runtime')

--------DATA PREPROCESSING--------------

In [None]:
# Mount Google Drive under /content/gdrive; every data and model path used
# later in this notebook lives below that mount point.
from google.colab import drive
drive.mount("/content/gdrive")

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

# Load the training CSV into a float array with numpy.genfromtxt().
# The preprocessing cell that follows drops row 0 and column 0 of this array
# (presumably the header row and an index column -- confirm against the file).
data = np.genfromtxt('/content/gdrive/MyDrive/Bakalauras/train_data_RR_1_05_31_newest.csv', delimiter=',')

In [None]:
def _print_class_counts(title, labels, length_caption):
    """Print the number of samples of classes 0, 1 and 2 in `labels`, then its length."""
    print(title)
    for class_id in (0, 1, 2):
        print("Class {}:".format(class_id))
        print(np.count_nonzero(labels == class_id))
    print(length_caption)
    print(len(labels))


# Slice into a new name instead of overwriting `data`: the previous in-place
# trimming removed one more row and one more column on every re-run of this cell.
samples = data[1:, 1:]

# After the trim, column 1 holds the labels; all remaining columns are features.
train_labels = samples[:, 1]
train_data = np.delete(samples, 1, axis=1)

_print_class_counts("Initial data", train_labels, "Initial data length:")

# Rows labelled 3 are removed before training (the printout below calls them noise).
faulty_indices = np.where(train_labels == 3)[0].tolist()

train_labels = np.delete(train_labels, faulty_indices, axis=0)
train_data = np.delete(train_data, faulty_indices, axis=0)

print("Initial data length (no noise):")
print(len(train_labels))

# Split the data into training and validation sets

# SKIP WHEN TRAINING RANDOM FOREST
# NOTE(review): the split is not stratified; with imbalanced classes consider
# passing stratify=train_labels so both splits keep the class proportions.
train_data, validation_data, train_labels, validation_labels = train_test_split(train_data, train_labels, test_size=0.2, random_state=42)

_print_class_counts("Training data", train_labels, "Training data length:")
_print_class_counts("Validation data", validation_labels, "Validation data length:")

# END OF SKIPPABLE CODE

----------DATA BALANCING----------

In [None]:
# SMOTE
from imblearn.over_sampling import SMOTE

# Resample the training split with SMOTE (fixed seed for repeatability).
smote = SMOTE(random_state=42)
smote_train_data, smote_labels = smote.fit_resample(train_data, train_labels)

# Report the per-class sample counts and the total size of the resampled set.
print(
    "SMOTE balanced data",
    "Class 0:", np.count_nonzero(smote_labels == 0),
    "Class 1:", np.count_nonzero(smote_labels == 1),
    "Class 2:", np.count_nonzero(smote_labels == 2),
    "SMOTE data length:", len(smote_labels),
    sep="\n",
)

In [None]:
# RANDOM OVERSAMPLING
from imblearn.over_sampling import RandomOverSampler

# Resample the training split with RandomOverSampler (fixed seed for repeatability).
oversample = RandomOverSampler(random_state=42)
random_oversampling_train_data, random_oversampling_labels = oversample.fit_resample(train_data, train_labels)

# Report the per-class sample counts and the total size of the resampled set.
print(
    "RANDOM OVERSAMPLING balanced data",
    "Class 0:", np.count_nonzero(random_oversampling_labels == 0),
    "Class 1:", np.count_nonzero(random_oversampling_labels == 1),
    "Class 2:", np.count_nonzero(random_oversampling_labels == 2),
    "RANDOM OVERSAMPLING data length:", len(random_oversampling_labels),
    sep="\n",
)

In [None]:
# RANDOM UNDERSAMPLING
from imblearn.under_sampling import RandomUnderSampler

# Resample the training split with RandomUnderSampler (fixed seed for repeatability).
undersample = RandomUnderSampler(random_state=42)
random_undersampling_train_data, random_undersampling_labels = undersample.fit_resample(train_data, train_labels)

# Report the per-class sample counts and the total size of the resampled set.
print(
    "RANDOM UNDERSAMPLING balanced data",
    "Class 0:", np.count_nonzero(random_undersampling_labels == 0),
    "Class 1:", np.count_nonzero(random_undersampling_labels == 1),
    "Class 2:", np.count_nonzero(random_undersampling_labels == 2),
    "RANDOM UNDERSAMPLING data length:", len(random_undersampling_labels),
    sep="\n",
)

In [None]:
from imblearn.over_sampling import ADASYN
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

# Resample the training split with ADASYN (fixed seed for repeatability).
adasyn = ADASYN(random_state=42)
adasyn_train_data, adasyn_labels = adasyn.fit_resample(train_data, train_labels)

# Report the per-class sample counts and the total size of the resampled set.
print(
    "ADASYN balanced data",
    "Class 0:", np.count_nonzero(adasyn_labels == 0),
    "Class 1:", np.count_nonzero(adasyn_labels == 1),
    "Class 2:", np.count_nonzero(adasyn_labels == 2),
    "ADASYN data length:", len(adasyn_labels),
    sep="\n",
)

In [None]:
from imblearn.combine import SMOTEENN
# Resample the training split with SMOTEENN (fixed seed for repeatability).
smote_enn = SMOTEENN(random_state=42)
balanced_train_data, balanced_labels = smote_enn.fit_resample(train_data, train_labels)

# Report the per-class sample counts and the total size of the resampled set.
print(
    "Mixed balanced data",
    "Class 0:", np.count_nonzero(balanced_labels == 0),
    "Class 1:", np.count_nonzero(balanced_labels == 1),
    "Class 2:", np.count_nonzero(balanced_labels == 2),
    "Mixed data length:", len(balanced_labels),
    sep="\n",
)

--------TRAINING---------

In [None]:
from tensorflow import keras

# Load the provided Keras model from Drive. train() reads this module-level
# `model` and clones it for every training run, so this cell must be executed
# before any call to train().
model = keras.models.load_model('/content/gdrive/MyDrive/Bakalauras/best_model_final.h5')

In [None]:
import matplotlib.pyplot as plt

def _plot_history_curves(history, metric, title, ylabel):
    """Plot the training and validation curves of one metric recorded in `history`."""
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title(title)
    plt.ylabel(ylabel)
    plt.xlabel('Epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()


def train(train_data, train_labels, validation_data, validation_labels, epochs=10, patience=5):
    """Train a fresh clone of the module-level `model` and plot its learning curves.

    Args:
        train_data: Feature array used for fitting.
        train_labels: Integer class labels matching `train_data`.
        validation_data: Feature array evaluated after every epoch.
        validation_labels: Integer class labels matching `validation_data`.
        epochs: Maximum number of training epochs (default 10, the previous
            hard-coded value).
        patience: Number of epochs without `val_loss` improvement after which
            early stopping ends training (default 5, the previous hard-coded value).

    Returns:
        The trained Keras model; early stopping restores the weights of the
        epoch with the best `val_loss`.
    """
    # Copying layers from the provided model.
    # NOTE(review): per the Keras docs clone_model builds new layers with newly
    # initialised weights, so only the architecture of `model` is reused -- confirm
    # this is intended.
    new_model = keras.models.clone_model(model)

    early_stopping_monitor = keras.callbacks.EarlyStopping(
        monitor='val_loss',
        min_delta=0,
        patience=patience,
        verbose=0,
        mode='auto',
        baseline=None,
        restore_best_weights=True
    )

    # Compiling - adding optimizer, loss function, metrics.
    # NOTE(review): "MAE" is computed between integer labels and the model's
    # per-class output, so its value is hard to interpret; kept so that the
    # metrics reported by evaluate() stay unchanged.
    new_model.compile(
        optimizer=keras.optimizers.Adam(),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy(), "MAE"],
    )

    # Training
    history = new_model.fit(
        train_data,
        train_labels,
        epochs=epochs,
        validation_data=(validation_data, validation_labels),
        callbacks=[early_stopping_monitor]
    )

    # One figure for accuracy, one for loss (train vs. validation in each).
    _plot_history_curves(history, 'sparse_categorical_accuracy', 'Model Accuracy', 'Accuracy')
    _plot_history_curves(history, 'loss', 'Model Loss', 'Loss')

    return new_model


In [None]:
# Train one network per balancing strategy and store it under
# "<strategy>_model.h5" (for example "smote_model.h5").
# Each model is saved immediately after its training run, so an interrupted
# session (crash, Colab disconnect) does not lose the models already trained.
print("Training primary model")
primary_model = train(train_data, train_labels, validation_data, validation_labels)
primary_model.save('/content/gdrive/MyDrive/Bakalauras/primary_model.h5')

print("Training SMOTE model")
smote_model = train(smote_train_data, smote_labels, validation_data, validation_labels)
smote_model.save('/content/gdrive/MyDrive/Bakalauras/smote_model.h5')

print("Training random oversampling model")
random_oversampling_model = train(random_oversampling_train_data, random_oversampling_labels, validation_data, validation_labels)
random_oversampling_model.save('/content/gdrive/MyDrive/Bakalauras/random_oversampling_model.h5')

print("Training random undersampling model")
random_undersampling_model = train(random_undersampling_train_data, random_undersampling_labels, validation_data, validation_labels)
random_undersampling_model.save('/content/gdrive/MyDrive/Bakalauras/random_undersampling_model.h5')

print("Training ADASYN model")
adasyn_model = train(adasyn_train_data, adasyn_labels, validation_data, validation_labels)
adasyn_model.save('/content/gdrive/MyDrive/Bakalauras/adasyn_model.h5')

print("Training balanced model")
balanced_model = train(balanced_train_data, balanced_labels, validation_data, validation_labels)
balanced_model.save('/content/gdrive/MyDrive/Bakalauras/balanced_model.h5')

-----------TESTING-------

In [None]:
import numpy as np
# Print floats in fixed-point notation instead of scientific notation.
np.set_printoptions(suppress=True)

# PREPARING TEST DATA
# Load the CSV and discard its first row and first column in one step.
test_data = np.genfromtxt('/content/gdrive/MyDrive/Bakalauras/test_data_RR_1_05_31_newest.csv', delimiter=',')[1:, 1:]

# Column 1 of the trimmed array holds the labels; the rest are features.
test_labels = test_data[:, 1]
test_data = np.delete(test_data, 1, axis=1)

print(
    "Testing data",
    "Class 0:", np.count_nonzero(test_labels == 0),
    "Class 1:", np.count_nonzero(test_labels == 1),
    "Class 2:", np.count_nonzero(test_labels == 2),
    "Testing data length:", len(test_labels),
    sep="\n",
)

# Remove every sample labelled 3 from both the labels and the features.
faulty_indices = np.where(test_labels == 3)[0].tolist()
test_labels = np.delete(test_labels, faulty_indices, axis=0)
test_data = np.delete(test_data, faulty_indices, axis=0)

print("Testing data length (no noise):", len(test_labels), sep="\n")

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def test(model):
    """Evaluate a Keras classifier on the module-level `test_data` / `test_labels`.

    Prints the values returned by `model.evaluate`, the confusion matrix and
    the classification report, then draws a heatmap of the per-class report.

    Args:
        model: Compiled Keras model exposing `evaluate` and `predict`.
    """
    results = model.evaluate(test_data, test_labels)
    print("test loss, test acc:", results)

    # Predicted class = index of the largest score in each output row.
    # Assumes predict() returns a (n_samples, n_classes) array -- TODO confirm.
    predictions = np.argmax(model.predict(test_data), axis=1)

    print(confusion_matrix(test_labels, predictions))
    print(classification_report(test_labels, predictions))

    # The same report as a dict, so it can be turned into a DataFrame for plotting.
    report = classification_report(test_labels, predictions, output_dict=True)
    report_df = pd.DataFrame(report).transpose()

    fig, ax = plt.subplots(figsize=(8, 6))
    # Drop the last three rows and the last column of the report
    # (presumably the accuracy / macro avg / weighted avg rows and the
    # support column) so only per-class scores are drawn.
    sns.heatmap(report_df.iloc[:-3, :-1], annot=True, cmap="Blues", ax=ax, cbar=False)
    ax.set_ylabel('Labels')
    plt.show()


In [None]:
from tensorflow import keras

# Load the six trained networks from Drive for testing. The file names match
# the ones written by the training cell, so that cell (or an earlier session
# that ran it) must have produced these files first.
primary_model = keras.models.load_model('/content/gdrive/MyDrive/Bakalauras/primary_model.h5')
smote_model = keras.models.load_model('/content/gdrive/MyDrive/Bakalauras/smote_model.h5')
random_oversampling_model = keras.models.load_model('/content/gdrive/MyDrive/Bakalauras/random_oversampling_model.h5')
random_undersampling_model = keras.models.load_model('/content/gdrive/MyDrive/Bakalauras/random_undersampling_model.h5')
adasyn_model = keras.models.load_model('/content/gdrive/MyDrive/Bakalauras/adasyn_model.h5')
balanced_model = keras.models.load_model('/content/gdrive/MyDrive/Bakalauras/balanced_model.h5')

In [None]:
# Evaluate every trained network on the held-out test set, one after another.
print("Testing primary model")
test(primary_model)
print("----------")
print("Testing SMOTE")
test(smote_model)
print("----------")
print("Testing random oversampling")
test(random_oversampling_model)
print("----------")
print("Testing random undersampling")
test(random_undersampling_model)
print("----------")
# ADASYN is an oversampling method (imblearn.over_sampling), so the label no
# longer calls it "undersampling".
print("Testing ADASYN")
test(adasyn_model)
print("----------")
print("Testing balanced model")
test(balanced_model)

-------------------------RANDOM FOREST--------------------------------

In [None]:
# Modelling
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, ConfusionMatrixDisplay, classification_report
from sklearn.model_selection import RandomizedSearchCV, train_test_split, cross_val_score, cross_val_predict
from scipy.stats import randint

# Tree Visualisation
from sklearn.tree import export_graphviz
from IPython.display import Image
import graphviz

def find_random_forest_hyperparams(train_data, train_labels, random_state=42):
    """Run a randomized hyperparameter search for a random forest classifier.

    Args:
        train_data: Feature array used for the cross-validated search.
        train_labels: Class labels matching `train_data`.
        random_state: Seed for both the forest and the parameter sampling, so
            the search is repeatable like the other seeded steps of this
            notebook. Pass None for the previous unseeded behaviour.

    Returns:
        The best estimator found by the search (`best_estimator_`).
    """
    param_distributions = {
        'n_estimators': [75, 88, 100],
        'max_features': ['sqrt'],
        'max_depth': [16, 18, None],
        'max_leaf_nodes': [15, 18, None],
    }

    # Create a random forest classifier
    rf = RandomForestClassifier(random_state=random_state)

    # Use random search to find the best hyperparameters
    rand_search = RandomizedSearchCV(rf, param_distributions, random_state=random_state)

    # Fit the random search object to the data
    rand_search.fit(train_data, train_labels)

    # Print the best hyperparameters
    print('Best hyperparameters:',  rand_search.best_params_)

    return rand_search.best_estimator_


def train_random_forest(train_data, train_labels, random_state=42):
    """Fit a random forest with fixed hyperparameters and print 10-fold CV metrics.

    Args:
        train_data: Feature array used for fitting and cross-validation.
        train_labels: Class labels matching `train_data`.
        random_state: Seed for the forest, so repeated runs give the same
            model. Pass None for the previous unseeded behaviour.

    Returns:
        The classifier fitted on the complete `train_data`.
    """
    # Hyperparameters are hard-coded from an earlier search run:
    # Best hyperparameters: {'n_estimators': 75, 'max_leaf_nodes': None, 'max_features': 'sqrt', 'max_depth': 18}
    rf = RandomForestClassifier(
        n_estimators=75,
        max_leaf_nodes=None,
        max_features="sqrt",
        max_depth=18,
        random_state=random_state,
    )

    # Fit the classifier on the full training data; this is the model returned.
    rf.fit(train_data, train_labels)

    # Out-of-fold predictions for the metrics below.
    # NOTE(review): scikit-learn documents that cross_val_predict fits clones of
    # the estimator, which would leave the fit above untouched -- confirm.
    # NOTE(review): when `train_data` was resampled before this call, the folds
    # contain resampled points, which may make these scores optimistic.
    predicted_labels = cross_val_predict(rf, train_data, train_labels, cv=10)

    accuracy = accuracy_score(train_labels, predicted_labels)
    confusion_mat = confusion_matrix(train_labels, predicted_labels)
    classif_report = classification_report(train_labels, predicted_labels)

    print("Classification report:")
    print(classif_report)
    print("Accuracy: %0.2f" % accuracy)
    print("Confusion matrix:")
    print(confusion_mat)

    return rf

In [None]:
import joblib

# Train one random forest per balancing strategy and persist each with joblib
# right after training, under "best_rf_<strategy>.joblib".
# NOTE(review): the estimator returned by the hyperparameter search is stored in
# `best_rf` but not used afterwards in this cell; train_random_forest() uses its
# own hard-coded hyperparameters.
best_rf = find_random_forest_hyperparams(train_data, train_labels)
print("Training primary random forest classifier")
best_rf_primary = train_random_forest(train_data, train_labels)
joblib.dump(best_rf_primary, "/content/gdrive/MyDrive/Bakalauras/best_rf_primary.joblib")
print("-----------------------")
print("Training SMOTE random forest classifier")
best_rf_smote = train_random_forest(smote_train_data, smote_labels)
joblib.dump(best_rf_smote, "/content/gdrive/MyDrive/Bakalauras/best_rf_smote.joblib")
print("-----------------------")
print("Training random oversampling random forest classifier")
best_rf_rand_oversampling = train_random_forest(random_oversampling_train_data, random_oversampling_labels)
joblib.dump(best_rf_rand_oversampling, "/content/gdrive/MyDrive/Bakalauras/best_rf_rand_oversampling.joblib")
print("-----------------------")
print("Training random undersampling random forest classifier")
best_rf_rand_undersampling = train_random_forest(random_undersampling_train_data, random_undersampling_labels)
joblib.dump(best_rf_rand_undersampling, "/content/gdrive/MyDrive/Bakalauras/best_rf_rand_undersampling.joblib")
print("Training adasyn random forest classifier")
# The variable and file name say "undersampling", but the data comes from the
# ADASYN cell (imported from imblearn.over_sampling); the name is kept because
# later cells load and reference it.
best_rf_adasyn_undersampling = train_random_forest(adasyn_train_data, adasyn_labels)
joblib.dump(best_rf_adasyn_undersampling, "/content/gdrive/MyDrive/Bakalauras/best_rf_adasyn_undersampling.joblib")
print("Training balanced classifier")
best_rf_balanced = train_random_forest(balanced_train_data, balanced_labels)
joblib.dump(best_rf_balanced, "/content/gdrive/MyDrive/Bakalauras/best_rf_balanced.joblib")

In [None]:
# Modelling
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, ConfusionMatrixDisplay, classification_report
from sklearn.model_selection import RandomizedSearchCV, train_test_split, cross_val_score, cross_val_predict
from scipy.stats import randint

def test_random_forest(classifier):
    """Evaluate a fitted classifier on the module-level `test_data` / `test_labels`.

    Draws the confusion matrix and prints the classification report, the
    accuracy and the confusion matrix.

    Args:
        classifier: Fitted estimator exposing `predict`.
    """
    # Local import: this cell does not import pyplot itself.
    import matplotlib.pyplot as plt

    y_pred = classifier.predict(test_data)

    # Create the confusion matrix once; it is both plotted and printed.
    confusion_mat = confusion_matrix(test_labels, y_pred)

    ConfusionMatrixDisplay(confusion_matrix=confusion_mat).plot()
    # Show the figure now so it appears next to this classifier's report
    # instead of all figures being rendered at the end of the calling cell.
    plt.show()

    accuracy = accuracy_score(test_labels, y_pred)
    classif_report = classification_report(test_labels, y_pred)
    print("Classification report:")
    print(classif_report)
    print("Accuracy: %0.2f" % accuracy)
    print("Confusion matrix:")
    print(confusion_mat)

In [None]:
import joblib

# Loading RF classifiers saved by the random-forest training cell.
# NOTE: joblib.load unpickles the file and can execute arbitrary code; only
# load files that this notebook (or another trusted source) wrote.
best_rf_primary = joblib.load("/content/gdrive/MyDrive/Bakalauras/best_rf_primary.joblib")
best_rf_smote = joblib.load("/content/gdrive/MyDrive/Bakalauras/best_rf_smote.joblib")
best_rf_rand_oversampling = joblib.load("/content/gdrive/MyDrive/Bakalauras/best_rf_rand_oversampling.joblib")
best_rf_rand_undersampling = joblib.load("/content/gdrive/MyDrive/Bakalauras/best_rf_rand_undersampling.joblib")
best_rf_adasyn_undersampling = joblib.load("/content/gdrive/MyDrive/Bakalauras/best_rf_adasyn_undersampling.joblib")
best_rf_balanced = joblib.load("/content/gdrive/MyDrive/Bakalauras/best_rf_balanced.joblib")

In [None]:
# Testing RF classifiers: evaluate every random forest on the held-out test set.
print("Testing primary RF classifier")
test_random_forest(best_rf_primary)
print("----------")
print("Testing SMOTE RF classifier")
test_random_forest(best_rf_smote)
print("----------")
print("Testing random oversampling RF classifier")
test_random_forest(best_rf_rand_oversampling)
print("----------")
print("Testing random undersampling RF classifier")
test_random_forest(best_rf_rand_undersampling)
print("----------")
# ADASYN is an oversampling method (imblearn.over_sampling), so the label no
# longer calls it "undersampling"; the variable name is kept for compatibility
# with the loading cell.
print("Testing ADASYN RF classifier")
test_random_forest(best_rf_adasyn_undersampling)
print("----------")
print("Testing balanced RF classifier")
test_random_forest(best_rf_balanced)