## ¿Qué vamos a hacer?

### Bank Loan Modelling (Caso de uso)

Un banco tiene dos categorías de clientes:
  - Depositors (dinero depositado en el banco)
  - Borrowers (pidieron un préstamo) <br>
Estas categorías NO son mutuamente excluyentes. <br>

La gran mayoría de los clientes son Depositors, por lo que el banco quiere hacer crecer la cantidad de Borrowers. Para esto, el banco quiere hacer crecer su base de Borrowers entre los Depositors (quiere aumentar la cantidad de productos por cliente).<br><br>
Para esto, el departamento de marketing lanzó una nueva campaña de promoción de préstamos personales. Se hizo una prueba piloto con 5000 clientes y se tuvieron buenos resultados, con una tasa de conversión de aproximadamente el 10%
<br><br>
Basado en los resultados de esta campaña, el departamento de ventas quiere hacer un modelo que pueda predecir si una persona va a tomar un préstamos personal o no, de manera de concentrar sus esfuerzos en los clientes con mayor probabilidad de aceptación. El objetivo general es llevar a cabo la campaña a gran escala con el menor costo posible. (cada cliente que llamo, es un costo. La idea es llamar la menor cantidad de gente posible. Se busca alta tasa de exito, reduciendo los casos totales (y no aumentando los casos de exito)). (Entonces, no quieren salir a enchufarle créditos a todo el mundo. Lo que quieren es saber de antemano quién va a decir que si, para poder concentrar sus esfuerzos en ellos)


## ¿Cómo lo vamos a hacer?
#### ¿De dónde a dónde estamos yendo?

## PySpark SetUp

Configurar PySpark para levantar tabla de Hive

In [2]:
# crear Spark Session
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('caece').config(conf=SparkConf()).getOrCreate()  

Cargamos el Spark DataFrame

In [6]:
# Load Spark DataFrame
spark_df = spark.read.options(header='True', inferSchema='True', delimiter='\t').\
csv('file:/home/jovyan/work/CAECE/Bank_Personal_Loan_Modelling_2.csv')
spark_df = spark_df.drop('Online', 'CreditCard')

## Exploración de Datos

#### Chequeos Básicos

In [7]:
spark_df.count()  # Cantidad de registros

5000

In [8]:
# Nombres de las columnas 
for idx, element in enumerate(spark_df.columns):
    print(idx, '-', element)

0 - ID
1 - Edad
2 - Experiencia
3 - Ingresos
4 - Cod_Postal
5 - Familia
6 - Tarjeta Credito
7 - Educacion
8 - Valor Hipoteca
9 - Cta Comitente
10 - Plazo Fijo
11 - Prestamo


In [9]:
spark_df.show(3)  # Muestra de 3 filas de la tabla. 

+---+----+-----------+--------+----------+-------+---------------+---------+--------------+-------------+----------+--------+
| ID|Edad|Experiencia|Ingresos|Cod_Postal|Familia|Tarjeta Credito|Educacion|Valor Hipoteca|Cta Comitente|Plazo Fijo|Prestamo|
+---+----+-----------+--------+----------+-------+---------------+---------+--------------+-------------+----------+--------+
|  1|  25|          1|      49|     91107|      4|            1.6|        1|             0|            1|         0|       0|
|  2|  45|         19|      34|     90089|      3|            1.5|        1|             0|            1|         0|       0|
|  3|  39|         15|      11|     94720|      1|            1.0|        1|             0|            0|         0|       0|
+---+----+-----------+--------+----------+-------+---------------+---------+--------------+-------------+----------+--------+
only showing top 3 rows



Para mejor visualización, vamos a usar la acción toPandas(). <br>
Esto es sólo para visualización. NO significa que Spark DF y Pandas DF sean lo mismo

In [12]:
spark_df.toPandas()[:3]

