<img src="logo.png" alt="Logo Máster" align="left" height="62" width="300">

<br><br><br><br>

# Programación de NaiveBayes con Spark


### Luis de la Ossa


Escuela Superior de Ingeniería Informática de Albacete


*Universidad de Castilla-La Mancha*


---
El objetivo de este tutorial es hacer una toma de contacto con la programación en Spark que, como se ha visto en clase, es funcional, y requiere un cambio de planteamiento con respecto a cómo se programa en programación imperativa. En concreto, implementaremos la versión más básica del algoritmo de clasificación supervisada _Naive Bayes_ en el entorno _Spark_. 

---

Antes de empezar, hemos de comprobar que el contexto de Spark está activo.

In [1]:
sc

<pyspark.context.SparkContext at 0x7f451055ee48>

---
### Lectura de datos

En primer lugar, vamos a crear un _RDD_ denominado `dataRDD` con los datos. En primer lugar, hay que leer el archivo con `sc.textFile`. Esto genera un _RDD_ con líneas de texto. Sin embargo, cada línea contiene una serie de valores separados por comas, por lo que hay que procesarla y transformarla en una lista de valores mediante la función `split` del objeto `String`. 

__Nota__: Si se trabaja en modo local, hay que poner el camino absoluto a la localización del archivo.

In [1]:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# @hidden_cell
# This function is used to setup the access of Spark to your Object Storage. The definition contains your credentials.
# You might want to remove those credentials before you share your notebook.
def set_hadoop_config_with_credentials_74af1899931d4f94bb91ee5a4f761112(name):
    """This function sets the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage using Spark"""

    prefix = 'fs.swift.service.' + name
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com'+'/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', 'e67e4544bd794bb7bf3a337a94365c28')
    hconf.set(prefix + '.username', '1f432d6a92284e7382af2539f883b401')
    hconf.set(prefix + '.password', 'iaYr2?EoV#1GUnM2')
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', 'dallas')
    hconf.setBoolean(prefix + '.public', False)

# you can choose any name
name = 'keystone'
set_hadoop_config_with_credentials_74af1899931d4f94bb91ee5a4f761112(name)

# Please read the documentation of PySpark to learn more about the possibilities to load data files.
# PySpark documentation: https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.SQLContext
# The SQLContext object is already initalized for you.
# The following variable contains the path to your file on your Object Storage.
path_1 = "swift://PracticaSpark." + name + "/coches.txt"


In [2]:
dataRDD = sc.textFile(path_1).map(lambda linea: linea.split(','))

Ahora, muestra las cinco primeras líneas mediante la función `take`.

In [3]:
dataRDD.take(5)

[['ALQUILER', 'VELOCIDAD', 'CARRETERA', 'VEHICULO'],
 ['Barato', 'Normal', 'Nacional', 'Berlina'],
 ['Barato', 'Normal', 'Nacional', 'Berlina'],
 ['Barato', 'Normal', 'Nacional', 'Berlina'],
 ['Caro', 'Normal', 'Nacional', 'Berlina']]

Para facilitar la lectura del código posterior, vamos a extraer los nombres de las variables, y almacenarlos en la variable `features`. También se obtendrá el número de variables, que ha de ser almacenado en la variable `n_feat`, y se almacenará el nombre de la clase en la variable `class_feature`.

In [4]:
features = dataRDD.take(1)[0]
n_feat = len(features)
class_feature = features[-1]

Una vez leídos y almacenados los nombres de las variables, los eliminamos del RDD que contiene los datos. Para ello, utilizamos la función `filter`. También guardaremos el número de instancias de la base de datos en la variable `n_inst`.

In [5]:
# COMPLETAR

dataRDD = dataRDD.filter(lambda x: x!=features)
n_inst = dataRDD.count()
print(n_inst)
#16

16


---


<br>
### Naive Bayes: Aprendizaje del modelo

El clasificador Naive Bayes factoriza la distribución de probabilidad conjunta (DPC) $P(X_1,\dots,X_n,Y)$ como:

$$
P(x_1,\dots,x_n,y) \, = \,  \prod_{i=1}^{n}  P(x_i|y) \cdot P(y)
$$

Por tanto, el aprendizaje del modelo solamente consiste en estimar $P(Y)$ y $P(X_i|Y)$ para cada variable $P(X_i)$.  


---

#### Definición de las variables

En primer lugar, hemos de obtener los valores que puede tomar cada una de las variables, ya que serán necesarios posteriormente. Hemos de pensar que la base de datos está almacenada en un RDD, y podría tener millones de registros, y que obtener los valores de cada variable, a priori desconocidos, implica recorrer la base de datos. Para llevar a cabo este proceso, utilizamos la función `distinct` de Spark.  

