In [None]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

In [None]:
spark = SparkSession\
    .builder\
    .appName("Spark_ML")\
    .getOrCreate()

#**MACHINE LEARNING**

El Machine Learning o aprendizaje automático es una disciplina orientada a crear sistemas que puedan aprender por sí solos, con el fin de extraer información no trivial de grandes volúmenes de datos por medio de la identificación de patrones complejos.

Spark implementa el aprendizaje automático a través del módulo MLLib que cuenta con un gran número de algoritmos que permiten crear modelos para el aprendizaje automático. Teniendo en cuenta que Spark proporciona sistemas distribuidos para trabajar en paralelo, los algoritmos de Machine Learning implementados en MLLib deben poder ser paralelizables

Pueden identificarse dos grandes ramas en el aprendizaje automático, a saber, el aprendizaje supervisado y el aprendizaje NO supervisado.

##**Aprendizaje Supervisado**

El aprendizaje supervisado utiliza un conjunto histórico de datos donde se tienen los registros previamente catalogados, para crear un modelo de predicción. Este modelo de predicción aprende de los datos históricos hasta obtener la capacidad de predecir lo que pasará con nuevos conjuntos de datos.

Se caracterizan por disponer de una variable objetivo o variable de clase, que es justamente lo que se quiere predecir. Esta variable objetivo puede ser numérica (regresión) o categórica (clasificación)

El **ciclo de vida** del aprendizaje supervisado consta de tres fases

* **Modelamiento:** Consiste en construir el modelo que permita predecir la variable objetivo
* **Evaluación:** Se evalúa el modelo predictivo construido para ver que tanto podemos confiar en el
* **Validación:** Una vez el modelo ha sido evaluado y el resultado es el esperado, se someten datos nuevos al modelo para realizar la predicción

La etapa de modelamiento y evaluación requieren del conjunto histórico de datos. Esto significa que debemos decidir la forma como se utilizarán los datos históricos en estas dos etapas. Para esto disponemos de tres opciones:

* **Evaluar el mismo conjunto de entrenamiento**
* **División de datos (Split / 70-30)**
* **Validación cruzada**

###**Predicción continua o Regresión**

Es el estudio de un conjunto de datos históricos con el fin de predecir un evento numérico futuro.

Es decir que la variable objetivo a predecir es de tipo numérica.

**Evaluación**

Como la variable a predecir es numérica el error de predicción se calcula al comparar el valor real contra el valor de la predicción y esto se hace con una fórmula matemática de cálculo de error. Existen varias formas de calcular ese error:

* Error medio absoluto:

$$error = \frac{\sum_ {i=1}^n  |f(x)-p(x)|}{n}$$

* Error cuadrático medio:
$$error = \frac{1}{n}\sum(f(x)-p(x))^2$$


Donde, 
* f(x): Valor real
* p(x): predicción

**Ejemplo: Predicción del precio de venta de una propiedad en Boston**


Tomado de: https://www.kaggle.com/kyasar/boston-housing

**1. Preparar**

Carga de datos

In [None]:
ruta="s3://mybucket9825/Entrada/boston_housing.csv"
boston = spark.read.format("csv") \
      .option("header", True) \
      .option("delimiter", ",") \
      .option("inferschema",True) \
      .load(ruta)
boston.cache()
boston.printSchema()

Esta es la descripción de las variables disponibles:

* crim : Tasa de delincuencia per cápita por población
* zn:  Proporción de terreno residencial dividido en zonas para lotes de más de 25,000 pies cuadrados.
* indus : Proporción de terrenos comerciales no minoristas por población.
* chas: Indica si la propiedad limita o no con el río (1 si limita con el río. 0 en caso contrario).
* nox: Concentración de óxidos nítricos (partes por 10 millones).
* rm : Promedio de habitaciones por vivienda.
* age : Proporción de unidades ocupadas por sus propietarios construidas antes de 1940
* dis : Distancias ponderadas a cinco centros de empleo de Boston
* rad: índice de accesibilidad a carreteras principales.
* tax : tasa de impuesto a la propiedad.
* ptratio: Proporción alumno-profesor.
* black: Proporción de negros por ciudad.
* lstat: Estatus más bajo de la población (porcentaje).
* **mv : valor medio de las viviendas en $ 1000. Esta es la variable objetivo.**

Descripción estadística de los datos

In [None]:
boston.describe().show()

Análisis de correlación

