<a href="https://colab.research.google.com/github/SolomonGithu/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/blob/main/notebook/fire_detection_sensor_fusion_model_training_and_deployment_with_EI_python_sdk.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Demonstration of sensor fusion by training a model to detect fires using image and temperature data

In [9]:
"""
# You can upload the dataset folder to a Google Drive directory and use it in the notebook
from google.colab import drive
drive.mount('/content/gdrive')
!ls "/content/gdrive/My Drive/Projects/Datasets/fire_detection_sensor_fusion_dataset"
# We can upload the dataset folder to a Google Drive directory
base_directory = '/content/gdrive/My Drive/Projects/Datasets/fire_detection_sensor_fusion_dataset'
"""

'\n# You can upload the dataset folder to a Google Drive directory and use it in the notebook\nfrom google.colab import drive\ndrive.mount(\'/content/gdrive\')\n!ls "/content/gdrive/My Drive/Projects/Datasets/fire_detection_sensor_fusion_dataset"\n# We can upload the dataset folder to a Google Drive directory\nbase_directory = \'/content/gdrive/My Drive/Projects/Datasets/fire_detection_sensor_fusion_dataset\'\n'

In [10]:
# Forcefully delete a folder
# !rm -rf '/content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion'

In [11]:
# Clone the GitHub repository with the dataset
! git clone https://github.com/SolomonGithu/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion.git

fatal: destination path 'Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion' already exists and is not an empty directory.


In [12]:
# List directories
!ls

Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion  sample_data


In [13]:
# set the directory with the dataset
#base_directory = '/content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset'
base_directory = '/content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified'

In [14]:
!python -m pip install edgeimpulse



In [15]:
import os
import numpy as np
import pandas as pd
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Flatten, Concatenate, Conv2D, MaxPooling2D, Dropout, Lambda
import tensorflow as tf
import edgeimpulse as ei

In [40]:
ei.API_KEY = "ei_.." # Put your Edge Impulse project API key. You can obtain it from the Dashboard page in the Keys section

In [17]:
# Load the CSV files with the temperature values
fire_temperature_data = pd.read_csv(base_directory + '/fire/fire.csv')
safe_environment_temperature_data = pd.read_csv(base_directory + '/safe_environment/safe_environment.csv')

In [18]:
# Add a label column to each dataframe
fire_temperature_data['label'] = 'fire'
safe_environment_temperature_data['label'] = 'safe_environment'

In [19]:
# Define the image width, height and channels
image_width = 32
image_height = 32
image_channels = 1 # 1 for grayscale, 3 for RGB

In [20]:
# Combine the dataframes with fire, "normal" temperatures and the their class labels
data = pd.concat([fire_temperature_data, safe_environment_temperature_data], ignore_index=True)
classes = ["fire", "safe_environment"]

In [21]:
# Encode string labels to numerical values
label_encoder = LabelEncoder()
data['label'] = label_encoder.fit_transform(data['label'])
num_classes = len(label_encoder.classes_)
print("Number of classes = ", num_classes)

Number of classes =  2


In [22]:
# Function to load and preprocess images
def load_and_preprocess_image(filepath):
    print("file loaded : ", filepath)
    if image_channels == 1:
      image = load_img(filepath, color_mode='grayscale', target_size=(image_width, image_height))  # Ensure grayscale
    else:
      image = load_img(filepath, target_size=(image_width, image_height))  # Resize the image
    image = img_to_array(image)
    image = image / 255.0  # Normalize pixel values
    return image

In [23]:
# Function to load images of different classes and from multiple directories
def load_images_from_directories(base_directory, directories, filenames):
    images = []
    for directory in directories:
        dir_path = os.path.join(base_directory, directory)
        for fname in filenames:
            filepath = os.path.join(dir_path, str(fname) + '.jpg')
            if os.path.exists(filepath):  # Check if the file exists
                image = load_and_preprocess_image(filepath)
                images.append(image)
            else:
                print(f"File {filepath} does not exist. Skipping.")
    return np.array(images)

# List of directories to load images from
directories = ['fire', 'safe_environment']

# Ensure filenames are unique and correspond to the right number of samples
filenames = data['filename'].unique()
images = load_images_from_directories(base_directory, directories, filenames)

# Check if the number of images matches the number of rows in the data
if len(images) != len(data):
    raise ValueError(f"Number of images ({len(images)}) does not match the number of rows in the data ({len(data)})")

temperatures = data['temperature'].values.reshape(-1, 1)
labels = to_categorical(data['label'].values, num_classes=num_classes)  # One-hot encode the labels

file loaded :  /content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified/fire/1.jpg
file loaded :  /content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified/fire/2.jpg
file loaded :  /content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified/fire/3.jpg
file loaded :  /content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified/fire/4.jpg
file loaded :  /content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified/fire/5.jpg
file loaded :  /content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified/fire/6.jpg
file loaded :  /content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified/fire/7.jpg
file l

