<a href="https://colab.research.google.com/github/Hiver77/Big-Data/blob/master/09%20-%20MACHINE%20LEARNING.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!wget --no-cache -O init.py -q https://raw.githubusercontent.com/UDEA-Esp-Analitica-y-Ciencia-de-Datos/EACD-03-BIGDATA/master/init.py
import init; init.init(force_download=False); 
from IPython.display import Image #hola

In [None]:
Image("local/imgs/udea-datascience.png")

In [None]:
#Instalación
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz 
!pip install -q findspark

#Variables de Entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

#SparkContext
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()

In [None]:
#SQL Context
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.types import *
sqlCtx = SQLContext(sc)

#**MACHINE LEARNING**

El Machine Learning o aprendizaje automático es una disciplina orientada a crear sistemas que puedan aprender por sí solos, con el fin de extraer información no trivial de grandes volúmenes de datos por medio de la identificación de patrones complejos.

Spark implementa el aprendizaje automático a través del módulo MLLib que cuenta con un gran número de algoritmos que permiten crear modelos para el aprendizaje automático. Teniendo en cuenta que Spark proporciona sistemas distribuidos para trabajar en paralelo, los algoritmos de Machine Learning implementados en MLLib deben poder ser paralelizables

Pueden identificarse dos grandes ramas en el aprendizaje automático, a saber, el aprendizaje supervisado y el aprendizaje NO supervisado.



##**Preparación de Datos**

Para poder aplicar los métodos del aprendizaje supervisado y no supervisado, es necesario realizar una etapa previa de preparación de datos.

Esta etapa nos permite identificar y corregir posibles anomalías presentes en los datos y contribuir a la etapa de aprendizaje para obtener un buen resultado en la evaluación de los modelos.

###**Pasos**

Para la preparación de datos se recomienda realizar los siguientes pasos

**1. Identificación de variables**

* Propiedades del conjunto de datos
* Tipos de datos (verificar carga correcta de datos)


**2. Tratamiento de duplicados**

* Eliminar variables duplicadas (columnas)
* Eliminar registros duplicados (filas)
* Eliminar variables irrelevantes (ID, cedula, nombre, teléfono)


**3. Análisis univariable**

*  Variables numéricas: estadística descriptiva, histogramas, box plot
*  Variables categóricas: tabla de frecuencias y diagrama de barras


**4. Análisis bivariable**

* Correlaciones entre las variables predictoras deben ser menores a 0.7
* Correlaciones con la variable objetivo debe ser mayor a 0.3


**5. Tratamiento de outliers** (eliminar registros, eliminar variables, imputar, predecir)


**6. Tratamiento de datos nulos** (eliminar registros, eliminar variables, imputar o predecir)


**7. Transformación de variables desde las reglas del negocio**

* Discretización o Binning: convertir de número a categoría
* Crear variables Dummy: convertir de categoría a número


**8. Creación de variables** (fecha y otros)


**9. Reducción de variables** (en caso de ser necesario)


**10. Balanceo de la variable objetivo** (sólo en clasificación)


**11. Transformación datos para el método**

###**Modo Local**

Veamos un ejemplo de cómo realizar estos pasos de la preparación de datos en un ambiente local utilizando pandas (no es un ambiente distribuido)

In [None]:
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt

En el archivo aprobacion_curso.xlsx se encuentran los datos de estudiantes de un curso de universidad, para cada estudiante se tienen los siguientes datos:
* Id: Identificación del estudiante
* Año-Semestre: Momento en el que toma el curso
* Felder: Estilo de aprendizaje del estudiante
* Examen de admisión
* Nota final
* Aprobó: (Si/No)

**1.	Identificación de variables**
* Propiedades del conjunto de datos
* Tipos de datos (verificar carga correcta de datos)

In [None]:
data = pd.read_excel("local/data/aprobacion_curso.xlsx",sheet_name=0) #leer la primera hoja del archivo
type(data)

In [None]:
data.head()

In [None]:
data.info()

Las variables tipo Object las convertimos a categorías

In [None]:
data['Año - Semestre']=data['Año - Semestre'].astype('category')
data['Felder']=data['Felder'].astype('category')
data['Aprobo']=data['Aprobo'].astype('category')
data.info()

**2.	Tratamiento de duplicados**

* Eliminar variables duplicadas (columnas)
* Eliminar registros duplicados (filas)
* Eliminar variables irrelevantes (ID, cedula, nombre, teléfono)


**Variables Duplicadas:** Las variables Nota Final y Aprobó están duplicadas (representan la misma información), a partir de la nota final podemos saber si un estudiante aprueba o no el curso

In [None]:
data = data.drop('Nota_final',axis=1)

**Registros duplicados:** Dos registros que tengan el mismo Id están duplicados

In [None]:
data=data.drop_duplicates(['ID']) 

**Variables Irrelevantes**: Detectamos dos variables irrelevantes: Id y Año-Semestre

In [None]:
data = data.drop('ID',axis=1)
data = data.drop('Año - Semestre',axis=1)

Así quedan los datos luego de este paso

In [None]:
data.head()

**3.	Análisis univariable**

* Variables numéricas: estadística descriptiva, histogramas, box plot
* Variables categóricas: tabla de frecuencias y diagrama de barras


Variables numéricas

In [None]:
data.describe() #Solo para datos numéricos

In [None]:
data.plot.box()

Variables categóricas

In [None]:
pd.value_counts(data["Felder"])

In [None]:
pd.value_counts(data["Aprobo"])

In [None]:
plt.subplot(1,2,1)
pd.value_counts(data["Felder"]).plot(kind="bar")
plt.subplot(1,2,2)
pd.value_counts(data["Aprobo"]).plot(kind="bar")

**4.	Análisis bivariable**

