## Import required libraries

In [514]:
import csv
from pathlib import Path
from sys import __stdout__
from ast import literal_eval
from pickle import dump

import numpy as np
import pandas as pd
import sklearn as sk
import copy
from pandas import DataFrame
from sklearn.decomposition import TruncatedSVD
from sklearn.ensemble import RandomForestRegressor, AdaBoostRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from typing import Dict, Set, Tuple, List, Any, Optional, Callable
from sklearn.feature_selection import SelectKBest, chi2, mutual_info_regression, VarianceThreshold
from sklearn.svm import SVR

from sklearn.pipeline import Pipeline as skPipeline
from sklearn.pipeline import make_pipeline

from sklearn.base import BaseEstimator, TransformerMixin

import cv2
import mediapipe as mp
mp_drawing = mp.solutions.drawing_utils # type: ignore
mp_drawing_styles = mp.solutions.drawing_styles # type: ignore
mp_face_mesh = mp.solutions.face_mesh # type: ignore
mp_face_mesh_connections = mp.solutions.face_mesh_connections # type: ignore

from settings import *

## Prepare the Training Data

In [524]:
## %%script echo skip

BLENDSHAPE_I = []
WEIGHTS = []
IMAGE_FILES = []

csv_file = open(BLENDSHAPE_FILE, "r")
reader = csv.reader(csv_file, skipinitialspace=True, delimiter=",")
## skip header
next(reader, None)
for line in reader:
    if not Path(line[2]).exists():
        print(line[2])
        print("File not found")
        continue
    BLENDSHAPE_I.append(line[0])
    WEIGHTS.append(line[1])
    IMAGE_FILES.append(line[2])
csv_file.close()

train_file = open(TRAIN_FILE, "w")
writer = csv.writer(train_file, delimiter=",", quoting=csv.QUOTE_ALL)
writer.writerow(HEADERS)

drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
with mp_face_mesh.FaceMesh(
    static_image_mode=True,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
) as face_mesh:
    for idx, file in enumerate(IMAGE_FILES):
        image = cv2.imread(file)
        # Convert the BGR image to RGB before processing.
        results = face_mesh.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

        # Print and draw face mesh landmarks on the image.
        if not results.multi_face_landmarks:
            continue
        annotated_image = image.copy()
        arr = [BLENDSHAPE_I[idx], WEIGHTS[idx]]

        # Assume to have only one face in tracking, 
        # Following is equal to results.multi_face_landmarks[0]
        for face_landmarks in results.multi_face_landmarks:
            for landmark in face_landmarks.landmark:
                arr.append([landmark.x, landmark.y, landmark.z])
            writer.writerow(arr)
train_file.close()


## Define Data Structures and Classes

In [330]:
class BlendshapeData:
    def __init__(self, train_X, test_X, train_Y, test_Y) -> None:
        self.train_X = train_X
        self.test_X = test_X
        self.train_Y = train_Y
        self.test_Y = test_Y

class BlendshapeTrainingSet:
    def __init__(self, blendshape_idx, blendshape_name, X, Y):
        self.blendshape_idx = blendshape_idx
        self.blendshape_name = blendshape_name
        self.X = X
        self.Y = Y

    def __str__(self):
        return f"BlendshapeData({self.blendshape_idx})"
        
blendshape_training_set_lst: List[BlendshapeTrainingSet] = []


class DataTransformer:

    def __init__(self) -> None:
        pass
    
    def transform_X(self, X: DataFrame) -> DataFrame:
        raise NotImplementedError

class Pipeline:

    def __init__(self, pipeline_name: str, 
                dataset_transformer: DataTransformer, 
                model: Callable,
                split: Callable,
                selection_method: Optional[Callable] = None,
                test_size: float = 0.2,
                random_state: int = 42,
                shuffle: bool = True) -> None:
        self.pipeline_name = pipeline_name
        self.dataset_transformer: DataTransformer = dataset_transformer
        self.selection_method = selection_method
        self.model = model
        self.split = split
        self.test_size = test_size
        self.random_state = random_state
        self.shuffle = shuffle
    
    # def fit(self, train_df: DataFrame):
    #     selectors = dict()
    #     predictors = dict()
    #     results = []
    #     transformed_df = self.dataset_transformer(train_df)
    #     global blendshape_idx_lst
    #     for blendshape_i in blendshape_idx_lst:
    #         sub_df = transformed_df[transformed_df["blendshape_i"] == blendshape_i]
    #         reduced_X = sub_df.iloc[:, 2:]
    #         Y = sub_df.filter(regex="weight").to_numpy().flatten()
    #         selectors[f"{blendshape_i}"] = None
    #         if self.selection_method:
    #             predictor = self.selection_method(reduced_X, Y)
    #             reduced_X = predictor.fit_transform(reduced_X, Y)
    #             selectors[f"{blendshape_i}"] = copy.deepcopy(predictor)
    #         splitted_data = self.split(reduced_X, Y)
    #         self.model.fit(splitted_data.train_X, splitted_data.train_Y)
    #         score = self.model.score(splitted_data.test_X, splitted_data.test_Y)
    #         results.append(score)
    #         predictors[f"{blendshape_i}"] = copy.deepcopy(self.model)
    #         # print(f"{self.pipeline_name}: blendshape_i={blendshape_i}, score={score}")
    #     print(f"{self.pipeline_name}: mean={np.mean(results)}, min={np.min(results)}, max={np.max(results)}")
    #     return selectors, predictors
    
    def fit_(self, training_set_list: List[BlendshapeTrainingSet], verbose: int = 1):
        selectors = dict()
        predictors = dict()
        results = []
        for training_set in training_set_list:
            new_X = self.dataset_transformer.transform_X(training_set.X)
            if self.selection_method:
                predictor = self.selection_method(new_X, training_set.Y)
                new_X = predictor.fit_transform(new_X, training_set.Y)
                print(predictor.get_support())
                selectors[f"{training_set.blendshape_idx}"] = copy.deepcopy(predictor)
            X_train, X_test, Y_train, Y_test = train_test_split(
                new_X, training_set.Y, test_size=self.test_size, 
                random_state=self.random_state, shuffle=self.shuffle)
            self.model.fit(X_train, Y_train)
            score = self.model.score(X_test, Y_test)
            results.append(score)
            predictors[f"{training_set.blendshape_idx}"] = copy.deepcopy(self.model)
            break
        if verbose > 0:
            print(f"{self.pipeline_name}: mean={np.mean(results)}, min={np.min(results)}, max={np.max(results)}")
        return selectors, predictors    


