# Image Classification Train Model

In [None]:
#---- Install stuff -------
# Shell-escape cell: installs the notebook's dependencies with pip.
# NOTE(review): apart from numpy, none of the packages are version-pinned, so
# the resolved versions depend on when this cell is run.
!pip install --upgrade pip

!pip install tensorflow
# NOTE(review): numpy is pinned after tensorflow is installed, which may replace
# the numpy version pip selected for tensorflow — confirm the two are compatible.
!pip install numpy==1.19.5

# Image loading and text-to-speech helpers.
!pip install Pillow
!pip install playsound
!pip install gTTS

# Plotting / data analysis.
!pip install matplotlib
!pip install pandas
!pip install seaborn

# YAML config parsing.
!pip install PyYAML

In [None]:
import pathlib

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

import os
import PIL
import zipfile

from PIL import Image

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, MaxPooling2D, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping

In [None]:
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications.resnet_v2 import ResNet50V2

In [None]:
%matplotlib inline

import warnings
warnings.filterwarnings('ignore')

In [None]:
is_google_colab = True
is_azure_ml = False

In [None]:
print('tf version: ',tf.__version__)
print('keras version: ',keras.__version__)

In [None]:
# Check for GPU.
# tf.test.is_gpu_available() is deprecated; the boolean is derived from the
# list of physical GPU devices instead.
gpu_devices = tf.config.list_physical_devices('GPU')
print('tf gpu: ',bool(gpu_devices))
print('tf gpu: ',gpu_devices)
# List every physical device TensorFlow reports (CPU, GPU, ...).
for x in tf.config.list_physical_devices():
    print('device: ',x)

# Read Config

In [None]:
!pwd
!ls

In [None]:
# Select the project config file (alternatives kept for quick switching).
#cfg_file = 'flowers-recognition.yml'
cfg_file = 'work_pose.yml'
#cfg_file = 'home_presence.yml'

# Read the config.
import yaml

# safe_load is used instead of yaml.load(f): PyYAML >= 6 requires an explicit
# Loader argument for yaml.load, and the safe loader only builds plain Python
# types (dict, list, str, numbers, bool) from the file.
with open(cfg_file, "r") as ymlfile:
    cfg = yaml.safe_load(ymlfile)

In [None]:
# Pull the project name and directory layout out of the config.
project_name = cfg["project_name"]
project_parent_dir = cfg["project_parent_dir"]
# project_dir is built by plain string concatenation, so project_parent_dir
# is used exactly as written in the config.
project_dir = project_parent_dir + project_name + "/"

print('project_name: ',project_name)
print('-'*20)
print('project_dir: ',project_dir)

# Input image size used for dataset loading and for the model input shape.
img_height = cfg['input_img_height']
img_width = cfg['input_img_width']
print('img_height: ',img_height)
print('img_width: ',img_width)

# Echo the training-related settings for the run log.
for key in ('train_freeze_base_layer', 'train_freeze_skip_last_layers', 'train_augumentation'):
    print(key + ': ',cfg[key])

In [None]:
# Make sure the parent directory and the project directory exist.
# os.makedirs(..., exist_ok=True) also creates missing intermediate
# directories and does nothing when the directory is already there, so no
# separate isdir() check is needed.
os.makedirs(project_parent_dir, exist_ok=True)
os.makedirs(project_dir, exist_ok=True)

In [None]:
## If you are using the data by mounting the google drive, use the following :
if is_google_colab:
    from google.colab import drive
    drive.mount('/content/gdrive')
##Ref:https://towardsdatascience.com/downloading-datasets-into-google-drive-via-google-colab-bcb1b30b0166

In [None]:
working_dir_str = project_dir

In [None]:
!ls $working_dir_str

In [None]:
from os import listdir
from os.path import isdir, join

def get_dir(path_loc, only_dir=True):
    """List the names of the entries found directly inside ``path_loc``.

    Args:
        path_loc: Directory to list.
        only_dir: When true, keep only the entries that are directories;
            otherwise return every entry.

    Returns:
        A list of entry names (not full paths), in ``listdir`` order.
    """
    entries = listdir(path_loc)
    if not only_dir:
        return list(entries)
    return [name for name in entries if isdir(join(path_loc, name))]

In [None]:
#path_to_zip_file = '/content/gdrive/MyDrive/ColabNotebooks/flowers-recognition.zip'
path_to_zip_file = '/content/gdrive/MyDrive/ColabNotebooks/work_pose.zip'
!ls $path_to_zip_file

In [None]:
# Treat the archive as already extracted when the working directory contains
# at least one sub-directory; otherwise unpack the zip into it.
already_extracted = len(get_dir(working_dir_str)) != 0
print('already_extracted: ',already_extracted)
if not already_extracted:
  with zipfile.ZipFile(path_to_zip_file, 'r') as archive:
    archive.extractall(working_dir_str)

