In [None]:
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Dropout, Flatten, Dense
from keras import applications
from sklearn.metrics import pairwise_distances, log_loss, confusion_matrix
import matplotlib.pyplot as plt
import requests
from PIL import Image
import pandas as pd
import pickle
from sklearn import datasets, preprocessing,cross_validation, feature_extraction
from sklearn import linear_model, svm, metrics, ensemble, tree, ensemble
from sklearn.decomposition import PCA
import urllib
import csv
from sklearn.linear_model import SGDClassifier
from sklearn.calibration import CalibratedClassifierCV
import seaborn as sns
import warnings
from collections import Counter


In [None]:
import cv2
import numpy as np
import argparse
import time
import glob
import os
import pandas
import sys
import subprocess
import random

# --- Shared state for the data-collection helpers defined in this cell ---
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)  # capture device with index 0
facecascade = cv2.CascadeClassifier("haarcascade_frontalface_default.xml")
facedict = {}  # face crops gathered for the emotion currently being recorded

emotions = ["anger","disgust","fear", "happy","neutral","sadness","surprise"]

# The spreadsheet has one column per emotion; the non-empty cells of a column
# are the items save_face() hands to open_stuff() for that emotion.
# Building the mapping from `emotions` keeps the list and the dict in sync.
df = pandas.read_excel("emotion_audio.xlsx")
actions = {emotion: list(df[emotion].dropna()) for emotion in emotions}


def open_stuff(filename):
    """Open ``filename`` with the operating system's default handler.

    Windows uses ``os.startfile``; macOS runs the ``open`` command and every
    other platform runs ``xdg-open`` through ``subprocess.call``.
    """
    platform = sys.platform
    if platform == "win32":
        os.startfile(filename)
        return
    # Anything that is not macOS is assumed to provide xdg-open.
    launcher = {"darwin": "open"}.get(platform, "xdg-open")
    subprocess.call([launcher, filename])

def crop_face(clahe_image, face):
    """Crop the detected face out of ``clahe_image`` and store it in ``facedict``.

    Args:
        clahe_image: image array; it is sliced as ``[y:y+h, x:x+w]``.
        face: iterable of ``(x, y, w, h)`` rectangles (the callers pass the
            result of ``facecascade.detectMultiScale``).

    Returns:
        The crop resized to 350x350 pixels, or ``None`` when ``face`` holds no
        rectangle (this case used to raise ``UnboundLocalError``).
    """
    faceslice = None
    # If several rectangles are given, the last one wins, as before.
    for (x, y, w, h) in face:
        faceslice = clahe_image[y:y+h, x:x+w]
        faceslice = cv2.resize(faceslice, (350, 350))
    if faceslice is None:
        return None
    # Keys are numbered face1, face2, ... in insertion order.
    facedict["face%s" %(len(facedict)+1)] = faceslice
    return faceslice

def update_model(emotions):
    """Collect webcam face images for every emotion in ``emotions``.

    Makes sure the output folders exist, records a batch of faces per emotion
    through ``save_face`` and finally closes the two OpenCV windows. Despite
    the printed messages, no model is trained inside this function.
    """
    print("Model update mode active")
    check_folders(emotions)
    for emotion in emotions:
        save_face(emotion)
    print("collected images, looking good! Now updating model...")
    print("Done!")
    for window_name in ("preview", "webcam"):
        cv2.destroyWindow(window_name)

def check_folders(emotions):
    """Make sure a ``sorted_set/<emotion>`` directory exists for every emotion.

    Args:
        emotions: iterable of emotion names; each becomes a sub-directory of
            ``sorted_set`` in the current working directory.

    The path is built with ``os.path.join`` so the nested layout is also
    created on macOS/Linux; a hard-coded backslash only acts as a separator on
    Windows. Directories that already exist are left untouched.
    """
    for emotion in emotions:
        os.makedirs(os.path.join("sorted_set", str(emotion)), exist_ok=True)

