# SOLVING THE TITANIC CHALLENGE WITH SPARK ML

### Testando Spark ML

##### Dataset: https://www.kaggle.com/c/titanic/data

In [None]:
%%configure -f
{
    "name": "Titanic-Spark-ML",
    "driverMemory": "10g",
    "executorMemory": "10g",
    "numExecutors": 5,
    "executorCores": 2,
    "conf": {
        "spark.dynamicAllocation.maxExecutors": 20,
        "spark.dynamicAllocation.minExecutors": 2,
        "spark.sql.broadcastTimeout":  36000,
        "driver-memory": "47g",
        "executor-memory": "47g",
        "num-executors": 1,
        "executor-cores": 1
    },
}

# Dicionário de Dados

## Variáveis:

### Sobreviveu -> se sobreviveu ou não (0 = Não, 1 = Sim)

### classe -> classe da passagem (1 = 1a classe, 2 = 2a classe, 3 = 3a classe)

### sexo -> Sexo

### idade -> Idade do passageiro

### num_irmaos_conjuge -> número de irmãos / cônjuge a bordo do Titanic
O conjunto de dados define as relações familiares desta forma:
Irmão = irmão, irmã, meio-irmão, meia-irmã
Cônjuge = marido, esposa (amantes e noivos foram ignorados)

### num_pais_filhos -> número de pais / filhos a bordo do Titanic
O conjunto de dados define as relações familiares desta forma:
Pai = mãe, pai
Criança = filha, filho, enteada, enteado
Algumas crianças viajavam apenas com a babá, portanto parch = 0 para elas.

### ticket -> Número da passagem

### preco -> Custo da Passagem

### cabine -> Número da Cabine

### local_embarque -> Local de Embarque (C = Cherbourg, Q = Queenstown, S = Southampton)

In [177]:
#Executar essa célula para iniciar a sessão no Hadoop

import sys
print(sys.version_info)

sys.version_info(major=3, minor=7, micro=10, releaselevel='final', serial=0)

In [178]:
import os
import pyspark
import time, logging
from functools import wraps
from pyspark.sql import functions as f
from pyspark.sql.functions import *
from pyspark.sql import Window

from datetime import datetime, timedelta, date
from dateutil.relativedelta import relativedelta
from timeit import default_timer as timer


from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


import pandas as pd

In [179]:
start = 0
end = 0

def tempo():
    global start
    start = timer()
    #print(f"start: {str(start)}")

def tempo_tot():
    end = timer()
    #print(f"end: {str(end)}")
    
    segs = (end - start)
    #print(f"segs: {str(segs)}")
    
    sTotal = timedelta(seconds=segs)
    print(f"Tempo total: { str(sTotal) }")
    

In [180]:
train_path = 's3://datalake-prd/try/testes_spark/titanic/train.csv'
test_path = 's3://datalake-prd/try/testes_spark/titanic/test.csv'

In [181]:
spark = SparkSession \
            .builder \
            .appName('Spark ML Tests - Titanic Data') \
            .getOrCreate()

In [182]:
train_df = (spark.read
              .format("csv")
              .option('header', 'true')
              .load(train_path))

In [183]:
train_df = train_df\
            .withColumnRenamed('PassengerId', 'cod_passageiro')\
            .withColumnRenamed('Survived', 'sobreviveu')\
            .withColumnRenamed('Pclass', 'classe')\
            .withColumnRenamed('Name', 'nome')\
            .withColumnRenamed('Sex', 'sexo')\
            .withColumnRenamed('Age', 'idade')\
            .withColumnRenamed('SibSp', 'num_irmaos_conjuge')\
            .withColumnRenamed('Parch', 'num_pais_filhos')\
            .withColumnRenamed('Ticket', 'ticket')\
            .withColumnRenamed('Fare', 'preco')\
            .withColumnRenamed('Cabin', 'cabine')\
            .withColumnRenamed('Embarked', 'local_embarque')

In [184]:
train_df.toPandas()

    cod_passageiro sobreviveu classe  ...    preco cabine local_embarque
0                1          0      3  ...     7.25   None              S
1                2          1      1  ...  71.2833    C85              C
2                3          1      3  ...    7.925   None              S
3                4          1      1  ...     53.1   C123              S
4                5          0      3  ...     8.05   None              S
..             ...        ...    ...  ...      ...    ...            ...
886            887          0      2  ...       13   None              S
887            888          1      1  ...       30    B42              S
888            889          0      3  ...    23.45   None              S
889            890          1      1  ...       30   C148              C
890            891          0      3  ...     7.75   None              Q

