<a href="https://colab.research.google.com/github/KDZ7/ImageClassification-TensorFlow-DeepLearning/blob/main/GTSRB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Comminication entre l'API Kaggle et Google Colab**

In [None]:
output_dir      = '/content/drive/MyDrive/kaggle/work'
output_dir_tmp  = '/content/tmp/work'
dataset_dir_tmp = '/content/tmp/dataset'

In [None]:
ratio = 0.5 # %Images Training
pixels = 48

# **Repertoire permanent et temporaire**

In [None]:
!pip install kaggle -q

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!mkdir -p ~/.kaggle/ && cp '{output_dir}/../kaggle.json' ~/.kaggle/ && chmod 600 ~/.kaggle/kaggle.json

In [None]:
!kaggle datasets list

In [None]:
!mkdir -p '{dataset_dir_tmp}' && kaggle datasets download -d meowmeowmeowmeowmeow/gtsrb-german-traffic-sign -p '{dataset_dir_tmp}' && unzip -n {dataset_dir_tmp}/gtsrb-german-traffic-sign.zip -d '{dataset_dir_tmp}'

In [None]:
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import h5py

import tensorflow as tf
from sklearn.utils import shuffle
from sklearn.preprocessing import StandardScaler
from datetime import datetime

import cv2
from skimage.morphology import disk
from skimage.util import img_as_ubyte
from skimage.filters import rank
from skimage import io, color, exposure, transform
from tqdm import tqdm

import os