Unnamed: 0,ID,Edad,Experiencia,Ingresos,Cod_Postal,Familia,Tarjeta Credito,Educacion,Valor Hipoteca,Cta Comitente,Plazo Fijo,Prestamo
0,1,25,1,49,91107,4,1.6,1,0,1,0,0
1,2,45,19,34,90089,3,1.5,1,0,1,0,0
2,3,39,15,11,94720,1,1.0,1,0,0,0,0


Un poco de estadística

In [13]:
spark_df.summary().toPandas().transpose()  # Summary me da las estadísticas del df

Unnamed: 0,0,1,2,3,4,5,6,7
summary,count,mean,stddev,min,25%,50%,75%,max
ID,5000,2500.5,1443.5200033252052,1,1250,2500,3750,5000
Edad,5000,45.3384,11.463165630542662,23,35,45,55,67
Experiencia,5000,20.1046,11.46795368112056,-3,10,20,30,43
Ingresos,5000,73.7742,46.03372932108627,8,39,64,98,224
Cod_Postal,5000,93152.503,2121.8521973361953,9307,91911,93437,94608,96651
Familia,5000,2.3964,1.1476630455378507,1,1,2,3,4
Tarjeta Credito,5000,1.9379380000000053,1.7476589800467675,0.0,0.7,1.5,2.5,10.0
Educacion,5000,1.881,0.839869082664199,1,1,2,3,3
Valor Hipoteca,5000,56.4988,101.71380210211213,0,0,0,101,635


#### Correlation Matrix

En Spark se maneja todo con vectores. Entonces, para armar la matriz de correlación, es necesario transformar nuestros registros en un único vector. Esto lo hacemos con un objeto VectorAssembler. 

In [14]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

VectorAssembler paso a paso.<br>
Lo que vamos a hacer va a ser transformar las filas (registros) en un vector, para meterlo en la matriz de correlación. 

In [16]:
# df original
spark_df.toPandas()[:3]

Unnamed: 0,ID,Edad,Experiencia,Ingresos,Cod_Postal,Familia,Tarjeta Credito,Educacion,Valor Hipoteca,Cta Comitente,Plazo Fijo,Prestamo
0,1,25,1,49,91107,4,1.6,1,0,1,0,0
1,2,45,19,34,90089,3,1.5,1,0,1,0,0
2,3,39,15,11,94720,1,1.0,1,0,0,0,0


Se crea el objeto VectorAssembler (una máquina que crea vectores a partir de las columnas de input)<br>
Columnas ---> VectorAssembler ---> Vector 

In [18]:
# Se definen columnas de entrada y de salida. En este caso le estoy pasando todas las columnas
corr_assembler = VectorAssembler(inputCols=spark_df.columns, outputCol="features")

Cuando use 'corr_assembler' se va a crear un nuevo df de spark que tiene todas las columnas de entrada más una columna que contiene al vector generado

In [23]:
# Se le agrega una columna (la columna vector) al df original. 
corr_df = corr_assembler.transform(spark_df)
corr_df.toPandas()[:3]  # Mismo df, pero con una columna 'features' que tiene el vector que vamos meter en Correlation

Unnamed: 0,ID,Edad,Experiencia,Ingresos,Cod_Postal,Familia,Tarjeta Credito,Educacion,Valor Hipoteca,Cta Comitente,Plazo Fijo,Prestamo,features
0,1,25,1,49,91107,4,1.6,1,0,1,0,0,"[1.0, 25.0, 1.0, 49.0, 91107.0, 4.0, 1.6, 1.0,..."
1,2,45,19,34,90089,3,1.5,1,0,1,0,0,"[2.0, 45.0, 19.0, 34.0, 90089.0, 3.0, 1.5, 1.0..."
2,3,39,15,11,94720,1,1.0,1,0,0,0,0,"[3.0, 39.0, 15.0, 11.0, 94720.0, 1.0, 1.0, 1.0..."


In [24]:
# Se toma únicamente la columna del vector ('features')
corr_vector = corr_df.select('features')

In [26]:
corr_vector.take(3)