* Correlaciones entre las variables predictoras deben ser menores a 0.7
* Correlaciones con la variable objetivo debe ser mayor a 0.3



In [None]:
data.corr()

La correlación se aplica sobre variables numéricas, debemos convertir las variables categóricas a numéricas

In [None]:
data_num=data #Hacemos una copia de los datos

dummiesFelder = pd.get_dummies(data['Felder'])  #Creamos variables dummy para convertir  las categorías a números
data_num = data_num.drop('Felder', axis=1)      #Elimino la columna original
data_num = data_num.join(dummiesFelder)         #Adiciono las dummies

data_num.head()

In [None]:
#Se codifican las categorias de la variable objetivo
data_num["Aprobo"]=data_num["Aprobo"].replace({"si": 1, "no": 0})
data_num.head()

Ahora nuestras variables son numéricas, ya podemos aplicar la correlación
* Correlación entre las variables predictoras debe ser menor de 0.7
* Correlación con la variable objetivo debe ser mayor a 0.3

In [None]:
data_num.corr()

**5.	Tratamiento de outliers**
(eliminar registros, eliminar variables, imputar, predecir)



In [None]:
data.describe()


Si un estudiante perdió el examen de admisión (nota<3) no debería estar en el curso

In [None]:
data[data["Examen_admisión"]<3] #Consultamos los registros de los outliers
data1=data

In [None]:
#Imputación por la media
media=data["Examen_admisión"].mean()
data.Examen_admisión[data["Examen_admisión"]<3]=media
data.describe()

**6.	Tratamiento de datos nulos** 
(eliminar registros, eliminar variables, imputar o predecir)



In [None]:
data.info()

Hay dos registros con examen de admisión en null

In [None]:
#Imputación por la media
data['Examen_admisión']=data['Examen_admisión'].fillna(value=data['Examen_admisión'].mean())
data.info()


**7.	Transformación de variables desde las reglas del negocio** 

* Discretización o Binning: convertir de número a categoría
* Crear variables Dummy: convertir de categoría a número


El examen de admisión que es una variable numérica, podríamos discretizarla para volverla categórica

In [None]:
pd.cut(data["Examen_admisión"],3)

El estilo de aprendizaje que es una variable categórica, la podemos volver numérica

In [None]:
pd.get_dummies(data['Felder']).head()

**8.	Creación de variables** 
(fecha y otros)



Como ejemplo podemos tomar una variable date time y extraer sus componentes

In [None]:
from datetime import datetime

ahora = datetime.now()  # Obtiene fecha y hora actual
print("Fecha y Hora:", ahora)  # Muestra fecha y hora
print("Fecha y Hora UTC:",ahora.utcnow())  # Muestra fecha/hora UTC
print("Día:",ahora.day)  # Muestra día
print("Mes:",ahora.month)  # Muestra mes
print("Año:",ahora.year)  # Muestra año
print("Hora:", ahora.hour)  # Muestra hora
print("Minutos:",ahora.minute)  # Muestra minuto
print("Segundos:", ahora.second)  # Muestra segundo

**9.	Reducción de variables** 
(en caso de ser necesario)



Temporalmente eliminamos la variable objetivo para analizar solo las variables predictoras que son en las que debemos aplicar la reducción de variables

In [None]:
X=data_num.drop('Aprobo', axis=1)
#Eliminamos los nulos en la variable Examen admision
X['Examen_admisión']=X['Examen_admisión'].fillna(value=X['Examen_admisión'].mean())
X.head()

PCA o análisis de componentes principales nos permite convertir nuestras variables originales en un nuevo y reducido grupo de variables que representen a los datos originales con una menor dimensión

In [None]:
#Seleccionar la cantidad de componentes
from sklearn.decomposition import PCA
pca = PCA().fit(X)
plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('number of components')
plt.ylabel('cumulative explained variance');

In [None]:
#Aplicamos PCA para reducción de variables
pca = PCA(n_components=5)
principalComponents = pca.fit(X)
print("Componentes principales")
print(pca.components_)
print("Varianza acumulada")
varianza = pca.explained_variance_ratio_
var_acum= np.cumsum(varianza)
print(var_acum)


**10.	Balanceo de la variable objetivo**
(sólo en clasificación) 

In [None]:
pd.value_counts(data["Aprobo"]).plot(kind="bar")

La variable objetivo (Aprobó) no cuenta con igual número de registros por categoría

In [None]:
from imblearn.over_sampling import SMOTENC

sm = SMOTENC(random_state=42, categorical_features=[0]) #sm = SMOTE(random_state=42) para datos sólo numéricos
X_bal, Y_bal = sm.fit_resample(data[['Felder', 'Examen_admisión']], data["Aprobo"])
# Creamos un dataframe con los resultados
data_bal = pd.DataFrame({"Felder": X_bal[:,0], "Examen_admisión":X_bal[:,1], "Aprobo": Y_bal})
# Veamos si ahora si está balanceado
pd.value_counts(data_bal["Aprobo"]).plot(kind="bar")

In [None]:
#Guardamos los dataFrame preparados
data.to_excel('local/data/aprobacion_curso_pre.xlsx')
data_bal.to_excel('local/data/aprobacion_curso_bal.xlsx',sheet_name='Data_bal')

###**Modo Distribuido (Spark)**

Ahora veamos que nos ofrece Spark para realizar la preparación de datos en modo distribuido. Nos apoyaremos en los métodos de DataFrame y en la librería de Machine Learning

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html

https://spark.apache.org/docs/latest/ml-guide.html

In [None]:
data = pd.read_excel("local/data/aprobacion_curso.xlsx",sheet_name=0)
estDF=sqlCtx.createDataFrame(data)
type(estDF)

**Identificación de variables**
* Propiedades del conjunto de datos
* Tipos de datos (verificar carga correcta de datos)

In [None]:
estDF.printSchema()
estDF.show(5)