In [24]:
  # Flatten the images and concatenate with temperatures
  flattened_images = images.reshape(images.shape[0], -1)
  print(f"Flattened images shape: {flattened_images.shape}")
  combined_inputs = np.concatenate([flattened_images, temperatures], axis=1)
  print(f"Combined inputs shape: {combined_inputs.shape}")

Flattened images shape: (120, 1024)
Combined inputs shape: (120, 1025)


In [25]:
# Loads a simple model, has 100% training and testing accuracy.
# Though during inference the model classifies all image data it was not trained with as fire (biased on fire) :(
def load_custom_multi_input_CNN_model():
  # Define the combined input layer
  input_layer = Input(shape=(image_width * image_height * image_channels + 1,))

  # Slice the combined input into image and temperature tensors
  image_tensor = Lambda(lambda x: x[:, :-1])(input_layer)
  temp_tensor = Lambda(lambda x: x[:, -1:])(input_layer)

  # Reshape the image tensor to the original image shape
  image_tensor = Lambda(lambda x: tf.reshape(x, [-1, image_width, image_height, image_channels]))(image_tensor)

  # Image input branch
  x = Conv2D(32, (3, 3), activation='relu')(image_tensor)
  x = MaxPooling2D((2, 2))(x)
  x = Conv2D(64, (3, 3), activation='relu')(x)
  x = MaxPooling2D((2, 2))(x)
  x = Conv2D(128, (3, 3), activation='relu')(x)
  x = MaxPooling2D((2, 2))(x)
  x = Flatten()(x)

  # Temperature input branch
  y = Dense(32, activation='relu')(temp_tensor)

  # Combine branches
  combined = Concatenate()([x, y])
  z = Dense(128, activation='relu')(combined)
  z = Dropout(0.5)(z)
  z = Dense(num_classes, activation='softmax')(z)

  # Define the model
  model = Model(inputs=input_layer, outputs=z)

  # Compile the model
  model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

  return model

# Load a MobileNet model and apply tensor slicing
from tensorflow.keras.applications import MobileNet
def load_MobileNet_model():
    # Define the combined input layer
    input_layer = Input(shape=(image_width * image_height * image_channels + 1,))

    # Slice the combined input into image and temperature tensors
    image_tensor = Lambda(lambda x: x[:, :-1])(input_layer)
    temp_tensor = Lambda(lambda x: x[:, -1:])(input_layer)

    # Reshape the image tensor to the original image shape
    image_tensor = Lambda(lambda x: tf.reshape(x, [-1, image_width, image_height, image_channels]))(image_tensor)

    if image_channels == 1:
      # Repeat the grayscale channel three times
      image_tensor = Lambda(lambda x: tf.image.grayscale_to_rgb(x))(image_tensor)

    # Load the MobileNetV1 model
    mobilenet = MobileNet(input_shape=(image_width, image_height, 3), alpha=0.25, include_top=False, pooling='avg')(image_tensor)

    # Temperature input branch
    temp_dense = Dense(32, activation='relu')(temp_tensor)

    # Combine branches
    combined = Concatenate()([mobilenet, temp_dense])
    z = Dense(128, activation='relu')(combined)
    z = Dropout(0.5)(z)
    z = Dense(num_classes, activation='softmax')(z)

    # Define the model
    model = Model(inputs=input_layer, outputs=z)

    # Compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    return model


# Load a MobileNet model and apply tensor slicing
from tensorflow.keras.applications import MobileNetV2
def load_MobileNetV2_model():
    # Define the combined input layer
    input_layer = Input(shape=(image_width * image_height * image_channels + 1,))

    # Slice the combined input into image and temperature tensors
    image_tensor = Lambda(lambda x: x[:, :-1])(input_layer)
    temp_tensor = Lambda(lambda x: x[:, -1:])(input_layer)

    # Reshape the image tensor to the original image shape
    image_tensor = Lambda(lambda x: tf.reshape(x, [-1, image_width, image_height, image_channels]))(image_tensor)

    if image_channels == 1:
      # Repeat the grayscale channel three times
      image_tensor = Lambda(lambda x: tf.image.grayscale_to_rgb(x))(image_tensor)

    # Load the MobileNetV2 model
    mobilenet = MobileNetV2(input_shape=(image_width, image_height, 3), alpha=0.35, include_top=False, pooling='avg')(image_tensor)

    # Temperature input branch
    temp_dense = Dense(32, activation='relu')(temp_tensor)

    # Combine branches
    combined = Concatenate()([mobilenet, temp_dense])
    z = Dense(128, activation='relu')(combined)
    z = Dropout(0.5)(z)
    z = Dense(num_classes, activation='softmax')(z)

    # Define the model
    model = Model(inputs=input_layer, outputs=z)

    # Compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    return model

