In [None]:
from tqdm import tqdm
import cv2
import os
from keras.preprocessing import image
import numpy as np
import pandas as pd
from keras.utils import to_categorical
from sklearn.utils import shuffle as sk_shuffle
from sklearn.model_selection import train_test_split
from keras.applications.mobilenet_v2 import preprocess_input

In [None]:
def get_image_value(path, dim, model_type): 
    '''This function will read an image and convert to a specified version and resize depending on which algorithm is being used.  If edge is specified as true, it will pass the img array to get_edged which returns a filtered version of the img'''
    img = image.load_img(path, target_size = dim)
    img = image.img_to_array(img)
    if model_type == 'mobilenet': 
        img = preprocess_input(img)
        return img
    return img/255

In [None]:
def get_mask_classes(class_type): 
    mask_paths = [f'../FaceMaskDataset/{class_type}/WithMask/{i}' for i in os.listdir(f'../FaceMaskDataset/{class_type}/WithMask')]
    mask_labels = [1 for i in range(len(mask_paths))]
    
    nomask_paths = [f'../FaceMaskDataset/{class_type}/WithoutMask/{i}' for i in os.listdir(f'../FaceMaskDataset/{class_type}/WithoutMask')]
    nomask_labels = [0 for i in range(len(nomask_paths))]
    
    labels = np.array(mask_labels + nomask_labels)
    print(f'{class_type.upper()} Value Counts')
    print(pd.Series(labels).value_counts())
    print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
    paths = np.array(mask_paths + nomask_paths)
    labels = to_categorical(labels)
    paths, labels = sk_shuffle(paths, labels)
    return paths, labels
def get_mask_splits(dim, model_type): 
    
    #Train Set
    train_paths, train_labels = get_mask_classes('Train')
    train_images = np.array([get_image_value(i, dim, model_type) for i in tqdm(train_paths, desc = 'Getting Train Images')])
    train_dict = dict(images = train_images, labels = train_labels)

    #Test Set
    test_paths, test_labels = get_mask_classes('Test')
    test_images = np.array([get_image_value(i, dim, model_type) for i in tqdm(test_paths, desc = 'Getting Test Images')])
    test_images, test_labels = sk_shuffle(test_images, test_labels)
    
    #Validation Set
    val_paths, val_labels = get_mask_classes('Validation')
    val_images = np.array([get_image_value(i, dim, model_type) for i in tqdm(val_paths, desc = 'Getting Validation Images')])
    val_images, val_labels = sk_shuffle(val_images, val_labels)
    
        
    return train_images, test_images, train_labels, test_labels, val_images, val_labels
    

dim = (224,224)
x_train, x_test, y_train, y_test, x_val, y_val = get_mask_splits(
    dim, model_type = 'mobilenet')

In [2]:
from keras.preprocessing.image import ImageDataGenerator
from keras.applications import MobileNetV2
from keras.layers import AveragePooling2D, Dropout, Flatten, Dense, Input, GlobalAveragePooling2D, Conv2D, MaxPooling2D
from keras import regularizers
from keras.models import Model, Sequential
from keras.optimizers import Adam
from keras.applications.mobilenet_v2 import preprocess_input
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import numpy as np
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
import cv2
from keras.models import load_model

In [None]:
def get_mobilenet(dim):
    model = Sequential()
    optimizer = Adam(lr = .0005)
    baseModel = MobileNetV2(weights="imagenet", include_top=False,
        input_tensor=Input(shape=dim))
    
    model.add(baseModel)
    model.add(AveragePooling2D(pool_size=(7, 7)))
    model.add(Flatten(name="flatten"))
    model.add(Dense(256, activation="relu"))
    model.add(Dropout(0.6))
    model.add(Dense(2, activation="sigmoid", name = 'Output'))

    
    for layer in baseModel.layers:
        layer.trainable = False
        
    model.compile(loss = 'binary_crossentropy', optimizer = optimizer, metrics = ['accuracy'])
    return model

