<a href="https://colab.research.google.com/github/carlosramos1/practicas-machine-learning/blob/main/Arbol_de_decision_CART_(implementacion).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Librerias para la implementación
import numpy as np
import pandas as pd
from scipy import stats

# Arbol de Regresión CART

## Implementación

Código de implementación del Árbol de Desición para problemas de Regresión (tipo CART)

In [None]:
class ArbolRegresion:

  def __init__(self, t_min = 5):
    self.t_min = t_min

  def entrenar(self, X, y):
    self.raiz = Nodo(X, y, self.t_min)

  def predecir(self, X_new):
    return self.raiz.predecir(X_new)

  def imprimirNodos(self):
    self.raiz.imprimir()

In [None]:
class Nodo:

  def __init__(self, X, y, t_min):
    self.X = X          # data x (predictores)
    self.y = y          # data y (respuestas)
    self.j = -1         # indice de la variable separadora
    self.s = -1         # indice del valor punto de corte
    self.n_izq = None
    self.n_der = None
    self.es_hoja = False
    self.t_min = t_min  # Cant. mínima de observaciones del nodo

    self.buscar_mejor_separacion() # encuentra un j y s.

    if(self.j > -1 and self.s > -1):
      # Creamos los nodos izq y der en base al j y s encontrado
      lado_izq = self.X[:,self.j] <= self.X[self.s][self.j]
      lado_der = self.X[:,self.j] >  self.X[self.s][self.j]
      self.n_izq = Nodo(self.X[lado_izq], self.y[lado_izq], self.t_min)
      self.n_der = Nodo(self.X[lado_der], self.y[lado_der], self.t_min)
    else:
      # No se pudó encontrar un división
      self.es_hoja = True


  def buscar_mejor_separacion(self):
    '''
    Busca un 'j' y un 's' tal que minimicen el RSS (Suma de errores al cuadrado)

      argmin [ sum (y_i - ŷ_Rizq)^2 + sum (y_i - ŷ_Rder)^2 ]
       {j,s}
    '''
    # Inicialmente asignamos un valor grande
    rss_min  = self.calcular_rss(self.y) * 100
    num_vars = self.X.shape[1]
    num_obs  = self.X.shape[0]

    for j in range(num_vars):   # variable_separadora X_j
      for s in range(num_obs):  # punto_de_corte      X_j[s]

        # Posible partición
        lado_izq = self.X[:,j] <= self.X[s][j]
        lado_der = self.X[:,j] >  self.X[s][j]

        # Verificar que las particiones tenga almenos el tamaño mínimo
        if lado_izq.sum() < self.t_min or lado_der.sum() < self.t_min:
          continue

        # Calculamos suma de errores al cuadrado (RSS)
        rss_izq = self.calcular_rss(self.y[lado_izq])
        rss_der = self.calcular_rss(self.y[lado_der])
        rss = rss_izq + rss_der

        # Elegimos un j y s tal que minimicen RSS
        if (rss < rss_min):
          rss_min = rss
          self.j = j
          self.s = s

  def calcular_rss(self, y):
    return sum((y - np.mean(y))**2)

  def predecir(self, X_new):
    return np.array([self.predecir_obs(xi) for xi in X_new])

  def predecir_obs(self, xi):

    if self.es_hoja:
      return np.mean(self.y)

    if xi[self.j] <= self.X[self.s][self.j]:
        return self.n_izq.predecir_obs(xi)
    else:
        return self.n_der.predecir_obs(xi)


  def imprimir(self):
    if self.es_hoja:
      print(self.X.shape)
    else:
      self.n_izq.imprimir()
      self.n_der.imprimir()


Puntos importantes de la implementación
- El punto de corte escogido es un elemento de las observaciones. En https://cienciadedatos.net/documentos/py07_arboles_decision_python menciona que se debería tomar el punto medio entre dos observaciones como punto de corte.
- En cada recursión estoy haciendo una copia de las observaciones (nodo izq y nodo der). Quiza seria más optimo mantener una sola matriz para las observaciones y solo pasar por referencia los datos a los nodos izq y der (o pasar un array de indices), manteniendo la matriz de obs como un atributo estático.
- El crecimiento del arbol está definido por el tamaño mínimo que debe tener un nodo hoja.
- La elección de la variable separadora es tomado en cuenta según el orden en el que se encuentra, es decir, se toma en cuenta la primera variable, luego la segunda, asi sucesivamente. Esto no garantiza que se tenga la mejor separación, es posible tomar aleatoriamente el orden de las variables.

## Ejemplo

A continuación resolveremos nuevamente el problema de estimar el volumen de un árbol.

In [None]:
url = 'https://raw.githubusercontent.com/carlosramos1/datosML/refs/heads/main/trees.csv'
trees = pd.read_csv(url, index_col=0)
trees.head(10)