*Summary of dataset preparation. For more details visit* [here](https://gricad-gitlab.univ-grenoble-alpes.fr/talks/fidle/-/blob/master/GTSRB.Keras3/01-Preparation-of-data.ipynb?ref_type=heads)

**Enhanced datasets**

# **1. Preparation of datasets**

In [None]:
df = pd.read_csv(f'{dataset_dir_tmp}/Test.csv', header=0)
display(df.head(10))

# ***Usefull functions***
**A nice function for reading a dataset from an index.csv file.\
Input: an intex.csv file\
Output: an array of images ans an array of corresponding labels**

In [None]:
def read_csv_dataset(csv_file):
    '''
    Reads traffic sign data from German Traffic Sign Recognition Benchmark dataset.
    Arguments:
        csv filename :  Description file, Example /data/GTSRB/Train.csv
    Returns:
        x,y          :  np array of images, np array of corresponding labels
    '''

    path = os.path.dirname(csv_file)
    name = os.path.basename(csv_file)

    # ---- Read csv file
    #
    df = pd.read_csv(csv_file, header=0)

    # ---- Get filenames and ClassIds
    #
    filenames = df['Path'].to_list()
    y         = df['ClassId'].to_list()
    x         = []

    # ---- Read images
    #
    for filename in tqdm(filenames, desc='Loading Images', unit=' image'):
        image=io.imread(f'{path}/{filename}')
        x.append(image)

    # ---- Return
    #
    return (np.array(x,dtype=object), np.array(y))

In [None]:
(x_train, y_train) = read_csv_dataset(f'{dataset_dir_tmp}/Train.csv')
(x_test,  y_test)  = read_csv_dataset(f'{dataset_dir_tmp}/Test.csv')
(x_meta,  y_meta)  = read_csv_dataset(f'{dataset_dir_tmp}/Meta.csv')

# ---- Shuffle train set
x_train, y_train = shuffle(x_train, y_train)

# ---- Sort Meta
combined = list(zip(x_meta, y_meta))
combined.sort(key=lambda x: x[1])
x_meta, y_meta = zip(*combined) # dézipper

In [None]:
# ------ Global stuff
print("x_train shape : ",x_train.shape)
print("y_train shape : ",y_train.shape)
print("x_test  shape : ",x_test.shape)
print("y_test  shape : ",y_test.shape)
print("x_meta  size  : ",len(x_meta))
print("y_meta  size  : ",len(y_meta))

# **Show Dataset**

In [None]:
def show(
    images,
    targets,
    figsize=(16, 8),
    index_range=(1, 17),
    subplot=(4, 4),
    axis_onoff="off",
    cmap=None,
):
    plt.figure(figsize=figsize)
    for i in range(*index_range):
        plt.subplot(*subplot, i)
        plt.imshow(images[i], cmap=cmap)
        plt.title(targets[i])
        plt.axis(axis_onoff)
    plt.tight_layout()
    plt.show()
    print("><")
    return

In [None]:
show(x_meta, y_meta)
show(x_train, y_train)
show(x_test, y_test)

# **Enhancement cooking**
**A nice function for preparing our data.Input: a set of images (numpy array)Output: a enhanced images, resized and reprocessed (numpy array)**

In [None]:
def images_enhancement(images, width=24, height=24, proc='RGB'):
    '''
    Resize and convert images - doesn't change originals.
    input images must be RGBA or RGB.
    Note : all outputs are fixed size numpy array of float32
    args:
        images :         images list
        width,height :   new images size (24, 24)
        mode :           RGB | RGB-HE | L | L-HE | L-LHE | L-CLAHE
    return:
        numpy array of enhanced images
    '''
    lz={ 'RGB':3, 'RGB-HE':3, 'L':1, 'L-HE':1, 'L-LHE':1, 'L-CLAHE':1}[proc]

    out=[]

    for img in tqdm(images, desc="Enhancing images", unit=' image'):

        # ---- if RGBA, convert to RGB
        if img.shape[2]==4:
            img=color.rgba2rgb(img)

        # ---- Resize
        img = transform.resize(img, (width, height))

        # ---- RGB / Histogram Equalization
        if proc=='RGB-HE':
            hsv = color.rgb2hsv(img.reshape(width, height,3))
            hsv[:, :, 2] = exposure.equalize_hist(hsv[:, :, 2])
            img = color.hsv2rgb(hsv)

        # ---- Grayscale
        if proc=='L':
            img=color.rgb2gray(img)

        # ---- Grayscale / Histogram Equalization
        if proc=='L-HE':
            img=color.rgb2gray(img)
            img=exposure.equalize_hist(img)

        # ---- Grayscale / Local Histogram Equalization
        if proc=='L-LHE':
            img=color.rgb2gray(img)
            img = img_as_ubyte(img)
            img=rank.equalize(img, disk(10))/255.

        # ---- Grayscale / Contrast Limited Adaptive Histogram Equalization (CLAHE)
        if proc=='L-CLAHE':
            img=color.rgb2gray(img)
            img=exposure.equalize_adapthist(img)

        # ---- Add image in list of list
        out.append(img)

    # ---- Reshape images
    #     (-1, width,height,1) for L
    #     (-1, width,height,3) for RGB
    #
    out = np.array(out, dtype='float32')
    out = out.reshape(-1, width, height, lz)
    return out

# **To get an idea of the different recipes**

In [None]:
i = random.randint(0, len(x_train) - 32)
x_samples = x_train[i : i + 32]
y_samples = y_train[i : i + 32]

# Dictionary
datasets  = {}

datasets['RGB']     = images_enhancement(x_samples, width=pixels, height=pixels, proc='RGB')
datasets['RGB-HE']  = images_enhancement(x_samples, width=pixels, height=pixels, proc='RGB-HE')
datasets['L']       = images_enhancement(x_samples, width=pixels, height=pixels, proc='L')
datasets['L-HE']    = images_enhancement(x_samples, width=pixels, height=pixels, proc='L-HE')
datasets['L-LHE']   = images_enhancement(x_samples, width=pixels, height=pixels, proc='L-LHE')
datasets['L-CLAHE'] = images_enhancement(x_samples, width=pixels, height=pixels, proc='L-CLAHE')

# Afficher les images pour chaque méthode de traitement
for key, images in datasets.items():
    show(images, y_samples)


# **A function to save a dataset (h5 file)**

In [None]:
def save_h5_dataset(x_train, y_train, x_test, y_test, x_meta,y_meta, filename):

    directory = os.path.dirname(filename)
    if not os.path.exists(directory):
        os.makedirs(directory)

    # ---- Create h5 file
    with h5py.File(filename, "w") as f:
        f.create_dataset("x_train", data=x_train)
        f.create_dataset("y_train", data=y_train)
        f.create_dataset("x_test",  data=x_test)
        f.create_dataset("y_test",  data=y_test)
        f.create_dataset("x_meta",  data=x_meta)
        f.create_dataset("y_meta",  data=y_meta)

    # ---- done
    size=os.path.getsize(filename)/(2**20)
    print('Dataset : {:24s}  shape : {:22s} size : {:6.1f} Mo   (saved)'.format(filename, str(x_train.shape),size))

# **Generate enhanced datasets**

In [None]:
# ---- Size and processings
# create datasets with images of size 24x24 pixels and 48x48 pixels.
all_pixels= [pixels] # [24, 48, 96]

all_proc=['RGB', 'RGB-HE', 'L', 'L-LHE', 'L-LHE', 'L-CLAHE']

n_train = int(len(x_train) * ratio)
n_test  = int(len(x_test) * ratio)

print(f'ratio is : {ratio}')
print(f'x_train length is : {n_train}')
print(f'x_test  length is : {n_test}')
print(f'output dir is     : {output_dir}\n')

for px in all_pixels:
    for m in all_proc:
        # ---- A nice dataset name
        filename = f'{output_dir}/set-{px}x{px}-{m}.h5'
        # ---- Enhancement
        #      Note : x_train is a numpy array of python objects (images with <> sizes)
        #             but images_enhancement() return a real array of float64 numpy (images with same size)
        #             so, we can save it in nice h5 files
        #
        x_train_new = images_enhancement(x_train[:n_train], width=px, height=px, proc=m)
        x_test_new  = images_enhancement(x_test[:n_test],   width=px, height=px, proc=m)
        x_meta_new  = images_enhancement(x_meta,            width=px, height=px, proc='RGB')

        # ---- Save
        save_h5_dataset(x_train_new, y_train[:n_train], x_test_new, y_test[:n_test], x_meta_new, y_meta, filename)

x_train_new, x_test_new=0, 0
print('\nDone.')

# **Reload data to be sure *.h5**

In [None]:
def read_dataset(path_dir, dataset_name):
    '''
    Reads h5 dataset
    Args:
        filename     : datasets filename
        dataset_name : dataset name, without .h5
    Returns:
        x_train,y_train, x_test,y_test data, x_meta,y_meta
    '''

    # ---- Read dataset
    #
    filename = f'{path_dir}/{dataset_name}.h5'
    with  h5py.File(filename,'r') as f:
        x_train = f['x_train'][:]
        y_train = f['y_train'][:]
        x_test  = f['x_test'][:]
        y_test  = f['y_test'][:]
        x_meta  = f['x_meta'][:]
        y_meta  = f['y_meta'][:]

    # ---- Shuffle train set
    x_train, y_train = shuffle(x_train, y_train)

    print("Rescaled shape >>> in:", x_train.shape, " out:", y_train.shape)
    return (x_train, y_train), (x_test, y_test), (x_meta, y_meta)

In [None]:
dataset_name=f'set-{pixels}x{pixels}-RGB'
(x_tmp, y_tmp), (_,_), (_,_) = read_dataset(output_dir, dataset_name)
show(x_tmp, y_tmp)
x_tmp, y_tmp=0, 0

TensorFlow_logo.svg

In [None]:
batch_size = 32
epochs = 10
optimizer=tf.keras.optimizers.Adam()
loss=tf.keras.losses.SparseCategoricalCrossentropy()
accurracy=tf.keras.metrics.SparseCategoricalAccuracy()
dataset_name = f'set-{pixels}x{pixels}-RGB'

In [None]:
!mkdir -p '{output_dir_tmp}' && cp '{output_dir}/{dataset_name}.h5' '{output_dir_tmp}'

In [None]:
(x_train, y_train), (x_test, y_test), (x_meta, y_meta) = read_dataset(output_dir_tmp, dataset_name)

print("x_train shape : ", x_train.shape)
print("y_train shape : ", y_train.shape)
print("x_test  shape : ", x_test.shape)
print("y_test  shape : ", y_test.shape)

show(x_train, y_train)
show(x_test, y_test)

# *Normalisation*

In [None]:
# x_train /= 255.0
# x_test  /= 255.0
print("x_train shape : ", x_train.shape)
print("x_train_max   : ", x_train.max())
print("x_train_mean  : ", x_train.mean())
print("x_train_std   : ", x_train.std())
print("x_test_max    : ", x_test.max())
print("x_test_mean   : ", x_test.mean())
print("x_test_std    : ", x_test.std())

In [None]:
class CustomModel(tf.keras.Model):
    def __init__(self, input_shape):
      super(CustomModel, self).__init__()

      self._input_shape  = input_shape
      self._00_CNN_Layer = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)
      self._01_CNN_Layer = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)
      self._00_Pooling   = tf.keras.layers.MaxPool2D(pool_size=(2, 2))
      self._00_Dropout   = tf.keras.layers.Dropout(0.25)

      self._02_CNN_Layer = tf.keras.layers.Conv2D(filters=128, kernel_size=(3, 3), activation=tf.nn.relu)
      self._03_CNN_Layer = tf.keras.layers.Conv2D(filters=128, kernel_size=(3, 3), activation=tf.nn.relu)
      self._01_Pooling   = tf.keras.layers.MaxPool2D(pool_size=(2, 2))
      self._01_Dropout   = tf.keras.layers.Dropout(0.25)

      self._04_CNN_Layer = tf.keras.layers.Conv2D(filters=256, kernel_size=(3, 3), activation=tf.nn.relu)
      self._05_CNN_Layer = tf.keras.layers.Conv2D(filters=256, kernel_size=(3, 3), activation=tf.nn.relu)
      self._02_Pooling   = tf.keras.layers.MaxPool2D(pool_size=(2, 2))
      self._02_Dropout   = tf.keras.layers.Dropout(0.25)

      self._00_Flatten   =  tf.keras.layers.Flatten()
      self._00_Dense     =  tf.keras.layers.Dense(1024, activation=tf.nn.relu)
      self._01_Dense     =  tf.keras.layers.Dense(512, activation=tf.nn.relu)
      self._03_Dropout   =  tf.keras.layers.Dropout(0.25)
      self._02_Dense     =  tf.keras.layers.Dense(43, activation=tf.nn.softmax)

    def call_as_eagle(self, inputs, training=False):
      x = self._00_CNN_Layer(inputs)
      x = self._01_CNN_Layer(x)
      x = self._00_Pooling(x)
      x = self._00_Dropout(x, training=training)

      x = self._02_CNN_Layer(x)
      x = self._03_CNN_Layer(x)
      x = self._01_Pooling(x)
      x = self._01_Dropout(x, training=training)

      x = self._04_CNN_Layer(x)
      x = self._05_CNN_Layer(x)
      x = self._02_Pooling(x)
      x = self._02_Dropout(x, training=training)

      x = self._00_Flatten(x)
      x = self._00_Dense(x)
      x = self._01_Dense(x)
      x = self._03_Dropout(x, training=training)
      x = self._02_Dense(x)
      return x

    def model(self):
      x = tf.keras.Input(shape=self._input_shape)
      return tf.keras.Model(inputs=x, outputs=self.call_as_eagle(inputs=x))

    @tf.function
    def call(self, inputs, training=False):
      x = self._00_CNN_Layer(inputs)
      x = self._01_CNN_Layer(x)
      x = self._00_Pooling(x)
      x = self._00_Dropout(x, training=training)

      x = self._02_CNN_Layer(x)
      x = self._03_CNN_Layer(x)
      x = self._01_Pooling(x)
      x = self._01_Dropout(x, training=training)

      x = self._04_CNN_Layer(x)
      x = self._05_CNN_Layer(x)
      x = self._02_Pooling(x)
      x = self._02_Dropout(x, training=training)

      x = self._00_Flatten(x)
      x = self._00_Dense(x)
      x = self._01_Dense(x)
      x = self._03_Dropout(x, training=training)
      x = self._02_Dense(x)
      return x

    @tf.function
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