In [None]:
import six
print("Correlación con la variable objetivo")
for i in boston.columns:
    if not( isinstance(boston.select(i).take(1)[0][0], six.string_types)):
        print( "", i, boston.stat.corr('medv',i))

Si dispone de muchas variables, se recomienda eliminar aquellas que presentan una correlación baja con la variable objetivo

In [None]:
boston=boston.drop("chas","dis")
boston.columns

Para aplicar la técnica de Machine Learning se requiere representar los datos mediante dos columnas, una con las variables predictoras (features) y la otra es la variable objetivo

In [None]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = \
    ['crim', 'zn', 'indus', 'nox', 'rm', 'age', 'rad', 'tax', 'ptratio', 'black', 'lstat'], outputCol = 'features')
vboston = vectorAssembler.transform(boston)
vboston.show(3,truncate=False)

Nos quedamos solo con las variables features y medv

In [None]:
vboston = vboston.select(['features', 'medv'])
vboston.show(3, truncate=False)

Ahora veamos la correlación entre las variables predictoras

In [None]:
matrix = Correlation.corr(vboston,"features")
matrix.collect()[0]["pearson({})".format("features")].values

In [None]:
print("Correlación entre tax y indus: ",boston.corr("tax","indus"))
print("Correlación entre nox y indus: ",boston.corr("nox","indus"))
print("Correlación entre nox y age: ",  boston.corr("nox","age"))
print("Correlación entre tax y rad: ",  boston.corr("tax","rad"))

Eliminemos las variables correlacionadas y creemos nuevamente el vector de características

In [None]:
boston=boston.drop("tax","nox")
vectorAssembler = VectorAssembler(inputCols = \
    ['crim', 'zn', 'indus', 'rm', 'age', 'rad', 'ptratio', 'black', 'lstat'], outputCol = 'features')
vboston = vectorAssembler.transform(boston)
vboston = vboston.select(['features', 'medv'])
vboston.show(3, truncate=False)

Creamos los conjuntos de datos de entrenamiento y evaluación (Split)

In [None]:
(trainingData, testData) = vboston.randomSplit([0.7, 0.3])
print(trainingData.count())
print(testData.count())

**2. Modelar**

####**Regresión lineal**

Permite evaluar la relación entre una variable dependiente (variable a predecir) y un conjunto de variables independientes (variables predictoras)


$$Y = \alpha + \beta_1 X_1  +  \beta_2 X_2 + ... + \beta_n X_n + \epsilon$$

Donde,
* Y: Variable a predecir
* X: Variables predictoras (atributos)
* alpha: Intercepto
* beta: Pendiente
* epsilon: Error

Básicamente lo que se hace es encontrar la ecuación de la recta que mejor represente al conjunto de datos, de modo que se pueda utilizar dicha ecuación para la predicción de nuevos registros

In [None]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='medv', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(trainingData)
trainingSummary = lr_model.summary

Veamos la predicción con los datos de entrenamiento

In [None]:
trainingSummary.predictions.select("prediction","medv").show(5)

El método de regresión lineal construye la ecuación de la recta que mas se ajuste a los datos.

In [None]:
print("Coeficientes: " + str(lr_model.coefficients))
print("Intercepto: " + str(lr_model.intercept))
print("Variables:")
boston.columns

**3. Evaluar**

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

lr_predictions = lr_model.transform(testData)
lr_predictions.select("prediction","medv").show(5)

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("Coeficiente de determinación = %g" % lr_evaluator.evaluate(lr_predictions))

In [None]:
test_result = lr_model.evaluate(testData)
print("Error cuadrático medio  = %g" % test_result.meanSquaredError)
print("Raiz del Error cuadrático medio = %g" % test_result.rootMeanSquaredError)
print("Error medio absoluto  = %g" % test_result.meanAbsoluteError)
print("Coeficiente de determinación  = %g" % test_result.r2)
print("Coeficiente de determinación ajustado  = %g" % test_result.r2adj)

####**Árboles de decisión para regresión**

El método de árboles de decisión está disponible tanto para predecir números (árboles de regresión) como para predecir categorías (árboles de clasificación).

Su funcionamiento se basa en representar el conjunto histórico de datos a través de un árbol en el que tendremos dos componentes:
* Nodos: presentan una pregunta sobre algunos de los atributos y se genera una bifurcación a partir de la respuesta
* Hojas: Están al final de cada rama y son los valores de la predicción


Cuando se somete un registro al árbol de decisión, este recorre el árbol a través de las respuestas generadas en cada nodo, y al final llegará a una hoja que indica la predicción.

