### Utils

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import os
import re
import itertools
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.svm import SVC
import matplotlib.pyplot as plt
from sklearn.pipeline import Pipeline
from sklearn.dummy import DummyClassifier
from sklearn.linear_model import LinearRegression
from sklearn.feature_selection import SelectKBest
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.metrics import f1_score

from tqdm.notebook import tqdm

In [None]:
# Base directory for data files. NOTE: this name is reassigned to "./my_data/"
# in the "Load data" section further down, so its value depends on which cells
# have already been executed.
DATA_PATH = "./data/"

In [None]:
from utils import (
    preprocess_video,
    get_face_detector
)

from experimental import (
    analyze_frontal_video,
)

from src.detector import PIPNet_PL
from src.experiments import WFLW

In [None]:
import sys

class HiddenPrints:
    """Context manager that silences ``print`` by redirecting ``sys.stdout``.

    Inside the ``with`` block ``sys.stdout`` points at ``os.devnull``; the
    previous stream is put back on exit, also when the block raises (the
    exception itself is not suppressed).
    """

    def __enter__(self):
        self._original_stdout = sys.stdout
        # Keep our own handle so __exit__ closes exactly the file opened here,
        # even if the wrapped code replaced sys.stdout in the meantime.
        self._devnull = open(os.devnull, 'w')
        sys.stdout = self._devnull
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore first so stdout is usable again even if closing fails.
        sys.stdout = self._original_stdout
        self._devnull.close()

In [None]:
# Configuration for the frontal landmark detector, taken from the WFLW
# experiment definitions.
cfg_front = WFLW.pip_32_16_60_r101_l2_l1_10_1_nb10

# Build both detectors with stdout silenced; they are used as globals by the
# preprocessing functions below.
with HiddenPrints():
    face_detector = get_face_detector()
    front_lnd_detector = PIPNet_PL(cfg_front)

In [None]:
def test_model(model, df, cv=5, reps=20, save_path=None, silent=False, random_state=42, cv_reps=20):
    """Evaluate ``model`` with repeated stratified k-fold cross-validation.

    Args:
        model: Estimator exposing ``fit`` and ``predict``.
        df: DataFrame holding a ``label`` column (target), a ``patient_id``
            column (excluded from the training matrix) and the features.
        cv: Number of folds per repetition.
        reps: How many times the model is re-fitted and scored on each split.
        save_path: If given, the results are also written to this CSV path.
        silent: Disable the progress bar.
        random_state: Seed of the fold generator.
        cv_reps: Number of cross-validation repetitions.

    Returns:
        dict of lists with one entry per split: ``fold``, ``f1_macro`` (macro
        F1 averaged over ``reps``), ``patient_ids`` (ids of the test samples)
        and ``preds`` (one prediction list per rep).
    """
    data = df.drop(["label"], axis=1)
    X = data.values
    y = df["label"].values

    # The id column only identifies the test samples and must not be used as a
    # feature. Neither value depends on the split, so compute them once.
    id_column = np.where(data.columns == 'patient_id')[0][0]
    X_ = np.delete(X, id_column, axis=1)

    skf = RepeatedStratifiedKFold(n_splits=cv, n_repeats=cv_reps, random_state=random_state)

    results = {'fold': [], 'f1_macro': [], 'patient_ids': [], 'preds': []}
    for fold_idx, (train_index, test_index) in tqdm(enumerate(skf.split(X, y)), total=cv_reps*cv, disable=silent):
        patient_ids = X[test_index, id_column]

        X_train, X_test = X_[train_index], X_[test_index]
        y_train, y_test = y[train_index], y[test_index]

        scores = []
        preds_ = []
        for rep in range(reps):
            model.fit(X_train, y_train)
            preds = model.predict(X_test)
            scores.append(f1_score(y_test, preds, average="macro"))
            preds_.append(list(preds))

        results['fold'].append(fold_idx)
        # Average over every rep, not only the score of the last one.
        results['f1_macro'].append(np.mean(scores))
        results['patient_ids'].append(patient_ids)
        results['preds'].append(preds_)

    if save_path is not None:
        pd.DataFrame(results).to_csv(save_path, index=False)

    return results

In [None]:
def test_anova_selector(model, df, cv=5, reps=20, save_path=None, silent=False, random_state=42, cv_reps=20):
    """Cross-validate a pipeline with a ``selector`` step and record its features.

    Args:
        model: Pipeline exposing ``fit``/``predict`` and a ``named_steps``
            entry called ``'selector'`` that provides ``get_support`` and ``k``.
        df: DataFrame holding a ``label`` column (target), a ``patient_id``
            column (excluded from the training matrix) and the features.
        cv: Number of folds per repetition.
        reps: How many times the model is re-fitted and scored on each split.
        save_path: If given, the results are also written to this CSV path.
        silent: Disable the progress bar.
        random_state: Seed of the fold generator.
        cv_reps: Number of cross-validation repetitions.

    Returns:
        dict of lists with one entry per split: ``fold``, ``f1_macro`` (macro
        F1 averaged over ``reps``), ``n_features``, ``feature_idx``,
        ``feature_names``, ``patient_ids`` and ``preds``.
    """
    data = df.drop(["label"], axis=1)
    X = data.values
    y = df["label"].values

    # Split-independent values: position of the id column, the feature matrix
    # without it, and the matching feature names.
    id_column = np.where(data.columns == 'patient_id')[0][0]
    X_ = np.delete(X, id_column, axis=1)
    feature_columns = data.drop(['patient_id'], axis=1).columns

    skf = RepeatedStratifiedKFold(n_splits=cv, n_repeats=cv_reps, random_state=random_state)

    results = {'fold': [], 'f1_macro': [], 'n_features': [], 'feature_idx': [], 'feature_names': [], 'patient_ids': [], 'preds': []}
    for fold_idx, (train_index, test_index) in tqdm(enumerate(skf.split(X, y)), total=cv_reps*cv, disable=silent):
        patient_ids = X[test_index, id_column]

        X_train, X_test = X_[train_index], X_[test_index]
        y_train, y_test = y[train_index], y[test_index]

        scores = []
        preds_ = []
        for rep in range(reps):
            model.fit(X_train, y_train)
            preds = model.predict(X_test)
            scores.append(f1_score(y_test, preds, average="macro"))
            preds_.append(list(preds))

        # Read the selection from the selector as fitted on this split.
        selector = model.named_steps['selector']
        feature_idx = selector.get_support(indices=True)
        feature_mask = selector.get_support()
        feature_names = list(feature_columns[feature_mask].values)

        results['fold'].append(fold_idx)
        # Average over every rep, not only the score of the last one.
        results['f1_macro'].append(np.mean(scores))
        results['patient_ids'].append(patient_ids)
        results['preds'].append(preds_)
        results['n_features'].append(selector.k)
        results['feature_idx'].append(feature_idx)
        results['feature_names'].append(feature_names)

    if save_path is not None:
        pd.DataFrame(results).to_csv(save_path, index=False)

    return results