In [None]:
#working_dir_str = working_dir_str + 'flowers/'

In [None]:
!ls $working_dir_str

In [None]:
!rm -r $working_dir_str'__MACOSX'

In [None]:
from gtts import gTTS
import IPython
from IPython.core.display import display

def text2audio(mytext):
    """Speak ``mytext`` in the notebook using gTTS.

    Prints the text, synthesizes it as English speech (``slow=False``), saves
    the result to ``./tts.mp3`` and displays an autoplaying audio widget for
    that file.

    Args:
        mytext: The text to convert to speech.
    """
    print('text2audio: ',mytext)
    audio_path = "./tts.mp3"
    gTTS(text=mytext, lang='en', slow=False).save(audio_path)
    player = IPython.display.Audio(audio_path, autoplay=True)
    display(player)

In [None]:
np.random.seed(30)
import random as rn
rn.seed(30)
tf.random.set_seed(30)

# Create Data Set

In [None]:
batch_size = 32

color_mode="rgb"
#color_mode="grayscale"

In [None]:
print('working_dir_str: ', working_dir_str)
!ls $working_dir_str

In [None]:
# Training subset of the images under working_dir_str, with one-hot labels.
# The seed and validation_split here match the validation dataset cell so that
# both calls partition the files the same way.
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    working_dir_str,
    validation_split=0.2,
    subset='training',
    seed=123,
    shuffle=True,
    label_mode='categorical',
    color_mode=color_mode,
    image_size=(img_height, img_width),
    batch_size=batch_size,
)

In [None]:
# Validation subset of the images under working_dir_str, with one-hot labels.
# The seed and validation_split here match the training dataset cell so that
# both calls partition the files the same way.
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    working_dir_str,
    validation_split=0.2,
    subset='validation',
    seed=123,
    shuffle=True,
    label_mode='categorical',
    color_mode=color_mode,
    image_size=(img_height, img_width),
    batch_size=batch_size,
)

In [None]:
class_names = train_ds.class_names
print(class_names)

In [None]:
num_classes = len(train_ds.class_names)
num_classes

### Visualize the data

In [None]:
# Show a sample of images from one training batch, titled with their class.
# The grid holds at most 3x3 = 9 images, so the number shown is capped by both
# the grid size and the batch size. It was previously tied to num_classes,
# which fails with more than 9 classes or with fewer images than classes.
grid_rows, grid_cols = 3, 3
plt.figure(figsize=(10, 10))
for images, labels in train_ds.take(1):
  n_show = min(grid_rows * grid_cols, int(images.shape[0]))
  for i in range(n_show):
    ax = plt.subplot(grid_rows, grid_cols, i + 1)
    if color_mode=="grayscale":
      # Single-channel image: drop the channel axis and use a gray colormap.
      plt.imshow(images[i].numpy().astype("uint8")[:, :, 0], cmap='gray')
    else:
      plt.imshow(images[i].numpy().astype("uint8"))
    # Labels are one-hot (label_mode='categorical'), so argmax is the class index.
    plt.title(class_names[int(np.argmax(labels[i]))])
    plt.axis("off")

In [None]:
for image_batch, labels_batch in train_ds:
  print(image_batch.shape)
  print(labels_batch.shape)
  break

In [None]:
# Input pipeline tuning: cache both datasets, shuffle the training batches
# with a 1000-element buffer, and prefetch with an automatically chosen buffer.
AUTOTUNE = tf.data.experimental.AUTOTUNE

train_ds = train_ds.cache()
train_ds = train_ds.shuffle(1000)
train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)

val_ds = val_ds.cache()
val_ds = val_ds.prefetch(buffer_size=AUTOTUNE)

### Data Augmentation

In [None]:
print('img_height:',img_height)
print('img_width:',img_width)

