In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


#   MLlib: La librería de aprendizaje automático escalable de Apache Spark

En este notebook aprenderemos a entrenar modelos de Machine Learning con Spark, utilizando la librería de MLlib

In [None]:
!pip install jupyter_contrib_nbextensions

In [None]:
!pip install findspark
!pip install pyspark

#### Fase 1. Importación y análisis de datos

In [None]:
import findspark
findspark.init()

import pandas as pd
import pyspark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('Titanic Data') \
    .getOrCreate()

In [None]:
spark

In [None]:
df = (spark.read
          .format("csv")
          .option('header', 'true')
          .load("/content/drive/MyDrive/0_MSc in Data Science/19_PySpark/data/tested.csv"))

In [None]:
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|       0|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| null|       Q|
|        893|       1|     3|Wilkes, Mrs. Jame...|female|  47|    1|    0| 363272|      7| null|       S|
|        894|       0|     2|Myles, Mr. Thomas...|  male|  62|    0|    0| 240276| 9.6875| null|       Q|
|        895|       0|     3|    Wirz, Mr. Albert|  male|  27|    0|    0| 315154| 8.6625| null|       S|
|        896|       1|     3|Hirvonen, Mrs. Al...|female|  22|    1|    1|3101298|12.2875| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



In [None]:
df.toPandas().head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,892,0,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q
1,893,1,3,"Wilkes, Mrs. James (Ellen Needs)",female,47.0,1,0,363272,7.0,,S
2,894,0,2,"Myles, Mr. Thomas Francis",male,62.0,0,0,240276,9.6875,,Q
3,895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S
4,896,1,3,"Hirvonen, Mrs. Alexander (Helga E Lindqvist)",female,22.0,1,1,3101298,12.2875,,S


In [None]:
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|       0|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| null|       Q|
|        893|       1|     3|Wilkes, Mrs. Jame...|female|  47|    1|    0| 363272|      7| null|       S|
|        894|       0|     2|Myles, Mr. Thomas...|  male|  62|    0|    0| 240276| 9.6875| null|       Q|
|        895|       0|     3|    Wirz, Mr. Albert|  male|  27|    0|    0| 315154| 8.6625| null|       S|
|        896|       1|     3|Hirvonen, Mrs. Al...|female|  22|    1|    1|3101298|12.2875| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



In [None]:
df.count()

418

In [None]:
df.dtypes