In [None]:
from sklearn.feature_selection import RFE

def test_rfe_selector(model, df, importance_getter, n_features, cv=5, reps=20, save_path=None, silent=False, random_state=42, cv_reps=20):
    """Cross-validate ``model`` on feature subsets chosen by RFE.

    For every split, RFE is fitted on the training part for each subset size
    from 1 to ``n_features``; the model is then re-fitted on the selected
    columns and scored on the test part.

    Args:
        model: Pipeline with a ``clf`` step; RFE reads the importances from
            ``named_steps.clf.<importance_getter>``.
        df: DataFrame holding a ``label`` column (target), a ``patient_id``
            column (excluded from the training matrix) and the features.
        importance_getter: Attribute name on the ``clf`` step, e.g. ``'coef_'``.
        n_features: Largest subset size to evaluate.
        cv: Number of folds per repetition.
        reps: How many times the model is re-fitted and scored per subset.
        save_path: If given, the results are also written to this CSV path.
        silent: Disable the progress bar.
        random_state: Seed of the fold generator.
        cv_reps: Number of cross-validation repetitions.

    Returns:
        dict of lists with one entry per (split, subset size): ``fold``,
        ``f1_macro`` (macro F1 averaged over ``reps``), ``n_features``,
        ``feature_idx``, ``feature_names``, ``patient_ids`` and ``preds``.
    """
    data = df.drop(["label"], axis=1)
    columns = data.drop('patient_id', axis=1).columns
    X = data.values
    y = df["label"].values

    # Split-independent: position of the id column and the matrix without it.
    id_column = np.where(data.columns == 'patient_id')[0][0]
    X_ = np.delete(X, id_column, axis=1)

    skf = RepeatedStratifiedKFold(n_splits=cv, n_repeats=cv_reps, random_state=random_state)

    results = {'fold': [], 'f1_macro': [], 'n_features': [], 'feature_idx': [], 'feature_names': [], 'patient_ids': [], 'preds': []}
    for fold_idx, (train_index, test_index) in tqdm(enumerate(skf.split(X, y)), total=cv_reps*cv, disable=silent):
        # The train/test slices depend on the split only, not on the subset size.
        patient_ids = X[test_index, id_column]
        X_train, X_test = X_[train_index], X_[test_index]
        y_train, y_test = y[train_index], y[test_index]

        for k_features in range(1, n_features+1):
            sel = RFE(model, importance_getter=f"named_steps.clf.{importance_getter}", n_features_to_select=k_features)
            sel = sel.fit(X_train, y_train)

            feature_idx = [i for i, x in enumerate(sel.support_) if x]
            feature_names = list(columns[feature_idx])

            X_train_sub = X_train[:, feature_idx]
            X_test_sub = X_test[:, feature_idx]

            scores = []
            preds_ = []
            for rep in range(reps):
                model.fit(X_train_sub, y_train)
                preds = model.predict(X_test_sub)
                scores.append(f1_score(y_test, preds, average="macro"))
                preds_.append(list(preds))

            results['fold'].append(fold_idx)
            # Average over every rep, not only the score of the last one.
            results['f1_macro'].append(np.mean(scores))
            results['patient_ids'].append(patient_ids)
            results['preds'].append(preds_)
            results['n_features'].append(sel.n_features_)
            results['feature_idx'].append(feature_idx)
            results['feature_names'].append(feature_names)

    if save_path is not None:
        pd.DataFrame(results).to_csv(save_path, index=False)

    return results

In [None]:
from mlxtend.feature_selection import SequentialFeatureSelector as SFS
from sklearn.model_selection import StratifiedKFold, RepeatedStratifiedKFold
from sklearn.metrics import f1_score
from tqdm import tqdm

def test_sfs_selector(model, df, sfs_params, n_features, cv=5, cv_reps=20, reps=20, save_path=None, random_state=42):
    """Cross-validate ``model`` on feature subsets chosen by sequential selection.

    For every split a ``SequentialFeatureSelector`` is fitted on the training
    part; each subset it reports in ``subsets_`` is then used to re-fit the
    model and score it on the test part.

    Args:
        model: Estimator exposing ``fit`` and ``predict``; also handed to the
            selector as its estimator.
        df: DataFrame holding a ``label`` column (target), a ``patient_id``
            column (excluded from the training matrix) and the features.
        sfs_params: Extra keyword arguments forwarded to the selector.
        n_features: Passed to the selector as ``k_features``.
        cv: Number of folds per repetition.
        cv_reps: Number of cross-validation repetitions. The default is 20,
            the value this function effectively used before the argument was
            honoured, so existing calls produce the same splits.
        reps: How many times the model is re-fitted and scored per subset.
        save_path: If given, the results are also written to this CSV path.
        random_state: Seed of the fold generator.

    Returns:
        dict of lists with one entry per (split, subset): ``fold``,
        ``f1_macro`` (macro F1 averaged over ``reps``), ``n_features``,
        ``feature_idx``, ``feature_names``, ``patient_ids`` and ``preds``.
    """
    data = df.drop(["label"], axis=1)
    feature_names = data.drop('patient_id', axis=1).columns
    X = data.values
    y = df["label"].values

    # Split-independent: position of the id column and the matrix without it.
    id_column = np.where(data.columns == 'patient_id')[0][0]
    X_ = np.delete(X, id_column, axis=1)

    skf = RepeatedStratifiedKFold(n_splits=cv, n_repeats=cv_reps, random_state=random_state)

    results = {'fold': [], 'f1_macro': [], 'n_features': [], 'feature_idx': [], 'feature_names': [], 'patient_ids': [], 'preds': []}
    for fold_idx, (train_index, test_index) in tqdm(enumerate(skf.split(X, y)), total=cv_reps*cv):
        patient_ids = X[test_index, id_column]

        X_train, X_test = X_[train_index], X_[test_index]
        y_train, y_test = y[train_index], y[test_index]

        sel = SFS(**sfs_params, estimator=model, k_features=n_features)
        sel = sel.fit(X_train, y_train, custom_feature_names=feature_names)
        for k, v in sel.subsets_.items():
            X_train_sub = X_train[:, v['feature_idx']]
            X_test_sub = X_test[:, v['feature_idx']]

            scores = []
            preds_ = []
            for rep in range(reps):
                model.fit(X_train_sub, y_train)
                preds = model.predict(X_test_sub)
                scores.append(f1_score(y_test, preds, average="macro"))
                preds_.append(list(preds))

            results['fold'].append(fold_idx)
            # Average over every rep, not only the score of the last one.
            results['f1_macro'].append(np.mean(scores))
            results['n_features'].append(k)
            results['feature_idx'].append(list(v['feature_idx']))
            results['feature_names'].append(list(v['feature_names']))
            results['patient_ids'].append(patient_ids)
            results['preds'].append(preds_)

    if save_path is not None:
        pd.DataFrame(results).to_csv(save_path, index=False)

    return results