Unnamed: 0,C,A,V
1,8.3,70,10.3
2,8.6,65,10.3
3,8.8,63,10.2
4,10.5,72,16.4
5,10.7,81,18.8
6,10.8,83,
7,11.0,66,15.6
8,11.0,75,18.2
9,11.1,80,22.6
10,11.2,75,19.9


In [None]:
# Definamos el conjunto de entrenamiento...
X = trees[trees.V.notna()].loc[:,['C','A']]
y = trees[trees.V.notna()].loc[:,['V']]
X.head(8)

Unnamed: 0,C,A
1,8.3,70
2,8.6,65
3,8.8,63
4,10.5,72
5,10.7,81
7,11.0,66
8,11.0,75
9,11.1,80


In [None]:
# Entrenar
my_reg_tree = ArbolRegresion()
my_reg_tree.entrenar(X.values, y.values)

In [None]:
# Predicción (Datos del árbol 6)
my_reg_tree.predecir([[10.8,83]]).round(2)

array([14.26])

Al parecer hicimos un buen trabajo. Sin embargo una forma de validar esto es imaginar que todos los datos de entrenamiento tienen volúmenes desconocidos. Luego, podemos estimar volúmenes y calcular una métrica de error.

In [None]:
# Calcular estimaciones para todos los árboles
my_trees_new = pd.DataFrame(pd.concat([X,y],axis=1))
my_trees_new["V_hat"] = my_reg_tree.predecir(X.values).round(2)
my_trees_new.head(10)

Unnamed: 0,C,A,V,V_hat
1,8.3,70,10.3,14.26
2,8.6,65,10.3,14.26
3,8.8,63,10.2,14.26
4,10.5,72,16.4,14.26
5,10.7,81,18.8,14.26
7,11.0,66,15.6,14.26
8,11.0,75,18.2,14.26
9,11.1,80,22.6,21.36
10,11.2,75,19.9,21.36
11,11.3,79,24.2,21.36


In [None]:
# Calculemos errores al cuadrado
my_trees_new["e2"] = (my_trees_new.V - my_trees_new.V_hat)**2
my_trees_new.head(3)

Unnamed: 0,C,A,V,V_hat,e2
1,8.3,70,10.3,14.26,15.6816
2,8.6,65,10.3,14.26,15.6816
3,8.8,63,10.2,14.26,16.4836


In [None]:
# Obetengamos el error cuadrático medio
my_trees_new.e2.sum()/my_trees_new.shape[0]

np.float64(34.484959999999994)

# Árbol de Clasificación - CART

## Implementación

Código de implementación del Árbol de Desición para problemas de Clasificación

In [None]:
class ArbolClasificacion:

  def __init__(self, t_min = 5, criterio = 'gini'):
    self.t_min = t_min
    self.criterio = criterio

  def entrenar(self, X, y):
    self.raiz = Nodo(X, y, self.t_min, self.criterio)

  def predecir(self, X_new):
    return self.raiz.predecir(X_new)

  def imprimirNodos(self):
    self.raiz.imprimir()

