<a href="https://colab.research.google.com/github/amilkh/cs230-fer/blob/transfer-learning/fer2013.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Optional environment setup, kept disabled; uncomment the lines your runtime needs.
#tensorflow_version 1.x
#!pip install keras-vggface
#!conda install scikit-image
#!conda install pydot

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.python.lib.io import file_io

%matplotlib inline

import keras
from keras import backend as K
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.models import load_model
from keras.preprocessing.image import ImageDataGenerator
from keras_vggface.vggface import VGGFace
from keras.utils import plot_model
from sklearn.metrics import *
from keras.engine import Model
from keras.layers import Input, Flatten, Dense, Activation, Conv2D, MaxPool2D, BatchNormalization, Dropout, MaxPooling2D
import skimage
from skimage.transform import rescale, resize

import pydot


# Check that pydot can render an empty graph, without aborting the notebook
# when it cannot.
try:
    pydot.Dot.create(pydot.Dot())
except Exception as err:
    # `Exception` instead of a bare `except:` so KeyboardInterrupt/SystemExit
    # still propagate; the cause is printed so the failure can be diagnosed.
    print('pydot error')
    print(repr(err))

In [None]:
# Report the TensorFlow and Keras versions in use, one per line.
for lib in (tf, keras):
    print(lib.__version__)

In [None]:
# Load the full table; the regex separator also consumes whitespace around commas.
data = pd.read_csv(
    'fer2013/icml_face_data.csv',
    sep=r'\s*,\s*',
    header=0,
    encoding='ascii',
    engine='python',
)
data.head()

# Partition the rows by the value of the 'Usage' column.
usage = data['Usage']
data_train = data[usage == 'Training']
data_dev = data[usage == 'PublicTest']
data_test = data[usage == 'PrivateTest']

# Preview the dev and test partitions.
for split in (data_dev, data_test):
    print(split.head())

# Write each partition to its own CSV file.
for split, path in (
    (data_train, 'fer2013/train.csv'),
    (data_dev, 'fer2013/dev.csv'),
    (data_test, 'fer2013/test.csv'),
):
    split.to_csv(path)

# Row/column counts of the three partitions.
for split in (data_train, data_dev, data_test):
    print(split.shape)

In [None]:
# Side length (pixels) of the square 3-channel input fed to the backbone.
Resize_pixelsize = 197

# VGGFace ResNet50 without its top, using average pooling.
vgg_notop = VGGFace(
    model='resnet50',
    include_top=False,
    input_shape=(Resize_pixelsize, Resize_pixelsize, 3),
    pooling='avg',
)

# New head stacked on the backbone's 'avg_pool' output.
last_layer = vgg_notop.get_layer('avg_pool').output
x = Flatten(name='flatten')(last_layer)
for units, layer_name in ((4096, 'fc6'), (1024, 'fc7')):
    x = Dense(units, activation='relu', name=layer_name)(x)

# Print every backbone layer next to its position in `vgg_notop.layers`.
for position, layer in enumerate(vgg_notop.layers):
    print(layer, "[" + str(position) + "]")

# Among the first 101 layers, freeze every layer whose index is NOT listed here.
# NOTE(review): going by the name, the listed indices are presumably the
# BatchNormalization layers -- confirm against the printout above.
batch_norm_indices = [2, 6, 9, 13, 14, 18, 21, 24, 28, 31, 34, 38, 41, 45, 46, 53, 56, 60, 63, 66, 70, 73, 76, 80, 83, 87, 88, 92, 95, 98]
for i in range(101):
    if i in batch_norm_indices:
        continue
    vgg_notop.layers[i].trainable = False

# Spot-check one listed index (2) and one unlisted index (3).
for probe in (2, 3):
    print('vgg layer ' + str(probe) + ' is trainable: ' + str(vgg_notop.layers[probe].trainable))

# 7-way softmax classifier on top of the new dense head.
out = Dense(7, activation='softmax', name='classifier')(x)
custom_resnet = Model(vgg_notop.input, out)

# Two optimizers are constructed; only `sgd` is handed to compile() below.
optim = keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
sgd = keras.optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)

custom_resnet.compile(
    loss='categorical_crossentropy',
    optimizer=sgd,
    metrics=['accuracy'],
)