In [None]:
def print_data_info(data_path):
    """Print per-class patient counts and the feature count of a dataset CSV.

    ``data_path`` is a file name relative to the global ``DATA_PATH``. Rows are
    collapsed to one per patient with ``groupby("patient_id").first()`` before
    counting; the ``label`` column is not counted as a feature.
    """
    frame = pd.read_csv(os.path.join(DATA_PATH, data_path))
    per_patient = frame.groupby("patient_id").first()

    n_disorder = len(per_patient[per_patient['label'] == 1])
    n_healthy = len(per_patient[per_patient['label'] ==0])
    print(f"Number of patients: {len(per_patient)} (disorder: {n_disorder}, healthy: {n_healthy})")

    feature_columns = per_patient.drop(['label'], axis=1).columns
    print(f"Number of features: {len(feature_columns)}")

### Preprocessing

#### DNZ

In [None]:
def preprocess_patient_videos(data_path: str, face_detector, front_lnd_detector, prof_lnd_detector, target_size=1000, padding=80, silent=False, video=None):
    """Run ``preprocess_video`` on the video files of one patient directory.

    Files are picked by name: every entry containing ``"videos"``, or, when
    ``video`` is given (e.g. ``"frontal_open"``), only entries containing
    ``"videos.<video with '_' replaced by '.'>"``. The output location mirrors
    ``data_path`` with ``"patients"`` replaced by ``"preprocessed"``. Files
    whose derived name starts with ``"frontal"`` use ``front_lnd_detector``,
    all others ``prof_lnd_detector``.
    """
    output_root = data_path.replace("patients", "preprocessed")
    needle = "videos" if video is None else f"videos.{video.replace('_', '.')}"
    filenames = [entry for entry in os.listdir(data_path) if needle in entry]

    for filename in tqdm(filenames, total=len(filenames), desc="Preprocessing patient videos", disable=silent):
        source = os.path.join(data_path, filename)
        # Second and third dot-separated parts of the file name form the video name.
        name = "_".join(filename.split(".")[1:3])
        target = os.path.join(output_root, name)
        detector = front_lnd_detector if name.startswith("frontal") else prof_lnd_detector
        preprocess_video(source, face_detector, detector, target_size, padding, target)

In [None]:
def preprocess_dnz(data_dir, target_size=1000, padding=80, only_new=True, prof_lnd_detector=None):
    """Preprocess the ``frontal_open`` video of every patient under ``data_dir``.

    Args:
        data_dir: Directory with one sub-directory per patient.
        target_size: Forwarded to ``preprocess_patient_videos``.
        padding: Forwarded to ``preprocess_patient_videos``.
        only_new: Skip patients that already have an output directory.
        prof_lnd_detector: Landmark detector for non-frontal videos. Only the
            ``frontal_open`` video is requested here, so it may stay ``None``.

    Uses the module-level ``face_detector`` and ``front_lnd_detector``.
    """
    dirs = [name for name in os.listdir(data_dir) if not name.startswith(".")]
    if only_new:
        # Look where preprocess_patient_videos actually writes its output
        # ("patients" -> "preprocessed" in the path), not under DATA_PATH,
        # which may point at a different data root when this runs.
        preprocessed_dir = data_dir.replace("patients", "preprocessed")
        # A missing output directory simply means nothing was processed yet.
        done = set(os.listdir(preprocessed_dir)) if os.path.isdir(preprocessed_dir) else set()
        dirs = [dir_ for dir_ in dirs if dir_ not in done]
    for dirname in tqdm(dirs, total=len(dirs), position=0, desc="Preprocessing patients"):
        data_path = os.path.join(data_dir, dirname)
        preprocess_patient_videos(data_path, face_detector, front_lnd_detector, prof_lnd_detector, target_size, padding, video="frontal_open")

In [None]:
# Preprocess every patient directory under ./my_data/patients that has not
# been preprocessed yet.
patients_path = os.path.join("./my_data", "patients")
preprocess_dnz(patients_path, only_new=True)

### Load data

In [None]:
# Data root used by every cell from here on (rebinds the DATA_PATH set near
# the top of the notebook), plus the raw and preprocessed patient directories.
DATA_PATH = "./my_data/"
patients_path = os.path.join(DATA_PATH, "patients")
preprocessed_path = os.path.join(DATA_PATH, "preprocessed")

In [None]:
def get_diagnosis():
    """Load ``diagnosis.csv`` from ``DATA_PATH`` without incomplete rows.

    Rows containing any missing value are dropped and the index is renumbered.
    """
    csv_path = os.path.join(DATA_PATH, "diagnosis.csv")
    return pd.read_csv(csv_path).dropna().reset_index(drop=True)

In [None]:
def get_random_score(random_state=42):
    """Return the mean macro F1 of a stratified random classifier.

    The score is computed with ``test_model`` on ``features.csv`` from
    ``DATA_PATH`` and serves as the "random prediction" baseline in the plots.

    Args:
        random_state: Seed of the ``DummyClassifier``.
    """
    features = pd.read_csv(os.path.join(DATA_PATH, "features.csv"))
    # Use the file loaded above instead of an unrelated notebook global.
    # The "video" filter is kept for files that carry that column; files
    # without it are used as-is.
    # NOTE(review): confirm the schema of features.csv.
    if "video" in features.columns:
        features = features[features["video"] == str(["frontal_open"])]
    random_model = DummyClassifier(strategy="stratified", random_state=random_state)
    results = test_model(random_model, features, silent=True)
    return np.mean(results["f1_macro"])

### Experiment 1

#### Code

In [None]:
def plot_ex1(df_, figsize=None):
    """Bar-plot the mean macro F1 of every (line, central point) combination.

    NOTE: a later cell defines another ``plot_ex1`` with a different signature,
    which replaces this one once it has been executed.

    Args:
        df_: Experiment 1 results with the columns ``line``, ``central_point``,
            ``fold``, ``preds``, ``patient_ids`` and ``f1_macro``; not modified.
        figsize: Optional figure size; when given a new figure is created.
    """
    df = df_.copy()
    df["label"] = df.apply(lambda x: "line_" + x["line"] + "_cp_" + str(x["central_point"]), axis=1)
    df.drop(["line", "central_point", "fold", "preds", "patient_ids"], inplace=True, axis=1)

    # One aggregation supplies both values and labels, so they cannot get out
    # of step and no other column needs to be averaged.
    mean_scores = df.groupby("label")["f1_macro"].mean()
    values = mean_scores.values
    labels = mean_scores.index.values

    best_5_idx = (-values).argsort()[:5]
    print("5 best socres:")
    for idx in best_5_idx:
        print(f"{labels[idx]}: {values[idx]}")

    if figsize is not None:
        # Only needed for its side effect of opening a figure of that size.
        plt.subplots(figsize=figsize)
    plt.bar(labels, values)
    plt.grid(axis='y', zorder=0)
    plt.xticks(rotation='vertical')
    plt.ylabel("Mean score (f1_macro)")