La lista de valores que puede tomar cada variable, la almacenaremos a su vez en otra lista denominada `feat_values`, y el número de valores que puede tomar cada variable en la lista denominada `num_feat_values`.

In [16]:
# Crea la estructura para almacenar los valores de las variables
feat_values = { }

# Obtiene el número de valores distintos.
for var in range(n_feat):
    ############
    # COMPLETAR
    feat_values[features[var]] = dataRDD.map(lambda x: x[var]).distinct().collect()

# Extrae el número de valores.    
############
# COMPLETAR
num_feat_values = dict(map(lambda x: (x[0],len(x[1])), feat_values.items()))

# Los imprime
print(feat_values)
print(num_feat_values)

#{'CARRETERA': ['Nacional', 'Autovía'], 'VELOCIDAD': ['Normal', 'Alta'], 'ALQUILER': ['Caro', 'Barato'], 'VEHICULO': ['Berlina', 'Deportivo']}
#{'CARRETERA': 2, 'VELOCIDAD': 2, 'ALQUILER': 2, 'VEHICULO': 2}

{'CARRETERA': ['Nacional', 'Autovía'], 'VELOCIDAD': ['Normal', 'Alta'], 'ALQUILER': ['Caro', 'Barato'], 'VEHICULO': ['Berlina', 'Deportivo']}
{'CARRETERA': 2, 'VELOCIDAD': 2, 'ALQUILER': 2, 'VEHICULO': 2}


#### Probabilidad marginal de la clase. 

En esta parte, vamos a optener la probabilidad de la clase, $P(Y)$. Para cada valor de la clase $c_k \in Y$ la probabilidad (por máxima verosimilitud) se obtiene como:
$$
p(Y=c_k) = \frac{count(c_k)}{N}
$$

donde $count(c_k)$ es el número de instancias con la clase $c_k$, y $N$ el número de instancias.

En primer lugar contaremos el número de instancias de cada clase, y las almacenaremos en la variable `c_y`, que es un diccionario. En este proceso, hemos de tener en cuenta que los datos se encuentran en un RDD, y el tamaño de la base de datos, por lo que utilizaremos la función de Spark `countByValue`.

In [8]:
############
# COMPLETAR
c_y = dict(dataRDD.map(lambda x: x[-1]).countByValue())
c_y
# {'Berlina': 8, 'Deportivo': 8}

{'Berlina': 8, 'Deportivo': 8}

Una vez hecho el recuento, se pueden calcular las probabilidades. En lugar de máxima verosimilitud, utilizaremos la _corrección de Laplace_. Esto equivale (no nos escucha ningún estadístico) a añadir un ejemplo de cada clase. Por tanto, la probabilidad de que una instancia sea de la clase $c_k$ sería:

$$
p(Y=c_k) = \frac{count(Y=c_k)+1}{N+K}
$$

donde $N$ es el número de ejemplos y $K$ es el número de clases. En este caso, puesto que ya se dispone de los parámetros necesarios, y no hay que procesar la base de datos, no se utilizarán las primitivas de Spark.

In [17]:
############
# COMPLETAR
p_y = {key:(value+1.0)/(n_inst+num_feat_values[class_feature]) for key,value in c_y.items()} 
p_y
#{'Berlina': 0.5, 'Deportivo': 0.5}

{'Berlina': 0.5, 'Deportivo': 0.5}

<br>

---

#### Probabilidades condicionales.

A continuación, se van a obtener la probabilidad condicional de cada variable $X_i$ dada la clase, es decir $P(X_i|Y)$. La probabilidad de que $X_i=x_l$ cuando $Y=c_k$ se obtiene, por máxima verosimilitud, como:

<br>
$$
p(X_i=x_l|Y=c_k) = \frac{p(X_i=x_l,Y=c_k)}{p(Y=c_k)}=\frac{count(X_i=x_l,Y=c_k)}{count(Y=c_k)}
$$

<br>

La operación $count(X_i=x_l,Y=c_k)$ se tiene que ejecutar para cada combinación de valores de $X_i$ e $Y$, y para todas las variables $X_i$.  Con el fin de paralelizar esta operación, vamos a transformar cada instancia en una lista de tuplas (una por variable) que contengan el nombre de la variable, el valor, y el valor de la clase.  Por ejemplo, la instancia

    ['Barato', 'Normal', 'Nacional', 'Berlina']


se transformaría en


    [('ALQUILER', 'Barato', 'Berlina'), ('VELOCIDAD', 'Normal', 'Berlina'), ('CARRETERA', 'Nacional', 'Berlina')].

Definir una función, denominada `expand`, que lleve a cabo esta transformación.

In [10]:
def expand(instance):
    values = []
    for nvar,value in enumerate(instance[:-1]):
        values.append((features[nvar], value, instance[-1]))
    return values

