In [14]:
import cv2
import numpy as np
import opencv_jupyter_ui as jcv2
from feat import Detector
from IPython.display import Image
from feat.utils import FEAT_EMOTION_COLUMNS
import time as t
from time import sleep
from furhat_remote_api import FurhatRemoteAPI
from numpy.random import randint

import time
import sys
import trace
from pathlib import Path
import multiprocessing
import threading
import os
from torch import nn
import torch

In [15]:
class thread_with_trace(threading.Thread):
    """A ``threading.Thread`` that can be asked to stop through :meth:`kill`.

    ``start`` swaps ``run`` for a wrapper that installs a trace function via
    ``sys.settrace`` before invoking the original ``run``. Once ``killed`` is
    set, the local trace function raises ``SystemExit`` on the next 'line'
    event it receives.
    """

    def __init__(self, *args, **keywords):
        super().__init__(*args, **keywords)
        # Polled by localtrace(); flipped to True by kill().
        self.killed = False

    def start(self):
        """Route ``run`` through the tracing wrapper, then start the thread."""
        self.__run_backup = self.run
        self.run = self.__run
        super().start()

    def __run(self):
        # Install the tracer, run the original ``run``, then restore it.
        sys.settrace(self.globaltrace)
        self.__run_backup()
        self.run = self.__run_backup

    def globaltrace(self, frame, event, arg):
        """Return the local tracer for 'call' events and ``None`` otherwise."""
        return self.localtrace if event == 'call' else None

    def localtrace(self, frame, event, arg):
        """Raise ``SystemExit`` on a 'line' event once the thread was killed."""
        if self.killed and event == 'line':
            raise SystemExit()
        return self.localtrace

    def kill(self):
        """Set the ``killed`` flag that :meth:`localtrace` checks."""
        self.killed = True

### Furhat setup

In [None]:
# Address handed to the Furhat remote API client.
# NOTE(review): 127.0.1.1 is a loopback address, so this presumably targets a
# Furhat instance running on this machine — confirm before using a real robot.
FURHAT_IP = "127.0.1.1"

# Client handle used by the following cells to send commands to the robot.
furhat = FurhatRemoteAPI(FURHAT_IP)


In [None]:
# Set the robot's LED colour from its red/green/blue components.
furhat.set_led(red=100, green=50, blue=50)

### Detector setup

In [16]:
# Face / landmark / action-unit detector used by the capture loop.
# Fall back to the CPU when CUDA is unavailable so the notebook also runs on
# machines without a GPU (same rule the model-loading cell applies).
detector = Detector(device="cuda" if torch.cuda.is_available() else "cpu")

In [17]:
class MLP(nn.Module):
    """Small perceptron: Linear -> ReLU -> Linear with a 10-unit hidden layer."""

    def __init__(self, features_in=2, features_out=3):
        super().__init__()

        hidden_units = 10
        layers = (
            nn.Linear(features_in, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, features_out),
        )
        # Stored as ``net`` so parameter names remain ``net.<index>.*``.
        self.net = nn.Sequential(*layers)

    def forward(self, input):
        """Pass ``input`` through the sequential stack and return the result."""
        output = self.net(input)
        return output
    
# Project root: the parent of the notebook's working directory.
path = Path(os.getcwd()).parent
# String form with a trailing separator, kept for code that concatenates onto
# it. os.sep (instead of a literal backslash) keeps it valid outside Windows.
DIR_PATH = str(path) + os.sep

device = "cuda" if torch.cuda.is_available() else "cpu"
# SECURITY: torch.load unpickles the file and can therefore execute arbitrary
# code — only load checkpoints from a trusted source.
# map_location lets a checkpoint that was saved on a GPU load on CPU-only hosts.
# NOTE(review): recent PyTorch releases default to weights_only=True, which
# rejects fully pickled modules; pass weights_only=False if loading fails there.
model = torch.load(path / 'models' / 'best_model_12.pt', map_location=device).to(device)

In [18]:
# Emotion label -> class index lookup table.
expression = {
    "anger": 6,
    "disgust": 5,
    "fear": 4,
    "happiness": 1,
    "neutral": 0,
    "sadness": 2,
    "surprise": 3,
}


def return_emo(loc):
    """Return the label whose index in ``expression`` equals ``loc``.

    Yields ``None`` when no label carries that index.
    """
    matches = (label for label, index in expression.items() if index == loc)
    return next(matches, None)

In [19]:
# Module-level log of predicted emotion labels; capture_and_return_emos appends
# one entry per detected face. Re-running this cell clears the log.
EMO_LIST= []

