## Taller Spark

Jeimy Aristizabal -  Campo Pinillos

In [1]:
import pyspark
import pandas as pd
import numpy as np

In [2]:
dataset = (sqlContext
                   .read
                   .format('com.databricks.spark.csv')
                   .options(header='true', inferSchema='true')
                   .load('titanic3.csv'))
dataset.cache()
dataset.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- survived: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: double (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: integer (nullable = true)
 |-- home.dest: string (nullable = true)



In [3]:
dataset = dataset.select(['survived', 
                          'pclass', 
                          'name', 
                          'sex', 
                          'age', 
                          'sibsp', 
                          'parch', 
                          'ticket', 
                          'fare', 
                          'cabin', 
                          'embarked'])

In [22]:
(train, test) = dataset.randomSplit([0.7, 0.3], seed=100)
train.dtypes

[('survived', 'int'),
 ('pclass', 'int'),
 ('name', 'string'),
 ('sex', 'string'),
 ('age', 'double'),
 ('sibsp', 'int'),
 ('parch', 'int'),
 ('ticket', 'string'),
 ('fare', 'double'),
 ('cabin', 'string'),
 ('embarked', 'string')]

In [23]:
train.toPandas().head()

Unnamed: 0,survived,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked
0,0,1,"""Lindeberg-Lind, Mr. Erik Gustaf (""""Mr Edward ...",male,42.0,0,0,17475,26.55,,S
1,0,1,"""Rosenshine, Mr. George (""""Mr George Thorne"""")""",male,46.0,0,0,PC 17585,79.2,,C
2,0,1,"Allison, Miss. Helen Loraine",female,2.0,1,2,113781,151.55,C22 C26,S
3,0,1,"Allison, Mr. Hudson Joshua Creighton",male,30.0,1,2,113781,151.55,C22 C26,S
4,0,1,"Andrews, Mr. Thomas Jr",male,39.0,0,0,112050,0.0,A36,S


In [24]:
countings = train.groupBy("survived").count()
countings.toPandas().head()

Unnamed: 0,survived,count
0,1,359
1,0,551


In [25]:
male_countings = train.filter(train.sex == 'male').groupBy("survived").count()
male_countings.toPandas()

Unnamed: 0,survived,count
0,1,117
1,0,458


In [26]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, when, lit, col

def set_child(age):
  if (age != age) | (age is None):
    return None
  if age >= 18:
    return 0
  if age < 18:
    return 1

udfChild = udf(set_child, IntegerType())
train = train.withColumn('child', udfChild('age'))
test = test.withColumn('child', udfChild('age'))
train.toPandas().head(10)

Unnamed: 0,survived,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,child
0,0,1,"""Lindeberg-Lind, Mr. Erik Gustaf (""""Mr Edward ...",male,42.0,0,0,17475,26.55,,S,0.0
1,0,1,"""Rosenshine, Mr. George (""""Mr George Thorne"""")""",male,46.0,0,0,PC 17585,79.2,,C,0.0
2,0,1,"Allison, Miss. Helen Loraine",female,2.0,1,2,113781,151.55,C22 C26,S,1.0
3,0,1,"Allison, Mr. Hudson Joshua Creighton",male,30.0,1,2,113781,151.55,C22 C26,S,0.0
4,0,1,"Andrews, Mr. Thomas Jr",male,39.0,0,0,112050,0.0,A36,S,0.0
5,0,1,"Artagaveytia, Mr. Ramon",male,71.0,0,0,PC 17609,49.5042,,C,0.0
6,0,1,"Beattie, Mr. Thomson",male,36.0,0,0,13050,75.2417,C6,C,0.0
7,0,1,"Birnbaum, Mr. Jakob",male,25.0,0,0,13905,26.0,,C,0.0
8,0,1,"Blackwell, Mr. Stephen Weart",male,45.0,0,0,113784,35.5,T,S,0.0
9,0,1,"Borebank, Mr. John James",male,42.0,0,0,110489,26.55,D22,S,0.0


In [27]:
child_countings = train.filter(train.child == 1).groupBy("survived").count()
child_countings.toPandas()

Unnamed: 0,survived,count
0,1,64
1,0,48


In [28]:
def set_gender(gender):
  if gender == 'male':
    return 0
  else:
    return 1
    
udfGender = udf(set_gender, IntegerType())
train = train.withColumn('sex', udfGender('sex'))
test = test.withColumn('sex', udfGender('sex'))
train.limit(10).toPandas()

Unnamed: 0,survived,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,child
0,0,1,"""Lindeberg-Lind, Mr. Erik Gustaf (""""Mr Edward ...",0,42.0,0,0,17475,26.55,,S,0
1,0,1,"""Rosenshine, Mr. George (""""Mr George Thorne"""")""",0,46.0,0,0,PC 17585,79.2,,C,0
2,0,1,"Allison, Miss. Helen Loraine",1,2.0,1,2,113781,151.55,C22 C26,S,1
3,0,1,"Allison, Mr. Hudson Joshua Creighton",0,30.0,1,2,113781,151.55,C22 C26,S,0
4,0,1,"Andrews, Mr. Thomas Jr",0,39.0,0,0,112050,0.0,A36,S,0
5,0,1,"Artagaveytia, Mr. Ramon",0,71.0,0,0,PC 17609,49.5042,,C,0
6,0,1,"Beattie, Mr. Thomson",0,36.0,0,0,13050,75.2417,C6,C,0
7,0,1,"Birnbaum, Mr. Jakob",0,25.0,0,0,13905,26.0,,C,0
8,0,1,"Blackwell, Mr. Stephen Weart",0,45.0,0,0,113784,35.5,T,S,0
9,0,1,"Borebank, Mr. John James",0,42.0,0,0,110489,26.55,D22,S,0


In [29]:
test_embarked = train.groupBy("embarked").count()
test_embarked.toPandas()

Unnamed: 0,embarked,count
0,Q,89
1,,1
2,C,176
3,S,644


In [30]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

indexer = StringIndexer(inputCol="embarked", outputCol="embarkedIndex", handleInvalid="keep")
encoder = OneHotEncoderEstimator(inputCols=["embarkedIndex"], outputCols=["embarkedCategorical"])
pipeline = Pipeline(stages=[indexer, encoder])
pipeline = pipeline.fit(train)
train = pipeline.transform(train)

In [31]:
train.toPandas().head()

Unnamed: 0,survived,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,child,embarkedIndex,embarkedCategorical
0,0,1,"""Lindeberg-Lind, Mr. Erik Gustaf (""""Mr Edward ...",0,42.0,0,0,17475,26.55,,S,0.0,0.0,"(1.0, 0.0, 0.0)"
1,0,1,"""Rosenshine, Mr. George (""""Mr George Thorne"""")""",0,46.0,0,0,PC 17585,79.2,,C,0.0,1.0,"(0.0, 1.0, 0.0)"
2,0,1,"Allison, Miss. Helen Loraine",1,2.0,1,2,113781,151.55,C22 C26,S,1.0,0.0,"(1.0, 0.0, 0.0)"
3,0,1,"Allison, Mr. Hudson Joshua Creighton",0,30.0,1,2,113781,151.55,C22 C26,S,0.0,0.0,"(1.0, 0.0, 0.0)"
4,0,1,"Andrews, Mr. Thomas Jr",0,39.0,0,0,112050,0.0,A36,S,0.0,0.0,"(1.0, 0.0, 0.0)"


In [32]:
train.select("embarkedCategorical").show()

+-------------------+
|embarkedCategorical|
+-------------------+
|      (3,[0],[1.0])|
|      (3,[1],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[1],[1.0])|
|      (3,[1],[1.0])|
|      (3,[1],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[1],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[0],[1.0])|
|      (3,[1],[1.0])|
+-------------------+
only showing top 20 rows



In [33]:
test = pipeline.transform(test)

In [34]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=["age", "fare"], outputCols=["age", "fare"], strategy='median')
model = imputer.fit(train)
train = model.transform(train)
test = model.transform(test)

