In [None]:
!pip install tensorflow_model_optimization==0.7.5

In [None]:
# download the dataset

import os

import tensorflow as tf

dataset_path = os.path.join(os.getcwd(), 'dataset')

# _ = tf.keras.utils.get_file(
#     origin='http://cnrpark.it/dataset/CNRPark+EXT.csv',
#     cache_subdir=dataset_path,
# )

# _ = tf.keras.utils.get_file(
#     origin='http://cnrpark.it/dataset/CNRPark-Patches-150x150.zip',
#     cache_subdir=os.path.join(dataset_path, 'CNRPark-Patches-150x150'),
#     extract=True,
# )

_ = tf.keras.utils.get_file(
    origin='https://github.com/fabiocarrara/deep-parking/releases/download/archive/CNR-EXT-Patches-150x150.zip', #'http://cnrpark.it/dataset/CNR-EXT-Patches-150x150.zip',
    cache_subdir=os.path.join(dataset_path, 'CNR-EXT-Patches-150x150'),
    extract=True,
)

In [None]:
# create variables for dataset paths

cnr_ext_path = os.path.join(dataset_path, 'CNR-EXT-Patches-150x150')

labels_path = os.path.join(cnr_ext_path, 'LABELS')
patches_path = os.path.join(cnr_ext_path, 'PATCHES')

train_txt_path = os.path.join(labels_path, 'train.txt')
val_txt_path = os.path.join(labels_path, 'val.txt')
test_txt_path = os.path.join(labels_path, 'test.txt')

In [None]:
# create function to load label data from the label .txt files

import pandas as pd

def load_labels(txt_path, split):
  df = pd.read_csv(
      txt_path,
      sep=' ',
      names=['path', 'label']
  )

  df_absolute_paths = df['path'].apply(lambda path: os.path.join(patches_path, path))

  df = df.assign(path=df_absolute_paths)
  df = df.assign(split=split)

  return df

In [None]:
# load the label files and concatenate into a single dataframe

train_df = load_labels(train_txt_path, 'train')
val_df = load_labels(val_txt_path, 'val')
test_df = load_labels(test_txt_path, 'test')

df = pd.concat([train_df, val_df, test_df])

df.head()

In [None]:
# copy the images to the

import shutil

def move_sample(row):
  dest_folder = os.path.join(dataset_path, row['split'], str(row['label']))
  os.makedirs(dest_folder, exist_ok=True)

  shutil.copy(row['path'], dest_folder)

_ = df.apply(move_sample, axis='columns')

In [None]:
def load_dataset(split):
  return tf.keras.utils.image_dataset_from_directory(
    os.path.join(dataset_path, split),
    color_mode='grayscale',
    label_mode='categorical',
    batch_size=32,
    image_size=(120, 120),
    shuffle=True,
    seed=42
)

train_ds = load_dataset('train')
val_ds = load_dataset('val')
test_ds = load_dataset('test')

In [None]:
ALPHA = 0.25
DROPOUT = 0.10

mobilenet_025_96 = tf.keras.applications.mobilenet.MobileNet(
    input_shape=(96, 96, 1),
    alpha=ALPHA,
    dropout=DROPOUT,
    weights=None,
    pooling='avg',
    classes=2,
)

# mobilenet_025_96.summary()


In [None]:
def modify_mobilenet_for_rp2040(mobilenet_model):
  input_type_spec = mobilenet_model.layers[0].input.type_spec

  input = tf.keras.Input(shape=(input_type_spec.shape[1:]))
  output = input

  for layer in mobilenet_model.layers[1:]:
    if (isinstance(layer, tf.keras.layers.Reshape)):
      print("replacing Reshape", layer.name)
      output = tf.keras.layers.Flatten()(output)
    else:
      output = layer(output)

  return tf.keras.Model(input, output)


mobilenet_025_96 = modify_mobilenet_for_rp2040(mobilenet_025_96)

In [None]:
data_augmentation = tf.keras.Sequential([
  tf.keras.layers.RandomFlip("horizontal", seed=42),
  tf.keras.layers.RandomRotation(0.2, seed=42),
  tf.keras.layers.RandomZoom(0.2, seed=42),
  tf.keras.layers.RandomContrast(0.3, seed=42),
  tf.keras.layers.RandomBrightness(0.3, seed=42),
  tf.keras.layers.GaussianNoise(0.01, seed=42)
])

