#ColabNAS code

In [None]:
import tensorflow as tf
import numpy as np
import subprocess
import datetime
import glob
import re
import os

class ColabNAS :
    architecture_name = 'resulting_architecture'
    def __init__(self, max_RAM, max_Flash, max_MACC, path_to_training_set, val_split, cache=False, input_shape=(50,50,3), save_path='./') :
        self.learning_rate = 1e-3
        self.batch_size = 128
        self.epochs = 100 #minimum 2

        self.max_MACC = max_MACC
        self.max_Flash = max_Flash
        self.max_RAM = max_RAM
        self.path_to_training_set = path_to_training_set
        self.num_classes = len(next(os.walk(path_to_training_set))[1])
        self.val_split = val_split
        self.cache = cache
        self.input_shape = input_shape
        self.save_path = save_path

        self.path_to_trained_models = f"{self.save_path}/trained_models"
        os.makedirs(self.path_to_trained_models)

        self.load_training_set()

    # k number of kernels of the first convolutional layer
    # c number of cells added upon the first convolutional layer
    # pre-processing pipeline not included in MACC computation
    def Model(self, k, c) :
        """Build and compile a candidate CNN and count its multiply-accumulate operations.

        The network is made of a pre-processing pipeline, one 3x3 convolution
        with `k` kernels, up to `c` cells (2x2 max-pooling followed by a 3x3
        convolution), global average pooling and two dense layers. The kernel
        count is multiplied from cell to cell by a factor that starts at 2 and
        is reduced by 2**-i after cell i.

        Args:
            k: number of kernels of the first convolutional layer.
            c: number of cells added upon the first convolutional layer.

        Returns:
            A tuple (model, number_of_mac, number_of_cells_limited): the
            compiled Keras model, the MACC count of its convolutional and
            dense layers (the pre-processing pipeline is not included) and a
            flag that is True when fewer than `c` cells were added because the
            feature map was already 1 pixel high or wide.
        """
        kernel_size = (3,3)
        pool_size = (2,2)
        pool_strides = (2,2)

        def conv_macc(channels_in, feature_map) :
            #MACCs of a convolution: one kernel window per output element
            return (channels_in * kernel_size[0] * kernel_size[1]
                    * feature_map.shape[1] * feature_map.shape[2] * feature_map.shape[3])

        number_of_cells_limited = False
        number_of_mac = 0

        inputs = tf.keras.Input(shape=self.input_shape)

        #preprocessing pipeline
        x = tf.keras.layers.RandomFlip('horizontal')(inputs)
        x = tf.keras.layers.RandomRotation(0.2, fill_mode='constant', interpolation='bilinear')(x)
        x = tf.keras.layers.Rescaling(1./255)(x)
        x = tf.keras.layers.BatchNormalization()(x)

        #convolutional base
        #layer sizes are kept as plain Python ints
        n = int(k)
        multiplier = 2

        #first convolutional layer
        c_in = self.input_shape[2]
        x = tf.keras.layers.Conv2D(n, kernel_size, activation='relu', padding='same')(x)
        number_of_mac = number_of_mac + conv_macc(c_in, x)

        #adding cells
        for i in range(1, c + 1) :
            #a 1-pixel feature map cannot be pooled any further
            if x.shape[1] <= 1 or x.shape[2] <= 1 :
                number_of_cells_limited = True
                break
            #np.ceil returns a NumPy float; cast so that the layers receive an int
            n = int(np.ceil(n * multiplier))
            multiplier = multiplier - 2**-i
            x = tf.keras.layers.MaxPooling2D(pool_size=pool_size, strides=pool_strides, padding='valid')(x)
            c_in = x.shape[3]
            x = tf.keras.layers.Conv2D(n, kernel_size, activation='relu', padding='same')(x)
            number_of_mac = number_of_mac + conv_macc(c_in, x)

        #classifier
        x = tf.keras.layers.GlobalAveragePooling2D()(x)
        features_in = x.shape[1]
        x = tf.keras.layers.Dense(n, activation='relu')(x)
        number_of_mac = number_of_mac + (features_in * x.shape[1])
        outputs = tf.keras.layers.Dense(self.num_classes, activation='softmax')(x)
        number_of_mac = number_of_mac + (x.shape[1] * outputs.shape[1])

        model = tf.keras.Model(inputs=inputs, outputs=outputs)

        opt = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
        model.compile(optimizer=opt,
                loss='categorical_crossentropy',
                metrics=['accuracy'])

        model.summary()

        return model, number_of_mac, number_of_cells_limited

    def load_training_set(self):
        if 3 == self.input_shape[2] :
            color_mode = 'rgb'
        elif 1 == self.input_shape[2] :
            color_mode = 'grayscale'

        train_ds = tf.keras.utils.image_dataset_from_directory(
            directory= self.path_to_training_set,
            labels='inferred',
            label_mode='categorical',
            color_mode=color_mode,
            batch_size=self.batch_size,
            image_size=self.input_shape[0:2],
            shuffle=True,
            seed=11,
            validation_split=self.val_split,
            subset='training'
        )

        validation_ds = tf.keras.utils.image_dataset_from_directory(
            directory= self.path_to_training_set,
            labels='inferred',
            label_mode='categorical',
            color_mode=color_mode,
            batch_size=self.batch_size,
            image_size=self.input_shape[0:2],
            shuffle=True,
            seed=11,
            validation_split=self.val_split,
            subset='validation'
        )

        if self.cache :
            self.train_ds = train_ds.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
            self.validation_ds = validation_ds.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
        else :
            self.train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)
            self.validation_ds = validation_ds.prefetch(buffer_size=tf.data.AUTOTUNE)

    def quantize_model_uint8(self) :
        """Convert the saved Keras model `self.model_name` into a quantized TFLite file.

        The `.h5` file in `path_to_trained_models` is loaded, converted with
        the int8 built-in op set and uint8 input/output types, written next to
        it as `.tflite`, and the `.h5` file is then deleted. Calibration uses
        150 single-image batches taken from the training set.
        """
        models_dir = self.path_to_trained_models
        keras_file = f"{models_dir}/{self.model_name}.h5"

        def calibration_samples() :
            #one image per step, cast to float32 for the converter
            single_image_batches = self.train_ds.rebatch(1).take(150)
            for batch in single_image_batches :
                yield [tf.dtypes.cast(batch[0], tf.float32)]

        keras_model = tf.keras.models.load_model(keras_file)

        converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.representative_dataset = calibration_samples
        converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
        converter.inference_input_type = tf.uint8
        converter.inference_output_type = tf.uint8
        flatbuffer = converter.convert()

        tflite_file = f"{models_dir}/{self.model_name}.tflite"
        with open(tflite_file, 'wb') as out :
            out.write(flatbuffer)

        #the Keras file is no longer needed once the TFLite file exists
        os.remove(keras_file)

    def evaluate_flash_and_peak_RAM_occupancy(self) :
        #quantize model to evaluate its peak RAM occupancy and its Flash occupancy
        self.quantize_model_uint8()

        #evaluate its peak RAM occupancy and its Flash occupancy using STMicroelectronics' X-CUBE-AI
        proc = subprocess.Popen(["./stm32tflm", f"{self.path_to_trained_models}/{self.model_name}.tflite"], stdout=subprocess.PIPE)
        try:
            outs, errs = proc.communicate(timeout=15)
            Flash, RAM = re.findall(r'\d+', str(outs))
        except subprocess.TimeoutExpired:
            proc.kill()
            outs, errs = proc.communicate()
            print("stm32tflm error")
            exit()

        return int(Flash), int(RAM)

    def evaluate_model_process(self, k, c) :
        if k > 0 :
            self.model_name = f"k_{k}_c_{c}"
            print(f"\n{self.model_name}\n")
            checkpoint = tf.keras.callbacks.ModelCheckpoint(
                f"{self.path_to_trained_models}/{self.model_name}.h5", monitor='val_accuracy',
                verbose=1, save_best_only=True, save_weights_only=False, mode='auto')
            model, MACC, number_of_cells_limited = self.Model(k, c)
            #One epoch of training must be done before quantization, which is needed to evaluate RAM and Flash occupancy
            model.fit(self.train_ds, epochs=1, validation_data=self.validation_ds, validation_freq=1)
            model.save(f"{self.path_to_trained_models}/{self.model_name}.h5")
            Flash, RAM = self.evaluate_flash_and_peak_RAM_occupancy()
            print(f"\nRAM: {RAM},\t Flash: {Flash},\t MACC: {MACC}\n")
            if MACC <= self.max_MACC and Flash <= self.max_Flash and RAM <= self.max_RAM and not number_of_cells_limited :
                hist = model.fit(self.train_ds, epochs=self.epochs - 1, validation_data=self.validation_ds, validation_freq=1, callbacks=[checkpoint])
                self.quantize_model_uint8()
            return {'k': k,
                    'c': c if not number_of_cells_limited else "Not feasible",
                    'RAM': RAM if RAM <= self.max_RAM else "Outside the upper bound",
                    'Flash': Flash if Flash <= self.max_Flash else "Outside the upper bound",
                    'MACC': MACC if MACC <= self.max_MACC else "Outside the upper bound",
                    'max_val_acc':
                    np.around(np.amax(hist.history['val_accuracy']), decimals=3)
                    if 'hist' in locals() else -3}
        else :
            return{'k': 'unfeasible', 'c': c, 'max_val_acc': -3}

    def explore_num_cells(self, k) :
        previous_architecture = {'k': -1, 'c': -1, 'max_val_acc': -2}
        current_architecture = {'k': -1, 'c': -1, 'max_val_acc': -1}
        c = -1
        k = int(k)

        while(current_architecture['max_val_acc'] > previous_architecture['max_val_acc']) :
            previous_architecture = current_architecture
            c = c + 1
            self.model_counter = self.model_counter + 1
            current_architecture = self.evaluate_model_process(k, c)
            print(f"\n\n\n{current_architecture}\n\n\n")
        return previous_architecture

    def search(self) :
        self.model_counter = 0
        epsilon = 0.005
        k0 = 4

        start = datetime.datetime.now()

        k = k0
        previous_architecture = self.explore_num_cells(k)
        k = 2 * k
        current_architecture = self.explore_num_cells(k)

        if (current_architecture['max_val_acc'] > previous_architecture['max_val_acc']) :
            previous_architecture = current_architecture
            k = 2 * k
            current_architecture = self.explore_num_cells(k)
            while(current_architecture['max_val_acc'] > previous_architecture['max_val_acc'] + epsilon) :
                previous_architecture = current_architecture
                k = 2 * k
                current_architecture = self.explore_num_cells(k)
        else :
            k = k0 / 2
            current_architecture = self.explore_num_cells(k)
            while(current_architecture['max_val_acc'] >= previous_architecture['max_val_acc']) :
                previous_architecture = current_architecture
                k = k / 2
                current_architecture = self.explore_num_cells(k)

        resulting_architecture = previous_architecture

        end = datetime.datetime.now()

        if (resulting_architecture['max_val_acc'] > 0) :
            resulting_architecture_name = f"k_{resulting_architecture['k']}_c_{resulting_architecture['c']}.tflite"
            self.path_to_resulting_architecture = f"{self.save_path}/resulting_architecture_{resulting_architecture_name}"
            os.rename(f"{self.path_to_trained_models}/{resulting_architecture_name}", self.path_to_resulting_architecture)
            os.system(f"rm -rf {self.path_to_trained_models}")
            print(f"\nResulting architecture: {resulting_architecture}\n")
        else :
            print(f"\nNo feasible architecture found\n")
        print(f"Elapsed time (search): {end-start}\n")

        return self.path_to_resulting_architecture