In [None]:
def extract_ex1_features(data_dir: str, save_name: str, diagnosis, videos, line_v="v6", line_h="h1", central_point=16, silent=False):
    """Extract per-patient video features and save them as a CSV.

    Iterates over the global ``FINAL_IDS``; for each patient and each entry of
    ``videos`` the matching extractor is run on ``<data_dir>/<id>/<video>`` and
    its features are stored under ``"<video>_<feature>"``. The result is
    written to ``<data_dir>/../<save_name>.csv``.

    Args:
        data_dir: Directory with the preprocessed patient folders.
        save_name: Output file name without the ``.csv`` extension.
        diagnosis: DataFrame with ``Id`` and ``Diagnosis`` columns.
        videos: Video names; each must be a key of the extractor table.
        line_v, line_h, central_point: Forwarded to the extractor.
        silent: Disable the progress bar.

    The ``label`` is 0 for a ``"healthy"`` diagnosis, 1 for any other, and
    ``None`` for patients without a diagnosis row.
    """
    dirs = FINAL_IDS
    extractors = {
        "frontal_open": analyze_frontal_video,
    }

    # Build the id -> diagnosis lookup once instead of rebuilding the id list
    # and re-filtering the frame for every patient; setdefault keeps the first
    # row of a repeated id, matching the previous ``.values[0]``.
    diagnosis_by_id = {}
    for diag_id, diag in zip(diagnosis["Id"], diagnosis["Diagnosis"]):
        diagnosis_by_id.setdefault(diag_id, diag)

    patients = []
    for patient_id in tqdm(dirs, total=len(dirs), position=0, desc="Extracting patients features", disable=silent):
        features = {}
        for video in videos:
            extractor = extractors[video]
            patient_path = os.path.join(data_dir, patient_id, video)
            vid_features = extractor(patient_path, line_v=line_v, line_h=line_h, central_point=central_point)
            for k, v in vid_features.items():
                features[f"{video}_{k}"] = v

        label = None
        if patient_id in diagnosis_by_id:
            label = 0 if diagnosis_by_id[patient_id] == "healthy" else 1

        features["patient_id"] = patient_id
        features["label"] = label
        patients.append(features)

    save_path = os.path.join(data_dir, "..", save_name)
    df = pd.DataFrame.from_dict(patients)
    df.to_csv(f"{save_path}.csv", index=False)

In [None]:
# Experiment 1, extraction: for every horizontal line, extract the features
# for each (vertical line, central point) combination and collect them in a
# single CSV per horizontal line.
ex_name = "ex1_data"
videos = ["frontal_open"]

# Candidate central points (single landmark indices or lists of indices) and
# the names of the vertical / horizontal reference lines to try.
central_points = [16, 85, 94, [94, 85], [16, 85, 94], [93, 94, 95], [86, 85, 84], [86, 85, 84] + [95, 94, 93]]
v_lines = ["v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8"]
h_lines = ["h1", "h2", "h3"]

for line_h in h_lines:
    df = pd.DataFrame()
    options = list(itertools.product(v_lines, central_points))
    for line_v, central_point in tqdm(options, total=len(options), desc=f"Experiment 1: extraction for {line_h}"):
        save_name = f"{ex_name}_{line_h}"
        # The extractor writes "<save_name>.csv", which is read back right away.
        extract_ex1_features(preprocessed_path, save_name, get_diagnosis(), videos, line_v=line_v, line_h=line_h, central_point=central_point, silent=True)
        t_df = pd.read_csv(os.path.join(DATA_PATH, f"{save_name}.csv"))
        # Rows with any missing value (e.g. patients without a label) are dropped.
        t_df = t_df.dropna().reset_index(drop=True)
        t_df["label"] = t_df["label"].astype('int')
        t_df["line"] = line_v
        # Stored as a string so list-valued points fit in one column.
        t_df["central_point"] = str(central_point)
        df = pd.concat([df, t_df], ignore_index=True)

    # Write the frame accumulated over all combinations under the same file
    # name the extractor used for its per-combination output.
    save_path = os.path.join(preprocessed_path, "..", save_name)
    df.to_csv(f"{save_path}.csv", index=False)

In [None]:
# Experiment 1, evaluation: score a scaled linear SVC on every
# (vertical line, central point) subset and save the results per horizontal line.
ex_name = "ex1_res"

for line_h in h_lines:
    ex1_data = pd.read_csv(os.path.join(DATA_PATH, f"ex1_data_{line_h}.csv"))
    ex1_res = pd.DataFrame()

    lines = list(ex1_data["line"].unique())
    # NOTE: rebinds the global ``central_points`` from the extraction cell to
    # the values read from the CSV.
    central_points = list(ex1_data["central_point"].unique())

    options = list(itertools.product(lines, central_points))
    for line, central_point in tqdm(options, total=len(options), desc=f"Experiment 1: results for {line_h}"):
        t_df = ex1_data[(ex1_data["line"] == line) & (ex1_data["central_point"] == central_point)]
        t_df = t_df.drop(["line", "central_point"], axis=1)

        # ``scaler`` holds the class; a fresh instance is created per pipeline.
        scaler = StandardScaler
        svc_pipe = Pipeline([
            ('scaler', scaler()),
            ('svc', SVC(kernel="linear"))
        ])
        results = test_model(svc_pipe, t_df, reps=1, silent=True)
        res_df = pd.DataFrame(results)
        res_df["line"] = line
        res_df["central_point"] = central_point
        ex1_res = pd.concat([ex1_res, res_df], ignore_index=True)

    save_name = f"{ex_name}_{line_h}"
    save_path = os.path.join(preprocessed_path, "..", save_name)
    ex1_res.to_csv(f"{save_path}.csv", index=False)

#### Results

In [None]:
print_data_info("ex1_data_h1.csv")