In [26]:
model = load_custom_multi_input_CNN_model()
#model = load_MobileNet_model()
#model = load_MobileNetV2_model()
#model.get_weights() # print the model weights
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 1025)]               0         []                            
                                                                                                  
 lambda (Lambda)             (None, 1024)                 0         ['input_1[0][0]']             
                                                                                                  
 lambda_2 (Lambda)           (None, 32, 32, 1)            0         ['lambda[0][0]']              
                                                                                                  
 conv2d (Conv2D)             (None, 30, 30, 32)           320       ['lambda_2[0][0]']            
                                                                                              

In [27]:
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(combined_inputs, labels, test_size=0.2, random_state=42)

In [28]:
# Train the model
training_epochs = 50
batch_size = 32
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=training_epochs, batch_size=batch_size
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [29]:
# Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test accuracy: {accuracy * 100:.2f}%')

Test accuracy: 100.00%


In [30]:
def classify_environment(image_path, temperature, output_file='input_features.txt'):
    print("Testing the model on an image.....")
    print(f'file loaded: {image_path}')

    # Load and preprocess the image
    new_image = load_and_preprocess_image(image_path).flatten()

    # Flatten the temperature array to be a single value array
    new_temperature = np.array([temperature])

    # Concatenate the image and temperature arrays
    combined_input = np.concatenate([new_image, new_temperature], axis=0)

    # Print the input features separated by commas, for debugging!
    #print(','.join(map(str, combined_input)))

    # Save the  input features separated by commas to a .txt file, for debugging!
    with open(output_file, 'w') as file:
        file.write(','.join(map(str, combined_input)))

    # Reshape combined input to match the expected input shape
    combined_input = combined_input.reshape(1, -1)

    # Make prediction
    prediction = model.predict(combined_input)

    # Get confidence scores
    confidence_scores = prediction[0]

    # Get the predicted class index and label
    predicted_class_index = np.argmax(confidence_scores)
    predicted_label = label_encoder.inverse_transform([predicted_class_index])[0]

    return predicted_label, confidence_scores

In [31]:
# Example usage, test fire cases
#image_path = base_directory + '/' + 'fire_test' + '/' + '1.jpg' # Test image
#temperature = 68.0  # Test temperature value. Obtained the same time as the test image

# Example usage, test safe environment cases
image_path = base_directory + '/' + 'safe_environment_test' + '/' + '7.jpg' # Test image
temperature = 27.0  # Test temperature value. Obtained the same time as the test image

prediction, confidence_scores = classify_environment(image_path, temperature, output_file='input_features.txt')
print(confidence_scores)
print('The model classified the environment as ' + prediction + ' with a confidence of ' + str(confidence_scores[np.argmax(confidence_scores)]))

Testing the model on an image.....
file loaded: /content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified/safe_environment_test/7.jpg
file loaded :  /content/Arduino_Nano_33_BLE_Sense_fire_detection_using_sensor_fusion/fire_detection_sensor_fusion_dataset_modified/safe_environment_test/7.jpg
[0.01151043 0.98848957]
The model classified the environment as safe_environment with a confidence of 0.98848957


# Using the Edge Impulse Python SDK

In [32]:
# List the available profile target devices
ei.model.list_profile_devices()

['alif-he',
 'alif-hp',
 'arduino-nano-33-ble',
 'arduino-nicla-vision',
 'arduino-nicla-vision-m4',
 'portenta-h7',
 'brainchip-akd1000',
 'brickml',
 'cortex-m4f-80mhz',
 'cortex-m7-216mhz',
 'espressif-esp32',
 'himax-we-i',
 'infineon-cy8ckit-062s2',
 'infineon-cy8ckit-062-ble',
 'mbp-16-2020',
 'memryx-mx3',
 'microchip-sama7g54',
 'nordic-nrf52840-dk',
 'nordic-nrf5340-dk',
 'nordic-nrf9160-dk',
 'jetson-nano',
 'jetson-orin-nx',
 'jetson-orin-nano',
 'openmv-h7p',
 'particle-boron',
 'particle-p2',
 'raspberry-pi-4',
 'raspberry-pi-rp2040',
 'renesas-ck-ra6m5',
 'renesas-ek-ra8d1',
 'renesas-rzg2l',
 'renesas-rzv2l-cpu',
 'renesas-rzv2l',
 'st-iot-discovery-kit',
 'seeed-sense-cap',
 'wio-terminal',
 'seeed-vision-ai',
 'silabs-xg24',
 'silabs-thunderboard-sense-2',
 'sony-spresense',
 'synaptics-ka10000',
 'ti-am62a',
 'ti-am68a',
 'ti-launchxl',
 'ti-tda4vm',
 'neox']