def save_face(emotion):
    """Record a batch of face crops for ``emotion`` and write them to disk.

    Steps:
      1. Ask the user to show the expression and open one randomly chosen
         entry of ``actions[emotion]`` (skipped when the list is empty, which
         used to raise ``IndexError``).
      2. Count down five seconds.
      3. Call ``grab_webcamframe`` until ``facedict`` holds 11 crops, or until
         it returns ``None`` (the user pressed ESC). Previously ESC just
         restarted the capture loop, so it could never be left.
      4. Save every collected crop as ``sorted_set/<emotion>/<n>.jpg``, where
         ``n`` is the number of files already in that folder, then empty
         ``facedict``.

    Args:
        emotion: emotion name; used as key into ``actions`` and as folder name.
    """
    print("\n\nplease look " + emotion + " when the timer expires and keep the expression stable until instructed otherwise.")
    actionlist = list(actions[emotion])
    if actionlist:
        open_stuff(random.choice(actionlist))
    for i in range(0,5):
        print(5-i)
        time.sleep(1)
    while len(facedict) < 11:
        if grab_webcamframe() is None:
            # ESC pressed: stop collecting and keep what we have so far.
            break
    # os.path.join keeps the folder layout identical on Windows and makes it
    # work on macOS/Linux as well.
    target_dir = os.path.join("sorted_set", emotion)
    os.makedirs(target_dir, exist_ok=True)
    for faceslice in facedict.values():
        # Re-count on every write so consecutive files get consecutive numbers.
        next_index = len(glob.glob(os.path.join(target_dir, "*")))
        cv2.imwrite(os.path.join(target_dir, "%s.jpg" % next_index), faceslice)
    facedict.clear()

def grab_webcamframe():
    """Show the webcam preview until the user captures one usable face.

    Every frame is shown in the "preview" window. Pressing SPACE (key code 32)
    runs face detection on the current frame; when exactly one face is found
    it is cropped through ``crop_face``, the annotated frame is shown in the
    "webcam" window and the crop is returned. Pressing ESC (key code 27)
    closes both windows and returns ``None``.

    Returns:
        The cropped face returned by ``crop_face``, or ``None`` after ESC.

    Raises:
        RuntimeError: if the capture device ``vc`` is not open. The original
            code failed here with an unbound ``frame`` variable instead.
    """
    # The CLAHE object does not depend on the frame, so build it once.
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))

    while True:
        if not vc.isOpened():
            raise RuntimeError("webcam is not available: the capture device is not open")
        rval, frame = vc.read()
        if not rval or frame is None:
            # Dropped frame: nothing to show, but still let the user quit.
            if cv2.waitKey(40) == 27:
                break
            continue
        cv2.imshow("preview", frame)
        key = cv2.waitKey(40)
        if key == 27: # exit on ESC
            break
        if key == 32:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) #Convert image to grayscale to improve detection speed and accuracy
            clahe_image = clahe.apply(gray)

            face = facecascade.detectMultiScale(clahe_image, scaleFactor=1.1, minNeighbors=15, minSize=(10, 10), flags=cv2.CASCADE_SCALE_IMAGE)

            # Outline every detection in red (BGR) on the colour frame.
            for (x, y, w, h) in face:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 0, 255), 2)

            if len(face) == 1:
                faceslice = crop_face(clahe_image, face)
                cv2.imshow("webcam", frame)
                return faceslice
            else:
                print("no/multiple faces detected, passing over frame")

    cv2.destroyWindow("preview")
    cv2.destroyWindow("webcam")

# Start the interactive capture session: one batch of face images is recorded
# for every entry of `emotions` (needs the webcam and keyboard input).
update_model(emotions)

In [2]:
# Load the saved feature matrix and its label array (presumably produced by an
# earlier feature-extraction step that is not part of this notebook).
data, target = (np.load(path) for path in ("data_features.npy", "data_labels.npy"))
data.shape

In [None]:
# Each entry of `target` is a backslash-separated string; the part before the
# first backslash is used as the label. The last expression shows how often
# every label occurs.
data_y = pd.DataFrame({'label': [entry.split('\\')[0] for entry in target]})
Counter(data_y['label'])