En pandas nos muestra información sobre los registros que tienen valores null, DataFrames no nos muestra esa información y a demás las variables categóricas las trabajamos como String

**Tratamiento de duplicados**

* Eliminar variables duplicadas (columnas)
* Eliminar registros duplicados (filas)
* Eliminar variables irrelevantes (ID, cedula, nombre, teléfono)


In [None]:
estDF=estDF.dropDuplicates(['ID'])
estDF=estDF.drop('ID','Año - Semestre','Nota_final')
estDF.printSchema()

**Análisis univariable**

* Variables numéricas: estadística descriptiva, histogramas, box plot
* Variables categóricas: tabla de frecuencias y diagrama de barras


Variables numéricas

In [None]:
estDF.select('Examen_admisión').describe().show()
estDF.select('Examen_admisión').distinct().show()

Encontró problemas con la variable examen de admisión, no puede calcular la estadística descriptiva de la variable hasta no resolver esos problemas

In [None]:
estDF=estDF.fillna(3.0,'Examen_admisión')
estDF.select('Examen_admisión').describe().show()

Variables categóricas

In [None]:
estDF.groupBy('Felder').count().show()
estDF.groupBy('Aprobo').count().show()


**Análisis bivariable**

* Correlaciones entre las variables predictoras deben ser menores a 0.7
* Correlaciones con la variable objetivo debe ser mayor a 0.3



La correlación se aplica sobre variables numéricas, debemos convertir las variables categóricas a numéricas

In [None]:
estNum=estDF
estNum.select('Felder').distinct().show()

In [None]:
from pyspark.sql.types import *
estNum=estNum.\
  withColumn('equilibrio',(estNum.Felder=='equilibrio').cast(IntegerType())).\
  withColumn('activo',(estNum.Felder=='activo').cast(IntegerType())).\
  withColumn('visual',(estNum.Felder=='visual').cast(IntegerType())).\
  withColumn('reflexivo',(estNum.Felder=='reflexivo').cast(IntegerType())).\
  withColumn('sensorial',(estNum.Felder=='sensorial').cast(IntegerType())).\
  withColumn('intuitivo',(estNum.Felder=='intuitivo').cast(IntegerType())).\
  withColumn('secuencial',(estNum.Felder=='secuencial').cast(IntegerType())).\
  withColumn('verbal',(estNum.Felder=='verbal').cast(IntegerType())).\
  withColumn('AproboInt',(estNum.Aprobo=='si').cast(IntegerType()))
estNum=estNum.drop('Felder','Aprobo')

estNum.printSchema()
estNum.show(5)

Ahora nuestras variables son numéricas, ya podemos aplicar la correlación
* Correlación entre las variables predictoras debe ser menor de 0.7
* Correlación con la variable objetivo debe ser mayor a 0.3

Tenemos el método corr que calcula la correlación entre dos variables

In [None]:
estNum.corr('equilibrio','activo')

La librería ml.stat cuenta con un método para calcular la correlación pero requiere definir los datos dentro de un vector

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler


vector_col = "corr_features"
assembler = VectorAssembler(inputCols=estNum.columns, outputCol=vector_col)
df_vector = assembler.transform(estNum).select(vector_col)

matrix = Correlation.corr(df_vector, vector_col)

matrix.collect()[0]["pearson({})".format(vector_col)].values

**Tratamiento de outliers y de nulos**
(eliminar registros, eliminar variables, imputar, predecir)



Si un estudiante perdió el examen de admisión (nota<3) no debería estar en el curso

In [None]:
prom=estNum.agg(functions.avg(estNum.Examen_admisión)).take(1)[0][0]
prom


In [None]:
estNum = estNum.withColumn('Examen_admisión', functions.when(estNum.Examen_admisión<3,prom)\
                                                 .when(estNum.Examen_admisión>=3, estNum.Examen_admisión))
estNum.describe().show()

**Transformación de variables desde las reglas del negocio** 

* Discretización o Binning: convertir de número a categoría
* Crear variables Dummy: convertir de categoría a número


El examen de admisión que es una variable numérica, podríamos discretizarla para volverla categórica

In [None]:
from pyspark.ml.feature import QuantileDiscretizer

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="Examen_admisión", outputCol="ExamenCat")

result = discretizer.fit(estNum).transform(estNum)
result.show()

La variable Aprobó es categórica, podemos convertirla a númerica

In [None]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Aprobo", outputCol="AproboNum")
indexer.fit(estDF).transform(estDF).show()

**Reducción de variables** 
(en caso de ser necesario)



In [None]:
estNum.columns

Creamos un vector que contiene las variables predictoras (eliminamos la variable objetivo)

In [None]:
# convert to vector column first
vector_col = "features"
assembler = VectorAssembler(inputCols=estNum.drop('AproboInt').columns, outputCol=vector_col)
vest = assembler.transform(estNum)
df_vector = vest.select(vector_col)

In [None]:
from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors

pca = PCA(k=3, inputCol=vector_col, outputCol="pcaFeatures")
model = pca.fit(df_vector)
result = model.transform(df_vector).select("pcaFeatures")
result.show(truncate=False)

##**Aprendizaje Supervisado**

El aprendizaje supervisado utiliza un conjunto histórico de datos donde se tienen los registros previamente catalogados, para crear un modelo de predicción. Este modelo de predicción aprende de los datos históricos hasta obtener la capacidad de predecir lo que pasará con nuevos conjuntos de datos.

Se caracterizan por disponer de una variable objetivo o variable de clase, que es justamente lo que se quiere predecir. Esta variable objetivo puede ser numérica (regresión) o categórica (clasificación)

El **ciclo de vida** del aprendizaje supervisado consta de tres fases