In [None]:
def plot_ex1(df_, line_h, save_path=None, figsize=None):
    """Plot and tabulate Experiment 1 scores per central line and point.

    Draws a grouped bar chart (one group per central line, one bar per central
    point) with the random-prediction baseline from ``get_random_score`` and
    prints the five best combinations.

    Args:
        df_: Experiment 1 results with the columns ``line``, ``central_point``,
            ``fold``, ``preds``, ``patient_ids`` and ``f1_macro``; not modified.
        line_h: Accepted for the callers' convenience; not used by the body.
        save_path: If given, the figure is saved there.
        figsize: Optional figure size.

    Returns:
        DataFrame with one row per central line holding the mean and standard
        deviation of the macro F1 for every central point.
    """
    df = df_.copy()
    df["label"] = df.apply(lambda x: "line_" + x["line"] + "_cp_" + str(x["central_point"]), axis=1)
    df.drop(["line", "central_point", "fold", "preds", "patient_ids"], inplace=True, axis=1)

    # Aggregate only the score column; the labels come from the same result,
    # so no other column has to be averaged.
    f1_by_label = df.groupby("label")["f1_macro"]
    mean_scores = f1_by_label.mean()
    values = mean_scores.values
    stds = f1_by_label.std().values
    labels = mean_scores.index.values

    label_to_values = dict(zip(labels, values))
    label_to_std = dict(zip(labels, stds))

    best_5_idx = (-values).argsort()[:5]
    print("5 best socres:")
    for idx in best_5_idx:
        print(f"{labels[idx]}: {values[idx]}")

    # Labels look like "line_<line>_cp_<point>"; recover both parts.
    lines = np.unique([name.split("_")[1] for name in labels])
    points = np.unique([name.split("_")[-1] for name in labels])
    data_df = pd.DataFrame(
        [[line] + [label_to_values[f'line_{line}_cp_{point}'] for point in points] for line in lines],
        columns=['Central line'] + [f'Point {i+1}' for i in range(len(points))],
    )

    data_df.plot(
        x='Central line',
        kind='bar',
        title="Results depending on the observed point and the determined centerline",
        zorder=2,
        width=0.75,
        figsize=figsize
    )
    plt.ylabel("F1 mesaure (macro)")
    plt.grid(axis='y', zorder=1)
    plt.xticks(rotation='horizontal')
    plt.axhline(y=get_random_score(), color='r', linestyle='dashed', label="Random prediction")
    plt.legend(bbox_to_anchor=(1.01, 1), borderaxespad=0)
    plt.tight_layout(rect=[0,0,0.75,1])

    if save_path is not None:
        plt.savefig(save_path, bbox_inches="tight")

    # Summary table: means first, then standard deviations, per central point.
    # The column names are relied upon by the calling cell and stay unchanged.
    data_df = pd.DataFrame(
        [
            [line]
            + [label_to_values[f'line_{line}_cp_{point}'] for point in points]
            + [label_to_std[f'line_{line}_cp_{point}'] for point in points]
            for line in lines
        ],
        columns=['Linia pośrodkowa']
        + [f'Punkt {i+1} mu' for i in range(len(points))]
        + [f'Punkt {i+1} std' for i in range(len(points))],
    )
    return data_df

In [None]:
# Plot the Experiment 1 results of every horizontal line and stack the
# returned summary tables into ``ex1_df``.
ex1_df = pd.DataFrame()

for line_h in h_lines:
    print(f"=== LINE H: {line_h} ===")
    save_name = f"ex1_res_{line_h}"
    save_path = os.path.join(preprocessed_path, "..", save_name)
    ex1_res = pd.read_csv(f"{save_path}.csv")

    # ``save_path`` is reused: from here on it is the figure output path.
    save_path = os.path.join(DATA_PATH, "experiments", f"ex1_{line_h}")

    temp_df = plot_ex1(ex1_res, line_h, figsize=(15, 5), save_path=save_path)
    temp_df = temp_df.rename(columns={"Linia pośrodkowa": "Linia v"})
    temp_df["Linia h"] = line_h
    ex1_df = pd.concat([ex1_df, temp_df], ignore_index=True)

In [None]:
ex1_df

### Experiment 2

#### Code

In [None]:
def plot_ex2(df_, bbox_to_anchor, save_path=None, figsize=None):
    """Bar-plot the mean macro F1 per model, best model first.

    Error bars show the standard deviation over the splits; a dashed line
    marks the random-prediction baseline from ``get_random_score``. The (up to)
    five best models are also printed.

    Args:
        df_: Experiment 2 results with the columns ``model``, ``fold``,
            ``preds``, ``patient_ids`` and ``f1_macro``; not modified.
        bbox_to_anchor: Legend anchor forwarded to ``plt.legend``.
        save_path: If given, the figure is saved there.
        figsize: Optional figure size.
    """
    df = df_.copy()
    df.drop(["fold", "preds", "patient_ids"], inplace=True, axis=1)

    # Aggregate only the score column; the labels come from the same result.
    f1_by_model = df.groupby("model")["f1_macro"]
    mean_scores = f1_by_model.mean()
    values = mean_scores.values
    stds = f1_by_model.std().values
    labels = mean_scores.index.values

    # Descending order of the mean score, computed once and reused below.
    sorted_values_idx = (-values).argsort()
    sorted_labels = [labels[idx] for idx in sorted_values_idx]
    sorted_values = [values[idx] for idx in sorted_values_idx]
    sorted_stds = [stds[idx] for idx in sorted_values_idx]

    print("5 best socres:")
    for idx in sorted_values_idx[:5]:
        print(f"{labels[idx]}: {values[idx]}")

    plt.figure(figsize=figsize)
    plt.title("Results depending on the model")
    plt.bar(sorted_labels, sorted_values, zorder=2, yerr=sorted_stds, capsize=5)
    plt.grid(axis='y', zorder=1)
    plt.ylabel("F1 measure (macro)")
    plt.xlabel("Model")
    plt.axhline(y=get_random_score(), color='r', linestyle='dashed', label="Random prediction")
    plt.legend(bbox_to_anchor=bbox_to_anchor, borderaxespad=0)

    if save_path is not None:
        plt.savefig(save_path, bbox_inches="tight")

In [None]:
def extract_ex2_features(data_dir: str, save_name: str, diagnosis, videos, lowpass_kernel, line_v="v4", line_h="h1", central_point=16, silent=False):
    """Extract per-patient video features and save them as a CSV.

    Iterates over the global ``FINAL_IDS``; for each patient and each entry of
    ``videos`` the matching extractor is run on ``<data_dir>/<id>/<video>`` and
    its features are stored under ``"<video>_<feature>"``. The result is
    written to ``<data_dir>/../<save_name>.csv``.

    Args:
        data_dir: Directory with the preprocessed patient folders.
        save_name: Output file name without the ``.csv`` extension.
        diagnosis: DataFrame with ``Id`` and ``Diagnosis`` columns.
        videos: Video names; each must be a key of the extractor table.
        lowpass_kernel: Forwarded to the extractor.
        line_v, line_h, central_point: Forwarded to the extractor.
        silent: Disable the progress bar.

    The ``label`` is 0 for a ``"healthy"`` diagnosis, 1 for any other, and
    ``None`` for patients without a diagnosis row.
    """
    dirs = FINAL_IDS
    extractors = {
        "frontal_open": analyze_frontal_video,
    }

    # Build the id -> diagnosis lookup once instead of rebuilding the id list
    # and re-filtering the frame for every patient; setdefault keeps the first
    # row of a repeated id, matching the previous ``.values[0]``.
    diagnosis_by_id = {}
    for diag_id, diag in zip(diagnosis["Id"], diagnosis["Diagnosis"]):
        diagnosis_by_id.setdefault(diag_id, diag)

    patients = []
    for patient_id in tqdm(dirs, total=len(dirs), position=0, desc="Extracting patients features", disable=silent):
        features = {}
        for video in videos:
            extractor = extractors[video]
            patient_path = os.path.join(data_dir, patient_id, video)
            vid_features = extractor(patient_path, line_v=line_v, line_h=line_h, central_point=central_point, lowpass_kernel=lowpass_kernel)
            for k, v in vid_features.items():
                features[f"{video}_{k}"] = v

        label = None
        if patient_id in diagnosis_by_id:
            label = 0 if diagnosis_by_id[patient_id] == "healthy" else 1

        features["patient_id"] = patient_id
        features["label"] = label
        patients.append(features)

    save_path = os.path.join(data_dir, "..", save_name)
    df = pd.DataFrame.from_dict(patients)
    df.to_csv(f"{save_path}.csv", index=False)