In [None]:
def get_data(dataset, pixelsize = Resize_pixelsize, num_classes = None):
    """Read a FER2013-style CSV and return upscaled images with one-hot labels.

    The CSV must provide a 'pixels' column (whitespace-separated integer
    intensities, 48*48 values per row) and an 'emotion' column of integer
    class ids.

    Args:
        dataset: Path of the CSV file; it is opened through ``file_io.FileIO``.
        pixelsize: Side length of the square output images.
        num_classes: Width of the one-hot label matrix. ``None`` (the default)
            keeps the previous behaviour of inferring it as ``Y.max() + 1``
            from the labels present in this particular file, which can be
            narrower than expected when a file lacks the highest class id.

    Returns:
        Tuple ``(X_res, Y_res)``. ``X_res`` has shape
        ``(num_samples, pixelsize, pixelsize, 3)``: intensities are divided by
        255, each image is resized, and the single gray channel is copied into
        all three channels. ``Y_res`` is the one-hot label matrix of shape
        ``(num_samples, num_classes)``.
    """
    # The context manager closes the stream once the CSV is parsed; the handle
    # was previously left open.
    with file_io.FileIO(dataset, mode='r') as file_stream:
        data = pd.read_csv(file_stream)

    # Parse every row's pixel string into a list of ints.
    pixel_rows = [[int(pixel) for pixel in row.split()] for row in data['pixels']]
    Y = data['emotion'].values

    # (num_samples, 48, 48) float32 images, normalised by the maximum intensity 255.
    X = np.array(pixel_rows, dtype='float32').reshape(-1, 48, 48) / 255.0

    X_res = np.zeros((X.shape[0], pixelsize, pixelsize, 3))
    for ind in range(X.shape[0]):
        image_resized = resize(X[ind], (pixelsize, pixelsize), anti_aliasing=True)
        # The trailing singleton axis broadcasts the gray values into all 3 channels.
        X_res[ind, :, :, :] = image_resized.reshape(pixelsize, pixelsize, 1)

    if num_classes is None:
        # An empty file has no maximum label; fall back to zero columns
        # instead of letting Y.max() raise.
        num_classes = int(Y.max()) + 1 if Y.size else 0

    # One-hot encode the integer labels.
    Y_res = np.zeros((Y.size, num_classes))
    Y_res[np.arange(Y.size), Y] = 1

    return X_res, Y_res

In [None]:
# Training hyper-parameters: number of epochs and batch size.
EPOCHS, BS = 20, 32

# CSV files holding the training and development splits.
training_dataset_dir = 'fer2013/train.csv'
dev_dataset_dir = 'fer2013/dev.csv'

# Data preparation: resized images and one-hot labels for both splits.
X_train_res, Y_train_res = get_data(training_dataset_dir)
X_dev_res, Y_dev_res = get_data(dev_dataset_dir)

# Real-time augmentation applied to the training images:
#   rotation_range   random rotations, range given as 10
#   shear_range      shear intensity, given as 10 (degrees, per the original inline note)
#   zoom_range       0.1, i.e. zoom drawn from [1 - 0.1, 1 + 0.1]
#   fill_mode        how points outside the input boundaries are filled ('reflect')
#   horizontal_flip  randomly flip inputs horizontally
train_datagen = ImageDataGenerator(
    horizontal_flip=True,
    fill_mode='reflect',
    zoom_range=0.1,
    shear_range=10,
    rotation_range=10,
)

# Batches of augmented training data, yielded indefinitely.
# x must have rank 4 (channel axis 1 for grayscale, 3 for RGB); y holds the labels.
train_generator = train_datagen.flow(x=X_train_res, y=Y_train_res, batch_size=BS)

In [None]:
#print ("X_train shape: " + str(X_train.shape))
#print ("Y_train shape: " + str(Y_train.shape))
#print ("X_dev shape: " + str(X_dev.shape))
#print ("Y_dev shape: " + str(Y_dev.shape))

# Report the array shapes of the prepared training and development data.
for label, arr in (
    ("X_train_res", X_train_res),
    ("Y_train_res", Y_train_res),
    ("X_dev_res", X_dev_res),
    ("Y_dev_res", Y_dev_res),
):
    print(label + " shape: " + str(arr.shape))

In [None]:
def _preview(images, labels, index):
    """Show images[index] with matplotlib, print labels[index], return the shown array."""
    picture = images[index].reshape(Resize_pixelsize, Resize_pixelsize, 3)
    plt.imshow(picture)
    plt.show()
    print(labels[index])
    return picture