In [None]:
#https://towardsdatascience.com/writing-a-custom-data-augmentation-layer-in-keras-2b53e048a98
class RandomColorDistortion(tf.keras.layers.Layer):
    """Augmentation layer that applies a random brightness shift to images.

    During training, one brightness delta is drawn per call from
    ``brightness_delta`` and added to the whole batch; the result is clipped
    to [0, 255]. Outside training the images are returned unchanged.

    Contrast adjustment is currently disabled: ``contrast_range`` is stored
    but not applied in ``call``.
    """
    # Class-level defaults, used until update_cfg() sets per-instance values.
    contrast_range=[-1.0, 1.0]
    brightness_delta=[-50, 50]

    def __init__(self, **kwargs):
        super(RandomColorDistortion, self).__init__(**kwargs)

    def update_cfg(self, cfg_to_use):
        """Load the distortion ranges from ``cfg_to_use['train_augumentation']``.

        Args:
            cfg_to_use: Config mapping whose ``'train_augumentation'`` entry
                provides ``'contrast_range'`` and ``'brightness_delta'``, each
                indexable as ``[low, high]``.
        """
        aug_cfg = cfg_to_use['train_augumentation']
        # Store on the instance. Assigning to bare local names (as before)
        # discarded the configured values and left the class defaults active.
        # Tuples are used so the values stay plain Python data on the layer.
        self.contrast_range = tuple(aug_cfg['contrast_range'])
        self.brightness_delta = tuple(aug_cfg['brightness_delta'])

    def call(self, images, training=True):
        """Return ``images`` with a random brightness shift when training.

        Args:
            images: Image tensor to distort.
            training: When false, the input is returned untouched.

        Returns:
            The brightness-shifted images clipped to [0, 255], or the input
            itself when ``training`` is false.
        """
        if not training:
            return images

        # Sample with tf.random rather than np.random: a NumPy draw runs only
        # once when the layer is traced into a graph, which would freeze the
        # delta to a single constant for the whole training run.
        brightness = tf.random.uniform(
            [],
            minval=float(self.brightness_delta[0]),
            maxval=float(self.brightness_delta[1]),
        )

        images = tf.image.adjust_brightness(images, brightness)
        # Keep pixel values inside the 0..255 range after the shift.
        images = tf.clip_by_value(images, 0, 255)
        return images

# Build the custom colour-distortion layer and hand it the config.
randomColorDistortion = RandomColorDistortion()
randomColorDistortion.update_cfg(cfg)

# Augmentation pipeline: random rotation -> random zoom -> colour distortion.
# The rotation and zoom ranges come from the config as lists and are passed
# to the layers as tuples.
_aug_cfg = cfg['train_augumentation']
_n_channels = 1 if color_mode=="grayscale" else 3

data_augmentation = Sequential([
    layers.experimental.preprocessing.RandomRotation(
        tuple(_aug_cfg['random_rotation']),  # e.g. (-0.03, 0.03)
        input_shape=(img_height, img_width, _n_channels)),
    layers.experimental.preprocessing.RandomZoom(
        tuple(_aug_cfg['random_zoom'])),  # e.g. (-0.05, 0)
    randomColorDistortion,
])

In [None]:
# Preview the augmentation: run the pipeline 9 times over one training batch
# and show the image at index `img_index` from each result in a 3x3 grid.
plt.figure(figsize=(10, 10))
img_index = 1
for images, _ in train_ds.take(1):
  for i in range(9):
    augmented_images = data_augmentation(images, training=True)
    sample = augmented_images[img_index].numpy().astype("uint8")
    ax = plt.subplot(3, 3, i + 1)
    if color_mode=="grayscale":
      plt.imshow(sample[:, :, 0], cmap='gray')
    else:
      plt.imshow(sample)

    plt.axis("off")

# Model

In [None]:
filepath = cfg['model_file']
print('filepath:',filepath)
print('train_freeze_base_layer: ', cfg['train_freeze_base_layer'])
print('train_freeze_skip_last_layers: ', cfg['train_freeze_skip_last_layers'])

In [None]:
# Callbacks

# Save the full model (save_weights_only=False) to `filepath`, checked once
# per epoch and written only when val_loss is the best seen so far.
checkpoint = ModelCheckpoint(
    filepath,
    monitor='val_loss',
    mode='auto',
    save_best_only=True,
    save_weights_only=False,
    save_freq="epoch",
    verbose=1,
)

# Scale the learning rate by 0.75 when val_loss has not improved for 4 epochs,
# with a 1-epoch cooldown after each reduction.
LR = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.75,
    patience=4,
    cooldown=1,
)

# Stop training when val_loss has not improved for 15 epochs.
ES = EarlyStopping(
    monitor='val_loss',
    mode="auto",
    patience=15,
    verbose=1,
)

callbacks_list = [checkpoint, LR, ES]

In [None]:
#model_to_try = 1 #base cnn
model_to_try = 2 #transfer learning cnn

print('model_to_try: ', model_to_try)