* **Modelamiento:** Consiste en construir el modelo que permita predecir la variable objetivo
* **Evaluación:** Se evalúa el modelo predictivo construido para ver que tanto podemos confiar en el
* **Validación:** Una vez el modelo ha sido evaluado y el resultado es el esperado, se someten datos nuevos al modelo para realizar la predicción

La etapa de modelamiento y evaluación requieren del conjunto histórico de datos. Esto significa que debemos decidir la forma como se utilizarán los datos históricos en estas dos etapas. Para esto disponemos de tres opciones:

* **Evaluar el mismo conjunto de entrenamiento**
* **División de datos (Split / 70-30)**
* **Validación cruzada**

###**Predicción continua o Regresión**

Es el estudio de un conjunto de datos históricos con el fin de predecir un evento numérico futuro.

Es decir que la variable objetivo a predecir es de tipo numérica.

**Evaluación**

Como la variable a predecir es numérica el error de predicción se calcula al comparar el valor real contra el valor de la predicción y esto se hace con una fórmula matemática de cálculo de error. Existen varias formas de calcular ese error:

* Error medio absoluto:

$$error = \frac{\sum_ {i=1}^n  |f(x)-p(x)|}{n}$$

* Error cuadrático medio:
$$error = \frac{1}{n}\sum(f(x)-p(x))^2$$


Donde, 
* $f(x)$: Valor real
* $p(x)$: predicción

####**Regresión lineal**

Permite evaluar la relación entre una variable dependiente (variable a predecir) y un conjunto de variables independientes (variables predictoras)


$$Y = \alpha + \beta_1 X_1  +  \beta_2 X_2 + ... + \beta_n X_n + \epsilon$$

Donde,
* $Y$: Variable a predecir
* $X$: Variables predictoras (atributos)
* $\alpha$: Intercepto
* $\beta$: Pendiente
* $\epsilon$: Error

Básicamente lo que se hace es encontrar la ecuación de la recta que mejor represente al conjunto de datos, de modo que se pueda utilizar dicha ecuación para la predicción de nuevos registros




**Ejemplo: Predicción del precio de venta de una propiedad en Boston**


Tomado de: https://www.kaggle.com/kyasar/boston-housing

**1. Preparar**

Carga de datos

In [None]:
boston = sqlCtx.read.format("csv") \
      .option("header", True) \
      .option("delimiter", ",") \
      .option("inferschema",True) \
      .load("local/data/boston_housing.csv")
boston.cache()
boston.printSchema()

Esta es la descripción de las variables disponibles:

* crim : Tasa de delincuencia per cápita por población
* zn:  Proporción de terreno residencial dividido en zonas para lotes de más de 25,000 pies cuadrados.
* indus : Proporción de terrenos comerciales no minoristas por población.
* chas: Indica si la propiedad limita o no con el río (1 si limita con el río. 0 en caso contrario).
* nox: Concentración de óxidos nítricos (partes por 10 millones).
* rm : Promedio de habitaciones por vivienda.
* age : Proporción de unidades ocupadas por sus propietarios construidas antes de 1940
* dis : Distancias ponderadas a cinco centros de empleo de Boston
* rad: índice de accesibilidad a carreteras principales.
* tax : tasa de impuesto a la propiedad.
* ptratio: Proporción alumno-profesor.
* black: Proporción de negros por ciudad.
* lstat: Estatus más bajo de la población (porcentaje).
* **mv : valor medio de las viviendas en $ 1000. Esta es la variable objetivo.**

Descripción estadística de los datos

In [None]:
boston.describe().show()

In [None]:
boston.describe().toPandas().transpose()

Análisis de correlación

In [None]:
import six
print("Correlación con la variable objetivo")
for i in boston.columns:
    if not( isinstance(boston.select(i).take(1)[0][0], six.string_types)):
        print( "", i, boston.stat.corr('medv',i))

Si dispone de muchas variables, se recomienda eliminar aquellas que presentan una correlación baja con la variable objetivo (<0.3)

In [None]:
boston=boston.drop("chas","dis")
boston.columns

Para aplicar la técnica de Machine Learning se requiere representar los datos mediante dos columnas, una con las variables predictoras (features) y la otra es la variable objetivo

In [None]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = \
    ['crim', 'zn', 'indus', 'nox', 'rm', 'age', 'rad', 'tax', 'ptratio', 'black', 'lstat'], outputCol = 'features')
vboston = vectorAssembler.transform(boston)
vboston.show(1)

Nos quedamos solo con las variables features y medv

In [None]:
vboston = vboston.select(['features', 'medv'])
vboston.show(3)

Ahora veamos la correlación entre las variables predictoras

In [None]:
matrix = Correlation.corr(vboston,"features")
matrix.collect()[0]["pearson({})".format("features")].values

In [None]:
print("Correlación entre tax y indus: ",boston.corr("tax","indus"))
print("Correlación entre nox y indus: ",boston.corr("nox","indus"))
print("Correlación entre nox y age: ",  boston.corr("nox","age"))
print("Correlación entre tax y rad: ",  boston.corr("tax","rad"))


Eliminemos las variables correlacionadas y creemos nuevamente el vector de características

In [None]:
boston=boston.drop("tax","nox")
vectorAssembler = VectorAssembler(inputCols = \
    ['crim', 'zn', 'indus', 'rm', 'age', 'rad', 'ptratio', 'black', 'lstat'], outputCol = 'features')
vboston = vectorAssembler.transform(boston)
vboston = vboston.select(['features', 'medv'])
vboston.show(3)

Creamos los conjuntos de datos de entrenamiento y evaluación (Split)

In [None]:
(trainingData, testData) = vboston.randomSplit([0.7, 0.3])
print(trainingData.count())
print(testData.count())

**2. Modelar**

In [None]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='medv', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(trainingData)
trainingSummary = lr_model.summary

Veamos la predicción con los datos de entrenamiento