early_stopping = EarlyStopping(monitor='val_loss', verbose = 1, patience=5, min_delta = .00075)
model_checkpoint = ModelCheckpoint(f'ModelWeights/Mobilenet_Masks.h5', verbose = 1, save_best_only=True,
                                  monitor = 'val_loss')
lr_plat = ReduceLROnPlateau(patience = 5, mode = 'min')
epochs = 2000
batch_size = 64
    

dim = (x_train.shape[1], x_train.shape[2], x_train.shape[3])
mobilenet = get_mobilenet(dim =dim)

    
augmentation =ImageDataGenerator(rotation_range = 20, width_shift_range = .2, height_shift_range = .2, 
                                                       horizontal_flip = True, shear_range = .15, 
                                 fill_mode = 'nearest', zoom_range = .15)
augmentation.fit(x_train)
mobilenet_history = mobilenet.fit_generator(augmentation.flow(x_train, y_train, batch_size = batch_size),
            epochs = epochs, 
     callbacks = [early_stopping, model_checkpoint, lr_plat], validation_data = (x_test, y_test), verbose= 1)

In [None]:
def get_emotion_classes(class_type, max_values): 
    angry_paths = [f'../EmotionDataset/{class_type}/angry/{i}' for i in os.listdir(f'../EmotionDataset/{class_type}/angry')][:max_values]
    angry_labels = [0 for i in range(len(angry_paths))]
    
    happy_paths = [f'../EmotionDataset/{class_type}/happy/{i}' for i in os.listdir(f'../EmotionDataset/{class_type}/happy')][:max_values] 
    happy_labels = [1 for i in range(len(happy_paths))]
    
    neutral_paths = [f'../EmotionDataset/{class_type}/neutral/{i}' for i in os.listdir(f'../EmotionDataset/{class_type}/neutral')][:max_values] 
    
    neutral_labels = [2 for i in range(len(neutral_paths))]
    
    labels = np.array(angry_labels + happy_labels + neutral_labels)
    
    print(f'{class_type.upper()} Value Count')
    print(pd.Series(labels).value_counts())
    print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
    labels = to_categorical(labels)
    paths = np.array(angry_paths + happy_paths + neutral_paths)
    paths, labels = sk_shuffle(paths, labels)
    return paths, labels

#0: angry  1: happy  2: neutral  
def get_emotion_splits(dim, model_type = 'mobilenet', max_values = 6000): 
        
    train_paths, train_labels = get_emotion_classes('train', max_values)
    test_paths, test_labels = get_emotion_classes('test', max_values)
    
    train_images = np.array([get_image_value(i, dim, model_type) for i in tqdm(train_paths, desc = 'Getting Train Images')])
    test_images = np.array([get_image_value(i, dim, model_type) for i in tqdm(test_paths, desc = 'Getting Test Images')])
    
    return train_images, test_images, train_labels, test_labels 

x_train, x_test, y_train, y_test = get_emotion_splits(dim = (48,48), model_type = 'normal', max_values = 4000) 