In [None]:
# Replace the label strings with integer class ids 0-6. The keys appear to be
# the emotion names with their first two characters cut off.
label_names = ["ger", "sgust", "ar", "ppy", "utral", "dness", "rprise"]
classes = {"label": {name: class_id for class_id, name in enumerate(label_names)}}
data_y.replace(classes, inplace=True)
print(data_y.head())
print(type(data_y))

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 5% of the data as the test set, then split the remaining 95% into
# 80% train / 20% cross-validation. Both splits are stratified on the label.
# A fixed random_state makes the split, and therefore every metric reported
# below, reproducible across kernel restarts.
X_train, test_df, y_train, y_test = train_test_split(
    data, data_y, stratify=data_y, test_size=0.05, random_state=42)
train_df, cv_df, y_train, y_cv = train_test_split(
    X_train, y_train, stratify=y_train, test_size=0.2, random_state=42)
print(train_df.shape)
print(cv_df.shape)
print(test_df.shape)

In [None]:
def _plot_class_distribution(class_distribution, n_rows, title):
    """Draw a bar chart of per-class counts and print them, largest first.

    Args:
        class_distribution: Series of counts indexed by class label.
        n_rows: number of rows in the split, used for the percentages.
        title: plot title.
    """
    # A list of one-letter colours is accepted by old and new pandas alike;
    # the bare string 'rgbkymc' is rejected by current versions.
    class_distribution.plot(kind='bar', color=list('rgbkymc'))
    plt.xlabel('Class')
    plt.ylabel('Data points per Class')
    plt.title(title)
    plt.grid()
    plt.show()

    counts = class_distribution.values
    # Negating the counts makes argsort return the largest class first.
    for pos in np.argsort(-counts):
        # Report the class label itself rather than its position in the index.
        print('Number of data points in class', class_distribution.index[pos], ':',counts[pos], '(', np.round((counts[pos]/n_rows*100), 3), '%)')


# Series.sortlevel() was removed from pandas; sort_index() gives the same
# ordering by class label.
train_class_distribution = y_train['label'].value_counts().sort_index()
test_class_distribution = y_test['label'].value_counts().sort_index()
cv_class_distribution = y_cv['label'].value_counts().sort_index()

_plot_class_distribution(train_class_distribution, train_df.shape[0], 'Distribution of yi in train data')

print('-'*80)
_plot_class_distribution(test_class_distribution, test_df.shape[0], 'Distribution of yi in test data')

print('-'*80)
# The cross-validation counts are now sorted by their own values; the original
# reused the ordering of the train distribution here.
_plot_class_distribution(cv_class_distribution, cv_df.shape[0], 'Distribution of yi in cross validation data')


In [11]:
def plot_confusion_matrix(test_y, predict_y, labels=None):
    """Plot the confusion, precision and recall matrices as heatmaps.

    Args:
        test_y: true class labels.
        predict_y: predicted class labels.
        labels: class labels that define the rows/columns and tick labels.
            Defaults to ``[0, 1, 2, 3, 4, 5, 6]``, the value that used to be
            hard-coded.

    The labels are passed to ``confusion_matrix`` so the matrix always has one
    row and column per tick label, even when a class is missing from the
    inputs. Rows or columns that sum to zero are shown as zeros instead of
    producing NaN and a divide-by-zero warning.
    """
    if labels is None:
        labels = [0,1,2,3,4,5,6]

    C = confusion_matrix(test_y, predict_y, labels=labels)
    row_totals = C.sum(axis=1, keepdims=True)
    col_totals = C.sum(axis=0, keepdims=True)
    # Recall: every row divided by its row sum.
    A = np.divide(C, row_totals, out=np.zeros(C.shape, dtype=float), where=row_totals != 0)
    # Precision: every column divided by its column sum.
    B = np.divide(C, col_totals, out=np.zeros(C.shape, dtype=float), where=col_totals != 0)

    panels = (
        ("Confusion matrix", C),
        ("Precision matrix (Columm Sum=1)", B),
        ("Recall matrix (Row sum=1)", A),
    )
    for heading, matrix in panels:
        print("-"*20, heading, "-"*20)
        plt.figure(figsize=(20,7))
        sns.heatmap(matrix, annot=True, cmap="YlGnBu", fmt=".3f", xticklabels=labels, yticklabels=labels)
        plt.xlabel('Predicted Class')
        plt.ylabel('Original Class')
        plt.show()

