[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/eirasf/GCED-AA2/blob/main/lab11/lab11.ipynb)

# Práctica 11: Modelos gráficos probabilísticos
En esta práctica desarrollaremos código en Python para representar **Redes bayesianas**. Podremos almacenar modelos, aprenderlos por *MLE (maximum likelihood estimation)* y utilizarlos para hacer inferencias.

## Representación de las variables
Los modelos trabajarán exclusivamente con variables categóricas. Para representar dichas variables crearemos una clase que almacene el nombre de una variable y sus valores posibles.

Incluiremos también un método que nos permita codificar cada valor posible con un número entero.

In [None]:
class Variable:
  def __init__(self, nombre, valores):
    self.nombre = nombre
    self.valores = valores

  def codifica(self, valor):
    #TODO: Completa el método
    return ...

  def __str__(self):
    return f'Variable: {self.nombre} ({"|".join(self.valores)})'

# Comprobaciones
#TODO: Crea una variable estatura con los valores posibles: baja, media, alta
var_estatura = ...
assert(var_estatura.codifica('media')==1)

## El grafo
Las redes bayesianas con **grafos dirigidos acíclicos** cuyos nodos representan a las variables abarcadas por el modelo.

### Nodos
Representaremos estos nodos con la clase Nodo, que registrarán la variable que representan y tendrán dos lista, una de nodos hijos y otra de nodos padres.

Además los nodos tendrán un método para indicar si el nodo es raíz, es decir, no tiene padres.

In [None]:
class Nodo:
  def __init__(self, variable):
    self.variable = variable
    self.padres = []
    self.hijos = []

  def añade_padre(self, p):
    #TODO: Completa el método
    ...

  def añade_hijo(self, p):
    #TODO: Completa el método
    ...

  def es_raiz(self):
    #TODO: Completa el método
    ...

  def __str__(self):
    r = f'Nodo: {self.variable.nombre}'
    r += ' P:['+' & '.join(map(lambda x: x.variable.nombre,self.padres))+']'
    r += ' H:['+' & '.join(map(lambda x: x.variable.nombre,self.hijos))+']'
    return r

# Comprobaciones
nodo_estatura = Nodo(var_estatura)
print(nodo_estatura)
assert(nodo_estatura.es_raiz())

### Representando el grafo dirigido
Para representar el grafo tendremos la clase Grafo, que simplemente será una lista de nodos (debidamente enlazados entre sí). Un grafo se creará a partir de una lista de variables y una lista de aristas (tuplas (variable_origen, variable_destino)).

Completa el código para representar el siguiente grafo.

<img src="./img/grafo.png" alt="Grafo a representar" width="700"/>

In [None]:
class Grafo:
  def __init__(self, variables, aristas):
    self.nodos = [Nodo(var) for var in variables]

    # Diccionario auxiliar para poder referirnos a los nodos solo con el nombre
    # de la variable (sin tener que buscarlos en la lista)
    dic_nodos = {nodo.variable.nombre:nodo for nodo in self.nodos}
    for inicio,fin in aristas:
      #TODO: Representa la arista añadiendo padres/hijos a los nodos
      ...

  def imprime(self):
    for nodo in self.nodos:
      print(nodo)


var_alimentacion = Variable('alimentación',['mala','buena'])
var_peso = Variable('peso',['bajo','alto'])
var_bmi = Variable('BMI',['bajo','alto'])
#TODO: Declara el grafo pedido
grafo_est = ...

# Comprobaciones - Verifica que el grafo está bien representado
grafo_est.imprime()

## Distribuciones (condicionadas) de probabilidad
Para que el grafo constituya una red bayesiana, cada nodo debe ir acompañado de una **distribución de probabilidad** de la variable $X$ que representa (condicionada a las variables de sus nodos padres, si los hubiese):

$$P(X_i | padres(X_i))$$

