In [None]:
from typing import Any, Literal, Protocol, Self, Optional, overload

import numpy as np
import numpy.typing as npt
import pyod
import scipy.sparse

# Relevant PyOD API, distilled down

In [None]:
# NumPy 2.0 removed the `np.float_` alias; `np.float64` names the same dtype
# and is available on every NumPy version.
DATA = npt.NDArray[np.float64]  # (n_samples, n_features)
SCORES = npt.NDArray[np.float64]  # (n_samples,) higher = more anomalous
LABELS = npt.NDArray[np.int_]  # (n_samples,) 0 for inliers, (>=)1 for outliers


class PyODModel(Protocol):
    """Structural view of the PyOD detector API that this notebook relies on.

    Optional arguments carry the defaults of PyOD's ``BaseDetector`` so that
    the short calls used later in the notebook (``fit(X)``, ``predict(X)``)
    match this protocol.
    """

    decision_scores_: SCORES  # train scores: one value per training sample
    labels_: LABELS  # train predicted labels
    threshold_: float

    def __init__(self, contamination: float = 0.1, **kwargs: Any) -> None:
        """Create a detector; ``contamination`` is the expected proportion of outliers."""

    def fit(self, X: DATA, y: Optional[LABELS] = None) -> Self:
        """Fit detector. y is ignored in unsupervised methods."""

    def decision_function(self, X: DATA) -> SCORES:
        """Predict raw anomaly scores of X using the fitted detector."""

    def predict(
        self, X: DATA, return_confidence: bool = False
    ) -> LABELS | tuple[LABELS, SCORES]:
        """Predict if a particular sample is an outlier or not.

        With ``return_confidence=True`` the labels are returned together with
        a per-sample confidence array.
        """

    def predict_proba(
        self, X: DATA, method: str = "linear", return_confidence: bool = False
    ) -> SCORES | tuple[SCORES, SCORES]:
        """Predict the probability of a sample being outlier.

        ``method`` selects how raw scores are converted into probabilities.
        With ``return_confidence=True`` the probabilities are returned together
        with a per-sample confidence array.
        """

    def predict_confidence(self, X: DATA) -> SCORES:
        """Predict the model's confidence in making the same prediction
        under slightly different training sets."""

    def _predict_rank(self, X: DATA, normalized: bool = False) -> SCORES:
        """Predict the outlyingness rank of a sample by a fitted model."""

    def _set_n_classes(self, y: LABELS) -> Self:
        """Set the number of classes if `y` is presented, which is not
        expected. It could be useful for multi-class outlier detection."""

# Minimal requirements to apply a Balif-like framework

In [None]:
REGION = npt.NDArray[np.int_]  # (n_samples,) terminal region idx of each sample


class BaeysianableDetector:
    """Minimal detector interface the Balif-like framework builds on.

    A detector must be able to map samples to terminal regions and to produce
    outlier probabilities.
    """

    def apply(self, X: DATA) -> REGION:
        """Finds the terminal region idx for each sample in X."""

    def predict_proba(
        self, X: DATA, return_confidence: bool
    ) -> SCORES | tuple[SCORES, SCORES]:
        """Predict the probability of each sample in X being an outlier.

        NOTE(review): presumably the tuple form is returned when
        ``return_confidence`` is true, mirroring ``PyODModel.predict_proba``
        — confirm.
        """

class BaeysianableEnsemble:
    """Ensemble that exposes its member detectors through ``estimators_``."""

    @property
    def estimators_(self) -> list[BaeysianableDetector]:
        """The individual ``BaeysianableDetector`` members of the ensemble."""

# (alpha, beta) parameter arrays. `np.float64` replaces the `np.float_` alias
# that NumPy 2.0 removed.
BETADISTR = tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]

class BaeysifiedDetector(BaeysianableDetector):
    """A ``BaeysianableDetector`` extended with distribution output and updates."""

    def update(self, X: DATA, y: LABELS) -> Self:
        """Take the labels ``y`` given for the samples ``X`` into account.

        Annotated to return the detector itself.
        """

    def predict_distr(self, X: DATA) -> BETADISTR:
        """Return a ``BETADISTR`` (pair of parameter arrays) for the samples in X."""

# Extending the PyOD IForest model

## Example of IForest usage

In [None]:
from pyod.utils.data import generate_data
from pyod.utils.example import visualize
from pyod.models.iforest import IForest

