# **Quiz - Procesamiento de Datos**

Pontificia Universidad Javeriana - 2410

Samuel Peña García - ID - 20486979

Programa: Ciencia de Datos

Clase: 8184 - Lun, Jue 07-09


## Dataset Titanic

Contexto: Este dataset presenta información acerca de los pasajeros que abordaron el Titanic.

###Datos:

  - survival:  Survival  0 = No, 1 = Si
  - pclass:	Ticket class	1 = 1st, 2 = 2nd, 3 = 3rd
  - sex:	Sex	
  - Age:	Age in years	
  - sibsp:	# of siblings / spouses aboard the Titanic	
  - parch:	# of parents / children aboard the Titanic	
  - ticket:	Ticket number	
  - fare:	Passenger fare	
  - cabin:	Cabin number	
  - embarked:	Port of Embarkation	C = Cherbourg, Q = Queenstown, S = Southampton


### 1- Importación de las librerías, cargado de los datos y creación del dataframe

In [0]:
#Se importan las librerías necesarias para el análisis, en este caso pandas, pyspark y los complementos de sql de pyspark.

import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd

In [0]:
#Se levanta la sesión de Spark para poder realizar posteriormente todo el procesamiento de datos con las funciones que la librería ofrece

sc = SparkContext.getOrCreate()
sql_sc = SQLContext(sc)
sc



In [0]:
#Se carga el conjunto de datos en formato RAW directamente desde el repositorio de GitHub donde se encuentran los datos a través de la url
url = "https://raw.githubusercontent.com/corredor-john/ExploratoryDataAnalisys/main/Varios/Titanic/train.csv"

#Se asigna el dataset a Titanic_DFP a través de pandas
Titanic_DFP = pd.read_csv(url, sep = ',')

#Se muestran los primeros 5 registros de Titanic_DFP
Titanic_DFP.head(5)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [0]:
#Se formatea el dataframe al tipo de Spark
#El formato Spark prima la velocidad sobre la interfaz gráfica.

Titanic_DFS = sql_sc.createDataFrame(Titanic_DFP)

#Se revisa el resultado de la conversión
Titanic_DFS.limit(4).toPandas()

<bound method PandasConversionMixin.toPandas of DataFrame[PassengerId: bigint, Survived: bigint, Pclass: bigint, Name: string, Sex: string, Age: double, SibSp: bigint, Parch: bigint, Ticket: string, Fare: double, Cabin: string, Embarked: string]>


### 2- Transformación de los datosw

In [0]:
#Nombre de las columnas:
Titanic_DFS.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [0]:
#Cantidad de registros en el DataFrame
print(f"la cantidad de registros en el DataFrame Spark es: {Titanic_DFS.count()}")

la cantidad de registros en el DataFrame Spark es: 891