[('PassengerId', 'string'),
 ('Survived', 'string'),
 ('Pclass', 'string'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'string'),
 ('SibSp', 'string'),
 ('Parch', 'string'),
 ('Ticket', 'string'),
 ('Fare', 'string'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [None]:
from pyspark.sql.types import *

data_schema = [
               StructField('PassengerId', IntegerType(), True),
               StructField('Survived', StringType(), True),
               StructField('Pclass', IntegerType(), True),
               StructField('Name', StringType(), True),
               StructField('Sex', StringType(), True),
               StructField('Age', IntegerType(), True),
               StructField('SibSp', IntegerType(), True),
               StructField('Parch', IntegerType(), True),
               StructField('Ticket', StringType(), True),
               StructField('Fare', DoubleType(), True),
               StructField('Cabin', StringType(), True),
               StructField('Embarked', StringType(), True),
            ]

final_struc = StructType(fields=data_schema)

df = spark.read.csv(
    "/content/drive/MyDrive/0_MSc in Data Science/19_PySpark/data/tested.csv",
    sep = ',',
    header = True,
    schema = final_struc
    )

In [None]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [None]:
# Basics stats from our columns
df.describe().toPandas()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,418.0,418.0,418.0,418,418,312.0,418.0,418.0,418,417.0,91,418
1,mean,1100.5,0.3636363636363636,2.2655502392344498,,,30.810897435897434,0.4473684210526316,0.3923444976076555,223850.98986486485,35.6271884892086,,
2,stddev,120.81045760473994,0.4816221409322309,0.8418375519640503,,,13.869609024680017,0.8967595611217135,0.9814288785371694,369523.7764694362,55.90757617997384,,
3,min,892.0,0.0,1.0,"""Assaf Khalil, Mrs. Mariana (Miriam"""")""""""",female,1.0,0.0,0.0,110469,0.0,A11,C
4,max,1309.0,1.0,3.0,"van Billiard, Master. Walter John",male,76.0,8.0,9.0,W.E.P. 5734,512.3292,G6,S


In [None]:
!pip install profiling

In [None]:
!pip install pandas-profiling

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from pandas_profiling import ProfileReport

df_pandas = df.toPandas()

pfr = ProfileReport(df_pandas)
pfr.to_notebook_iframe()
pfr

#### Fase 2. Pre-procesamiento de datos

In [None]:
from pyspark.sql.functions import col

dataset = df.select(col('Survived').cast('float'),
                         col('Pclass').cast('float'),
                         col('Sex'),
                         col('Age').cast('float'),
                         col('Fare').cast('float'),
                         col('Embarked')
                        )
dataset.show()

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|     0.0|   3.0|  male|null| 7.8292|       Q|
|     1.0|   3.0|female|47.0|    7.0|       S|
|     0.0|   2.0|  male|62.0| 9.6875|       Q|
|     0.0|   3.0|  male|27.0| 8.6625|       S|
|     1.0|   3.0|female|22.0|12.2875|       S|
|     0.0|   3.0|  male|14.0|  9.225|       S|
|     1.0|   3.0|female|30.0| 7.6292|       Q|
|     0.0|   2.0|  male|26.0|   29.0|       S|
|     1.0|   3.0|female|18.0| 7.2292|       C|
|     0.0|   3.0|  male|21.0|  24.15|       S|
|     0.0|   3.0|  male|null| 7.8958|       S|
|     0.0|   1.0|  male|46.0|   26.0|       S|
|     1.0|   1.0|female|23.0|82.2667|       S|
|     0.0|   2.0|  male|63.0|   26.0|       S|
|     1.0|   1.0|female|47.0| 61.175|       S|
|     1.0|   2.0|female|24.0|27.7208|       C|
|     0.0|   2.0|  male|35.0|  12.35|       Q|
|     0.0|   3.0|  male|21.0|  7.225|       C|
|     1.0|   

In [None]:
#Eliminar valores nulos
from pyspark.sql.functions import isnull, when, count, col

dataset.select([count(when(isnull(c), c)).alias(c) for c in dataset.columns]).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|106|   1|       0|
+--------+------+---+---+----+--------+



In [None]:
# Drop missing values
dataset = dataset.replace('null', None)\
        .dropna(how='any')

In [None]:
from pyspark.ml.feature import StringIndexer

dataset = StringIndexer(
    inputCol='Sex', 
    outputCol='Gender', 
    handleInvalid='keep').fit(dataset).transform(dataset)

dataset = StringIndexer(
    inputCol='Embarked', 
    outputCol='Boarded', 
    handleInvalid='keep').fit(dataset).transform(dataset)

dataset.show()

+--------+------+------+----+-------+--------+------+-------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|Gender|Boarded|
+--------+------+------+----+-------+--------+------+-------+
|     1.0|   3.0|female|47.0|    7.0|       S|   1.0|    0.0|
|     0.0|   2.0|  male|62.0| 9.6875|       Q|   0.0|    2.0|
|     0.0|   3.0|  male|27.0| 8.6625|       S|   0.0|    0.0|
|     1.0|   3.0|female|22.0|12.2875|       S|   1.0|    0.0|
|     0.0|   3.0|  male|14.0|  9.225|       S|   0.0|    0.0|
|     1.0|   3.0|female|30.0| 7.6292|       Q|   1.0|    2.0|
|     0.0|   2.0|  male|26.0|   29.0|       S|   0.0|    0.0|
|     1.0|   3.0|female|18.0| 7.2292|       C|   1.0|    1.0|
|     0.0|   3.0|  male|21.0|  24.15|       S|   0.0|    0.0|
|     0.0|   1.0|  male|46.0|   26.0|       S|   0.0|    0.0|
|     1.0|   1.0|female|23.0|82.2667|       S|   1.0|    0.0|
|     0.0|   2.0|  male|63.0|   26.0|       S|   0.0|    0.0|
|     1.0|   1.0|female|47.0| 61.175|       S|   1.0|    0.0|
|     1.

In [None]:
dataset.dtypes

[('Survived', 'float'),
 ('Pclass', 'float'),
 ('Sex', 'string'),
 ('Age', 'float'),
 ('Fare', 'float'),
 ('Embarked', 'string'),
 ('Gender', 'double'),
 ('Boarded', 'double')]

In [None]:
# Drop unnecessary columns
dataset = dataset.drop('Sex')
dataset = dataset.drop('Embarked')
dataset.show()

+--------+------+----+-------+------+-------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|
+--------+------+----+-------+------+-------+
|     1.0|   3.0|47.0|    7.0|   1.0|    0.0|
|     0.0|   2.0|62.0| 9.6875|   0.0|    2.0|
|     0.0|   3.0|27.0| 8.6625|   0.0|    0.0|
|     1.0|   3.0|22.0|12.2875|   1.0|    0.0|
|     0.0|   3.0|14.0|  9.225|   0.0|    0.0|
|     1.0|   3.0|30.0| 7.6292|   1.0|    2.0|
|     0.0|   2.0|26.0|   29.0|   0.0|    0.0|
|     1.0|   3.0|18.0| 7.2292|   1.0|    1.0|
|     0.0|   3.0|21.0|  24.15|   0.0|    0.0|
|     0.0|   1.0|46.0|   26.0|   0.0|    0.0|
|     1.0|   1.0|23.0|82.2667|   1.0|    0.0|
|     0.0|   2.0|63.0|   26.0|   0.0|    0.0|
|     1.0|   1.0|47.0| 61.175|   1.0|    0.0|
|     1.0|   2.0|24.0|27.7208|   1.0|    1.0|
|     0.0|   2.0|35.0|  12.35|   0.0|    2.0|
|     0.0|   3.0|21.0|  7.225|   0.0|    1.0|
|     1.0|   3.0|27.0|  7.925|   1.0|    0.0|
|     1.0|   3.0|45.0|  7.225|   1.0|    1.0|
|     0.0|   1.0|55.0|   59.4|   0

In [None]:
# Assemble all the features with VectorAssembler
from pyspark.ml.feature import VectorAssembler

required_features = ['Pclass',
                    'Age',
                    'Fare',
                    'Gender',
                    'Boarded'
                   ]


assembler = VectorAssembler(inputCols=required_features, outputCol='features')
transformed_data = assembler.transform(dataset)

In [None]:
transformed_data.show()

+--------+------+----+-------+------+-------+--------------------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|            features|
+--------+------+----+-------+------+-------+--------------------+
|     1.0|   3.0|47.0|    7.0|   1.0|    0.0|[3.0,47.0,7.0,1.0...|
|     0.0|   2.0|62.0| 9.6875|   0.0|    2.0|[2.0,62.0,9.6875,...|
|     0.0|   3.0|27.0| 8.6625|   0.0|    0.0|[3.0,27.0,8.66250...|
|     1.0|   3.0|22.0|12.2875|   1.0|    0.0|[3.0,22.0,12.2875...|
|     0.0|   3.0|14.0|  9.225|   0.0|    0.0|[3.0,14.0,9.22500...|
|     1.0|   3.0|30.0| 7.6292|   1.0|    2.0|[3.0,30.0,7.62919...|
|     0.0|   2.0|26.0|   29.0|   0.0|    0.0|[2.0,26.0,29.0,0....|
|     1.0|   3.0|18.0| 7.2292|   1.0|    1.0|[3.0,18.0,7.22919...|
|     0.0|   3.0|21.0|  24.15|   0.0|    0.0|[3.0,21.0,24.1499...|
|     0.0|   1.0|46.0|   26.0|   0.0|    0.0|[1.0,46.0,26.0,0....|
|     1.0|   1.0|23.0|82.2667|   1.0|    0.0|[1.0,23.0,82.2667...|
|     0.0|   2.0|63.0|   26.0|   0.0|    0.0|[2.0,63.0,26.0,0.

In [None]:
transformed_data.head()

Row(Survived=1.0, Pclass=3.0, Age=47.0, Fare=7.0, Gender=1.0, Boarded=0.0, features=DenseVector([3.0, 47.0, 7.0, 1.0, 0.0]))

#### Fase 3. Entrenamiento del modelo


In [None]:
(training_data, test_data) = transformed_data.randomSplit([0.8,0.2])

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='Survived', 
                            featuresCol='features',
                            maxDepth=5)

In [None]:
model = rf.fit(training_data)

In [None]:
# Predict with the test dataset
predictions = model.transform(test_data)

#### Fase 4. Evaluación del modelo

In [None]:
# Evaluate our model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol='Survived', 
    predictionCol='prediction', 
    metricName='accuracy')

In [None]:
# Accuracy
accuracy = evaluator.evaluate(predictions)
print('Test Accuracy = ', accuracy)

Test Accuracy =  1.0
