In [None]:
import math 
import time 
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt 
import seaborn as sns 

from sklearn.datasets import load_iris
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

**Isolation Forest** es un algoritmo no supervisado que detecta anomalías aislando observaciones en árboles binarios aleatorios. Las anomalías (puntos poco comunes) requieren menos divisiones para ser aisladas que los puntos normales. 

**Atributos del Algoritmo**: 
- `n_estimators`: número de árboles en el bosque
- `max_samples`: número o proporción de muestras para construir cada árbol
- `contamination`: proporción esperada de anomalías en el dataset
- `random_state`: semilla para reproducibilidad 
- `forest`: lista de árboles de aislamiento entrenados 

In [None]:
gamma = 0.5772156649  

def _c(n):
  "Constante de normalización para el conjunto de tamaño n"
  if n > 2:
    H = math.log(n-1) + gamma
    return 2 * H - 2 * (n - 1) / n 
  elif n == 2:
    return 1 
  else: 
    return 0 

class iTreeNode:
  """Node of an isolation tree.

  An external (leaf) node only carries `size`; an internal node carries the
  split rule (`split_attr`, `split_value`) and its two children, and leaves
  `size` as None.
  """

  def __init__(self, size=None, split_attr=None, split_value=None, left=None, right=None):
    # Children and split rule: populated on internal nodes only.
    self.left, self.right = left, right
    self.split_attr, self.split_value = split_attr, split_value
    # Number of samples that reached this node: populated on external nodes only.
    self.size = size

def build_iTree(X, height=0, max_height=None):
  """Recursively build an isolation tree from the rows of X.

  Args:
    X: 2-D NumPy array (rows = samples, columns = attributes) with the data
      that reached this node.
    height: depth of the current node. Defaults to 0.
    max_height: depth limit. If None, ceil(log2(len(X))) is used.

  Returns:
    iTreeNode: an external node (only `size` set) when the node holds at most
    one sample, the depth limit is reached, or no attribute can separate the
    samples; otherwise an internal node with a random split and two subtrees.

  Note:
    Random draws come from the global `np.random` state, so seed it before
    calling if reproducibility is needed.
  """
  n = len(X)

  # Checked before the log2 below: with n == 0 it would yield -inf and
  # int(-inf) raises OverflowError.
  if n <= 1:
    return iTreeNode(size=n)

  if max_height is None:
    max_height = int(np.ceil(np.log2(n)))

  if height >= max_height:
    return iTreeNode(size=n)

  # Only attributes that still vary inside this node can separate its samples.
  # Drawing a constant attribute would produce an empty side and end the
  # branch early even though another attribute could keep isolating points.
  col_min = X.min(axis=0)
  col_max = X.max(axis=0)
  candidates = np.flatnonzero(col_max > col_min)
  if candidates.size == 0:
    # Every remaining sample is identical: nothing left to split.
    return iTreeNode(size=n)

  # When every attribute varies, candidates == [0, ..., m-1] and this draw is
  # the same one the plain randint(0, m) selection would have made.
  q = int(candidates[np.random.randint(0, candidates.size)])
  p = np.random.uniform(col_min[q], col_max[q])

  left_mask = X[:, q] <= p
  X_left = X[left_mask]
  X_right = X[~left_mask]

  # Defensive: floating-point rounding in the uniform draw could still put
  # every sample on one side.
  if len(X_left) == 0 or len(X_right) == 0:
    return iTreeNode(size=n)

  # Left subtree is built first so the order of random draws is stable.
  left_node = build_iTree(X_left, height + 1, max_height)
  right_node = build_iTree(X_right, height + 1, max_height)

  return iTreeNode(split_attr=q, split_value=p, left=left_node, right=right_node)

def path_length(x, tree, current_length=0):
  """Return the adjusted path length of instance x inside an isolation tree.

  Descends from `tree` to an external node, counting one edge per split
  traversed, and adds `_c(size)` for the external node that is reached.

  Args:
    x: indexable instance; `x[split_attr]` is compared with each split value.
    tree: node to start from (normally the root).
    current_length: edges already traversed before `tree`. Defaults to 0.
  """
  node, edges = tree, current_length
  # Internal nodes are the ones whose `size` is None.
  while node.size is None:
    goes_left = x[node.split_attr] <= node.split_value
    node = node.left if goes_left else node.right
    edges += 1
  return edges + _c(node.size)