[Row(features=DenseVector([1.0, 25.0, 1.0, 49.0, 91107.0, 4.0, 1.6, 1.0, 0.0, 1.0, 0.0, 0.0])),
 Row(features=DenseVector([2.0, 45.0, 19.0, 34.0, 90089.0, 3.0, 1.5, 1.0, 0.0, 1.0, 0.0, 0.0])),
 Row(features=DenseVector([3.0, 39.0, 15.0, 11.0, 94720.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0]))]

Ahora si, podemos crear nuestra matriz de correlación

In [28]:
# Y se crea la matriz de correlación. 
corr_matrix = Correlation.corr(corr_vector, 'features') 

Nuestro assembled vector tiene la data de todas las columnas anteriores

Veamos la matriz de correlación.

In [29]:
for element in corr_matrix.collect()[0][0].toArray():
    to_print = ''
    for num in element:
        to_print += (str(round(num, 3)) + '\t ') 
    print(to_print)
    
# Hay que mirar la tercera desde la izquierda... Sé que no es la mejor visualización... pero es lo que hay. 
# 3, 6, 7, 8, 11
# Mejorar display!  Mostrar 

1.0	 -0.008	 -0.008	 -0.018	 0.013	 -0.017	 -0.025	 0.021	 -0.014	 -0.017	 -0.007	 -0.025	 
-0.008	 1.0	 0.994	 -0.055	 -0.029	 -0.046	 -0.052	 0.041	 -0.013	 -0.0	 0.008	 -0.008	 
-0.008	 0.994	 1.0	 -0.047	 -0.029	 -0.053	 -0.05	 0.013	 -0.011	 -0.001	 0.01	 -0.007	 
-0.018	 -0.055	 -0.047	 1.0	 -0.016	 -0.158	 0.646	 -0.188	 0.207	 -0.003	 0.17	 0.502	 
0.013	 -0.029	 -0.029	 -0.016	 1.0	 0.012	 -0.004	 -0.017	 0.007	 0.005	 0.02	 0.0	 
-0.017	 -0.046	 -0.053	 -0.158	 0.012	 1.0	 -0.109	 0.065	 -0.02	 0.02	 0.014	 0.061	 
-0.025	 -0.052	 -0.05	 0.646	 -0.004	 -0.109	 1.0	 -0.136	 0.11	 0.015	 0.137	 0.367	 
0.021	 0.041	 0.013	 -0.188	 -0.017	 0.065	 -0.136	 1.0	 -0.033	 -0.011	 0.014	 0.137	 
-0.014	 -0.013	 -0.011	 0.207	 0.007	 -0.02	 0.11	 -0.033	 1.0	 -0.005	 0.089	 0.142	 
-0.017	 -0.0	 -0.001	 -0.003	 0.005	 0.02	 0.015	 -0.011	 -0.005	 1.0	 0.317	 0.022	 
-0.007	 0.008	 0.01	 0.17	 0.02	 0.014	 0.137	 0.014	 0.089	 0.317	 1.0	 0.316	 
-0.025	 -0.008	 -0.007	 0.502	 0.0	 0.06

In [31]:
# Agregar código para mejorar el display y que se vea cuáles son las features que vamos a seleccionar

#### Hay mucho más para hacer en cuanto a exploración de datos...

## Feature Selection

¿Qué podría ser un buen predictor?

In [33]:
reduced_df = spark_df['ID', 'Ingresos', 'Tarjeta Credito', 'Educacion', 'Valor Hipoteca', 'Plazo Fijo','Prestamo']
feature_columns = [col for col in reduced_df.columns if ((col != 'ID') & (col != 'Prestamo'))]

for idx, element in enumerate(feature_columns):
    print(idx, '-', element)

0 - Ingresos
1 - Tarjeta Credito
2 - Educacion
3 - Valor Hipoteca
4 - Plazo Fijo


In [34]:
reduced_df.show(3)  # Ahora si podemos usar .show()