# Synthetic benchmark: 1000 training and 100 test samples, 10% outliers.
X_train, X_test, y_train, y_test = generate_data(
    n_train=1000, n_test=100, contamination=0.1, random_state=0
)
# float32 copy of the test set, reused by later cells that query the fitted
# trees directly (presumably the low-level tree needs this dtype — confirm).
X = X_test.astype(np.float32)

# Seed the forest too: the data is already seeded, so without this the fitted
# trees (and everything derived from them below) differ on every run.
model = IForest(random_state=0).fit(X_train)
y_train_pred, y_test_pred = model.predict(X_train), model.predict(X_test)

visualize("IF", X_train, y_train, X_test, y_test, y_train_pred, y_test_pred)

## Relevant API distilled down

In [None]:
from pyod.models.iforest import IForest
from sklearn.ensemble import IsolationForest
from sklearn.tree import ExtraTreeRegressor
from sklearn.tree._tree import Tree

class IForest(PyODModel):
    """Members the PyOD ``IForest`` detector adds on top of ``PyODModel``.

    NOTE(review): defining this stub rebinds the ``IForest`` name imported just
    above; re-import the real class before instantiating it again.
    """

    @property
    def detector_(self) -> IsolationForest:
        """The wrapped sklearn ``IsolationForest`` instance."""

    @property
    def estimators_(self) -> list[ExtraTreeRegressor]:
        """The fitted sub-estimators, as a list."""

    @property
    def estimators_samples_(self) -> list[npt.NDArray[np.int_]]:
        """For each base estimator, the subset of in-bag samples."""

    @property
    def max_samples_(self):
        """The actual number of samples."""

# The sklearn classes expose the following API:

class IsolationForest(Protocol):
    """Members of sklearn's ``IsolationForest`` referenced in this notebook."""

    @property
    def tree_(self) -> Tree:
        """Low-level tree accessor.

        NOTE(review): the notebook only ever reads ``tree_`` from individual
        sub-estimators, never from the forest itself — confirm the forest
        really exposes it, otherwise drop this member.
        """

    @property
    def estimators_(self) -> list[ExtraTreeRegressor]:
        """The fitted sub-estimators (read as ``model.detector_.estimators_`` later on)."""

class ExtraTreeRegressor(Protocol):
    """The part of sklearn's ``ExtraTreeRegressor`` described here: its tree."""

    @property
    def tree_(self) -> Tree:
        """The ``Tree`` object held by the regressor."""

class Tree:
    """Attributes and methods of sklearn's low-level ``Tree`` used in this notebook.

    The annotated names below are all properties on the real object, not
    class attributes.
    """

    # basic tree structure
    max_depth: int
    node_count: int
    n_leaves: int
    children_left: npt.NDArray[np.int_]  # (node_count,)
    children_right: npt.NDArray[np.int_]  # (node_count,)

    # tree fitting information
    feature: npt.NDArray[np.int_]  # (node_count,)
    # np.float64 replaces the np.float_ alias that NumPy 2.0 removed
    threshold: npt.NDArray[np.float64]  # (node_count,)
    n_node_samples: npt.NDArray[np.int_]  # (node_count,)

    # prediction
    value: npt.NDArray[np.float64]  # (node_count, n_outputs, max_n_classes)

    def compute_node_depths(self) -> npt.NDArray[np.int_]:
        """Compute the depth of each node."""

    def apply(self, X: DATA) -> npt.NDArray[np.int_]:
        """Finds the terminal region (=leaf node) for each sample in X.

        NOTE(review): the notebook passes a float32 array here; presumably the
        low-level tree requires that dtype — confirm.
        """

    def decision_path(self, X: DATA) -> scipy.sparse.csr_matrix:
        """Returns the decision path in the tree.

        The result is a sparse matrix with shape (n_samples, n_nodes):
        1 means the node is in the path, 0 means it is not.

        NOTE: it is returned as a scipy sparse int matrix, not a dense array;
        convert it with ``.toarray().astype(bool)`` before using it as a mask.
        """

    

In [None]:
print("you can access the underlying sklearn IsolationForest object:")
# The PyOD wrapper keeps the fitted sklearn forest on `detector_`.
sklearn_iforest = model.detector_
print("model.detector_ is a", type(sklearn_iforest), end="\n\n")

