In [2]:
import os

import numpy as np
import tensorflow as tf

# Import keras network elements
from keras.models import Sequential, Model
from keras.layers import (
    Multiply,
    Add,
    Rescaling,
    ZeroPadding2D,
    Lambda,
    Conv2D,
    DepthwiseConv2D,
    ReLU,
    MaxPooling2D,
    BatchNormalization,
    GlobalAveragePooling2D,
    Reshape,
    Layer,
    Dense,
    Input,
    multiply
)
from PIL import Image, ImageDraw

# Import Keras preprocessing facilities
from keras.preprocessing import image

# Import MobileNet models
from keras.applications import MobileNetV3Small,MobileNetV3Large
from keras.applications.mobilenet_v3 import preprocess_input, decode_predictions
import tensorflow_model_optimization as tfmot


In [3]:
# Here for testing purposes
def representative_dataset_gen():
    for _ in range(100):
        yield [np.random.rand(1,240,320,1).astype(np.float32)]

def save_model(converter, basename: str):
    # Save the .tfmodel first
    content = converter.convert()
    with open(f"{basename}.tflite","wb") as f:
        f.write(content)
    
    # Convert the model to a C array
    with open(f"{basename}.h", 'w') as f:
        f.write("const unsigned char model_data[] = {" + ",".join([f"0x{byte:02x}" for byte in content]) + "};")

In [4]:
quantized = tfmot.quantization.keras.quantize_annotate_layer

quantized_model = Sequential([
        quantized(Conv2D(4, (5, 5), strides=(3, 3), padding='same', activation=None, input_shape=(240, 320, 1))),
        quantized(ReLU()),
        quantized(Conv2D(12, (4, 4), padding='same', activation=None)),
        quantized(BatchNormalization()),
        quantized(ReLU()),
        quantized(GlobalAveragePooling2D()),

        quantized(Dense(100, activation='relu'))
])