## Helper Functions for Debugging

### Plot the selected features

In [414]:
def plot_selected_features(image_file: str, selected_landmark_idx: List[int], output_file_name: str = "") -> None:

    vertices_sets: Dict[str, Set[Tuple[int, int]]] = {
        "FACEMESH_FACE_OVAL": mp_face_mesh_connections.FACEMESH_FACE_OVAL,
        "FACEMESH_LIPS": mp_face_mesh_connections.FACEMESH_LIPS, 
        "FACEMESH_LEFT_EYE": mp_face_mesh_connections.FACEMESH_LEFT_EYE,
        "FACEMESH_LEFT_IRIS": mp_face_mesh_connections.FACEMESH_LEFT_IRIS,
        "FACEMESH_LEFT_EYEBROW": mp_face_mesh_connections.FACEMESH_LEFT_EYEBROW,
        "FACEMESH_RIGHT_EYE": mp_face_mesh_connections.FACEMESH_RIGHT_EYE,
        "FACEMESH_RIGHT_EYEBROW": mp_face_mesh_connections.FACEMESH_RIGHT_EYEBROW,
        "FACEMESH_RIGHT_IRIS": mp_face_mesh_connections.FACEMESH_RIGHT_IRIS}

    uniform_circle_radius: int = 1

    drawing_spec_oval = mp_drawing.DrawingSpec(color=[255, 0, 0], thickness=1, circle_radius=uniform_circle_radius)
    drawing_spec_lips = mp_drawing.DrawingSpec(color=[0, 255, 0], thickness=1, circle_radius=uniform_circle_radius)
    drawing_spec_eyes = mp_drawing.DrawingSpec(color=[0, 0, 255], thickness=1, circle_radius=uniform_circle_radius)
    drawing_spec_iris = mp_drawing.DrawingSpec(color=[255, 255, 0], thickness=1, circle_radius=uniform_circle_radius)
    drawing_spec_eyebrow = mp_drawing.DrawingSpec(color=[0, 255, 255], thickness=1, circle_radius=uniform_circle_radius)
    drawing_spec_norm = mp_drawing.DrawingSpec(color=[255, 0, 255], thickness=1, circle_radius=uniform_circle_radius)
    all_points = mp_drawing.DrawingSpec(color=[255, 255, 255], thickness=1, circle_radius=uniform_circle_radius)
    with mp_face_mesh.FaceMesh(
        static_image_mode=True,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5) as face_mesh:
        for idx, file in enumerate([image_file]):
            image = cv2.imread(file)
            # Convert the BGR image to RGB before processing.
            results = face_mesh.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

            # Print and draw face mesh landmarks on the image.
            if not results.multi_face_landmarks:
                continue
            annotated_image = image.copy()
            face_landmarks = results.multi_face_landmarks[0]
            # oval
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh_connections.FACEMESH_FACE_OVAL,
                landmark_drawing_spec=None,
                connection_drawing_spec=drawing_spec_oval)
            
            # lips
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh_connections.FACEMESH_LIPS,
                landmark_drawing_spec=None,
                connection_drawing_spec=drawing_spec_lips)
            
            # eyes
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh_connections.FACEMESH_LEFT_EYE,
                landmark_drawing_spec=None,
                connection_drawing_spec=drawing_spec_eyes)

            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh_connections.FACEMESH_RIGHT_EYE,
                landmark_drawing_spec=None,
                connection_drawing_spec=drawing_spec_eyes)
            
            # iris
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh_connections.FACEMESH_LEFT_IRIS,
                landmark_drawing_spec=None,
                connection_drawing_spec=drawing_spec_iris)
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh_connections.FACEMESH_RIGHT_IRIS,
                landmark_drawing_spec=None,
                connection_drawing_spec=drawing_spec_iris)

            # eyebrow
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh_connections.FACEMESH_LEFT_EYEBROW,
                landmark_drawing_spec=None,
                connection_drawing_spec=drawing_spec_eyebrow)
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh_connections.FACEMESH_RIGHT_EYEBROW,
                landmark_drawing_spec=None,
                connection_drawing_spec=drawing_spec_eyebrow)
            
            # All points
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=None,
                landmark_drawing_spec=all_points,
                connection_drawing_spec=None)

            s = selected_landmark_idx
            c = [1 for i in range(len(s))]

            # selected
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=list(zip(c,s)),
                landmark_drawing_spec=None,
                connection_drawing_spec=drawing_spec_norm)

            filename = './annotated_image-' + str(idx) + '.png'
            if output_file_name != "":
                filename = f"./{output_file_name}.png"
            cv2.imwrite(filename, annotated_image)
            print('save to', filename)