A useful function for testing TFLite models

In [None]:
def test_tflite_model(path_to_resulting_architecture, test_ds) :
    """Run a TFLite model over a dataset and print its top-1 accuracy.

    Args:
        path_to_resulting_architecture: path of the `.tflite` file to test.
        test_ds: iterable of (image, label) pairs; each image must already
            have the batch shape expected by the model and each label must be
            one-hot encoded (the prediction is compared with its argmax).

    Returns:
        The accuracy as a float, or None when `test_ds` yielded no samples.
    """
    interpreter = tf.lite.Interpreter(path_to_resulting_architecture)
    interpreter.allocate_tensors()

    output_details = interpreter.get_output_details()[0]  # Model has single output.
    input_details = interpreter.get_input_details()[0]  # Model has single input.

    correct = 0
    total = 0

    for image, label in test_ds :
        # Check if the input type is quantized, then rescale input data to uint8
        if input_details['dtype'] == tf.uint8:
            input_scale, input_zero_point = input_details["quantization"]
            image = image / input_scale + input_zero_point
        # Cast to whatever the model expects, so non-uint8 models work as well
        input_data = tf.dtypes.cast(image, input_details['dtype'])
        interpreter.set_tensor(input_details['index'], input_data)
        interpreter.invoke()
        prediction = interpreter.get_tensor(output_details['index'])
        if label.numpy().argmax() == prediction.argmax() :
            correct = correct + 1
        total = total + 1

    # An empty dataset has no accuracy; avoid dividing by zero
    if total == 0 :
        print("\nTflite model test accuracy: no samples in the test set")
        return None

    accuracy = correct / total
    print(f"\nTflite model test accuracy: {accuracy}")
    return accuracy