Estas distribuciones de probabilidad contendrán parámetros que nos indicarán la probabilidad de que la variable $X$ tome un valor concreto dados los valores de cada uno de sus padres. Necesitaremos representar estas distribuciones de probabilidad, así que para ello declararemos la clase CPD. Esta clase almacenará:
1. La variable que representa
1. La lista de variables condicionantes
1. Los parámetros de la CPD. Los almacenaremos en una tabla que tendrá una fila por cada valor de $X$ y una fila por cada combinación de valores de las variables condicionantes (como en el siguiente ejemplo).

<img src="./img/cpd.png" alt="Ejemplo CPD" width="400"/>

Habrá dos maneras de inicializar los parámetros: o bien recibiendo un Numpy array de las dimensiones apropiadas, o bien inicializándolos todos a 0 (este será el comportamiento por defecto).

Por último, incluiremos un método que dado una fila con datos que contenga valores para las variables involucradas en esta CPD devuelva el valor de probabilidad correspondiente.

In [None]:
import numpy as np
import pandas as pd

class CPD:
  def __init__(self, var, vars_condicion, valores = None):
    self.variable = var
    self.variables_condicion = vars_condicion

    #TODO: Calcula las dimensiones de la matriz de parámetros
    dimensiones = ...
    if valores is not None and valores.shape==dimensiones:
      self.cpd = valores
    else:
      #TODO: Inicializa la matriz de parámetros
      self.cpd = ...
  
  def get_prob(self, data):
    #TODO: Devuelve el valor correcto (aprovecha los dos métodos auxiliares siguientes)
    return ...

  def get_indice_columna(self, data):
    '''
    Dado un elemento con valores para las variables involucradas
    devuelve el índice de la columna correspondiente en la matriz de parámetros
    '''
    return self.variable.codifica(data[self.variable.nombre])

  def get_indice_fila(self, data):
    '''
    Dado un elemento con valores para las variables involucradas
    devuelve el índice de la columna correspondiente en la matriz de parámetros
    '''
    parents_index = 0
    tam_bloque = 1
    for v in self.variables_condicion:
      parents_index += tam_bloque*v.codifica(data[v.nombre])
      tam_bloque *= len(v.valores)
    return parents_index

  def __str__(self):
    '''
    Método auxiliar para imprimir la CPD formateada
    '''
    df = pd.DataFrame(self.cpd, columns=self.variable.valores)
    cond = ''
    if len(self.variables_condicion)>0:
      cond = '|' + ','.join(map(lambda x:x.nombre, self.variables_condicion))
    df.insert(loc=0,column=f'P({self.variable.nombre}{cond})', value=self.nombra_filas())
    return df.to_string(index=False)

  def nombra_filas(self):
    '''
    Método auxiliar para nombrar las filas de la matriz de parámetros
    '''
    nombres = ['']
    for v in self.variables_condicion:
      nombres_original = nombres
      nombres = []
      for val in v.valores:
        nombres += [val+','+n for n in nombres_original]
    return nombres

# Comprobaciones
cpd_peso = CPD(var_peso, [var_estatura, var_alimentacion])
assert(cpd_peso.get_prob({'estatura':'baja', 'alimentación':'buena', 'peso':'bajo'})==0)
cpd_peso = CPD(var_peso, [var_estatura, var_alimentacion], np.array([[0.1,0.9],[0.2,0.8],[0.3,0.7],[0.4,0.6],[0.45,0.55],[0.47,0.53]]))
assert(cpd_peso.get_prob({'estatura':'baja', 'alimentación':'buena', 'peso':'bajo'})==0.4)

# Red bayesiana
Una vez tenemos representaciones para grafos y CPDs, podemos pasar a representar nuestro modelo de red bayesiana. Definiremos la clase BayesianNetwork que almacenará el grafo y las CPDs (estas últimas en un diccionario indizado por el nombre de las variables, para facilitar el acceso).

La red se creará a partir de un grafo e inicialmente creará CPDs con parámetros a 0 para cada variable.

In [None]:
class BayesianNetwork:
  def __init__(self, grafo):
    self.grafo = grafo
    self.cpds = {}
    for nodo in grafo.nodos:
      #TODO: Crea una CPD con las variables correctas (y los parámetros a 0) para nodo.variable
      ...

red_est = BayesianNetwork(grafo_est)