In [None]:
trainingSummary.predictions.select("prediction","medv").show(5)

El método de regresión lineal construye la ecuación de la recta que mas se ajuste a los datos.

In [None]:
print("Coeficientes: " + str(lr_model.coefficients))
print("Intercepto: " + str(lr_model.intercept))
print("Variables:")
boston.columns

Veamos las medidas de error

In [None]:
print("Error cuadrático medio (MSE): %f" % trainingSummary.meanSquaredError)
print("Error medio absoluto (MAE): %f" % trainingSummary.meanAbsoluteError)
print("Raíz del Error cuadrático medio (RMSE): %f" % trainingSummary.rootMeanSquaredError)
print("Coeficiente de determinación (r2): %f" % trainingSummary.r2)
print("Coeficiente de determinación ajustado (r2Adj): %f" % trainingSummary.r2adj)


**3. Evaluar**

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

lr_predictions = lr_model.transform(testData)
lr_predictions.select("prediction","medv").show(5)

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("Coeficiente de determinación con los datos de evaluación = %g" % lr_evaluator.evaluate(lr_predictions))

In [None]:
test_result = lr_model.evaluate(testData)
print("Error cuadrático medio con los datos de evaluación = %g" % test_result.meanSquaredError)
print("Raiz del Error cuadrático medio con los datos de evaluación = %g" % test_result.rootMeanSquaredError)
print("Error medio absoluto con los datos de evaluación = %g" % test_result.meanAbsoluteError)
print("Coeficiente de determinación con los datos de evaluación = %g" % test_result.r2)
print("Coeficiente de determinación ajustado con los datos de evaluación = %g" % test_result.r2adj)

####**Árboles de decisión para regresión**

El método de árboles de decisión está disponible tanto para predecir números (árboles de regresión) como para predecir categorías (árboles de clasificación).

Su funcionamiento se basa en representar el conjunto histórico de datos a través de un árbol en el que tendremos dos componentes:
* Nodos: presentan una pregunta sobre algunos de los atributos y se genera una bifurcación a partir de la respuesta
* Hojas: Están al final de cada rama y son los valores de la predicción


Cuando se somete un registro al árbol de decisión, este recorre el árbol a través de las respuestas generadas en cada nodo, y al final llegará a una hoja que indica la predicción.

En el caso de los árboles de regresión, la variable a predecir es numérica, es decir que el resultado final de la predicción será un número 

**Ejemplo: Predicción del precio de venta de una propiedad en Boston**

**Modelar**

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'medv')
dt_model = dt.fit(trainingData)

Veamos la predicción con los datos de entrenamiento

In [None]:
dt_model.transform(trainingData).select("prediction","medv").show(5)

**Evaluar**

In [None]:
dt_predictions = dt_model.transform(testData)
dt_predictions.select("prediction","medv").show(5)

dt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="r2")
print("Coeficiente de determinación con los datos de evaluación = %g" % dt_evaluator.evaluate(dt_predictions))

Veamos la importancia de las vaiables predictoras

In [None]:
dt_model.featureImportances

In [None]:
boston.columns

###**Predicción discreta o Clasificación**

La clasificación permite predecir un evento numérico discreto, es decir que lo que pedimos ya no es un número sino una categoría, una cualidad.

Se utiliza el conjunto histórico de datos donde se cuenta con una variable objetivo que es de tipo categórica. Con ese histórico de datos, se crea un modelo que aprenderá de los datos y permitirá predecir datos futuros

**Evaluación**

Teniendo en cuenta que la variable objetivo es categórica, contamos con la clase real a la que pertenece cada uno de los registros que hacen parte del histórico de datos.

Por otro lado, el modelo arrojará como resultado una predicción sobre la clase a la que debe pertenecer cada registro. Usando la clase real y la predicción se construyen unas medidas de error que permiten evaluar la calidad del modelo. 

Estas medidas se construyen al identificar la cantidad de registros que quedaron clasificados correctamente es decir que la clase de la predicción y la clase real coinciden

Con esta información se construye lo que se conoce como matriz de confusión


In [None]:
Image("local/imgs/matriz_confusion.png")

las cantidades relacionadas en la matriz de confusión nos permiten calcular esas medidas de error entre las cuales se destacan:

* Precisión: $$p=\frac{a}{a+b}$$
* Cobertura: $$r=\frac{a}{a+c}$$
* Exactitud: $$e=\frac{a+d}{a+b+c+d}$$
* Media armónica: $$f=\frac{2pr}{p+r}$$
* Razón de verdaderos positivos: $$VPR=\frac{a}{a+c}$$
* Razón de falsos positivos: $$FPR=\frac{b}{b+d}$$

Donde,

* $a$: Verdaderos positivos
* $b$: Falsos positivos
* $c$: Falsos negativos
* $d$: Verdaderos negativos

Con la razón de verdaderos positivos y la razón de falsos positivos se calcula el área ROC, medida utilizada ampliamente para evaluar modelos predictivos de clasificación.


####**Regresión Logística**

El funcionamiento es similar a la regresión lineal, donde se calculaba la ecuación de la recta que mejor represente al conjunto de datos con el fin de predecir una variable numérica.

La regresión logística aplica ese principio, pero teniendo en cuenta que la variable a predecir es categórica y puede tener n categorías,
lo que propone es calcular las ecuaciones de n rectas, una para cada categoría 

**Ejemplo: Aprobación curso**

**1. Preparar**

In [None]:
data = pd.read_excel("local/data/aprobacion_curso_bal.xlsx",sheet_name=0)
estDF=sqlCtx.createDataFrame(data).drop('Unnamed: 0')
estDF.show(5)