In [None]:
# Baseline: log loss of a model that outputs random class probabilities.
# Any real classifier below should do better than these numbers.
test_data_len = test_df.shape[0]
cv_data_len = cv_df.shape[0]
n_classes = 7

# A private, seeded generator keeps the baseline reproducible without touching
# NumPy's global random state.
rng = np.random.RandomState(42)

# One row of random scores per sample, normalised so each row sums to 1.
cv_predicted_y = rng.rand(cv_data_len, n_classes)
cv_predicted_y /= cv_predicted_y.sum(axis=1, keepdims=True)
# The `eps` argument was removed from log_loss in recent scikit-learn; the old
# default was the value passed here, so dropping it keeps the behaviour.
# `labels` keeps the call valid even if a class is missing from the split.
print("Log loss on Cross Validation Data using Random Model",log_loss(y_cv, cv_predicted_y, labels=list(range(n_classes))))

test_predicted_y = rng.rand(test_data_len, n_classes)
test_predicted_y /= test_predicted_y.sum(axis=1, keepdims=True)
print("Log loss on Test Data using Random Model",log_loss(y_test, test_predicted_y, labels=list(range(n_classes))))

predicted_y =np.argmax(test_predicted_y, axis=1)
plot_confusion_matrix(y_test, predicted_y)

In [None]:
# Hyper-parameter search for an L2-regularised logistic regression trained
# with SGD and wrapped in sigmoid probability calibration. The alpha with the
# lowest cross-validation log loss is refitted and evaluated on all splits.
# NOTE: scikit-learn >= 1.1 names this loss 'log_loss'; 'log' is kept here
# because the notebook's imports target an older release.
alpha = [10 ** x for x in range(-5, 1)]
# Estimators expect a 1-D target; passing the one-column DataFrame triggers a
# DataConversionWarning on every fit.
y_train_1d = np.ravel(y_train)

cv_log_error_array=[]
for i in alpha:
    clf = SGDClassifier(alpha=i, penalty='l2', loss='log', random_state=42)
    clf.fit(train_df, y_train_1d)

    sig_clf = CalibratedClassifierCV(clf, method="sigmoid")
    sig_clf.fit(train_df, y_train_1d)
    predict_y = sig_clf.predict_proba(cv_df)

    # Compute the loss once and reuse it for the list and the message.
    # (`eps` was removed from log_loss; its old default equals the value that
    # used to be passed here.)
    cv_loss = log_loss(y_cv, predict_y, labels=clf.classes_)
    cv_log_error_array.append(cv_loss)
    print('For values of alpha = ', i, "The log loss is:",cv_loss)

fig, ax = plt.subplots()
ax.plot(alpha, cv_log_error_array,c='g')
for alpha_value, cv_loss in zip(alpha, cv_log_error_array):
    ax.annotate("(%s, %.3f)" % (alpha_value, cv_loss), (alpha_value, cv_loss))
plt.grid()
plt.title("Cross Validation Error for each alpha")
plt.xlabel("Alpha i's")
plt.ylabel("Error measure")
plt.show()


best_alpha = np.argmin(cv_log_error_array)
clf = SGDClassifier(alpha=alpha[best_alpha], penalty='l2', loss='log', random_state=42)
clf.fit(train_df, y_train_1d)
sig_clf = CalibratedClassifierCV(clf, method="sigmoid")
sig_clf.fit(train_df, y_train_1d)

# After this loop `predict_y` holds the test-set probabilities.
for split_name, split_X, split_y in (("train", train_df, y_train),
                                     ("cross validation", cv_df, y_cv),
                                     ("test", test_df, y_test)):
    predict_y = sig_clf.predict_proba(split_X)
    print('For values of best alpha = ', alpha[best_alpha], "The %s log loss is:" % split_name,log_loss(split_y, predict_y, labels=clf.classes_))

# Translate the arg-max column back to the class label it stands for.
predicted_y = sig_clf.classes_[np.argmax(predict_y, axis=1)]
plot_confusion_matrix(y_test, predicted_y)