## Entrenamiento por Máxima Verosimilitud
Una de las maneras de aprender los parámetros de las CPDs asociadas a una red bayesiana es utilizando la **estimación de máxima verosimilitud**. Consiste en encontrar los parámetros de CPDs que maximicen la probabilidad asignada a todos los ejemplos contenidos en el conjunto de datos de entrenamiento:

$$P(X|θ) = \prod_i P(x^{(i)}|\theta)$$

La regla de la cadena y el desarrollo matemático de la fórmula nos indican que cada parámetro se podrá obtener haciendo un mero recuento.

Debemos adaptar nuestra clase CPD para que permita ir acumulando los recuentos durante la fase de entrenamiento y para que, una vez terminado el entrenamiento, normalice los valores de las tablas para que representen distribuciones de probabilidad.

Podemos aprovechar la naturaleza dinámica de Python para añadir estas nuevas funcionalidades sin necesidad de volver a ejecutar las celdas anteriores.

In [None]:
def cuenta(self, data):
  '''
  Método de la clase CPD
  Aumenta en 1 el parámetro correspondiente a los valores indicados en data
  '''
  # TODO: Completa el método (recuerda los métodos get_indice_fila y get_indice_columna)
  ...

def normaliza(self):
  '''
  Método de la clase CPD
  Normaliza la matriz de parámetros (que durante la fase de entrenamiento almacena
  recuentos) para dar lugar a distribuciones de probabilidad
  '''
  # TODO: Normaliza la matriz CPD. Cada fila debe sumar 1.
  self.cpd /= ...

# Enlazamos los métodos a la clase para darle la nueva funcionalidad
CPD.cuenta = cuenta
CPD.normaliza = normaliza

# Comprobaciones
datos_ejemplo = [{'peso':'bajo', 'alimentación':'buena'},\
                 {'peso':'bajo', 'alimentación':'mala'},\
                 {'peso':'alto', 'alimentación':'buena'},\
                 {'peso':'alto', 'alimentación':'mala'},\
                 {'peso':'alto', 'alimentación':'mala'},\
                 {'peso':'alto', 'alimentación':'mala'}]

cpd_peso = CPD(var_peso, [var_alimentacion])
for d in datos_ejemplo:
  cpd_peso.cuenta(d)
assert(cpd_peso.get_prob({'peso':'alto', 'alimentación':'buena'})==1)
assert(cpd_peso.get_prob({'peso':'alto', 'alimentación':'mala'})==3)
cpd_peso.normaliza()
print(cpd_peso)
assert(cpd_peso.get_prob({'peso':'alto', 'alimentación':'buena'})==0.5)
assert(cpd_peso.get_prob({'peso':'bajo', 'alimentación':'mala'})==0.25)

Con esta adaptación, ya podemos escribir el método para realizar el entrenamiento por máxima verosimilitud. El método recibirá un DataFrame de Pandas e iterará una sola vez por las filas haciendo los recuentos.

In [None]:
def fit(self, data):
  '''
  Método de BayesianNetwork
  Realiza entrenamiento MLE a partir de los datos recibidos como parámetro
  
  Params:
    data: DataFrame con ejemplos descritos con las variables abarcadas por la red
  '''
  for _, row in data.iterrows():
    for n in self.grafo.nodos:
      #TODO: Actualiza el recuento de la CPD correspondiente
      ...

  #TODO: Normaliza todas las CPDs
  ...

BayesianNetwork.fit=fit

# Comprobaciones
red_test = BayesianNetwork(Grafo([var_alimentacion, var_peso],[(var_alimentacion,var_peso)]))
red_test.fit(pd.DataFrame(datos_ejemplo))
assert(red_test.cpds['peso'].get_prob({'peso':'bajo', 'alimentación':'mala'})==0.25)

## Predicción MAP (Maximum a posteriori)
Una vez tenemos una red entrenada, podemos estimar la probabilidad de un ejemplo dado en el que todas sus variables tengan valor asignado ($P(x^{(k)}_0,x^{(k)}_1,...,x^{(k)}_d)$) utilizando la regla de la cadena. La probabilidad será el producto de las probabilidades asignadas en las distintas CPDs de la red.