## Load Train Dataset

In [548]:
## %%script echo skip

blendshape_training_set_lst = []

def str_splitted_by_space_to_list(s: str) -> list:
    return [float(x) for x in s.strip("[").strip("]").split()]

def load_data() -> None:
    name_df = pd.read_csv("blendshapes_name.csv", header=0, delimiter=",", index_col=False)
    blendshape_idx_lst = []
    train_df = pd.read_csv(
        TRAIN_FILE, header=0, delimiter=",", index_col=False
    )
    name_val = [name_df.loc[name_df["index"] == idx].iloc[0, 1].strip() for idx in train_df["blendshape_i"]]
    train_df.insert(1, "blendshape_name", name_val)
    blendshape_idx_lst = train_df["blendshape_i"].drop_duplicates().to_list()
    landmarks = train_df.columns[3:].to_list()
    train_df[landmarks] = train_df[landmarks].applymap(literal_eval).applymap(np.array)
    for blendshape_idx in blendshape_idx_lst:
        blendshape_training_set_lst.append(
            BlendshapeTrainingSet(
                blendshape_idx=blendshape_idx,
                blendshape_name=train_df[train_df["blendshape_i"] == blendshape_idx]["blendshape_name"],
                X=train_df[train_df["blendshape_i"] == blendshape_idx][landmarks],
                Y=train_df[train_df["blendshape_i"] == blendshape_idx]["weight"],
            )
        )
load_data()


In [547]:
name_df = pd.read_csv("blendshapes_name.csv", header=0, delimiter=",", index_col=False)
name_df.loc[name_df["index"] == 67].iloc[0, 1].strip()

Index(['index', ' weight', ' filename'], dtype='object')
blendShape1.AU_01_InnerBrowRaiser


In [357]:
print(blendshape_training_set_lst[0].blendshape_idx, blendshape_training_set_lst[0].blendshape_name)
print( blendshape_training_set_lst[0].Y)

67 0     blendShape1.AU_01_InnerBrowRaiser
1     blendShape1.AU_01_InnerBrowRaiser
2     blendShape1.AU_01_InnerBrowRaiser
3     blendShape1.AU_01_InnerBrowRaiser
4     blendShape1.AU_01_InnerBrowRaiser
5     blendShape1.AU_01_InnerBrowRaiser
6     blendShape1.AU_01_InnerBrowRaiser
7     blendShape1.AU_01_InnerBrowRaiser
8     blendShape1.AU_01_InnerBrowRaiser
9     blendShape1.AU_01_InnerBrowRaiser
10    blendShape1.AU_01_InnerBrowRaiser
11    blendShape1.AU_01_InnerBrowRaiser
12    blendShape1.AU_01_InnerBrowRaiser
13    blendShape1.AU_01_InnerBrowRaiser
14    blendShape1.AU_01_InnerBrowRaiser
15    blendShape1.AU_01_InnerBrowRaiser
16    blendShape1.AU_01_InnerBrowRaiser
17    blendShape1.AU_01_InnerBrowRaiser
18    blendShape1.AU_01_InnerBrowRaiser
19    blendShape1.AU_01_InnerBrowRaiser
20    blendShape1.AU_01_InnerBrowRaiser
21    blendShape1.AU_01_InnerBrowRaiser
22    blendShape1.AU_01_InnerBrowRaiser
23    blendShape1.AU_01_InnerBrowRaiser
24    blendShape1.AU_01_InnerBrowRais

## Transforming Dataset

In [169]:
training_set_transformers: Dict[str, Any] = dict()

### Default training set

In [278]:
class DefaultFeatures(BaseEstimator, TransformerMixin):

    def __init__(self):
        super().__init__()

    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        """Produce the training set that using 478 landmarks' coordinates (1434 in total) as features

        Args:
            input_df (DataFrame): the origin dataframe from csv file

        Returns:
            DataFrame: the training set with distance between landmarks
        """
        default_train_columns = (X.columns.to_list()
            + [f"{i}_landmark_{j}" for i in range(N_LANDMARKS) for j in ["x", "y", "z"]])

        default_X =[]

        for _, row in X.iterrows():
            default_X.append(np.concatenate(row.values))
        default_X = DataFrame(data=default_X, columns=default_train_columns)
        return default_X

training_set_transformers["default"] = DefaultFeatures()

### Distance training set