En el caso de los árboles de regresión, la variable a predecir es numérica, es decir que el resultado final de la predicción será un número

**Ejemplo: Predicción del precio de venta de una propiedad en Boston**

**Modelar**

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'medv')
dt_model = dt.fit(trainingData)

Veamos la predicción con los datos de entrenamiento

In [None]:
dt_model.transform(trainingData).select("prediction","medv").show(5)

**Evaluar**

In [None]:
dt_predictions = dt_model.transform(testData)
dt_predictions.select("prediction","medv").show(5)

dt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="r2")
print("Coeficiente de determinación = %g" % dt_evaluator.evaluate(dt_predictions))

Veamos la importancia de las vaiables predictoras

In [None]:
dt_model.featureImportances

In [None]:
boston.columns

###**Predicción discreta o Clasificación**

La clasificación permite predecir un evento numérico discreto, es decir que lo que pedimos ya no es un número sino una categoría, una cualidad.

Se utiliza el conjunto histórico de datos donde se cuenta con una variable objetivo que es de tipo categórica. Con ese histórico de datos, se crea un modelo que aprenderá de los datos y permitirá predecir datos futuros

**Evaluación**

Teniendo en cuenta que la variable objetivo es categórica, contamos con la clase real a la que pertenece cada uno de los registros que hacen parte del histórico de datos.

Por otro lado, el modelo arrojará como resultado una predicción sobre la clase a la que debe pertenecer cada registro. Usando la clase real y la predicción se construyen unas medidas de error que permiten evaluar la calidad del modelo. 

Estas medidas se construyen al identificar la cantidad de registros que quedaron clasificados correctamente es decir que la clase de la predicción y la clase real coinciden

Con esta información se construye lo que se conoce como matriz de confusión

las cantidades relacionadas en la matriz de confusión nos permiten calcular esas medidas de error entre las cuales se destacan:

* Precisión: $$p=\frac{a}{a+b}$$
* Cobertura: $$r=\frac{a}{a+c}$$
* Exactitud: $$e=\frac{a+d}{a+b+c+d}$$
* Media armónica: $$f=\frac{2pr}{p+r}$$
* Razón de verdaderos positivos: $$VPR=\frac{a}{a+c}$$
* Razón de falsos positivos: $$FPR=\frac{b}{b+d}$$

Donde,

* a: Verdaderos positivos
* b: Falsos positivos
* c: Falsos negativos
* d: Verdaderos negativos

Con la razón de verdaderos positivos y la razón de falsos positivos se calcula el área ROC, medida utilizada ampliamente para evaluar modelos predictivos de clasificación.

**Ejemplo: Medicamentos (drug.csv)**

**1. Preparar**

In [None]:
ruta="s3://mybucket9825/Entrada/drug.csv"
schema = StructType([StructField('Id', IntegerType(), True),
                     StructField('Edad', IntegerType(), True),
                     StructField('Sexo', StringType(), True),                  
                     StructField('Presion', StringType(), True),      
                     StructField('Colesterol', StringType(), True),
                     StructField('Sodio', FloatType(), True),
                     StructField('Potasio', FloatType(), True),
                     StructField('Medicamento', StringType(), True)])
data=spark.read.csv(ruta, header=True, schema=schema, sep=';')
data.printSchema()
data.show(10)

Análisis de variables numéricas

In [None]:
data.describe(['Id','Edad','Sodio','Potasio']).show()

Análisis de variables categóricas

In [None]:
data.groupBy('Sexo').count().show()

In [None]:
data.groupBy('Presion').count().show()

In [None]:
data.groupBy('Colesterol').count().show()

In [None]:
data.groupBy('Medicamento').count().show()


Ahora vamos a resolver los problemas de calidad de datos

In [None]:
data = data.withColumn('Sexo', functions.when(data.Sexo=='Mujer','F')\
                                                 .when(data.Sexo!='Mujer', data.Sexo))
data = data.withColumn('Presion', functions.when(data.Presion=='Null','HIGH')\
                                                 .when(data.Presion!='Null', data.Presion))
data.groupBy('Sexo').count().show()
data.groupBy('Presion').count().show()

In [None]:
data = data.withColumn('Edad', functions.when(data.Edad>100,50)\
                                                 .when(data.Edad<=100, data.Edad))
data.describe(['Edad']).show()


Eliminemos duplicados y variables irrelevantes

In [None]:
data=data.dropDuplicates(['Id'])
data=data.drop('Id')