In [None]:
# Hyper-parameter search for an L2-regularised linear SVM (hinge loss) trained
# with SGD and wrapped in sigmoid probability calibration. The alpha with the
# lowest cross-validation log loss is refitted and evaluated on all splits.
# The `sig_clf` fitted at the end of this cell is the model that prediction()
# uses later in the notebook.
alpha = [10 ** x for x in range(-5, 1)]
# Estimators expect a 1-D target; passing the one-column DataFrame triggers a
# DataConversionWarning on every fit.
y_train_1d = np.ravel(y_train)

cv_log_error_array=[]
for i in alpha:
    clf = SGDClassifier(alpha=i, penalty='l2', loss='hinge', random_state=42)
    clf.fit(train_df, y_train_1d)

    sig_clf = CalibratedClassifierCV(clf, method="sigmoid")
    sig_clf.fit(train_df, y_train_1d)
    predict_y = sig_clf.predict_proba(cv_df)

    # Compute the loss once and reuse it for the list and the message.
    # (`eps` was removed from log_loss; its old default equals the value that
    # used to be passed here.)
    cv_loss = log_loss(y_cv, predict_y, labels=clf.classes_)
    cv_log_error_array.append(cv_loss)
    print('For values of alpha = ', i, "The log loss is:",cv_loss)

fig, ax = plt.subplots()
ax.plot(alpha, cv_log_error_array,c='g')
for alpha_value, cv_loss in zip(alpha, cv_log_error_array):
    ax.annotate("(%s, %.3f)" % (alpha_value, cv_loss), (alpha_value, cv_loss))
plt.grid()
plt.title("Cross Validation Error for each alpha")
plt.xlabel("Alpha i's")
plt.ylabel("Error measure")
plt.show()


best_alpha = np.argmin(cv_log_error_array)
clf = SGDClassifier(alpha=alpha[best_alpha], penalty='l2', loss='hinge', random_state=42)
clf.fit(train_df, y_train_1d)
sig_clf = CalibratedClassifierCV(clf, method="sigmoid")
sig_clf.fit(train_df, y_train_1d)

# After this loop `predict_y` holds the test-set probabilities.
for split_name, split_X, split_y in (("train", train_df, y_train),
                                     ("cross validation", cv_df, y_cv),
                                     ("test", test_df, y_test)):
    predict_y = sig_clf.predict_proba(split_X)
    print('For values of best alpha = ', alpha[best_alpha], "The %s log loss is:" % split_name,log_loss(split_y, predict_y, labels=clf.classes_))

# Translate the arg-max column back to the class label it stands for.
predicted_y = sig_clf.classes_[np.argmax(predict_y, axis=1)]
plot_confusion_matrix(y_test, predicted_y)

In [14]:

# VGG16 feature extractor, built on first use and then reused. The live loop
# calls save_webcam_features() for every captured frame, and rebuilding the
# network each time is very slow.
_VGG16_EXTRACTOR = None


def _get_vgg16_extractor():
    """Return the shared VGG16 model (no top layers, ImageNet weights)."""
    global _VGG16_EXTRACTOR
    if _VGG16_EXTRACTOR is None:
        _VGG16_EXTRACTOR = applications.VGG16(include_top=False, weights='imagenet')
    return _VGG16_EXTRACTOR


