# Prerequisites

In [None]:
# Install TensorFlow Hub
!pip install tensorflow_hub

In [None]:
# Library imports
from PIL import Image
import os
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import zipfile
import tensorflow_hub as hub
from tensorflow.keras import layers
import pandas as pd 
from sklearn.metrics import accuracy_score
import cv2

# Output configuration

In [None]:
# Mount Google Drive, where the .zip files with the datasets are stored
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
# Working directory on the mounted Google Drive
model_dir = '/content/gdrive/My Drive/Colab Notebooks'

# Output paths.
# The joined components must NOT start with "/": os.path.join discards every
# earlier component when it meets an absolute one, which would silently place
# the outputs in the filesystem root instead of below model_dir.
OUTPUT_ROOT_DIR = os.path.join(model_dir, "output")
OUTPUT_TFLITE_MODEL = os.path.join(OUTPUT_ROOT_DIR, "retrained_graph_mv1_100_224.tflite")
OUTPUT_LABELS = os.path.join(OUTPUT_ROOT_DIR, "retrained_labels_mv1_100_224.txt")
OUTPUT_READABLE_LABELS = os.path.join(OUTPUT_ROOT_DIR, "labels_readable.txt")

# Model configuration

In [None]:
# TF Hub handle of the selected MobileNet feature extractor
SELECTED_MOBILENET = "https://tfhub.dev/google/imagenet/mobilenet_v1_100_224/feature_vector/4"

# Size of the images fed to the network, in pixels
MODEL_INPUT_HEIGHT = 224
MODEL_INPUT_WIDTH = 224
IMAGE_SHAPE = (MODEL_INPUT_HEIGHT, MODEL_INPUT_WIDTH)

# Mean and standard deviation constants - they follow from the specifics of the network
MODEL_INPUT_MEAN, MODEL_INPUT_STD = 0, 255

# Layer names: the MobileNet v1 input layer is called "Placeholder",
# the output layer is called "final_result"
MODEL_INPUT_LAYER_NAME = "Placeholder"
MODEL_OUTPUT_LAYER_NAME = "final_result"

# Loading dataset

In [None]:
# Optional reset of the working directory (disabled on purpose):
#!rm -rf '{model_dir}'
#os.makedirs(model_dir, exist_ok=True)
# List the contents of the parent directory of model_dir
!ls -ltra '{model_dir}'/..

In [None]:
# Directory below model_dir that holds the dataset archives and their extracted contents
TMP_DATA_DIR = "{}/dataset/tmp".format(model_dir)
TMP_LABELS_DIR = os.path.join(TMP_DATA_DIR, "GTSRB/Final_Test")

# Directories (relative to the current working directory) for the preprocessed images
TRAINING_DATA_DIR, VALIDATION_DATA_DIR = "dataset/training", "dataset/validation"

In [None]:
# Paths of the dataset archives expected in TMP_DATA_DIR
archive_names = (
    "Final_Training_Images.zip",
    "Final_Test_Images.zip",
    "Final_Test_GT.zip",
)
to_unpack = [f"{TMP_DATA_DIR}/{archive_name}" for archive_name in archive_names]

# Extract every archive into TMP_DATA_DIR
for archive_path in to_unpack:
    with zipfile.ZipFile(archive_path, "r") as archive:
        archive.extractall(TMP_DATA_DIR)

# Training, validation, labels - prepare

Labels

In [None]:
# Class labels: class id -> human-readable sign name
label_map = {
    0: '20_speed',
    3: '60_speed',
    12: 'right_of_way_general',
    13: 'give_way',
    14: 'stop',
    20: 'attention_right_turn',
    22: 'attention_bumpers',
    40: 'turn_circle',
}

# Make sure the output directory exists (no error if it is already there)
os.makedirs(OUTPUT_ROOT_DIR, exist_ok=True)

# Write one readable label per line, ordered by class id. The context manager
# closes the file even when a write fails.
with open(OUTPUT_READABLE_LABELS, 'w') as labels_file:
    for _, val in sorted(label_map.items()):
        labels_file.write("{}\n".format(val))

Training dataset

In [None]:

# Directory with the extracted training images; every sub-directory is used as one class
tmp_train_data_dir = os.path.join(TMP_DATA_DIR, "Final_Training_Images/GTSRB/Final_Training/Images")

directories = [d for d in os.listdir(tmp_train_data_dir)
               if os.path.isdir(os.path.join(tmp_train_data_dir, d))]

