In [None]:
!apt-get install openjdk-11-jdk -y

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-11-jdk is already the newest version (11.0.28+6-1ubuntu1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 41 not upgraded.


In [None]:
from pyspark.sql import SparkSession

#Crear una sesión de Spark
spark = (SparkSession.builder
    .appName('Arbol Decision Escalable')
    .master("local[*]")
    .getOrCreate())

spark.sparkContext.setLogLevel('WARN')

print("Spark: "+spark.version)

Spark: 3.5.1


Dataset

In [None]:
data = [
    (1, 0, 2, 'rayado', 'normal'),
    (1, 0, 1, 'blanco', 'cancerigena'),
    (1, 2, 0, 'rayado', 'normal'),
    (0, 2, 1, 'rayado', 'normal'),
    (1, 1, 1, 'rayado', 'cancerigena'),
    (2, 2, 1, 'rayado', 'cancerigena')
]

columns = ['antenas','colas','nucleos','cuerpo','clase']

df = spark.createDataFrame(data, columns)
df.show()

+-------+-----+-------+------+-----------+
|antenas|colas|nucleos|cuerpo|      clase|
+-------+-----+-------+------+-----------+
|      1|    0|      2|rayado|     normal|
|      1|    0|      1|blanco|cancerigena|
|      1|    2|      0|rayado|     normal|
|      0|    2|      1|rayado|     normal|
|      1|    1|      1|rayado|cancerigena|
|      2|    2|      1|rayado|cancerigena|
+-------+-----+-------+------+-----------+



Calculo de Entropia MapReduce

In [None]:
import numpy as np
from pyspark.sql.functions import col

def entropia(df_columna):
    n_total = df_columna.count()
    if n_total == 0:
        return 0.0

    # Convertimos a RDD y contamos ocurrencias por valor
    rdd = df_columna.rdd.map(lambda row: (row[0], 1)).reduceByKey(lambda a, b: a + b)

    def calcular_entropia_particion(iterator):
        resultados = []
        for valor, count in iterator:
            p = count / n_total
            resultados.append(-(p * np.log2(p)))
        return resultados

    entropia_valores = rdd.mapPartitions(calcular_entropia_particion)

    # Reduce: sumar todas los valores
    I = entropia_valores.reduce(lambda a, b: a + b)
    return I

def entropia_atributo(df, atributo, clase_col):
    n_total = df.count()
    if n_total == 0:
        return 0.0

    # Obtener los valores únicos del atributo
    valores = [row[atributo] for row in df.select(atributo).distinct().collect()]

    I_Ai = 0.0
    for valor in valores:
        grupo = df.filter(col(atributo) == valor)
        n_ij = grupo.count()
        I_ij = entropia(grupo.select(clase_col))
        I_Ai += (n_ij / n_total) * I_ij
    return I_Ai

def entropia_ganancia(df, atributo, clase_col):
    return entropia(df.select(clase_col)) - entropia_atributo(df, atributo, clase_col)

# entropia_nucleos = entropia_atributo(df, 'cuerpo', 'clase')
# print(f"Entropía condicional del atributo 'cuerpo': {entropia_nucleos:.4f}")

ID3 Secuencial

In [None]:
from pyspark.sql.functions import col

def id3_secuencial(df, atributos, clase_col):
    arbol = {}
    pendientes = [(df, atributos, arbol, None)]

    while pendientes:
        df_sub, attrs, nodo_padre, clave_padre = pendientes.pop()

        # Entropía total en este nodo
        clases_unicas = [r[clase_col] for r in df_sub.select(clase_col).distinct().collect()]
        if len(clases_unicas) == 1:
            if clave_padre is None:
                return clases_unicas[0]
            else:
                nodo_padre[clave_padre] = clases_unicas[0]
            continue

        # Calcular ganancia para cada atributo
        ganancias = {a: entropia_ganancia(df_sub, a, clase_col) for a in attrs}
        atributo_ganador = max(ganancias, key=ganancias.get)

        # Crear nodo para este atributo en el padre
        nuevo_nodo = {}
        if clave_padre is None:
            nodo_padre.update({atributo_ganador: nuevo_nodo})
        else:
            nodo_padre[clave_padre] = {atributo_ganador: nuevo_nodo}

        # Para cada valor posible del atributo ganador, añadir nuevos nodos pendientes
        valores = [row[atributo_ganador] for row in df_sub.select(atributo_ganador).distinct().collect()]
        for val in valores:
            sub_df = df_sub.filter(col(atributo_ganador) == val)
            if sub_df.count() == 0:
                clase_mayoritaria = df_sub.groupBy(clase_col).count().orderBy(col('count').desc()).first()[0]
                nuevo_nodo[val] = clase_mayoritaria
            else:
                # Agregar nodo pendiente a la pila
                sub_attrs = [a for a in attrs if a != atributo_ganador]
                pendientes.append((sub_df, sub_attrs, nuevo_nodo, val))

    return arbol

Resultado

In [None]:
import json

arbol = id3_secuencial(df, df.columns[:-1], 'clase')
print(json.dumps(arbol, indent=4))

{
    "nucleos": {
        "2": "normal",
        "1": {
            "antenas": {
                "2": "cancerigena",
                "0": "normal",
                "1": "cancerigena"
            }
        },
        "0": "normal"
    }
}