# Index of the sample previewed from each split.
i = 10

image_resized = _preview(X_train_res, Y_train_res, i)
dev_resized = _preview(X_dev_res, Y_dev_res, i)


#Baseline model
#
# X_train / Y_train / X_dev / Y_dev were used below without being defined
# anywhere in this notebook (NameError on Restart & Run All), so the native
# 48x48 data is loaded here from the split CSVs.
def _load_raw_split(csv_path):
    """Return (images, labels) for one split CSV at the native 48x48 resolution.

    images: float32 array of shape (num_samples, 48, 48, 1), divided by 255
            (same parsing and scaling as get_data, minus the resize).
            NOTE(review): the scaling the baseline was originally trained
            with is not visible in this notebook -- confirm.
    labels: integer class ids from the 'emotion' column, the form
            'sparse_categorical_crossentropy' takes.
    """
    frame = pd.read_csv(csv_path)
    pixel_rows = [[int(pixel) for pixel in row.split()] for row in frame['pixels']]
    images = np.array(pixel_rows, dtype='float32').reshape(-1, 48, 48, 1) / 255.0
    return images, frame['emotion'].values


X_train, Y_train = _load_raw_split(training_dataset_dir)
X_dev, Y_dev = _load_raw_split(dev_dataset_dir)

# Take every layer from tf.keras so the classes match tf.keras.models.Sequential;
# the bare names Conv2D, Dense, ... are rebound to standalone `keras.layers`
# by the import cell.
_tfl = tf.keras.layers

model = tf.keras.models.Sequential([
    _tfl.InputLayer(input_shape=(48,48,1),name="input"),
    _tfl.Conv2D(filters=32,kernel_size=3,activation='relu',padding='same',name="conv1"),
    _tfl.Dropout(0.25), _tfl.BatchNormalization(),
    _tfl.Conv2D(filters=32,kernel_size=3,activation='relu',padding='same',name="conv2"),
    _tfl.Dropout(0.25), _tfl.BatchNormalization(),
    _tfl.MaxPool2D(pool_size=(2,2),name="maxpool1"),
    _tfl.Conv2D(filters=64,kernel_size=3,activation='relu',padding='same',name="conv3"),
    _tfl.Dropout(0.25), _tfl.BatchNormalization(),
    _tfl.Conv2D(filters=64,kernel_size=3,activation='relu',padding='same',name="conv4"),
    _tfl.Dropout(0.25), _tfl.BatchNormalization(),
    _tfl.Flatten(),
    # The former `input_shape` arguments on the Dense layers were dropped:
    # only the first layer of a Sequential model uses one.
    _tfl.Dense(1024,activation='relu',name='fc1'),
    _tfl.Dense(7,activation='softmax',name='fc-softmax')
])

print("Accuracy after training")
model.compile(loss='sparse_categorical_crossentropy',optimizer='adam',metrics=['accuracy'])

model.fit(X_train, Y_train, batch_size=32, epochs=1, validation_data=(X_dev, Y_dev))

# First 5000 training samples (not used by the fit call below).
small_x, small_y = X_train_res[:5000], Y_train_res[:5000]

# Train on the full, un-augmented training arrays and validate on the dev split.
dev_pair = (X_dev_res, Y_dev_res)
history = custom_resnet.fit(
    x=X_train_res,
    y=Y_train_res,
    batch_size=64,
    epochs=10,
    validation_data=dev_pair,
    shuffle=True,
)

# Per-epoch metrics recorded by Keras.
print('\nhistory dict:', history.history)

In [None]:
# Train on augmented batches from `train_generator`, validating on the dev split.
# One epoch is len(X_train_res) // BS generator steps.
steps = len(X_train_res) // BS
custom_resnet.fit_generator(
    generator=train_generator,
    steps_per_epoch=steps,
    epochs=EPOCHS,
    validation_data=(X_dev_res, Y_dev_res),
    shuffle=True,
)

In [None]:
# Persist the trained model to an HDF5 file.
custom_resnet.save('transfer_learning_DG.h5')

# Evaluate on the training arrays; the returned value is not kept.
custom_resnet.evaluate(x=X_train_res, y=Y_train_res)

# Evaluate on the dev arrays; as the cell's last expression, its result is the cell output.
custom_resnet.evaluate(x=X_dev_res, y=Y_dev_res)