def save_webcam_features():
    """Extract VGG16 features for the image under ``real_set`` and classify it.

    Reads images from the ``real_set`` directory at 350x350, runs one batch of
    one image through the VGG16 convolutional base, flattens the result to one
    row per image and writes it to ``real_features.npy``. The file names, with
    their first two characters removed, go to ``real_labels.npy``. Finally
    ``prediction()`` is called to classify the saved features.
    """
    img_width, img_height = 350, 350
    train_data_dir = 'real_set'
    nb_train_samples = 1
    batch_size = 1

    datagen = ImageDataGenerator(rescale=1. / 255)
    model = _get_vgg16_extractor()
    generator_train = datagen.flow_from_directory(
        train_data_dir,
        target_size=(img_width, img_height),
        batch_size=batch_size,
        class_mode=None,
        shuffle=False)

    # Drop the first two characters of every relative file name (presumably
    # the one-letter class folder and the path separator - verify).
    train_target = [name[2:] for name in generator_train.filenames]

    bottleneck_features_train = model.predict_generator(generator_train, nb_train_samples // batch_size)

    # One flat feature row per image; for a single 350x350 input this is the
    # same (1, 51200) shape that used to be hard-coded.
    bottleneck_features_train = bottleneck_features_train.reshape(bottleneck_features_train.shape[0], -1)
    print(bottleneck_features_train.shape)
    # Passing the path lets np.save open and close the file itself; the
    # original handed over open() handles that were never closed.
    np.save('real_features.npy', bottleneck_features_train)
    np.save('real_labels.npy', np.array(train_target))
    prediction()
    


In [15]:
def prediction():
    """Classify the features stored in ``real_features.npy`` with ``sig_clf``.

    Prints the class probabilities, then closes the "preview" window.

    Returns:
        The predicted class label(s) from ``sig_clf.predict``. The original
        computed this value but discarded it and returned ``None``.
    """
    features = np.load('real_features.npy')
    predictions = sig_clf.predict_proba(features)
    print(predictions)

    emotion = sig_clf.predict(features)

    cv2.destroyWindow("preview")
    return emotion

In [None]:
import cv2
import numpy as np
import time
from PIL import Image
# Release a capture left open by an earlier cell before opening device 0
# again; opening the same camera twice can fail while the first handle is
# still held.
try:
    vc.release()
except NameError:
    pass  # no earlier capture exists in this kernel session

cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
facecascade = cv2.CascadeClassifier("haarcascade_frontalface_default.xml")

def crop_face(clahe_image, face):
    """Crop the detected face out of ``clahe_image`` and store it in ``facedict``.

    NOTE: this redefines the ``crop_face`` from the data-collection cell
    earlier in the notebook.

    Args:
        clahe_image: image array; it is sliced as ``[y:y+h, x:x+w]``.
        face: iterable of ``(x, y, w, h)`` rectangles (the caller passes the
            result of ``facecascade.detectMultiScale``).

    Returns:
        The crop resized to 350x350 pixels, or ``None`` when ``face`` holds no
        rectangle (this case used to raise ``UnboundLocalError``).
    """
    faceslice = None
    # If several rectangles are given, the last one wins, as before.
    for (x, y, w, h) in face:
        faceslice = clahe_image[y:y+h, x:x+w]
        faceslice = cv2.resize(faceslice, (350, 350))
    if faceslice is None:
        return None
    # Keys are numbered face1, face2, ... in insertion order.
    facedict["face%s" %(len(facedict)+1)] = faceslice
    return faceslice

# Live prediction loop: show the webcam preview and, whenever exactly one face
# is detected, save the crop under real_set/1 and classify it. ESC quits.

# The CLAHE object does not depend on the frame, so build it once.
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))

# os.path.join gives the same path as before on Windows and a real nested
# folder elsewhere; create it up front because cv2.imwrite does not create
# missing directories.
capture_dir = os.path.join("real_set", "1")
os.makedirs(capture_dir, exist_ok=True)

while True:
    facedict = {}
    if not vc.isOpened():
        # The original went on to use an undefined `frame` here.
        print("webcam is not available, stopping")
        break
    rval, frame = vc.read()
    if not rval or frame is None:
        # Dropped frame: nothing to show, but still let the user quit.
        if cv2.waitKey(40) == 27:
            break
        continue
    cv2.imshow("preview", frame)
    key = cv2.waitKey(40)
    if key == 27: # exit on ESC
        break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) #Convert image to grayscale to improve detection speed and accuracy
    clahe_image = clahe.apply(gray)
    face = facecascade.detectMultiScale(clahe_image, scaleFactor=1.1, minNeighbors=15, minSize=(10, 10), flags=cv2.CASCADE_SCALE_IMAGE)

    # NOTE: the frame was already displayed above, so these rectangles are
    # drawn on it but never shown.
    for (x, y, w, h) in face:
        cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 0, 255), 2)

    if len(face) == 1:
        faceslice = crop_face(clahe_image, face)

        for x in facedict.keys():
            cv2.imwrite(os.path.join(capture_dir, "%s.jpg" % x), facedict[x])
            save_webcam_features()

# Free the camera so the cell can be run again.
vc.release()
cv2.destroyWindow("preview")
    