In [None]:
import pandas as pd
import numpy as np
import mediapipe as mp
import cv2
import csv
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier,GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression,RidgeClassifier


In [None]:
# MediaPipe handles used throughout the notebook: the drawing helpers and the holistic solution
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

In [None]:
def make_detection(image,model):
    """Run `model` on a BGR frame and return the frame together with the results.

    The frame is converted BGR->RGB before `model.process` is called, is
    flagged read-only for the duration of that call, and is converted back
    to BGR before being returned.

    Args:
        image: frame accepted by `cv2.cvtColor` with `COLOR_BGR2RGB`.
        model: object exposing `process(image)` (the notebook passes a
            MediaPipe Holistic instance).

    Returns:
        Tuple `(image, results)`: the BGR frame and whatever
        `model.process` returned.
    """
    rgb_frame=cv2.cvtColor(image,cv2.COLOR_BGR2RGB)
    # Lock the buffer while the model reads it, then unlock it again.
    rgb_frame.flags.writeable=False
    results=model.process(rgb_frame)
    rgb_frame.flags.writeable=True
    return cv2.cvtColor(rgb_frame,cv2.COLOR_RGB2BGR),results

In [None]:
def draw_landmarks(image,results):
    """Draw the face-mesh tesselation found in `results` onto `image`, in green.

    Args:
        image: frame handed to `mp_drawing.draw_landmarks` to be drawn on.
        results: object whose `face_landmarks` attribute is passed to the drawer.
    """
    landmark_spec=mp_drawing.DrawingSpec(color=(0,255,0),thickness=2,circle_radius=1)
    connection_spec=mp_drawing.DrawingSpec(color=(0,230,0),thickness=2,circle_radius=1)
    mp_drawing.draw_landmarks(
        image,
        results.face_landmarks,
        mp_holistic.FACEMESH_TESSELATION,
        landmark_spec,
        connection_spec,
    )
   

Check face detection

In [None]:
# Check that MediaPipe detects the face on the live webcam feed (press "q" to quit)
cap=cv2.VideoCapture(0)
try:
    with mp_holistic.Holistic(min_detection_confidence=0.4,min_tracking_confidence=0.4) as holistic:
        while cap.isOpened():
            ret,frame=cap.read()
            if not ret:
                # No frame was delivered (camera busy/unplugged): stop instead of
                # passing frame=None to cv2.cvtColor, which would raise.
                break
            image,results=make_detection(frame,holistic)

            draw_landmarks(image,results)
            cv2.imshow("raw feed",image)
            if cv2.waitKey(10) & 0xFF==ord("q"):
                break
finally:
    # Always free the camera and close the preview window, even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Get the total number of landmarks detected on the face.
# `results` is the output for the last frame processed by the detection cell;
# its face_landmarks is None when that frame contained no detected face.
if results.face_landmarks is None:
    raise RuntimeError(
        "No face was detected in the last captured frame; "
        "re-run the detection cell and quit while your face is visible."
    )
num_coords=len(results.face_landmarks.landmark)
num_coords

In [None]:
# Build the dataset header: the label column, then x/y/z names for each landmark (1-based)
landmarks=["class"]+[
    "{}{}".format(axis,idx)
    for idx in range(1,num_coords+1)
    for axis in ("x","y","z")
]