print(expand(['Barato', 'Normal', 'Nacional', 'Berlina']))
#[('ALQUILER', 'Barato', 'Berlina'), ('VELOCIDAD', 'Normal', 'Berlina'), ('CARRETERA', 'Nacional', 'Berlina')]

[('ALQUILER', 'Barato', 'Berlina'), ('VELOCIDAD', 'Normal', 'Berlina'), ('CARRETERA', 'Nacional', 'Berlina')]


Transformar las instancias mediante `map` y  generar una lista unidimensional de tuplas mediante `flatMap`. Almacenar el resultado en una variable denominada `data_expRDD`.

In [12]:
############
# COMPLETAR
data_expRDD = dataRDD.map(expand).flatMap(lambda x: x)
data_expRDD.collect()[:3]

#[('ALQUILER', 'Barato', 'Berlina'),
# ('VELOCIDAD', 'Normal', 'Berlina'),
# ('CARRETERA', 'Nacional', 'Berlina')]

[('ALQUILER', 'Barato', 'Berlina'),
 ('VELOCIDAD', 'Normal', 'Berlina'),
 ('CARRETERA', 'Nacional', 'Berlina')]

A partir de la base de datos anterior, obtenemos el número de veces que aparece cada tupla ($X_i,x_l,c_k$), que corresponde a $count(X_i=x_l,Y=c_k)$, y lo almacenaremos en un RDD pareado denominado `pair_counts`. Para ello, hemos de usar `reduceByKey`. Almacenar el resultado posteriormente en un diccionario denominado `c_xy`.

__Nota__: Se podría proceder de manera distinta, pero así vemos cómo hacer el conteo con `reduceByKey`.

In [13]:
#c_xy = dict(data_exp.countByValue()) # No devuelve un RDD sino un diccionario directamente.

############
# COMPLETAR
pair_counts = data_expRDD.map(lambda x: (x,1)).reduceByKey(lambda v1,v2:v1+v2)
c_xy = pair_counts.collectAsMap()
c_xy

#{('ALQUILER', 'Barato', 'Berlina'): 6,
# ('ALQUILER', 'Barato', 'Deportivo'): 2,
#...}

{('ALQUILER', 'Barato', 'Berlina'): 6,
 ('ALQUILER', 'Barato', 'Deportivo'): 2,
 ('ALQUILER', 'Caro', 'Berlina'): 2,
 ('ALQUILER', 'Caro', 'Deportivo'): 6,
 ('CARRETERA', 'Autovía', 'Berlina'): 4,
 ('CARRETERA', 'Autovía', 'Deportivo'): 4,
 ('CARRETERA', 'Nacional', 'Berlina'): 4,
 ('CARRETERA', 'Nacional', 'Deportivo'): 4,
 ('VELOCIDAD', 'Alta', 'Berlina'): 4,
 ('VELOCIDAD', 'Alta', 'Deportivo'): 8,
 ('VELOCIDAD', 'Normal', 'Berlina'): 4}

Ahora podemos calcular las probabilidades condicionales. Igual que en el caso anterior, aplicamos la corrección de Laplace:

$$
p(X_i=x_l|Y=c_k) = \frac{count(x_l,c_k)+1}{count(c_k)+L}
$$

donde $L$ es el número de valores que puede tomar la variable $X_i$. Es decir, equivale a añadir un caso con cada etiqueta.

Obtener las probabilidades y almanacenarlas en un diccionario denominado `p_xy`. Para ello, utilizar la función `map` sobre `pair_counts`.

In [None]:
############
# COMPLETAR
#p_xy = pair_counts.map(lambda count: (count))
p_xy.collectAsMap()

#{('ALQUILER', 'Barato', 'Berlina'): 0.7,
# ('ALQUILER', 'Barato', 'Deportivo'): 0.3,
# ('ALQUILER', 'Caro', 'Berlina'): 0.3,
#...}

Una alternativa es hacerlo con Python normal, operando sobre el diccionario `c_xy`.

In [18]:
############
# COMPLETAR
p_xy = {key:(value+1)/(c_y[key[2]]+num_feat_values[key[0]]) for key,value in c_xy.items()}
p_xy

{('ALQUILER', 'Barato', 'Berlina'): 0.7,
 ('ALQUILER', 'Barato', 'Deportivo'): 0.3,
 ('ALQUILER', 'Caro', 'Berlina'): 0.3,
 ('ALQUILER', 'Caro', 'Deportivo'): 0.7,
 ('CARRETERA', 'Autovía', 'Berlina'): 0.5,
 ('CARRETERA', 'Autovía', 'Deportivo'): 0.5,
 ('CARRETERA', 'Nacional', 'Berlina'): 0.5,
 ('CARRETERA', 'Nacional', 'Deportivo'): 0.5,
 ('VELOCIDAD', 'Alta', 'Berlina'): 0.5,
 ('VELOCIDAD', 'Alta', 'Deportivo'): 0.9,
 ('VELOCIDAD', 'Normal', 'Berlina'): 0.5}

