In [50]:
import sys
import os

# Workaround to make packages work in both Jupyter notebook and Python:
# look one, two and three directory levels above the current working
# directory for the project root and, if exactly one candidate matches,
# append it to sys.path so that the `server.*` imports below resolve.
MODULE_ROOT_NAME = "AgeEstimator"
MODULE_PATHS = [
    os.path.abspath(relative)
    for relative in ('..', '../..', '../../..')
]
MODULE_PATHS = [
    path for path in MODULE_PATHS if path.endswith(MODULE_ROOT_NAME)]
# MODULE_PATH stays "" when the root is missing or ambiguous.
MODULE_PATH = MODULE_PATHS[0] if len(MODULE_PATHS) == 1 else ""
# Only touch sys.path when a root was actually found; appending "" would
# add an empty entry and hide the fact that the lookup failed.
if MODULE_PATH and MODULE_PATH not in sys.path:
    sys.path.append(MODULE_PATH)
    
from server.data.dataset import DataLoader
from server.models.cnn.model import IMAGE_SIZE, INPUT_SHAPE, CURR_DIR

In [63]:
import numpy as np
import cv2
import matplotlib.pyplot as plt
from mtcnn.mtcnn import MTCNN
from keras_vggface.utils import preprocess_input
from keras_vggface.vggface import VGGFace
from multiprocessing import cpu_count

In [64]:
# Shared model to create bottleneck features: VGGFace with the "resnet50"
# architecture, built without its top layers (include_top=False).
model = VGGFace(model="resnet50", include_top=False,
                input_shape=INPUT_SHAPE)

# One fewer than the CPU count, floored at 1 so a single-core machine
# does not end up with zero workers.
workers = max(1, cpu_count() - 1)

In [66]:
def _get_face_detector():
    r"""Return a shared MTCNN detector, creating it on first use."""
    # Reuse one detector across calls rather than constructing a new
    # MTCNN instance for every image.
    if not hasattr(_get_face_detector, "_instance"):
        _get_face_detector._instance = MTCNN()
    return _get_face_detector._instance


def extract_face(filename=None, input_image=None, img_size=IMAGE_SIZE):
    r"""Extract a single face from a given photograph.

    Args:
        filename: Path of the image to read with ``plt.imread``. Ignored
            when ``input_image`` is given.
        input_image: Already-loaded image array. Takes precedence over
            ``filename``.
        img_size: Target size passed to the resize call; presumably
            ``(width, height)`` -- TODO confirm against IMAGE_SIZE.

    Returns:
        The cropped and resized first detected face as a ``float32`` array.

    Raises:
        ValueError: If neither ``filename`` nor ``input_image`` is given,
            or if the detector finds no face in the image.
    """
    # `is not None` rather than truthiness: the truth value of a
    # multi-element numpy array is ambiguous and raises ValueError.
    if input_image is not None:
        img = input_image
    elif filename is not None:
        img = plt.imread(filename)
    else:
        raise ValueError(
            "extract_face requires either `filename` or `input_image`")

    detection = _get_face_detector().detect_faces(img)
    if not detection:
        raise ValueError("No face detected in image")

    # Extract the bounding box from the first face
    x1, y1, width, height = detection[0]['box']
    x2, y2 = x1 + width, y1 + height
    # Clamp the origin: a negative coordinate would make the slice below
    # wrap around from the end of the array instead of starting at the edge.
    x1, y1 = max(x1, 0), max(y1, 0)
    face = img[y1:y2, x1:x2]

    # Resize pixels to the model size. cv2 is used because it is already
    # imported by this notebook; cv2.resize takes dsize as (width, height).
    resized = cv2.resize(face, tuple(img_size), interpolation=cv2.INTER_CUBIC)
    return resized.astype('float32')

