In [1]:
# -*- coding: utf-8 -*-
"""Isolation-based anomaly detection using nearest-neighbor ensembles.
Parts of the code are adapted from https://github.com/xhan97/inne
"""
# Author: Xin Han <xinhan197@gmail.com>
# License: BSD 2 clause

import numbers
from warnings import warn

import numpy as np
from sklearn.metrics import euclidean_distances
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted, check_random_state

from pyod.models.base import BaseDetector
from pyod.utils.utility import MAX_INT, invert_order

MIN_FLOAT = np.finfo(float).eps  # Machine epsilon for float (not the smallest float); added to both terms of the radius ratio to avoid division by zero

class INNE(BaseDetector):
    """Isolation-based anomaly detection using nearest-neighbor ensembles.

    The INNE algorithm uses the nearest neighbour ensemble to isolate anomalies.
    It partitions the data space into regions using a subsample and determines
    an isolation score for each region, allowing it to detect both global and
    local anomalies.

    See :cite:`bandaragoda2018isolation` for details.

    Parameters
    ----------
    n_estimators : int, default=200
        The number of base estimators in the ensemble.
    max_samples : "auto", int or float, optional (default="auto")
        Number of samples (hypersphere centroids) drawn for each base estimator.

        - "auto": ``min(8, n_samples)``.
        - int: that many samples, capped at ``n_samples`` (with a warning).
        - float in (0, 1]: ``int(max_samples * n_samples)``.

        The resolved value must be at least 2, because each centroid needs
        another centroid to define its nearest-neighbour radius.
    contamination : float in (0., 0.5), optional (default=0.1)
        Proportion of outliers in the dataset, used for setting the anomaly threshold.
    random_state : int, RandomState instance or None, optional (default=None)
        Seed for random number generation.

    Attributes
    ----------
    max_samples_ : int
        The resolved number of samples per base estimator, set by ``fit``.
    decision_scores_ : numpy array of shape (n_samples,)
        Outlier scores of the training data, set by ``fit``; larger values
        indicate more anomalous samples.
    threshold_, labels_
        Expected to be populated by the inherited ``_process_decision_scores``
        call at the end of ``fit`` (``decision_function`` requires them).
    """

    def __init__(self, n_estimators=200, max_samples="auto", contamination=0.1, random_state=None):
        # Route contamination through the base-class initialiser (the PyOD
        # detector convention) rather than assigning it directly, so any checks
        # the base class applies to it are not bypassed.
        super().__init__(contamination=contamination)
        self.n_estimators = n_estimators
        self.max_samples = max_samples
        self.random_state = random_state

    def fit(self, X, y=None):
        """Fit the anomaly detector.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.
        y : Ignored
            Not used in unsupervised methods.

        Returns
        -------
        self : object
            The fitted estimator.

        Raises
        ------
        ValueError
            If ``max_samples`` is an unsupported string, a float outside
            (0, 1], or resolves to fewer than 2 samples per estimator.
        """
        # Validate input X for consistency
        X = check_array(X, accept_sparse=False)
        self._set_n_classes(y)  # Needed for compatibility with PyOD's BaseDetector

        # Store the actual number of samples used by every base estimator
        self.max_samples_ = self._resolve_max_samples(X.shape[0])

        # Build the ensemble, then score the training data itself
        self._fit(X)
        self.decision_scores_ = invert_order(self._score_samples(X))  # Compute decision scores
        self._process_decision_scores()  # Calculate threshold and labels
        return self

    def _resolve_max_samples(self, n_samples):
        """Translate the ``max_samples`` parameter into a concrete sample count.

        Parameters
        ----------
        n_samples : int
            Number of rows in the training data.

        Returns
        -------
        max_samples : int
            Number of centroids to draw per base estimator (always >= 2).

        Raises
        ------
        ValueError
            If ``max_samples`` is an unsupported string, a float outside
            (0, 1], or resolves to fewer than 2 samples.
        """
        if isinstance(self.max_samples, str):
            if self.max_samples != "auto":
                raise ValueError(f"max_samples ({self.max_samples}) is not supported.")
            # "auto": use 8 or n_samples, whichever is smaller
            max_samples = min(8, n_samples)
        elif isinstance(self.max_samples, numbers.Integral):
            if self.max_samples > n_samples:
                # Cannot draw more centroids than there are samples (sampling is without replacement)
                warn(f"max_samples ({self.max_samples}) is greater than n_samples ({n_samples}). Setting max_samples to n_samples.")
                max_samples = n_samples
            else:
                max_samples = self.max_samples
        else:
            # Anything else is treated as a fraction of the training set
            if not 0.0 < self.max_samples <= 1.0:
                raise ValueError(f"max_samples must be in (0, 1], got {self.max_samples}.")
            max_samples = int(self.max_samples * n_samples)

        # With a single centroid the only radius is inf and the ratio becomes
        # inf / inf = NaN for every sample; with zero centroids the fit fails
        # on an empty array. Reject both up front with a clear message.
        if max_samples < 2:
            raise ValueError(
                f"max_samples resolves to {max_samples} sample(s) per estimator "
                f"(max_samples={self.max_samples!r}, n_samples={n_samples}); at least 2 are "
                "required to define a nearest-neighbour radius."
            )
        return max_samples

    def _fit(self, X):
        """Build nearest-neighbor ensembles based on the given data.

        For every base estimator, ``max_samples_`` rows of ``X`` are drawn
        without replacement as hypersphere centroids. Each centroid's radius is
        the (squared) distance to its nearest other centroid, and its isolation
        ratio is ``1 - radius(nearest centroid) / radius(centroid)``.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The training input samples.

        Returns
        -------
        self : object
        """
        n_samples, n_features = X.shape
        ensemble_shape = [self.n_estimators, self.max_samples_]
        self._centroids = np.empty(ensemble_shape + [n_features])  # Stores centroids
        self._ratio = np.empty(ensemble_shape)  # Stores ratio of distances
        self._centroids_radius = np.empty(ensemble_shape)  # Stores radius of each hypersphere

        # One derived seed per estimator, so a fixed random_state reproduces the ensemble
        random_state = check_random_state(self.random_state)
        self._seeds = random_state.randint(MAX_INT, size=self.n_estimators)

        for i in range(self.n_estimators):
            rnd = check_random_state(self._seeds[i])
            # Randomly select subsamples as centroids
            center_index = rnd.choice(n_samples, self.max_samples_, replace=False)
            self._centroids[i] = X[center_index]

            # Distances are kept squared here and in _score_samples, so the
            # stored radii and the ratio below are in squared units as well.
            center_dist = euclidean_distances(self._centroids[i], self._centroids[i], squared=True)
            np.fill_diagonal(center_dist, np.inf)  # Ignore self-distances (diagonal is set to infinity)

            # Radius of each hypersphere = distance to the nearest other centroid
            self._centroids_radius[i] = np.amin(center_dist, axis=1)

            # Radius of the hypersphere centred on each centroid's nearest neighbour
            cnn_index = np.argmin(center_dist, axis=1)
            cnn_radius = self._centroids_radius[i][cnn_index]

            # MIN_FLOAT on both terms keeps the division defined when a radius
            # is 0 (duplicate centroids).
            self._ratio[i] = 1 - (cnn_radius + MIN_FLOAT) / (self._centroids_radius[i] + MIN_FLOAT)

        return self

    def decision_function(self, X):
        """Predict anomaly scores for the input data.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            Outlier scores; larger values indicate anomalies.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])  # Ensure the model is fitted
        # Return inverted outlier scores (larger values indicate anomalies)
        return invert_order(self._score_samples(X))

    def _score_samples(self, X):
        """Compute the anomaly score for each sample.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        scores : numpy array of shape (n_samples,)
            Negated mean isolation score over all estimators (lower is more
            abnormal).
        """
        X = check_array(X, accept_sparse=False)  # Validate input array
        # Samples that no hypersphere covers keep the maximum isolation score of 1
        isolation_scores = np.ones([self.n_estimators, X.shape[0]])

        # Loop over each base estimator (ensemble member)
        for i in range(self.n_estimators):
            radius = self._centroids_radius[i]
            # Squared distance between test points and centroids (radii are squared too)
            x_dists = euclidean_distances(X, self._centroids[i], squared=True)
            # Keep the radius where the hypersphere covers the sample, NaN elsewhere
            cover_radius = np.where(x_dists <= radius, radius, np.nan)
            # Samples covered by at least one hypersphere
            covered = ~np.isnan(cover_radius).all(axis=1)
            # Among covering hyperspheres, pick the one with the smallest radius
            cnn_x = np.nanargmin(cover_radius[covered], axis=1)
            isolation_scores[i, covered] = self._ratio[i][cnn_x]

        # Average the isolation scores across all estimators
        scores = np.mean(isolation_scores, axis=0)
        return -scores  # Return negative scores (lower is more abnormal)


In [2]:
# Toy 1-D training set: three nearby values and one far-away point (100).
X = [
    [-1.1],
    [0.3],
    [0.5],
    [100],
]

# Fit the detector defined above with its default parameters.
clf = INNE()
clf.fit(X)

# Print the predicted labels for four query points.
print(
    clf.predict(
        [[0.1], [0.5], [90], [-1.5]]
    )
)


[0 0 1 0]


In [10]:
from pyod.models.lof import LOF

# Toy training set with 5 features per row; the second and third rows are identical.
X = [
    [-1.1, 1, 5, 33, 4],
    [-1.5, 2, 4, 8, 3],
    [-1.5, 2, 4, 8, 3],
    [-28, 2, 49, 7, 23],
    [18, 2, 4, 1, 13],
    [0.3, 111, 89, 46, 23],
    [0.5, 15, 11, 2, -3],
]

# Fit PyOD's LOF detector with its default parameters.
clf = LOF()
clf.fit(X)

# Print the predicted labels for three query points.
print(
    clf.predict(
        [
            [0.1, 1, 5, 2, 1.5],
            [0.5, 2, 7, 1, 0.8],
            [90, 100, 45, 7, 31],
        ]
    )
)


[0 0 1]


