# SqueezeNet1p1 model training

This notebook can be used to train the SqueezeNet1p1 model on Google Colab.

Note: this notebook is adapted from "chess_piece_models_common.py", "squeezenet.py", and "train_squeezenet1p1.py" (in the "cpmodels" folder) of the LiveChess2FEN project (https://github.com/davidmallasen/LiveChess2FEN).

# Step 1: enable GPU on Google Colab

### Select a GPU under "Runtime" > "Change runtime type", then use the following code to verify that the GPU is enabled

In [None]:
import tensorflow as tf
tf.test.gpu_device_name()

'/device:GPU:0'

# Step 2: upload the pretrained model

### Create a "models" folder containing the pretrained SqueezeNet1p1 model "SqueezeNet1p1_old.h5"

### Compress the "models" folder into "models.zip"

### Upload "models.zip" to Google Colab

### Unzip "models.zip" with the following code

In [None]:
!unzip models.zip

Archive:  models.zip
  inflating: models/SqueezeNet1p1_old.h5  


(Feel free to delete the uploaded "models.zip" once it's unzipped)

# Step 3: import data

### Create a "data" folder containing a "train" folder and a "validation" folder, each with exactly 13 subfolders ("\_", "r_", "n_", "b_", "q_", "k_", "p_", "R", "N", "B", "Q", "K", and "P")

### Compress the "data" folder into "data.zip"

### Upload "data.zip" to Google Colab

### Unzip the compressed file using the following code

In [None]:
!unzip data.zip

Streaming output truncated to the last 5000 lines.
  inflating: data/train/q_/109.jpg   
  inflating: data/train/q_/11.jpg    
  inflating: data/train/q_/110.jpg   
  inflating: data/train/q_/111.jpg   
  inflating: data/train/q_/112.jpg   
  inflating: data/train/q_/113.jpg   
  inflating: data/train/q_/114.jpg   
  inflating: data/train/q_/115.jpg   
  inflating: data/train/q_/116.jpg   
  inflating: data/train/q_/117.jpg   
  inflating: data/train/q_/118.jpg   
  inflating: data/train/q_/119.jpg   
  inflating: data/train/q_/12.jpg    
  inflating: data/train/q_/120.jpg   
  inflating: data/train/q_/121.jpg   
  inflating: data/train/q_/122.jpg   
  inflating: data/train/q_/123.jpg   
  inflating: data/train/q_/124.jpg   
  inflating: data/train/q_/125.jpg   
  inflating: data/train/q_/126.jpg   
  inflating: data/train/q_/127.jpg   
  inflating: data/train/q_/128.jpg   
  inflating: data/train/q_/129.jpg   
  inflating: data/train/q_/13.jpg    
  inflating: data/train
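
Because `!unzip` prints one line per file, the output for a large dataset is truncated as shown above. A quieter alternative is sketched below using Python's standard `zipfile` module. The helper name `extract_quietly` is hypothetical and not part of the original notebook. It extracts the archive and returns a file count per folder instead of listing every file.

```python
import zipfile
from collections import Counter
from pathlib import PurePosixPath


def extract_quietly(zip_path, dest="."):
    """Extract zip_path into dest and return a per-folder file count."""
    counts = Counter()
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest)
        for info in archive.infolist():
            if not info.is_dir():
                # Group files by their parent folder, e.g. "data/train/q_".
                counts[str(PurePosixPath(info.filename).parent)] += 1
    return dict(counts)


# Usage (assumes "data.zip" has been uploaded to the working directory):
# for folder, n in sorted(extract_quietly("data.zip").items()):
#     print(f"{folder}: {n} files")
```

The per-folder counts also give a quick view of how balanced the classes are before training.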

### Rename the lowercase piece folders in "train" and "validation" to drop the trailing underscore:

In [None]:
import os

# Drop the trailing underscore from the lowercase piece folders
# (e.g. "b_" becomes "b") in both the train and validation splits.
for split in ('train', 'validation'):
    for piece in ('b', 'k', 'n', 'p', 'q', 'r'):
        os.rename(f'./data/{split}/{piece}_', f'./data/{split}/{piece}')

Notes: 1) To replace the existing data with new data, delete the "data" folder with the following code (uncomment it first) and repeat this step:

In [None]:
# import shutil
# shutil.rmtree('./data')

2) If a hidden folder called ".ipynb_checkpoints" exists in the "train" or "validation" folder, delete it with the following code (uncomment it first); otherwise it would be counted as an extra class folder:

In [None]:
# import shutil
# shutil.rmtree('./data/train/.ipynb_checkpoints')
# shutil.rmtree('./data/validation/.ipynb_checkpoints')
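
Before moving on, it can help to confirm that both splits contain exactly the 13 class folders and nothing else. The sketch below is not part of the original notebook. The function name `check_layout` and its `expected` parameter are hypothetical, and it assumes the renaming above has already been run.

```python
import os

# The 13 class folders expected after the renaming step.
EXPECTED_CLASSES = frozenset(["_", "r", "n", "b", "q", "k", "p",
                              "R", "N", "B", "Q", "K", "P"])


def check_layout(data_root="./data", expected=EXPECTED_CLASSES):
    """Return a dict of problems per split (empty if the layout is clean)."""
    problems = {}
    for split in ("train", "validation"):
        split_dir = os.path.join(data_root, split)
        found = {name for name in os.listdir(split_dir)
                 if os.path.isdir(os.path.join(split_dir, name))}
        missing = sorted(set(expected) - found)
        unexpected = sorted(found - set(expected))
        if missing or unexpected:
            problems[split] = {"missing": missing, "unexpected": unexpected}
    return problems


# Usage: an empty dict means both splits contain exactly the 13 folders.
# print(check_layout("./data"))
```

A stray ".ipynb_checkpoints" folder would show up under "unexpected" for the affected split.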

# Step 6: define common functions

In [None]:
"""
Common functions to train the chess piece models.
"""
import matplotlib

from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.layers import Dense, GlobalAveragePooling2D
from keras.models import Model
from keras.preprocessing.image import ImageDataGenerator

matplotlib.use('agg')
import matplotlib.pyplot as plt


def build_model(base_model):
    """
    Builds the model from a pretrained base model.
    :param base_model: Base model from keras applications.
        Example: MobileNetV2(input_shape=(224, 224, 3),
                             include_top=False,
                             weights='imagenet')
    :return: The compiled model to train.
    """
    layers = base_model.output
    layers = GlobalAveragePooling2D()(layers)
    layers = Dense(1024, activation='relu')(layers)
    preds = Dense(13, activation='softmax')(layers)

    model = Model(inputs=base_model.input, outputs=preds)

    model.compile(optimizer='Adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model


def data_generators(preprocessing_function, target_size, batch_size,
                    train_path='./data/train/',
                    validation_path='./data/validation/'):
    """
    Returns the train and validation generators.

    :param preprocessing_function: Corresponding preprocessing function
        for the pretrained base model.
        Example: from keras.applications.mobilenet_v2
            import preprocess_input
    :param target_size: The dimensions to which all images found will be
        resized. Example: (224, 224)
    :param batch_size: Size of the batches of data.
    :param train_path: Path to the train folder.
    :param validation_path: Path to the validation folder.
    :return: The train and validation generators.
    """
    datagen = ImageDataGenerator(preprocessing_function=preprocessing_function,
                                 dtype='float16')

    train_gen = datagen.flow_from_directory(train_path,
                                            target_size=target_size,
                                            color_mode='rgb',
                                            batch_size=batch_size,
                                            class_mode='categorical',
                                            shuffle=True)

    val_gen = datagen.flow_from_directory(validation_path,
                                          target_size=target_size,
                                          color_mode='rgb',
                                          batch_size=batch_size,
                                          class_mode='categorical',
                                          shuffle=True)
    return train_gen, val_gen


def train_model(model, epochs, train_generator, val_generator, callbacks,
                use_weights, workers, use_multiprocessing):
    """Trains the input model."""
    steps_per_epoch = train_generator.n // train_generator.batch_size
    validation_steps = val_generator.n // val_generator.batch_size

    if use_weights:
        weights = {0: 1., 1: 1., 2: 1., 3: 0.125, 4: 1., 5: 1., 6: 0.05,
                   7: 1., 8: 1., 9: 1., 10: 0.125, 11: 1., 12: 1.}
    else:
        weights = None

    return model.fit(train_generator,
                    steps_per_epoch=steps_per_epoch,
                    epochs=epochs,
                    validation_data=val_generator,
                    validation_steps=validation_steps,
                    callbacks=callbacks,
                    verbose=2,
                    class_weight=weights,
                    use_multiprocessing=use_multiprocessing,
                    workers=workers
                    )

    # return model.fit_generator(generator=train_generator,
    #                            steps_per_epoch=steps_per_epoch,
    #                            epochs=epochs,
    #                            validation_data=val_generator,
    #                            validation_steps=validation_steps,
    #                            callbacks=callbacks,
    #                            verbose=2,
    #                            class_weight=weights,
    #                            use_multiprocessing=True,
    #                            workers=workers
    #                            )


def model_callbacks(early_stopping_patience, model_checkpoint_dir,
                    reducelr_factor, reducelr_patience):
    """Initializes the model callbacks."""
    early_stopping = EarlyStopping(monitor='val_accuracy',
                                   mode='max',
                                   verbose=1,
                                   patience=early_stopping_patience,
                                   restore_best_weights=True,
                                   min_delta=0.002)
    model_checkpoint = ModelCheckpoint(filepath=model_checkpoint_dir,
                                       monitor='val_accuracy',
                                       mode='max',
                                       verbose=1,
                                       save_best_only=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_accuracy',
                                  mode='max',
                                  factor=reducelr_factor,
                                  patience=reducelr_patience,
                                  verbose=1)
    return [early_stopping, model_checkpoint, reduce_lr]


def plot_model_history(history, accuracy_savedir, loss_savedir):
    """Plots the model history (accuracy and loss)."""
    # Summarize history for accuracy
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('Model accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Training accuracy', 'Validation accuracy'])
    plt.savefig(accuracy_savedir)
    plt.close()

    # Summarize history for loss
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Training loss', 'Validation loss'])
    plt.savefig(loss_savedir)
    plt.close()


def evaluate_model(model, test_generator):
    """
    Prints the test loss and accuracy of the model.

    :param model: Model to evaluate.
    :param test_generator: Generator with which to test the model.
    """
    # scores = model.evaluate_generator(test_generator, verbose=1)
    scores = model.evaluate(test_generator, verbose=1)
    print('Test loss:', scores[0])
    print('Test accuracy:', scores[1])
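
The `weights` dictionary in `train_model` is keyed by class index, which makes it hard to see which pieces are affected. The sketch below maps the indices back to folder names using the `class_indices` mapping printed in the training output of this notebook. The helper `weights_by_label` is a hypothetical name used only for illustration. Note that both training calls in this notebook pass `use_weights=False`, so these weights only apply if that flag is switched on.

```python
# Class indices as printed by the training run in this notebook.
CLASS_INDICES = {'B': 0, 'K': 1, 'N': 2, 'P': 3, 'Q': 4, 'R': 5, '_': 6,
                 'b': 7, 'k': 8, 'n': 9, 'p': 10, 'q': 11, 'r': 12}

# Weights copied from train_model (used only when use_weights=True).
WEIGHTS = {0: 1., 1: 1., 2: 1., 3: 0.125, 4: 1., 5: 1., 6: 0.05,
           7: 1., 8: 1., 9: 1., 10: 0.125, 11: 1., 12: 1.}


def weights_by_label(class_indices, weights):
    """Map each class label to its weight via the class index."""
    return {label: weights[index] for label, index in class_indices.items()}


labelled = weights_by_label(CLASS_INDICES, WEIGHTS)
down_weighted = {label: w for label, w in labelled.items() if w < 1.0}
print(down_weighted)  # {'P': 0.125, '_': 0.05, 'p': 0.125}
```

So the reduced weights apply to the two pawn classes and the empty-square class. The notebook does not state the motivation; a plausible reading is that these classes are over-represented in the data.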

# Step 7: define training functions

In [None]:
"""
SqueezeNet-v1.1 implementation.
"""
import warnings

from keras import backend as K
# from keras.engine.topology import get_source_inputs
from keras.utils.layer_utils import get_source_inputs
from keras.layers import GlobalAveragePooling2D, GlobalMaxPooling2D
from keras.layers import Input, Convolution2D, MaxPooling2D, Activation, \
    concatenate, Dropout
from keras.models import Model
from keras.utils import get_file
from keras.utils import layer_utils
# from keras_applications.imagenet_utils import _obtain_input_shape
from keras.applications.imagenet_utils import obtain_input_shape

SQ1X1 = "squeeze1x1"
EXP1X1 = "expand1x1"
EXP3X3 = "expand3x3"
RELU = "relu_"

WEIGHTS_PATH = "https://github.com/rcmalli/keras-squeezenet/releases/download/v1.0/squeezenet_weights_tf_dim_ordering_tf_kernels.h5"
WEIGHTS_PATH_NO_TOP = "https://github.com/rcmalli/keras-squeezenet/releases/download/v1.0/squeezenet_weights_tf_dim_ordering_tf_kernels_notop.h5"


def fire_module(x, fire_id, squeeze=16, expand=64):
    """Modular function for Fire Node."""
    s_id = 'fire' + str(fire_id) + '/'

    if K.image_data_format() == 'channels_first':
        channel_axis = 1
    else:
        channel_axis = 3

    x = Convolution2D(squeeze, (1, 1), padding='valid', name=s_id + SQ1X1)(x)
    x = Activation('relu', name=s_id + RELU + SQ1X1)(x)

    left = Convolution2D(expand, (1, 1), padding='valid', name=s_id + EXP1X1)(
        x)
    left = Activation('relu', name=s_id + RELU + EXP1X1)(left)

    right = Convolution2D(expand, (3, 3), padding='same', name=s_id + EXP3X3)(
        x)
    right = Activation('relu', name=s_id + RELU + EXP3X3)(right)

    return concatenate([left, right], axis=channel_axis, name=s_id + 'concat')


def SqueezeNet(include_top=True, weights='imagenet', input_tensor=None,
               input_shape=None, pooling=None, classes=1000):
    """Instantiates the SqueezeNet v1.1 architecture."""

    if weights not in {'imagenet', None}:
        raise ValueError('The `weights` argument should be either '
                         '`None` (random initialization) or `imagenet` '
                         '(pre-training on ImageNet).')

    if weights == 'imagenet' and include_top and classes != 1000:
        raise ValueError('If using `weights` as imagenet with `include_top`'
                         ' as true, `classes` should be 1000')

    # input_shape = _obtain_input_shape(input_shape,
    #                                   default_size=227,
    #                                   min_size=48,
    #                                   data_format=K.image_data_format(),
    #                                   require_flatten=include_top)

    input_shape = obtain_input_shape(input_shape,
                                      default_size=227,
                                      min_size=48,
                                      data_format=K.image_data_format(),
                                      require_flatten=include_top)

    if input_tensor is None:
        img_input = Input(shape=input_shape)
    else:
        if not K.is_keras_tensor(input_tensor):
            img_input = Input(tensor=input_tensor, shape=input_shape)
        else:
            img_input = input_tensor

    x = Convolution2D(64, (3, 3), strides=(2, 2), padding='valid',
                      name='conv1')(img_input)
    x = Activation('relu', name='relu_conv1')(x)
    x = MaxPooling2D(pool_size=(3, 3), strides=(2, 2), name='pool1')(x)

    x = fire_module(x, fire_id=2, squeeze=16, expand=64)
    x = fire_module(x, fire_id=3, squeeze=16, expand=64)
    x = MaxPooling2D(pool_size=(3, 3), strides=(2, 2), name='pool3')(x)

    x = fire_module(x, fire_id=4, squeeze=32, expand=128)
    x = fire_module(x, fire_id=5, squeeze=32, expand=128)
    x = MaxPooling2D(pool_size=(3, 3), strides=(2, 2), name='pool5')(x)

    x = fire_module(x, fire_id=6, squeeze=48, expand=192)
    x = fire_module(x, fire_id=7, squeeze=48, expand=192)
    x = fire_module(x, fire_id=8, squeeze=64, expand=256)
    x = fire_module(x, fire_id=9, squeeze=64, expand=256)

    if include_top:
        # It's not obvious where to cut the network...
        # Could do the 8th or 9th layer... some work recommends cutting
        # earlier layers.

        x = Dropout(0.5, name='drop9')(x)

        x = Convolution2D(classes, (1, 1), padding='valid', name='conv10')(x)
        x = Activation('relu', name='relu_conv10')(x)
        x = GlobalAveragePooling2D()(x)
        x = Activation('softmax', name='loss')(x)
    else:
        if pooling == 'avg':
            x = GlobalAveragePooling2D()(x)
        elif pooling == 'max':
            x = GlobalMaxPooling2D()(x)
        elif pooling is None:
            pass
        else:
            raise ValueError("Unknown argument for 'pooling'=" + str(pooling))

    # Ensure that the model takes into account any potential
    # predecessors of `input_tensor`.
    if input_tensor is not None:
        inputs = get_source_inputs(input_tensor)
    else:
        inputs = img_input

    model = Model(inputs, x, name='squeezenet')

    # Load weights
    if weights == 'imagenet':
        if include_top:
            weights_path = get_file(
                'squeezenet_weights_tf_dim_ordering_tf_kernels.h5',
                WEIGHTS_PATH,
                cache_subdir='models')
        else:
            weights_path = get_file(
                'squeezenet_weights_tf_dim_ordering_tf_kernels_notop.h5',
                WEIGHTS_PATH_NO_TOP,
                cache_subdir='models')

        model.load_weights(weights_path)
        if K.backend() == 'theano':
            layer_utils.convert_all_kernels_in_model(model)

        if K.image_data_format() == 'channels_first':

            if K.backend() == 'tensorflow':
                warnings.warn('You are using the TensorFlow backend, yet you '
                              'are using the Theano '
                              'image data format convention '
                              '(`image_data_format="channels_first"`). '
                              'For best performance, set '
                              '`image_data_format="channels_last"` in '
                              'your Keras config '
                              'at ~/.keras/keras.json.')
    return model
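
To make the architecture above easier to follow, the sketch below traces the feature-map size and channel count through the network for a 227x227 input, and counts the parameters of each fire module. It is plain arithmetic derived from the layer arguments in `SqueezeNet()` and does not use Keras. The helper names `valid_out` and `fire_params` are hypothetical.

```python
def valid_out(size, kernel, stride):
    """Output size of a 'valid' (unpadded) convolution or pooling layer."""
    return (size - kernel) // stride + 1


def fire_params(in_ch, squeeze, expand):
    """Trainable parameters (weights + biases) of one fire module."""
    squeeze_1x1 = in_ch * squeeze + squeeze
    expand_1x1 = squeeze * expand + expand
    expand_3x3 = 3 * 3 * squeeze * expand + expand
    return squeeze_1x1 + expand_1x1 + expand_3x3


size = 227
size = valid_out(size, 3, 2)   # conv1 -> 113
size = valid_out(size, 3, 2)   # pool1 -> 56
size = valid_out(size, 3, 2)   # pool3 -> 27
size = valid_out(size, 3, 2)   # pool5 -> 13

# (fire_id, squeeze, expand) as passed to fire_module in SqueezeNet().
FIRE_CONFIG = [(2, 16, 64), (3, 16, 64), (4, 32, 128), (5, 32, 128),
               (6, 48, 192), (7, 48, 192), (8, 64, 256), (9, 64, 256)]

channels = 64  # conv1 has 64 filters
for fire_id, squeeze, expand in FIRE_CONFIG:
    params = fire_params(channels, squeeze, expand)
    channels = 2 * expand  # expand1x1 and expand3x3 outputs are concatenated
    print(f"fire{fire_id}: {channels} channels out, {params} parameters")

print(f"final feature map: {size}x{size}x{channels}")  # 13x13x512
```

The fire modules leave the spatial size unchanged (1x1 convolutions, plus a 3x3 convolution with 'same' padding), so only conv1 and the three pooling layers shrink the feature map.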

In [None]:
"""
Train SqueezeNet-v1.1 model.
"""
# from keras.applications import Xception
# from keras.applications.xception import preprocess_input
# # from keras.engine.saving import load_model
# from keras.models import load_model
# import tensorflow as tf

# from keras.applications import MobileNetV2
# from keras.applications.mobilenet_v2 import preprocess_input
# # from keras.engine.saving import load_model
# from keras.models import load_model
# import tensorflow as tf

from keras.applications.imagenet_utils import preprocess_input
# from keras.engine.saving import load_model
from keras.models import load_model
import tensorflow as tf

def train_chesspiece_model(num_of_epochs, batch_size, use_multiprocessing, lr):
    """Trains the later layers of the pretrained SqueezeNet1p1 chess piece model."""
    print("Initial training begins here...")
    
    # base_model = MobileNetV2(input_shape=(224, 224, 3), include_top=False,
    #                          weights='imagenet', alpha=0.5)

    model = load_model("./models/SqueezeNet1p1_old.h5")

    # First train only blocks 7-9

    train_generator, validation_generator = data_generators(
        preprocess_input, (227, 227), batch_size)
    
    print(train_generator.class_indices)

    for layer in model.layers[:41]:
        layer.trainable = False
    for layer in model.layers[41:]:
        layer.trainable = True

    model.compile(optimizer=tf.keras.optimizers.Adam(lr),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # callbacks = model_callbacks(20, "./models/Xception.h5", 0.2, 8)
    callbacks = model_callbacks(20, "./models/SqueezeNet1p1.h5", 0.2, 8)

    history = train_model(model, num_of_epochs, train_generator,
                          validation_generator, callbacks, use_weights=False,
                          workers=5, use_multiprocessing=use_multiprocessing)

    plot_model_history(history, "./models/SqueezeNet1p1_acc.png",
                       "./models/SqueezeNet1p1_loss.png")
    evaluate_model(model, validation_generator)

    model.save("./models/SqueezeNet1p1_last.h5")

    print("Initial training is complete!\n")


def continue_training(num_of_epochs, batch_size, use_multiprocessing, lr):
    """Continues training the SqueezeNet1p1 chess piece model with all layers trainable."""
    print("Global training begins here...")

    # model = load_model("./models/Xception.h5")

    model = load_model("./models/SqueezeNet1p1_last.h5")
    # model = load_model("./models/Xception_old.h5")

    # train_generator, validation_generator = data_generators(
    #     preprocess_input, (299, 299), 64)
    
    train_generator, validation_generator = data_generators(
        preprocess_input, (227, 227), batch_size)
    
    print(train_generator.class_indices)

    # Train all layers
    for layer in model.layers:
        layer.trainable = True

    model.compile(optimizer=tf.keras.optimizers.Adam(lr),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    callbacks = model_callbacks(20, "./models/SqueezeNet1p1_all.h5", 0.2, 8)

    # history = train_model(model, 100, train_generator, validation_generator,
    #                       callbacks, use_weights=False, workers=5)
    
    history = train_model(model, num_of_epochs, train_generator,
                          validation_generator, callbacks, use_weights=False,
                          workers=5, use_multiprocessing=use_multiprocessing)

    plot_model_history(history, "./models/SqueezeNet1p1_all_acc.png",
                       "./models/SqueezeNet1p1_all_loss.png")
    evaluate_model(model, validation_generator)

    model.save("./models/SqueezeNet1p1_all_last.h5")

    print("Global training is complete!")
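
Both training functions call `model_callbacks(20, ..., 0.2, 8)`, so the learning rate is multiplied by 0.2 after 8 epochs without improvement in validation accuracy. The sketch below is a simplified pure-Python model of that reduce-on-plateau idea, not Keras's own implementation. It ignores options such as cooldown and a minimum learning rate. The function name `plateau_schedule` and the accuracy values are made up for illustration.

```python
def plateau_schedule(val_accuracies, lr, factor, patience, min_delta=1e-4):
    """Return the learning rate in effect for each epoch.

    The rate is multiplied by `factor` whenever the monitored value has
    not improved by more than `min_delta` for `patience` epochs in a row.
    """
    best = float("-inf")
    wait = 0
    rates = []
    for acc in val_accuracies:
        rates.append(lr)          # rate used during this epoch
        if acc > best + min_delta:
            best = acc
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                lr *= factor      # takes effect from the next epoch
                wait = 0
    return rates


# Hypothetical validation accuracies: two improvements, then a plateau.
history = [0.85, 0.90] + [0.90] * 9
rates = plateau_schedule(history, lr=1e-5, factor=0.2, patience=8)
print(f"{rates[0]:.1e} -> {rates[-1]:.1e}")  # 1.0e-05 -> 2.0e-06
```

Since early stopping uses a patience of 20 epochs, a sustained plateau would cut the rate roughly twice before training stops.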

# Step 8: train the model!

### When training finishes, the final model "SqueezeNet1p1_all_last.h5" is saved in "./models"

In [None]:
num_of_epochs_for_initial_training = 20
num_of_epochs_for_continued_training = 100
batch_size = 16
use_multiprocessing = False
initial_learning_rate = 1e-5
global_learning_rate = 1e-5
train_chesspiece_model(num_of_epochs_for_initial_training, batch_size, use_multiprocessing, initial_learning_rate)
continue_training(num_of_epochs_for_continued_training, batch_size, use_multiprocessing, global_learning_rate)

Initial training begins here...
Found 12885 images belonging to 13 classes.
Found 3214 images belonging to 13 classes.
{'B': 0, 'K': 1, 'N': 2, 'P': 3, 'Q': 4, 'R': 5, '_': 6, 'b': 7, 'k': 8, 'n': 9, 'p': 10, 'q': 11, 'r': 12}
Epoch 1/20

Epoch 1: val_accuracy improved from -inf to 0.84844, saving model to ./models/SqueezeNet1p1.h5
805/805 - 71s - loss: 1.1845 - accuracy: 0.8147 - val_loss: 0.6679 - val_accuracy: 0.8484 - lr: 1.0000e-05 - 71s/epoch - 88ms/step
Epoch 2/20

Epoch 2: val_accuracy improved from 0.84844 to 0.87844, saving model to ./models/SqueezeNet1p1.h5
805/805 - 55s - loss: 0.5180 - accuracy: 0.8715 - val_loss: 0.4558 - val_accuracy: 0.8784 - lr: 1.0000e-05 - 55s/epoch - 68ms/step
Epoch 3/20

Epoch 3: val_accuracy improved from 0.87844 to 0.90125, saving model to ./models/SqueezeNet1p1.h5
805/805 - 50s - loss: 0.3613 - accuracy: 0.8955 - val_loss: 0.3440 - val_accuracy: 0.9013 - lr: 1.0000e-05 - 50s/epoch - 62ms/step
Epoch 4/20

Epoch 4: val_accuracy improved from 0.901
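
The "805/805" in the log above is the number of steps per epoch, which `train_model` computes with floor division (`train_generator.n // train_generator.batch_size`). A minimal worked example with the image counts and batch size from this run is shown below. The helper name `steps_and_leftover` is hypothetical.

```python
def steps_and_leftover(num_images, batch_size):
    """Full batches per epoch, and images that do not fit into them."""
    return num_images // batch_size, num_images % batch_size


print(steps_and_leftover(12885, 16))  # (805, 5)
print(steps_and_leftover(3214, 16))   # (200, 14)
```

With a batch size of 16, each epoch therefore runs 805 training steps and 200 validation steps, leaving 5 training images and 14 validation images out of the full batches in that epoch.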

# Step 9: convert the model from keras to onnx

In [None]:
!pip install tf2onnx

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tf2onnx
  Downloading tf2onnx-1.14.0-py3-none-any.whl (451 kB)
Collecting onnx>=1.4.1
  Downloading onnx-1.13.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.5 MB)
Collecting flatbuffers<3.0,>=1.12
  Downloading flatbuffers-2.0.7-py2.py3-none-any.whl (26 kB)
Installing collected packages: flatbuffers, onnx, tf2onnx
  Attempting uninstall: flatbuffers
    Found existing installation: flatbuffers 23.3.3
    Uninstalling flatbuffers-23.3.3:
      Successfully uninstalled flatbuffers-23.3.3
Successfully installed flatbuffers-2.0.7 onnx-1.13.1 tf2onnx-1.14.0


In [None]:
import tensorflow as tf
import tf2onnx

MODELS_PATH = "./models/"
KERAS_MODEL_NAME = "SqueezeNet1p1_all_last.h5"
ONNX_MODEL_NAME = KERAS_MODEL_NAME[:-2] + "onnx"

image_size = 227
# image_size = 299
channels = 3
batch_size = 1
target_opset = 13

# Load the .h5 Keras model and re-save it in the TensorFlow SavedModel (.pb) format
keras_model = tf.keras.models.load_model(MODELS_PATH + KERAS_MODEL_NAME)
keras_model.save(MODELS_PATH + "saved_model")

# Convert to ONNX
spec = (tf.TensorSpec((batch_size, image_size, image_size, channels), tf.float32, name="input"),)
model_proto, _ = tf2onnx.convert.from_keras(keras_model, 
                                            input_signature=spec, 
                                            opset=target_opset, 
                                            output_path=MODELS_PATH + ONNX_MODEL_NAME)
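
The ONNX file name above is built with `KERAS_MODEL_NAME[:-2] + "onnx"`, which only works when the Keras file name ends in "h5". A slightly more robust way to derive the name is sketched below with the standard `pathlib` module. The helper `onnx_name` is a hypothetical name and is not used by the original notebook.

```python
from pathlib import Path


def onnx_name(keras_model_name):
    """Swap the file extension for ".onnx", whatever the original was."""
    return str(Path(keras_model_name).with_suffix(".onnx"))


print(onnx_name("SqueezeNet1p1_all_last.h5"))     # SqueezeNet1p1_all_last.onnx
print(onnx_name("SqueezeNet1p1_all_last.keras"))  # SqueezeNet1p1_all_last.onnx
```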