In [0]:
#Tipos de Datos en el DataFrame
Titanic_DFS.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Survived: long (nullable = true)
 |-- Pclass: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: long (nullable = true)
 |-- Parch: long (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [0]:
#Se requiere ver la cantidad de personas según sexo
Titanic_DFS.groupby("Sex").count().show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+




El comando __when__ con __otherwise__ funciona como si fuese un else if

In [0]:
#Se requiere saber si existen valores nulos
Titanic_DFS.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in Titanic_DFS.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [0]:
#Se procede a eliminar la columna "Cabin" teniendo en cuenta el alto porcentaje de nulidad en la columna
Titanic_DFS = Titanic_DFS.drop("Cabin")
Titanic_DFS.limit(3).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,S


In [0]:
#Para hacer la imputación de edad nula, se hará la estratificación en función del saludo
#Primero se hará una nueva columna que contenga los saludos de cada pasajero
#Se extrae cualquier palabra cuyo patrón es de la A hasta la Z finalizando con un (.)

Titanic_DFS = Titanic_DFS.withColumn("Saludo",regexp_extract(col('Name'), "([A-Z,a-z]+)\.",1))
Titanic_DFS.limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Embarked,Saludo
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,S,Mr
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C,Mrs
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,S,Miss
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,S,Mrs
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,S,Mr


In [0]:
#Se presentan los valores únicos o distintos de la columna Saludo
Titanic_DFS.select("Saludo").distinct().show()

+--------+
|  Saludo|
+--------+
|     Don|
|    Miss|
|  Master|
|      Mr|
|     Mrs|
|     Rev|
|      Dr|
|     Mme|
|      Ms|
|   Major|
|     Col|
|    Lady|
|     Sir|
|    Mlle|
|Countess|
|    Capt|
|Jonkheer|
+--------+



In [0]:
Titanic_DFS.groupBy("Saludo").count().show()

+--------+-----+
|  Saludo|count|
+--------+-----+
|     Don|    1|
|    Miss|  182|
|  Master|   40|
|      Mr|  517|
|     Mrs|  125|
|     Rev|    6|
|      Dr|    7|
|     Mme|    1|
|      Ms|    1|
|   Major|    2|
|     Col|    2|
|    Lady|    1|
|     Sir|    1|
|    Mlle|    2|
|Countess|    1|
|    Capt|    1|
|Jonkheer|    1|
+--------+-----+



In [0]:
#Dado que son muchos saludos, se requiere reducirlos a 4 o 5
saludo_inicial = ["Don", "Rev", "Dr", "Mme", "Ms", "Major", "Col", "Lady", "Sir", "Mlle", "Countess", "Capt", "Jonkheer"]
saludo_final = ["Mr", "Other", "Other", "Mrs", "Miss", "Mr", "Mr", "Miss", "Mr", "Miss", "Miss", "Mr", "Other"]
Titanic_DFS = Titanic_DFS.replace(saludo_inicial, saludo_final)
Titanic_DFS.groupBy("Saludo").count().limit(5).toPandas

<bound method PandasConversionMixin.toPandas of DataFrame[Saludo: string, count: bigint]>

In [0]:
#Se revisa el promedio de edad por saludo para poder realizar la sustitución de edades nulas
Titanic_DFS.groupBy("Saludo").avg("Age").collect()

[Row(Saludo='Miss', avg(Age)=22.09271523178808),
 Row(Saludo='Master', avg(Age)=4.574166666666667),
 Row(Saludo='Mr', avg(Age)=32.727160493827164),
 Row(Saludo='Mrs', avg(Age)=35.788990825688074),
 Row(Saludo='Other', avg(Age)=42.23076923076923)]

In [0]:
#Se hace efectiva la sustitución

Titanic_DFS = Titanic_DFS.withColumn("Age", when((Titanic_DFS["Saludo"] == 'Miss') & (Titanic_DFS["Age"].isNull()), 22 ).otherwise(Titanic_DFS["Age"]))
Titanic_DFS = Titanic_DFS.withColumn("Age", when((Titanic_DFS["Saludo"] == 'Master') & (Titanic_DFS["Age"].isNull()), 5 ).otherwise(Titanic_DFS["Age"]))
Titanic_DFS = Titanic_DFS.withColumn("Age", when((Titanic_DFS["Saludo"] == 'Mr') & (Titanic_DFS["Age"].isNull()), 33 ).otherwise(Titanic_DFS["Age"]))
Titanic_DFS = Titanic_DFS.withColumn("Age", when((Titanic_DFS["Saludo"] == 'Mrs') & (Titanic_DFS["Age"].isNull()), 36 ).otherwise(Titanic_DFS["Age"]))
Titanic_DFS = Titanic_DFS.withColumn("Age", when((Titanic_DFS["Saludo"] == 'Other') & (Titanic_DFS["Age"].isNull()), 42 ).otherwise(Titanic_DFS["Age"]))

In [0]:
#Se hace un análisis de los datos de embarque para reemplazar los valores nulos

Titanic_DFS.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    NULL|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [0]:
#Se realiza el reemplazo con el valor m,ás significativo

Titanic_DFS = Titanic_DFS.na.fill({'Embarked' : 'S'})
Titanic_DFS.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in Titanic_DFS.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+--------+------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Embarked|Saludo|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0|       0|     0|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+------+



In [0]:
#Nuevamente se muestran los primeros 5 registros del dataset

Titanic_DFS.limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Embarked,Saludo
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,S,Mr
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C,Mrs
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,S,Miss
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,S,Mrs
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,S,Mr



### 3- Agregación de columnas

In [0]:
# Añadir una nueva columna "Tam_Familia" al DataFrame "Titanic_DFS"
# Esta columna será la suma de las columnas "SibSp" y "Parch" 
Titanic_DFS = Titanic_DFS.withColumn("Tam_Familia", col("SibSp") + col("Parch"))

# Añadir una nueva columna "Solo" al DataFrame "Titanic_DFS" con valor predeterminado 0
Titanic_DFS = Titanic_DFS.withColumn("Solo", lit(0))

# Actualizar la columna "Solo" basada en la condición:
# Si la columna "Tam_Familia" es igual a 0, establecer el valor de la columna "Solo" en 1, de lo contrario, mantener el valor actual de la columna "Solo"
Titanic_DFS = Titanic_DFS.withColumn("Solo", when(Titanic_DFS["Tam_Familia"] == 0, 1).otherwise(Titanic_DFS["Solo"]))

# Limitar las primeras 5 filas del DataFrame y convertirlas a un DataFrame de Pandas para su visualización
Titanic_DFS.limit(5).toPandas()


Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Embarked,Saludo,Tam_Familia,Solo
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,S,Mr,1,0
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C,Mrs,1,0
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,S,Miss,0,1
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,S,Mrs,1,0
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,S,Mr,0,1


In [0]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Se crea una lista de objetos StringIndexer para cada columna especificada
# Se itera sobre las columnas "Sex", "Embarked" y "Saludo" del DataFrame "Titanic_DFS"
# Cada objeto StringIndexer se ajusta (fit) al DataFrame "Titanic_DFS"
indexador = [StringIndexer(inputCol=columna, outputCol=columna+"ind").fit(Titanic_DFS) for columna in ["Sex", "Embarked", "Saludo"]]

# Se crea un objeto Pipeline que utilizará los StringIndexers definidos anteriormente como etapas (stages)
pipeIndexer = Pipeline(stages=indexador)

# Se aplica el pipeline al DataFrame "Titanic_DFS"
# El método fit ajusta el pipeline al DataFrame
# El método transform transforma el DataFrame según el pipeline ajustado
Titanic_DFS = pipeIndexer.fit(Titanic_DFS).transform(Titanic_DFS)

Titanic_DFS.limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Embarked,Saludo,Tam_Familia,Solo,Sexind,Embarkedind,Saludoind
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,S,Mr,1,0,0.0,0.0,0.0
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C,Mrs,1,0,1.0,1.0,2.0
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,S,Miss,0,1,1.0,0.0,1.0
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,S,Mrs,1,0,1.0,0.0,2.0
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,S,Mr,0,1,0.0,0.0,0.0


In [0]:
# Se eliminan las columnas "PassengerId", "Name", "Ticket", "Embarked", "Saludo", "Sex" del DataFrame "Titanic_DFS"
Titanic_DFS = Titanic_DFS.drop("PassengerId", "Name", "Ticket", "Embarked", "Saludo", "Sex")

Titanic_DFS.limit(5).toPandas()


Unnamed: 0,Survived,Pclass,Age,SibSp,Parch,Fare,Tam_Familia,Solo,Sexind,Embarkedind,Saludoind
0,0,3,22.0,1,0,7.25,1,0,0.0,0.0,0.0
1,1,1,38.0,1,0,71.2833,1,0,1.0,1.0,2.0
2,1,3,26.0,0,0,7.925,0,1,1.0,0.0,1.0
3,1,1,35.0,1,0,53.1,1,0,1.0,0.0,2.0
4,0,3,35.0,0,0,8.05,0,1,0.0,0.0,0.0


In [0]:
from pyspark.ml.feature import VectorAssembler

# Se crea un VectorAssembler que tomará todas las columnas del DataFrame excepto la primera como características de entrada
# La primera columna es la columna objetivo, por lo que se omite aquí (de ahí el Titanic_DFS.columns[1:])
# La salida se colocará en una nueva columna llamada "VarCaracteristicas"
varCaracter = VectorAssembler(inputCols=Titanic_DFS.columns[1:], outputCol="VarCaracteristicas")

# Se transforma el DataFrame "Titanic_DFS" utilizando el VectorAssembler creado
# Esto generará una nueva columna llamada "VarCaracteristicas" que contiene las características vectorizadas
varCaracterTrans = varCaracter.transform(Titanic_DFS)

Titanic_DFS.limit(5).toPandas()


Unnamed: 0,Survived,Pclass,Age,SibSp,Parch,Fare,Tam_Familia,Solo,Sexind,Embarkedind,Saludoind
0,0,3,22.0,1,0,7.25,1,0,0.0,0.0,0.0
1,1,1,38.0,1,0,71.2833,1,0,1.0,1.0,2.0
2,1,3,26.0,0,0,7.925,0,1,1.0,0.0,1.0
3,1,1,35.0,1,0,53.1,1,0,1.0,0.0,2.0
4,0,3,35.0,0,0,8.05,0,1,0.0,0.0,0.0


In [0]:
# Se seleccionan las columnas "VarCaracteristicas" y "Survived" del DataFrame transformado
Titanic_DFS_ML = varCaracterTrans.select(["VarCaracteristicas", "Survived"])

# Se divide el DataFrame en conjuntos de entrenamiento y prueba
# El 75% de los datos se utilizarán para entrenamiento y el 25% para pruebas
# Se utiliza una semilla para reproducibilidad
(df_train, df_test) = Titanic_DFS_ML.randomSplit([0.75,0.25], seed= 0.12)

print(f"Datos de entrenamiento: {df_train.count()}")
print(f"Datos de prueba: {df_test.count()}")
print(f"Datos de entrenamiento: {Titanic_DFS_ML.count()}")

Datos de entrenamiento: 652
Datos de prueba: 239
Datos de entrenamiento: 891