In [55]:
def preprocess_image(img):
    r"""Turn one in-memory image into bottleneck features.

    The image is cropped to its first detected face by ``extract_face``,
    given a leading batch axis, run through ``preprocess_input`` with
    ``version=2``, and finally passed to the shared ``model``.

    Args:
        img: Already-loaded image array, forwarded to ``extract_face`` as
            ``input_image``.

    Returns:
        The output of ``model.predict`` for the single-image batch.
    """
    print("Processing image...")
    # Pass None explicitly for the `filename` positional so the call is
    # valid regardless of whether extract_face gives it a default.
    face = extract_face(None, input_image=img)
    # Add the batch axis; the bare name `expand_dims` is not defined in
    # this notebook, so use numpy's function through the `np` alias.
    batch = np.expand_dims(face, axis=0)

    print("Preprocessing input...")
    processed = preprocess_input(batch, version=2)

    print("Creating bottleneck features...")
    y_hat = model.predict(processed)

    print("Done.")
    return y_hat

In [65]:
def _extract_faces(filenames, labels):
    r"""Run extract_face over ``filenames``; keep only the images that succeed.

    Returns a ``(features, kept_labels)`` pair of equal-length lists.
    """
    features, kept_labels = [], []
    skipped, first_error = 0, None
    for i, fname in enumerate(filenames):
        try:
            feature = extract_face(fname)
        except Exception as err:
            # Best-effort: skip images that cannot be processed, but keep
            # a record so failures are visible. `Exception` (not a bare
            # except) lets KeyboardInterrupt stop a long run.
            skipped += 1
            if first_error is None:
                first_error = (fname, err)
            continue
        kept_labels.append(labels[i])
        features.append(feature)

    if skipped:
        print("Skipped {} of {} images (first failure: {} -> {!r})".format(
            skipped, skipped + len(features), first_error[0], first_error[1]))
    return features, kept_labels


def preprocess_dataset():
    r"""Build and save bottleneck features for the train and test splits.

    Loads both splits from ``DataLoader``, extracts a face from every
    image (images that fail are skipped and reported), preprocesses the
    faces with ``preprocess_input(version=2)``, runs them through the
    shared ``model`` and saves features and labels under ``CURR_DIR``
    with ``np.save``.

    Returns:
        ``(p_train, p_test)``: the ``model.predict`` outputs for the
        training and test faces.

    Raises:
        ValueError: If no face could be extracted for one of the splits.
    """
    dl = DataLoader()

    x_train, y_train = dl.load_train()
    x_test, y_test   = dl.load_test()

    print("Processing training images...")
    x_train_features, x_train_labels = _extract_faces(x_train, y_train)

    print("Processing test images...")
    x_test_features, x_test_labels = _extract_faces(x_test, y_test)

    # Fail early with a clear message instead of passing an empty batch on.
    if not x_train_features or not x_test_features:
        raise ValueError(
            "No faces extracted (train: {}, test: {}); nothing to process".format(
                len(x_train_features), len(x_test_features)))

    print("Preprocessing inputs...")
    # Stack the per-image arrays into one batch array per split before
    # preprocessing and prediction.
    x_train_processed = preprocess_input(
        np.asarray(x_train_features, dtype='float32'), version=2)
    x_test_processed  = preprocess_input(
        np.asarray(x_test_features, dtype='float32'), version=2)

    print("Creating bottleneck features...")
    # NOTE(review): Keras documents `workers` / `use_multiprocessing` as
    # applying to generator or Sequence input only -- confirm they have
    # any effect for array input on the installed version.
    p_train = model.predict(x_train_processed, workers=workers, use_multiprocessing=True)
    p_test  = model.predict(x_test_processed, workers=workers, use_multiprocessing=True)

    print("Saving features...")
    np.save(os.path.join(CURR_DIR, "features-train"), p_train)
    np.save(os.path.join(CURR_DIR, "features-test"), p_test)
    np.save(os.path.join(CURR_DIR, "labels-train"), x_train_labels)
    np.save(os.path.join(CURR_DIR, "labels-test"), x_test_labels)

    print("Done.")
    return p_train, p_test
    

In [None]:
# NOTE: preprocess_dataset() returns the bottleneck feature arrays
# (p_train, p_test), not labels, so despite their names y_train / y_test
# hold features here.
y_train, y_test = preprocess_dataset()

Processing training images...