$$P(\mathbf{x}^{(k)}) = \prod_i P(x^{(k)}_i | padres(x^{(k)}_i))$$

Añadiremos, además, la funcionalidad de hacer predicciones de una variable $Y$ a partir de la observación las demás ($P(Y | (X-Y))$). Para ello estimaremos la probabilidad del ejemplo con todos los valores posibles de $Y$ y devolveremos el valor de mayor probabilidad.

$$MAP(Y | (X-Y)=(\mathbf{x}-\mathbf{y})^{(k)})=argmax_{y}(P(Y=y | (X-Y)=(\mathbf{x}-\mathbf{y})^{(k)}))$$

In [None]:
def get_prob(self, row):
  '''
  Método de BayesianNetwork
  Dada una fila con datos, devuelve la probabilidad de dicha fila.
  La fila debe contener los padres de todas las variables que contenga.
  '''
  # TODO: Calcula la probabilidad utilizando la regla de la cadena
  p = 1
  ...
  return p

def predict_probs(self, row, target_variable):
  '''
  Método de Bayesian Network
  Dada una fila con datos, devuelve una lista con la probabilidad de cada valor
  posible de la target variable

  Returns:
    Lista de tuplas (probabilidad, valor)
  '''
  probs = []
  #Hacemos una copia sobre la que modificar el valor sin afectar a los datos originales
  values = row.copy()

  #TODO: Añade a probs las tuplas necesarias
  ...
  return probs

def predict(self, row, target_variable):
  '''
  Método de BayesianNetwork
  Dada una fila con datos, devuelve el valor más probable para la variable target
  '''
  probs = self.predict_probs(row, target_variable)
  #TODO: Devuelve el valor de máxima probabilidad
  ...


BayesianNetwork.get_prob=get_prob
BayesianNetwork.predict_probs=predict_probs
BayesianNetwork.predict=predict

# Comprobaciones
print(red_test.predict_probs({'peso':'alto'},var_alimentacion))
assert(red_test.predict({'peso':'alto'},var_alimentacion)=='mala')

# Experimentos

## Conjunto de datos
Para probar nuestro modelo descargaremos el un conjunto de datos *Nursery* del [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Nursery). Este conjunto representa candidaturas a una escuela de enfermería eslovena. Cada candidatura presenta una serie de características (categóricas) sociodemográficas y es valorada con una decisión que indica su prioridad a la hora de ser admitida.

In [None]:
from os import listdir
import os.path

PATH = 'lab11/'

if not os.path.exists(PATH):
    os.mkdir(PATH)
    !wget https://archive.ics.uci.edu/ml/machine-learning-databases/nursery/nursery.data -O lab11/nursery.data

nombres_variables = ['parents','has_nurs','form','children','housing','finance','social','health','decision']
NOMBRE_ETIQUETA = 'decision'

data = pd.read_csv(PATH + 'nursery.data', ',', names=nombres_variables)

# Creamos un objeto Variable para cada variable donde almacenamos todos sus
# posibles valores
variables = []
etiqueta = None
for v in nombres_variables:
  variables.append(Variable(v,list(data[v].unique())))
  if v==NOMBRE_ETIQUETA:
    etiqueta = Variable(v,list(data[v].unique()))

# Mostramos los datos para un primer examen superficial
print(data)

Hacemos una partición en train-test utilizando la librería *sklearn*.

In [None]:
from sklearn.model_selection import train_test_split
train_data, test_data = train_test_split(data, test_size=0.3, random_state=1235)

# Modelo Naïve-Bayes
La red bayesiana que vamos a probar será un modelo **Naïve Bayes**. En este modelo, se asume la independencia de todas las variables de entrada entre sí, mientras que se asume que la probabilidad de cada variable depende de la clase.

<img src="./img/naive-bayes.png" alt="Arquitectura del modelo Naive Bayes" width="700"/>

Crea la red bayesiana correspondiente y entrénala con *train_data*

In [None]:
# TODO: Crea el Grafo y la BayesianNetwork
red_naive = ...

# TODO: Entrena la red
...