In [None]:
# Experiment 2, extraction: one fixed configuration (point 85, lines v8/h1,
# no low-pass kernel), cleaned and written back to the same CSV.
save_name = "ex2_data"

central_point = 85
line_v =  "v8"
line_h = "h1"
lowpass_kernel = None
videos = ["frontal_open"]

df = pd.DataFrame()
extract_ex2_features(preprocessed_path, save_name, get_diagnosis(), videos, lowpass_kernel=lowpass_kernel, line_v=line_v, line_h=line_h, central_point=central_point, silent=False)
t_df = pd.read_csv(os.path.join(DATA_PATH, f"{save_name}.csv"))
# Drop rows with any missing value (the second dropna() is a no-op).
t_df = t_df.dropna().dropna().reset_index(drop=True)
t_df["label"] = t_df["label"].astype('int')
df = pd.concat([df, t_df], ignore_index=True)

save_path = os.path.join(preprocessed_path, "..", save_name)
df.to_csv(f"{save_path}.csv", index=False)

In [None]:
from sklearn.linear_model import LogisticRegression

# Experiment 2, evaluation: compare several classifiers on the same features,
# each wrapped in a pipeline with a StandardScaler.
ex2_data = pd.read_csv(os.path.join(DATA_PATH, f"ex2_data.csv"))
ex2_res = pd.DataFrame()

# Model name -> estimator; "prior" is a DummyClassifier baseline.
models = {
    "prior": DummyClassifier(strategy="prior"),
    "RLR": LogisticRegression(),
    "SVC": SVC(kernel="linear"),
    "XGB": xgb.XGBClassifier(use_label_encoder=False, eval_metric='logloss')
}

options = list(models.items())
# NOTE(review): the progress-bar text says "Experiment 4" although this cell
# belongs to Experiment 2 — confirm whether the label should be updated.
for model_name, model in tqdm(options, total=len(options), desc="Experiment 4: results"):
    t_df = ex2_data

    # ``scaler`` holds the class; a fresh instance is created per pipeline.
    scaler = StandardScaler
    model_pipe = Pipeline([
        ('scaler', scaler()),
        ('clf', model)
    ])
    results = test_model(model_pipe, t_df, reps=1, silent=True)
    res_df = pd.DataFrame(results)
    res_df["model"] = model_name
    ex2_res = pd.concat([ex2_res, res_df], ignore_index=True)

save_name = "ex2_res"
save_path = os.path.join(preprocessed_path, "..", save_name)
ex2_res.to_csv(f"{save_path}.csv", index=False)

#### Results

In [None]:
print_data_info("ex2_data.csv")

In [None]:
# Reload the saved Experiment 2 results and plot them.
save_name = "ex2_res"
save_path = os.path.join(preprocessed_path, "..", save_name)
ex2_res = pd.read_csv(f"{save_path}.csv")

# ``save_path`` is reused: from here on it is the figure output path.
save_path = os.path.join(DATA_PATH, "experiments", f"ex2")
plot_ex2(ex2_res, bbox_to_anchor=(1.155, 1), figsize=(15, 5), save_path=save_path)

In [None]:
# Export the Experiment 2 results as a LaTeX table, models ordered by
# descending mean macro F1.
rows = ['']
keys = ['model']
key_name = 'Model'

index_order = ex2_res.groupby('model')["f1_macro"].mean().sort_values(ascending=False).index.to_list()
# NOTE(review): the results use the model name "prior" (lower case) while this
# list says 'Prior' — confirm how to_latex_table matches the column names.
columns = ['RLR', 'XGB', 'SVC', 'Prior']
print(index_order)

save_path = os.path.join(DATA_PATH, "experiments", f"ex2.tex")
# NOTE(review): to_latex_table is not defined in the visible part of this
# notebook — it has to be defined or imported before this cell runs.
to_latex_table(ex2_res, keys=keys, key_name=key_name, rows=rows, columns=columns, save_path=save_path, index_order=index_order)

### Experiment 3

#### Code

In [None]:
def extract_ex3_features(data_dir: str, save_name: str, diagnosis, videos, lowpass_kernel, line_v="v4", line_h="h1", central_point=16, silent=False):
    """Extract per-patient video features and save them as a CSV.

    Iterates over the global ``FINAL_IDS``; for each patient and each entry of
    ``videos`` the matching extractor is run on ``<data_dir>/<id>/<video>`` and
    its features are stored under ``"<video>_<feature>"``. The result is
    written to ``<data_dir>/../<save_name>.csv``.

    Args:
        data_dir: Directory with the preprocessed patient folders.
        save_name: Output file name without the ``.csv`` extension.
        diagnosis: DataFrame with ``Id`` and ``Diagnosis`` columns.
        videos: Video names; each must be a key of the extractor table.
        lowpass_kernel: Forwarded to the extractor.
        line_v, line_h, central_point: Forwarded to the extractor.
        silent: Disable the progress bar.

    The ``label`` is 0 for a ``"healthy"`` diagnosis, 1 for any other, and
    ``None`` for patients without a diagnosis row.
    """
    dirs = FINAL_IDS
    extractors = {
        "frontal_open": analyze_frontal_video,
    }

    # Build the id -> diagnosis lookup once instead of rebuilding the id list
    # and re-filtering the frame for every patient; setdefault keeps the first
    # row of a repeated id, matching the previous ``.values[0]``.
    diagnosis_by_id = {}
    for diag_id, diag in zip(diagnosis["Id"], diagnosis["Diagnosis"]):
        diagnosis_by_id.setdefault(diag_id, diag)

    patients = []
    for patient_id in tqdm(dirs, total=len(dirs), position=0, desc="Extracting patients features", disable=silent):
        features = {}
        for video in videos:
            extractor = extractors[video]
            patient_path = os.path.join(data_dir, patient_id, video)
            vid_features = extractor(patient_path, line_v=line_v, line_h=line_h, central_point=central_point, lowpass_kernel=lowpass_kernel)
            for k, v in vid_features.items():
                features[f"{video}_{k}"] = v

        label = None
        if patient_id in diagnosis_by_id:
            label = 0 if diagnosis_by_id[patient_id] == "healthy" else 1

        features["patient_id"] = patient_id
        features["label"] = label
        patients.append(features)

    save_path = os.path.join(data_dir, "..", save_name)
    df = pd.DataFrame.from_dict(patients)
    df.to_csv(f"{save_path}.csv", index=False)