In [35]:
train.toPandas().head()

Unnamed: 0,survived,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,child,embarkedIndex,embarkedCategorical
0,0,1,"""Lindeberg-Lind, Mr. Erik Gustaf (""""Mr Edward ...",0,42.0,0,0,17475,26.55,,S,0.0,0.0,"(1.0, 0.0, 0.0)"
1,0,1,"""Rosenshine, Mr. George (""""Mr George Thorne"""")""",0,46.0,0,0,PC 17585,79.2,,C,0.0,1.0,"(0.0, 1.0, 0.0)"
2,0,1,"Allison, Miss. Helen Loraine",1,2.0,1,2,113781,151.55,C22 C26,S,1.0,0.0,"(1.0, 0.0, 0.0)"
3,0,1,"Allison, Mr. Hudson Joshua Creighton",0,30.0,1,2,113781,151.55,C22 C26,S,0.0,0.0,"(1.0, 0.0, 0.0)"
4,0,1,"Andrews, Mr. Thomas Jr",0,39.0,0,0,112050,0.0,A36,S,0.0,0.0,"(1.0, 0.0, 0.0)"


In [36]:
test.toPandas().head()

Unnamed: 0,survived,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,child,embarkedIndex,embarkedCategorical
0,0,1,"Allison, Mrs. Hudson J C (Bessie Waldo Daniels)",1,25.0,1,2,113781,151.55,C22 C26,S,0.0,0.0,"(1.0, 0.0, 0.0)"
1,0,1,"Astor, Col. John Jacob",0,47.0,1,0,PC 17757,227.525,C62 C64,C,0.0,1.0,"(0.0, 1.0, 0.0)"
2,0,1,"Baumann, Mr. John D",0,28.0,0,0,PC 17318,25.925,,S,,0.0,"(1.0, 0.0, 0.0)"
3,0,1,"Baxter, Mr. Quigg Edmond",0,24.0,0,1,PC 17558,247.5208,B58 B60,C,0.0,1.0,"(0.0, 1.0, 0.0)"
4,0,1,"Brandeis, Mr. Emil",0,48.0,0,0,PC 17591,50.4958,B10,C,0.0,1.0,"(0.0, 1.0, 0.0)"


