In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Import

In [2]:
from pathlib import Path
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF
from torchvision import models
from torch.utils.data import Dataset
from torch.utils.data import random_split, DataLoader
#from torchmetrics.functional import accuracy
from torch.nn.utils.rnn import pad_sequence
import torchvision.transforms as T
import argparse
import imutils
import time
import dlib
import cv2
from google.colab.patches import cv2_imshow
import os
import math
import csv

import sys
sys.path.append('/content/drive/MyDrive/NAPOLI/code/mtcnn/')
from mtcnn import MTCNN

# **Head pose estimation**

In [3]:
class PoseEstimator:
    def __init__(self, weights=None):
      device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
      self.detector =  MTCNN()

    def detect_faces(self, image, image_shape_max=640):

      image_shape = image.shape[:2]

      # perform image resize for faster detection
      if image_shape_max:
          scale_factor = max([1, max(image_shape) / image_shape_max])
      else:
          scale_factor = 1

      if scale_factor > 1:
          scaled_image = cv2.resize(image, (0, 0), fx = 1/scale_factor, fy = 1/scale_factor)

          start = time.time()
          print("[INFO[ performing face detection with MTCNN...")
          boxes, points = self.detector.detect_faces(scaled_image)
          end = time.time()
          print("[INFO] face detection took {:.4f} seconds".format(end - start))

          boxes[:,:4] *= scale_factor
          points *= scale_factor
      else:
          start = time.time()
          print("[INFO[ performing face detection with MTCNN...")
          boxes, points = self.detector.detect_faces(image)
          end = time.time()
          print("[INFO] face detection took {:.4f} seconds".format(end - start))

      return boxes, points

    def draw_landmarks(self, image, boxes, points):

      font = cv2.FONT_HERSHEY_COMPLEX # Text in video
      font_size = 0.6
      blue = (0, 0, 255)
      green = (0,128,0)
      red = (255, 0, 0)

      #boxes, points = self.detect_faces(image)

      boxes = boxes.astype(int)
      points = points.astype(int)
      # draw rectangle and landmarks on face
      cv2.rectangle(image, (boxes[0], boxes[1]), (boxes[2], boxes[3]), red, 1)
      cv2.circle(image, (int(points[0]), int(points[5])), 2, blue, 2)# left eye
      cv2.circle(image, (int(points[1]), int(points[6])), 2, blue, 2)# right eye
      cv2.circle(image, (int(points[2]), int(points[7])), 2, blue, 2)# nose
      cv2.circle(image, (int(points[3]), int(points[8])), 2, blue, 2)# mouth - left
      cv2.circle(image, (int(points[4]), int(points[9])), 2, blue, 2)# mouth - right

      cv2_imshow(image)

      #w = int(boxes[2])-int(boxes[0]) # width
      #h = int(boxes[3])-int(boxes[1]) # height


    def one_face(self, frame, bbs, pointss):

      # select only process only one face (center ?)
      offsets = [(bbs[:,0]+bbs[:,2])/2-frame.shape[1]/2,
                (bbs[:,1]+bbs[:,3])/2-frame.shape[0]/2]
      offset_dist = np.sum(np.abs(offsets),0)
      index = np.argmin(offset_dist)
      bb = bbs[index]
      points = pointss[:,index]
      return bb, points


    def find_roll(self, points):
      return points[6] - points[5]

    def find_yaw(self, points):

      le2n = points[2] - points[0]
      re2n = points[1] - points[2]
      return le2n - re2n

    def find_pitch(self, points):

      eye_y = (points[5] + points[6]) / 2
      mou_y = (points[8] + points[9]) / 2
      e2n = eye_y - points[7]
      n2m = points[7] - mou_y
      return e2n / n2m


# **DATASET Head and Gaze position RESNET**
create dataset using head pose and features extracted by Resnet from eyes images


In [4]:
#preprocessing DATA per training della SVM

eyes_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_eye.xml")