In [None]:
# Create the CSV file with the feature names as its header row.
# Mode "x" refuses to overwrite an existing file, so re-running this cell cannot
# wipe rows that were already collected. Delete coords_emotion.csv by hand to
# start a fresh dataset.
try:
    with open("coords_emotion.csv",mode="x",newline="") as f:
        csv_writer=csv.writer(f,delimiter=",",quotechar='"',quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow(landmarks)
except FileExistsError:
    print("coords_emotion.csv already exists; keeping its header and rows")

In [None]:
# Label written in front of every row captured by the collection cell.
# To record another class, change this value and run the collection cell again.
class_name = "Neutral"

Collect data in real time and append it to the CSV file row by row

In [None]:
# Stream webcam frames and append one labelled landmark row per frame in which
# a face was detected (press "q" to quit).
cap=cv2.VideoCapture(0)
try:
    # Open the CSV once for the whole session instead of once per frame.
    with open("coords_emotion.csv",mode="a",newline="") as f:
        csv_writer=csv.writer(f,delimiter=",",quotechar='"',quoting=csv.QUOTE_MINIMAL)
        with mp_holistic.Holistic(min_detection_confidence=0.4,min_tracking_confidence=0.4) as holistic:
            while cap.isOpened():
                ret,frame=cap.read()
                if not ret:
                    # No frame delivered: stop rather than crash on frame=None.
                    break
                image,results=make_detection(frame,holistic)
                draw_landmarks(image,results)
                # face_landmarks is None on frames without a detected face; skip
                # those explicitly instead of hiding every error behind a bare except.
                if results.face_landmarks is not None:
                    face=results.face_landmarks.landmark
                    # Flatten to [x1, y1, z1, x2, y2, z2, ...] to match the header order.
                    face_row=np.array([[lnd.x,lnd.y,lnd.z] for lnd in face]).flatten().tolist()
                    csv_writer.writerow([class_name]+face_row)
                cv2.imshow("raw feed",image)
                if cv2.waitKey(10) & 0xFF==ord("q"):
                    break
finally:
    # Always free the camera and close the preview window, even after an error.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Load the collected landmark rows and preview the last few
df = pd.read_csv("coords_emotion.csv")
df.tail(n=5)

In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score,confusion_matrix,classification_report

In [None]:

# Feature matrix: every column except the label
x = df.drop(columns=["class"])

In [None]:
# Display the feature matrix (all columns of df except "class")
x

In [None]:
# Target vector: the label column
y = df.loc[:, "class"]

In [None]:
# Hold out 20% of the rows for testing; random_state pins the shuffle
x_train, x_test, y_train, y_test = train_test_split(
    x.values,
    y,
    test_size=0.2,
    random_state=0,
)

In [None]:
# Candidate classifiers, each placed behind a StandardScaler.
# The randomized estimators (random forest, gradient boosting) are seeded with the
# same random_state as the train/test split so that re-running the notebook
# reproduces the same scores.
pipelines={
    "rf":make_pipeline(StandardScaler(),RandomForestClassifier(random_state=0)),
    "rc":make_pipeline(StandardScaler(),RidgeClassifier()),
    "gb":make_pipeline(StandardScaler(),GradientBoostingClassifier(random_state=0)),
    "lr":make_pipeline(StandardScaler(),LogisticRegression()),
    "svc":make_pipeline(StandardScaler(),SVC()),
    "knn":make_pipeline(StandardScaler(),KNeighborsClassifier())
}

In [None]:
# Fit every candidate pipeline on the training split, keyed by its short name
fit_models = dict()
for algo in pipelines:
    pipeline = pipelines[algo]
    model = pipeline.fit(x_train, y_train)
    fit_models.update({algo: model})

In [None]:
# Score of the logistic-regression pipeline ("lr") on the held-out test split
fit_models["lr"].score(x_test,y_test)

In [None]:
# Compare the test-set accuracy of every fitted pipeline.
# Loop-local names are used so this cell does not overwrite the globals
# `model` and `yhat` that other cells rely on.
for algo,fitted in fit_models.items():
    preds=fitted.predict(x_test)
    # scikit-learn metrics take (y_true, y_pred) in that order
    print(algo,accuracy_score(y_test,preds))

In [None]:
# Select the random-forest pipeline ("rf") as the working model
model=fit_models["rf"]


In [None]:
# Per-class precision/recall/F1 of the random-forest pipeline on the test split
yhat=fit_models["rf"].predict(x_test)
# classification_report expects (y_true, y_pred); passing them reversed
# swaps the reported precision and recall.
print(classification_report(y_test,yhat))

In [None]:
# confusion_matrix expects (y_true, y_pred): rows are the true classes and
# columns the predicted ones. Passing them reversed transposes the matrix.
cm=confusion_matrix(y_test,yhat)
# Show the raw predictions of the selected model for the test split
model.predict(x_test)

In [None]:
import seaborn as sn
# Render the confusion matrix as a heatmap with integer counts written in each cell
sn.heatmap(data=cm, annot=True, fmt="d")

Save the model

In [None]:
import pickle
# Persist the fitted random-forest pipeline to disk
with open("face_emotion_model.pkl", mode="wb") as model_file:
    pickle.dump(fit_models["rf"], model_file)

In [None]:
# Reload the pickled pipeline.
# NOTE: only unpickle files you created yourself; pickle.load can execute
# arbitrary code embedded in an untrusted file.
with open("face_emotion_model.pkl", mode="rb") as model_file:
    model = pickle.load(model_file)

In [None]:
# Score the current `model` on the held-out test split
model.score(x_test,y_test)

Make real-time predictions

In [None]:
# Classify the facial expression on the live webcam feed and overlay the
# predicted class and its probability (press "q" to quit).
cap=cv2.VideoCapture(0)
try:
    with mp_holistic.Holistic(min_detection_confidence=0.4,min_tracking_confidence=0.4) as holistic:
        while cap.isOpened():
            ret,frame=cap.read()
            if not ret:
                # No frame delivered: stop rather than crash on frame=None.
                break
            image,results=make_detection(frame,holistic)

            draw_landmarks(image,results)
            # Only predict when a face was found; face_landmarks is None otherwise.
            # Real errors (e.g. a feature-count mismatch with the model) now surface
            # instead of being silently swallowed by a bare except.
            if results.face_landmarks is not None:
                face=results.face_landmarks.landmark
                # Flatten to [x1, y1, z1, x2, y2, z2, ...], the layout the model was trained on.
                rows=np.array([[lnd.x,lnd.y,lnd.z] for lnd in face]).flatten().tolist()

                body_language_class=model.predict([rows])[0]
                body_language_probab=model.predict_proba([rows])

                # Filled banner in the top-left corner behind the text
                cv2.rectangle(image,(0,0),(250,60),(255,0,0),-1)

                cv2.putText(image,"CLASS",(95,12),cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,0,0),1,cv2.LINE_AA)
                # str() keeps the overlay working even if the labels are not strings
                cv2.putText(image,str(body_language_class).split(" ")[0],(90,40),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2,cv2.LINE_AA)

                cv2.putText(image,"PROB",(15,12),cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,0,0),1,cv2.LINE_AA)
                # Highest class probability, rounded to two decimals
                cv2.putText(image,str(round(np.max(body_language_probab),2)),(10,40),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2,cv2.LINE_AA)

            cv2.imshow("raw feed",image)
            if cv2.waitKey(10) & 0xFF==ord("q"):
                break
finally:
    # Always free the camera and close the preview window, even after an error.
    cap.release()
    cv2.destroyAllWindows()