[891 rows x 12 columns]

In [185]:
train_df.count()

891

In [186]:
train_df.describe().toPandas()

  summary     cod_passageiro  ... cabine local_embarque
0   count                891  ...    204            889
1    mean              446.0  ...   None           None
2  stddev  257.3538420152301  ...   None           None
3     min                  1  ...    A10              C
4     max                 99  ...      T              S

[5 rows x 13 columns]

In [187]:
dataset = train_df.select(col('sobreviveu').cast('float'),
                             col('classe').cast('float'),
                             col('sexo'),
                             col('idade').cast('float'),
                             col('preco').cast('float'),
                             col('local_embarque')
                        )
dataset.show()

+----------+------+------+-----+-------+--------------+
|sobreviveu|classe|  sexo|idade|  preco|local_embarque|
+----------+------+------+-----+-------+--------------+
|       0.0|   3.0|  male| 22.0|   7.25|             S|
|       1.0|   1.0|female| 38.0|71.2833|             C|
|       1.0|   3.0|female| 26.0|  7.925|             S|
|       1.0|   1.0|female| 35.0|   53.1|             S|
|       0.0|   3.0|  male| 35.0|   8.05|             S|
|       0.0|   3.0|  male| null| 8.4583|             Q|
|       0.0|   1.0|  male| 54.0|51.8625|             S|
|       0.0|   3.0|  male|  2.0| 21.075|             S|
|       1.0|   3.0|female| 27.0|11.1333|             S|
|       1.0|   2.0|female| 14.0|30.0708|             C|
|       1.0|   3.0|female|  4.0|   16.7|             S|
|       1.0|   1.0|female| 58.0|  26.55|             S|
|       0.0|   3.0|  male| 20.0|   8.05|             S|
|       0.0|   3.0|  male| 39.0| 31.275|             S|
|       0.0|   3.0|female| 14.0| 7.8542|        

In [188]:
from pyspark.sql import functions as f
from pyspark.sql.functions import isnull, when, count, col

dataset.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in dataset.columns]).show()

+----------+------+----+-----+-----+--------------+
|sobreviveu|classe|sexo|idade|preco|local_embarque|
+----------+------+----+-----+-----+--------------+
|         0|     0|   0|  177|    0|             2|
+----------+------+----+-----+-----+--------------+

In [189]:
dataset = dataset.replace('?', None)\
            .dropna(how='any')

In [190]:
dataset = StringIndexer(
    inputCol='sexo', 
    outputCol='sexo_num', 
    handleInvalid='keep').fit(dataset).transform(dataset)

dataset = StringIndexer(
    inputCol='local_embarque', 
    outputCol='local_embarque_num', 
    handleInvalid='keep').fit(dataset).transform(dataset)


dataset.show()

+----------+------+------+-----+-------+--------------+--------+------------------+
|sobreviveu|classe|  sexo|idade|  preco|local_embarque|sexo_num|local_embarque_num|
+----------+------+------+-----+-------+--------------+--------+------------------+
|       0.0|   3.0|  male| 22.0|   7.25|             S|     0.0|               0.0|
|       1.0|   1.0|female| 38.0|71.2833|             C|     1.0|               1.0|
|       1.0|   3.0|female| 26.0|  7.925|             S|     1.0|               0.0|
|       1.0|   1.0|female| 35.0|   53.1|             S|     1.0|               0.0|
|       0.0|   3.0|  male| 35.0|   8.05|             S|     0.0|               0.0|
|       0.0|   1.0|  male| 54.0|51.8625|             S|     0.0|               0.0|
|       0.0|   3.0|  male|  2.0| 21.075|             S|     0.0|               0.0|
|       1.0|   3.0|female| 27.0|11.1333|             S|     1.0|               0.0|
|       1.0|   2.0|female| 14.0|30.0708|             C|     1.0|            

In [191]:
dataset.dtypes

[('sobreviveu', 'float'), ('classe', 'float'), ('sexo', 'string'), ('idade', 'float'), ('preco', 'float'), ('local_embarque', 'string'), ('sexo_num', 'double'), ('local_embarque_num', 'double')]

In [192]:
dataset = dataset.drop('sexo')
dataset = dataset.drop('local_embarque')

dataset.show()