In [None]:
model = CustomModel(input_shape=(pixels, pixels, 3))
model.model().summary()
model.compile(optimizer=tf.keras.optimizers.Adam(), loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=["accuracy"])

# *Config Tensorboard*

In [None]:
log_dir = f"{output_dir_tmp}/logs/fit/" + datetime.now().strftime("%Y-%m-%d_%H:%M")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

In [None]:
history = model.fit(
    x_train,
    y_train,
    epochs = epochs,
    batch_size = batch_size,
    validation_data = (x_test, y_test),
    callbacks=[tensorboard_callback]
)

In [None]:
%load_ext tensorboard
%tensorboard --logdir {output_dir_tmp}/logs/fit

In [None]:
loss = history.history["loss"]
acc = history.history["accuracy"]

val_loss = history.history["val_loss"]
val_acc = history.history["val_accuracy"]

plt.title("Loss")
plt.plot(loss, label="loss")
plt.plot(val_loss, label="val_loss")
plt.legend(loc="upper left")
plt.show()

plt.title("Accuracy")
plt.plot(acc, label="acc")
plt.plot(val_acc, label="val_acc")
plt.legend(loc="upper left")
plt.show()

# **Show predictions**

In [None]:
pred = np.argmax(model.predict(x_test), axis=1)
show(x_test, pred)

# **Show errors**

In [None]:
# Générer la liste des indices d'erreurs
error_index = [i for i in range(len(x_test)) if pred[i] != y_test[i]]

# Obtenir les images en erreur
error_x_test = x_test[error_index]

# Obtenir les prédictions correspondantes aux images d'erreur
error_predictions = pred[error_index]

print("nombre d'erreurs trouvées:", len(error_index))
show(error_x_test, error_predictions, cmap="plasma")

# Save model

In [None]:
datetime_format = datetime.now().strftime('%Y-%m-%d_%H-%M')
model.save(f'{output_dir}/model_{datetime_format}', save_format='tf')