In [None]:
'''
@author: Emmeke Veltmeijer, 2022
Notebook for finding emotional subgroups in images
'''

### Part I: Setting up ###

In [None]:
# workaround for https://stackoverflow.com/q/68862735/3033937
import site; 
from os.path import join

# Patch the installed keras_vggface package in place: its models.py imports from
# 'keras.engine.topology', which is replaced here by 'tensorflow.keras.utils'.
dist_package_path = site.getsitepackages()[0]
filename = join(dist_package_path, "keras_vggface/models.py")

# Context managers make sure both file handles are closed again.
with open(filename) as models_file:
    text = models_file.read()

patched_text = text.replace('keras.engine.topology', 'tensorflow.keras.utils')

# Only rewrite the file when the replacement changed something, so re-running
# this cell is a no-op on an already patched installation.
if patched_text != text:
    with open(filename, "w") as models_file:
        models_file.write(patched_text)

print("workaround applied")

In [6]:
import tensorflow as tf
from tensorflow import keras
from keras.preprocessing import image
from keras_vggface import utils
import face_recognition
import numpy as np
import scipy
import json
import cv2
import csv
import os
import pandas as pd
from sklearn.cluster import KMeans
import torchvision
from torch.autograd import Variable
from torchvision import transforms
import torch.nn.functional as F
import sys
sys.path.append('deep-head-pose-lite') 
import stable_hopenetlite
import torch
from PIL import Image

In [None]:
# Loading emotion recognition model
# (the path is relative to the notebook's working directory)
emotion_model = keras.models.load_model('vggface_emo-recogn.h5')

# Loading HopeNet light model for gaze recognition
gaze_model = stable_hopenetlite.shufflenet_v2_x1_0()
# NOTE(review): torch.load unpickles the checkpoint, so only load files from a trusted source.
saved_state_dict = torch.load('deep-head-pose-lite/model/shuff_epoch_120.pkl', map_location="cpu")
# strict=False: keys that do not line up between checkpoint and model do not raise an error.
gaze_model.load_state_dict(saved_state_dict, strict=False)
# Put the gaze model in evaluation mode; being the last expression of the cell,
# the returned module is also shown as the cell output.
gaze_model.eval()

In [None]:
# Parameter settings and directories
# These module-level names are read directly by the functions and cells below.

# Weights assigned for emotions and coordinates of individual faces
# (w_emo scales the three emotion confidences, w_coord the two normalized coordinates)
w_emo = 1
w_coord = 1.5

# cluster_type can be either 'kmeans' or 'hierarchical'
cluster_type = 'kmeans'

# Setting face_width and/or yaw to True will add the face size and/or yaw (gaze) to the feature vector
face_width = False
yaw = True

# Images to test on should be placed in folder 'img'
# (the image file name without its last four characters is used as the image id)
img_dir = './img/'

### Part II: Feature extraction ###

In [None]:
# Defining functions for face recognition

def face_recogn(image_full):
    '''
    Detect faces in an image.

    Thin wrapper that returns whatever face_recognition.face_locations yields
    for image_full (the rest of the notebook reads each entry as
    (top, right, bottom, left)).
    '''
    return face_recognition.face_locations(image_full)

def _axis_iou(detected_span, truth_span):
    '''Return the 1-D intersection-over-union of two [start, end] intervals (0.0 if disjoint or degenerate).'''
    overlap = min(detected_span[1], truth_span[1]) - max(detected_span[0], truth_span[0])
    union = max(detected_span[1], truth_span[1]) - min(detected_span[0], truth_span[0])
    if overlap <= 0 or union <= 0:
        return 0.0
    return overlap / union


