# Import libraries

In [None]:
import numpy as np
import joblib
import os
import tensorflow as tf
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from itertools import cycle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.metrics import mean_absolute_error, accuracy_score, precision_score, recall_score, f1_score, roc_curve
from sklearn.metrics import confusion_matrix, classification_report, auc, precision_recall_curve, average_precision_score
import torch
import keras
import pydot
import graphviz

# Check CUDA GPU

In [None]:

print("Is CUDA enabled GPU Available?", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU Number:", torch.cuda.device_count())
    print("Current GPU Index:", torch.cuda.current_device())
    print("GPU Type:", torch.cuda.get_device_name(device=None))
    print("GPU Capability:", torch.cuda.get_device_capability(device=None))
    print("Is GPU Initialized yet?", torch.cuda.is_initialized())

# Define constants

In [None]:
sensor = "Hypso"
comb = "biggerAOI"
dr_type = "original"

X_train_path = f"./data/{comb}/X_train.npy"
X_test_path = f"./data/{comb}/X_test.npy"
y_train_path = f"./data/{comb}/y_train.npy"
y_test_path = f"./data/{comb}/y_test.npy"


dir_name = f"{sensor}/{comb}/{dr_type}_binary"

epochs = 100
gamma = 2
optimizer_choice = "rmsprop"

model_version = f"MobileNet_v3_Small_{epochs}e_focalloss_g{gamma}_{optimizer_choice}"

# Load data

In [None]:
X_train = np.load(X_train_path)
X_test = np.load(X_test_path)
y_train = np.load(y_train_path)
y_test = np.load(y_test_path)
  

print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)

unique_labels, count = np.unique(y_train, return_counts=True)
count_classes = len(unique_labels)

print(unique_labels)
print(count)
print(count_classes)



# Define network

In [None]:
def Conv_1D_block(inputs, model_width, kernel, strides):
    # 1D Convolutional Block with BatchNormalization
    x = tf.keras.layers.Conv1D(model_width, kernel, strides=strides, padding="same", kernel_initializer="he_normal")(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)

    return x