In [None]:
estDF=sqlCtx.createDataFrame(data)
estDF=estDF.\
  withColumn('equilibrio',(estDF.Felder=='equilibrio').cast(IntegerType())).\
  withColumn('activo',(estDF.Felder=='activo').cast(IntegerType())).\
  withColumn('visual',(estDF.Felder=='visual').cast(IntegerType())).\
  withColumn('reflexivo',(estDF.Felder=='reflexivo').cast(IntegerType())).\
  withColumn('sensorial',(estDF.Felder=='sensorial').cast(IntegerType())).\
  withColumn('intuitivo',(estDF.Felder=='intuitivo').cast(IntegerType())).\
  withColumn('secuencial',(estDF.Felder=='secuencial').cast(IntegerType())).\
  withColumn('verbal',(estDF.Felder=='verbal').cast(IntegerType())).\
  withColumn('AproboInt',(estDF.Aprobo=='si').cast(IntegerType()))
estDF=estDF.drop('Felder','Aprobo','Unnamed: 0')
estDF.printSchema()
estDF.show(5)

In [None]:
vectorAssembler = VectorAssembler(inputCols = \
    ['Examen_admisión', 'equilibrio', 'activo', 'visual', 'reflexivo', 'sensorial', 'intuitivo', \
     'secuencial', 'verbal'], outputCol = 'features')
vest = vectorAssembler.transform(estDF)
vest = vest.select(['features', 'AproboInt'])
vest.show(5)

Creamos los conjuntos de datos de entrenamiento y evaluación (Split)

In [None]:
(trainingData, testData) = vest.randomSplit([0.7, 0.3])
print(trainingData.count())
print(testData.count())

**2. Modelar**

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="AproboInt", featuresCol="features",maxIter=10)
lr_model=lr.fit(trainingData)

In [None]:
trainingSummary = lr_model.summary
trainingSummary.predictions.select('AproboInt','prediction','probability').show(5)

In [None]:
print("Coeficientes: " + str(lr_model.coefficients))
print("Intercepto: " + str(lr_model.intercept))
estDF.columns

Veamos el área ROC

In [None]:
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('Razón de los falsos positivos')
plt.xlabel('Razón de los verdaderos positivos')
plt.title('Curva ROC')
plt.show()
print('Área ROC con los datos de entrenamiento: ' + str(trainingSummary.areaUnderROC))

**3. Evaluar**

In [None]:
predict_test=lr_model.transform(testData)
predict_test.select('AproboInt','prediction','probability').show(10)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='AproboInt')
print('Área ROC con los datos de evaluación', evaluator.evaluate(predict_test))

####**Árboles de decisión para clasificación**

El funcionamiento es similar a los árboles de regresión, solo que en este caso la variable a predecir es categórica.

A partir del conjunto histórico de datos se construye en árbol que tiene en sus nodos una pregunta sobre alguno de los atributos y en las hojas alguna de las categorías de la variable objetivo

**Modelar**

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="AproboInt", featuresCol="features")
dt_model=dt.fit(trainingData)

In [None]:
predict_train = dt_model.transform(trainingData)
predict_train.select('AproboInt','prediction').show(5)

**Evaluar**

In [None]:
predict_test = dt_model.transform(testData)

Veamos la precisión del clasificador

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

clase_real = predict_test.select(['AproboInt']).collect()
clase_prediccion = predict_test.select(['prediction']).collect()

print("Medidas de error")
print(classification_report(clase_real, clase_prediccion))
print("Matriz de confusión")
print(confusion_matrix(clase_real, clase_prediccion))
evaluator = BinaryClassificationEvaluator(labelCol='AproboInt')
print('Área ROC con los datos de evaluación', evaluator.evaluate(predict_test))

####**Redes neuronales**

Las redes neuronales son modelos computacionales inspirados en sistemas biológicos, adaptados y simulados en computadoras convencionales. Buscan simular en un algoritmo la forma como se comportan las neuronas del cerebro humano.

Cada neurona tiene variables de entrada (atributos) que son multiplicados por un vector de pesos y el resultado es llevado a un mezclador lineal. A la salida del mezclador lineal se aplica una función de activación que puede ser tan sencilla o compleja como se quiera, el resultado es la salida de la neurona.

Las neuronas no trabajan solas, los resultados de cada neurona pueden ser llevados a varias neuronas que los utilizan como entradas. En este caso diremos que la red neuronal puede tener varias capas y dentro de cada capa se puede tener varias neuronas 

**Multilayer Perceptron**

Es una red neuronal con alimentación hacia adelante, compuesta de varias capas de neuronas entre la entrada y la salida de la misma, esta red permite establecer regiones de decisión mucho más complejas que las de dos semiplanos.

Como **función de activación** puede utilizar:
* Hardlim: Función escalón entre 0 y 1
* Hardlims: Función escalón simétrica entre -1 y 1