Convertimos las variables categóricas a numéricas

In [None]:
data = data.withColumn('SexoNum', functions.when(data.Sexo=='F',0)\
                                .when(data.Sexo=='M',1)).\
                    withColumn('PresionNum', functions.when(data.Presion=='LOW',0)\
                                .when(data.Presion=='NORMAL',1) \
                                .when(data.Presion=='HIGH',2)).\
                    withColumn('ColesterolNum', functions.when(data.Colesterol=='NORMAL',0)\
                                .when(data.Colesterol=='HIGH',1)).\
                    withColumn('MedicamentoNum', functions.when(data.Medicamento=='drugA',0)\
                                .when(data.Medicamento=='drugB',1) \
                                .when(data.Medicamento=='drugC',2) \
                                .when(data.Medicamento=='drugX',3) \
                                .when(data.Medicamento=='drugY',4)) 

data.show()

In [None]:
Nos quedamos solo con las variables numéricas

In [None]:
dataNum=data
dataNum=dataNum.select(['Edad','SexoNum','PresionNum','ColesterolNum','Sodio','Potasio','MedicamentoNum'])
dataNum.printSchema()
dataNum.show(10)

Creamos vector de características

In [None]:
vectorAssembler = VectorAssembler(inputCols = \
    ['Edad', 'SexoNum', 'PresionNum', 'ColesterolNum', 'Sodio','Potasio'], outputCol = 'features')
vest = vectorAssembler.transform(dataNum)
vest = vest.select(['features', 'MedicamentoNum'])
vest.show(10, truncate=False)


Creamos los conjuntos de datos de entrenamiento y evaluación (Split)

In [None]:
(trainingData, testData) = vest.randomSplit([0.7, 0.3])
print(trainingData.count())
print(testData.count())

**2. Modelar**

####**Regresión Logística**

El funcionamiento es similar a la regresión lineal, donde se calculaba la ecuación de la recta que mejor represente al conjunto de datos con el fin de predecir una variable numérica.

La regresión logística aplica ese principio, pero teniendo en cuenta que la variable a predecir es categórica y puede tener n categorías,
lo que propone es calcular las ecuaciones de n rectas, una para cada categoría

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="MedicamentoNum", featuresCol="features",maxIter=10)
lr_model=lr.fit(trainingData)

In [None]:
trainingSummary = lr_model.summary
trainingSummary.predictions.select('MedicamentoNum','prediction','probability').show(5, truncate=False)

In [None]:
print("Coeficientes: " + str(lr_model.coefficientMatrix))
print("Intercepto: " + str(lr_model.interceptVector))

**3. Evaluar**

In [None]:
predict_test=lr_model.transform(testData)
predict_test.select('MedicamentoNum','prediction').show(5)

In [None]:
Veamos la matriz de confusion

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

preds_and_labels = predict_test.select(['prediction','MedicamentoNum']).withColumn('MedicamentoNum', col('MedicamentoNum').cast(FloatType())).orderBy('prediction')

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix())

In [None]:
Veamos otras medidas de evaluacion

In [None]:
print("Accuracy: ",metrics.accuracy)
print("Precision: ",metrics.precision(1.0))
print("Recall: ",metrics.recall(1.0))
print("F1", metrics.fMeasure(1.0,1.0))

####**Árboles de decisión para clasificación**

El funcionamiento es similar a los árboles de regresión, solo que en este caso la variable a predecir es categórica.

A partir del conjunto histórico de datos se construye en árbol que tiene en sus nodos una pregunta sobre alguno de los atributos y en las hojas alguna de las categorías de la variable objetivo

**Modelar**

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="MedicamentoNum", featuresCol="features")
dt_model=dt.fit(trainingData)

In [None]:
predict_train = dt_model.transform(trainingData)
predict_train.select('MedicamentoNum','prediction').show(5)

**Evaluar**

In [None]:
predict_test = dt_model.transform(testData)

In [None]:
preds_and_labels = predict_test.select(['prediction','MedicamentoNum']).withColumn('MedicamentoNum', col('MedicamentoNum').cast(FloatType())).orderBy('prediction')

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix())

In [None]:
print("Accuracy: ",metrics.accuracy)
print("Precision: ",metrics.precision(1.0))
print("Recall: ",metrics.recall(1.0))
print("F1", metrics.fMeasure(1.0,1.0))

##**Aprendizaje NO Supervisado**

###**Clustering**