In [37]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

target_labeler = StringIndexer(inputCol='survived', outputCol='label').fit(train)

train_set = target_labeler.transform(train)

In [38]:
assembler = VectorAssembler(
 inputCols=['pclass', 'sex','age', 'fare', 'embarkedCategorical'],
 outputCol="features")

train_set = assembler.transform(train_set)
test_set = assembler.transform(test)
test_set = target_labeler.transform(test_set)

In [39]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def evaluate(model, dataset):
  predictions = model.transform(dataset)
  #display(predictions)
  evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                metricName="accuracy")
  accuracy = evaluator.evaluate(predictions)
  print("Set accuracy = " + str(accuracy))

In [66]:
from pyspark.ml.classification import DecisionTreeClassifier

model = DecisionTreeClassifier(maxDepth=10).fit(train_set)
evaluate(model, train_set)

Set accuracy = 0.8989010989010989


In [67]:
evaluate(model, test_set)

Set accuracy = 0.7894736842105263


In [68]:
print(model.toDebugString)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_6b50a49609f7) of depth 10 with 215 nodes
  If (feature 1 <= 0.5)
   If (feature 2 <= 9.5)
    If (feature 0 <= 2.5)
     Predict: 1.0
    Else (feature 0 > 2.5)
     If (feature 3 <= 21.0375)
      If (feature 3 <= 12.9375)
       Predict: 1.0
      Else (feature 3 > 12.9375)
       If (feature 3 <= 15.3729)
        Predict: 0.0
       Else (feature 3 > 15.3729)
        Predict: 1.0
     Else (feature 3 > 21.0375)
      If (feature 2 <= 3.5)
       If (feature 3 <= 24.808349999999997)
        Predict: 0.0
       Else (feature 3 > 24.808349999999997)
        If (feature 3 <= 34.8271)
         Predict: 1.0
        Else (feature 3 > 34.8271)
         Predict: 0.0
      Else (feature 2 > 3.5)
       Predict: 0.0
   Else (feature 2 > 9.5)
    If (feature 0 <= 1.5)
     If (feature 2 <= 35.5)
      If (feature 2 <= 33.5)
       If (feature 2 <= 17.5)
        Predict: 1.0
       Else (feature 2 > 17.5)
        If (feature 3 <= 90.539

In [43]:
predictions = model.transform(test_set)
predictions.toPandas().head()

Unnamed: 0,survived,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,child,embarkedIndex,embarkedCategorical,features,label,rawPrediction,probability,prediction
0,0,1,"Allison, Mrs. Hudson J C (Bessie Waldo Daniels)",1,25.0,1,2,113781,151.55,C22 C26,S,0.0,0.0,"(1.0, 0.0, 0.0)","[1.0, 1.0, 25.0, 151.55, 1.0, 0.0, 0.0]",0.0,"[15.0, 171.0]","[0.08064516129032258, 0.9193548387096774]",1.0
1,0,1,"Astor, Col. John Jacob",0,47.0,1,0,PC 17757,227.525,C62 C64,C,0.0,1.0,"(0.0, 1.0, 0.0)","[1.0, 0.0, 47.0, 227.525, 0.0, 1.0, 0.0]",0.0,"[447.0, 98.0]","[0.8201834862385321, 0.1798165137614679]",0.0
2,0,1,"Baumann, Mr. John D",0,28.0,0,0,PC 17318,25.925,,S,,0.0,"(1.0, 0.0, 0.0)","[1.0, 0.0, 28.0, 25.925, 1.0, 0.0, 0.0]",0.0,"[447.0, 98.0]","[0.8201834862385321, 0.1798165137614679]",0.0
3,0,1,"Baxter, Mr. Quigg Edmond",0,24.0,0,1,PC 17558,247.5208,B58 B60,C,0.0,1.0,"(0.0, 1.0, 0.0)","[1.0, 0.0, 24.0, 247.5208, 0.0, 1.0, 0.0]",0.0,"[447.0, 98.0]","[0.8201834862385321, 0.1798165137614679]",0.0
4,0,1,"Brandeis, Mr. Emil",0,48.0,0,0,PC 17591,50.4958,B10,C,0.0,1.0,"(0.0, 1.0, 0.0)","[1.0, 0.0, 48.0, 50.4958, 0.0, 1.0, 0.0]",0.0,"[447.0, 98.0]","[0.8201834862385321, 0.1798165137614679]",0.0


In [47]:
predictions.select("age","child", "survived").toPandas()

Unnamed: 0,age,child,survived
0,25.0,0.0,0
1,47.0,0.0,0
2,28.0,,0
3,24.0,0.0,0
4,48.0,0.0,0
5,28.0,,0
6,33.0,0.0,0
7,17.0,1.0,0
8,28.0,,0
9,27.0,0.0,0


## Solución Taller

In [58]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [59]:
model = RandomForestClassifier()
paramGrid = ParamGridBuilder().addGrid(model.numTrees, [5, 10, 15, 20, 25, 50]).build()

In [60]:
crossval = CrossValidator(estimator=model,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy"),
                          numFolds=5)

In [61]:
cvModel = crossval.fit(train_set)

In [63]:
evaluate(cvModel, test_set)

Set accuracy = 0.7944862155388471


In [64]:
exploration = sqlContext.createDataFrame(zip([5, 10, 15, 20, 25, 50], cvModel.avgMetrics), ['numTrees', 'score'])
exploration.toPandas()

Unnamed: 0,numTrees,score
0,5,0.799462
1,10,0.789206
2,15,0.784529
3,20,0.798544
4,25,0.798429
5,50,0.798392


In [65]:
display(exploration)

DataFrame[numTrees: bigint, score: double]