# Face Recognition(Classification) using Keras

Detect and classify using Keras

## Training and Testing

In [None]:
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Model, Sequential
from keras.layers import Conv2D, MaxPooling2D, BatchNormalization, GlobalAveragePooling2D, InputLayer, Activation, Dropout, Flatten, Dense
from keras.optimizers import RMSprop, SGD
from keras import backend as K
import keras

import time
import matplotlib.pyplot as plt
print(keras.__version__)

### 1. Prepare Data

download datasets from https://www.kaggle.com/dansbecker/5-celebrity-faces-dataset and save them right directories

In [None]:
img_width, img_height = 200, 200
train_data_dir = '../datasets/celebrity/train'
validation_data_dir = '../datasets/celebrity/val'
batch_size = 16

In [None]:
# dataset
# this is the augmentation configuration we will use for training
train_datagen = ImageDataGenerator(
    rescale=1. / 255,
    rotation_range=10,  # randomly rotate images in the range (degrees, 0 to 180)
    zoom_range = 0.1, # Randomly zoom image 
    width_shift_range=0.1,  # randomly shift images horizontally (fraction of total width)
    height_shift_range=0.1,  # randomly shift images vertically (fraction of total height)
    #shear_range=0.2,
    vertical_flip=False,
    horizontal_flip=True)

# this is the augmentation configuration we will use for testing:
# only rescaling
test_datagen = ImageDataGenerator(rescale=1. / 255)

train_generator = train_datagen.flow_from_directory(
    train_data_dir,
    target_size=(img_width, img_height),
    batch_size=batch_size,
    class_mode='categorical')

validation_generator = test_datagen.flow_from_directory(
    validation_data_dir,
    target_size=(img_width, img_height),
    batch_size=batch_size,
    class_mode='categorical')

In [None]:
nb_train_samples = 93
nb_validation_samples = 25
epochs = 20
numclasses = 5

### 2. Model


In [None]:
if K.image_data_format() == 'channels_first':
    input_shape = (3, img_width, img_height)
else:
    input_shape = (img_width, img_height, 3)

In [None]:
def vgg16CNNtl(input_shape, outclass, sigma='sigmoid'):
    
    base_model = None
    base_model = keras.applications.VGG16(weights='imagenet', include_top=False, input_shape=input_shape)
    top_model = Sequential()
    top_model.add(Flatten(input_shape=base_model.output_shape[1:]))
    for i in range(2):
        top_model.add(Dense(4096, activation='relu'))
        top_model.add(Dropout(0.5))
    top_model.add(Dense(outclass, activation=sigma))

    model = None
    model = Model(inputs=base_model.input, outputs=top_model(base_model.output))
    
    return model
 
def resnet50tl(input_shape, outclass, sigma='sigmoid'):
    
    base_model = None
    base_model = keras.applications.resnet50.ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
    top_model = Sequential()
    top_model.add(Flatten(input_shape=base_model.output_shape[1:]))
    for i in range(2):
        top_model.add(Dense(4096, activation='relu'))
        top_model.add(Dropout(0.5))
    top_model.add(Dense(outclass, activation=sigma))

    model = None
    model = Model(inputs=base_model.input, outputs=top_model(base_model.output))
    
    return model

In [None]:
K.clear_session() # Clear previous models from memory.

model = resnet50tl(input_shape, numclasses, 'softmax')
lr = 1e-5
decay = 1e-7 #0.0
optimizer = RMSprop(lr=lr, decay=decay)
model.compile(loss='categorical_crossentropy',
              optimizer=optimizer,
              metrics=['accuracy'])

### 3. Train

In [None]:
epochs = 50
batch_size = 16