In [None]:
class Nodo:

  def __init__(self, X, y, t_min, criterio):
    self.X = X          # data x (predictores)
    self.y = y         # data y (respuestas)
    self.j = -1         # indice de la 'variable separadora'
    self.s = -1         # indice del valor 'punto de corte'
    self.nodo_izq = None  # Sub-region izq
    self.nodo_der = None  # Sub-region der
    self.es_hoja = False
    self.t_min = t_min        # Cant. mínima de obs. en un nodo terminal
    self.criterio = criterio  # Método de medida de impureza

    # encontrar los mejores valores para j y s
    self.buscar_mejor_separacion()

    if(self.j > -1 and self.s > -1):
      # Creamos los nodos izq y der en base al j y s encontrado
      lado_izq = self.X[:,self.j] <= self.X[self.s][self.j]
      lado_der = self.X[:,self.j] >  self.X[self.s][self.j]
      self.nodo_izq = Nodo(self.X[lado_izq], self.y[lado_izq], self.t_min, self.criterio)
      self.nodo_der = Nodo(self.X[lado_der], self.y[lado_der], self.t_min, self.criterio)
    else:
      # No se pudó encontrar un división
      self.es_hoja = True


  def buscar_mejor_separacion(self):
    '''
    Busca un 'j' y un 's' tal que minimicen la medida de impureza

      argmin [impureza_izq (j,s) + impureza_der (j,s) ]
       {j,s}
    '''
    # Inicialmente asignamos un valor grande
    impureza_min = 10**10
    num_vars = self.X.shape[1]
    num_obs  = self.X.shape[0]

    for j in range(num_vars):   # variable_separadora X_j
      for s in range(num_obs):  # punto_de_corte      X_j[s]

        # Posible partición
        lado_izq = self.X[:,j] <= self.X[s][j]
        lado_der = self.X[:,j] >  self.X[s][j]

        # Verificar que las particiones tenga almenos el tamaño mínimo
        if lado_izq.sum() < self.t_min or lado_der.sum() < self.t_min:
          continue

        # Calcular la medida de impureza
        impureza_izq = self.calcular_impureza(self.y[lado_izq], self.criterio)
        impureza_der = self.calcular_impureza(self.y[lado_der], self.criterio)
        impureza = impureza_izq + impureza_der

        # Elegimos un j y s tal que minimice la impureza
        if (impureza < impureza_min):
          impureza_min = impureza
          self.j = j
          self.s = s

  def calcular_impureza(self, y, criterio):

    assert criterio == 'gini' or criterio == 'entropia'

    if(criterio == 'gini'):
      return self.calcular_indice_gini(y)

    if(criterio == 'entropia'):
      pass    # pendiente implementacion


  def calcular_indice_gini(self, y):
    '''
        Sum   P_mk * (1-P_mk)
    {k1,..,kn}

    P_mk = 1/N Sum I(y_i == k)
      - N: num. obs
    '''
    clases, cantidad = np.unique(y, return_counts=True)
    i_gini = 0
    n_obs = y.size

    for k in clases:
      p_k = cantidad[clases == k] / n_obs
      i_gini  += (p_k * (1 - p_k))

    return i_gini

  def predecir(self, X_new):
    return np.array([self._predecir_obs(xi) for xi in X_new])

  def _predecir_obs(self, xi):

    if self.es_hoja:
      return int(stats.mode(self.y).mode)

    if xi[self.j] <= self.X[self.s][self.j]:
        return self.nodo_izq._predecir_obs(xi)
    else:
        return self.nodo_der._predecir_obs(xi)


  def imprimir(self):
    if self.es_hoja:
      print(f'{self.X.shape}, clases {np.unique(self.y)}, represen. {int(stats.mode(self.y).mode)}')
    else:
      self.nodo_izq.imprimir()
      self.nodo_der.imprimir()


#### Ejemplo numérico

Ahora consideremos el conjunto de datos sobre tiroides.

In [None]:
url = 'https://raw.githubusercontent.com/carlosramos1/datosML/refs/heads/main/newthyroid.csv'
thyroids = pd.read_csv(url)
thyroids.head()

Unnamed: 0,T3resin,Thyroxin,Triiodothyronine,Thyroidstimulating,TSH_value,Class
0,107,10.1,2.2,0.9,2.7,1
1,113,9.9,3.1,2.0,5.9,1
2,127,12.9,2.4,1.4,0.6,1
3,109,5.3,1.6,1.4,1.5,1
4,105,7.3,1.5,1.5,-0.1,1


In [None]:
# Renombrando las columnas y solo usaremos 3 columnas
thyroids.columns = ['T3','TRX','TRI','TRS','TSH','Tipo']
thyroids_selected = thyroids.loc[:,['T3','TRX','Tipo']]
thyroids_selected.head()

Unnamed: 0,T3,TRX,Tipo
0,107,10.1,1
1,113,9.9,1
2,127,12.9,1
3,109,5.3,1
4,105,7.3,1


In [None]:
# Nuevo sujeto
new_subject = [136,12,np.nan]
new_subject

[136, 12, nan]

In [None]:
# Primero debemos definir el conjunto de entrenamiento...
X = thyroids_selected.loc[:,['T3','TRX']]
y = thyroids_selected.loc[:,['Tipo']]
X.head(2), y.head(2)

(    T3   TRX
 0  107  10.1
 1  113   9.9,
    Tipo
 0     1
 1     1)

In [None]:
# Entrenar
my_clas_tree = ArbolClasificacion()
my_clas_tree.entrenar(X.values, y.values.flatten())

In [None]:
# Predicción (Datos del nuevo sujeto)
my_clas_tree.predecir([[new_subject[0], new_subject[1]]])

array([1])

In [None]:
# Predicciones para todos los sujetos
my_clas_tree.predecir(X.values)

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 1, 3, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
       3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 1])

In [None]:
# Matriz de confusión
my_mc = pd.crosstab(my_clas_tree.predecir(X.values),
                    np.ravel(y))
my_mc

col_0,1,2,3
row_0,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,147,2,2
2,1,33,0
3,2,0,28


In [None]:
# Tasa de clasificación erronea
(1-np.diag(my_mc).sum()/len(y))*100

np.float64(3.2558139534883734)

### Bibliografía
- The elements of statistical machine learning - Hastle, Tibshirani, Friedman
- Árboles de decisión con Python: regresión y clasificación (https://cienciadedatos.net/documentos/py07_arboles_decision_python)