class Zero_IsolationForest:
  """From-scratch Isolation Forest for anomaly detection.

  Args:
    n_estimators: number of isolation trees to build.
    max_samples: rows used per tree; an integer is an absolute count, any
      other number is a proportion of the training set.
    contamination: expected proportion of anomalies; fixes the score
      threshold learned during `fit`.
    random_state: seed passed to `np.random.seed` at the start of `fit`.

  Attributes:
    forest: list of fitted isolation trees.
    threshold_: anomaly-score cut-off learned from the training data
      (None until `fit` has run).
  """

  def __init__(self, n_estimators=100, max_samples=256, contamination=0.1, random_state=None):
    self.n_estimators = n_estimators
    self.max_samples = max_samples
    self.contamination = contamination
    self.random_state = random_state
    self.forest = []
    self._subsample_size = None
    self.threshold_ = None

  def _resolve_subsample_size(self, n):
    """Translate `max_samples` into a per-tree row count for n training rows."""
    # NumPy integers are not instances of `int`; without this check they
    # would be mistaken for a proportion.
    if isinstance(self.max_samples, (int, np.integer)):
      requested = int(self.max_samples)
    else:
      requested = int(self.max_samples * n)
    # Sampling is without replacement, so a tree can never use more than n rows.
    size = min(requested, n)
    if size < 2:
      # With fewer than two rows every tree is a single leaf and the score
      # normalizer is 0, which would turn every score into NaN.
      raise ValueError(
        f"Each isolation tree needs at least 2 samples; got {size} "
        f"(n={n}, max_samples={self.max_samples!r})"
      )
    return size

  def fit(self, X):
    """Build the forest from X and learn the anomaly threshold.

    Args:
      X: array-like of shape (n_samples, n_features).

    Returns:
      self

    Raises:
      ValueError: if fewer than 2 rows would be available per tree.
    """
    X = np.asarray(X)  # positional row indexing below requires an ndarray
    n = len(X)
    self._subsample_size = self._resolve_subsample_size(n)

    # Tree construction draws from the global NumPy RNG, so the seed is set
    # globally as well.
    np.random.seed(self.random_state)

    self.forest = []
    for _ in range(self.n_estimators):
      # Subsample without replacement
      idx = np.random.choice(n, self._subsample_size, replace=False)
      self.forest.append(build_iTree(X[idx]))

    # The cut-off is a property of the training distribution: learn it once
    # here so `predict` gives consistent answers on any batch, including a
    # single sample.
    self.threshold_ = np.percentile(self.anomaly_score(X), 100 * (1 - self.contamination))
    return self

  def anomaly_score(self, X):
    """Return the anomaly score of every row of X (higher = more anomalous).

    Raises:
      RuntimeError: if the forest holds no trees.
    """
    if not self.forest:
      raise RuntimeError("The forest is empty: call fit() with n_estimators >= 1 before scoring")
    X = np.asarray(X)
    normalizer = _c(self._subsample_size)  # same for every row, computed once
    scores = []
    for x in X:
      avg_path = np.mean([path_length(x, tree) for tree in self.forest])
      # Anomaly score: 2 ** (-E[h(x)] / c(subsample size))
      scores.append(2 ** (-avg_path / normalizer))
    return np.array(scores)

  def predict(self, X):
    """Label every row of X: -1 for anomaly, 1 for normal.

    Rows whose score reaches the threshold learned in `fit` are anomalies.

    Raises:
      RuntimeError: if the model has not been fitted.
    """
    if self.threshold_ is None:
      raise RuntimeError("This Zero_IsolationForest is not fitted yet: call fit() before predict()")
    scores = self.anomaly_score(X)
    return np.where(scores >= self.threshold_, -1, 1)  # -1: anomaly, 1: normal

In [None]:
# Load Iris and standardize the features with StandardScaler.
iris = load_iris()
X, y = iris.data, iris.target
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Ground truth for the evaluation: class 2 is treated as the anomaly
# (1 = anomaly, 0 = normal).
y_binary = np.equal(y, 2).astype(int)

In [None]:
# Hyperparameters shared by both models so the comparison is like-for-like.
params = dict(n_estimators=100, contamination=0.33, random_state=42)