def Conv_1D_block_2(inputs, model_width, kernel, strides, nl):
    # This function defines a 1D convolution operation with BN and activation.
    x = tf.keras.layers.Conv1D(model_width, kernel, strides=strides, padding="same", kernel_initializer="he_normal")(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    if nl == 'HS':
        x = x * tf.keras.activations.relu(x + 3.0, max_value=6.0) / 6.0
    elif nl == 'RE':
        x = tf.keras.activations.relu(x, max_value=6.0)

    return x


def Conv_1D_DW(inputs, model_width, kernel, strides, alpha):
    # 1D Depthwise Separable Convolutional Block with BatchNormalization
    model_width = int(model_width * alpha)
    x = tf.keras.layers.SeparableConv1D(model_width, kernel, strides=strides, depth_multiplier=1, padding='same')(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)
    x = tf.keras.layers.Conv1D(model_width, 1, strides=1, padding="same", kernel_initializer="he_normal")(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)

    return x


def bottleneck_block(inputs, filters, kernel, t, alpha, s, r=False):
    tchannel = tf.keras.backend.int_shape(inputs)[-1] * t
    cchannel = int(filters * alpha)

    x = Conv_1D_block(inputs, tchannel, 1, 1)
    x = tf.keras.layers.SeparableConv1D(filters, kernel, strides=s, depth_multiplier=1, padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)
    x = tf.keras.layers.Conv1D(cchannel, 1, strides=1, padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('linear')(x)

    if r:
        x = tf.keras.layers.concatenate([x, inputs], axis=-1)

    return x


def bottleneck_block_2(inputs, filters, kernel, e, s, squeeze, nl, alpha):
    # This function defines a basic bottleneck structure.

    input_shape = tf.keras.backend.int_shape(inputs)

    tchannel = int(e)
    cchannel = int(alpha * filters)

    r = s == 1 and input_shape[2] == filters

    x = Conv_1D_block_2(inputs, tchannel, 1, 1, nl)

    x = tf.keras.layers.SeparableConv1D(filters, kernel, strides=s, depth_multiplier=1, padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    if nl == 'HS':
        x = x * tf.keras.activations.relu(x + 3.0, max_value=6.0) / 6.0
    if nl == 'RE':
        x = tf.keras.activations.relu(x, max_value=6.0)

    if squeeze:
        x = _squeeze(x)

    x = tf.keras.layers.Conv1D(cchannel, 1, strides=1, padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)

    if r:
        x = tf.keras.layers.Add()([x, inputs])

    return x


def inverted_residual_block(inputs, filters, kernel, t, alpha, strides, n):
    if strides == 1:
        x = bottleneck_block(inputs, filters, kernel, t, alpha, strides, True)
    else:
        x = bottleneck_block(inputs, filters, kernel, t, alpha, strides)

    for i in range(1, n):
        x = bottleneck_block(x, filters, kernel, t, alpha, 1, True)

    return x


def _squeeze(inputs):
    # This function defines a squeeze structure.

    input_channels = int(inputs.shape[-1])

    x = tf.keras.layers.GlobalAveragePooling1D()(inputs)
    x = tf.keras.layers.Dense(input_channels, activation='relu')(x)
    x = tf.keras.layers.Dense(input_channels, activation='hard_sigmoid')(x)
    x = tf.keras.layers.Reshape((1, input_channels))(x)
    x = tf.keras.layers.Multiply()([inputs, x])

    return x


class MobileNet:
    def __init__(self, length, num_channel, num_filters, problem_type='Regression',
                 output_nums=1, pooling='avg', dropout_rate=False, alpha=1.0):
        self.length = length
        self.num_channel = num_channel
        self.num_filters = num_filters
        self.problem_type = problem_type
        self.output_nums = output_nums
        self.pooling = pooling
        self.dropout_rate = dropout_rate
        self.alpha = alpha

    def MLP(self, x):
        if self.pooling == 'avg':
            x = tf.keras.layers.GlobalAveragePooling1D()(x)
        elif self.pooling == 'max':
            x = tf.keras.layers.GlobalMaxPool1D()(x)
        # Final Dense Outputting Layer for the outputs
        x = tf.keras.layers.Flatten()(x)
        if self.dropout_rate:
            x = tf.keras.layers.Dropout(self.dropout_rate)(x)
        outputs = tf.keras.layers.Dense(self.output_nums, activation='linear')(x)
        if self.problem_type == 'Classification':
            outputs = tf.keras.layers.Dense(self.output_nums, activation='softmax')(x)

        return outputs

    def MobileNet_v1(self):
        inputs = tf.keras.Input((self.length, self.num_channel))

        x = Conv_1D_block(inputs, self.num_filters * (2 ** 0), 3, 2)
        x = Conv_1D_DW(x, self.num_filters, 3, 1, self.alpha)
        x = Conv_1D_DW(x, self.num_filters * (2 ** 1), 3, 2, self.alpha)
        x = Conv_1D_DW(x, self.num_filters, 3, 1, self.alpha)
        x = Conv_1D_DW(x, self.num_filters * (2 ** 2), 3, 2, self.alpha)
        x = Conv_1D_DW(x, self.num_filters, 3, 1, self.alpha)
        x = Conv_1D_DW(x, self.num_filters * (2 ** 3), 3, 2, self.alpha)
        for i in range(5):
            x = Conv_1D_DW(x, self.num_filters, 3, 1, self.alpha)
        x = Conv_1D_DW(x, self.num_filters * (2 ** 4), 3, 2, self.alpha)
        x = Conv_1D_DW(x, self.num_filters * (2 ** 5), 3, 2, self.alpha)

        outputs = self.MLP(x)
        model = tf.keras.Model(inputs, outputs)

        return model

    def MobileNet_v2(self):
        inputs = tf.keras.Input((self.length, self.num_channel))
        x = Conv_1D_block(inputs, self.num_filters, 3, 2)

        x = inverted_residual_block(x, 16, 3, t=1, alpha=self.alpha, strides=1, n=1)
        x = inverted_residual_block(x, 24, 3, t=6, alpha=self.alpha, strides=2, n=2)
        x = inverted_residual_block(x, 32, 3, t=6, alpha=self.alpha, strides=2, n=3)
        x = inverted_residual_block(x, 64, 3, t=6, alpha=self.alpha, strides=2, n=4)
        x = inverted_residual_block(x, 96, 3, t=6, alpha=self.alpha, strides=1, n=3)
        x = inverted_residual_block(x, 160, 3, t=6, alpha=self.alpha, strides=2, n=3)
        x = inverted_residual_block(x, 320, 3, t=6, alpha=self.alpha, strides=1, n=1)
        x = Conv_1D_block(x, 1280, 1, 1)

        outputs = self.MLP(x)
        model = tf.keras.Model(inputs, outputs)

        return model

    def MobileNet_v3_Small(self):
        inputs = tf.keras.Input((self.length, self.num_channel))

        x = Conv_1D_block_2(inputs, 16, 3, strides=2, nl='HS')
        x = bottleneck_block_2(x, 16, 3, e=16, s=2, squeeze=True, nl='RE', alpha=self.alpha)
        x = bottleneck_block_2(x, 24, 3, e=72, s=2, squeeze=False, nl='RE', alpha=self.alpha)
        x = bottleneck_block_2(x, 24, 3, e=88, s=1, squeeze=False, nl='RE', alpha=self.alpha)
        x = bottleneck_block_2(x, 40, 5, e=96, s=2, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 40, 5, e=240, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 40, 5, e=240, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 48, 5, e=120, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 48, 5, e=144, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 96, 5, e=288, s=2, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 96, 5, e=576, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 96, 5, e=576, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = Conv_1D_block_2(x, 576, 1, strides=1, nl='HS')
        x = x * tf.keras.activations.relu(x + 3.0, max_value=6.0) / 6.0
        x = tf.keras.layers.Conv1D(1280, 1, padding='same')(x)
        
        outputs = self.MLP(x)
        model = tf.keras.Model(inputs, outputs)

        return model

    def MobileNet_v3_Large(self):
        inputs = tf.keras.Input((self.length, self.num_channel))

        x = Conv_1D_block_2(inputs, 16, 3, strides=2, nl='HS')
        x = bottleneck_block_2(x, 16, 3, e=16, s=1, squeeze=False, nl='RE', alpha=self.alpha)
        x = bottleneck_block_2(x, 24, 3, e=64, s=2, squeeze=False, nl='RE', alpha=self.alpha)
        x = bottleneck_block_2(x, 24, 3, e=72, s=1, squeeze=False, nl='RE', alpha=self.alpha)
        x = bottleneck_block_2(x, 40, 5, e=72, s=2, squeeze=True, nl='RE', alpha=self.alpha)
        x = bottleneck_block_2(x, 40, 5, e=120, s=1, squeeze=True, nl='RE', alpha=self.alpha)
        x = bottleneck_block_2(x, 40, 5, e=120, s=1, squeeze=True, nl='RE', alpha=self.alpha)
        x = bottleneck_block_2(x, 80, 5, e=240, s=2, squeeze=False, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 80, 3, e=200, s=1, squeeze=False, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 80, 3, e=184, s=1, squeeze=False, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 80, 3, e=184, s=1, squeeze=False, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 112, 3, e=480, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 112, 3, e=672, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 160, 5, e=672, s=2, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 160, 5, e=960, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = bottleneck_block_2(x, 160, 5, e=960, s=1, squeeze=True, nl='HS', alpha=self.alpha)
        x = Conv_1D_block_2(x, 960, 1, strides=1, nl='HS')
        x = x * tf.keras.activations.relu(x + 3.0, max_value=6.0) / 6.0
        x = tf.keras.layers.Conv1D(1280, 1, padding='same')(x)

        outputs = self.MLP(x)
        model = tf.keras.Model(inputs, outputs)

        return model
    

# Prepare labels

In [None]:
def one_hot_encoding(data):
  L_E = LabelEncoder()
  integer_encoded = L_E.fit_transform(data)  
  onehot_encoder = OneHotEncoder(sparse_output=False)
  integer_encoded = integer_encoded.reshape(len(integer_encoded), 1)
  one_hot_encoded_data = onehot_encoder.fit_transform(integer_encoded)
  return one_hot_encoded_data

In [None]:
y_train_encoded = one_hot_encoding(y_train.ravel())
y_test_encoded = one_hot_encoding(y_test.ravel())


print(y_train_encoded.shape)
print(y_test_encoded.shape)

# Prepare model

In [None]:
"Configurations for the 1D Network in Classification Mode"
length = X_train.shape[1]       # Number of Features (or length of the signal)
model_width = 64                # Number of Filter or Kernels in the Input Layer (Power of 2 to avoid error)
num_channel = 1                 # Number of Input Channels
problem_type = 'Classification' # Regression or Classification
class_number = count_classes # Number of Output Class in Classification Mode (>=2)

In [None]:
Classification_Model = MobileNet(length, num_channel, model_width, problem_type=problem_type, output_nums=class_number).MobileNet_v3_Small()
optimizer = keras.optimizers.Adam() if optimizer_choice == 'adam' else keras.optimizers.RMSprop()
Classification_Model.compile(loss=keras.losses.CategoricalFocalCrossentropy(gamma=gamma), 
                             optimizer=optimizer, 
                             metrics=['mse', 'accuracy'])
# loss=keras.losses.CategoricalFocalCrossentropy(gamma=gamma)
# keras.losses.CategoricalCrossentropy()

In [None]:
# Classification_Model.summary()

# Train model

In [None]:
# Early Stopping and Model_Checkpoints are optional parameters
# Early Stopping is to stop the training based on certain condition set by the user
# Model Checkpoint is to save a model in a directory based on certain conditions so that it can be used later for Transfer Learning or avoiding retraining
callbacks = [tf.keras.callbacks.ModelCheckpoint(f"{dir_name}/{model_version}.keras", verbose=1, monitor='val_loss', save_best_only=True, mode='min')]
history = Classification_Model.fit(X_train, y_train_encoded, epochs=epochs, batch_size=128, verbose=1, validation_split=0.2, shuffle=True, callbacks=callbacks)

# Evaluate model

In [None]:
# Predictions from the Test Set from the Trained Model
best_model = tf.keras.models.load_model(f"{dir_name}/{model_version}.keras")
# model_version = "gamma4_lr0.0001_rmsprop"
#best_model = tf.keras.models.load_model(f"{tuner_dir}/gridsearch_model_trial_13.keras")
#tf.keras.models.save_model(best_model, f"{dir_name}/{model_version}.keras")

Predictions = Classification_Model.predict(X_test, verbose=1)
#Predictions = best_model.predict(X_test, verbose=1)

print(Predictions.shape)

In [None]:
# Error of the prediction, one of many evaluation metrics
# Using Mean Absolute Error (MAE) in this case as a sample
Error = mean_absolute_error(y_test_encoded, Predictions)
print(f"MAE: {Error}")

In [None]:
def history_plot(history):
  # list all dictionaries in history
  print(history.history.keys())
  # summarize history for error
  plt.figure(figsize=(12,10))
  plt.subplot(2,1,1)
  plt.plot(history.history['mse'])
  plt.plot(history.history['val_mse'])
  plt.title('Model Error Performance')
  plt.ylabel('Error')
  plt.xlabel('Epoch')
  plt.legend(['Train', 'Val'], loc='upper right')
  plt.show()
  # summarize history for loss
  plt.figure(figsize=(12,10))
  plt.subplot(2,1,2)
  plt.plot(history.history['loss'])
  plt.plot(history.history['val_loss'])
  plt.title('Model Loss')
  plt.ylabel('Loss')
  plt.xlabel('Epoch')
  plt.legend(['Train', 'Val'], loc='upper right')
  plt.show()
#
history_plot(history)

In [None]:
# Create directory if it does not exist

if not os.path.exists(dir_name):
    os.makedirs(f"{dir_name}", exist_ok=True)

In [None]:
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix
import seaborn as sns
import json
import matplotlib.pyplot as plt

# y_pred = np.argmax(Predictions, axis=1) + 1
y_pred = np.argmax(Predictions, axis=1)


# Calculate accuracy
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy}")

# Calculate F1-score
f1 = f1_score(y_test, y_pred, average='weighted')
print(f"F1-score: {f1}")

report_dict = classification_report(y_test, y_pred, output_dict=True)

# Load existing classification reports if the file exists
classification_reports_path = f"{dir_name}/classification_reports.json"
if os.path.exists(classification_reports_path):
    with open(classification_reports_path, "r") as json_file:
        classification_reports = json.load(json_file)
else:
    classification_reports = []

# Add the current model's classification report
classification_reports.append([model_version, report_dict])

# Sort the classification reports by macro F1-score
sorted_reports = sorted(
    classification_reports,
    key=lambda x: x[1]["macro avg"]["f1-score"],
    reverse=True
)

# Save the updated classification reports
with open(classification_reports_path, "w") as json_file:
    json.dump(sorted_reports, json_file, indent=4)

""" class_dict = {
    0: "Spruce",
    1: "Pine",
    3: "Deciduous"
}"""

class_dict = {
    0: "Conifer",
    1: "Deciduous"
}

# Confusion matrix
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_dict.values(), yticklabels=class_dict.values())
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.savefig(f"{dir_name}/{model_version}_confusion_matrix.png")
plt.show()

In [None]:
# Print the scoreboard
print("Scoreboard (sorted by Macro F1-Score):")
print("{:<20} {:<10}".format("Model Version", "Macro F1-Score"))
print("-" * 30)
for model_version, report in sorted_reports:
    print("{:<20} {:.4f}".format(model_version, report["macro avg"]["f1-score"]))


# Gridsearch

In [None]:
"""import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import keras_tuner as kt
import os
import shutil  # For resetting KerasTuner directory

# Clear previous KerasTuner trials (fixes corrupted search spaces)
tuner_dir = f"{dir_name}/kerastuner_results"
i = 1
while os.path.exists(tuner_dir):
    tuner_dir = f"{dir_name}/kerastuner_results_{i}"
    i += 1

# Function to build the model
def build_model(hp):
    model = MobileNet(length, num_channel, model_width, problem_type=problem_type, output_nums=class_number).MobileNet_v3_Small()
    
    # Ensure gamma is valid for focal loss
    gamma = hp.Choice('gamma', [0, 2, 4])
    learning_rate = hp.Choice('learning_rate', [1e-4, 1e-3, 1e-2])
    optimizer_choice = hp.Choice('optimizer', ['adam', 'rmsprop'])

    # Define optimizer
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate) if optimizer_choice == 'adam' else keras.optimizers.RMSprop(learning_rate=learning_rate)

    
    # Compile with correct gamma
    model.compile(
        loss=keras.losses.CategoricalFocalCrossentropy(gamma=gamma), 
        optimizer=optimizer, 
        metrics=['mse', 'accuracy']
    )
    return model

# Define tuner (change to RandomSearch if needed)
tuner = kt.GridSearch(
    build_model,
    objective='val_loss',
    max_trials=20,  # Number of combinations to try
    executions_per_trial=1,  # Can increase to average results
    directory=tuner_dir,  # Use a clean directory
    project_name="gridsearch_tuning"
)

# Run the search
tuner.search(
    X_train, y_train_encoded,
    epochs=20,  
    validation_split=0.2,
    callbacks=[tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, mode='min')]
)


"""

In [None]:
"""# Get the best hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"Best Hyperparameters: {best_hps.values}")

# Retrieve the best model
best_model = tuner.get_best_models(num_models=1)[0]

# Save the best model
best_model.save(f"{tuner_dir}/best_gridsearch_model.keras")

# Save all models from the search
for i, trial in enumerate(tuner.oracle.trials.values()):
    model_path = f"{tuner_dir}/gridsearch_model_trial_{i}.keras"
    trial_model = tuner.get_best_models(num_models=1)[0]  # Get the trial model
    trial_model.save(model_path)
    print(f"Saved: {model_path}")"""

In [None]:
"""# Print all trial results for analysis
print("\n===== Hyperparameter Trials and Validation Loss =====\n")

# Sort trials by validation loss (best to worst)
sorted_trials = sorted(tuner.oracle.trials.values(), key=lambda x: x.metrics.get_best_value('val_loss') if x.metrics.get_best_value('val_loss') is not None else float('inf'))

for i, trial in enumerate(sorted_trials):
    trial_id = trial.trial_id
    trial_hparams = trial.hyperparameters.values
    val_loss = trial.metrics.get_best_value('val_loss')  # Get best val_loss for this trial
    val_mse = trial.metrics.get_best_value('val_mse')
    val_accuracy = trial.metrics.get_best_value('val_accuracy')

    print(f"Rank {i+1}:")
    print(f"  Trial ID: {trial_id}")
    print(f"  Hyperparameters: {trial_hparams}")
    print(f"  Best Validation Loss: {val_loss:.6f}\n")
    print(f"  Best Validation MSE: {val_mse:.6f}\n")
    print(f"  Best Validation Accuracy: {val_accuracy:.6f}\n")


print("=====================================================")"""