In [277]:
class Distance(BaseEstimator, TransformerMixin):

    def __init__(self):
        super().__init__()
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        """Produce the training set with distance between certain sets of landmarks

        Args:
            input_df (DataFrame): the origin dataframe from csv file

        Returns:
            DataFrame: the training set with distance between landmarks
        """
        vertices_sets: Dict[str, Set[Tuple[int, int]]] = {
            "FACEMESH_FACE_OVAL": mp_face_mesh_connections.FACEMESH_FACE_OVAL,
            "FACEMESH_LIPS": mp_face_mesh_connections.FACEMESH_LIPS, 
            "FACEMESH_LEFT_EYE": mp_face_mesh_connections.FACEMESH_LEFT_EYE,
            "FACEMESH_LEFT_IRIS": mp_face_mesh_connections.FACEMESH_LEFT_IRIS,
            "FACEMESH_LEFT_EYEBROW": mp_face_mesh_connections.FACEMESH_LEFT_EYEBROW,
            "FACEMESH_RIGHT_EYE": mp_face_mesh_connections.FACEMESH_RIGHT_EYE,
            "FACEMESH_RIGHT_EYEBROW": mp_face_mesh_connections.FACEMESH_RIGHT_EYEBROW,
            "FACEMESH_RIGHT_IRIS": mp_face_mesh_connections.FACEMESH_RIGHT_IRIS}
        vertices_sets_new_val = map(tuple_set_to_list, vertices_sets.values())
        vertices_sets = dict(zip(vertices_sets.keys(), vertices_sets_new_val))

        NOSE_IDX = 1
        TOP_DOWN_FACE = (10,152)
        LEFT_RIGHT_OUTER_EYE = (263, 33)
        LEFT_RIGHT_MOUSE= (61, 291)

        # define the column names
        new_columns = list()
        for name, vertices_set in vertices_sets.items():
            for _, vertices in enumerate(vertices_set):
                column = f"{name}_distance_{vertices}"
                new_columns.append(column)

        distance_X = DataFrame(columns=new_columns, dtype=np.float64)

        for i, row in X.iterrows():
            new_row = list()
            middle_point_x = np.mean([(row[LEFT_RIGHT_OUTER_EYE[0]][0] + row[LEFT_RIGHT_OUTER_EYE[1]][0]) / 2, 
                (row[LEFT_RIGHT_MOUSE[0]][0] + row[LEFT_RIGHT_MOUSE[1]][0]) / 2, row[NOSE_IDX][0]])
            middle_point_y = ((row[TOP_DOWN_FACE[0]] + row[TOP_DOWN_FACE[1]]) / 2)[1]
            middle_point_z = 0
            middle_point = [middle_point_x, middle_point_y, middle_point_z]
            normalised_distance = np.linalg.norm(row[TOP_DOWN_FACE[0]] - row[TOP_DOWN_FACE[1]])
            for name, vertices_set in vertices_sets.items():
                for _, vertex in enumerate(vertices_set):
                    distance = np.linalg.norm(row[vertex] - middle_point)
                    new_row.append(distance)
            distance_X.loc[i] = [distance / normalised_distance for distance in new_row] # type: ignore
        ## distance_X.to_csv("test_dis.csv", index=False)  
        return distance_X

    def _tuple_set_to_list(in_set: Set[Tuple]) -> List:
        return list({element for tuple_ in in_set for element in tuple_})

training_set_transformers["distance"] = Distance()

### Full Distance Training Set

The training set base on distances that includes all the points, labelled by their index in MediaPipe.

In [276]:
class FullDistance(BaseEstimator, TransformerMixin):

    def __init__(self):
        super().__init__()
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        """Produce the training set with distance between certain sets of landmarks

        Args:
            input_df (DataFrame): the origin dataframe from csv file

        Returns:
            DataFrame: the training set with distance between landmarks
        """

        NOSE_IDX = 1
        TOP_DOWN_FACE = (10,152)
        LEFT_RIGHT_OUTER_EYE = (263, 33)
        LEFT_RIGHT_MOUSE= (61, 291)

        # define the column names
        new_columns = list()
        for idx, _ in enumerate(X.columns):
            new_columns.append(f"distance_{idx}")

        distance_X = DataFrame(columns=new_columns, dtype=np.float64)

        for i, row in X.iterrows():
            new_row = list()
            middle_point_x = np.mean([(row[LEFT_RIGHT_OUTER_EYE[0]][0] + row[LEFT_RIGHT_OUTER_EYE[1]][0]) / 2, 
                (row[LEFT_RIGHT_MOUSE[0]][0] + row[LEFT_RIGHT_MOUSE[1]][0]) / 2, row[NOSE_IDX][0]])
            middle_point_y = ((row[TOP_DOWN_FACE[0]] + row[TOP_DOWN_FACE[1]]) / 2)[1]
            middle_point_z = 0
            middle_point = [middle_point_x, middle_point_y, middle_point_z]
            normalised_distance = np.linalg.norm(row[TOP_DOWN_FACE[0]] - row[TOP_DOWN_FACE[1]])
            
            for _, landmark in enumerate(row):
                distance = np.linalg.norm(landmark - middle_point)
                new_row.append(distance)
            distance_X.loc[i] = [distance / normalised_distance for distance in new_row] # type: ignore
        return distance_X

training_set_transformers["full_distance"] = FullDistance()

## Splitting the Dataset into Train, Test and Validation

In [173]:
def default_split(X, Y) -> BlendshapeData:
    train_X, test_X, train_Y, test_Y = train_test_split(
            X, Y, test_size=0.2, random_state=42, shuffle=True
        )
    return BlendshapeData(train_X, test_X, train_Y, test_Y)

## Features selection

In [174]:
selection_methods: Dict[str, Callable] = dict()

### PCA

In [175]:
def selection_pca(X, Y) -> PCA:
    n: int = min(X.shape)
    pca = PCA(n_components=n)
    pca.fit(X=X, y=Y)
    explained_variance_ratios = pca.explained_variance_ratio_ # type: ignore
    for i in range(n):
        if sum(explained_variance_ratios[:i]) > 0.95:
            n = i
            break
    pca = PCA(n_components=n)
    return pca
    