+----------+------+-----+-------+--------+------------------+
|sobreviveu|classe|idade|  preco|sexo_num|local_embarque_num|
+----------+------+-----+-------+--------+------------------+
|       0.0|   3.0| 22.0|   7.25|     0.0|               0.0|
|       1.0|   1.0| 38.0|71.2833|     1.0|               1.0|
|       1.0|   3.0| 26.0|  7.925|     1.0|               0.0|
|       1.0|   1.0| 35.0|   53.1|     1.0|               0.0|
|       0.0|   3.0| 35.0|   8.05|     0.0|               0.0|
|       0.0|   1.0| 54.0|51.8625|     0.0|               0.0|
|       0.0|   3.0|  2.0| 21.075|     0.0|               0.0|
|       1.0|   3.0| 27.0|11.1333|     1.0|               0.0|
|       1.0|   2.0| 14.0|30.0708|     1.0|               1.0|
|       1.0|   3.0|  4.0|   16.7|     1.0|               0.0|
|       1.0|   1.0| 58.0|  26.55|     1.0|               0.0|
|       0.0|   3.0| 20.0|   8.05|     0.0|               0.0|
|       0.0|   3.0| 39.0| 31.275|     0.0|               0.0|
|       

In [193]:
required_features = ['classe',
                        'idade',
                        'preco',
                        'sexo_num',
                        'local_embarque_num'
                    ]

assembler = VectorAssembler(
                inputCols=required_features, 
                outputCol='features')

transformed_data = assembler.transform(dataset)

In [194]:
transformed_data.show()