In [33]:
# List the available profile target devices
ei.model.list_deployment_targets()

['zip',
 'arduino',
 'cubemx',
 'wasm',
 'wasm-browser-simd',
 'wasm-node-simd',
 'tensorrt',
 'ethos-alif-ensemble-e7-hp',
 'ethos-alif-ensemble-e7-he',
 'ethos-alif-ensemble-e7-he-cmsis-pack',
 'ethos-alif-ensemble-e7-hp-cmsis-pack',
 'ethos-himax-wiseeye2',
 'synaptics-tensaiflow-lib',
 'meta-tf',
 'memryx-dfp',
 'tidl-lib-am62a',
 'tidl-lib-am68a',
 'slcc',
 'think-silicon-neox',
 'arduino-nano-33-ble-sense',
 'arduino-nicla-vision',
 'runner-linux-aarch64-advantech-icam540',
 'espressif-esp32',
 'raspberry-pi-rp2040',
 'silabs-xg24',
 'infineon-cy8ckit-062s2',
 'infineon-cy8ckit-062-ble',
 'nordic-thingy53',
 'nordic-thingy53-nrf7002eb',
 'sony-spresense-commonsense',
 'renesas-ck-ra6m5',
 'brickml',
 'brickml-module',
 'alif-ensemble-e7',
 'runner-linux-aarch64',
 'runner-linux-armv7',
 'runner-linux-x86_64',
 'runner-linux-aarch64-akd1000',
 'runner-linux-x86_64-akd1000',
 'runner-mac-x86_64',
 'runner-linux-aarch64-tda4vm',
 'runner-linux-aarch64-am62a',
 'particle',
 'iar',
 '

In [34]:
# Estimate the RAM, ROM, and inference time for our model on the target hardware family
try:
    profile = ei.model.profile(model=model,
                              device='arduino-nano-33-ble')
    print(profile.summary())
except Exception as e:
    print(f"Could not profile: {e}")

Target results for float32:
{
    "device": "arduino-nano-33-ble",
    "tfliteFileSizeBytes": 656412,
    "isSupportedOnMcu": true,
    "memory": {
        "tflite": {
            "ram": 178939,
            "rom": 711544,
            "arenaSize": 178547
        },
        "eon": {
            "ram": 146904,
            "rom": 682584
        }
    },
    "timePerInferenceMs": 6800
}


Performance on device types:
{
    "variant": "float32",
    "lowEndMcu": {
        "description": "Estimate for a Cortex-M0+ or similar, running at 40MHz",
        "timePerInferenceMs": 17995,
        "memory": {
            "tflite": {
                "ram": 178747,
                "rom": 699768
            },
            "eon": {
                "ram": 146760,
                "rom": 679320
            }
        },
        "supported": true
    },
    "highEndMcu": {
        "description": "Estimate for a Cortex-M7 or other high-end MCU/DSP, running at 240MHz",
        "timePerInferenceMs": 246,
        

In [35]:
"""
# Convert the model to a TensorFlow Lite model
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

# Save the model.
with open('model.tflite', 'wb') as f:
  f.write(tflite_model)

# Estimate the RAM, ROM, and inference time for the TensorFlow Lite model on the target hardware family
try:
    profile = ei.model.profile(model=tflite_model,
                              device='arduino-nano-33-ble')
    print(profile.summary())
except Exception as e:
    print(f"Could not profile: {e}")
"""

'\n# Convert the model to a TensorFlow Lite model\nconverter = tf.lite.TFLiteConverter.from_keras_model(model)\ntflite_model = converter.convert()\n\n# Save the model.\nwith open(\'model.tflite\', \'wb\') as f:\n  f.write(tflite_model)\n\n# Estimate the RAM, ROM, and inference time for the TensorFlow Lite model on the target hardware family\ntry:\n    profile = ei.model.profile(model=tflite_model,\n                              device=\'arduino-nano-33-ble\')\n    print(profile.summary())\nexcept Exception as e:\n    print(f"Could not profile: {e}")\n'

In [36]:
#model.save('saved_model')      # save model in SavedModel format

In [37]:
"""
from google.colab import files
!zip -r /content/saved_model.zip /content/saved_model
#files.download('saved_model.zip')
"""

"\nfrom google.colab import files\n!zip -r /content/saved_model.zip /content/saved_model\n#files.download('saved_model.zip')\n"

In [38]:
# Export the validation set
#np.save('X_test_img.npy', X_test_img)
#np.save('X_test_temp.npy', X_test_temp)
#np.save('y_test.npy', y_test)

In [39]:
# Export the validation set
#np.savez('validation_set.npz', X_test_img=X_test_img, X_test_temp=X_test_temp, y_test=y_test)