In [None]:
def get_conv_model(dim): 
    
    model = Sequential()
    
    model.add(Conv2D(32, kernel_size = (3,3), activation = 'relu', input_shape = dim))
    model.add(Conv2D(64, kernel_size = (3,3), activation = 'relu'))
    model.add(MaxPooling2D(pool_size = (2,2)))
    model.add(Dropout(.35))
    
    model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.35))

    model.add(Flatten())
    model.add(Dense(1024, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(3, activation = 'softmax', name = 'Output'))
    model.compile(optimizer = 'adam', loss = 'categorical_crossentropy', metrics = ['accuracy'])
    return model

early_stopping = EarlyStopping(monitor='val_loss', verbose = 1, patience=5, min_delta = .00075)
model_checkpoint = ModelCheckpoint(f'ModelWeights/Normal_Emotions.h5', verbose = 1, save_best_only=True,
                                  monitor = 'val_loss')
lr_plat = ReduceLROnPlateau(patience = 5, mode = 'min')
epochs = 2000
batch_size = 32
augment = False  

dim = (x_train.shape[1], x_train.shape[2], x_train.shape[3])
cnn = get_conv_model(dim =dim)

cnn_history = cnn.fit(x_train, y_train, batch_size = batch_size,
          epochs = epochs, 
   callbacks = [early_stopping, model_checkpoint, lr_plat], validation_data = (x_test, y_test), verbose= 1)

In [3]:
#live video

classifier = cv2.CascadeClassifier(f'ModelWeights/haarcascade_frontalface_default.xml')
mask_model = load_model(f'ModelWeights/Mobilenet_Masks.h5')
emotion_model = load_model(f'ModelWeights/Normal_Emotions.h5')

emotion_dim = (48,48)
mask_dim = (224,224)

mask_dict = {0: 'No Mask', 1: 'Mask'}
mask_dict_color = {0: (0,0,255), 1: (0,255,0)} #colors for bounding boxes
emotion_dict = {0: 'angry', 1: 'happy', 2: 'neutral'}

vid_frames = []
cap = cv2.VideoCapture(0)

while True: 
    ret, frame = cap.read()
    frame = cv2.flip(frame, 1,1)
    clone = frame.copy()
    bboxes = classifier.detectMultiScale(clone)
    for i in bboxes: 
        x, y, width, height = i[0], i[1], i[2], i[3]
        x2, y2 = x+ width, y+height
        mask_roi = clone[y:y2, x:x2]
        emotion_roi = mask_roi.copy()
        emotion_roi = cv2.resize(emotion_roi, emotion_dim, interpolation = cv2.INTER_CUBIC)
        mask_roi = cv2.resize(mask_roi, mask_dim, interpolation = cv2.INTER_CUBIC)
        
        #preprocess mask_input
        mask_roi= preprocess_input(mask_roi)
    
        #preprocess emotion input
        emotion_roi = emotion_roi/255
        
        #resize emotion and mask to feed into nn   
        mask_roi = mask_roi.reshape(1, mask_roi.shape[0], mask_roi.shape[1], mask_roi.shape[2])
        emotion_roi = emotion_roi.reshape(1, emotion_roi.shape[0], emotion_roi.shape[1], emotion_roi.shape[2])
        #mask predictions
        mask_predict = mask_model.predict(mask_roi)[0]
        mask_idx = np.argmax(mask_predict)
        mask_conf = f'{round(np.max(mask_predict)*100)}%'
        mask_cat = mask_dict[mask_idx]
        mask_color = mask_dict_color[mask_idx]
        if mask_idx == 0: #if there is no mask detected --> move onto emotions
            #emotion predictions
            emotion_predict = emotion_model.predict(emotion_roi)[0]
            emotion_idx = np.argmax(emotion_predict)
            emotion_cat = emotion_dict[emotion_idx]
            emotion_conf = f'{round(np.max(emotion_predict)*100)}%'
            cv2.putText(clone, f'{mask_cat}: {mask_conf} || {emotion_cat}: {emotion_conf}', (x, y+15), cv2.FONT_HERSHEY_SIMPLEX, .5, mask_color, 2)
            cv2.rectangle(clone, (x,y), (x2,y2), mask_color, 1)
            continue
        cv2.putText(clone, f'{mask_cat}: {mask_conf}', (x, y+15), cv2.FONT_HERSHEY_SIMPLEX, .5, mask_color, 2)
        cv2.rectangle(clone, (x,y), (x2,y2), mask_color, 1)
               
    cv2.imshow('LIVE', clone)
    vid_frames.append(clone)
    
    if cv2.waitKey(1) & 0xFF ==ord('q'): 
        break 
cap.release()
cv2.destroyAllWindows()




INFO:plaidml:Opening device "opencl_amd_ellesmere.0"
