In [31]:
import warnings
from dataclasses import dataclass
from pathlib import Path

import cv2
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import skimage as ski
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import ExtraTreesClassifier, RandomForestClassifier
from sklearn.exceptions import UndefinedMetricWarning
from sklearn.metrics import (
    accuracy_score,
    cohen_kappa_score,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import StratifiedKFold
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from tqdm.auto import tqdm
from xgboost import XGBClassifier

In [32]:
DATASET_NAME = "drsprg"
DATA_BASE_DIR = Path(f"../data/processed/{DATASET_NAME}/")
DATASET = DATA_BASE_DIR / Path("data_list_export_120217.xlsx")
AVG_BLURRED_IMAGES = DATA_BASE_DIR / Path("artifacts/avg_blurred_images.pkl")
PREP_STUDIES = DATA_BASE_DIR / Path("artifacts/prep_studies.pkl")

# Experiment variables
SEED = 42
CV = 5

In [33]:
tqdm.pandas()

In [34]:
studies = tuple(joblib.load(PREP_STUDIES))

In [35]:
class LBPEncoder(BaseEstimator, TransformerMixin):
    """LBP encoder for image data."""

    def __init__(self, radius: int = 1, sampling_pixels: int = 106):
        self.radius = radius
        self.sampling_pixels = sampling_pixels

    def transform(self, X, y=None):
        """Extract the LBP from the images batch."""
        X = list(X)
        cvt_imgs = [self._cvt(img) for img in X]
        imgs_lbps = [self._get_lbp(img) for img in cvt_imgs]
        imgs_hists = [self._get_hist(img_lbp) for img_lbp in imgs_lbps]
        features = self._get_features(imgs_hists)
        return features

    def fit(self, X, y=None):
        return self

    def _cvt(self, img):
        if isinstance(img, float):
            print("test")
        if len(img.shape) > 2:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            _, img = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        i_min = np.min(img)
        i_max = np.max(img)
        if i_max - i_min != 0:
            img = (img - i_min) / (i_max - i_min)

        img = img.astype(np.uint8)

        return img

    def _get_lbp(self, img):
        lbp = ski.feature.local_binary_pattern(
            img, self.sampling_pixels, self.radius, method="uniform"
        )
        return (img, lbp)

    def _get_hist(self, img_lbp):
        img, lbp = img_lbp
        hist, _ = np.histogram(
            lbp.ravel(),
            bins=np.arange(0, self.sampling_pixels + 3),
            range=(0, self.sampling_pixels + 2),
        )
        hist = hist.astype("float")
        hist /= hist.sum() + 1e-6
        return img, hist

    def _get_features(self, imgs_hists):
        hists = [img_hist[1] for img_hist in imgs_hists]
        features = []
        for h in hists:
            features.extend(h)
        return hists

In [36]:
samples = [(([pi[0] for pi in study[0]]), study[1]) for study in tqdm(studies, desc="Studies")]
df = pd.DataFrame(data=samples, columns=["features", "labels"])

Studies:   0%|          | 0/102 [00:00<?, ?it/s]

In [37]:
df["features"] = df["features"].progress_apply(lambda images: np.mean(np.stack(images, axis=0), axis=0).astype(np.uint8))

  0%|          | 0/102 [00:00<?, ?it/s]

In [38]:
@dataclass
class CVResults:
    algo: str
    encoder: str
    test_acc: list[float]
    test_macro_prec: list[float]
    test_weighted_prec: list[float]
    test_macro_recall: list[float]
    test_weighted_recall: list[float]
    test_macro_f1: list[float]
    test_weighted_f1: list[float]


def _create_lbp_pipeline(clf: BaseEstimator) -> Pipeline:
    pipeline = Pipeline(
        [
            ("encoder", LBPEncoder()),
            ("clf", clf),
        ]
    )
    return pipeline


def _run_sklearn_cv(
    clf: BaseEstimator | Pipeline,
    X: np.ndarray,
    y: np.ndarray,
    cv: int,
    random_state,
    shuffle=True,
):
    """Run the experiments using cross-validation."""

    ALGO_NAME_DICT = {
        ExtraTreesClassifier: "Extra Trees",
        DecisionTreeClassifier: "Decision Tree",
        SVC: "Support Vector",
        RandomForestClassifier: "Random Forest",
        XGBClassifier: "XGBoost",
        MLPClassifier: "MLP",
    }

    ENCODER_NAME_DICT = {
        LBPEncoder: "LBP",
        # ViTEncoder: "ViT",
    }

    # Generate split
    skf = StratifiedKFold(n_splits=cv, random_state=random_state, shuffle=shuffle)
    for train_index, test_index in skf.split(np.array(X), y):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]
        clf.fit(X_train, y_train)

    # Train model
    clf.fit(X_train, y_train)

    # Evaluate model
    y_pred = clf.predict(X_test)
    cv_results = CVResults(
        algo=ALGO_NAME_DICT[type(clf.named_steps["clf"])],
        encoder=ENCODER_NAME_DICT[type(clf.named_steps["encoder"])],
        test_acc=accuracy_score(y_test, y_pred),
        test_macro_prec=precision_score(y_test, y_pred, average="macro"),
        test_weighted_prec=precision_score(y_test, y_pred, average="weighted"),
        test_macro_recall=recall_score(y_test, y_pred, average="macro"),
        test_weighted_recall=recall_score(y_test, y_pred, average="weighted"),
        test_macro_f1=f1_score(y_test, y_pred, average="macro"),
        test_weighted_f1=f1_score(y_test, y_pred, average="weighted"),
    )
    return cv_results