### Verificación del modelo
Para comprobar la eficacia de nuestro modelo vamos a medir la precisión de clasificación y el F1-score de las predicciones sobre el conjunto de test. Para ello utilizaremos la librería sklearn.

In [None]:
from sklearn.metrics import accuracy_score, f1_score

# TODO: Obtén las predicciones sobre el conjunto de test
predictions = ...

print('Accuracy:',accuracy_score(test_data[etiqueta.nombre], predictions))
print('F1-score:',f1_score(test_data[etiqueta.nombre], predictions, average='macro'))

## Otros usos para el modelo
Una ventaja de las redes bayesianas es que se pueden utilizar para generar datos que sigan la distribución que representan. Vamos a incluir dicha funcionalidad en nuestra BayesianNetwork. Generar un nuevo ejemplo consistirá en asignar un valor aleatorio (respectando las probabilidades indicadas por la CPD de dicha variable) a cada variable. Se deberá comenzar por las variables que no están condicionadas y utilizar los valores asignados para generar los valores sucesivos.

In [None]:
def sample(self, values):
  '''
  Método de CPD
  Asigna un valor aleatorio a la variable respetando los parámetros de la CPD y
  los valores recibidos como parámetro
  '''
  indice_fila = self.get_indice_fila(values)
  r = np.random.random()
  for i,p in enumerate(self.cpd[indice_fila]):
    if r < p:
      return self.variable.valores[i]
    r -= p
  return self.variable.valores[-1]

CPD.sample=sample

def sample(self):
  '''
  Método de BayesianNetwork
  Crea un nuevo diccionario con un valor asignado a cada una de las variables que
  abarca la red
  '''
  sampled_values = {} # Vector generado

  # Elegimos la siguiente CPD no muestreada para la que tenemos valores de todas sus variables condicionantes
  while len(sampled_values)<len(self.cpds):
    next_cpd = None
    for nom_var,cpd in self.cpds.items():
      if nom_var not in sampled_values:
        cpd_valida = True
        for condicion in cpd.variables_condicion:
          if condicion.nombre not in sampled_values:
            cpd_valida = False
            continue
        if cpd_valida:
            next_cpd = cpd
    
    # Muestreamos la CPD
    sampled_values[next_cpd.variable.nombre] = next_cpd.sample(sampled_values)
  return sampled_values

def sample_dataframe(self, num_elems):
  '''
  Método de BayesianNetwork
  Crea un nuevo DataFrame de num_elems elementos respetando la distribución que
  representa la red
  '''
  elems = []
  for i in range(num_elems):
    elems.append(self.sample())
  return pd.DataFrame(elems)

BayesianNetwork.sample=sample
BayesianNetwork.sample_dataframe=sample_dataframe

# Comprobaciones
print(red_naive.sample_dataframe(10))

# Ejercicios
1. Crea la red de la imagen (utiliza los valores suministrados) y genera 10000 ejemplos en un DataFrame. Expórtalo a CSV y repite el experimento para comprobar qué tal funciona el modelo Naïve Bayes en ese problema.
1. Crea nuevamente la misma red, pero con los parámetros a 0. Entrénala con el CSV generado. ¿Esperas que los resultados de Accuracy y F1-Score? ¿Son tan buenos como esperabas?

<img src="./img/red-ejemplo.png" alt="Arquitectura del modelo Naive Bayes" width="700"/>

In [None]:
NOMBRE_ETIQUETA = 'carta'

variables_v = {'dificultad': ['fácil', 'difícil'],'inteligencia': ['baja','alta'],'calificación':['suspenso','aprobado','sobresaliente'],'ebau':['suspenso','aprobado'],'carta':['no','sí']}

cpd_dificultad = np.array([[0.6, 0.4]])
cpd_inteligencia = np.array([[0.7, 0.3]])
cpd_calificación = np.array([[0.3, 0.4, 0.3],[0.7,0.25,0.05],[0.02,0.08,0.9],[0.2,0.3,0.5]])
cpd_ebau = np.array([[0.95, 0.05],[0.2,0.8]])
cpd_carta = np.array([[0.99, 0.01],[0.4,0.6],[0.1,0.9]])