quantized_model.summary()


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 quantize_annotate (Quantiz  (None, 80, 107, 4)        104       
 eAnnotate)                                                      
                                                                 
 quantize_annotate_1 (Quant  (None, 80, 107, 4)        0         
 izeAnnotate)                                                    
                                                                 
 quantize_annotate_2 (Quant  (None, 80, 107, 12)       780       
 izeAnnotate)                                                    
                                                                 
 quantize_annotate_3 (Quant  (None, 80, 107, 12)       48        
 izeAnnotate)                                                    
                                                                 
 quantize_annotate_4 (Quant  (None, 80, 107, 12)       0

In [41]:
non_quantized_model = Sequential([
        Conv2D(4, (5, 5), strides=(3, 3), padding='same', activation=None, input_shape=(240, 320, 1)),
        ReLU(),
        Conv2D(12, (4, 4), padding='same', activation=None),
        BatchNormalization(),
        ReLU(),
        GlobalAveragePooling2D(),

        Dense(100, activation='relu')
])

In [42]:
image_folder_path = '../trainlib/'

# Load the images as a dataset
# You can specify the image size, batch size, and other parameters as needed
image_size = (240, 320)  # Set the size of your images
batch_size = 32  # Number of images to process at a time

dataset = tf.keras.preprocessing.image_dataset_from_directory(
    image_folder_path,
    shuffle=True,
    batch_size=batch_size,
    image_size=image_size,
    color_mode="grayscale"
)

normalization_layer = tf.keras.layers.Rescaling(1./255)
dataset = dataset.map(lambda x, y: (normalization_layer(x), y))


non_quantized_model.compile(optimizer='adam', loss='binary_crossentropy')
non_quantized_model.fit(dataset, epochs=10)

Found 128 files belonging to 2 classes.


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x191743a6950>

In [43]:
non_quantized_model.save('working.h5')

  saving_api.save_model(


In [39]:
Model_to_convert = non_quantized_model


# Convert the model to TFLite
converter = tf.lite.TFLiteConverter.from_keras_model(Model_to_convert)

# Setup converter optimizations
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# Restrain the available operations and set the desired data types
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8] # All layers must use 8-bit operations
converter.inference_input_type = tf.int8                                    # Input is UINT8 (image data, 0-255)
converter.inference_output_type = tf.float32                                # Output is supposed to be float32

# Build the representative dataset
converter.representative_dataset = representative_dataset_gen

# Convert and save the model
save_model(converter,"working")


INFO:tensorflow:Assets written to: C:\Users\Blake\AppData\Local\Temp\tmptqj8mukr\assets


INFO:tensorflow:Assets written to: C:\Users\Blake\AppData\Local\Temp\tmptqj8mukr\assets


# Testing the model on a picture from our Testing Dataset

We perform prediction using our selected model and obtain the feature vector. This is useful to cross-check the results we get from the Arduino and the distributed learning

In [None]:
# Load and preprocess an image
img_path = '../images-data1/1-image-9.jpg'  # Replace with your image path
img = image.load_img(img_path, color_mode='grayscale')
img_array = image.img_to_array(img)
#img_array = np.repeat(img_array, 3, axis=-1)
img_array = tf.expand_dims(img_array, 0)  # Create a batch
img_array = preprocess_input(img_array)

# Model prediction
predictions = keras_model.predict(img_array)
print("feature vector:", predictions)

feature vector: [[ 9.442699    5.302475    0.          0.          2.0050862   5.2273526
   1.9076254   0.          0.          7.533764    0.         15.076152
   0.          0.36931926  0.          0.          0.         12.06289
   2.907904    0.          2.3004632   0.          1.4628525   0.
   4.422877    0.5961104   9.192916    0.3921283   0.07196429  0.
   3.0060604   9.682591    7.0994124   0.          5.856418    1.3322003
   0.          0.          3.6338456   0.          0.          0.
   4.889263    0.82927424 10.199273    0.          8.563771    0.
   0.          0.          0.          0.          0.          0.
   5.8622146   0.          0.33981398  0.          5.9617314   5.2294292
   0.          3.7462883   0.         10.648001    6.656627    0.
   3.5234249   0.          0.          1.0622663   0.          0.
   0.          0.          0.          6.2285557   4.5131865   0.
   0.          0.          3.5571074   1.9741532   8.416868    0.
   5.6782813   4.478606   15

In [11]:
from sklearn.metrics.pairwise import cosine_similarity
# Flatten the embeddings if they are not already 2D
embedding1_flat = embedding1.reshape(1, -1)
embedding2_flat = embedding2.reshape(1, -1)

# Now, calculate the cosine similarity
similarity = cosine_similarity(embedding1_flat, embedding2_flat)
similarity

array([[0.99992025]], dtype=float32)

# Testing using MobileNet v3

We test the prediction performance using the shrunk version of MobileNet v3

In [None]:
keras_model = MobileNetV3Small(weights='imagenet', include_top=True)





In [10]:
img_path = '../images-data1/2-image-40.jpg'
img = image.load_img(img_path,color_mode='grayscale')  # Replace with your model's expected input size
img_array = image.img_to_array(img)
#img_array = np.repeat(img_array, 3, axis=-1)
img_array = np.expand_dims(img_array, axis=0)
img_array = preprocess_input(img_array)
img_path2 = '../images-data1/2-image-6.jpg'
img2 = image.load_img(img_path2,color_mode='grayscale')  # Replace with your model's expected input size
img_array2 = image.img_to_array(img2)
#img_array2 = np.repeat(img_array2, 3, axis=-1)
img_array2 = np.expand_dims(img_array2, axis=0)
img_array2 = preprocess_input(img_array2)




embedding1 = keras_model.predict(img_array)
embedding2 = keras_model.predict(img_array2)



# Customizing MobileNet v3

We experimented with a custom and truncated version of MobileNet, however due to the intrinsic structure of the model we were unable to do so with satisfactory results while meeting the stringent requirements of the Nano 33 BLE platform.

In [None]:
# Assuming 'expanded_conv_10/Add' is the layer before 'Conv_1'
layer_name = 'expanded_conv_3/expand'
new_base_model_output = keras_model.get_layer(layer_name).output

# Create a new model
new_model = Model(inputs=keras_model.input, outputs=new_base_model_output)

new_output_layer = Dense(units=100, activation='relu')(new_model.output)  # Adjust units and activation as needed

# Create a new model with the added Dense layer
new_model = Model(inputs=new_model.input, outputs=new_output_layer)

new_model.summary()

Model: "model_15"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_9 (InputLayer)        [(None, None, None, 3)]      0         []                            
                                                                                                  
 rescaling_9 (Rescaling)     (None, None, None, 3)        0         ['input_9[0][0]']             
                                                                                                  
 Conv (Conv2D)               (None, None, None, 16)       432       ['rescaling_9[0][0]']         
                                                                                                  
 Conv/BatchNorm (BatchNorma  (None, None, None, 16)       64        ['Conv[0][0]']                
 lization)                                                                                 

                                                                                                  
 expanded_conv/project/Batc  (None, None, None, 16)       64        ['expanded_conv/project[0][0]'
 hNorm (BatchNormalization)                                         ]                             
                                                                                                  
 expanded_conv_1/expand (Co  (None, None, None, 72)       1152      ['expanded_conv/project/BatchN
 nv2D)                                                              orm[0][0]']                   
                                                                                                  
 expanded_conv_1/expand/Bat  (None, None, None, 72)       288       ['expanded_conv_1/expand[0][0]
 chNorm (BatchNormalization                                         ']                            
 )                                                                                                
          

In [15]:
def squeeze_excite_block(input, ratio=4):
    ''' Create a squeeze and excite block '''
    filters = input.shape[-1]
    se_shape = (1, 1, filters)

    se = GlobalAveragePooling2D()(input)
    se = Rescaling(1. / filters)(se)
    se = Dense(filters // ratio, activation='relu')(se)
    se = Dense(filters, activation='sigmoid')(se)

    return Multiply()([input, se])

def conv_block(inputs, filters, kernel_size, strides=1, padding='same'):
    x = Conv2D(filters, kernel_size, strides=strides, padding=padding)(inputs)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    return x

def depthwise_conv_block(inputs, pointwise_conv_filters, depth_multiplier=1, strides=1):
    x = DepthwiseConv2D((3, 3), padding='same', depth_multiplier=depth_multiplier, strides=strides)(inputs)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    x = Conv2D(pointwise_conv_filters, (1, 1), padding='same')(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    return x

# Input layer
inputs = Input(shape=(240, 320, 1))

# First Convolution Block
x = conv_block(inputs, 16, (3, 3))

# Squeeze and Excitation block
x = squeeze_excite_block(x)

# Depthwise Convolution Blocks
x = depthwise_conv_block(x, 16)
x = depthwise_conv_block(x, 24)
x = depthwise_conv_block(x, 24, strides=2)  # Assuming stride for downsampling

# Add Global Average Pooling to flatten the output
x = GlobalAveragePooling2D()(x)

# Final dense layer
output = Dense(100, activation='relu')(x)

# Create the model
model = Model(inputs, output)

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Model summary
model.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 240, 320, 1)]        0         []                            
                                                                                                  
 conv2d_8 (Conv2D)           (None, 240, 320, 16)         160       ['input_1[0][0]']             
                                                                                                  
 batch_normalization_4 (Bat  (None, 240, 320, 16)         64        ['conv2d_8[0][0]']            
 chNormalization)                                                                                 
                                                                                                  
 re_lu_8 (ReLU)              (None, 240, 320, 16)         0         ['batch_normalization_4[0]

# Attempting classification with 3 Classes

We briefly experimented with 3-class classification:

- Empty picture
- Train/Tram
- People

The Arduino did not prove capable enough to run this larger and more complex model, so we abandoned the idea

In [22]:
image_folder_path = '../trainlib/'

# Load the images as a dataset
# You can specify the image size, batch size, and other parameters as needed
image_size = (240, 320)  # Set the size of your images
batch_size = 32  # Number of images to process at a time

dataset = tf.keras.preprocessing.image_dataset_from_directory(
    image_folder_path,
    shuffle=True,
    batch_size=batch_size,
    image_size=image_size
)

normalization_layer = tf.keras.layers.Rescaling(1./255)
dataset = dataset.map(lambda x, y: (normalization_layer(x), y))


model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.fit(dataset, epochs=10)

Found 128 files belonging to 3 classes.


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x19173c15f10>

In [23]:
model.save('my_modelunsup.h5')

  saving_api.save_model(