print("this object has some usefull precomputed values:")
# Star-unpacking keeps only the first entry of each private collection.
c, *_ = sklearn_iforest._average_path_length_per_tree
path_lenght, *_ = sklearn_iforest._decision_path_lengths
print(
    f" - average_path_length: {type(c)}({c.shape}, {c.dtype})"
)
print(
    f" - path_lengths: {type(path_lenght)}({path_lenght.shape}, {path_lenght.dtype})"
)

In [None]:
print("you can access individual ExtraRegressionTree:")
# Star-unpacking keeps only the first fitted sub-estimator.
estimator, *_ = model.estimators_
print("model.estimators_ is a", type(model.estimators_), "of", type(estimator), end="\n\n")

print("from each estimator you can access the undelying tree:")
tree = estimator.tree_
print("a tree is a", tree, end="\n\n")

print("with this tree you can:")
# Both the low-level tree and the estimator answer the same query; the
# estimator's result overwrites the tree's and is the one reported.
leaves = tree.apply(X)
leaves = estimator.apply(X)
print(" - apply(X): get the leaf nodes for X")
print(f"             ({type(leaves)}({leaves.shape}, {leaves.dtype})", end="\n\n")

path = tree.decision_path(X)
path = estimator.decision_path(X)
print(
    " - decision_path(X): get the decision path for X",
    "                     this is going to be a sparse matrix with shape (n_samples, n_nodes)",
    "                     1 means the node is in the path, 0 means it is not",
    sep="\n",
)
print(f"                     ({type(path)}({path.shape}, {path.dtype})", end="\n\n")

depths = tree.compute_node_depths()
print(" - compute_node_depths(): get the depth of each node")
print(f"                          ({type(depths)}({depths.shape}, {depths.dtype})", end="\n\n")

## Extending the ExtraTreeRegressor with Bayesian info

In [None]:
# Keep the complete private collections this time (no unpacking of the first entry).
c, path_lenght = (
    model.detector_._average_path_length_per_tree,
    model.detector_._decision_path_lengths,
)

In [None]:
# import numpy as np
# from joblib import Parallel
# from joblib.parallel import delayed
# from sklearn.ensemble import IsolationForest
# from sklearn.utils import check_array
# from sklearn.utils.validation import check_is_fitted

from dataclasses import dataclass
from pyod.models.base import BaseDetector
from pyod.models.iforest import IForest
from scipy.stats import beta


@dataclass
class BetaDistr:
    """Element-wise Beta distributions described by their ``a`` and ``b`` arrays."""

    # np.float64 replaces the np.float_ alias that NumPy 2.0 removed
    a: npt.NDArray[np.float64]
    b: npt.NDArray[np.float64]

    def mean(self):
        """Mean of each distribution (``scipy.stats.beta.mean``)."""
        return beta.mean(self.a, self.b)

    def var(self):
        """Variance of each distribution (``scipy.stats.beta.var``)."""
        return beta.var(self.a, self.b)

    def nu(self):
        """Element-wise sum ``a + b``."""
        return self.a + self.b

    def update(self, y: "LABELS") -> "BetaDistr":
        """Add the observed label counts to the parameters and return ``self``.

        Every element of ``a`` grows by the number of labels ``>= 1`` and every
        element of ``b`` by the number of labels ``== 0``.

        New arrays are bound instead of adding in place, so arrays the caller
        passed to the constructor (for example a shared prior) are left
        untouched. ``y`` may be any array-like.
        """
        labels = np.asarray(y)
        self.a = self.a + np.sum(labels >= 1)
        self.b = self.b + np.sum(labels == 0)
        return self


class Wrapper:
    """Placeholder: the original declaration was left unfinished.

    TODO: give this class a body or remove it.
    """