Multilayer perceptron utiliza la siguiente **regla de aprendizaje**:
* Pesos: $$w_{i+1} = w_i + (y_i - y_i')x_i $$
* Variable independiente: $$b_{i+1} = b_i + (y_i - y_i')$$

Donde,

* $w_{i+1}$: Peso de la neurona en la siguiente iteración
* $w_i$: Peso de la neurona en la iteración actual
* $y_i$: Salida real
* $y_i'$: Predicción
* $x_i$: Entrada actual de la neurona
* $b_{i+1}$: Entrada independiente en la siguiente iteración
* $b_i$: Entrada independiente en la iteración actual



**Modelar**

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
nn= MultilayerPerceptronClassifier(labelCol='AproboInt', featuresCol='features', maxIter=100, layers=[9, 5, 2])
nn_model = nn.fit(trainingData)

In [None]:
predict_train = nn_model.transform(trainingData)

**Evaluar**

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#Predicción
predict_test = nn_model.transform(testData)
predict_test.select('prediction', 'AproboInt').show(5)

#Exactitud
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol='AproboInt')
print("Exactitud con los datos de evaluación = " + str(evaluator.evaluate(predict_test)))

#Matriz de confusión
clase_real = predict_test.select(['AproboInt']).collect()
clase_prediccion = predict_test.select(['prediction']).collect()
print("Matriz de confusión")
print(confusion_matrix(clase_real, clase_prediccion))

####**Máquinas de Soporte Vectorial**

Las máquinas de soporte vectorial plantean la solución de un problema complejo (no separable linealmente) en un espacio de dimensión superior mediante la construcción de un hiperplano de separación óptima de clases.

El hiper plano construido permite separar a cada lado del mismo los registros de cada categoría de la variable objetivo.

Para representar los datos en una dimensión superior se utiliza la matriz de Kernel

**Modelar**

In [None]:
from pyspark.ml.classification import LinearSVC
svc= LinearSVC(labelCol='AproboInt', featuresCol='features', maxIter=10, regParam=0.1)
svc_model = svc.fit(trainingData)

In [None]:
predict_train = svc_model.transform(trainingData)
predict_train.select('AproboInt', 'prediction').show(5)

**Evaluar**

In [None]:
predict_test = svc_model.transform(testData)
predict_test.select('prediction', 'AproboInt').show(5)
evaluator = BinaryClassificationEvaluator(labelCol='AproboInt')
print('Área ROC con los datos de evaluación', evaluator.evaluate(predict_test))

####**Método Bayesiano**

Los métodos bayesianos utilizan el teorema de la probabilidad condicional de bayes: La probabilidad de que algo ocurra dado que ya han ocurrido ciertos sucesos.

La probabilidad de que un registro $reg$ pertenezca a la clase $C_j$ se calcula como:
$$P(C_j/reg) = \frac{P(C_j)P(reg/C_j)}{P(reg)}$$

Donde,

* $P(C_j)$: Probabilidad de la clase $C_j$
* $P(reg/C_j)$: Probabilidad de que la clase $C_j$ sea la clase del registro $reg$
* $P(reg)$: Probabilidad de seleccionar un registro

Naive Bayes implementa un método bayesiano asumiendo que los registros son independientes, con esto la probabilidad de que un registro $reg$ pertenezca a la clase $C_j$ se resume como:

$$P(C_j/reg) = P(C_j)P(reg/C_j)$$


Con los datos históricos se construye una tabla de probabilidades, que indica la probabilidad de que un registro pertenezca a cada una de las categorías de la variable objetivo

Dado un registro nuevo, la predicción consistirá en evaluar la probabilidad de pertenencia de dicho registro a cada una de las clases escogiendo aquella que cuente con la mayor probabilidad.

**Modelar**

In [None]:
from pyspark.ml.classification import NaiveBayes
nb= NaiveBayes(labelCol='AproboInt', featuresCol='features')
nb_model = svc.fit(trainingData)

In [None]:
predict_train = nb_model.transform(trainingData)

**Evaluar**

In [None]:
predict_test = nb_model.transform(testData)
predict_test.select('prediction', 'AproboInt').show(5)
evaluator = BinaryClassificationEvaluator(labelCol='AproboInt')
print('Área ROC con los datos de evaluación', evaluator.evaluate(predict_test))

##**Aprendizaje NO Supervisado**

En el aprendizaje No Supervisado, NO se cuenta con datos históricos previamente etiquetados para la etapa de entrenamiento, en su lugar se dispone de datos actuales y lo que se haces es describir la estructura de esos datos actuales mediante un análisis exploratorio con el fin de facilitar el entendimiento de los datos.

###**Clustering**

Su objetivo es agrupar un conjunto de datos heterogéneo en grupos de datos homogéneos. En principio cada registro (fila) del conjunto de datos es diferente de los demás, por eso decimos que es heterogéneo, pero puede tener cierta similitud a un subconjunto de registros y es ahi donde se genera la agrupación, siendo cada subconjunto de datos (clúster) diferente a los demás subconjuntos.

La similaridad de los registros se expresa como una medida de distancia, de modo que un par de registros que se encuentre distante implicará que son diferentes, mientras que registros cercanos diremos que son similares

Dentro del **ciclo de vida** del Clustering se encuentran las siguientes etapas:
* Aprendizaje: Crear el modelo
* Evaluación: Evaluar el modelo
* Perfilamiento: Entender las características que definen a cada clúster (describir el centroide)


**Evaluación**

La evaluación del Clustering se basa en la comparación de dos medidas, estas medidas son:
* Cohesión o compacticidad: promedio de distancia de cada elemento a su centroide (distancia dentro del clúster)
* Separabilidad: promedio de distancias entre los centroides de los clústers (distancia entre clústers)

La comparación de estas dos medidas se expresa mediante índices, los mas conocidos son:

* Dunn
* Davies-Bouldin
* Silueta


####**K-means**

Divide el conjunto de datos en un número predefinido de grupos k. Es el método más comúnmente utilizado, la idea del método es definir k centroides, uno por clúster, y los datos son asociados al centroide más cercano.


**Ejemplo: Titanic**

**Preparar**

In [None]:
data = sqlCtx.read.option("header",True) \
      .option("delimiter",";") \
     .csv("local/data/titanic.csv")
data.show(5)

In [None]:
def func_Clase(value):
  if value=="Tripulacion":
    return 0
  if value=="Primera":
    return 1
  if value=="Segunda":
    return 2
  if value=="Tercera":
    return 3

In [None]:
def func_Edad(value):
  if value=="Adulto":
    return 1
  else:
    return 0

In [None]:
def func_Sexo(value):
  if value=="Hombre":
    return 1
  else:
    return 0

In [None]:
def func_Sobrevivio(value):
  if value=="Si":
    return 1
  else:
    return 0

In [None]:
data.registerTempTable("Titanic")
sqlCtx.registerFunction("func_clase",func_Clase)
sqlCtx.registerFunction("func_edad",func_Edad)
sqlCtx.registerFunction("func_sexo",func_Sexo)
sqlCtx.registerFunction("func_sobrevivio",func_Sobrevivio)

In [None]:
titanic = sqlCtx.sql("""
		SELECT func_clase(Clase) AS ClaseInt, func_edad(Edad) AS EdadInt, 
      func_sexo(Sexo) AS SexoInt, func_sobrevivio(Sobrevivio) AS SobrevivioInt
		FROM Titanic
		""")
titanic.distinct().show()

In [None]:
titanic.printSchema()

In [None]:
titanic = titanic.withColumn("ClaseInt", titanic["ClaseInt"].cast(IntegerType()))
titanic = titanic.withColumn("EdadInt", titanic["EdadInt"].cast(IntegerType()))
titanic = titanic.withColumn("SexoInt", titanic["SexoInt"].cast(IntegerType()))
titanic = titanic.withColumn("SobrevivioInt", titanic["SobrevivioInt"].cast(IntegerType()))
data=titanic
data.printSchema()

In [None]:
vectorAssembler = VectorAssembler(inputCols = data.columns, outputCol = 'features', handleInvalid="skip")
vdata = vectorAssembler.transform(data)
vdata.cache()
vdata.show(5)

**Modelar**

In [None]:
from pyspark.ml.clustering import KMeans
km = KMeans( featuresCol='features', k=3, predictionCol='cluster', distanceMeasure='euclidean')
km_model = km.fit(vdata)

Veamos en cuál cluster quedó cada registro

In [None]:
predictions = km_model.transform(vdata)
predictions.distinct().show()

Veamos los centroides de cada cluster

In [None]:
centers = km_model.clusterCenters()
print("Centroides: ")
for center in centers:
  centerR=[round(num, 0) for num in center]
  print(centerR)
print("Variables")
data.columns

**Evaluar**

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator(predictionCol='cluster')
silhouette = evaluator.evaluate(predictions)
print("Índice de la silueta = " + str(silhouette))

Aumentemos el número de clústers

In [None]:
#modelar
km = KMeans( featuresCol='features', k=18, predictionCol='cluster', distanceMeasure='euclidean')
km_model = km.fit(vdata)

#Asignación de clusters
predictions = km_model.transform(vdata)

#Evaluar
evaluator = ClusteringEvaluator(predictionCol='cluster')
silhouette = evaluator.evaluate(predictions)
print("Índice de la silueta = " + str(silhouette))

#Centroides
centers = km_model.clusterCenters()
print("Centroides: ")
for center in centers:
  centerR=[round(num, 0) for num in center]
  print(centerR)
print("Variables")
data.columns

Cómo podemos saber cuántos clúter deberíamos crear?

In [None]:
from pyspark.ml.clustering import KMeans
kini=2; kfin=20
ks = range(kini, kfin)
costo=[]
for k in range(kini, kfin):
    kmeans= KMeans( featuresCol='features', k=k, predictionCol='cluster', distanceMeasure='euclidean')
    model = kmeans.fit(vdata)
    costo.append(model.summary.trainingCost)
plt.plot(ks, costo, '-o')
plt.title('Método del codo')
plt.xlabel('Número de clusters')
plt.ylabel('costo')
plt.xticks(ks)
plt.show()

#**Ejemplos**

##**Filtrado Colaborativo**

Recomendación de películas a partir de la calificación ingresada por los usuarios

**Preparar**

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
lines = sc.textFile("local/data/movielens.txt")
parts = lines.map(lambda row: row.split("::"))
ratingsRDD = parts.map(lambda p: (int (p[0]), int (p[1]), float (p[2])))
ratingsRDD.take(10)

In [None]:
ratings = sqlCtx.createDataFrame(ratingsRDD, ["usuario","pelicula","calificacion"])
(training, test) = ratings.randomSplit([0.7, 0.3])
training.show(10)

**Modelar**

In [None]:
als = ALS(maxIter=5, regParam=0.01, userCol="usuario", itemCol="pelicula", ratingCol="calificacion",
          coldStartStrategy="drop")
model = als.fit(training)

**Evaluar**

In [None]:
predictions = model.transform(test)
predictions.show(5)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="calificacion",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Raiz del error cuadrático medio = " + str(rmse))

**Películas recomendadas para cada usuario**

In [None]:
userRecs = model.recommendForAllUsers(2)
userRecs.take(5)

**Usuarios recomendados para cada película**

In [None]:
movieRecs = model.recommendForAllItems(2)
movieRecs.take(5)

**Películas recomendadas para una lista de usuarios específica**

In [None]:
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 5)
userSubsetRecs.collect()

**Usuarios recomendados para una lista de películas específica**

In [None]:
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 5)
movieSubSetRecs.collect()

#**Ejercicios**

**Población atendida en hospital de Envigado**

El archivo poblacion_atendida.csv contiene información sobre los pacientes atendidos en un Hospital en Envigado. Para cada paciente se tiene la siguiente información:

* SEXO	
* ESTADO_CIVIL	
* ZONA_RES	
* MUN_RES	
* PAIS_NCTO	
* EDAD	
* ESCOLARIDAD_PACIENTE

Fuente de los datos:
https://www.datos.gov.co/Salud-y-Protecci-n-Social/Poblaci-n-Atendida-a-o-2020/4ike-xz34

A) Aplique Kmeans para crear clústers que permitan describir  los tipos de pacientes 

B) Usando el clúster asignado a cada paciente como variable objetivo, cree un modelo predictivo que permita predecir el grupo al que pertenece cada paciente

C) Cree un archivo con la información de tres nuevos pacientes y utilice el modelo de predicción creado anteriormente para predecir el clúster al que pertenecerían los nuevos pacientes