+----------+------+-----+-------+--------+------------------+--------------------+
|sobreviveu|classe|idade|  preco|sexo_num|local_embarque_num|            features|
+----------+------+-----+-------+--------+------------------+--------------------+
|       0.0|   3.0| 22.0|   7.25|     0.0|               0.0|[3.0,22.0,7.25,0....|
|       1.0|   1.0| 38.0|71.2833|     1.0|               1.0|[1.0,38.0,71.2833...|
|       1.0|   3.0| 26.0|  7.925|     1.0|               0.0|[3.0,26.0,7.92500...|
|       1.0|   1.0| 35.0|   53.1|     1.0|               0.0|[1.0,35.0,53.0999...|
|       0.0|   3.0| 35.0|   8.05|     0.0|               0.0|[3.0,35.0,8.05000...|
|       0.0|   1.0| 54.0|51.8625|     0.0|               0.0|[1.0,54.0,51.8624...|
|       0.0|   3.0|  2.0| 21.075|     0.0|               0.0|[3.0,2.0,21.07500...|
|       1.0|   3.0| 27.0|11.1333|     1.0|               0.0|[3.0,27.0,11.1332...|
|       1.0|   2.0| 14.0|30.0708|     1.0|               1.0|[2.0,14.0,30.0708...|
|   

In [195]:
(training_data, test_data) = transformed_data.randomSplit([0.8,0.2])

In [196]:
rf = RandomForestClassifier(
                    labelCol='sobreviveu', 
                    featuresCol='features',
                    maxDepth=5)

In [197]:
model = rf.fit(training_data)

In [198]:
predictions = model.transform(test_data)

In [199]:
evaluator = MulticlassClassificationEvaluator(
                        labelCol='sobreviveu', 
                        predictionCol='prediction', 
                        metricName='accuracy')

In [200]:
predictions.show()

+----------+------+-----+--------+--------+------------------+--------------------+--------------------+--------------------+----------+
|sobreviveu|classe|idade|   preco|sexo_num|local_embarque_num|            features|       rawPrediction|         probability|prediction|
+----------+------+-----+--------+--------+------------------+--------------------+--------------------+--------------------+----------+
|       0.0|   1.0| 22.0|135.6333|     0.0|               1.0|[1.0,22.0,135.633...|[13.3997622064202...|[0.66998811032101...|       0.0|
|       0.0|   1.0| 29.0|    66.6|     0.0|               0.0|[1.0,29.0,66.5999...|[11.5078697060808...|[0.57539348530404...|       0.0|
|       0.0|   1.0| 33.0|     5.0|     0.0|               0.0|[1.0,33.0,5.0,0.0...|[14.4560278425978...|[0.72280139212989...|       0.0|
|       0.0|   1.0| 40.0|     0.0|     0.0|               0.0|(5,[0,1],[1.0,40.0])|[15.0521816887517...|[0.75260908443758...|       0.0|
|       0.0|   1.0| 40.0| 27.7208|     0.

In [201]:
accuracy = evaluator.evaluate(predictions)

print('Teste de Acuracia = ', accuracy)

Teste de Acuracia =  0.7703703703703704

# APLICANDO TESTES

In [202]:
test_df = (spark.read
              .format("csv")
              .option('header', 'true')
              .load(test_path))

In [203]:
test_df = test_df\
            .withColumnRenamed('PassengerId', 'cod_passageiro')\
            .withColumnRenamed('Survived', 'sobreviveu')\
            .withColumnRenamed('Pclass', 'classe')\
            .withColumnRenamed('Name', 'nome')\
            .withColumnRenamed('Sex', 'sexo')\
            .withColumnRenamed('Age', 'idade')\
            .withColumnRenamed('SibSp', 'num_irmaos_conjuge')\
            .withColumnRenamed('Parch', 'num_pais_filhos')\
            .withColumnRenamed('Ticket', 'ticket')\
            .withColumnRenamed('Fare', 'preco')\
            .withColumnRenamed('Cabin', 'cabine')\
            .withColumnRenamed('Embarked', 'local_embarque')

dataset_testes = test_df.select(
                             col('classe').cast('float'),
                             col('sexo'),
                             col('idade').cast('float'),
                             col('preco').cast('float'),
                             col('local_embarque')
                        )

dataset_testes = dataset_testes.replace('?', None)\
                    .dropna(how='any')

dataset_testes = StringIndexer(
                    inputCol='sexo', 
                    outputCol='sexo_num', 
                    handleInvalid='keep').fit(dataset_testes).transform(dataset_testes)

dataset_testes = StringIndexer(
                    inputCol='local_embarque', 
                    outputCol='local_embarque_num', 
                    handleInvalid='keep').fit(dataset_testes).transform(dataset_testes)

dataset_testes.drop('sexo')
dataset_testes.drop('local_embarque')

testes_transformado = assembler.transform(dataset_testes)

testes_transformado.show(10)

+------+------+-----+-------+--------------+--------+------------------+--------------------+
|classe|  sexo|idade|  preco|local_embarque|sexo_num|local_embarque_num|            features|
+------+------+-----+-------+--------------+--------+------------------+--------------------+
|   3.0|  male| 34.5| 7.8292|             Q|     0.0|               2.0|[3.0,34.5,7.82919...|
|   3.0|female| 47.0|    7.0|             S|     1.0|               0.0|[3.0,47.0,7.0,1.0...|
|   2.0|  male| 62.0| 9.6875|             Q|     0.0|               2.0|[2.0,62.0,9.6875,...|
|   3.0|  male| 27.0| 8.6625|             S|     0.0|               0.0|[3.0,27.0,8.66250...|
|   3.0|female| 22.0|12.2875|             S|     1.0|               0.0|[3.0,22.0,12.2875...|
|   3.0|  male| 14.0|  9.225|             S|     0.0|               0.0|[3.0,14.0,9.22500...|
|   3.0|female| 30.0| 7.6292|             Q|     1.0|               2.0|[3.0,30.0,7.62919...|
|   2.0|  male| 26.0|   29.0|             S|     0.0|       

In [204]:
predictions_testes = model.transform(testes_transformado)

predictions_testes.select('classe', 'sexo', 'idade', 'preco', 'local_embarque', 'sexo', 'probability', 'prediction').show(10)

+------+------+-----+-------+--------------+------+--------------------+----------+
|classe|  sexo|idade|  preco|local_embarque|  sexo|         probability|prediction|
+------+------+-----+-------+--------------+------+--------------------+----------+
|   3.0|  male| 34.5| 7.8292|             Q|  male|[0.91898314233620...|       0.0|
|   3.0|female| 47.0|    7.0|             S|female|[0.74555526067274...|       0.0|
|   2.0|  male| 62.0| 9.6875|             Q|  male|[0.89845119901616...|       0.0|
|   3.0|  male| 27.0| 8.6625|             S|  male|[0.89990596355372...|       0.0|
|   3.0|female| 22.0|12.2875|             S|female|[0.39269641416572...|       1.0|
|   3.0|  male| 14.0|  9.225|             S|  male|[0.67455997114667...|       0.0|
|   3.0|female| 30.0| 7.6292|             Q|female|[0.65318715322902...|       0.0|
|   2.0|  male| 26.0|   29.0|             S|  male|[0.85029130862415...|       0.0|
|   3.0|female| 18.0| 7.2292|             C|female|[0.40837099791430...|    

# ALGUM VOLUNTÁRIO A VIAJAR NO TITANIC??

In [213]:
nome = "Douglas Mendes"
classe = "3"
sexo = "male"
idade = "39"
num_irmaos_conjuge = "1"
num_pais_filhos = "0"
preco_em_libras = "7.0"
local_embarque = "Q"

ticket = "123"
cod_passageiro = "123"
cabine = "B45"


In [214]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

data2 = [(cod_passageiro,classe,nome,sexo,idade,num_irmaos_conjuge,num_pais_filhos,ticket,preco_em_libras,cabine,local_embarque) ]

schema = StructType([ 
    StructField("cod_passageiro",StringType(),True), 
    StructField("classe", StringType(), True), 
    StructField("nome", StringType(), True), 
    StructField("sexo", StringType(), True), 
    StructField("idade", StringType(), True), 
    StructField("num_irmaos_conjuge", StringType(), True), 
    StructField("num_pais_filhos", StringType(), True), 
    StructField("ticket", StringType(), True), 
    StructField("preco", StringType(), True), 
    StructField("cabine", StringType(), True), 
    StructField("local_embarque", StringType(), True)
  ])

df_testando = spark.createDataFrame(data=data2,schema=schema)

df_testando.show()

+--------------+------+--------------+----+-----+------------------+---------------+------+-----+------+--------------+
|cod_passageiro|classe|          nome|sexo|idade|num_irmaos_conjuge|num_pais_filhos|ticket|preco|cabine|local_embarque|
+--------------+------+--------------+----+-----+------------------+---------------+------+-----+------+--------------+
|           123|     3|Douglas Mendes|male|   39|                 1|              0|   123|  7.0|   B45|             Q|
+--------------+------+--------------+----+-----+------------------+---------------+------+-----+------+--------------+

In [215]:
df_testando = test_df.union(df_testando)

dataset_testes2 = df_testando.select(
                             col('classe').cast('float'),
                             col('sexo'),
                             col('idade').cast('float'),
                             col('preco').cast('float'),
                             col('local_embarque'),
                             col('nome')
                        )

dataset_testes2 = dataset_testes2.replace('?', None)\
                    .dropna(how='any')

dataset_testes2 = StringIndexer(
                    inputCol='sexo', 
                    outputCol='sexo_num', 
                    handleInvalid='keep').fit(dataset_testes2).transform(dataset_testes2)

dataset_testes2 = StringIndexer(
                    inputCol='local_embarque', 
                    outputCol='local_embarque_num', 
                    handleInvalid='keep').fit(dataset_testes2).transform(dataset_testes2)

dataset_testes2.drop('sexo')
dataset_testes2.drop('local_embarque')

testes_transformado2 = assembler.transform(dataset_testes2)

testes_transformado2.show(10)

+------+------+-----+-------+--------------+--------------------+--------+------------------+--------------------+
|classe|  sexo|idade|  preco|local_embarque|                nome|sexo_num|local_embarque_num|            features|
+------+------+-----+-------+--------------+--------------------+--------+------------------+--------------------+
|   3.0|  male| 34.5| 7.8292|             Q|    Kelly, Mr. James|     0.0|               2.0|[3.0,34.5,7.82919...|
|   3.0|female| 47.0|    7.0|             S|Wilkes, Mrs. Jame...|     1.0|               0.0|[3.0,47.0,7.0,1.0...|
|   2.0|  male| 62.0| 9.6875|             Q|Myles, Mr. Thomas...|     0.0|               2.0|[2.0,62.0,9.6875,...|
|   3.0|  male| 27.0| 8.6625|             S|    Wirz, Mr. Albert|     0.0|               0.0|[3.0,27.0,8.66250...|
|   3.0|female| 22.0|12.2875|             S|Hirvonen, Mrs. Al...|     1.0|               0.0|[3.0,22.0,12.2875...|
|   3.0|  male| 14.0|  9.225|             S|Svensson, Mr. Joh...|     0.0|      

In [216]:
predictions_voluntario = model.transform(testes_transformado2)

from pyspark.sql import functions as f

predictions_voluntario = predictions_voluntario.withColumn('Sobreviveu',
                                                           when(f.col('prediction') == 1, 'SIM')\
                                                           .when(f.col('prediction') == 0, 'NAO'))

predictions_voluntario.where(col('nome') == nome)\
                    .select('nome','classe', 'sexo', 'idade', 'preco', 
                              'local_embarque', 'sexo', 'probability', 'prediction', 'Sobreviveu').show()

+--------------+------+----+-----+-----+--------------+----+--------------------+----------+----------+
|          nome|classe|sexo|idade|preco|local_embarque|sexo|         probability|prediction|Sobreviveu|
+--------------+------+----+-----+-----+--------------+----+--------------------+----------+----------+
|Douglas Mendes|   3.0|male| 39.0|  7.0|             Q|male|[0.92502114133083...|       0.0|       NAO|
+--------------+------+----+-----+-----+--------------+----+--------------------+----------+----------+