class Balif(IForest):
    """``IForest`` subclass meant to carry the Bayesian extension (work in progress).

    For now it only stores two extra hyper-parameters and otherwise behaves
    like ``IForest``.

    NOTE(review): PyOD/sklearn-style parameter introspection usually expects an
    explicit ``__init__`` signature; confirm ``get_params``/``repr`` still work
    with the ``*args`` passthrough.
    """

    threshold_: float

    def __init__(self, *args, prior_strength=0.1, score_to_prob_method="linear", **kwargs):
        """Forward ``*args``/``**kwargs`` to ``IForest`` and store the extra settings."""
        super().__init__(*args, **kwargs)
        self.prior_strength = prior_strength
        self.score_to_prob_method = score_to_prob_method

    def fit(self, X, y=None):
        """Fit the underlying isolation forest and return ``self``.

        TODO: derive the Bayesian state from the fitted forest.
        """
        super().fit(X, y)
        return self

    def decision_function(self, X):
        """Return anomaly scores for X.

        TODO: replace with the Bayesian score; until then defer to ``IForest``
        so the detector stays usable.
        """
        return super().decision_function(X)

    # Inherited from IForest for now; candidates for overriding:

    # def predict(
    #     self, X: DATA, return_confidence: bool
    # ) -> LABELS | tuple[LABELS, SCORES]:
    #     """Predict if a particular sample is an outlier or not."""
    #     ...

    # def predict_proba(
    #     self, X: DATA, method: str, return_confidence: bool
    # ) -> SCORES | tuple[SCORES, SCORES]:
    #     """Predict the probability of a sample being outlier."""
    #     ...

    # def predict_confidence(self, X: DATA) -> SCORES:
    #     """Predict the model's confidence in making the same prediction
    #     under slightly different training sets."""
    #     ...

    # def _predict_rank(self, X: DATA, normalized: bool) -> SCORES:
    #     """Predict the outlyingness rank of a sample by a fitted model."""
    #     ...

In [None]:
from pyod.models.iforest import IForest
from sklearn.tree import ExtraTreeRegressor
from sklearn.tree._tree import Tree

class BayesianExtraTreeRegressor(ExtraTreeRegressor):
    """``ExtraTreeRegressor`` carrying a Beta prior (``alpha``, ``beta``).

    Work in progress: only ``wrap`` has logic; the other methods are
    placeholders that raise ``NotImplementedError`` instead of silently
    returning ``None``.
    """

    @classmethod
    def wrap(cls, regressor: ExtraTreeRegressor) -> Self:
        """Build an instance around an existing regressor.

        The regressor is stored on ``regressor`` and the pair returned by
        ``compute_prior`` on ``alpha`` / ``beta``.
        """
        # A classmethod has no `self`: create the instance explicitly and return it.
        wrapped = cls()
        wrapped.regressor = regressor
        wrapped.alpha, wrapped.beta = cls.compute_prior(regressor)
        return wrapped

    @staticmethod
    def compute_prior(regressor: ExtraTreeRegressor) -> BETADISTR:
        """Compute the prior ``(alpha, beta)`` for ``regressor``. TODO: implement."""
        raise NotImplementedError("compute_prior is not implemented yet")

    def predict_distr(self, X: DATA) -> BETADISTR:
        """Return the Beta parameters for the samples in X. TODO: implement."""
        raise NotImplementedError("predict_distr is not implemented yet")

    def update(self, X: DATA, y: LABELS) -> Self:
        """Update the stored distribution with labels ``y`` for ``X``. TODO: implement."""
        raise NotImplementedError("update is not implemented yet")
    

In [None]:
from pyod.models.iforest import IForest
import odds_datasets


# The loop stops after its first pass, so only the first name after index 0 is
# loaded; `X` and `model` from the earlier cells are replaced.
for dataset in odds_datasets.datasets_names[1:]:
    X, y = odds_datasets.load(dataset)
    print(f"Dataset: {dataset}")
    # Seeded so the leaf statistics computed in the following cells are reproducible.
    model = IForest(random_state=0).fit(X)
    break

In [None]:
# One row per tree, one column per sample: the leaf each sample ends up in.
leaves = np.array([fitted_tree.apply(X) for fitted_tree in model.detector_.estimators_])
print(leaves.shape)

# For every tree and every leaf of that tree: select the samples sharing that
# leaf, then add up, over all trees, how many distinct leaves those samples occupy.
common = [
    [
        np.array(
            [len(np.unique(per_tree_row)) for per_tree_row in leaves[:, tree_leaves == leaf_id]]
        ).sum()
        for leaf_id in np.unique(tree_leaves)
    ]
    for tree_leaves in leaves
]

In [None]:
import matplotlib.pyplot as plt
# One semi-transparent histogram per tree, overlaid on shared axes. The loop
# variable is deliberately not `c`, which earlier cells use for path lengths.
fig, ax = plt.subplots()
for tree_counts in common:
    ax.hist(tree_counts, alpha=0.5)
ax.set(
    title="Leaf co-occurrence counts (one histogram per tree)",
    xlabel="distinct leaves, summed over trees, reached by a leaf's samples",
    ylabel="number of leaves",
);

In [None]:
# Grand total over every tree and leaf. Summing the per-tree totals avoids the
# quadratic list concatenation of `sum(common, [])`.
sum(map(sum, common))