start_time = time.time()
history = model.fit_generator(
    train_generator,
    steps_per_epoch=nb_train_samples // batch_size,
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=nb_validation_samples // batch_size)

print("processed time: {:.2f} sec".format(time.time() - start_time))

### 4. Evaluation

Plot loss/val loss and loss/acc

In [None]:
# Get training and test loss histories
training_loss = history.history['loss']
training_acc = history.history['acc']

# Create count of the number of epochs
epoch_count = range(1, len(training_loss) + 1)

fig=plt.figure(figsize=(12, 4))
# Visualize loss history
fig.add_subplot(121)
plt.plot(epoch_count, training_loss, 'r--')
plt.plot(epoch_count, training_acc, 'b-')
plt.legend(['Training Loss', 'Training Accuracy'])
plt.xlabel('Epoch')
plt.ylabel('Loss/Acc')

# Get training and test loss histories
val_acc = history.history['val_acc']
training_acc = history.history['acc']

# Create count of the number of epochs
epoch_count = range(1, len(val_acc) + 1)

# Visualize loss history
fig.add_subplot(122)
plt.plot(epoch_count, val_acc, 'r--')
plt.plot(epoch_count, training_acc, 'b-')
plt.legend(['Validation Accuracy', 'Training Accuracy'])
plt.xlabel('Epoch')
plt.ylabel('Loss')

plt.show();
score = model.evaluate_generator(validation_generator)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

### 5. Save and load trained model

In [None]:
from keras.models import load_model
savemodel = '../models/celebriytag_model.h5'
saveweight =  '../models/celebriytag_weight.h5'

In [None]:
# load model
model = load_model(savemodel)

### 6. Testing model

In [None]:
from keras.preprocessing import image
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from io import BytesIO
import cv2
import requests
import os
import http
import json

In [None]:
labels = ['ben_afflek',  'elton_john',  'jerry_seinfeld',  'madonna',  'mindy_kaling']
validation_data_dir = '../datasets/celebrity/val'
test_imgs = ['ben_afflek/httpabsolumentgratuitfreefrimagesbenaffleckjpg.jpg', 'madonna/httpcdnfuncheapcomwpcontentuploadsVOGUEjpg.jpg']

In [None]:
for test in test_imgs:
    start_time = time.time()
    test_img = os.path.join(validation_data_dir, test)
    img = image.load_img(test_img, target_size=(img_width, img_height))
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    print(x)
    print(type(x), x.shape)
    x /= 255.
    classes = model.predict(x)
    result = np.squeeze(classes)
    result_indices = np.argmax(result)
    
    img = cv2.imread(test_img, cv2.IMREAD_COLOR)
    img = cv2.resize(img, (img_width, img_height), interpolation = cv2.INTER_AREA)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    pilimg = Image.fromarray(img)
    display(pilimg)
    print("{}, {:.2f}%".format(labels[result_indices], result[result_indices]*100))
    print("processed time: {:.2f} sec".format(time.time() - start_time))

In [None]:
import appface
import appconfig

test_urls =["http://www.cheatsheet.com/wp-content/uploads/2017/06/ben-affleck-jennifer-lopez.jpg",
            "http://energy106.ca/wp-content/uploads/2017/09/160825140941-madonna-super-tease.jpg", 
            "http://cdn01.cdn.justjared.com/wp-content/uploads/headlines/2017/09/madonna-people.jpg"]

for test_url in test_urls:
    r = requests.get(test_url)
    img = Image.open(BytesIO(r.content))
    img = np.array(img)
    
    rects, dt = appface.detectFaceCV(img)
    print("img detect time: {:.2f}".format(dt))

    faces = []
    i = 0
    for rect in rects:
        ps_time = time.time()

        (x, y, w, h) = rect
        roi_face = img[y:y+h, x:x+w]
        roi_face = cv2.cvtColor(roi_face, cv2.COLOR_BGR2RGB)
        i, conf = appface.classifyFace(model, roi_face)
        
        tag = appconfig.labels[i]

        # be aware that convert numpy.int32 to int for json serialization
        drect = { 'x': int(x), 'y': int(y), 'w': int(w), 'h': int(h) }
        face = { 'tag': tag, 'confidence': conf, 'rect': drect }
        faces.append(face)

        print("{}:{:.2f}, recognition time:  {:.2f} sec".format(tag, conf, time.time() - ps_time))
        
        cv2.rectangle(img, (x, y), (x+w, y+h), (255, 0, 0), 2)
        cv2.putText(img, "{}:{:.2f}".format(tag, conf), (x, y), 2, 1, (255,0,0), 1)
        
    pilimg = Image.fromarray(img)
    display(pilimg)

## Compare with Customvision.ai

You need to create a new project and train your model first.

Then update project `api_id`, `api_iter` and `api_key` in `appconfig.py` file

In [None]:
import appconfig
#print(appconfig.api_id, appconfig.api_key, appconfig.api_iter)

In [None]:
# http POST request with binary data - Azure custom.ai prediction api

for test in test_imgs:
    start_time = time.time()
    test_img = os.path.join(validation_data_dir, test)
    img = cv2.imread(test_img, cv2.IMREAD_COLOR)
    img = cv2.resize(img, (img_width, img_height), interpolation = cv2.INTER_AREA)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    pilimg = Image.fromarray(img)
    display(pilimg)

    apiurl = 'https://southcentralus.api.cognitive.microsoft.com/customvision/v2.0/Prediction/%s/image?iterationId=%s'
    headers = {"Content-Type": "application/octet-stream", "Prediction-Key": appconfig.api_key }

    with open(test_img, 'rb') as roi:
        r = requests.post(apiurl % (appconfig.api_id, appconfig.api_iter), headers=headers, data=roi)

        if (r.status_code == 200):

            # JSON parse
            pred = json.loads(r.content.decode("utf-8"))

            conf = float(pred['predictions'][0]['probability'])
            label = pred['predictions'][0]['tagName']

            print("{}, {:.2f}%".format(label, conf*100))
            print("processed time: {:.2f} sec".format(time.time() - start_time))
        else:
            print("%d error: %s" % (r.status_code, r.text))

## Continous learning

### 1. Upload model file

In [None]:
savemodel = '../models/facetag_model.h5'
#print(appconfig.blobacct, appconfig.blobkey)

call(["blobxfer", "upload", "--storage-account", appconfig.blobacct, "--storage-account-key", appconfig.blobkey, 
      "--remote-path", remotepath, "--local-path", savemodel, "--skip-on-lmt-ge"])

### 2. Retraining with new dataset

Get a new face dataset

#### Trigger new dataset 

In [None]:
facetag_url = "dsvm.iljoong.xyz:8080"

In [None]:
from subprocess import call
import appconfig

In [None]:
timesince = "2018-07-23 00:00:01"

In [None]:
# trigger new dataset
headers = {"Content-Type": "application/json" }
jobapi = "http://%s/api/admin/job" % facetag_url

r = requests.post(jobapi, headers=headers, data=json.dumps({"timesince": timesince}))
if (r.status_code == 201):
    j = r.json()
    blobpath = j['blobpath']
    remotepath = blobpath.replace("https://%s.blob.core.windows.net/" % appconfig.blobacct, "")
    print(remotepath)
else:
    print('error')

#### download dataset

__note__: download after blob dataset is created

In [None]:
localpath='../../datasets'
#print(appconfig.blobacct, appconfig.blobkey)

call(["blobxfer", "download", "--storage-account", appconfig.blobacct, "--storage-account-key", appconfig.blobkey, 
      "--remote-path", remotepath, "--local-path", localpath, 
      "--skip-on-lmt-ge", "--strip-components", "100"])

### Re-train

1. re-organize the dataset after downloading a new zipfile from blob storage
2. re-train model for improving face recoginition
3. upload new model file to blob storage


#### restart application

In [None]:
# trigger new dataset
headers = {"Content-Type": "application/json" }
jobapi = "http://%s/api/admin/model" % facetag_url

r = requests.put(jobapi, headers=headers)
if (r.status_code == 200):
    print(r.text)
else:
    print('error')