In [1]:
# !pip install lightgbm

In [2]:
import os
import sys
import glob
import pathlib
import numpy as np
import pandas as pd
import pylab as plt

from lightgbm import LGBMClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.base import BaseEstimator, ClassifierMixin

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error as MAE

In [3]:
sys.path.append(os.path.abspath('../src/'))
    
from dpipe_metrics import hausdorff_distance, surface_distances, dice_score, assd

In [67]:
metrics_dict = {
    "dice_coefficient": {"2d": dice_score, "3d": dice_score},
#     "mae": {"2d": lambda x, y: np.abs(x - y).mean(), "3d": lambda x, y: np.abs(x - y).mean()},
#     "mse": {"2d": lambda x, y: ((x - y) ** 2).mean(), "3d": lambda x, y: ((x - y) ** 2).mean()},
    "hausdorff_distance": {"2d": hausdorff_distance, "3d": hausdorff_distance},
    "surface_distances": {"2d": surface_distances, "3d": surface_distances},
    "assd": {"2d": assd, "3d": assd}
}

unary_metrics_dict = {
    "area": {"2d": lambda x: (x > 0).sum(), "3d": lambda x: (x > 0).sum()}
}

In [52]:
class BaseQualityEstimator(BaseEstimator, ClassifierMixin):
    """Base Estimator for segmentation quality assessment"""

    def __init__(self, metrics=["dice_coefficient"], unary_metrics=["area"], meta_clf=LGBMClassifier()):
        """
        Args:
            metrics: list of strings: metrics to be computed on pairs of preds and gt
            unary_metrics: list of string: metrics to be computed on preds directly
        
        TODO: params??
        """
        self.meta_clf = meta_clf
        self.metrics = list(filter(lambda _: _ in metrics_dict, metrics))
        self.unary_metrics = list(filter(lambda _: _ in unary_metrics_dict, unary_metrics))
        
        self.data_type = "3d"
        self.X_metrics = None
    
    
    def fit(self, X, Xy=None, y=None):
        """
        
        """
        assert len(X) == len(Xy) == len(y)
        # get the dimensionality of the data
#         self.data_type = self._check_data_type(X)
        # compute all the metrics on the pairs from X (predictions) and Xy (gt)
        self.X_metrics = self._compute_metrics(X, Xy)
        # fit meta-classifier to metrics and human-made labels
        self.meta_clf.fit(self.X_metrics, y)

        return self
    
    def predict(self, X, Xy):
        
        X_metrics = self._compute_metrics(X, Xy)
        
        y_pred = self.meta_clf.predict(X_metrics)
        
        return y_pred
    
    def predict_proba(self, X, Xy):
        
        X_metrics = self._compute_metrics(X, Xy)
        
        y_pred = self.meta_clf.predict_proba(X_metrics)
        
        return y_pred
    
    def _compute_metrics(self, X, Xy):
        
        def _metrics(x, xy):
            metrics_computed = dict()
            for metric_ in self.metrics:
                metrics_computed[metric_] = metrics_dict[metric_][self.data_type](x, xy)
            return metrics_computed
        
        def _unary_metrics(x):
            unary_metrics_computed = dict()
            for metric_ in self.unary_metrics:
                unary_metrics_computed[metric_] = unary_metrics_dict[metric_][self.data_type](x)
            
            return unary_metrics_computed
        
        metrics_computed = []
        
        for x_, xy_ in zip(X, Xy):
            metrics_temp_ = _metrics(x_, xy_)
            metrics_temp_.update(_unary_metrics(x_))
            metrics_computed.append(metrics_temp_)
            
        df_metrics_computed = pd.DataFrame(metrics_computed)
        
        return df_metrics_computed
        
    def _check_data_type(self, X):
        """
        TODO:
        """
        # заглушка:
        if len(X.shape) == 2:
            return "2d"
        elif X.shape[2] == 1:
            return "2d"
        else:
            return "3d"

    def score(self, X, y=None):
        # counts number of values bigger than mean
        return(sum(self.predict(X))) 

### Check metrics:

In [53]:
y_df = pd.read_csv("../data/OpenPart.csv")

In [54]:
X = np.array([plt.imread(_) for _ in sorted(glob.glob("../data/sample_1/*"))]).astype(bool)

In [55]:
X = np.array([plt.imread(_) for _ in sorted(glob.glob("../data/sample_1/*"))]).astype(bool)
X_ids = [_.split("/")[-1] for _ in sorted(glob.glob("../data/sample_1/*"))]
Xy = np.array([plt.imread(_) for _ in glob.glob("../data/after/*")]).astype(bool)
Xy_ids = [_.split("/")[-1] for _ in sorted(glob.glob("../data/after/*"))]

y = y_df.sort_values(by="Case")["Sample 1"].values
y_ids = y_df.sort_values(by="Case")["Case"].values

X_not_ids = [X_ids[i] for i in range(len(X)) if X_ids[i] not in y_ids]

X = np.array([X[i] for i in range(len(X)) if X_ids[i] in y_ids])
Xy = np.array([Xy[i] for i in range(len(Xy)) if Xy_ids[i] in y_ids])

In [56]:
X_train, X_eval, Xy_train, Xy_eval, y_train, y_eval = train_test_split(X, Xy, y, test_size=0.2, random_state=51)

In [75]:
q_clf = BaseQualityEstimator(metrics=["dice_coefficient", 
                                      "mae", "mse", 
                                      "hausdorff_distance", 
#                                       "surface_distances",
                                      "assd"
                                     ], 
                             unary_metrics=["area"], meta_clf=LGBMClassifier(max_depth=3))

In [76]:
X_train.shape, Xy_train.shape, y_train.shape

((48, 1024, 1024), (48, 1024, 1024), (48,))

In [77]:
# q_clf.X_metrics

In [78]:
q_clf.fit(X_train, Xy_train, y_train)

BaseQualityEstimator(meta_clf=LGBMClassifier(max_depth=3),
                     metrics=['dice_coefficient', 'hausdorff_distance', 'assd'])

In [79]:
y_pred = q_clf.predict(X_eval, Xy_eval)

In [80]:
y_dummy = np.array([3 for i in range(len(y_eval))])

In [81]:
y_pred

array([3, 5, 4, 3, 5, 4, 5, 2, 3, 4, 3, 4])

In [82]:
y_eval

array([4, 5, 1, 1, 4, 5, 4, 3, 3, 2, 5, 3])

In [83]:
y_dummy

array([3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3])

In [84]:
MAE(y_eval, y_dummy)

1.1666666666666667

In [85]:
MAE(y_eval, y_pred)

1.25