In [None]:
import pickle

from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from moviepy.editor import VideoFileClip

import re

import Mic.video as mic
import Camera.video as cam

In [None]:
# Load the pre-extracted IEMOCAP feature bundle from disk.
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# point feature_path at a file from a trusted source.
feature_path = "./dataset/IEMOCAP_features.pkl"
with open(feature_path, 'rb') as feature_file:
    # The context manager closes the handle as soon as unpickling is done
    # (the previous bare open() call left it open for the kernel's lifetime).
    iemocap_features = pickle.load(feature_file, encoding='latin1')

# The bundle unpacks into exactly nine objects. Of these, only videoIDs,
# videoLabels, trainVid and testVid are used by the cells below.
(videoIDs, videoSpeakers, videoLabels, videoText, videoAudio,
 videoVisual, videoSentence, trainVid, testVid) = iemocap_features
emotionIDs = ['hap', 'sad', 'neu', 'ang', 'exc', 'fru', 'fer', 'sup', 'dis']


# Per-utterance scratch state: the transcribed statement plus the top emotion
# and score reported by the text model and by the audio model.
audio_emotions = {
    'Statement': None,
    'Text': {
        'Emotion': None,
        'Score': None,
    },
    'Audio': {
        'Emotion': None,
        'Score': None,
    }
}

# Facial-emotion predictions for the current clip; empty until a clip is analysed.
video_emotions = []

In [None]:
def extract_time(duration):
    """Return every decimal number found in ``duration`` as a list of floats.

    Only tokens of the form ``digits.digits`` are matched, so bare integers
    (e.g. ``"10"``) are ignored.

    Args:
        duration: Text containing timestamps, e.g. ``"[6.2901 - 8.2357]"``.

    Returns:
        The matched numbers, in order of appearance; empty if none match.
    """
    # Raw string: "\d" inside a normal literal is an invalid escape sequence
    # and triggers a warning on current Python versions.
    return [float(time) for time in re.findall(r"\d+\.\d+", duration)]

def create_clips(vid_path, start_time, end_time):
    """Cut one segment out of a video and save its left and right halves.

    Writes three files into the working directory, overwriting earlier ones:
    ``train_clip.avi`` (the cut segment), ``left_half.mp4`` and
    ``right_half.mp4`` (the segment cropped to each half of the frame width).

    Args:
        vid_path: Path of the source video.
        start_time: Segment start, passed straight to ffmpeg_extract_subclip.
        end_time: Segment end, passed straight to ffmpeg_extract_subclip.
    """
    ffmpeg_extract_subclip(vid_path, start_time, end_time, targetname="train_clip.avi")

    clip = VideoFileClip("train_clip.avi")
    try:
        half_width = clip.w // 2
        clip.crop(x1=0, width=half_width).write_videofile("left_half.mp4")
        clip.crop(x1=half_width, width=half_width).write_videofile("right_half.mp4")
    finally:
        # Release the reader held by the clip; this function runs once per
        # utterance, so unclosed clips would otherwise pile up.
        clip.close()

def get_clip_speaker(vid_seg_id):
    """Pick which half-frame clip to use for an utterance id.

    The character at index 5 of the id is compared with the first character
    of the token after the last underscore (e.g. ``F`` in
    ``Ses01F_impro01_F000``). Equal means the right half, otherwise the left.
    NOTE(review): presumably index 5 names the actor filmed on the right and
    the last token names the speaker — confirm against the dataset layout.

    Args:
        vid_seg_id: Utterance id such as ``Ses01F_script01_1_F000``.

    Returns:
        ``"right_half.mp4"`` or ``"left_half.mp4"``.

    Raises:
        IndexError: If ``vid_seg_id`` has fewer than six characters.
    """
    right_speaker = vid_seg_id[5]

    # Read the speaker from the last underscore-separated token instead of a
    # fixed offset: fixed offsets (15 / 18) miss the speaker letter whenever
    # the dialogue name is longer than usual, e.g. "Ses05M_script01_1b_M000".
    speaker = vid_seg_id.rsplit('_', 1)[-1][:1]

    if speaker == right_speaker:
        # use right_half
        return "right_half.mp4"

    return "left_half.mp4"