---

<br>
### Naive Bayes: Clasificación


Una vez se dispone del modelo (tabla con las probabilidades marginales y condicionales), vamos a utilizarlo para clasificar nuevos casos. Por ejemplo:

    ALQUILER = Barato, CARRETERA = Alta, VELOCIDAD = Nacional

In [15]:
import numpy as np
instance = ['Barato', 'Alta', 'Nacional']

Dada una entrada, Naive Bayes predice la clase $y$ tal que:

$$
c_\theta(x) = \underset{y \in \{c_1, \ldots, c_k\}}{argmax} P(x_1,\dots,x_n,y) \, = \underset{y \in \{c_1, \ldots, c_k\}}{argmax} \,  \prod_{i=1}^{n}  P(x_i|y) \cdot P(y)
$$

Por tanto, hay que calcular, para esta instancia en concreto:

$$
P(Barato, Alta, Nacional, c_k)=P(Barato|c_k)\cdot P(Alta|c_k) \cdot P(Nacional|c_k) \cdot P(c_k)
$$

para cada clase $c_k$, y devolver el valor de $c_k$ qu eproduzca un mayor valor.

Implementar una función denominada `calc_probs` que reciba como parámetro una instancia y devuelva la probabilidad calculada para cada clase.

__Nota__: Esta función no hace uso de Spark.

In [22]:
def calc_probs(instance):
    probs = {}
    # Para cada clase
    for cl in feat_values[class_feature]:
    ############
    # COMPLETAR
        probs[cl] = np.prod(list(map(lambda x: p_xy[(features[x[0]],x[1],cl)], 
                                 enumerate(instance))))*p_y[cl]
    return probs

print(calc_probs(instance))
#{'Berlina': 0.087499999999999994, 'Deportivo': 0.067500000000000004}

{'Berlina': 0.087499999999999994, 'Deportivo': 0.067500000000000004}


El caso anterior, vemos que se clasificaría como 'Berlina', ya que es la clase para la que la distribución de probabilidad conjunta es mayor. Sin embargo, si queremos calcular la probabilidad asociada a cada etiqueta, hemos de normalizar. Es decir, dividir cada probabilidad por la suma de las probabilidades.

$$
P(y|\mathbf{x}) = \frac{\prod_{i=0}^n P(x_i|y)\cdot P(y)}{\sum_{y}\prod_{i=0}^n P(x_i|y)\cdot P(y)}
$$

Implementar una función denominada `classify`, que reciba una instancia y devuelva una tupla con la clase predicha y su probabilidad.

__Nota__: Esta función llama a la anterior, y tampoco hace uso de Spark.

In [28]:
def classify(instance):
    sum_probs = 0
    probs = calc_probs(instance)
    sum_probs = np.sum([prob for prob in probs.values()])
    ############
    # COMPLETAR
    return max([(key,value/sum_probs) for key,value in probs.items()], key=lambda x:x[1])
    
classify(instance)

# ('Berlina', 0.56451612903225801)

('Berlina', 0.56451612903225801)

Clasificar esta nueva instancia (falla).

In [None]:
instance = ['Caro', 'Normal', 'Nacional']
#print(classify(instance))

La instancia anterior falla porque no existe la probabilidad asociada y, por tanto, no se pueden calcular las probabilildades asociadas a esa instancia, ya que `p_xy[('VELOCIDAD','Normal','Deportivo']` devuelve error. Para solventarlo, se puede reescribir la función `calc_probs`, de modo que acceda a las probabilidades con el método `get`, que permite devolver un valor por defecto en caso de error. 

El valor es, al hacer corrección de laplace debe ser 
$$
p(X_i=x_l|Y=c_k) = \frac{count(x_l,c_k)+1}{count(c_k)+L} =\frac{1}{count(c_k)+L}
$$

Ya que $count(x_l,c_k)=0$. Reescribir la función `calc_probs`.

In [29]:
# Por claridad, reescribimos la función de consulta a p_xy
def prob(var,val,cl):
    return p_xy.get((var,val,cl), 1.0/(c_y[cl]+num_feat_values[var]))

# Reescribimos calc_probs
def calc_probs(instance):
    probs = {}
    # Para cada clase
    for cl in feat_values[class_feature]:
    ############
    # COMPLETAR
        probs[cl] = np.prod(list(map(lambda x: prob(features[x[0]],x[1],cl),
                                     enumerate(instance))))*p_y[cl]
    return probs

instance = ['Barato', 'Alta', 'Nacional']
print(classify(instance))
instance = ['Caro', 'Normal', 'Nacional']
print(classify(instance))


('Berlina', 0.56451612903225801)
('Berlina', 0.68181818181818188)