In [None]:
model = tf.keras.Sequential([
  data_augmentation,
  tf.keras.layers.Resizing(96, 96, crop_to_aspect_ratio=True),
  mobilenet_025_96
])

In [None]:
# LEARNING_RATE = 0.01
# EPOCHS = 20

callbacks = [
  # tf.keras.callbacks.LearningRateScheduler(
  #     schedule=lambda epoch, lr: lr * tf.math.exp(-0.1)
  # ),
  # tf.keras.callbacks.ModelCheckpoint(
  #   filepath='/tmp/checkpoint',
  #   monitor='val_loss',
  #   verbose=1,
  #   save_best_only=True,
  #   mode='min',
  #   save_weights_only=True,
  # )

  tf.keras.callbacks.EarlyStopping(
      monitor='val_loss',
      patience=3,
      verbose=0,
      restore_best_weights=True,
  )
]

In [None]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=['accuracy']
)

In [None]:
train_ds_cached_shuffled = train_ds.cache().shuffle(1024, seed=42).prefetch(tf.data.AUTOTUNE)
val_ds_cached = val_ds.cache().prefetch(tf.data.AUTOTUNE)

history = model.fit(
    train_ds_cached_shuffled,
    validation_data=val_ds_cached,
    epochs=10,
    batch_size=32,
    callbacks=callbacks
)

In [None]:
model.evaluate(test_ds)

In [None]:
model.save('model')

In [None]:
mobilenet_025_96.save('mobilenet_025_96_model')

In [None]:
import numpy as np

def image_resize_96_96(x, y):
  x = tf.image.resize(x, (96, 96))
  x = tf.expand_dims(x, axis=0)

  return x, y

val_ds_96_96 = val_ds.unbatch().map(image_resize_96_96)

def representative_dataset():
  for image, label in val_ds_96_96.take(1000):
    yield [image]

converter = tf.lite.TFLiteConverter.from_keras_model(
  tf.keras.Model(
        inputs=[
            mobilenet_025_96.layers[0].input
        ],
        outputs=tf.concat([
            mobilenet_025_96.outputs[0],
            mobilenet_025_96.layers[-2].output,
        ], axis=1)
    )
)
converter.optimizations = [ tf.lite.Optimize.DEFAULT ]
converter.representative_dataset = representative_dataset

converter.target_spec.supported_ops = [ tf.lite.OpsSet.TFLITE_BUILTINS_INT8 ]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.float32

tflite_quant_model = converter.convert()

In [None]:
with open('model.tflite', 'wb') as output:
  print(len(tflite_quant_model))
  output.write(tflite_quant_model);


In [None]:
interpreter = tf.lite.Interpreter('model.tflite')
interpreter.allocate_tensors()

print(interpreter.get_input_details())
print(interpreter.get_output_details())

for x, y in train_ds.unbatch().take(10):
  x = tf.image.resize(x, (96, 96))
  x = tf.expand_dims(x, axis=0)
  x -= 128
  x = tf.cast(x, tf.int8)

  print(x.shape, x.dtype, y)

  # input = tf.fill((1, 96, 96, 1), 255) #zeros((1, 96, 96, 1), dtype=tf.int8)
  # input -= 128
  # input = tf.cast(input, dtype=tf.int8)
  # x = input

  interpreter.set_tensor(0, x)
  interpreter.invoke()

  print((interpreter.get_tensor(interpreter.get_output_details()[0]['index'])))


In [None]:
!apt-get install -y xxd

In [None]:
%%shell
echo "alignas(8) const unsigned char tflite_model[] = {" > tflite_model.h
cat model.tflite | xxd -i                                >> tflite_model.h
echo "};"                                                >> tflite_model.h

In [None]:
#### ...

In [None]:
# import tensorflow_model_optimization as tfmot

In [None]:
# # pruning_schedule = tfmot.sparsity.keras.ConstantSparsity(target_sparsity=0.75, begin_step=0)

# # # pruning_schedule = tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.25, final_sparsity=0.75, begin_step=0, end_step=2953)

# end_step = len(train_ds_cached_shuffled) * 5