+---+--------+---------------+---------+--------------+----------+--------+
| ID|Ingresos|Tarjeta Credito|Educacion|Valor Hipoteca|Plazo Fijo|Prestamo|
+---+--------+---------------+---------+--------------+----------+--------+
|  1|      49|            1.6|        1|             0|         0|       0|
|  2|      34|            1.5|        1|             0|         0|       0|
|  3|      11|            1.0|        1|             0|         0|       0|
+---+--------+---------------+---------+--------------+----------+--------+
only showing top 3 rows



## Teoría. Logistic Regression
#### ¿Qué es?
#### ¿Cómo funciona?

## Feature Engineering
#### Feature Scaling
Para poder meter todas las features en la misma bolsa, es necesario normalizar las variables cuantitativas. <br>
En este caso, tenemos variables a normalizar:
- Ingresos
- Gasto promedio con Tajeta de Crédito
- Valor de la hipoteca
- Educación* 
<br>
Técnicamente, Educación es una variable categórica. Pero como son 3 niveles y va a menor a mayor, nos sirve normalizarla para que el nivel quede entre 0 y 1. 

Para escalar las features, vamos a usar la clase MinMaxScaler

In [35]:
from pyspark.ml.feature import MinMaxScaler

Va a ser necesario crear un assembler por cada variable:<br>
No es como antes que podemos meterle todas las columnas y sacar un solo vector, porque cada columna va a necesitar su Scaler. 

Igual que antes. Creamos el Assembler, y transformamos el df. 

In [37]:
ing_assembler = VectorAssembler(inputCols=['Ingresos'], outputCol="vec_Ingresos")
ing_assembled = ing_assembler.transform(spark_df)

Acá tenemos un df intermedio "ing_assembled"<br>
Ahora creamos el objeto MinMaxScaler, que va a transformar el df ing_assembled

In [38]:
ing_scaler = MinMaxScaler(inputCol="vec_Ingresos", outputCol="sc_Ingresos")
ing_scaler_model = ing_scaler.fit(ing_assembled)
ing_df = ing_scaler_model.transform(ing_assembled)

Así queda nuesta feature original, nuestra feature vectorizada y nuestra feature vectorizada y escalada

In [39]:
ing_df.select('Ingresos', 'vec_Ingresos', 'sc_Ingresos').show(5)

