<a href="https://colab.research.google.com/github/Erfan-Mostafiz/CSE499_Emotion-Analysis/blob/Erfan/Code/CSE499_Emotion_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# from google.colab import drive
# drive.mount('/content/drive')
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
!unzip "/content/gdrive/MyDrive/CSE499_EmotionAnalysis/DeepVANet-main.zip"

Archive:  /content/gdrive/MyDrive/CSE499_EmotionAnalysis/DeepVANet-main.zip
   creating: DeepVANet-main/
  inflating: DeepVANet-main/dataset.py  
  inflating: DeepVANet-main/data_preprocess.py  
  inflating: DeepVANet-main/decision_level_fusion.py  
  inflating: DeepVANet-main/demo.py  
  inflating: DeepVANet-main/models.py  
  inflating: DeepVANet-main/pretrained_cnn.pth  
  inflating: DeepVANet-main/readme.md  
  inflating: DeepVANet-main/train.py  
  inflating: DeepVANet-main/utils.py  


In [4]:
!unzip "/content/gdrive/MyDrive/CSE499_EmotionAnalysis/Reduced/datasets.zip"

Archive:  /content/gdrive/MyDrive/CSE499_EmotionAnalysis/Reduced/datasets.zip
   creating: datasets/
   creating: datasets/DEAP/
   creating: datasets/DEAP/data_preprocessed_python/
   creating: datasets/DEAP/data_preprocessed_python/data_preprocessed_python/
  inflating: datasets/DEAP/data_preprocessed_python/data_preprocessed_python/s01.dat  
  inflating: datasets/DEAP/data_preprocessed_python/data_preprocessed_python/s02.dat  
  inflating: datasets/DEAP/data_preprocessed_python/data_preprocessed_python/s03.dat  
  inflating: datasets/DEAP/data_preprocessed_python/data_preprocessed_python/s04.dat  
  inflating: datasets/DEAP/data_preprocessed_python/data_preprocessed_python/s05.dat  
  inflating: datasets/DEAP/data_preprocessed_python/data_preprocessed_python/s06.dat  
  inflating: datasets/DEAP/data_preprocessed_python/data_preprocessed_python/s07.dat  
  inflating: datasets/DEAP/data_preprocessed_python/data_preprocessed_python/s08.dat  
  inflating: datasets/DEAP/data_preprocessed

In [5]:

import torch
import os
from PIL import Image
from torch.utils import data
import numpy as np
import pandas as pd
from torchvision import transforms as T
import io
import zipfile