selection_methods["PCA"] = selection_pca


### Selected K Best base on Chi2

In [176]:
def selection_k_best_chi2(X, Y) -> SelectKBest:
    # k = 0
    # selector = SelectKBest(chi2, k=10)
    # selector.fit(X, Y)
    # selected_features = selector.get_support(indices=True)
    # selected_features = sorted(selected_features, key = lambda x: selector.scores_[x], reverse=True)
    # for i in range(len(selected_features)):
    #     if selector.scores_[selected_features[i]] > 0.010:
    #         k += 1
    #     else:
    #         break
    selector = SelectKBest(chi2, k=7)
    return selector

selection_methods["chi2"] = selection_k_best_chi2

### Selected K Best base on ?

## Model Training and Evaluation

In [177]:
models: Dict[str, Callable]  = dict()

### Linear Regression

#### Linear Regression

In [178]:
models["linear-regression"] = LinearRegression()

#### Logistic Regression

In [179]:
models["logistic-regression"] = LogisticRegression()

### Ensemble Models

#### Random Forest Regressor

In [180]:
models["random_forest_regressor"] = RandomForestRegressor()

#### Ada Boost Regressor

In [181]:
models["ada_boost_regressor"] = AdaBoostRegressor()

## Training

### Define the pipelines

In [551]:
pipelines: Dict[str, Pipeline] = dict()

# pipelines["default_random_forest_regressor"] = Pipeline(
#     pipeline_name="default_random_forest_regressor",
#     dataset_transformer=training_set_transformers["default"],
#     selection_method=None,
#     model=models["random_forest_regressor"],
#     split=default_split)
# pipelines["distance_random_forest_regressor"] = Pipeline(
#     pipeline_name="distance_random_forest_regressor",
#     dataset_transformer=training_set_transformers["distance"],
#     selection_method=None,
#     model=models["random_forest_regressor"],
#     split=default_split)
# pipelines["default_ada_boost_regressor"] = Pipeline(
#     pipeline_name="default_ada_boost_regressor",
#     dataset_transformer=training_set_transformers["default"],
#     selection_method=None,
#     model=models["ada_boost_regressor"],
#     split=default_split)
# pipelines["distance_ada_boost_regressor"] = Pipeline(
#     pipeline_name="distance_ada_boost_regressor",
#     dataset_transformer=training_set_transformers["distance"],
#     selection_method=None,
#     model=models["ada_boost_regressor"],
#     split=default_split)
# pipelines["default_pca_ada_boost_regressor"] = Pipeline(
#     pipeline_name="default_pca_ada_boost_regressor",
#     dataset_transformer=training_set_transformers["default"],
#     selection_method=selection_methods["PCA"],
#     model=models["ada_boost_regressor"],
#     split=default_split)
# pipelines["distance_pca_ada_boost_regressor"] = Pipeline(
#     pipeline_name="distance_pca_ada_boost_regressor",
#     dataset_transformer=training_set_transformers["distance"],
#     selection_method=selection_methods["PCA"],
#     model=models["ada_boost_regressor"],
#     split=default_split)

## Chi2 only allow non-negative values,
## therefore, the default set is not applicable

# pipelines["distance_chi2_linear_regressor"] = Pipeline(
#     pipeline_name="distance_chi2_linear_regressor",
#     dataset_transformer=training_set_transformers["distance"],
#     selection_method=selection_methods["chi2"],
#     model=models["linear-regression"],
#     split=default_split)
# pipelines["distance_chi2_random_forest_regressor"] = Pipeline(
#     pipeline_name="distance_chi2_random_forest_regressor",
#     dataset_transformer=training_set_transformers["distance"],
#     selection_method=selection_methods["chi2"],
#     model=models["random_forest_regressor"],
#     split=default_split)
# pipelines["distance_chi2_ada_boost_regressor"] = Pipeline(
#     pipeline_name="distance_chi2_ada_boost_regressor",
#     dataset_transformer=training_set_transformers["distance"],
#     selection_method=selection_methods["chi2"],
#     model=models["ada_boost_regressor"],
#     split=default_split)
pipelines["full_distance_chi2_linear_regressor"] = Pipeline(
    pipeline_name="full_distance_chi2_linear_regressor",
    dataset_transformer=training_set_transformers["full_distance"],
    selection_method=selection_methods["chi2"],
    model=models["linear-regression"],
    split=default_split)
pipelines["full_distance_chi2_random_forest_regressor"] = Pipeline(
    pipeline_name="full_distance_chi2_random_forest_regressor",
    dataset_transformer=training_set_transformers["full_distance"],
    selection_method=selection_methods["chi2"],
    model=models["random_forest_regressor"],
    split=default_split)
pipelines["full_distance_chi2_ada_boost_regressor"] = Pipeline(
    pipeline_name="full_distance_chi2_ada_boost_regressor",
    dataset_transformer=training_set_transformers["full_distance"],
    selection_method=selection_methods["chi2"],
    model=models["ada_boost_regressor"],
    split=default_split)