# Collect (image path, class directory name) pairs for every .ppm file
train_samples = []
for class_directory in directories:
    label_dir = os.path.join(tmp_train_data_dir, class_directory)
    for f in os.listdir(label_dir):
        if f.endswith(".ppm"):
            train_samples.append((os.path.join(label_dir, f), class_directory))

# Sort the pairs together so that every label stays attached to its file.
# Sorting the two lists independently only keeps them aligned as long as the
# path order and the directory-name order happen to agree.
train_samples.sort()
ppm_files_train = [path for path, _ in train_samples]
ppm_labels_train = [label for _, label in train_samples]

In [None]:
# Convert every training image to grayscale, equalize its histogram and
# store it as .jpg (converted from .ppm) in a per-class directory
for ppm_file, label in zip(ppm_files_train, ppm_labels_train):
    # PIL decodes images in RGB channel order, so the matching OpenCV
    # conversion is RGB->GRAY (BGR->GRAY would swap the red and blue weights).
    # The context manager releases the file handle after decoding.
    with Image.open(ppm_file) as image:
        gray_image = cv2.cvtColor(np.asarray(image.convert("RGB")), cv2.COLOR_RGB2GRAY)
    eq_image = cv2.equalizeHist(gray_image)

    directory = os.path.join(TRAINING_DATA_DIR, label)
    image_filename = "{}.jpg".format(os.path.splitext(os.path.basename(ppm_file))[0])

    os.makedirs(directory, exist_ok=True)
    cv2.imwrite(os.path.join(directory, image_filename), eq_image)

In [None]:
# Show one sample image for every class of the preprocessed training set
preprocessed_training_dirs = sorted(
    d for d in os.listdir(TRAINING_DATA_DIR)
    if os.path.isdir(os.path.join(TRAINING_DATA_DIR, d))
)

training_images = [
    os.path.join(TRAINING_DATA_DIR, training_dir, "00000_00000.jpg")
    for training_dir in preprocessed_training_dirs
]

label_number = list(label_map.keys())
plt.figure(figsize=(17, 30))
for i, (training_dir, image) in enumerate(zip(preprocessed_training_dirs, training_images)):
    # Derive the class id from the directory name instead of relying on the
    # position in label_map: the positional lookup raises IndexError (or shows
    # the wrong title) as soon as the directories and label_map differ.
    class_id = int(training_dir) if training_dir.isdigit() else None
    plt.subplot(10, 7, i + 1)
    plt.axis('off')
    # Fall back to the directory name for classes without a readable label
    plt.title("{}".format(label_map.get(class_id, training_dir)))
    with Image.open(image) as sample:
        plt.imshow(sample)
plt.show()

In [None]:
# Normalization: rescale pixel values by 1/255
image_generator = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1 / 255)
# Iterator over the training directory, resizing images to IMAGE_SHAPE
# (reports the number of images and the number of classes)
image_data = image_generator.flow_from_directory(
    directory=str(TRAINING_DATA_DIR),
    target_size=IMAGE_SHAPE,
)

In [None]:
# Fetch a single batch and show the shapes of its images and labels
image_batch, label_batch = next(iter(image_data))
print("Image batch shape: ", image_batch.shape)
print("Label batch shape: ", label_batch.shape)

Validation dataset

In [None]:
# Location of the extracted test set images
tmp_validation_data_dir = os.path.join(TMP_DATA_DIR, "Final_Test_Images/GTSRB/Final_Test/Images")

In [None]:
tmp_validation_data_files = [f for f in os.listdir(tmp_validation_data_dir) if f.endswith(".ppm")]
validation_images = []

# Create the target directory once, before the loop
directory = VALIDATION_DATA_DIR
os.makedirs(directory, exist_ok=True)

# Export the .jpg files. The validation images get the same preprocessing as
# the training images (grayscale + histogram equalization); saving the raw
# colour image instead would evaluate the model on data it was not trained on.
for ppm_file in tmp_validation_data_files:
    image_dir = os.path.join(tmp_validation_data_dir, ppm_file)
    # PIL decodes in RGB channel order, hence RGB->GRAY; the context manager
    # releases the file handle after decoding.
    with Image.open(image_dir) as image:
        gray_image = cv2.cvtColor(np.asarray(image.convert("RGB")), cv2.COLOR_RGB2GRAY)
    eq_image = cv2.equalizeHist(gray_image)
    image_filename = "{}.jpg".format(os.path.splitext(os.path.basename(ppm_file))[0])

    final_image = os.path.join(directory, image_filename)
    cv2.imwrite(final_image, eq_image)

    validation_images.append(final_image)