In [6]:
class DEAP(data.Dataset):
    '''
    DEAP dataset for per-subject experiments
    Parameters:
        modal   : data modality; {'face', 'eeg', 'peri', 'bio', 'faceeeg', 'faceperi', 'facebio'}, default = 'facebio'.
        subject : subject ID; an integer between 1 and 22, default = 1.
        k       : kth fold; an integer between 1 and 10, default = 1.
        kind    : dataset type; {'train', 'val', 'all'}, default = 'all'.
        indices : index of data samples (for dataset shuffle); an list of integers, default = list(range(2400)).
        label   : emotion label; {'valence', 'arousal'}, defualt = 'valence'.
    '''
    def __init__(self, modal='facebio', subject=1, k=1, kind='all', indices=list(range(2400)),label='valence'):
        self.modal = modal
        self.subject = subject
        self.k = k
        self.kind = kind
        self.label = label
        self.bio_path = f'./data/DEAP/bio/s{subject}.zip'
        self.label_path = f'./data/DEAP/labels/'
        self.face_path = f'./data/DEAP/faces/s{subject}.zip'
        self.labels = pd.read_csv(self.label_path+'participant_ratings.csv')
        self.face_zip = zipfile.ZipFile(self.face_path, 'r')
        self.bio_zip = zipfile.ZipFile(self.bio_path, 'r')
        self.size = len(indices)

        if kind == 'train':
            self.indices = indices[:int((k - 1) * self.size / 10)] + indices[int(k * self.size / 10):]
        if kind == 'val':
            self.indices = indices[int((k - 1) * self.size / 10):int(k * self.size / 10)]
        if kind == 'all':
            self.indices = indices

    def __getitem__(self, i):
        index = self.indices[i]
        trial = index // 60 + 1
        segment = index % 60 + 1
        prex = 's' + (str(self.subject) if self.subject > 9 else '0' + str(self.subject)) + '/s' + (
            str(self.subject) if self.subject > 9 else '0' + str(self.subject)) + '_trial' + (
                   str(trial) if trial > 9 else '0' + str(trial)) + '/s' + (
                   str(self.subject) if self.subject > 9 else '0' + str(self.subject)) + '_trial' + (
                   str(trial) if trial > 9 else '0' + str(trial))
        transform = T.Compose([T.Resize((64, 64)),
                               T.ToTensor()])

        face_data = []
        for n in range(1, 6):
            img = Image.open(io.BytesIO(self.face_zip.read(prex + f'_{(segment - 1) * 5 + n}.png')))
            frame_array = transform(img)
            frame_array = frame_array.view(1, 3, 64, 64)
            face_data.append(frame_array)
        face_data = torch.cat(face_data, dim=0)
        bio_data = torch.tensor(
            np.load(io.BytesIO(self.bio_zip.read(f's{self.subject}/{self.subject}_{trial}_{segment}.npy')))).float()

        if self.modal == 'face':
            data = face_data
        elif self.modal == 'eeg':
            data = bio_data[:32]
        elif self.modal == 'peri':
            data = bio_data[32:]
        elif self.modal == 'bio':
            data = bio_data
        elif self.modal == 'faceeeg':
            data = (face_data, bio_data[:32])
        elif self.modal == 'faceperi':
            data = (face_data, bio_data[32:])
        elif self.modal == 'facebio':
            data = (face_data, bio_data)

        valence = 0 if self.labels[(self.labels['Participant_id']==self.subject) & (self.labels['Trial']==trial)]['Valence'].iloc[0] < 5 else 1
        arousal = 0 if self.labels[(self.labels['Participant_id']==self.subject) & (self.labels['Trial']==trial)]['Arousal'].iloc[0] < 5 else 1
        if self.label == 'valence':
            return data, valence
        else:
            return data, arousal

    def __len__(self):
        return len(self.indices)