Upload the stm32tflm script to the files folder of the Google Colaboratory VM

Make it executable

In [None]:
!chmod +x stm32tflm

#Example of usage

In [None]:
import numpy as np
import tensorflow as tf

#shape of the network input: (height, width, channels)
input_shape = (50,50,3)

#target: STM32L412KBU3
#273 CoreMark, 40 kiB RAM, 128 kiB Flash
#the bounds below are that target's figures: RAM and Flash in bytes
#(40 * 1024 and 128 * 1024), MACC derived from the CoreMark score
peak_RAM_upper_bound = 40960
Flash_upper_bound = 131072
MACC_upper_bound = 2730000 #CoreMark * 1e4

#Each dataset must comply with the following structure
#(one sub-directory per class, the class labels are inferred from them)
#main_directory/
#...class_a/
#......a_image_1.jpg
#......a_image_2.jpg
#...class_b/
#......b_image_1.jpg
#......b_image_2.jpg
path_to_training_set = './path/to/training/set'
#fraction of the training set reserved for validation
val_split = 0.3

#whether or not to cache datasets in memory
#if the dataset cannot fit in the main memory, the application will crash
cache = True

#where to save results
save_path = './path/to/resulting/architecture'

#to show the GPU used
!nvidia-smi

#creating the object also loads the training and validation sets
colabNAS = ColabNAS(peak_RAM_upper_bound, Flash_upper_bound, MACC_upper_bound, path_to_training_set, val_split, cache, input_shape, save_path=save_path)

#search
#returns the path of the resulting .tflite model
path_to_tflite_model = colabNAS.search()

#test
path_to_test_set = './path/to/test/set'

#batch_size=1: test_tflite_model feeds the interpreter one image at a time
#NOTE: color_mode is hard-coded to 'rgb' here; change it if input_shape
#has a different number of channels
test_ds = tf.keras.utils.image_dataset_from_directory(
    directory= path_to_test_set,
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    batch_size=1,
    image_size=input_shape[0:2],
    shuffle=True
)

test_tflite_model(path_to_tflite_model, test_ds)