# pruning_params = {
#       'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.25,
#                                                                final_sparsity=0.75,
#                                                                begin_step=0,
#                                                                end_step=end_step),
#       # 'pruning_policy': tfmot.sparsity.keras.PruneForLatencyOnXNNPack()
# }


# pruning_ready_mobilenet_025_96 = tfmot.sparsity.keras.prune_low_magnitude(mobilenet_025_96, **pruning_params)

# pruning_ready_mobilenet_025_96.summary()

In [None]:
# pruned_model = tf.keras.Sequential([
#   data_augmentation,
#   tf.keras.layers.Resizing(96, 96, crop_to_aspect_ratio=True),
#   pruning_ready_mobilenet_025_96
# ])

In [None]:
# !rm -rf /content/pruning

In [None]:
# pruned_model.compile(
#     optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
#     loss=tf.keras.losses.CategoricalCrossentropy(),
#     metrics=['accuracy']
# )

In [None]:
# history = pruned_model.fit(
#     train_ds_cached_shuffled,
#     validation_data=val_ds_cached,
#     epochs=5,
#     batch_size=32,
#     callbacks=[
#         tfmot.sparsity.keras.UpdatePruningStep(),
#         tfmot.sparsity.keras.PruningSummaries(log_dir='/content/pruning'),
#     ]
# )


In [None]:
#

In [None]:
# pruned_model.evaluate(test_ds)

In [None]:
# %load_ext tensorboard

In [None]:
# #docs_infra: no_execute
# %tensorboard --logdir=/content/pruning

In [None]:
# model_for_export = tfmot.sparsity.keras.strip_pruning(pruning_ready_mobilenet_025_96)

# converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
# converter.optimizations = [ tf.lite.Optimize.DEFAULT ]
# converter.representative_dataset = representative_dataset

# converter.target_spec.supported_ops = [ tf.lite.OpsSet.TFLITE_BUILTINS_INT8 ]
# converter.inference_input_type = tf.int8
# converter.inference_output_type = tf.int8

# tflite_pruned_quant_model = converter.convert()

In [None]:
# pruning_ready_mobilenet_025_96.layers

In [None]:
# # with open('pruned_model.tflite', 'wb') as output:
#   print(len(tflite_quant_model))
#   output.write(tflite_quant_model);

In [None]:
#### ...

In [None]:
# import tensorflow_model_optimization as tfmot

# quant_aware_mobilenet_025_96 = tfmot.quantization.keras.quantize_model(mobilenet_025_96)

In [None]:
# quant_aware_model = tf.keras.Sequential([
#   data_augmentation,
#   tf.keras.layers.Resizing(96, 96, crop_to_aspect_ratio=True),
#   quant_aware_mobilenet_025_96
# ])

In [None]:
# quant_aware_model.compile(
#     optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
#     loss=tf.keras.losses.CategoricalCrossentropy(),
#     metrics=['accuracy']
# )

In [None]:
# history = quant_aware_model.fit(
#     train_ds_cached_shuffled,
#     validation_data=val_ds_cached,
#     epochs=1,
#     batch_size=32,
# )

In [None]:
#### ...

In [None]:
# !du -hs dataset

In [None]:
# # train_ds = tf.data.Dataset.from_tensor_slices(
#     (train_df.path, train_df.label)
# )

In [None]:
# train_ds, val_ds = tf.keras.utils.image_dataset_from_directory(
#     os.path.join(os.getcwd(), 'datasets', 'CNRPark-Patches-150x150'),
#     color_mode='grayscale',
#     batch_size=32,
#     image_size=(150, 150),
#     shuffle=True,
#     seed=42,
#     validation_split=0.2,
#     subset='both',
# )

In [None]:
# for x, y in train_ds.unbatch().take(10):
#   print(x.shape, y)

In [None]:
  # all_ds = tf.keras.utils.image_dataset_from_directory(
  #     directory=os.path.join(os.getcwd(), 'datasets', 'CNR-EXT-Patches-150x150'),
  #     labels=None,
  #     color_mode='grayscale',
  #     batch_size=32,
  #     image_size=(150, 150),
  #     # shuffle=True,
  #     # seed=42,
  #     # validation_split=0.2,
  #     # subset='both',
  # )