In [None]:
# Experiment 3, extraction: one fixed configuration (point 85, lines v8/h1,
# no low-pass kernel), cleaned and written back to the same CSV.
save_name = "ex3_data"
central_point = 85
line_v =  "v8"
line_h = "h1"
lowpass_kernel = None
videos = ["frontal_open"]


df = pd.DataFrame()
extract_ex3_features(preprocessed_path, save_name, get_diagnosis(), videos, lowpass_kernel=lowpass_kernel, line_v=line_v, line_h=line_h, central_point=central_point, silent=False)
t_df = pd.read_csv(os.path.join(DATA_PATH, f"{save_name}.csv"))
# Drop rows with any missing value (the second dropna() is a no-op).
t_df = t_df.dropna().dropna().reset_index(drop=True)
t_df["label"] = t_df["label"].astype('int')
df = pd.concat([df, t_df], ignore_index=True)

save_path = os.path.join(preprocessed_path, "..", save_name)
df.to_csv(f"{save_path}.csv", index=False)

In [None]:
# Experiment 3, ANOVA: evaluate SelectKBest with k = 1..max_features in front
# of a logistic regression and save all results to one CSV.
ex3_data = pd.read_csv(os.path.join(DATA_PATH, f"ex3_data.csv"))
ex3_res = pd.DataFrame()

max_features = 22
scaler = StandardScaler

# NOTE(review): the progress-bar text says "Experiment 6" although this cell
# belongs to Experiment 3 — confirm whether the label should be updated.
for n_features in tqdm(range(1, max_features+1), total=max_features, desc="Experiment 6: results for ANOVA"):
    t_df = ex3_data

    # === ANOVA ===
    scaler = StandardScaler
    model_pipe = Pipeline([
        ('scaler', scaler()),
        ('selector', SelectKBest(k=n_features)),
        ('clf', LogisticRegression())
    ])
    results = test_anova_selector(model_pipe, t_df, silent=True, reps=1)
    res_df = pd.DataFrame(results)
    res_df["n_features"] = n_features
    ex3_res = pd.concat([ex3_res, res_df], ignore_index=True)

save_name = "ex3_res_anova"
save_path = os.path.join(preprocessed_path, "..", save_name)
ex3_res.to_csv(f"{save_path}.csv", index=False)

In [None]:
# Experiment 3, sequential forward selection. NOTE: the following cell runs
# the same experiment and writes the same "ex3_res_sfs" file.
max_features = 22
scaler = StandardScaler

sfs_params = {'forward': True, 'floating': False, 'verbose': 0, 'scoring': 'f1_macro', 'cv': 5} 
ex3_data = pd.read_csv(os.path.join(DATA_PATH, f"ex3_data.csv"))
ex3_res = pd.DataFrame()

# === SFS ===
t_df = ex3_data
# Despite the variable name, the classifier is a logistic regression.
svc_pipe = Pipeline([
    ('scaler', scaler()),
    ('clf', LogisticRegression())
])
results  = test_sfs_selector(svc_pipe, t_df, sfs_params, n_features=max_features, reps=1)
res_df = pd.DataFrame(results)
ex3_res = pd.concat([ex3_res, res_df], ignore_index=True)

save_name = "ex3_res_sfs"
save_path = os.path.join(preprocessed_path, "..", save_name)
ex3_res.to_csv(f"{save_path}.csv", index=False)

In [None]:
# Experiment 3, sequential forward selection: evaluate every subset the
# selector reports (up to max_features features) and save the results.
max_features = 22
sfs_params = {'forward': True, 'floating': False, 'verbose': 0, 'scoring': 'f1_macro', 'cv': 5} 
scaler = StandardScaler

ex3_data = pd.read_csv(os.path.join(DATA_PATH, f"ex3_data.csv"))
ex3_res = pd.DataFrame()

# === SFS ===
t_df = ex3_data
# Despite the variable name, the classifier is a logistic regression.
svc_pipe = Pipeline([
    ('scaler', scaler()),
    ('clf', LogisticRegression())
])
results  = test_sfs_selector(svc_pipe, t_df, sfs_params, n_features=max_features, reps=1)
res_df = pd.DataFrame(results)
ex3_res = pd.concat([ex3_res, res_df], ignore_index=True)

save_name = "ex3_res_sfs"
save_path = os.path.join(preprocessed_path, "..", save_name)
ex3_res.to_csv(f"{save_path}.csv", index=False)

In [None]:
# Experiment 3, recursive feature elimination: rank features by the
# classifier's ``coef_`` and evaluate subsets of 1..max_features features.
max_features = 22
scaler = StandardScaler

ex3_data = pd.read_csv(os.path.join(DATA_PATH, f"ex3_data.csv"))
ex3_res = pd.DataFrame()

# === RFE ===
t_df = ex3_data
# Despite the variable name, the classifier is a logistic regression.
svc_pipe = Pipeline([
    ('scaler', scaler()),
    ('clf', LogisticRegression())
])
results  = test_rfe_selector(svc_pipe, t_df, 'coef_', n_features=max_features, reps=1)
res_df = pd.DataFrame(results)
ex3_res = pd.concat([ex3_res, res_df], ignore_index=True)

save_name = "ex3_res_rfe"
save_path = os.path.join(preprocessed_path, "..", save_name)
ex3_res.to_csv(f"{save_path}.csv", index=False)

#### Results

In [None]:
# Reload the saved ANOVA results into ``ex3_res``.
save_name = 'ex3_res_anova'
save_path = os.path.join(preprocessed_path, "..", save_name)
ex3_res = pd.read_csv(f"{save_path}.csv")

In [None]:
# Reload the saved SFS results into ``ex3_res`` (replaces its previous value).
save_name = "ex3_res_sfs"
save_path = os.path.join(preprocessed_path, "..", save_name)
ex3_res = pd.read_csv(f"{save_path}.csv")

In [None]:
# Reload the saved RFE results into ``ex3_res`` (replaces its previous value).
save_name = "ex3_res_rfe"
save_path = os.path.join(preprocessed_path, "..", save_name)
ex3_res = pd.read_csv(f"{save_path}.csv")