class dataset(Dataset):

  def __init__(self, csv_file):
    self.sample_elements = []
    # sample_elements = [(sample_1), (sample_2),..]
    # sample_1 = [(FEATURES OCCHI, POSIZIONE TESTA), LABLE)] etc.
    self.eyes_features = []
    self.roll_pitch_yaw = []
    self.pupil_coords = []
    self.paths = []
    self.labels = []
    self.faces_cropped = []
    self.names = []
    self.csv_file = csv_file
    self.model = models.resnet50(weights='DEFAULT')
    self.extract_path_label_from_csv()


  def extract_path_label_from_csv(self):

    #read CSV
    with open(self.csv_file, mode ='r') as file:

      next(file)
      csvFile = csv.reader(file)

      for lines in csvFile:
        #print(lines)
        if '0_0' not in lines[0]:
          id_class = lines[1]
          name = lines[0]
          self.paths.append("/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/"+name[:3]+"/"+name+".png")
          self.labels.append(id_class)
          self.names.append(name)

    print(self.paths) #[0])
    print(self.labels) #[0])

    return

  def extract_eyestensor_headpose(self):
    #print(self.path)
    est = PoseEstimator()

    for i in self.paths:
      print(i)
      rpy = []
      pupils = []
      image = cv2.imread(i)

      image = imutils.resize(image, width=600)
      image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)# convert to rgb
      image_rgb = cv2.flip(image_rgb, 1)# flip for user friendliness

      bounding_boxes, landmarks = est.detect_faces(image_rgb)

      #print(bounding_boxes)

      if len(bounding_boxes) > 0:
        if len(bounding_boxes) >= 2:
          bounding_boxes, landmarks = est.one_face(image_rgb, bounding_boxes, landmarks)
          roll = est.find_roll(landmarks)
          pitch = est.find_pitch(landmarks)
          yaw = est.find_yaw(landmarks)
          lx = landmarks[0]
          ly = landmarks[5]
          rx = landmarks[1]
          ry = landmarks[6]

          '''
          bounding_boxes = bounding_boxes.astype(int)
          for (x, y, w, h) in bounding_boxes[0]:
            face_crop = image[y:y + h, x:x + w]

          self.faces_cropped.append(face_crop)
          print(face_crop)
          print(len(face_crop))
          '''
        else:
          roll = est.find_roll(landmarks)
          pitch = est.find_pitch(landmarks)
          yaw = est.find_yaw(landmarks)
          lx = landmarks[0]
          ly = landmarks[5]
          rx = landmarks[1]
          ry = landmarks[6]

          print(bounding_boxes)

          '''
          #bounding_boxes = bounding_boxes.astype(int)
          for (x, y, w, h) in bounding_boxes[0]:
            face_crop = image[y:y + h, x:x + w]

          print(face_crop)
          print(len(face_crop))
          self.faces_cropped.append(face_crop)
          '''

      else:
        print("NO FACE FOUND")
        roll = 0.0
        pitch = 0.0
        yaw = 0.0
        lx = 0.0
        ly = 0.0
        rx = 0.0
        ry = 0.0


      gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)# convert to rgb
      eyes = eyes_cascade.detectMultiScale(gray, 1.1, 5, minSize = [40,40])
      #print(eyes)

      # qui prendo un occhio alla volta, chiamo la extract_features
      # appendo in eyes_tensor direttamente le features estratte da entrambi gli occhi

      actual_eyes = []

      for (x, y, w, h) in eyes:

        eye_crop = image[y:y + h, x:x + w]

        cv2.rectangle(gray, (x, y), (x + w, y + h), (0, 255, 0), 2)

        eye_crop = TF.resize(Image.fromarray(eye_crop), size=[224, 224])
        eye_crop = TF.to_tensor(eye_crop)
        eye_crop = TF.normalize(eye_crop, [0.5], [0.5])

        features = self.extract_eyes_features(eye_crop)

        actual_eyes.append(features)

      # show the output image
      #if len(eyes) > 2:
      #  cv2_imshow(gray)

      #QUI DOBBIAMO COMBINARE LE FEATURES SALVATE IN ACTUAL EYES

      if len(actual_eyes) >= 1:
        if len(actual_eyes) == 1:
          final_features = actual_eyes[0]
        elif len(actual_eyes) > 1:
          final_features = actual_eyes[0] + actual_eyes[1]
          final_features = final_features/2
      else:
        final_features = torch.zeros(1, 2048, 1, 1)

      #print(final_features)
      print(final_features.size())

      #ESTRAZIONE HEAD POSE
      rpy = [roll, pitch, yaw]
      pupils = [lx, ly, rx, ry]

      self.eyes_features.append(final_features)
      self.roll_pitch_yaw.append(rpy)
      self.pupil_coords.append(pupils)


  def extract_eyes_features(self, eye):
    with torch.no_grad():
      # strip the last layer
      feature_extractor = torch.nn.Sequential(*list(self.model.children())[:-1])

      features = feature_extractor(eye.unsqueeze(0)) # output now has the features corresponding to input eye
      #print(features.shape)

    return features


  def process_data(self):

    print(len(self.labels))
    for i in range(len(self.labels)):
      j = 0
      eyes = []
      #print(len(self.eyes_features[i][0]))
      while j < 2048:
        eyes.append(self.eyes_features[i][0][j][0][0].numpy().tolist())
        j += 1
      head_pose = np.array(self.roll_pitch_yaw[i])
      pupil_pose = np.array(self.pupil_coords[i])

    #  QUI DOBBIAMO CREARE I SAMPLE DEL DATASET

      #print(len(eyes))
      self.sample_elements.append([np.array(eyes), np.array(head_pose), np.array(pupil_pose)])

    return self.sample_elements, self.labels

  def __len__(self):
    # returns the number of samples in our dataset
    return len(self.sample_elements)

  def __getitem__(self, i):
    return self.sample_elements[i]