def face_matching(img_num, face_locations, gt_path='data.json', min_iou=0.2):
    '''
    Only recognized faces that match a ground truth face should be kept.

    A detection matches a ground truth face when the interval IoU is at least
    min_iou on the x axis AND on the y axis. For every ground truth face the
    first matching detection (in detection order) is taken.

    Parameters
    ----------
    img_num : key of the image in the ground truth file (converted with str()).
    face_locations : list of (top, right, bottom, left) boxes.
    gt_path : JSON file holding, per image, a "faces" mapping from face id to
        {'bbox_x': [start, end], 'bbox_y': [start, end]}.
    min_iou : minimum per-axis IoU. The default of 0.2 is the value the code
        has always used. NOTE(review): the original comment mentioned 0.3 --
        confirm which threshold is intended.

    Returns
    -------
    list of matched detections ordered by ground truth face id; ground truth
    faces without a matching detection are left out, so positions in the
    returned list are not guaranteed to equal ground truth face ids.
    '''

    with open(gt_path, 'r') as f:
        full_dict = json.load(f)

    gt_faces = full_dict[str(img_num)]["faces"]
    if not gt_faces:
        # nothing to match against (max() below would fail on an empty sequence)
        return []

    # bounding boxes of the detections as per-axis [start, end] intervals
    detected_boxes = []
    for face in face_locations:
        detected_boxes.append({'bbox_x': [int(face[3]), int(face[1])],
                               'bbox_y': [int(face[0]), int(face[2])]})

    # reserve a spot for all ground truth faces; None = not matched (yet)
    matched_slots = [None] * (max(int(face_id) for face_id in gt_faces) + 1)

    for face_id, gt_box in gt_faces.items():
        for det_idx, det_box in enumerate(detected_boxes):
            # each axis is compared against the same axis of the ground truth box
            iou_x = _axis_iou(det_box['bbox_x'], gt_box['bbox_x'])
            iou_y = _axis_iou(det_box['bbox_y'], gt_box['bbox_y'])

            if min_iou <= iou_x <= 1.0 and min_iou <= iou_y <= 1.0:
                matched_slots[int(face_id)] = face_locations[det_idx]
                break

    # ground truth faces that were not recognized are dropped from the result
    return [location for location in matched_slots if location is not None]

In [None]:
# Defining function for individual emotion recognition

def emotion_recognition(image_full, idx, face):
    '''
    Classify the emotion of one face and describe where it sits in the image.

    Parameters
    ----------
    image_full : image indexed as image_full[row][column] (rows = height).
    idx : index of the face; not used by this function, kept for callers.
    face : bounding box as (top, right, bottom, left).

    Returns
    -------
    (predictions_valence, center_face_x, center_face_y, x_norm, y_norm,
    radius_x, radius_y), where predictions_valence is a 3-element list built
    from the model output, center_* are pixel coordinates of the box centre,
    *_norm are the centre coordinates divided by image width / height and
    radius_* are the full box width / height in pixels.
    '''

    # compute the coordinates of the bounding box for the object
    startX = int(face[3])
    startY = int(face[0])
    endX = int(face[1])
    endY = int(face[2])

    # number of rows = height, length of a row = width
    img_height = len(image_full)
    img_width = len(image_full[0])

    center_face_x = startX + (endX - startX) / 2
    center_face_y = startY + (endY - startY) / 2
    x_norm = center_face_x / img_width #normalize by image width
    # y is normalized by the image height (the row count); dividing by the
    # length of a row would normalize it by the width instead.
    y_norm = center_face_y / img_height
    radius_x = endX - startX
    radius_y = endY - startY

    img_cropped = image_full[startY:endY,startX:endX]
    img_resized = cv2.resize(img_cropped,(224, 224))

    x = keras.utils.img_to_array(img_resized)
    x = np.expand_dims(x, axis=0)
    x = utils.preprocess_input(x, version=1)
    predictions = emotion_model.predict(x)
    class_scores = predictions[0]
    # Collapse the model's classes into three scores: classes 0, 1, 2 and 5 are
    # summed, followed by class 4 and class 3.
    # NOTE(review): presumably [negative, neutral, positive], matching the label
    # indices used in the evaluation cells -- confirm against the model's classes.
    predictions_valence = [class_scores[0]+class_scores[1]+class_scores[2]+class_scores[5],
                           class_scores[4],
                           class_scores[3]]

    return predictions_valence, center_face_x, center_face_y, x_norm, y_norm, radius_x, radius_y

In [None]:
# Defining functions for extracting and saving individual information