In [20]:
def capture_and_return_emos(t, camera_index=1):
    """Classify the emotion of every face seen by the camera for about ``t`` seconds.

    Each predicted label is appended to the module-level ``EMO_LIST``; the
    function itself returns nothing. It relies on the notebook globals
    ``detector``, ``model``, ``device`` and ``return_emo``.

    Args:
        t: Capture duration in seconds. The deadline is checked after each
            frame, so at least one frame is always processed.
        camera_index: Device index passed to ``cv2.VideoCapture``.
    """
    cap = cv2.VideoCapture(camera_index)
    print("Capture on")
    start = time.monotonic()
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Error: Unable to capture the frame.")
                break

            detected_faces = detector.detect_faces(frame)
            detected_landmarks = detector.detect_landmarks(frame, detected_faces)
            detected_aus = detector.detect_aus(frame, detected_landmarks)

            for faces, au_units in zip(detected_faces, detected_aus):  # access only one frame
                for i in range(len(faces)):  # access all faces detected in the frame
                    # Inference only: no_grad avoids autograd bookkeeping and
                    # yields an output that needs no detaching.
                    with torch.no_grad():
                        logits = model(torch.tensor(au_units[i]).to(device)).cpu()
                    # softmax is monotonic, so the argmax of the raw outputs
                    # selects the same class as the argmax of the probabilities.
                    max_loc = int(torch.argmax(logits))
                    emotion = return_emo(max_loc)
                    EMO_LIST.append(emotion)

                    # assumes each box is [x1, y1, x2, y2, score] — TODO confirm;
                    # the 3rd/4th values are used as the opposite corner below.
                    left, top, right, bottom, _ = faces[i]
                    # Drawing a rectangle around the detected face
                    cv2.rectangle(frame, (int(left), int(top)), (int(right), int(bottom)), (0, 0, 255), 2)

                    # Displaying the emotion label on top of the rectangle
                    # (the annotated frame is not shown by this function).
                    cv2.putText(frame, emotion, (int(left), int(top) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

            # Checked once per frame, outside the face loops, so the capture
            # also stops on time when no face is in view.
            if time.monotonic() - start >= t:
                break
    finally:
        # Always free the camera, even if detection or inference raised.
        cap.release()
        print("Capture off")

In [8]:
# event = threading.Event()
# stop_threads = False
# t1 = thread_with_trace(target = capture_and_return_emos(event,)) #lambda : stop_threads,

# t1.start()
# time.sleep(5)
# event.set()
# t1.kill()
# # start = time.time()
# # stop = time.time()
# t1.join()
# # if stop - start >=7:
#     # stop_threads = True
# fl=False

# process1 = multiprocessing.Process(target=capture_and_return_emos(lambda: fl,))
# process1.start()
# start= time.time()
# time.sleep(7)
# stop = time.time()
# if stop - start >= 7:
#     fl = True
# else:
#     fl= False
    
# process1.terminate()
# process1.kill()

In [24]:
# Run the capture for 6 seconds, directly in the notebook process.
# Wrapping the call as Process(target=capture_and_return_emos(6)) executes the
# function here anyway and hands its None result to Process, so the child does
# nothing. A real child process would also fill its own copy of EMO_LIST,
# leaving the list that the following cells read empty.
capture_and_return_emos(6)

# time.sleep(7)
# stop = time.time()
# if stop - start >= 6:
#     fl = True
# else:
#     fl= False
    


Capture on




False
Capture off


In [40]:
#len(EMO_LIST)
# Majority vote over the collected labels.
# np.unique counts exact label matches in a single pass (np.char.count would
# count substring occurrences), and the counter is not called `max`, so the
# builtin stays usable in later cells.
labels, counts = np.unique(EMO_LIST, return_counts=True)
for label, count in zip(labels, counts):
    print(label, count)

if len(labels) > 0:
    # argmax returns the first maximum, i.e. the alphabetically first label on
    # ties, because np.unique returns its values sorted.
    best = int(np.argmax(counts))
    emo, max_count = labels[best], counts[best]
else:
    # Nothing was captured: define the result explicitly instead of leaving
    # `emo` undefined (or stale from an earlier run).
    emo, max_count = None, 0
print(emo, max_count)

anger 2
happiness 9
neutral 5
happiness 9


In [39]:
# Display the raw per-face labels collected by capture_and_return_emos.
EMO_LIST

['neutral',
 'neutral',
 'neutral',
 'neutral',
 'neutral',
 'happiness',
 'happiness',
 'happiness',
 'happiness',
 'happiness',
 'anger',
 'happiness',
 'happiness',
 'happiness',
 'anger',
 'happiness']

In [13]:
# Scratch timing: wall-clock time taken by 10,000 empty print calls.
st = time.time()

for i in range(10000):
    print('', end='')
    # Overwritten every iteration; only the final timestamp is used below.
    ft=time.time()

# Elapsed seconds, formatted to two decimals.
print('\n', f'{(ft-st):.2f}')



 0.02


In [None]:
def task1():
    """Print the name of the current thread and the current process id (task 1)."""
    thread_name = threading.current_thread().name
    pid = os.getpid()
    print("Task 1 assigned to thread: {}".format(thread_name))
    print("ID of process running task 1: {}".format(pid))


def task2():
    """Print the name of the current thread and the current process id (task 2)."""
    thread_name = threading.current_thread().name
    pid = os.getpid()
    print("Task 2 assigned to thread: {}".format(thread_name))
    print("ID of process running task 2: {}".format(pid))

In [None]:
# Report the process and thread the notebook itself runs in.
main_pid = os.getpid()
main_thread_name = threading.current_thread().name
print("ID of process running main program: {}".format(main_pid))

print("Main thread name: {}".format(main_thread_name))

# One named worker thread per task.
t1 = threading.Thread(name='t1', target=task1)
t2 = threading.Thread(name='t2', target=task2)

# Start both workers, then wait for each of them to finish.
for worker in (t1, t2):
    worker.start()

for worker in (t1, t2):
    worker.join()