In [7]:
class DEAPAll(data.Dataset):
    def __init__(self, modal='facebio', k=1, kind='all', indices=list(range(52440)),label='valence'):
        self.modal = modal
        self.k = k
        self.kind = kind
        self.label = label
        self.bio_path = f'./data/DEAP/bio/'
        self.label_path = f'./data/DEAP/labels/'
        self.face_path = f'./data/DEAP/faces/'
        self.labels = pd.read_csv(self.label_path+'participant_ratings.csv')
        deap_indices_dict = {1: 2400,
                             2: 2400,
                             3: 2340,
                             4: 2400,
                             5: 2340,
                             6: 2400,
                             7: 2400,
                             8: 2400,
                             9: 2400,
                             10: 2400,
                             11: 2220,
                             12: 2400,
                             13: 2400,
                             14: 2340,
                             15: 2400,
                             16: 2400,
                             17: 2400,
                             18: 2400,
                             19: 2400,
                             20: 2400,
                             21: 2400,
                             22: 2400}
        self.sub_trial_seg = []
        for sub in range(1,23):
            for trial in range(1,int(deap_indices_dict[sub]/60+1)):
                for seg in range(1,61):
                    self.sub_trial_seg.append((sub,trial,seg))
        self.size = len(indices)

        if kind == 'train':
            self.indices = indices[:int((k - 1) * self.size / 10)] + indices[int(k * self.size / 10):]
        if kind == 'val':
            self.indices = indices[int((k - 1) * self.size / 10):int(k * self.size / 10)]
        if kind == 'all':
            self.indices = indices

    def __getitem__(self, i):
        index = self.indices[i]
        subject, trial, segment = self.sub_trial_seg[index]
        face_zip = zipfile.ZipFile(self.face_path+f's{subject}.zip', 'r')
        bio_zip = zipfile.ZipFile(self.bio_path+f's{subject}.zip', 'r')
        prex = 's' + (str(subject) if subject > 9 else '0' + str(subject)) + '/s' + (
            str(subject) if subject > 9 else '0' + str(subject)) + '_trial' + (
                   str(trial) if trial > 9 else '0' + str(trial)) + '/s' + (
                   str(subject) if subject > 9 else '0' + str(subject)) + '_trial' + (
                   str(trial) if trial > 9 else '0' + str(trial))
        transform = T.Compose([T.Resize((64, 64)),
                               T.ToTensor()])
        face_data = []
        for n in range(1, 6):
            img = Image.open(io.BytesIO(face_zip.read(prex + f'_{(segment - 1) * 5 + n}.png')))
            frame_array = transform(img)
            frame_array = frame_array.view(1, 3, 64, 64)
            face_data.append(frame_array)
        face_data = torch.cat(face_data, dim=0)

        bio_data = torch.tensor(np.load(io.BytesIO(bio_zip.read(f's{subject}/{subject}_{trial}_{segment}.npy')))).float()

        if self.modal == 'face':
            data = face_data
        elif self.modal == 'eeg':
            data = bio_data[:32]
        elif self.modal == 'peri':
            data = bio_data[32:]
        elif self.modal == 'bio':
            data = bio_data
        elif self.modal == 'faceeeg':
            data = (face_data, bio_data[:32])
        elif self.modal == 'faceperi':
            data = (face_data, bio_data[32:])
        elif self.modal == 'facebio':
            data = (face_data, bio_data)

        valence = 0 if self.labels[(self.labels['Participant_id']==subject) & (self.labels['Trial']==trial)]['Valence'].iloc[0] < 5 else 1
        arousal = 0 if self.labels[(self.labels['Participant_id']==subject) & (self.labels['Trial']==trial)]['Arousal'].iloc[0] < 5 else 1
        if self.label == 'valence':
            return data, valence
        else:
            return data, arousal

    def __len__(self):
        return len(self.indices)

In [8]:
!pip install face_alignment

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting face_alignment
  Downloading face_alignment-1.3.5.tar.gz (27 kB)