# pipelines["distance_tsne_ada_boost_regressor"] = Pipeline(
#     pipeline_name="distance_tsne_ada_boost_regressor",
#     dataset_transformer=training_set_transformers["distance"],
#     selection_method=selection_methods["TSNE"],
#     model=models["ada_boost_regressor"],
#     split=default_split)
# pipelines["distance_tsne_ada_boost_regressor"] = Pipeline(
#     pipeline_name="distance_tsne_ada_boost_regressor",
#     dataset_transformer=training_set_transformers["distance"],
#     selection_method=selection_methods["TSNE"],
#     model=models["ada_boost_regressor"],
#     split=default_split)

In [None]:
result_pipelines = dict()

In [553]:
class CustomVarianceThreshold(VarianceThreshold):

    def __init__(self, threshold=0.0, step=1.0e-7, max_try=1000):
        super().__init__(threshold=threshold)
        self.try_count = 0
        self.step = step
        self.max_try = max_try
    
    def fit_transform(self, X, y = None, **fit_params):
        try:
            return super().fit_transform(X, y, **fit_params)
        except ValueError:
            while True:
                print("threshold is too high, new threshold = ", self.threshold - self.step, "")
                self.threshold=self.threshold - self.step
                if self.try_count > self.max_try:
                    self.threshold=0
                    print("No suitable threshold, new threshold = 0")
                    return super().fit_transform(X, y, **fit_params)
                self.try_count += 1
                try:
                    return super().fit_transform(X, y, **fit_params)
                except ValueError:
                    continue

weight = 100
searches = []
fulldistance_chi2_gridsearch_linear_regressor = skPipeline(steps=[('data_transform', FullDistance()), ('feature_selection', SelectKBest(chi2)), ('regression', LinearRegression())])
# param_grid =  {
#     'data_transform__k': [k for k in range(1,10)]
# }

fulldistance_infogain_gridsearch_linear_regressor = skPipeline(steps=[('data_transform', FullDistance()), ('feature_selection', SelectKBest(mutual_info_regression)), ('regression', LinearRegression())])
# param_grid =  {
#     'data_transform__k': [k for k in range(1,10)]
# }

fulldistance_varthershold_gridsearch_linear_regressor = skPipeline(steps=[('data_transform', FullDistance()), ('feature_selection', CustomVarianceThreshold()), ('regression', LinearRegression())])

fulldistance_varthershold_gridsearch_svr = skPipeline(steps=[('data_transform', FullDistance()), ('feature_selection', CustomVarianceThreshold()), ('regression', SVR())])

def custom_score_func(pipeline, X, y):

    feature_selector = pipeline.named_steps['chi2']
    _landmarks = feature_selector.get_support(indices=True)
    

# searches.append(GridSearchCV(
#                 estimator=fulldistance_chi2_gridsearch_linear_regressor, 
#                 param_grid={
#                     'feature_selection__k': [k for k in range(1,10)]
#                 }, 
#                 return_train_score=True, 
#                 error_score='raise'))
# searches.append(GridSearchCV(
#                 estimator=fulldistance_infogain_gridsearch_linear_regressor, 
#                 param_grid={
#                     'feature_selection__k': [k for k in range(1,10)]
#                 }, 
#                 return_train_score=True, 
#                 error_score='raise'))
searches.append(GridSearchCV(
                estimator=fulldistance_varthershold_gridsearch_linear_regressor,
                param_grid={
                    'feature_selection__threshold': [2.5e-5]
                }, 
                return_train_score=True, 
                error_score='raise',
                verbose=0)
                )
searches.append(GridSearchCV(
                estimator=fulldistance_varthershold_gridsearch_svr,
                param_grid={
                    'feature_selection__threshold': [2.5e-5]
                }, 
                return_train_score=True, 
                error_score='raise',
                verbose=0)
                )
for search in searches:
    tmp_res = []
    for training_set in blendshape_training_set_lst[:]: 
        print(training_set.blendshape_idx)
        print(training_set.blendshape_name.to_list()[0])
        search.fit(training_set.X, training_set.Y)
        print("Best Estimator:", search.best_estimator_)
        print("Best Train Score:", search.best_score_)
        tmp_res.append(search.best_estimator_)
        best_filter = search.best_estimator_.named_steps["feature_selection"]
        best_landmarks = best_filter.get_support(indices=True)
        filename_idx = training_set.blendshape_idx
        filename_data_transform = search.best_estimator_.named_steps['data_transform'].__class__.__name__
        filename_feature_selection = search.best_estimator_.named_steps['feature_selection'].__class__.__name__
        if search.best_estimator_.named_steps['feature_selection'].__class__ == SelectKBest:
            filename_feature_selection += f"{search.best_estimator_.named_steps['feature_selection'].score_func.__name__}"
        filename_regression_model = search.best_estimator_.named_steps['regression'].__class__.__name__
        plot_selected_features(f"./index{training_set.blendshape_idx}-weight${weight}.png", best_landmarks, f"{filename_idx}_{filename_data_transform}_{filename_feature_selection}_{filename_regression_model}")
    result_pipelines[f"{filename_data_transform}_{filename_feature_selection}_{filename_regression_model}"] = tmp_res




67
blendShape1.AU_01_InnerBrowRaiser
Best Estimator: Pipeline(steps=[('data_transform', FullDistance()),
                ('feature_selection',
                 CustomVarianceThreshold(threshold=2.5e-05)),
                ('regression', LinearRegression())])