En el aprendizaje No Supervisado, NO se cuenta con datos históricos previamente etiquetados para la etapa de entrenamiento, en su lugar se dispone de datos actuales y lo que se haces es describir la estructura de esos datos actuales mediante un análisis exploratorio con el fin de facilitar el entendimiento de los datos.

Su objetivo es agrupar un conjunto de datos heterogéneo en grupos de datos homogéneos. En principio cada registro (fila) del conjunto de datos es diferente de los demás, por eso decimos que es heterogéneo, pero puede tener cierta similitud a un subconjunto de registros y es ahi donde se genera la agrupación, siendo cada subconjunto de datos (clúster) diferente a los demás subconjuntos.

La similaridad de los registros se expresa como una medida de distancia, de modo que un par de registros que se encuentre distante implicará que son diferentes, mientras que registros cercanos diremos que son similares

Dentro del **ciclo de vida** del Clustering se encuentran las siguientes etapas:
* Aprendizaje: Crear el modelo
* Evaluación: Evaluar el modelo
* Perfilamiento: Entender las características que definen a cada clúster (describir el centroide)

**Evaluación**

La evaluación del Clustering se basa en la comparación de dos medidas, estas medidas son:
* Cohesión o compacticidad: promedio de distancia de cada elemento a su centroide (distancia dentro del clúster)
* Separabilidad: promedio de distancias entre los centroides de los clústers (distancia entre clústers)

La comparación de estas dos medidas se expresa mediante índices, los mas conocidos son:

* Dunn
* Davies-Bouldin
* Silueta

**Ejemplo: Titanic**

In [None]:
ruta="s3://mybucket9825/Entrada/titanic.csv"
titanic=spark.read.csv(ruta, header=True,  sep=';')
titanic.show(5)

In [None]:
titanicNum = titanic.withColumn('ClaseNum', functions.when(titanic.Clase=='Tripulacion',0)\
                                .when(titanic.Clase=='Primera',1)\
                                .when(titanic.Clase=='Segunda',2)\
                                .when(titanic.Clase=='Tercera',3)).\
                    withColumn('EdadNum', functions.when(titanic.Edad=='Adulto',0)\
                                .when(titanic.Edad=='Nino',1)).\
                    withColumn('SexoNum', functions.when(titanic.Sexo=='Hombre',0)\
                                .when(titanic.Sexo=='Mujer',1)).\
                    withColumn('SobrevivioNum', functions.when(titanic.Sobrevivio=='No',0)\
                                .when(titanic.Sobrevivio=='Si',1))

titanicNum.distinct().show()

Seleccionamos solo las variables numéricas

In [None]:
titanicNum=titanicNum.select('ClaseNum', 'EdadNum', 'SexoNum', 'SobrevivioNum')
titanicNum.distinct().show()

In [None]:
titanicNum.printSchema()

Construimos el vector de características

In [None]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = titanicNum.columns, outputCol = 'features')
vdata = vectorAssembler.transform(titanicNum)
vdata.cache()
vdata.show(5)

**Modelar**

####**K-means**

Divide el conjunto de datos en un número predefinido de grupos k. Es el método más comúnmente utilizado, la idea del método es definir k centroides, uno por clúster, y los datos son asociados al centroide más cercano.

In [None]:
from pyspark.ml.clustering import KMeans
km = KMeans( featuresCol='features', k=3, predictionCol='cluster', distanceMeasure='euclidean')
km_model = km.fit(vdata)

Veamos en cuál cluster quedó cada registro

In [None]:
km_model.summary.predictions.show()

Veamos la distribución de los clústers

In [None]:
print(km_model.summary.clusterSizes)

Revicemos los centroides

In [None]:
km_model.clusterCenters()

In [None]:
print(titanic.columns)

**Evaluar**

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator
predictions = km_model.transform(vdata)
evaluator = ClusteringEvaluator(predictionCol='cluster', metricName='silhouette')
silhouette = evaluator.evaluate(predictions)
print("Indice de la silueta = " + str(silhouette))

Aumentemos el número de clústers

In [None]:
#modelar
km = KMeans( featuresCol='features', k=18, predictionCol='cluster', distanceMeasure='euclidean')
km_model = km.fit(vdata)

#Asignación de clusters
predictions = km_model.transform(vdata)

#Evaluar
evaluator = ClusteringEvaluator(predictionCol='cluster', metricName='silhouette')
silhouette = evaluator.evaluate(predictions)
print("Indice de la silueta = " + str(silhouette))