def run_experiments(
    clfs: list[BaseEstimator],
    studies: pd.DataFrame,
    cv: int,
    random_state,
) -> dict[str, list[np.ndarray]]:
    """Run experiments."""

    lbp_cv_results = [
        _run_sklearn_cv(
            _create_lbp_pipeline(clf),
            np.array(studies["features"]),
            np.array(studies["labels"]),
            cv=cv,
            random_state=random_state,
        )
        for clf in tqdm(clfs, desc=f"Classifiers")
    ]

    cv_result_list = [lbp_cv_results]

    return cv_result_list

In [39]:
clfs = [
    SVC(random_state=SEED),
    RandomForestClassifier(random_state=SEED),
    DecisionTreeClassifier(random_state=SEED),
    ExtraTreesClassifier(random_state=SEED),
    # XGBClassifier(
    #     objective="multi:softprob",
    #     eval_metric="mlogloss",
    # ),
    MLPClassifier(hidden_layer_sizes=(100,), max_iter=300, random_state=SEED),
]

with warnings.catch_warnings():
    warnings.simplefilter(action="ignore", category=UndefinedMetricWarning)
    cv_results_list = run_experiments(clfs, df, cv=CV, random_state=SEED)

Classifiers:   0%|          | 0/5 [00:00<?, ?it/s]

In [41]:
def scores_to_df(cv_results_list: list[list[CVResults]]) -> pd.DataFrame:
    """Transform the scores dictionary into a dataframe object."""
    data = []
    for cv_results in cv_results_list:
        for cv_result in cv_results:
            data.append(
                {
                    "algo": cv_result.algo,
                    "encoder": cv_result.encoder,
                    "acc": np.mean(cv_result.test_acc),
                    "macro_prec": np.mean(cv_result.test_macro_prec),
                    "weighted_prec": np.mean(cv_result.test_weighted_prec),
                    "macro_recall": np.mean(cv_result.test_macro_recall),
                    "weighted_recall": np.mean(cv_result.test_weighted_recall),
                    "macro_f1": np.mean(cv_result.test_macro_f1),
                    "weighted_f1": np.mean(cv_result.test_weighted_f1),
                }
            )
    df = pd.DataFrame(data=data)
    return df

scores_to_df(cv_results_list)

Unnamed: 0,algo,encoder,acc,macro_prec,weighted_prec,macro_recall,weighted_recall,macro_f1,weighted_f1
0,Support Vector,LBP,0.6,0.3,0.36,0.5,0.6,0.375,0.45
1,Random Forest,LBP,0.8,0.809524,0.804762,0.770833,0.8,0.78022,0.793407
2,Decision Tree,LBP,0.4,0.375,0.4,0.375,0.4,0.375,0.4
3,Extra Trees,LBP,0.85,0.9,0.88,0.8125,0.85,0.82906,0.841026
4,MLP,LBP,0.6,0.3,0.36,0.5,0.6,0.375,0.45