Building wheels for collected packages: face-alignment
  Building wheel for face-alignment (setup.py) ... [?25l[?25hdone
  Created wheel for face-alignment: filename=face_alignment-1.3.5-py2.py3-none-any.whl size=28241 sha256=ca56f12171075a8dd63567f4663abb13ccc730b2db7706034e9a37ff275b1484
  Stored in directory: /root/.cache/pip/wheels/c9/ba/4d/2d368f55e5f929f9472da59e356fbdf1483f885de80a5bc620
Successfully built face-alignment
Installing collected packages: face-alignment
Successfully installed face-alignment-1.3.5


In [9]:
!pip install pyedflib

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyedflib
  Downloading pyEDFlib-0.1.30-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.4 MB)
[K     |████████████████████████████████| 2.4 MB 28.6 MB/s 
Installing collected packages: pyedflib
Successfully installed pyedflib-0.1.30


In [10]:
"""
Functions for data pre-process
"""

import cv2
import numpy as np
import pandas as pd
import math
from collections import defaultdict
from PIL import Image
import pickle as cPickle
import os
import face_alignment
from xml.dom.minidom import parse
import pyedflib
import shutil

In [11]:

# ************************* Face Data Pre-process *************************

def video2frames(dataset='DEAP'):
    '''
    Extract frames from videos.
    :param dataset: used dataset
    '''
    assert dataset in ['DEAP', 'MAHNOB'], 'Invalid dataset name'

    if dataset == 'DEAP':
        dataset_path = './datasets/DEAP/face_video/'
        des_path = './datasets/DEAP/frames/'
        for subject in os.listdir(dataset_path):
            if subject.startswith('.'):
                continue
            sub_path = dataset_path+subject
            for video_file in os.listdir(sub_path):
                if not os.path.exists(des_path + subject):
                    os.mkdir(des_path + subject)
                if not os.path.exists(des_path + subject + '/' + video_file.split('.')[0]):
                    os.mkdir(des_path + subject + '/' + video_file.split('.')[0])
                video_file_path = sub_path+'/'+video_file
                video = cv2.VideoCapture(video_file_path)
                c = 1
                frame_rate = 10
                count = 0
                while (True):
                    ret, frame = video.read()
                    if ret:
                        if (c % frame_rate == 0):
                            count += 1
                            cv2.imwrite(des_path+subject+'/'+video_file.split('.')[0] +'/'+ video_file.split('.')[0]+'_'+str(count) + '.png', frame)
                        c += 1
                        cv2.waitKey(0)
                    else:
                        break
                video.release()


In [12]:
# functions for face alignment and cropping are based on https://github.com/DANNALI35/zhihu_article/tree/master/201901_face_alignment
def to_dict(landmarks):
    '''
    Transfer detected facial landmarks list to dictionary.
    :param landmarks: a list of facial landmarks
    :return: a dictionary of facial landmarks
    '''
    l = list()
    for i in range(68):
        point = (landmarks[i][0], landmarks[i][1])
        l.append(point)
    face_landmarks_dict = dict()
    face_landmarks_dict['chin'] = l[0:17]
    face_landmarks_dict['left_eyebrow'] = l[17:22]
    face_landmarks_dict['right_eyebrow'] = l[22:27]
    face_landmarks_dict['nose_bridge'] = l[27:31]
    face_landmarks_dict['nose_tip'] = l[31:36]
    face_landmarks_dict['left_eye'] = l[36:42]
    face_landmarks_dict['right_eye'] = l[42:48]
    face_landmarks_dict['top_lip'] = l[48:55] + l[60:65]
    face_landmarks_dict['bottom_lip'] = l[55:60] + l[65:68]
    return face_landmarks_dict


def crop_face(image_array, landmarks):
    """ crop face according to eye,mouth and chin position
    :param image_array: numpy array of a single image
    :param landmarks: dict of landmarks for facial parts as keys and tuple of coordinates as values
    :return:
    cropped_img: numpy array of cropped image
    """

    eye_landmark = np.concatenate([np.array(landmarks['left_eye']),
                                   np.array(landmarks['right_eye'])])
    eye_center = np.mean(eye_landmark, axis=0).astype("int")
    lip_landmark = np.concatenate([np.array(landmarks['top_lip']),
                                   np.array(landmarks['bottom_lip'])])
    lip_center = np.mean(lip_landmark, axis=0).astype("int")
    mid_part = lip_center[1] - eye_center[1]
    top = eye_center[1] - mid_part * 18 / 40
    bottom = lip_center[1] + mid_part * 12 / 40

    w = h = bottom - top
    x_center = eye_center[0]
    left, right = (x_center - w / 2, x_center + w / 2)

    pil_img = Image.fromarray(image_array)
    left, top, right, bottom = [int(i) for i in [left, top, right, bottom]]
    cropped_img = pil_img.crop((left, top, right, bottom))
    cropped_img = np.array(cropped_img)
    return cropped_img, left, top

In [13]:
def rotate_landmarks(landmarks, eye_center, angle, row):
    """ rotate landmarks to fit the aligned face
    :param landmarks: dict of landmarks for facial parts as keys and tuple of coordinates as values
    :param eye_center: tuple of coordinates for eye center
    :param angle: degrees of rotation
    :param row: row size of the image
    :return: rotated_landmarks with the same structure with landmarks, but different values
    """
    rotated_landmarks = defaultdict(list)
    for facial_feature in landmarks.keys():
        for landmark in landmarks[facial_feature]:
            rotated_landmark = rotate(origin=eye_center, point=landmark, angle=angle, row=row)
            rotated_landmarks[facial_feature].append(rotated_landmark)
    return rotated_landmarks

In [14]:
def rotate(origin, point, angle, row):
    """ rotate coordinates in image coordinate system
    :param origin: tuple of coordinates,the rotation center
    :param point: tuple of coordinates, points to rotate
    :param angle: degrees of rotation
    :param row: row size of the image
    :return: rotated coordinates of point
    """
    x1, y1 = point
    x2, y2 = origin
    y1 = row - y1
    y2 = row - y2
    angle = math.radians(angle)
    x = x2 + math.cos(angle) * (x1 - x2) - math.sin(angle) * (y1 - y2)
    y = y2 + math.sin(angle) * (x1 - x2) + math.cos(angle) * (y1 - y2)
    y = row - y
    return int(x), int(y)

In [15]:
def align_face(image_array, landmarks):
    """ align faces according to eyes position
    :param image_array: numpy array of a single image
    :param landmarks: dict of landmarks for facial parts as keys and tuple of coordinates as values
    :return:
    rotated_img:  numpy array of aligned image
    eye_center: tuple of coordinates for eye center
    angle: degrees of rotation
    """
    # get list landmarks of left and right eye
    left_eye = landmarks['left_eye']
    right_eye = landmarks['right_eye']
    # calculate the mean point of landmarks of left and right eye
    left_eye_center = np.mean(left_eye, axis=0).astype("int")
    right_eye_center = np.mean(right_eye, axis=0).astype("int")
    # compute the angle between the eye centroids
    dy = right_eye_center[1] - left_eye_center[1]
    dx = right_eye_center[0] - left_eye_center[0]
    # compute angle between the line of 2 centeroids and the horizontal line
    angle = math.atan2(dy, dx) * 180. / math.pi
    # calculate the center of 2 eyes
    eye_center = ((left_eye_center[0] + right_eye_center[0]) // 2,
                  (left_eye_center[1] + right_eye_center[1]) // 2)
    # at the eye_center, rotate the image by the angle
    rotate_matrix = cv2.getRotationMatrix2D(eye_center, angle, scale=1)
    rotated_img = cv2.warpAffine(image_array, rotate_matrix, (image_array.shape[1], image_array.shape[0]))
    return rotated_img, eye_center, angle

In [16]:
def align_landmarks(landmarks):
    left_eye = landmarks['left_eye']
    right_eye = landmarks['right_eye']
    # calculate the mean point of landmarks of left and right eye
    left_eye_center = np.mean(left_eye, axis=0).astype("int")
    right_eye_center = np.mean(right_eye, axis=0).astype("int")
    # compute the angle between the eye centroids
    dy = right_eye_center[1] - left_eye_center[1]
    dx = right_eye_center[0] - left_eye_center[0]
    # compute angle between the line of 2 centeroids and the horizontal line
    angle = math.atan2(dy, dx) * 180. / math.pi
    # calculate the center of 2 eyes
    eye_center = ((left_eye_center[0] + right_eye_center[0]) // 2,
                  (left_eye_center[1] + right_eye_center[1]) // 2)
#     rotated_landmarks = defaultdict(list)
    rotated_landmarks = []
    for facial_feature in landmarks.keys():
        for landmark in landmarks[facial_feature]:
            rotated_landmark = rotate(origin=eye_center, point=landmark, angle=angle, row=570)
#             rotated_landmarks[facial_feature].append(rotated_landmark)
            rotated_landmarks.append(rotated_landmark)
    return rotated_landmarks

In [17]:
def face_detection_alignment_cropping(dataset='DEAP'):
    '''
    Transfer frames to faces by face detection, alignment and cropping.
    :param dataset: used dataset
    '''
    assert dataset in ['DEAP', 'MAHNOB'], 'Invalid dataset name'

    # facial landmarks detector; use gpu by changing device parameter to 'cuda'
    fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=False, device='cpu')

    if dataset == 'DEAP':
        root = './datasets/DEAP/frames/'
        des_path = './data/DEAP/faces/'

    # if dataset == 'MAHNOB':
    #     root = './datasets/MAHNOB/frames/'
    #     des_path = './data/DEAP/faces/'

    for subject in os.listdir(root):
        for trial in os.listdir(root+subject):
            if os.path.exists(des_path + subject + '/' + trial):
                continue
            os.mkdir(des_path + subject + '/' + trial)
            for frame in os.listdir(root+subject+'/'+trial):
                frame_path = root + subject + '/' + trial + '/' + frame
                img = cv2.imread(frame_path)
                preds = fa.get_landmarks(img)
                try:
                    landmarks_list = preds[0]
                    landmarks_dict = to_dict(landmarks_list)
                    aligned_face, eye_center, angle = align_face(image_array=img, landmarks=landmarks_dict)
                    rotated_landmarks = rotate_landmarks(landmarks=landmarks_dict, eye_center=eye_center, angle=angle,
                                                     row=img.shape[0])
                    cropped_img, left, top = crop_face(image_array=aligned_face, landmarks=rotated_landmarks)

                    cv2.imwrite(des_path + subject + '/' + trial + '/' + frame, cropped_img)
                except:
                    print(f'Fail to get the face image: {frame}')

In [18]:
# ************************* Bio-sensing Data Pre-process *************************

def trial2segments(dataset='DEAP'):
    '''
    Divide bio-sensing data of each trial to 1-second length segments, and perform baseline removal.
    Note, when dealing with MAHNOB-HCI dataset, EEG data should be common reference averaged, bandpass filtered and artefact removed using EEGLab,
    and preprocessed EEG data files (one file per trial) should be stored in './datasets/MAHNOB/eeg_preprocessed/ folder in .npy format.
    :param dataset: used dataset
    '''
    assert dataset in ['DEAP', 'MAHNOB'], 'Invalid dataset name'

    if dataset == 'DEAP':
        root = './datasets/DEAP/data_preprocessed_python/'
        des_path = './data/DEAP/bio/'
        labels = pd.read_csv('./data/DEAP/labels/participant_ratings.csv')
        for file in os.listdir(root):
            subject = file.split('.')[0]
            sub_id = int(subject[1:])
            os.mkdir(des_path + 's' + str(sub_id))
            f = open(root + file, 'rb')
            d = cPickle.load(f, encoding='latin1')
            data = d['data']
            for experiment in range(40):
                trial = labels[(labels['Participant_id'] == sub_id) & (labels['Experiment_id'] == experiment + 1)][
                    'Trial'].iloc[0]
                # baseline
                l = []
                for i in range(3):
                    l.append(data[experiment][:, i * 128:(i + 1) * 128])
                baseline_mean = sum(l) / 3
                # segments
                for i in range(60):
                    data_seg = data[experiment][:, 384 + i * 128:384 + (i + 1) * 128]
                    data_seg_removed = data_seg - baseline_mean
                    np.save(f'{des_path}s{sub_id}/{sub_id}_{trial}_{i + 1}.npy', data_seg_removed)

In [None]:
def preprocess_demo():
    '''
    This function pre-processes DEAP dataset.
    Please unzip face_video.zip, data_preprocessed_python.zip and metadata_csv.zip from DEAP dataset in './datasets/DEAP/'.
    Then call this function, the preprocessed data will be stored in './data/DEAP/'.
    It is recommended to use a device with GPU, otherwise the face detection will be slow.
    Note that faces cannot be detected from some frames, these frames should be replaced with the neighbour frame manually.
    '''
    # pre-process face data
    if not os.path.exists('./datasets/DEAP/frames/'):
        os.mkdir('./datasets/DEAP/frames/')
    if not os.path.exists('./data/'):
        os.mkdir('./data/')
    if not os.path.exists('./data/DEAP/'):
        os.mkdir('./data/DEAP/')
    if not os.path.exists('./data/DEAP/faces/'):
        os.mkdir('./data/DEAP/faces/')
    if not os.path.exists('./data/DEAP/labels/'):
        os.mkdir('./data/DEAP/labels/')
    shutil.copy('./datasets/DEAP/metadata_csv/participant_ratings.csv', './data/DEAP/labels/participant_ratings.csv')
    video2frames('DEAP')
    face_detection_alignment_cropping('DEAP')

    # preprocess bio-sensing data
    if not os.path.exists('./data/DEAP/bio/'):
        os.mkdir('./data/DEAP/bio/')
    trial2segments('DEAP')


if __name__ == '__main__':
    preprocess_demo()

KeyboardInterrupt: ignored