def extract_video(video_emotions):
    """Flag the most frequent facial-emotion label(s) in ``video_emotions``.

    Args:
        video_emotions: Sequence of predicted labels (anything with ``.count``).

    Returns:
        A list of seven ints ordered Angry, Disgust, Fear, Happy, Neutral,
        Sad, Surprise. A position holds 1 when its label is (tied for) the
        most frequent one and 0 otherwise; all zeros when no label occurs.
    """
    labels = ["Angry", "Disgust", "Fear", "Happy", "Neutral", "Sad", "Surprise"]
    tallies = list(map(video_emotions.count, labels))

    # An all-zero tally would divide by zero; fall back to a divisor of 1.
    peak = max(tallies) or 1

    # Floor division leaves 1 only where the tally equals the peak.
    return [tally // peak for tally in tallies]

def extract_audio(audio_emotions):
    """One-hot encode the audio emotion stored in ``audio_emotions``.

    Args:
        audio_emotions: Mapping whose ``['Audio']['Emotion']`` entry is one of
            ``'neu'``, ``'ang'``, ``'hap'``, ``'sad'``.

    Returns:
        A four-element list with a single 1 at the emotion's position.

    Raises:
        ValueError: If the stored emotion is not one of the four labels.
    """
    labels = ['neu', 'ang', 'hap', 'sad']
    hot_slot = labels.index(audio_emotions['Audio']['Emotion'])
    return [1 if slot == hot_slot else 0 for slot in range(len(labels))]

def extract_text(audio_emotions):
    """One-hot encode the text emotion stored in ``audio_emotions``.

    Args:
        audio_emotions: Mapping whose ``['Text']['Emotion']`` entry is one of
            the 28 labels listed below.

    Returns:
        A 28-element list with a single 1 at the emotion's position.

    Raises:
        ValueError: If the stored emotion is not one of the listed labels.
    """
    labels = [
        'admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring',
        'confusion', 'curiosity', 'desire', 'disappointment', 'disapproval',
        'disgust', 'embarrassment', 'excitement', 'fear', 'gratitude', 'grief',
        'joy', 'love', 'nervousness', 'optimism', 'pride', 'realization',
        'relief', 'remorse', 'sadness', 'surprise', 'neutral',
    ]
    hot_slot = labels.index(audio_emotions['Text']['Emotion'])
    return [1 if slot == hot_slot else 0 for slot in range(len(labels))]
    

In [None]:
# Build the training set: one feature vector (video + audio + text one-hots)
# and one label for every usable utterance of every training dialogue.
x_train = []
y_train = []

for vid_id in trainVid:
    # The character at index 4 of the dialogue id is used as the session number.
    ses_id = vid_id[4]
    ses_path = f'./dataset/IEMOCAP_full_release/Session{ses_id}/dialog/'
    vid_path = ses_path + 'avi/DivX/' + vid_id + '.avi'
    eval_path = ses_path + 'EmoEvaluation/' + vid_id + '.txt'

    # Read the utterance records up front so the file is not held open while
    # the slow media processing runs. Only lines starting with '[' are records.
    with open(eval_path, 'r') as eval_file:
        eval_segments = [line for line in eval_file if line[0] == '[']

    for segment in eval_segments:
        duration, vid_seg_id, emotion, _ = segment.split('\t')
        print(vid_seg_id)
        if emotion == 'xxx':
            print("DNE")
            continue

        # Resolve the label before any expensive work.
        # NOTE(review): previously the lookup ran last, so an utterance missing
        # from videoIDs raised ValueError after all processing. It is skipped
        # here instead — confirm the pickle can legitimately omit utterances.
        if vid_seg_id not in videoIDs[vid_id]:
            print("DNE")
            continue
        expected_output = videoLabels[vid_id][videoIDs[vid_id].index(vid_seg_id)]

        start_time, end_time = extract_time(duration)
        vid_len = end_time - start_time

        create_clips(vid_path=vid_path, start_time=start_time, end_time=end_time)

        video = get_clip_speaker(vid_seg_id=vid_seg_id)

        print("Getting Audio")
        audio_file_success, audio_response = mic.get_audio(video, vid_len)
        print("Done")

        if not audio_file_success:
            print("Error: ", audio_response)
            audio_emotions = {
                'Statement': None,
                'Text': {
                    'Emotion': None,
                    'Score': None,
                },
                'Audio': {
                    'Emotion': None,
                    'Score': None,
                }
            }
            # NOTE(review): break abandons every remaining utterance of this
            # dialogue, not just the failed one — confirm `continue` was not
            # intended.
            break

        print("Running Text Analysis")
        text_analysis_success, text_response = mic.run_text_analysis(audio_response)
        print("Done")

        if not text_analysis_success:
            print("Error: ", text_response)
            audio_emotions = {
                'Statement': None,
                'Text': {
                    'Emotion': None,
                    'Score': None,
                },
                'Audio': {
                    'Emotion': None,
                    'Score': None,
                }
            }
            # NOTE(review): same early exit from the whole dialogue as above.
            break

        audio_emotions["Statement"] = text_response[0]
        print("Statement: ", audio_emotions['Statement'])
        audio_emotions['Text'] = {
            'Emotion': text_response[1][0]['label'],
            'Score': text_response[1][0]['score']
        }
        print("Text: ", audio_emotions["Text"])

        print("Running Audio Analysis")
        out_prob, score, index, text_lab = mic.run_audio_analysis(audio_response)
        audio_emotions['Audio'] = {
            'Emotion': text_lab[0],
            'Score': score.item()
        }
        print("Audio: ", audio_emotions["Audio"])

        # Start from an empty prediction list so that a skipped video analysis
        # cannot silently reuse the previous utterance's predictions.
        video_emotions = []
        if audio_emotions['Statement'] is not None:
            print("Running Video Analysis")
            video_emotions = cam.get_pred_frame(video)
            print(video_emotions)

        video_input = extract_video(video_emotions)
        audio_input = extract_audio(audio_emotions)
        text_input = extract_text(audio_emotions)

        # Named `features` rather than `input` to avoid shadowing the builtin.
        features = [*video_input, *audio_input, *text_input]

        x_train.append(features)
        y_train.append(expected_output)
            

In [None]:
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score
import timeit

In [8]:
# Multi-layer perceptron with three hidden layers of 150 units each;
# random_state is pinned to 21 and progress is printed while fitting.
clf = MLPClassifier(
    hidden_layer_sizes=(150, 150, 150),
    max_iter=3000,
    tol=1e-9,
    random_state=21,
    verbose=1,
)

# Timestamps taken immediately before and after the fit; their difference is
# not computed in this cell.
start = timeit.default_timer()
clf.fit(x_train, y_train)
stop = timeit.default_timer()


In [None]:
# Build the test set: one feature vector (video + audio + text one-hots)
# and one label for every usable utterance of every test dialogue.
x_test = []
y_test = []

for vid_id in testVid:
    # The character at index 4 of the dialogue id is used as the session number.
    ses_id = vid_id[4]
    ses_path = f'./dataset/IEMOCAP_full_release/Session{ses_id}/dialog/'
    vid_path = ses_path + 'avi/DivX/' + vid_id + '.avi'
    eval_path = ses_path + 'EmoEvaluation/' + vid_id + '.txt'

    # Read the utterance records up front so the file is not held open while
    # the slow media processing runs. Only lines starting with '[' are records.
    with open(eval_path, 'r') as eval_file:
        eval_segments = [line for line in eval_file if line[0] == '[']

    for segment in eval_segments:
        duration, vid_seg_id, emotion, _ = segment.split('\t')
        print(vid_seg_id)
        if emotion == 'xxx':
            print("DNE")
            continue

        # Resolve the label before any expensive work.
        # NOTE(review): previously the lookup ran last, so an utterance missing
        # from videoIDs raised ValueError after all processing. It is skipped
        # here instead — confirm the pickle can legitimately omit utterances.
        if vid_seg_id not in videoIDs[vid_id]:
            print("DNE")
            continue
        expected_output = videoLabels[vid_id][videoIDs[vid_id].index(vid_seg_id)]

        start_time, end_time = extract_time(duration)
        vid_len = end_time - start_time

        create_clips(vid_path=vid_path, start_time=start_time, end_time=end_time)

        video = get_clip_speaker(vid_seg_id=vid_seg_id)

        print("Getting Audio")
        audio_file_success, audio_response = mic.get_audio(video, vid_len)
        print("Done")

        if not audio_file_success:
            print("Error: ", audio_response)
            audio_emotions = {
                'Statement': None,
                'Text': {
                    'Emotion': None,
                    'Score': None,
                },
                'Audio': {
                    'Emotion': None,
                    'Score': None,
                }
            }
            # NOTE(review): break abandons every remaining utterance of this
            # dialogue, not just the failed one — confirm `continue` was not
            # intended.
            break

        print("Running Text Analysis")
        text_analysis_success, text_response = mic.run_text_analysis(audio_response)
        print("Done")

        if not text_analysis_success:
            print("Error: ", text_response)
            audio_emotions = {
                'Statement': None,
                'Text': {
                    'Emotion': None,
                    'Score': None,
                },
                'Audio': {
                    'Emotion': None,
                    'Score': None,
                }
            }
            # NOTE(review): same early exit from the whole dialogue as above.
            break

        audio_emotions["Statement"] = text_response[0]
        print("Statement: ", audio_emotions['Statement'])
        audio_emotions['Text'] = {
            'Emotion': text_response[1][0]['label'],
            'Score': text_response[1][0]['score']
        }
        print("Text: ", audio_emotions["Text"])

        print("Running Audio Analysis")
        out_prob, score, index, text_lab = mic.run_audio_analysis(audio_response)
        audio_emotions['Audio'] = {
            'Emotion': text_lab[0],
            'Score': score.item()
        }
        print("Audio: ", audio_emotions["Audio"])

        # Start from an empty prediction list so that a skipped video analysis
        # cannot silently reuse the previous utterance's predictions.
        video_emotions = []
        if audio_emotions['Statement'] is not None:
            print("Running Video Analysis")
            video_emotions = cam.get_pred_frame(video)
            print(video_emotions)

        video_input = extract_video(video_emotions)
        audio_input = extract_audio(audio_emotions)
        text_input = extract_text(audio_emotions)

        # Named `features` rather than `input` to avoid shadowing the builtin.
        features = [*video_input, *audio_input, *text_input]

        x_test.append(features)
        y_test.append(expected_output)
            

In [None]:
# Predict a label for each test feature vector and report the fraction that
# matches the expected labels.
y_pred = clf.predict(x_test)
accuracy = accuracy_score(y_true=y_test, y_pred=y_pred)
print('accuracy: ', accuracy)

In [None]:
# Persist the fitted classifier with pickle.
filename = 'finalized_model.sav'
with open(filename, 'wb') as model_file:
    # The context manager flushes and closes the file once the dump finishes
    # (the previous bare open() call never closed the handle explicitly).
    pickle.dump(clf, model_file)