In [None]:
def plot_ex3(bbox_to_anchor=(1.155, 1), figsize=(15, 5), save_path=None):
    """Plot the mean macro F1 against the number of features per selector.

    Reads the ANOVA, RFE and SFS result files next to ``preprocessed_path``
    and draws one line per selector plus the random-prediction baseline from
    ``get_random_score``. The feature counts on the x axis are taken from the
    ANOVA results.

    Args:
        bbox_to_anchor: Legend anchor forwarded to ``plt.legend``.
        figsize: Figure size.
        save_path: If given, the figure is saved there. Defaults to ``None``
            (no file written) rather than to whatever a notebook global named
            ``save_path`` held when this function was defined.
    """
    results_dir = os.path.join(preprocessed_path, '..')
    anova_df = pd.read_csv(os.path.join(results_dir, 'ex3_res_anova.csv'))
    rfe_df = pd.read_csv(os.path.join(results_dir, 'ex3_res_rfe.csv'))
    sfs_df = pd.read_csv(os.path.join(results_dir, 'ex3_res_sfs.csv'))

    # Sorted so the lines are drawn left to right whatever the file order.
    n_features_range = np.sort(anova_df["n_features"].unique())

    def mean_scores(frame):
        # Mean score per feature count; NaN where a selector has no rows.
        return [np.mean(frame[frame["n_features"] == n_features]["f1_macro"]) for n_features in n_features_range]

    anova_scores = mean_scores(anova_df)
    rfe_scores = mean_scores(rfe_df)
    sfs_scores = mean_scores(sfs_df)

    plt.figure(figsize=figsize)
    plt.title("Results depending on the number of features selected by various selectors")
    # Plot against the real feature counts instead of their positions so the
    # axis stays correct if the counts are not exactly 1..N.
    plt.plot(n_features_range, anova_scores, '-o', markersize=5, label="ANOVA", zorder=2)
    plt.plot(n_features_range, rfe_scores, '-o', markersize=5, label="RFE", zorder=2)
    plt.plot(n_features_range, sfs_scores, '-o', markersize=5, label="SFS", zorder=2)
    plt.xlabel("Number of features")
    plt.ylabel("F1 measure (macro)")
    plt.xticks(n_features_range)
    plt.grid(axis='y', zorder=1)
    plt.axhline(y=get_random_score(), color='r', linestyle='dashed', label="Random prediction")
    plt.legend(bbox_to_anchor=bbox_to_anchor, borderaxespad=0)

    if save_path is not None:
        plt.savefig(save_path, bbox_inches="tight")

In [None]:
# NOTE(review): "ex2_frontal_res" is loaded here but ``ex2_frontal_res`` is not
# used anywhere in the visible notebook, and no visible cell writes that file —
# confirm whether these three lines are still needed.
save_name = "ex2_frontal_res"
save_path = os.path.join(preprocessed_path, "..", save_name)
ex2_frontal_res = pd.read_csv(f"{save_path}.csv")

# Plot the selector comparison and save the figure.
save_path = os.path.join(DATA_PATH, "experiments", f"ex3")
plot_ex3(bbox_to_anchor=(1.155, 1), figsize=(15, 5), save_path=save_path)

In [None]:
# Export the Experiment 3 results as a LaTeX table: one row per feature count
# (1..22), one column per selector.
rows = [str(x) for x in range(1, 23)]
keys = ['n_features', 'selector']
key_name = 'Selektor'
columns = ["ANOVA", "RFE", "SFS"]
save_path = os.path.join(DATA_PATH, "experiments", f"ex3.tex")

anova_table_df = pd.read_csv(os.path.join(preprocessed_path, '..', 'ex3_res_anova.csv'))
rfe_table_df = pd.read_csv(os.path.join(preprocessed_path, '..', 'ex3_res_rfe.csv'))
sfs_table_df = pd.read_csv(os.path.join(preprocessed_path, '..', 'ex3_res_sfs.csv'))

# Tag every result set with its selector name before stacking them.
anova_table_df["selector"] = "ANOVA"
rfe_table_df["selector"] = "RFE"
sfs_table_df["selector"] = "SFS"

ex3_to_table = pd.concat([anova_table_df, rfe_table_df, sfs_table_df])

# Order of ``keys``: row key first, column key second.
# NOTE(review): to_latex_table is not defined in the visible part of this
# notebook — it has to be defined or imported before this cell runs.
to_latex_table(ex3_to_table, keys=keys, key_name=key_name, rows=rows, columns=columns, save_path=save_path)

In [None]:
from ast import literal_eval
from collections import Counter


def flatten(xss):
    """Return one flat list with the items of every iterable in ``xss``.

    Only one level of nesting is removed.
    """
    return list(itertools.chain.from_iterable(xss))

def plot_importances(df, selector, n_features, save_path=None, figsize=None):
    """Bar-plot how often each feature was selected for a given subset size.

    Args:
        df: Selector results with the columns ``n_features`` and
            ``feature_names``; the names may be real lists or their string
            form as read from CSV. The frame is not modified.
        selector: Selector name shown in the title.
        n_features: Only rows with this number of features are counted.
        save_path: If given, the figure is saved there.
        figsize: Optional figure size.
    """
    # Parse into a local series so the caller's frame stays untouched;
    # str() first lets both CSV strings and already-parsed lists through.
    selected = df[df["n_features"] == n_features]["feature_names"]
    parsed = selected.apply(lambda x: literal_eval(str(x)))

    ctr = Counter(name for names in parsed for name in names)
    ctr_sorted = ctr.most_common()

    keys = [k for (k, v) in ctr_sorted]
    values = [v for (k, v) in ctr_sorted]

    # Drop the first two "_"-separated parts of every feature name.
    keys = ['_'.join(x.split('_')[2:]) for x in keys]

    plt.figure(figsize=figsize)
    plt.bar(keys, values, zorder=2)
    plt.xticks(rotation='vertical')

    plt.title(f"Feature occurance using {n_features} features with scelector {selector}")
    plt.grid(axis='y', zorder=0)
    plt.xlabel("Feature")
    # Fixed tick range 0..100 in steps of 5.
    plt.yticks(np.arange(0, 101 + 1, step=5))
    plt.ylabel("Occurances")

    if save_path is not None:
        plt.savefig(save_path, bbox_inches="tight")

    plt.show()

In [None]:
# Feature selection frequency for ANOVA with 20 features.
save_path = os.path.join(DATA_PATH, "experiments", f"ex3_anova")
plot_importances(anova_table_df, "ANOVA", n_features=20, save_path=save_path, figsize=(15, 5))

In [None]:
# Feature selection frequency for RFE with 15 features.
save_path = os.path.join(DATA_PATH, "experiments", f"ex3_rfe")
plot_importances(rfe_table_df, "RFE", n_features=15, save_path=save_path, figsize=(15, 5))

In [None]:
# Feature selection frequency for SFS with 22 features. The plot is titled and
# saved as SFS, so it must be fed the SFS results rather than the ANOVA ones.
save_path = os.path.join(DATA_PATH, "experiments", f"ex3_sfs")
plot_importances(sfs_table_df, "SFS", n_features=22, save_path=save_path, figsize=(15, 5))