In [None]:
# Reference model: scikit-learn's IsolationForest.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# intervals; time.time() is wall-clock time and can jump or be too coarse for
# a sub-second fit.
start = time.perf_counter()
sk_model = IsolationForest(
  n_estimators=params['n_estimators'],
  contamination=params['contamination'],
  random_state=params['random_state']
)
sk_model.fit(X_scaled)
sk_time = time.perf_counter() - start

sk_predictions = sk_model.predict(X_scaled)
# Per scikit-learn's documentation, decision_function is lower for more
# abnormal samples (negative values are outliers).
sk_scores = sk_model.decision_function(X_scaled)

print(f"Tiempo de Entrenamiento: {sk_time:.4f}s")
print(f"Puntuaciones: [{sk_scores.min():.3f}, {sk_scores.max():.3f}]")
print(f"Media: {sk_scores.mean():.3f}, Desv: {sk_scores.std():.3f}")

In [None]:
# From-scratch model, built with the same hyperparameters.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# intervals; time.time() is wall-clock time and can jump or be too coarse.
start = time.perf_counter()
zero_model = Zero_IsolationForest(
  n_estimators=params['n_estimators'],
  contamination=params['contamination'],
  random_state=params['random_state']
)
zero_model.fit(X_scaled)
zero_time = time.perf_counter() - start

zero_predictions = zero_model.predict(X_scaled)
# Scores from anomaly_score are higher for more anomalous samples.
zero_scores = zero_model.anomaly_score(X_scaled)

print(f"Tiempo de Entrenamiento: {zero_time:.4f}s")
print(f"Puntuaciones: [{zero_scores.min():.3f}, {zero_scores.max():.3f}]")
print(f"Media: {zero_scores.mean():.3f}, Desv: {zero_scores.std():.3f}")

In [None]:
# Map the {-1, 1} predictions to binary labels (1 = anomaly) so they can be
# compared with y_binary.
sk_labels = np.equal(sk_predictions, -1).astype(int)
zero_labels = np.equal(zero_predictions, -1).astype(int)

# Accuracy: fraction of samples whose label matches the ground truth.
sk_accuracy = (sk_labels == y_binary).mean()
zero_accuracy = (zero_labels == y_binary).mean()

print(f"Accuracy (Scikit-Learn Model):    {sk_accuracy:.3f}")
print(f"Accuracy (Zero Mode):             {zero_accuracy:.3f}")

# Matrices de confusión
def print_confusion_matrix(y_true, y_pred, label):
  """Print a 2x2 confusion matrix for binary anomaly labels.

  Args:
    y_true: NumPy array of ground-truth labels (1 = anomaly, 0 = normal).
    y_pred: NumPy array of predicted labels, same encoding.
    label: name of the model, shown in the header line.
  """
  real_anomaly, real_normal = (y_true == 1), (y_true == 0)
  pred_anomaly, pred_normal = (y_pred == 1), (y_pred == 0)

  # Count each cell of the matrix from the element-wise masks.
  tp = np.sum(real_anomaly & pred_anomaly)
  tn = np.sum(real_normal & pred_normal)
  fp = np.sum(real_normal & pred_anomaly)
  fn = np.sum(real_anomaly & pred_normal)

  print(f"Matriz de Confusión ({label})")
  print(f"                  Predicción")
  print(f"               Normal  Anomalía")
  print(f"Real Normal     {tn:3d}      {fp:3d}")
  print(f"Real Anomalía   {fn:3d}      {tp:3d}")

# Same ground truth for both models, one matrix per model.
print_confusion_matrix(y_true=y_binary, y_pred=sk_labels, label="sklearn")
print_confusion_matrix(y_true=y_binary, y_pred=zero_labels, label="zero")

In [None]:
# Training-time ratio and accuracy gap between the two implementations.
print(f"Ratio tiempo (propia/sklearn): {zero_time/sk_time:.1f}x")
print(f"Diferencia en exactitud: {abs(sk_accuracy - zero_accuracy):.3f}")

# Correlation between the two score vectors. The scales point in opposite
# directions (zero_scores grows with abnormality, sklearn's decision_function
# shrinks), so zero_scores is negated before comparing.
correlation = np.corrcoef(sk_scores, np.negative(zero_scores))[0][1]
print(f"Correlación entre puntuaciones: {correlation:.3f}")

# Number of samples on which the two models disagree.
diff_predictions = np.not_equal(sk_labels, zero_labels).sum()
print(f"Instancias con predicciones diferentes: {diff_predictions}/{len(X)}")