In [None]:
if model_to_try==1:
  # Bare CNN: augmentation -> normalization -> 4 conv stages -> flatten
  # -> 2 dense stages -> softmax classifier.
  cnn_layers = [
    data_augmentation,
    tf.keras.layers.experimental.preprocessing.Normalization(),
  ]

  # Conv stages with 16/32/64/128 filters, each followed by 2x2 max pooling
  # and batch normalization.
  for n_filters in (16, 32, 64, 128):
    cnn_layers += [
      Conv2D(n_filters, (3, 3), padding='same', activation='relu'),
      MaxPooling2D(pool_size=(2, 2)),
      BatchNormalization(),
    ]

  cnn_layers.append(Flatten())

  # Two identical fully-connected stages: Dense(64) + dropout + batch norm.
  for _ in range(2):
    cnn_layers += [
      Dense(64, activation='relu'),
      Dropout(0.50),
      BatchNormalization(),
    ]

  # One output unit per class.
  cnn_layers.append(Dense(num_classes, activation='softmax'))

  cnn_model = Sequential(cnn_layers)

In [None]:
#model - transfer learning
if model_to_try==2:
  # ImageNet-pretrained ResNet50V2 without its classification head.
  # NOTE(review): with color_mode=="grayscale" this requests a 1-channel input
  # together with weights='imagenet' — confirm that combination is supported.
  conv_base = ResNet50V2(
      include_top=False,
      weights='imagenet',
      input_shape=(img_height, img_width, 1 if color_mode=="grayscale" else 3))

  if cfg['train_freeze_base_layer']:
    # Freeze every base layer except the last `train_freeze_skip_last_layers`.
    # The cut-off is computed as an explicit count: slicing with [:-n] yields
    # an empty list when n == 0, which silently left the whole base trainable.
    n_frozen = max(len(conv_base.layers) - cfg['train_freeze_skip_last_layers'], 0)
    for layer in conv_base.layers[:n_frozen]:
        layer.trainable = False

  # NOTE(review): no adapt() call for this Normalization layer is visible in
  # the notebook — confirm the inputs reach conv_base in the range it expects.
  model_layers = [
      data_augmentation,
      tf.keras.layers.experimental.preprocessing.Normalization(),
      conv_base,

      # Pool the feature maps; Flatten is kept so the layer stack is unchanged.
      GlobalAveragePooling2D(),
      Flatten(),
      BatchNormalization(),
  ]

  # Classifier head: three Dense stages of shrinking width, each followed by
  # 50% dropout and batch normalization.
  for n_units in (256, 128, 64):
    model_layers += [
        Dense(n_units, activation='relu'),
        Dropout(0.50),
        BatchNormalization(),
    ]

  # One output unit per class.
  model_layers.append(Dense(num_classes, activation='softmax'))

  cnn_model = Sequential(model_layers)

In [None]:
# Compile with Adam and categorical cross-entropy (the labels are one-hot).
lr = 0.0001
optimiser = keras.optimizers.Adam(learning_rate=lr)
cnn_model.compile(optimizer=optimiser, loss='categorical_crossentropy', metrics=['accuracy'])
# summary() prints the table itself and returns None, so it is not wrapped in
# print(), which would emit a stray "None" line after the table.
cnn_model.summary()

In [None]:
### Train the model
# Train for up to `epochs` epochs starting at epoch 0, validating on val_ds
# and running the checkpoint / LR-reduction / early-stopping callbacks.
epochs = 50
history = cnn_model.fit(
    train_ds,
    epochs=epochs,
    initial_epoch=0,
    validation_data=val_ds,
    callbacks=callbacks_list,
)

### Visualizing training results

In [None]:
# Visualizing training results
# Per-epoch metrics recorded by fit().
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

# One x value per recorded epoch, taken from the history length rather than
# from `epochs`.
epochs_range = range(len(val_acc))

# (training values, validation values, metric label, legend position) per panel.
panels = [
    (acc, val_acc, 'Accuracy', 'lower right'),
    (loss, val_loss, 'Loss', 'upper right'),
]

plt.figure(figsize=(8, 8))
for panel_idx, (train_vals, val_vals, metric, legend_loc) in enumerate(panels, start=1):
    plt.subplot(1, 2, panel_idx)
    plt.plot(epochs_range, train_vals, label='Training ' + metric)
    plt.plot(epochs_range, val_vals, label='Validation ' + metric)
    plt.legend(loc=legend_loc)
    plt.title('Training and Validation ' + metric)
plt.show()

In [None]:
text2audio('We got validation accuracy of '+str(round(history.history['val_accuracy'][-1]*100,2)))

In [None]:
#save json model file also
from keras.models import model_from_json

# Write the string returned by cnn_model.to_json() to "<project_name>.json"
# in the current working directory.
model_json = cnn_model.to_json()
json_path = project_name+".json"

with open(json_path, "w") as json_file:
    json_file.write(model_json)

In [None]:
#copy model file
if is_google_colab:
  !cp $filepath '/content/gdrive/MyDrive/ColabNotebooks/'

In [None]:
#TODO: add code to find which cases fail most often
#run through the validation set, log per-class counts and accuracy, and save example images of correct and incorrect predictions