Best Train Score: 0.32129720078206647
save to ./67_FullDistance_CustomVarianceThreshold_LinearRegression.png
68
blendShape1.AU_02_L_OuterBrowRaiser
Best Estimator: Pipeline(steps=[('data_transform', FullDistance()),
                ('feature_selection',
                 CustomVarianceThreshold(threshold=2.5e-05)),
                ('regression', LinearRegression())])
Best Train Score: 0.6313762498686718
save to ./68_FullDistance_CustomVarianceThreshold_LinearRegression.png
69
blendShape1.AU_02_OuterBrowRaiser
Best Estimator: Pipeline(steps=[('data_transform', FullDistance()),
                ('feature_selection',
                 CustomVarianceThreshold(threshold=2.5e-05)),
                ('regression', LinearRegression())])
Best Tr

In [None]:
# a = search.best_estimator_.named_steps["chi2"].get_support(indices=True)
# s = search.best_estimator_.named_steps["chi2"].scores_
# f =  search.best_estimator_.named_steps["chi2"].feature_names_in_
# zipped_s_f = zip(s, f)
# print(a)
# print(sorted(s, reverse=True))
# print(sorted(zipped_s_f, reverse=True))

this_idx = 76-67
print(blendshape_training_set_lst[this_idx].blendshape_name.to_list()[0])

# this_chi2 = SelectKBest(chi2, k = 5)
# new_X = FullDistance().transform(blendshape_training_set_lst[this_idx].X)
# y = blendshape_training_set_lst[this_idx].Y
# this_chi2.fit_transform(new_X, y)
# this_a = this_chi2.get_support(indices=True)
# this_s = this_chi2.scores_
# this_f =  this_chi2.feature_names_in_
# this_zipped_s_f = zip(this_s, this_f)
# sorted_s_f = sorted(this_zipped_s_f, reverse=True)
# this_sorted_f = [f for _, f in sorted_s_f]
# print(this_a)
# print(sorted(this_s, reverse=True))
# print(this_sorted_f)
# print(this_sorted_f[:5])


# this_idx = 1
# this_information_gain = SelectKBest(mutual_info_regression, k = 5)
# new_X = FullDistance().transform(blendshape_training_set_lst[this_idx].X)
# y = blendshape_training_set_lst[this_idx].Y
# this_information_gain.fit_transform(new_X, y)
# this_a = this_information_gain.get_support(indices=True)
# this_s = this_information_gain.scores_
# this_f =  this_information_gain.feature_names_in_
# this_zipped_s_f = zip(this_s, this_f)
# sorted_s_f = sorted(this_zipped_s_f, reverse=True)
# this_sorted_f = [f for _, f in sorted_s_f]
# print(this_a)
# print(sorted(this_s, reverse=True))
# print(this_sorted_f)
# print(this_sorted_f[:5])

var = VarianceThreshold(threshold=2.5e-5)
print(5.0 * (10 ** -5))
new_X = FullDistance().transform(blendshape_training_set_lst[this_idx].X)
y = blendshape_training_set_lst[this_idx].Y
var.fit_transform(new_X, y)
i = var.get_support(indices=True)
v = var.variances_
i_and_v = zip(i, v)
sorted_i_and_v = sorted(i_and_v, key=lambda x: x[1], reverse=True)
sorted_i = [i for i, _ in sorted_i_and_v]
print(sorted_i_and_v)
print(sorted_i)

In [555]:
for name in result_pipelines.keys():
    print(name)
    # for estimator in result_pipelines[name]:
    #     print(estimator.named_steps['feature_selection'].get_support(indices=True))

FullDistance_CustomVarianceThreshold_LinearRegression
FullDistance_CustomVarianceThreshold_SVR


### Call the fit for training

In [358]:
selectors_groups: Dict[str, Dict[str, Any]] = dict()
predictors_groups: Dict[str, Dict[str, Any]] = dict()

for pipeline in pipelines.values():
    selectors, predictors = pipeline.fit_(training_set_list=blendshape_training_set_lst)
    selectors_groups[pipeline.pipeline_name] = selectors
    predictors_groups[pipeline.pipeline_name] = predictors