+--------+------------+--------------------+
|Ingresos|vec_Ingresos|         sc_Ingresos|
+--------+------------+--------------------+
|      49|      [49.0]|[0.1898148148148148]|
|      34|      [34.0]|[0.12037037037037...|
|      11|      [11.0]|[0.01388888888888...|
|     100|     [100.0]|[0.42592592592592...|
|      45|      [45.0]|[0.17129629629629...|
+--------+------------+--------------------+
only showing top 5 rows



Hay que hacer lo mismo para las otras 3 features...<br>
Vamos a necesitar una forma más eficiente de hacer esto<br><br>
Hagamos uso de la clase Pipeline de Spark que nos permiten combinar funciones. 

In [40]:
from pyspark.ml import Pipeline
columns_to_scale = ['Educacion', 'Tarjeta Credito', 'Valor Hipoteca']

Con la clase Pipeline podemos hacer todas las transformaciones juntas. <br>
Nos permite:
- Aprovechar la lazy evaluation de Spark
- Deshacerme de los pipelines intermedios
- Ahorrar algunas líneas de código

In [42]:
# Se crean todos los assemblers
assemblers = [VectorAssembler(inputCols=[col], outputCol="vec_" + col) for col in columns_to_scale]
# Se crean todos los scalers
scalers = [MinMaxScaler(inputCol="vec_" + col, outputCol="sc_" + col) for col in columns_to_scale]
# Se crea el pipeline. se definen las etapas del pipeline: primero todos los assemblers, después todos los scalers. 
pipeline = Pipeline(stages=assemblers + scalers)
# GRAN VENTAJA DEL PIPELINE: Me permite deshacerme de los dfs intermedios. 
scalerModel = pipeline.fit(ing_df)  # lo que hay que fittear son los scalers. 
scaledData = scalerModel.transform(ing_df)

In [43]:
scaledData.columns

['ID',
 'Edad',
 'Experiencia',
 'Ingresos',
 'Cod_Postal',
 'Familia',
 'Tarjeta Credito',
 'Educacion',
 'Valor Hipoteca',
 'Cta Comitente',
 'Plazo Fijo',
 'Prestamo',
 'vec_Ingresos',
 'sc_Ingresos',
 'vec_Educacion',
 'vec_Tarjeta Credito',
 'vec_Valor Hipoteca',
 'sc_Educacion',
 'sc_Tarjeta Credito',
 'sc_Valor Hipoteca']

In [48]:
scaledData.select('Tarjeta Credito', 'vec_Tarjeta Credito', 'sc_Tarjeta Credito',
                  'Educacion', 'vec_Educacion', 'sc_Educacion').show(10)

+---------------+-------------------+--------------------+---------+-------------+------------+
|Tarjeta Credito|vec_Tarjeta Credito|  sc_Tarjeta Credito|Educacion|vec_Educacion|sc_Educacion|
+---------------+-------------------+--------------------+---------+-------------+------------+
|            1.6|              [1.6]|[0.16000000000000...|        1|        [1.0]|       [0.0]|
|            1.5|              [1.5]|[0.15000000000000...|        1|        [1.0]|       [0.0]|
|            1.0|              [1.0]|               [0.1]|        1|        [1.0]|       [0.0]|
|            2.7|              [2.7]|              [0.27]|        2|        [2.0]|       [0.5]|
|            1.0|              [1.0]|               [0.1]|        2|        [2.0]|       [0.5]|
|            0.4|              [0.4]|[0.04000000000000...|        2|        [2.0]|       [0.5]|
|            1.5|              [1.5]|[0.15000000000000...|        2|        [2.0]|       [0.5]|
|            0.3|              [0.3]|   

Bien. Todo listo para entrenar el modelo. 

## Train - Test Data 

Separo los datos en un set de entrenamiento y de testeo

In [49]:
trainingData, testData = scaledData.randomSplit([0.8,0.2])  # 80% training - 20% testing

In [50]:
trainingData.count(), testData.count()

(4020, 980)

## ...

## ¿Está bien esto?
¿Qué representa mi set de testing?

### Data Leakage
Cuando escalé las features, usé el set completo. El algoritmo tuvo acceso a información "del futuro". En este caso no afecta demasiado... pero el Data Leakage es algo importante a tener en cuenta <br>
Como regla general:
- Para mirar los datos, podemos usar todos
- Para manipularlos, primero hay que separar el test set

## Split - Scaling do over

#### Primero el Split, después el Normalizacion

In [51]:
# Split
training_data, test_data = spark_df.randomSplit([0.8,0.2])  # 80% training - 20% testing

In [52]:
training_data.count(), test_data.count()

(4009, 991)

In [54]:
training_data.toPandas()[:3]

Unnamed: 0,ID,Edad,Experiencia,Ingresos,Cod_Postal,Familia,Tarjeta Credito,Educacion,Valor Hipoteca,Cta Comitente,Plazo Fijo,Prestamo
0,4,35,9,100,94112,1,2.7,2,0,0,0,0
1,5,35,8,45,91330,4,1.0,2,0,0,0,0
2,7,53,27,72,91711,2,1.5,2,0,0,0,0


In [55]:
# Scaling 
# Hacemos exactamente lo mismo que antes
columns_to_scale = ['Ingresos', 'Educacion', 'Tarjeta Credito', 'Valor Hipoteca']

# Se crean todos los assemblers IGUAL QUE ANTES
assemblers = [VectorAssembler(inputCols=[col], outputCol="vec_" + col) for col in columns_to_scale]
# Se crean todos los scalers IGUAL QUE ANTES
scalers = [MinMaxScaler(inputCol="vec_" + col, outputCol="sc_" + col) for col in columns_to_scale]
# Se crea el pipeline. IGUAL QUE ANTES
pipeline = Pipeline(stages=assemblers + scalers)

scaler_model = pipeline.fit(training_data)
# hago la normalización para los datos de training y los de test
sc_training_df = scaler_model.transform(training_data)
sc_test_df = scaler_model.transform(test_data)

In [56]:
sc_training_df.toPandas()[:5]  # To Pandas es solo para visualización. 

Unnamed: 0,ID,Edad,Experiencia,Ingresos,Cod_Postal,Familia,Tarjeta Credito,Educacion,Valor Hipoteca,Cta Comitente,Plazo Fijo,Prestamo,vec_Ingresos,vec_Educacion,vec_Tarjeta Credito,vec_Valor Hipoteca,sc_Ingresos,sc_Educacion,sc_Tarjeta Credito,sc_Valor Hipoteca
0,4,35,9,100,94112,1,2.7,2,0,0,0,0,[100.0],[2.0],[2.7],[0.0],[0.42592592592592593],[0.5],[0.27],[0.0]
1,5,35,8,45,91330,4,1.0,2,0,0,0,0,[45.0],[2.0],[1.0],[0.0],[0.17129629629629628],[0.5],[0.1],[0.0]
2,7,53,27,72,91711,2,1.5,2,0,0,0,0,[72.0],[2.0],[1.5],[0.0],[0.2962962962962963],[0.5],[0.15000000000000002],[0.0]
3,8,50,24,22,93943,1,0.3,3,0,0,0,0,[22.0],[3.0],[0.3],[0.0],[0.06481481481481481],[1.0],[0.03],[0.0]
4,10,34,9,180,93023,1,8.9,3,0,0,0,1,[180.0],[3.0],[8.9],[0.0],[0.7962962962962963],[1.0],[0.8900000000000001],[0.0]


## Model Training

Para entrenar el modelo, vamos a necesitar un vector similar al que usamos antes para la matriz de correlación. <br>
Primero vamos a necesitar pasar las features normalizadas de vector a numero.<br>
Spark parece muy complicado... comparación con Pandas. (exploratorio vs productivo. Todo lo que se hace acá es para mostrar, pero se podría hacer en un pipeline sin necesidad de mostrar resultados intermedios. Spark no remplaza a Pandas, son complementarios.)

In [None]:
# Se pasan las features normalizadas de vector a float...
# Para poder transformarlas en un solo vector. 
from pyspark.ml.functions import vector_to_array
sc_training_df.withColumn("num_sc_Income", vector_to_array("sc_Income")[0])\
.withColumn('num_sc_Education', vector_to_array('sc_Education')[0])\
.withColumn('num_sc_CCAvg', vector_to_array('sc_CCAvg')[0])\
.withColumn('num_sc_Mortgage', vector_to_array('sc_Mortgage')[0])\
.select('ID', "num_sc_Income", "num_sc_Education", "num_sc_CCAvg", "num_sc_Mortgage", "CD Account", "Personal Loan")\
.toPandas()[:4]

In [None]:
training_set = sc_training_df.withColumn("num_sc_Income", vector_to_array("sc_Income")[0])\
.withColumn('num_sc_Education', vector_to_array('sc_Education')[0])\
.withColumn('num_sc_CCAvg', vector_to_array('sc_CCAvg')[0])\
.withColumn('num_sc_Mortgage', vector_to_array('sc_Mortgage')[0])\
.select('ID', "num_sc_Income", "num_sc_Education", "num_sc_CCAvg", "num_sc_Mortgage", "CD Account", "Personal Loan")

In [None]:
training_set.toPandas()

In [None]:
feature_cols = [col for col in training_set.columns if (col not in ['ID', 'Personal Loan'] )]
feature_cols

In [None]:
# Crear el VectorAssembler
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="vec_features")
# transformar sc_training_df en algo que tenga una columna más. 
vec_training_set = vec_assembler.transform(training_set).select('vec_features', 'Personal Loan')

In [None]:
vec_training_set.show(4)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="Personal Loan", featuresCol="vec_features", maxIter=10)
lr_pipeline = Pipeline(stages=[lr])  # Para una sola transformación, no es necesario el pipeline. 
# Pero, se puede combinar con la transformación anterior (columnas a vectores, para hacer el pipeline)
lr_model = lr_pipeline.fit(vec_training_set)

## Model Evaluation

Transformamos el test set de la misma manera que transformamos el anterior:

In [None]:
test_set = sc_test_df.withColumn("num_sc_Income", vector_to_array("sc_Income")[0])\
.withColumn('num_sc_Education', vector_to_array('sc_Education')[0])\
.withColumn('num_sc_CCAvg', vector_to_array('sc_CCAvg')[0])\
.withColumn('num_sc_Mortgage', vector_to_array('sc_Mortgage')[0])\
.select('ID', "num_sc_Income", "num_sc_Education", "num_sc_CCAvg", "num_sc_Mortgage", "CD Account", "Personal Loan")

In [None]:
test_set.count()

In [None]:
vec_test_set = vec_assembler.transform(test_set).select('vec_features', 'Personal Loan')
vec_test_set.show(4)

#### Predictions

In [None]:
# Predictions
predictions = lr_model.transform(vec_test_set)

In [None]:
predictions.select('vec_features', 'rawPrediction', 'prediction').show()

In [None]:
predictions.where(predictions['prediction'] == 1).count()

In [None]:
predictions.where(predictions['prediction'] == 0).count()

Entonces... Esto es bueno? Esto es malo... ?

Qué es lo que no estamos viendo acá? 

<br> Si la predicción es correcta!

In [None]:
predictions.select('vec_features', 'rawPrediction', 'prediction', 'Personal Loan').show()

#### Confusion Matrix

Qué es? Para qué se usa? PowerPoint

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="Personal Loan", predictionCol="prediction", metricName="accuracy")

In [None]:
accuracy = evaluator.evaluate(predictions)
accuracy

In [None]:
from pyspark.sql.functions import col
predictions_labels = predictions['prediction', 'Personal Loan']\
.withColumn('Personal Loan', col('Personal Loan').cast('float'))

In [None]:
predictions_labels.rdd.take(2)

In [None]:
predictions_labels.rdd.map(tuple).take(2)

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
metrics = MulticlassMetrics(predictions_labels.rdd.map(tuple))

In [None]:
confusion_matrix = metrics.confusionMatrix().toArray().transpose()
confusion_matrix

In [None]:
tp = confusion_matrix[1][1]
tn = confusion_matrix[0][0]
fp = confusion_matrix[1][0]
fn = confusion_matrix[0][1]

#### Accuracy, Precision, Recall 

Agregar a PowerPoint... 

In [None]:
accuracy = metrics.accuracy
accuracy

In [None]:
(tp + tn) / (tp + tn + fp + fn)  # accuracy

94% suena bien, no?
<br><br><br>...

Veamos el caso de un clasificador que siempre predice 0.

In [None]:
total_positive = tp + fn
total_negative = tn + fp
total_cases = total_positive + total_negative

print('Dummy accuracy: {}'.format((total_negative / (total_cases))))

Entonces: Nuestro clasificador es bueno, pero no taaan bueno. 
<br> Moraleja: No juzgar porcentajes... hay que ver todas las métricas
<br> Otras métricas: 
- Precision
- Recall
<br> La métrica para clasificadores binarios es el área bajo la ROC Curve

In [None]:
precision = metrics.precision(1.0)
precision
# busco minimizar fp (no encarcelar inocentes, aunque queden ladrones libres)

In [None]:
(tp) / (tp + fp)

In [None]:
recall = metrics.recall(1.0)
recall
# busco minimizar fn (encarcelar todos los ladrones, a expensas de encarcelar algún inocente)

In [None]:
(tp) / (tp + fn)

Ejemplo: Minority Report - Ladrones e Inocentes

#### Curve (AUC ROC)
Hay que explicar un poco qué son. Agregar a PowerPoint. 

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
bin_metrics = BinaryClassificationMetrics(predictions_labels.rdd.map(tuple))

In [None]:
bin_metrics.areaUnderROC

77% No es excelente, pero no está mal. 

### (OPTIONAL)
## Ethics in Data Science and ML