# Sort once after the loop instead of re-sorting after every append
validation_images.sort()

print("Validation images count:", len(validation_images))

# Training

In [None]:
# Headless model (feature extractor without a classification head) from TF Hub.
# The handle and the input size come from the configuration constants, so the
# trained network matches SELECTED_MOBILENET and the "mv1" output file names;
# to train a different MobileNet variant, change SELECTED_MOBILENET.
feature_extractor_url = SELECTED_MOBILENET
feature_extractor_layer = hub.KerasLayer(feature_extractor_url,
                                         input_shape=IMAGE_SHAPE + (3,))
# Run the extractor on one image batch to obtain a feature vector for each image
feature_batch = feature_extractor_layer(image_batch)


In [None]:
# Freeze the feature extractor, so that training only updates the layers added on top of it
feature_extractor_layer.trainable = False

In [None]:
# Classifier: the feature extractor followed by a dense layer with one output per class
classifier_head = layers.Dense(image_data.num_classes)
model = tf.keras.Sequential([feature_extractor_layer, classifier_head])

model.summary()

In [None]:
# Run the model on one image batch to obtain its raw outputs
# (in notebook order this happens before model.fit is called)
predictions = model(image_batch)

In [None]:
# Shape of the model output for the batch
predictions.shape

Start training

In [None]:
# Training configuration: Adam optimizer, categorical cross-entropy computed
# on logits, accuracy reported as 'acc', eager execution enabled
compile_options = dict(
  optimizer=tf.keras.optimizers.Adam(),
  loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
  metrics=['acc'],
  run_eagerly=True,
)
model.compile(**compile_options)

In [None]:
class CollectBatchStats(tf.keras.callbacks.Callback):
  """Keras callback that records loss and accuracy after every training batch.

  Attributes are documented inline; the metrics are reset after each batch so
  that every recorded value describes a single batch.
  """

  def __init__(self):
    # Let the base class set up its own state before adding ours
    super().__init__()
    # Loss value reported at the end of each training batch
    self.batch_losses = []
    # 'acc' metric value reported at the end of each training batch
    self.batch_acc = []

  def on_train_batch_end(self, batch, logs=None):
    """Store the batch's 'loss' and 'acc' values, then reset the model metrics.

    Args:
      batch: index of the batch within the current epoch (unused).
      logs: dict of metric results for this batch; may be None.
    """
    # logs defaults to None, so guard before subscripting
    logs = logs or {}
    if 'loss' in logs:
      self.batch_losses.append(logs['loss'])
    if 'acc' in logs:
      self.batch_acc.append(logs['acc'])
    self.model.reset_metrics()

In [None]:
# Number of batches needed to see every training image once. np.ceil returns
# a float, while steps_per_epoch is a step count, so convert it to int.
steps_per_epoch = int(np.ceil(image_data.samples / image_data.batch_size))

batch_stats_callback = CollectBatchStats()

# Train for 4 epochs, collecting per-batch loss and accuracy via the callback
history = model.fit(image_data, epochs=4,
                    steps_per_epoch=steps_per_epoch,
                    callbacks=[batch_stats_callback])

In [None]:
# Training loss recorded after every batch
plt.figure()
plt.xlabel("Training Steps")
plt.ylabel("Loss")
plt.ylim(0, 2)
plt.plot(batch_stats_callback.batch_losses)

In [None]:
# Training accuracy recorded after every batch
plt.figure()
plt.xlabel("Training Steps")
plt.ylabel("Accuracy")
plt.ylim(0, 1)
plt.plot(batch_stats_callback.batch_acc)

# Model evaluation

In [None]:
# Ground-truth labels of the validation images.
# NOTE(review): the file is read as comma-separated - confirm this matches the
# delimiter actually used by GT-final_test.csv.
tmp_validation_labels_csv = os.path.join(TMP_DATA_DIR, "Final_Test_GT/GT-final_test.csv")
val_data_frame = pd.read_csv(tmp_validation_labels_csv, header=0, sep=',')
# The images were converted to .jpg; regex=False makes '.ppm' a literal
# suffix (as a regular expression the dot would match any character).
val_data_frame['Filename'] = val_data_frame['Filename'].str.replace('.ppm', '.jpg', regex=False)
# Zero-pad the class ids to 5 characters
val_data_frame['ClassId'] = val_data_frame['ClassId'].astype(str).str.zfill(5)

# Reuse the class list of the training iterator so that a label index means
# the same class during training and during evaluation.
image_val_data = image_generator.flow_from_dataframe(
    val_data_frame,
    x_col="Filename",
    directory=VALIDATION_DATA_DIR,
    y_col="ClassId",
    classes=list(image_data.class_indices),
    target_size=IMAGE_SHAPE,
)