[False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False  True
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False  True False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False Fa

In [186]:
print(training_set_transformers["distance"].transform_X)

<bound method DataTransformer.transform_X of <__main__.Distance object at 0x7fad1ae96640>>


In [505]:
def get_selected_point_idx(feature_name: str) -> int:
    return int(feature_name.split("_")[-1])

res = []

IDX = 77
WEIGHT = 100

IMAGE_FILES = [f"./index{IDX}-weight${WEIGHT}.png"]


selectors = selectors_groups["full_distance_chi2_linear_regressor"]
predictors = predictors_groups["full_distance_chi2_linear_regressor"]

# for idx, selector in selectors.items():
#     selected_features = selector.get_support(indices=True)
#     print(idx)
#     print(selected_features)

drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
with mp_face_mesh.FaceMesh(
    static_image_mode=True,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
) as face_mesh:
    for idx, file in enumerate(IMAGE_FILES):
        image = cv2.imread(file)
        # Convert the BGR image to RGB before processing.
        results = face_mesh.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

        # Print and draw face mesh landmarks on the image.
        if not results.multi_face_landmarks:
            continue
        annotated_image = image.copy()
        arr = []
        for face_landmarks in results.multi_face_landmarks:
            for a in face_landmarks.landmark:
                arr.append(np.array([a.x, a.y, a.z]))
        predict_df = pd.DataFrame([arr], columns=HEADERS[2:])
        # for idx, predictor in predictors.items():
        #     if int(idx) != IDX:
        #         continue
        #     else:
        #         new_X = distance_for_prediction(predict_df)
        #         print(predictor)
        #         print(predictor.coef_)
        #         print(predictor.intercept_)
        #         transformed_new_X = selectors[idx].transform(new_X)
        #         print(int(predictor.predict(transformed_new_X)[0]), end=", ")
        #         plot_selected_features(file, new_X.columns[selectors[idx].get_support(indices=True)].map(get_selected_point_idx))
        #         break
        # new_X = None
        # for idx, model in predictors.items():
        #     new_X = FullDistance().transform_X(predict_df)
        #     transformed_new_X = selectors[idx].transform(new_X)
        #     weight = int(model.predict(transformed_new_X)[0])
        #     print((idx, weight), end=", ")
        #     res.append(weight)
            # print(new_X)
            # print(transformed_new_X)
            # print(selectors[idx].get_support(indices=True))
            # print(new_X.columns[selectors[idx].get_support(indices=True)].map(get_selected_point_idx))
        # plot_selected_features(file, new_X.columns[selectors[str(IDX)].get_support(indices=True)].map(get_selected_point_idx))
        for pipeline in result_pipelines["FullDistance_CustomVarianceThreshold_SVR"]:
            weight = pipeline.predict(predict_df)[0]
            print(weight, end=", ")
            res.append(weight)
            

4.759503071346899, 14.26345432252674, 12.338539805286018, -35.89566467581426, -6.17519879011445, 36.97601395398351, 6.592045355256687, -39.52840017549943, 40.22058479870657, 10.07236554676092, 81.58642522072932, 66.4671817807789, 4.709932222505813, 8.629585093701735, 8.980439212703686, 0.8137556527735796, -6.857312350663506, 5.856216405798932, -1.4899563431747538, 24.940234216021963, 5.146374983767146, 16.866156776282878, 8.147152253919558, 6.003528288243615, -18.966865028696475, 6.7771222524661425, -1.605576807102807, -1.4043771816595836, 2.2524811023154143, 19.497815951011717, 5.4778045228886185, 3.8516974110971205, 4.1164386317810795, 6.105254507842801, 72.1363534239531, 15.24070318671329, 60.20037796480847, 67.98979489212297, 34.08649955914416, 4.222437357604349, 58.517570852133304, -21.10817839846254, 62.14438403755412, 26.401015447724262, -6.092398092071335, -5.297665840377363, 22.28571629597991, 13.659415542250258, 

In [512]:
print(blendshape_training_set_lst[IDX-67].blendshape_name)
for idx, weight in enumerate(res):
    if weight < 0:
        res[idx] = 0
    if weight > 100:
        res[idx]  = 100
blendshape_names = train_df["blendshape_name"].unique().tolist()
print(list(zip(blendshape_names, res)))

509    blendShape1.AU_06_R_CheekRaiser
510    blendShape1.AU_06_R_CheekRaiser
511    blendShape1.AU_06_R_CheekRaiser
512    blendShape1.AU_06_R_CheekRaiser
513    blendShape1.AU_06_R_CheekRaiser
514    blendShape1.AU_06_R_CheekRaiser
515    blendShape1.AU_06_R_CheekRaiser
516    blendShape1.AU_06_R_CheekRaiser
517    blendShape1.AU_06_R_CheekRaiser
518    blendShape1.AU_06_R_CheekRaiser
519    blendShape1.AU_06_R_CheekRaiser
520    blendShape1.AU_06_R_CheekRaiser
521    blendShape1.AU_06_R_CheekRaiser
522    blendShape1.AU_06_R_CheekRaiser
523    blendShape1.AU_06_R_CheekRaiser
524    blendShape1.AU_06_R_CheekRaiser
525    blendShape1.AU_06_R_CheekRaiser
526    blendShape1.AU_06_R_CheekRaiser
527    blendShape1.AU_06_R_CheekRaiser
528    blendShape1.AU_06_R_CheekRaiser
529    blendShape1.AU_06_R_CheekRaiser
530    blendShape1.AU_06_R_CheekRaiser
531    blendShape1.AU_06_R_CheekRaiser
532    blendShape1.AU_06_R_CheekRaiser
533    blendShape1.AU_06_R_CheekRaiser
534    blendShape1.AU_06_

## Export the ideal models

In [234]:
selector_group = selectors_groups["distance_chi2_linear_regressor"]
print("selector_group", selector_group)
for blendshape_i, selector in selector_group.items():
    with open(f"fm2bs_selector_{blendshape_i}.pkl", "wb") as f:
        dump(selector, f)
predictors_group = predictors_groups["distance_chi2_linear_regressor"]
print("predictors_group", predictors_group)
for blendshape_i, model in predictors_group.items():
    with open(f"fm2bs_model_{blendshape_i}.pkl", "wb") as f:
        dump(model, f)

selector_group {'67': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '68': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '69': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '70': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '71': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '72': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '73': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '74': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '75': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '76': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '77': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '78': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '79': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '80': SelectKBest(k=7, score_func=<function chi2 at 0x7fad7c3664c0>), '81'

## Draw different parts on mediapipe

In [41]:
s = [97,  98, 101, 105, 126]
c = [1 for i in range(len(s))]
list(zip(c,s))

[(1, 97), (1, 98), (1, 101), (1, 105), (1, 126)]