In [5]:
data = dataset('/content/drive/MyDrive/NAPOLI/dataset_final.csv')

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 319MB/s]


['/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(1).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(2).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(3).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(4).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(5).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(6).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(7).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(8).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(9).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(10).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(11).png', '/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATA

In [6]:
data.extract_eyestensor_headpose()

Instructions for updating:
Deprecated in favor of operator or tf.math.divide.


/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(1).png
[INFO[ performing face detection with MTCNN...
[INFO] face detection took 9.1340 seconds
[[218.73394909 305.33268112 380.56492247 528.29256896   0.99960345]]
torch.Size([1, 2048, 1, 1])
/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(2).png
[INFO[ performing face detection with MTCNN...
[INFO] face detection took 0.2841 seconds
[[225.64135327 302.66041025 396.97103173 536.91371975   0.99901688]]
torch.Size([1, 2048, 1, 1])
/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(3).png
[INFO[ performing face detection with MTCNN...
[INFO] face detection took 0.1407 seconds
[[213.52393369 311.20970319 370.88877611 518.3644875    0.99968433]]
torch.Size([1, 2048, 1, 1])
/content/drive/MyDrive/NAPOLI/DATASET_FACE/PNG/HEAD-GAZE_DATASET/1_1/1_1_a(4).png
[INFO[ performing face detection with MTCNN...
[INFO] face detection took 0.0574 seconds
[[224.33454812 309.083939

KeyboardInterrupt: ignored

In [None]:
samples, labels = data.process_data()

In [None]:
#WRITE THE CSV FILE

#write the header
header = []
for i in range(2048):
  header.append("feature_"+str(i))

header.append("roll")
header.append("pitch")
header.append("yawn")
header.append("lx")
header.append("ly")
header.append("rx")
header.append("ry")
header.append("label")
print(header)
print(len(header))


f = "/content/drive/MyDrive/NAPOLI/features_prova.csv"

with open(f, 'w') as csvfile:
  writer = csv.writer(csvfile)

  writer.writerow(header)

  j = 0
  for i in samples:
    m = np.concatenate((i[0], i[1], i[2], labels[j]), axis=None)
    j += 1

    writer.writerow(m)
    print(m)
    print(len(m))

    #print(len(i[0]))
    #print(len(i[1]))
  print("------------------------------")
  print(labels)


# **SVM**

In [None]:
import pandas as pd
df = pd.read_csv("/content/drive/MyDrive/NAPOLI/features_prova.csv", delimiter=',', header=0)

In [None]:
df.shape
print(df.shape)
print(df)

In [None]:
from sklearn.preprocessing import normalize, minmax_scale, StandardScaler, LabelEncoder

def normalization_data(data, normalization):

  if normalization == "min_max":

    data_norm = data  #.to_numpy() #fa side effect su uavs
    max = np.max(data_norm)
    min = np.min(data_norm)
    for i in range(905):
      for j in range(2055):  #2055
        data_norm[i][j] = np.divide((data_norm[i][j]-min),(max-min))

  elif normalization == "normalize":
    data_norm = normalize(data, norm='l2', axis=1, copy=True, return_norm=False)

  elif normalization == "scaler":
    scaler = StandardScaler()
    scaler.fit(data)
    data_norm = scaler.transform(data)

  else :
    data_norm = minmax_scale(data, feature_range=(0,1))

  print(data_norm)
  print(np.max(data_norm), np.min(data_norm))

  return data_norm

In [None]:
#SVM CLASSIFIER

from sklearn import svm, datasets
import sklearn.model_selection as model_selection
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC


# The SVC Class from Sklearn

poly_SVC = SVC(C=100,                       # The regularization parameter
            kernel='poly',                  # 'rbf', 'linear', 'sigmoid' The kernel type used
            degree=4,                       # Degree of polynomial function
            gamma='scale',                  # The kernel coefficient
            coef0=10.0,                     # If kernel = 'poly'/'sigmoid'
            shrinking=True,                 # To use shrinking heuristic
            probability=False,              # Enable probability estimates
            tol=0.001,                      # Stopping crierion
            cache_size=200,                 # Size of kernel cache
            class_weight=None,              # The weight of each class
            verbose=False,                  # Enable verbose output
            max_iter=- 1,                   # Hard limit on iterations
            decision_function_shape='ovo',  # One-vs-rest or one-vs-one
            break_ties=False,               # How to handle breaking ties
            random_state=None               # Random state of the model
            )

In [None]:
df_shuffled = df.sample(frac=1).reset_index()
#print(df_shuffled.head())

X = df_shuffled.drop(['label','index'], axis=1)
y = df_shuffled['label']

#xx = df_shuffled[['roll', 'pitch', 'yawn', 'lx', 'ly', 'rx', 'ry']]
#print(y)

# assuming X has multiple columns and y only one column
X = X.values

normalization = "normalize"
x1 = normalization_data(X, normalization)        #choose between : normalize, min_max, scaler, min_max_scale
print(x1[1])
y = y.tolist()
print(y)

In [None]:
from sklearn.datasets import make_classification
from sklearn.experimental import enable_halving_search_cv  # noqa
from sklearn.model_selection import HalvingGridSearchCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB

#base_estimator = KNeighborsClassifier(n_neighbors=5)
#param_grid = {"n_neighbors":[2,3,5,7,10,12]}
#search = HalvingGridSearchCV(base_estimator, param_grid).fit(x1, y)

param_grid= {'kernel': ('poly', 'rbf', 'linear'),
              'C': [1, 10, 100], 'degree': [2,4,5,8,10,11], 'coef0': [1.0, 2.0, 3.0, 5.0, 10.0]}
base_estimator = SVC(gamma='scale')

search = HalvingGridSearchCV(base_estimator, param_grid, cv=5,
                          factor=2, min_resources=20).fit(x1, y)


In [None]:
search.best_params_

In [None]:
X_train, X_test, y_train, y_test = train_test_split(x1, y, test_size = 0.20, random_state=12, shuffle=True)

#clf = KNeighborsClassifier(n_neighbors=1)
#clf.fit(X_train, y_train)

#clf = GaussianNB()
#model = clf.fit(X_train, y_train)

clf = poly_SVC.fit(X_train, y_train)

In [None]:
x_pred = X_test
#print(x_pred)

#p = np.array(clf.decision_function(x_pred)) # decision is a voting function
#print(np.exp(p))
classes = clf.predict(x_pred)
print(classes)
print(y_test)

accuracy = accuracy_score(y_test, classes)
f1 = f1_score(y_test, classes, average='weighted')
print('Accuracy (RBF Kernel): ', "%.2f" % (accuracy*100))
print('F1 (RBF Kernel): ', "%.2f" % (f1*100))

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay

titles_options = [
    ("Confusion matrix, without normalization", None),
    ("Normalized confusion matrix", "true"),
]
for title, normalize in titles_options:
    disp = ConfusionMatrixDisplay.from_estimator(
        clf,
        X_test,
        y_test,
        display_labels=[1,2,3,4,5,6,7,8,9],
        cmap=plt.cm.Blues,
        normalize=normalize,
    )
    disp.ax_.set_title(title)

    print(title)
    #print(disp.confusion_matrix)

plt.show()

In [None]:
##########################
# SAVE-LOAD using joblib #
##########################
import joblib

# save
joblib.dump(clf, "/content/drive/MyDrive/NAPOLI/svm_poly_normalizenorm_10-1(74.3).pkl")

# load
#clf2 = joblib.load("model.pkl")
#clf2.predict(X)

# TEST PART

In [None]:
data = dataset()

dir_path = "/content/drive/MyDrive/NAPOLI/FRAMES/frames1_INTERNO"

paths = []

for i in os.listdir(dir_path):
  paths.append(dir_path+i)

eyes_features, roll_pitch_yaw, pupil_coords = data.extract_eyestensor_headpose(paths)

sample_elements = data.process_data(eyes_features, roll_pitch_yaw, pupil_coords)

In [None]:
final_samples = []

for i in sample_elements:
  #print(len(i[0]),len(i[1]),len(i[2]))
  s = np.concatenate((i[0], i[1], i[2]), axis=None)  #i[0] i[1],
  print(s)
  final_samples.append(s)

In [None]:
dff = pd.DataFrame(final_samples)

#dff.shape

print(dff)

dff.shape

In [None]:
import joblib

test = dff.values


normalization = "normalize"
test_norm = normalization_data(test, normalization)        #choose between : normalize, min_max, scaler, min_max_scale

#print(test_norm)

#print(test)

clf3 = joblib.load("/content/drive/MyDrive/NAPOLI/svm_poly_normalizenorm_10-1(74.3).pkl")

classes = clf.predict(test_norm)

classes3 = clf3.predict(test_norm)
print(classes)
print("------------------------------------------------------")

print(classes3)
print("------------------------------------------------------")