def information_extraction(img_num, img_path, cluster_output):
    '''
    Extract the baseline features of every matched face in one image.

    The faces are detected, filtered against the ground truth and passed to
    emotion_recognition. The results are stored in cluster_output[img_num]
    (keyed by the face's position in the matched list) and the five clustering
    features per face are written to
    ./temp/coordinates_emotion_output_img<img_num>.csv.

    Note that the CSV columns named 'x_cor' and 'y_cor' hold the normalized
    coordinates, whereas the 'x_cor'/'y_cor' entries of cluster_output hold
    pixel coordinates.

    Returns the (mutated) cluster_output dictionary.
    '''
    new_filename = './temp/coordinates_emotion_output_img' + str(img_num) +'.csv'

    image_full = face_recognition.load_image_file(img_path)
    face_locations_unmatched = face_recogn(image_full)
    face_locations = face_matching(img_num, face_locations_unmatched)

    cluster_output[img_num] = {}

    # make sure the output folder exists before writing
    os.makedirs(os.path.dirname(new_filename), exist_ok=True)

    # The file is opened once for header and rows; newline='' is what the csv
    # module expects so that no extra blank lines appear on Windows.
    with open(new_filename, mode='w', newline='') as coordinates_file:
        coordinates_writer = csv.writer(coordinates_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        coordinates_writer.writerow(['conf_0','conf_1','conf_2','x_cor', 'y_cor'])

        for idx, face in enumerate(face_locations):
            predictions_valence, center_face_x, center_face_y, x_norm, y_norm, radius_x, radius_y = emotion_recognition(image_full, idx, face)

            # float() makes the confidences JSON serializable
            cluster_output[img_num][idx] = {
                'conf_0': float(predictions_valence[0]),
                'conf_1': float(predictions_valence[1]),
                'conf_2': float(predictions_valence[2]),
                'x_cor': center_face_x,
                'y_cor': center_face_y,
                'x_norm': x_norm,
                'y_norm': y_norm,
                'radius_x': radius_x,
                'radius_y': radius_y,
            }

            coordinates_writer.writerow([predictions_valence[0],predictions_valence[1],predictions_valence[2],x_norm,y_norm])

    return cluster_output

def gaze_extraction(img_num, img_path, cluster_output):
    '''
    Add a normalized yaw (horizontal gaze direction) to every face of an image.

    For each face in cluster_output[img_num] the face is cropped from the image
    at img_path using the stored centre ('x_cor', 'y_cor') and size
    ('radius_x', 'radius_y'), passed through gaze_model, and the expected yaw
    is stored as cluster_output[img_num][face]['yaw_norm'] (yaw in degrees
    divided by 45).

    Returns the (mutated) cluster_output dictionary.
    '''
    faces = cluster_output[img_num]
    if not faces:
        return cluster_output

    # Preprocessing is identical for every face, so it is built once.
    # transforms.Resize is the current name of the deprecated transforms.Scale.
    transformations = transforms.Compose([
        transforms.Resize(224),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # bin indices used to turn the 66 yaw class probabilities into an expectation
    idx_tensor = torch.arange(66, dtype=torch.float32)

    # Open the image given by the caller once and release the file handle again
    with Image.open(img_path) as img_file:
        img_rgb = img_file.convert('RGB')

    for face_name in faces:
        face_info = faces[face_name]
        startY = face_info['y_cor'] - (face_info['radius_y'] / 2)
        endY = startY + face_info['radius_y']
        startX = face_info['x_cor'] - (face_info['radius_x'] / 2)
        endX = startX + face_info['radius_x']

        face_img = img_rgb.crop((startX, startY, endX, endY))

        # add the batch dimension expected by the model
        face_batch = transformations(face_img).unsqueeze(0)

        # inference only: no gradients needed
        with torch.no_grad():
            yaw_logits, _pitch_logits, _roll_logits = gaze_model(face_batch)

        # softmax over the class dimension of the (batch, classes) output
        yaw_probabilities = F.softmax(yaw_logits, dim=1)

        # Get continuous predictions in degrees.
        yaw_predicted = torch.sum(yaw_probabilities[0] * idx_tensor) * 3 - 99

        yaw_direction_norm = yaw_predicted / 45

        face_info['yaw_norm'] = yaw_direction_norm.item() #.item(): convert tensor to float

    return cluster_output

def face_width_extraction(img_num, cluster_output):
    '''
    Store a relative face width for every face of an image.

    Each face gets 'face_width' = its 'radius_x' divided by the largest
    'radius_x' found in the same image. Returns the (mutated) cluster_output.
    '''
    faces = cluster_output[img_num]

    if faces:
        widest = max(face['radius_x'] for face in faces.values()) #find biggest face

        for face in faces.values(): #normalize
            face['face_width'] = face['radius_x'] / widest

    return cluster_output

def emotion_assignment(img_num, K, cluster_output, cluster_type):
    '''
    Give every face the emotion of the cluster it belongs to.

    For each cluster id in range(K) the 'conf_0'/'conf_1'/'conf_2' values of
    its member faces are summed and the index of the largest sum is stored on
    every member as '<cluster_type>_emotion'. Faces are addressed by the string
    keys '0', '1', ... Any cluster_type other than 'kmeans' or 'hierarchical'
    leaves cluster_output untouched. Returns the (mutated) cluster_output.
    '''
    key_names = {
        'kmeans': ('kmeans_cluster', 'kmeans_emotion'),
        'hierarchical': ('hierarchical_cluster', 'hierarchical_emotion'),
    }
    if cluster_type not in key_names:
        return cluster_output

    cluster_key, emotion_key = key_names[cluster_type]

    for cluster_id in range(K): #looping through number of clusters
        faces = cluster_output[img_num]
        members = [faces[str(pos)] for pos in range(len(faces))
                   if faces[str(pos)][cluster_key] == cluster_id]

        # summed confidence per emotion class over all faces in this cluster
        totals = [sum(member[conf] for member in members)
                  for conf in ('conf_0', 'conf_1', 'conf_2')]
        winner = int(np.argmax(totals))

        for member in members:
            member[emotion_key] = winner

    return cluster_output

In [None]:
# Extracting and saving features
# Every entry of img_dir is processed; the file name without its last four
# characters (e.g. a '.jpg' suffix) is used as the image id.
# NOTE(review): the './temp' and './output_files' folders are presumably expected to exist -- confirm.

cluster_output = {}

# Pass 1: baseline features (emotion confidences, position, size) per matched face
for count, img_name in enumerate(os.listdir(img_dir)):
    img_num = img_name[:-4]
    img_path = os.path.join(img_dir,img_name)

    cluster_output = information_extraction(img_num, img_path, cluster_output)
    
    if count % 10 == 0:
        print(f'Finished baseline feature extraction from image {count}')
    
# Pass 2 (optional): relative face width
if face_width:
    for count, img_name in enumerate(os.listdir(img_dir)):
        img_num = img_name[:-4]
        img_path = os.path.join(img_dir,img_name)

        cluster_output = face_width_extraction(img_num, cluster_output)

        if count % 10 == 0:
            print(f'Finished additional face_width feature extraction from image {count}')
    
# Pass 3 (optional): normalized yaw of every face
if yaw:
    for count, img_name in enumerate(os.listdir(img_dir)):
        img_num = img_name[:-4]
        img_path = os.path.join(img_dir,img_name)

        cluster_output = gaze_extraction(img_num, img_path, cluster_output)

        if count % 10 == 0:
            print(f'Finished additional gaze feature extraction from image {count}')
            
# Persist all features; the integer face keys become strings in the JSON file
with open('./output_files/image_information.json', 'w') as f:
    json.dump(cluster_output, f)

### Part III: Emotional subgroup clustering ###

In [None]:
# Defining functions for k-means clustering

def optimal_k(X):
    '''
    Pick the number of clusters for X with an elbow-strength heuristic.

    Method from https://www.datasciencecentral.com/profiles/blogs/how-to-automatically-determine-the-number-of-clusters-in-your-dat
    Accessed on [16-06-2021]

    K-means is fitted for every k from 1 up to len(X). From the (sign-corrected)
    scores the first differences, the second differences and finally the
    "strength" of every k are derived; the k with the highest strength wins
    (the first one in case of ties).

    Returns (k_range, scores, optimal_k).
    '''
    k_range = range(1, len(X)+1)

    scores = []
    for n_clusters in k_range:
        model = KMeans(n_clusters=n_clusters, random_state=0)
        scores.append(model.fit(X).score(X))

    # work with positive scores
    if scores[0] < 0:
        scores = [-value for value in scores]

    n_scores = len(scores)
    score_values = np.asarray(scores, dtype=float)

    # first differences: improvement when going from k-1 to k clusters
    deltas_1 = np.zeros(n_scores)
    deltas_1[1:] = score_values[:-1] - score_values[1:]

    # second differences: change of that improvement
    deltas_2 = np.zeros(n_scores)
    deltas_2[1:] = deltas_1[:-1] - deltas_1[1:]

    # strength of an elbow at position i; first and last position stay 0
    strength = np.zeros(n_scores)
    strength[1:-1] = deltas_2[2:] - deltas_1[2:]

    best_k = np.flatnonzero(strength == strength.max())[0] + 1 # correcting for index number

    return k_range, scores, best_k

def kmeans_regular(img_num, cluster_output, face_width, yaw):
    '''
    Cluster the faces of one image with k-means.

    The feature rows are read from
    ./temp/coordinates_emotion_output_img<img_num>.csv. The first three columns
    are multiplied by the global w_emo, columns four and five by the global
    w_coord. When face_width / yaw is truthy, the 'face_width' / 'yaw_norm'
    value of every face in cluster_output[img_num] is appended as an extra
    column. The number of clusters is chosen by optimal_k.

    The cluster label of every face is stored as
    cluster_output[img_num][str(idx)]['kmeans_cluster'].

    Returns (k_opt, cluster_output). For an image without any face rows,
    (0, cluster_output) is returned and nothing is clustered.
    '''

    current_filename = './temp/coordinates_emotion_output_img' + str(img_num) +'.csv'

    train = pd.read_csv(current_filename)
    X = np.array(train.astype(float))

    # No faces were found for this image: there is nothing to cluster, and
    # fitting k-means on an empty matrix would fail.
    if X.shape[0] == 0:
        return 0, cluster_output

    # weighing emotion and distance importance
    X[:, 0:3] *= w_emo
    X[:, 3:5] *= w_coord

    def _append_feature(features, key):
        # adds cluster_output[img_num][face][key] of every face as a new column
        column = np.zeros((features.shape[0],1))
        for idx,face in enumerate(cluster_output[img_num]):
            column[idx] = cluster_output[img_num][face][key]
        return np.append(features, column, axis=1)

    if face_width: # if face size should be included in the feature vector
        X = _append_feature(X, 'face_width')

    if yaw: # if yaw should be included in the feature vector
        X = _append_feature(X, 'yaw_norm')

    _, _, k_opt = optimal_k(X)
    k_opt = int(k_opt)
    kmeans = KMeans(n_clusters=k_opt, random_state=0).fit(X)

    #saving k-means labels
    for idx in range(len(cluster_output[img_num])):
        cluster_output[img_num][str(idx)]['kmeans_cluster'] = int(kmeans.labels_[idx])

    return k_opt, cluster_output

In [None]:
# Defining functions for hierarchical clustering

class hierarchical_clustering:
    def __init__(self,data):
        self.data = data
        self.centroids = {idx:row for idx, row in enumerate(data)}
        self.distances = {}
        self.clusters = {x:[x] for x in range(np.shape(data)[0])}
        self.flat_clusters = {}
        self.cluster_history = {}
        self.distance_history = []
        self.current_clusters = [x for x in range(np.shape(data)[0])]
        
        for i in range(np.shape(data)[0]):
            for j in range(i+1,np.shape(data)[0]):
                dist = np.linalg.norm(data[i,:]-data[j,:])
                self.distances[i,j] = dist
                
    def clustering(self, k):
        
        while len(self.current_clusters) > k:
            distances_ordered = sorted(self.distances.items(), key=lambda item: item[1])
            min_pair = distances_ordered[0]
            new_cluster_members = min_pair[0]
            min_dist = min_pair[1]

            # updating the list of current clusters
            self.current_clusters.remove(new_cluster_members[0])
            self.current_clusters.remove(new_cluster_members[1])
            new_cluster_id = len(self.clusters) #this is always 1 more than the current highest number
            self.current_clusters.append(new_cluster_id) 

            # updating the cluster history: keeps track of all clusters with their distances
            self.cluster_history[new_cluster_members] = min_dist
            self.distance_history.append(min_dist)

            # updating the clusters: keeps track which entities belong to which cluster IDs. Nothing is deleted for
            # the purpose of retrieving which cluster was formed by which points/clusters
            self.clusters[new_cluster_id] = new_cluster_members
            flat_new_members = list(new_cluster_members)
            if new_cluster_members[0] in self.flat_clusters:
                flat_new_members[0] = list(self.flat_clusters[new_cluster_members[0]])
            if new_cluster_members[1] in self.flat_clusters:
                flat_new_members[1] = list(self.flat_clusters[new_cluster_members[1]])
                    
            for member in flat_new_members:
                if new_cluster_id in self.flat_clusters:
                    if type(member) == int:
                        self.flat_clusters[new_cluster_id].extend([member])
                    elif type(member) == list:
                        self.flat_clusters[new_cluster_id].extend(member)
                else: #first value
                    if type(member) == int:
                        self.flat_clusters[new_cluster_id] = [member]
                    elif type(member) == list:
                        self.flat_clusters[new_cluster_id] = member

            # updating the centroids: keeps track of the centroids of all clusters. Individual values are not deleted
            # since their vectors need to be preserved. Clusters of > 1 point are removed once they merge with
            # with another point/cluster.
            
            new_centroid = self.calculating_centroids(new_cluster_members)
            if len(new_cluster_members) > 2:
                if len(new_cluster_members[0]) > 1: del self.centroids[new_cluster_members[0]]
                if len(new_cluster_members[1]) > 1: del self.centroids[new_cluster_members[1]]

            self.centroids[new_cluster_id] = new_centroid     

            # updating the distances: keeps track of distances between clusters
            to_delete = []
            for key, value in self.distances.items():
                if new_cluster_members[0] in key or new_cluster_members[1] in key:
                    to_delete.append(key)
            for key in to_delete:
                del self.distances[key]
            self.calculating_distances(new_cluster_id, self.current_clusters)
            
            # generate output in a readable format
            output = [None] * np.shape(data)[0]
            for i in range(np.shape(data)[0]): 
                exhausted = False
                if i in self.current_clusters:
                    output[i] = self.current_clusters.index(i)
                else:
                    j = i
                    while exhausted == False:
                        for key, value in self.clusters.items():
                            if j in value and j != key:
                                if key in self.current_clusters:
                                    output[i] = self.current_clusters.index(key)
                                    exhausted = True
                                else:
                                    j = key
          
        try:
            return output, self.current_clusters, self.clusters, self.cluster_history, self.distance_history
        except:
            print(f'Failed to return any output for image {img_num}')
        
    def calculating_centroids(self, new_cluster_members):
        # for every new cluster the centroid should be calculated
        
        sum_centroids = 0
        total_len = 0
        for i in new_cluster_members:
            if i in self.flat_clusters:
                for j in self.flat_clusters[i]: # retrieving all original entities in cluster
                    sum_centroids += self.centroids[j]
                    total_len += 1
            else: # cluster consists of a single entity
                sum_centroids += self.centroids[i]
                total_len += 1
                
        new_centroid = sum_centroids / total_len
        
        return new_centroid
        
    def calculating_distances(self, new_cluster_id, current_clusters):
        # calculating euclidean distance between every two clusters
        
        for cluster in self.current_clusters:
            if cluster != new_cluster_id:
                dist = np.linalg.norm(self.centroids[new_cluster_id] - self.centroids[cluster])
                self.distances[new_cluster_id,cluster] = dist

In [None]:
def hierarchical_k_opt(distance_history):
    '''
    Choose the number of clusters from the merge distances of a full
    hierarchical clustering run.

    distance_history holds the merge distances in merge order. The merge with
    the largest increase in distance compared to the previous merge (the first
    merge counts with its own distance) is located, and the number of clusters
    that existed just before that merge is returned. distance_history itself is
    not modified. An empty history (nothing was merged) yields 1.
    '''
    if not distance_history:
        return 1

    # differences are computed from the untouched history into a new list, so
    # each step compares against the previous ORIGINAL distance
    relative_distances = [distance_history[0]]
    for i in range(1, len(distance_history)):
        relative_distances.append(distance_history[i] - distance_history[i-1])

    max_dist_index = relative_distances.index(max(relative_distances))
    optimal_k = len(relative_distances) - max_dist_index + 1 # +1 because you want the number of clusters before the maximum distance

    return optimal_k

In [None]:
# Clustering and saving output
# Reads the features written by the feature extraction cell, clusters the faces
# of every image with the method selected by cluster_type and writes the result
# back to the same JSON file.

cluster_output = {}

# After loading from JSON all face keys are strings ('0', '1', ...)
with open('./output_files/image_information.json', 'r') as f:
    cluster_output = json.load(f)

if cluster_type == 'kmeans':
    for count, img_name in enumerate(os.listdir(img_dir)):
        # image id = file name without its last four characters
        img_num = img_name[:-4]
        K, cluster_output = kmeans_regular(img_num=img_num, cluster_output=cluster_output, face_width=face_width, yaw=yaw)
        cluster_output = emotion_assignment(img_num, K, cluster_output, 'kmeans')

        if count % 10 == 0:
            print(f'Finished clustering image {count}')

    with open('./output_files/image_information.json', 'w') as f:
        json.dump(cluster_output, f)
            
if cluster_type == 'hierarchical':
    # NOTE(review): this guard is presumably always true when run as a notebook cell -- confirm it is needed.
    if __name__ == '__main__':

        for count, img_name in enumerate(os.listdir(img_dir)):
            img_num = img_name[:-4]
            current_filename = './temp/coordinates_emotion_output_img' + str(img_num) +'.csv'
            data = pd.read_csv(current_filename)
            data = np.array(data.astype(float))

            # weighing emotion and distance importance
            for row in data:
                row[0] = row[0] * w_emo
                row[1] = row[1] * w_emo
                row[2] = row[2] * w_emo
                row[3] = row[3] * w_coord
                row[4] = row[4] * w_coord

            if face_width: # if face size should be included in the feature vector
                face_widths = np.zeros((data.shape[0],1))

                for idee,face in enumerate(cluster_output[img_num]):
                    face_widths[idee] = cluster_output[img_num][face]['face_width']

                data = np.append(data, face_widths, axis=1)

            if yaw: # if yaw should be included in the feature vector
                yaws = np.zeros((data.shape[0],1))

                for idee,face in enumerate(cluster_output[img_num]):
                    yaws[idee] = cluster_output[img_num][face]['yaw_norm']

                data = np.append(data, yaws, axis=1)

            # First run: merge everything into a single cluster to obtain the
            # complete history of merge distances
            K = 1

            hc = hierarchical_clustering(data)
            output, current_clusters, clusters, cluster_history, distance_history = hc.clustering(K)

            #find optimal k
            K = hierarchical_k_opt(distance_history)

            # Second run: cluster again, this time stopping at the chosen K
            if K < len(cluster_output[str(img_num)]):
                hc = hierarchical_clustering(data)
                output, current_clusters, clusters, cluster_history, distance_history = hc.clustering(K)

            # K equal to the number of faces: every face is its own cluster
            elif K == len(cluster_output[str(img_num)]):
                output = list(range(K))

            #saving hierarchical cluster labels
            for idxx in range(len(cluster_output[img_num])):
                cluster_output[img_num][str(idxx)]['hierarchical_cluster'] = int(output[idxx])

            cluster_output = emotion_assignment(img_num, K, cluster_output, cluster_type)

            if count % 10 == 0:
                print(f'Finished clustering image {count}')

        #saving json dict
        with open('./output_files/image_information.json', 'w') as f:
            json.dump(cluster_output, f)

### Part IV: Evaluating emotional subgroup classification  ###

In [None]:
'''
The following code will compare the acquired clustering results against a provided ground truth.
'''

In [None]:
# Clustering results as written by the clustering cell (face keys are strings)
with open('./output_files/image_information.json', 'r') as f:
    clustering_output = json.load(f)
# Ground truth annotations; the evaluation cells read
# ground_truth[img]['emotional_subgroup_labels'] with its 'groups' and 'emotions' entries
with open('data.json', 'r') as f:
    ground_truth = json.load(f)

In [None]:
# Defining functions for calculating the Hamming distance (subgroup part of testing)

def get_group_idx(groups,face_id_i):
    '''Return the position of the first group that contains face_id_i, or -1 if no group does.'''
    matches = (position for position, members in enumerate(groups) if face_id_i in members)
    return next(matches, -1)

def build_adjacency_matrix_gt(ground_truth, img_num, num_faces, adj_matrix):
    '''
    Fill adj_matrix with the ground truth subgroup structure of an image.

    Two different faces are linked (entry set to 1) when get_group_idx returns
    the same group index for both; a face without any partner is linked to
    itself. Faces that are in no group all share index -1 and are therefore
    linked to each other. Returns the (mutated) adj_matrix.
    '''
    groups = ground_truth[img_num]['emotional_subgroup_labels']['groups']
    group_of_face = [get_group_idx(groups, str(face_id)) for face_id in range(num_faces)]

    for face_id, group_idx in enumerate(group_of_face):
        partners = [other for other, other_group in enumerate(group_of_face)
                    if other != face_id and other_group == group_idx]

        for other in partners:
            adj_matrix[face_id,other] = 1

        if not partners: #alone in a group thus linked to itself
            adj_matrix[face_id,face_id] = 1

    return adj_matrix

def build_adjacency_matrix(clustering_output, img_num, num_faces, adj_matrix):
    '''
    Fill adj_matrix with the predicted subgroup structure of an image.

    Which label is compared depends on the global cluster_type
    ('kmeans_cluster' or 'hierarchical_cluster'). Two different faces with the
    same label are linked (entry set to 1); a face without any partner is
    linked to itself. With any other cluster_type every face ends up linked to
    itself only. Returns the (mutated) adj_matrix.
    '''
    label_keys = {'kmeans': 'kmeans_cluster', 'hierarchical': 'hierarchical_cluster'}

    for face_id in range(num_faces):
        has_partner = False

        for other_face in range(num_faces):
            if other_face == face_id or cluster_type not in label_keys:
                continue

            label_key = label_keys[cluster_type]
            faces = clustering_output[img_num]
            if faces[str(face_id)][label_key] == faces[str(other_face)][label_key]:
                adj_matrix[face_id,other_face] = 1
                has_partner = True

        if not has_partner: #alone in a group thus linked to itself
            adj_matrix[face_id,face_id] = 1

    return adj_matrix

def hamming_distance(u,v):
    '''
    Mean row-wise Hamming distance between two matrices.

    Row i of u is compared with row i of v using
    scipy.spatial.distance.hamming; the per-row distances are averaged over the
    number of rows in u.
    '''
    row_distances = (scipy.spatial.distance.hamming(row, v[idx], w=None)
                     for idx, row in enumerate(u))
    return sum(row_distances) / len(u)

In [None]:
# Calculating the Hamming distance (subgroup part of testing)

h_dist_cl = []

for img_name in os.listdir(img_dir):
    img_path = os.path.join(img_dir,img_name)
    img_num = img_name[:-4]

    # Only images with annotated subgroups ("approved" images) are evaluated
    if len(ground_truth[img_num]['emotional_subgroup_labels']['groups']) > 0:

        # First, count the number of faces in this image
        num_faces = len(clustering_output[img_num])

        # An image without any clustered face has empty adjacency matrices;
        # averaging over zero rows would divide by zero, so it is skipped.
        if num_faces == 0:
            continue

        # Construct adjacency matrix for both ground truth and cluster output
        adj_matrix_gt = np.zeros((num_faces,num_faces))
        adj_matrix_gt = build_adjacency_matrix_gt(ground_truth, img_num, num_faces, adj_matrix_gt)

        adj_matrix_cl = np.zeros((num_faces,num_faces))
        adj_matrix_cl = build_adjacency_matrix(clustering_output, img_num, num_faces, adj_matrix_cl)

        # Calculate Hamming distance per row, then per matrix, then per dataset
        h_dist = hamming_distance(adj_matrix_gt,adj_matrix_cl)
        h_dist_cl.append(h_dist)

print(f'Out of {len(h_dist_cl)} approved images, the average h_dist for {cluster_type} clustering is {np.mean(h_dist_cl)}')

In [None]:
# Defining function for calculating the accuracy (emotion part of testing)

def construct_emotional_dict(img_num, num_faces):
    '''
    Map every face index (0 .. num_faces-1) of an image to the ground truth
    emotion of its subgroup, using the global ground_truth.

    NOTE(review): for a face that is in no group get_group_idx returns -1, so
    the emotion of the LAST group is used -- confirm that this is intended.
    '''
    labels = ground_truth[img_num]['emotional_subgroup_labels']
    groups = labels['groups']
    return {face_id: labels['emotions'][get_group_idx(groups, str(face_id))]
            for face_id in range(num_faces)}

In [None]:
# Calculating the accuracy (emotion part of testing)

acc = []

pred = {0: {0: 0, 1: 0, 2:0}, 1: {0: 0, 1: 0, 2:0}, 2: {0: 0, 1: 0, 2:0}} 
# {true_neg: {pred_neg: 0, pred_neu: 0, pred_pos: 0}, 
# true_neu: {pred_neg: 0, pred_neu: 0, pred_pos: 0}, 
# true_pos: {pred_neg: 0, pred_neu: 0, pred_pos: 0}}

# name of the predicted-emotion entry per clustering method
emotion_keys = {'kmeans': 'kmeans_emotion', 'hierarchical': 'hierarchical_emotion'}

for img_name in os.listdir(img_dir):
    img_path = os.path.join(img_dir,img_name)
    img_num = img_name[:-4]

    # Only images with annotated subgroups ("approved" images) are evaluated
    if len(ground_truth[img_num]['emotional_subgroup_labels']['groups']) > 0:

        num_faces = len(clustering_output[img_num])

        # Without any clustered face there is nothing to score, and the
        # per-image accuracy below would divide by zero.
        if num_faces == 0:
            continue

        emotional_dict = construct_emotional_dict(img_num, num_faces)

        correct = 0
        incorrect = 0

        for face_id_i in range(num_faces):
            if cluster_type in emotion_keys:
                true_emotion = emotional_dict[face_id_i]
                predicted_emotion = clustering_output[img_num][str(face_id_i)][emotion_keys[cluster_type]]

                # contingency table: pred[true label][predicted label]
                pred[true_emotion][predicted_emotion] += 1
                if true_emotion == predicted_emotion:
                    correct += 1
                else:
                    incorrect += 1

        acc.append(correct/(correct+incorrect))

print(f'Out of {len(acc)} approved images, the average acc for {cluster_type} clustering is {np.mean(acc)}')

In [None]:
# For further analysis of the accuracy outcome, e.g. in case of imbalanced datasets when the accuracy 
# score can be misleading, the true/false positives and true/false negatives are printed in a contingency table.
# Rows are the true labels, columns the predicted labels (0 = negative, 1 = neutral, 2 = positive);
# the counts come from the pred dictionary filled by the accuracy cell.

metric_dict = {'true_negative': [pred[0][0], pred[0][1], pred[0][2]], 
               'true_neutral': [pred[1][0], pred[1][1], pred[1][2]], 
               'true_positive': [pred[2][0], pred[2][1], pred[2][2]]}
# header row, every column left-aligned in 15 characters
print ("{:<15} {:<15} {:<15} {:<15}".format('','pred_negative','pred_neutral','pred_positive'))
for k, v in metric_dict.items():
    neg_pred, neu_pred, pos_pred = v
    print ("{:<15} {:<15} {:<15} {:<15}".format(k, neg_pred, neu_pred, pos_pred))

In [None]:
# Combining the Hamming distance and accuracy to one error measure
# (summed Hamming distance)^2 + (summed per-image error, i.e. len(acc) - sum(acc))^2,
# divided by (number of scored images)^2.
# Requires h_dist_cl and acc from the two evaluation cells above; fails with a
# division by zero when no image was scored (len(acc) == 0).

combined_error = (sum(h_dist_cl)**2+(len(acc)-sum(acc))**2) / (len(acc)**2)
print(f'Out of {len(acc)} approved images, the combined error for {cluster_type} clustering is {combined_error}')