In [None]:
# Fetch a single validation batch and show the shapes of its images and labels
image_val_batch, label_val_batch = next(iter(image_val_data))
print("Image batch shape: ", image_val_batch.shape)
print("Label batch shape: ", label_val_batch.shape)

In [None]:
# Predict the validation batch, then reduce both the model outputs and the
# label vectors to class indices (position of the largest entry)
predicted_batch = model.predict(image_val_batch)
predicted_id = predicted_batch.argmax(axis=-1)
label_id = label_val_batch.argmax(axis=-1)

In [None]:
# Grid dimensions for a per-image plot: 5 columns, as many rows as needed
batch_size = image_val_batch.shape[0]
num_plot_column = 5
full_rows, leftover = divmod(batch_size, num_plot_column)
num_plot_row = full_rows + (leftover > 0)

plt.figure(figsize=(10,9))
plt.subplots_adjust(hspace=0.5)
label_number = [*label_map.keys()]
print(label_number)

# Accuracy on the single validation batch fetched earlier
print("Accuracy of the shown eval batch:")
accuracy_score(label_id, predicted_id)

In [None]:
# Evaluate on the whole validation set. The iterator already yields batches,
# so no batch_size is passed; steps must be a whole number and is rounded up
# so that the final, partially filled batch is included.
validation_steps = int(np.ceil(image_val_data.samples / image_val_data.batch_size))
score = model.evaluate(x=image_val_data, steps=validation_steps)
print("Loss: ", score[0], "Accuracy: ", score[1])

# Saving model

In [None]:
import time
t = time.time()

# Timestamped export directory below OUTPUT_ROOT_DIR
export_path = f"{OUTPUT_ROOT_DIR}/model{int(t)}"
# Save the model once in the TensorFlow SavedModel format; a second
# save_model call to the same path would only rewrite the same directory.
model.save(export_path, save_format='tf')
model.summary()

export_path

In [None]:
# Convert the Keras model to TensorFlow Lite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

# Write the converted model to OUTPUT_TFLITE_MODEL
with tf.io.gfile.GFile(OUTPUT_TFLITE_MODEL, 'wb') as tflite_file:
  tflite_file.write(tflite_model)

In [None]:
# Load the converted TensorFlow Lite model into an interpreter
interpreter = tf.lite.Interpreter(model_path=OUTPUT_TFLITE_MODEL)
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

batch_size = image_val_batch.shape[0]
# Class indices are integers, so use an integer array rather than the float default
predicted_id = np.zeros(batch_size, dtype=np.int64)

# Feed the validation batch to the interpreter one image at a time
for i, image in enumerate(np.split(image_val_batch, batch_size)):
  interpreter.set_tensor(input_details[0]['index'], image)
  interpreter.invoke()
  output_data = interpreter.get_tensor(output_details[0]['index'])
  predicted_id[i] = np.argmax(output_data)

label_id = np.argmax(label_val_batch, axis=-1)

num_plot_column = 5
num_plot_row = batch_size // num_plot_column + (batch_size % num_plot_column > 0)

plt.figure(figsize=(10,9))
plt.subplots_adjust(hspace=0.5)
# Additionally store the Keras model below OUTPUT_ROOT_DIR/tpu_saved_model
model.save(f'{OUTPUT_ROOT_DIR}/tpu_saved_model/tf_model')

print("Accuracy of the shown eval batch, with the TensorFlow Lite model:")
accuracy_score(label_id, predicted_id)

In [None]:
# Change into the Drive notebook folder and unpack the saved-model archive.
# NOTE(review): the %cd path is relative, so this presumably expects the
# current directory to be the one containing the gdrive mount - confirm.
# NOTE(review): no visible cell of this notebook creates tpu_saved_model.zip;
# it has to exist in that folder already - confirm.
%pwd
%cd gdrive/MyDrive/'Colab Notebooks'/
%ls
!unzip 'tpu_saved_model.zip'

In [None]:
# Convert the SavedModel found in tpu_saved_model/tf_model to TensorFlow Lite
saved_model_dir = 'tpu_saved_model/tf_model'
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
tflite_model = converter.convert()

# Store the result as model.tflite in the current working directory
with open('model.tflite', 'wb') as tflite_file:
    tflite_file.write(tflite_model)

In [None]:
# Download the converted model.tflite through the Colab files helper
from google.colab import files

files.download("model.tflite")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>