# PEC 1

## Introduction
The goal of this exercise is to build a machine learning pipeline with any of the three tools covered in the module (Google Colab, PySpark and Databricks), whichever you find most comfortable.

## Exercise 1
Load the titanic.csv file and display the first rows to inspect its contents.

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer

In [0]:
spark = SparkSession.builder.getOrCreate()
spark

In [0]:
df = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "true").csv("dbfs:/FileStore/titanic.csv")
display(df)

pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,survived
1,"Allen, Miss. Elisabeth Walton",female,29.0,0,0,24160,2113375.0,B5,S,1
1,"Allison, Master. Hudson Trevor",male,0.9167,1,2,113781,151.55,C22 C26,S,1
1,"Allison, Miss. Helen Loraine",female,2.0,1,2,113781,151.55,C22 C26,S,0
1,"Allison, Mr. Hudson Joshua Creighton",male,30.0,1,2,113781,151.55,C22 C26,S,0
1,"Allison, Mrs. Hudson J C (Bessie Waldo Daniels)",female,25.0,1,2,113781,151.55,C22 C26,S,0
1,"Anderson, Mr. Harry",male,48.0,0,0,19952,26.55,E12,S,1
1,"Andrews, Miss. Kornelia Theodosia",female,63.0,1,0,13502,779583.0,D7,S,1
1,"Andrews, Mr. Thomas Jr",male,39.0,0,0,112050,0.0,A36,S,0
1,"Appleton, Mrs. Edward Dale (Charlotte Lamson)",female,53.0,2,0,11769,514792.0,C101,S,1
1,"Artagaveytia, Mr. Ramon",male,71.0,0,0,PC 17609,495042.0,,C,0
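Several `fare` values in the sample above look implausibly large next to their neighbours (for instance 2113375.0 and 779583.0 alongside 151.55 and 26.55), which may indicate that the decimal separator was lost for some rows when the file was produced. This is an inference from the printed sample only; the notebook itself does not check it. A minimal plausibility check, assuming a hypothetical upper bound `MAX_PLAUSIBLE_FARE = 1000.0` that should be adjusted after looking at the real distribution:

```python
# Hypothetical plausibility check for the `fare` column.
# MAX_PLAUSIBLE_FARE is an assumed bound, not a value taken from the dataset.
MAX_PLAUSIBLE_FARE = 1000.0


def suspicious_fares(fares, max_fare=MAX_PLAUSIBLE_FARE):
    """Return (index, value) pairs whose fare exceeds max_fare; None (null) is skipped."""
    return [(i, f) for i, f in enumerate(fares) if f is not None and f > max_fare]


# Fares copied from the ten sample rows shown above.
sample_fares = [2113375.0, 151.55, 151.55, 151.55, 151.55,
                26.55, 779583.0, 0.0, 514792.0, 495042.0]

for idx, value in suspicious_fares(sample_fares):
    print(f"row {idx}: fare={value}")
```

In Spark, the equivalent count would be `df.filter(F.col("fare") > 1000).count()` with `pyspark.sql.functions` imported as `F`.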


In [0]:
df.printSchema()
df.count()

root
 |-- pclass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: double (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- survived: integer (nullable = true)

Out[7]: 1308

## Exercise 2
Goal: estimate the probability that a passenger survives based on the following variables:
1. Continuous/numeric variables: pclass, age, fare
2. Categorical variables: sex
3. Label: survived

Create at least 2 Estimators.\
**Tip: Sex has nulls**

In [0]:
df_titanic = df.select('pclass', 'age', 'fare', 'sex', 'survived')
df_titanic.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- survived: integer (nullable = true)



In [0]:
trainDF, testDF = df_titanic.randomSplit([0.7, 0.3], seed=42)
print(trainDF.cache().count())
print(testDF.count())

960
348


In [0]:
from pyspark.sql import functions as F
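# Percentage of nulls per column. In the full dataset "age", "cabin" and "embarked" contain nulls;
# of the selected columns, only "age" does. The row count is computed once and reused.
total = df_titanic.count()
nulls = [F.round(F.sum(F.col(c).isNull().cast('int')) * 100 / total, 4).alias(c) for c in df_titanic.columns]
df_titanic.select(nulls).show()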

+------+------+----+---+--------+
|pclass|   age|fare|sex|survived|
+------+------+----+---+--------+
|   0.0|20.107| 0.0|0.0|     0.0|
+------+------+----+---+--------+
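The expression above counts nulls per column by casting `isNull()` to 0/1, summing, multiplying by 100 and dividing by the row count. A plain-Python sketch of the same computation on a few hypothetical rows follows; applied to this dataset, 20.107% of 1308 rows corresponds to 263 missing ages.

```python
def null_percentages(rows, columns, digits=4):
    """Percentage of None values per column, like sum(isNull().cast('int')) * 100 / count."""
    total = len(rows)
    return {
        c: round(sum(1 for r in rows if r.get(c) is None) * 100 / total, digits)
        for c in columns
    }


# Hypothetical rows: one of four passengers has no recorded age.
rows = [
    {"pclass": 1, "age": 29.0, "sex": "female"},
    {"pclass": 3, "age": None, "sex": "male"},
    {"pclass": 2, "age": 30.0, "sex": "male"},
    {"pclass": 3, "age": 22.0, "sex": "female"},
]
print(null_percentages(rows, ["pclass", "age", "sex"]))

# Cross-check of the notebook's figure: 263 missing ages out of 1308 rows.
print(round(263 * 100 / 1308, 4))
```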



- We use the median to impute the missing age values because, unlike the mean, it is not affected by extreme values. The median age is 28 years, which we take as representative of the passengers, so the median is the most appropriate measure of central tendency for this dataset.
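The robustness argument can be checked on a small example. The sketch below uses hypothetical ages with one outlier: the outlier pulls the mean up noticeably while the median stays put, and the median then fills the gaps. Spark's `Imputer` with `strategy="median"` computes an approximate median (controlled by its `relativeError` parameter), so this exact-median version is only an illustration of the idea.

```python
import statistics


def impute_median(values):
    """Replace None with the median of the observed (non-null) values."""
    observed = [v for v in values if v is not None]
    median = statistics.median(observed)
    return [median if v is None else v for v in values], median


# Hypothetical ages with two missing values and one extreme value (80).
ages = [22.0, None, 28.0, 30.0, None, 26.0, 80.0]
filled, median = impute_median(ages)
observed = [a for a in ages if a is not None]

print("median:", median)                     # unaffected by the extreme value
print("mean:  ", statistics.mean(observed))  # pulled upwards by the extreme value
print(filled)
```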

In [0]:
# Impute missing "age" values with the median (Imputer is an Estimator)
imputer = Imputer(inputCols=["age"], outputCols=["out_age"]).setStrategy("median")

# Index the categorical variables (StringIndexer is an Estimator)
categoricalCols = ["sex"]
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols])

# One-hot encoding: maps a categorical feature, represented as a label index, to a binary vector
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols])

# Target variable, indexed into the "label" column
labelToIndex = StringIndexer(inputCol="survived", outputCol="label")

# Combine the encoded categorical column and the numeric variables into one feature vector
numericCols = ["out_age", "fare", "pclass"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
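By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`) and `OneHotEncoder` drops the last category (`dropLast=True`). For a two-valued column such as `sex`, the encoded vector therefore has a single slot, which is why the first entry of each `features` vector in the predictions is either 1.0 or 0.0. The plain-Python sketch below imitates those two defaults on hypothetical data; it is an illustration, not Spark's implementation, and ties between equally frequent labels are broken alphabetically here.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """One-hot encode an index; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


# Hypothetical column in which "male" is the more frequent value.
sex = ["male", "female", "male", "male", "female"]
mapping = fit_string_indexer(sex)
print(mapping)                                                        # {'male': 0.0, 'female': 1.0}
print([one_hot(mapping[s], len(mapping)) for s in ["male", "female"]])  # [[1.0], [0.0]]
```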

## Exercise 3
Create an Estimator that meets the goal.

In [0]:
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10, regParam=0.01)
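`LogisticRegression` models the survival probability as the sigmoid of a linear combination of the features. With `elasticNetParam` at its default of 0.0, `regParam` adds an L2 penalty of `regParam / 2 * ||w||^2` to the average log-loss, and the intercept is not penalized. The sketch below writes that objective out in plain Python and minimizes it with batch gradient descent on hypothetical data. Spark uses a different optimizer (L-BFGS) and standardizes features by default, so this only illustrates what is being minimized.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def objective(w, b, X, y, reg_param):
    """Average log-loss plus an L2 penalty reg_param/2 * ||w||^2 (intercept not penalized)."""
    loss = 0.0
    for xi, yi in zip(X, y):
        p = sigmoid(sum(wj * xj for wj, xj in zip(w, xi)) + b)
        loss -= yi * math.log(p) + (1 - yi) * math.log(1 - p)
    return loss / len(X) + reg_param / 2 * sum(wj * wj for wj in w)


def fit(X, y, reg_param=0.01, lr=0.5, iters=500):
    """Batch gradient descent on the regularized objective."""
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(iters):
        grad_w, grad_b = [0.0] * d, 0.0
        for xi, yi in zip(X, y):
            err = sigmoid(sum(wj * xj for wj, xj in zip(w, xi)) + b) - yi
            grad_w = [g + err * xj for g, xj in zip(grad_w, xi)]
            grad_b += err
        # The gradient of reg_param/2 * ||w||^2 is reg_param * w.
        w = [wj - lr * (g / n + reg_param * wj) for wj, g in zip(w, grad_w)]
        b -= lr * grad_b / n
    return w, b


# Hypothetical single-feature data: larger x makes the positive class more likely.
X = [[-2.0], [-1.0], [-0.5], [0.5], [1.0], [2.0]]
y = [0, 0, 0, 1, 1, 1]
w, b = fit(X, y)
print(w, b, objective(w, b, X, y, 0.01))
print([round(sigmoid(w[0] * x[0] + b), 2) for x in X])
```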

## Exercise 4
Create a pipeline and show the predictions.

In [0]:
pipeline = Pipeline(stages=[imputer, stringIndexer, encoder, labelToIndex, vecAssembler, lr])
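`Pipeline.fit` walks the stages in order: an Estimator stage is fitted on the data as transformed by the earlier stages and replaced by the resulting model, while a Transformer stage is simply applied. That is why `imputer` has to come before `vecAssembler`, which reads `out_age`. The toy imitation below shows that control flow in plain Python; the classes `MedianImputer`, `FillValue`, `Assemble`, `ToyPipeline` and `ToyPipelineModel` are hypothetical and only mimic the fit/transform contract.

```python
import statistics


class MedianImputer:
    """Estimator: learns a column median and returns a transformer that fills None."""
    def __init__(self, col, out_col):
        self.col, self.out_col = col, out_col

    def fit(self, rows):
        median = statistics.median(r[self.col] for r in rows if r[self.col] is not None)
        return FillValue(self.col, self.out_col, median)


class FillValue:
    """Transformer produced by MedianImputer.fit."""
    def __init__(self, col, out_col, value):
        self.col, self.out_col, self.value = col, out_col, value

    def transform(self, rows):
        return [{**r, self.out_col: self.value if r[self.col] is None else r[self.col]} for r in rows]


class Assemble:
    """Transformer: collects input columns into a feature list (like VectorAssembler)."""
    def __init__(self, cols, out_col):
        self.cols, self.out_col = cols, out_col

    def transform(self, rows):
        return [{**r, self.out_col: [r[c] for c in self.cols]} for r in rows]


class ToyPipeline:
    """Fits stages in order; each stage sees the output of the previously fitted stages."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train = [{"age": 20.0, "fare": 7.0}, {"age": None, "fare": 50.0}, {"age": 40.0, "fare": 30.0}]
model = ToyPipeline([MedianImputer("age", "out_age"),
                     Assemble(["out_age", "fare"], "features")]).fit(train)
print([r["features"] for r in model.transform([{"age": None, "fare": 10.0}])])  # [[30.0, 10.0]]
```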

In [0]:
# Fit the pipeline on the training set.
pipelineModel = pipeline.fit(trainDF)

In [0]:
# Apply the pipeline model to the test dataset.
predDF = pipelineModel.transform(testDF) \
    .select("features", "label", "prediction") 

In [0]:
# Show the counts per (label, prediction) pair, then some sample predictions
predDF.groupBy('label','prediction').count().show()
predDF.show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   89|
|  0.0|       1.0|   24|
|  1.0|       0.0|   38|
|  0.0|       0.0|  197|
+-----+----------+-----+

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|  [1.0,28.0,0.0,1.0]|  0.0|       0.0|
|[1.0,28.0,26.55,1.0]|  0.0|       0.0|
|[1.0,28.0,26.55,1.0]|  1.0|       0.0|
| [1.0,28.0,29.7,1.0]|  1.0|       0.0|
| [1.0,28.0,35.0,1.0]|  0.0|       0.0|
| [1.0,28.0,35.5,1.0]|  1.0|       0.0|
| [1.0,28.0,39.6,1.0]|  0.0|       0.0|
| [0.0,28.0,52.0,1.0]|  1.0|       1.0|
| [0.0,28.0,55.0,1.0]|  1.0|       1.0|
|[0.0,28.0,133.65,...|  1.0|       1.0|
|[1.0,28.0,25925.0...|  0.0|       0.0|
|[1.0,28.0,277208....|  0.0|       0.0|
|[1.0,28.0,277208....|  0.0|       0.0|
|[1.0,28.0,306958....|  0.0|       0.0|
|[0.0,28.0,518625....|  1.0|       1.0|
|[0.0,28.0,821708....|  1.0|       1.0|
|[0.0,28.0,891042....|  1.0|       1.0|

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(predDF)
print("Accuracy de la Regresion Logistica es = %g"% (lr_accuracy))
print("Error de Test de la Regresion Logistica es = %g " % (1.0 - lr_accuracy))

Accuracy de la Regresion Logistica es = 0.821839
Error de Test de la Regresion Logistica es = 0.178161 
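The accuracy above can be reproduced from the confusion counts printed earlier: 89 + 197 = 286 correct predictions out of 348 test rows. The sketch below derives accuracy from those counts, together with precision and recall for the `survived = 1` class; precision and recall are not computed in the notebook and are included only as an illustration.

```python
def metrics(tp, fp, fn, tn):
    """Accuracy, plus precision and recall for the positive class (label 1.0)."""
    total = tp + fp + fn + tn
    return {
        "accuracy": (tp + tn) / total,
        "precision": tp / (tp + fp),
        "recall": tp / (tp + fn),
    }


# Counts from the groupBy('label', 'prediction') table of the baseline model:
# (label 1, pred 1) = 89, (label 0, pred 1) = 24, (label 1, pred 0) = 38, (label 0, pred 0) = 197
m = metrics(tp=89, fp=24, fn=38, tn=197)
print({k: round(v, 6) for k, v in m.items()})
```

Passing the cross-validated model's counts (`tp=87, fp=24, fn=40, tn=197`) gives 284 / 348 ≈ 0.816092, matching the accuracy reported at the end of the notebook.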


## Extras
1. Tune the hyperparameters and inspect the results
2. Attach a file (pec1.py) that allows running the notebook with PySpark

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0, 0.001, 0.01, 0.05, 0.1, 0.3, 0.5, 1])
             .addGrid(lr.maxIter, [3, 5, 10, 20, 30])
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.5, 0.8, 1])
             .addGrid(lr.fitIntercept, [False, True])
             .build())
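The size of this grid is the product of the lengths of the value lists: 8 × 5 × 5 × 2 = 400 parameter maps. With `numFolds=5`, cross-validation fits the whole pipeline 5 times per map, plus one final refit of the best map on the full training set. The arithmetic, with the grid values copied from the cell above:

```python
from itertools import product
from math import prod

# Grid values copied from the ParamGridBuilder cell above.
grid = {
    "regParam": [0, 0.001, 0.01, 0.05, 0.1, 0.3, 0.5, 1],
    "maxIter": [3, 5, 10, 20, 30],
    "elasticNetParam": [0.0, 0.1, 0.5, 0.8, 1],
    "fitIntercept": [False, True],
}
combinations = list(product(*grid.values()))
num_folds = 5

print(len(combinations))                    # 400 parameter maps
print(prod(len(v) for v in grid.values()))  # same count, computed directly
print(len(combinations) * num_folds + 1)    # 2001 pipeline fits: 5 per map + final refit
```

When `regParam` is 0 there is no penalty, so the `elasticNetParam` value has no effect: 40 of the 50 maps with `regParam = 0` duplicate the other 10.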

In [0]:
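from pyspark.ml.evaluation import BinaryClassificationEvaluator

# The evaluator's defaults are spelled out: models are ranked by area under the ROC curve.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="label",
                                                                  rawPredictionCol="rawPrediction",
                                                                  metricName="areaUnderROC"),
                          numFolds=5)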
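`BinaryClassificationEvaluator` ranks the candidate models by area under the ROC curve computed from `rawPrediction`, not by accuracy. The parameter map that wins on AUC need not have the highest test accuracy, which may help explain why the tuned model ends up with a slightly lower accuracy than the baseline at the end of the notebook (the difference is 2 of 348 test rows). AUC equals the probability that a randomly chosen positive is scored above a randomly chosen negative, with ties counting one half; a plain-Python sketch of that definition on hypothetical scores:

```python
def auc(scores, labels):
    """ROC AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical scores for six passengers (higher means "more likely survived").
scores = [0.9, 0.8, 0.35, 0.6, 0.3, 0.1]
labels = [1, 1, 1, 0, 0, 0]
print(auc(scores, labels))  # 8 of 9 pairs ordered correctly
```

With a 0.5 threshold, accuracy here would count two errors (0.35 for a survivor and 0.6 for a non-survivor), whereas AUC only penalizes the single mis-ordered pair (0.35, 0.6).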

In [0]:
# Cross-validation: selects the best parameter set, then refits it on the whole training set.
cvModel = crossval.fit(trainDF)
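`CrossValidator.fit` splits the training data into `numFolds` folds; for every parameter map it fits the pipeline on k-1 folds, evaluates on the held-out fold, averages the metric, and finally refits the best map on the whole training set (available as `cvModel.bestModel`). The skeleton below shows that loop in plain Python. `train_threshold` and `accuracy` are hypothetical stand-ins for `pipeline.fit` and the evaluator, and unlike this sketch Spark assigns rows to folds randomly rather than by position.

```python
def k_fold_indices(n, k):
    """Split range(n) into k contiguous folds of near-equal size."""
    sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    folds, start = [], 0
    for size in sizes:
        folds.append(list(range(start, start + size)))
        start += size
    return folds


def cross_validate(data, param_maps, train_fn, score_fn, k=5):
    """Return (best_params, refit_model, avg_scores); higher scores are better."""
    folds = k_fold_indices(len(data), k)
    avg_scores = []
    for params in param_maps:
        scores = []
        for held_out in folds:
            held = set(held_out)
            train = [row for i, row in enumerate(data) if i not in held]
            valid = [data[i] for i in held_out]
            scores.append(score_fn(train_fn(train, params), valid))
        avg_scores.append(sum(scores) / k)
    best = max(range(len(param_maps)), key=avg_scores.__getitem__)
    # Refit the winning parameter map on all the data.
    return param_maps[best], train_fn(data, param_maps[best]), avg_scores


def train_threshold(rows, params):
    """Toy 'model': predict 1 when x >= threshold (nothing is learned from rows)."""
    return params["threshold"]


def accuracy(threshold, rows):
    return sum(int(x >= threshold) == y for x, y in rows) / len(rows)


# Hypothetical data: y = 1 exactly when x >= 5.
data = [(x, int(x >= 5)) for x in range(20)]
best, model, avg = cross_validate(data, [{"threshold": t} for t in (2, 5, 9)],
                                  train_threshold, accuracy)
print(best, avg)
```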

In [0]:
from pyspark.ml.classification import LogisticRegressionModel

# Show the parameters of the best model found by cross-validation.
# Pick the fitted LogisticRegressionModel stage from the best PipelineModel;
# its public getters (Spark 3.0+) replace the private _java_obj calls.
bestLR = [s for s in cvModel.bestModel.stages if isinstance(s, LogisticRegressionModel)][0]
print('Best Param (regParam):', bestLR.getRegParam())
print('Best Param (maxIter):', bestLR.getMaxIter())
print('Best Param (elasticNetParam):', bestLR.getElasticNetParam())
print('Best Param (fitIntercept):', bestLR.getFitIntercept())

Best Param (regParam): 0.05
Best Param (maxIter): 5
Best Param (elasticNetParam): 0.0
Best Param (fitIntercept): True


In [0]:
# Predict on the test set with the best cross-validated model
predDF_cv = cvModel.transform(testDF).select("features", "label", "prediction")
# Show the predictions
predDF_cv.groupBy('label','prediction').count().show()
predDF_cv.show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   87|
|  0.0|       1.0|   24|
|  1.0|       0.0|   40|
|  0.0|       0.0|  197|
+-----+----------+-----+

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|  [1.0,28.0,0.0,1.0]|  0.0|       0.0|
|[1.0,28.0,26.55,1.0]|  0.0|       0.0|
|[1.0,28.0,26.55,1.0]|  1.0|       0.0|
| [1.0,28.0,29.7,1.0]|  1.0|       0.0|
| [1.0,28.0,35.0,1.0]|  0.0|       0.0|
| [1.0,28.0,35.5,1.0]|  1.0|       0.0|
| [1.0,28.0,39.6,1.0]|  0.0|       0.0|
| [0.0,28.0,52.0,1.0]|  1.0|       1.0|
| [0.0,28.0,55.0,1.0]|  1.0|       1.0|
|[0.0,28.0,133.65,...|  1.0|       1.0|
|[1.0,28.0,25925.0...|  0.0|       0.0|
|[1.0,28.0,277208....|  0.0|       0.0|
|[1.0,28.0,277208....|  0.0|       0.0|
|[1.0,28.0,306958....|  0.0|       0.0|
|[0.0,28.0,518625....|  1.0|       1.0|
|[0.0,28.0,821708....|  1.0|       1.0|
|[0.0,28.0,891042....|  1.0|       1.0|

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(predDF_cv)
print("Accuracy de la Regresion Logistica es = %g"% (lr_accuracy))
print("Error de Test de la Regresion Logistica es = %g " % (1.0 - lr_accuracy))

Accuracy de la Regresion Logistica es = 0.816092
Error de Test de la Regresion Logistica es = 0.183908 
