import pandas as pd
# ^^^ pyforest auto-imports - don't write above this line
# Logistic Regression Code Along
This is a code along of the famous titanic dataset, its always nice to start off with this dataset because it is an example you will find across pretty much every data analysis language.

In [44]:
from pyspark.sql import SparkSession

In [45]:
spark = SparkSession.builder.appName('myproj').getOrCreate()

In [46]:
data = spark.read.csv('titanic.csv',inferSchema=True,header=True)

In [47]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [48]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [49]:
my_cols = data.select(['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'])

In [50]:
my_final_data = my_cols.na.drop()

### Working with Categorical Columns

Let's break this down into multiple steps to make it all clear.

In [51]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

In [52]:
gender_indexer = StringIndexer(inputCol='Sex',outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')

In [53]:
embark_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')

In [54]:
assembler = VectorAssembler(inputCols=['Pclass',
 'SexVec',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'EmbarkVec'],outputCol='features')

In [55]:
from pyspark.ml.classification import LogisticRegression

## Pipelines 

Let's see an example of how to use pipelines (we'll get a lot more practice with these later!)

In [56]:
from pyspark.ml import Pipeline

In [57]:
log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived')

In [58]:
pipeline = Pipeline(stages=[gender_indexer,embark_indexer,
                           gender_encoder,embark_encoder,
                           assembler,log_reg_titanic])

In [59]:
train_titanic_data, test_titanic_data = my_final_data.randomSplit([0.7,.3])

In [64]:
train_titanic_data.show(3)

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     1|female| 2.0|    1|    2| 151.55|       S|
|       0|     1|female|50.0|    0|    0|28.7125|       C|
|       0|     1|  male|18.0|    1|    0|  108.9|       C|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 3 rows



In [67]:
pipeline.fit(train_titanic_data)

PipelineModel_7ae339290964

In [60]:
fit_model = pipeline.fit(train_titanic_data)

In [22]:
results = fit_model.transform(test_titanic_data)

In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [24]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='Survived')

In [26]:
results.select('Survived','prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
+--------+----------+
only showing top 20 rows



In [27]:
AUC = my_eval.evaluate(results)

In [28]:
AUC

0.7918269230769232

## Great Job!

In [56]:
import pyspark
import findspark
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import (VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler)
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [32]:
spark=SparkSession.builder.getOrCreate()

In [33]:
df=spark.read.csv('titanic.csv', inferSchema=True, header=True)

In [34]:
df.show(3)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows



In [35]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [36]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [37]:
df=df.select(['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'])

In [38]:
df.show(3)

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 3 rows



In [39]:
str_indx=StringIndexer(inputCol='Embarked', outputCol='emb_str_index', handleInvalid='skip')
one_hot_indx=OneHotEncoder(inputCol='emb_str_index', outputCol='one_hot_emb_str_index')

In [40]:
str_indx_1=StringIndexer(inputCol='Sex', outputCol='Sex_str_index', handleInvalid='skip')
one_hot_indx_1=OneHotEncoder(inputCol='Sex_str_index', outputCol='one_hot_sex_str_index')

In [41]:
one_hot_indx_2=OneHotEncoder(inputCol='Pclass', outputCol='Pclass_one_hot')

In [42]:
assembler=VectorAssembler(inputCols=['one_hot_emb_str_index','one_hot_sex_str_index','SibSp','Parch','Fare'], outputCol='features')

In [43]:
norm=Normalizer(inputCol='features', outputCol='norm_vec')

In [57]:
norm=MinMaxScaler(inputCol='features', outputCol='minmax_vec')

In [58]:
rfc=RandomForestClassifier(featuresCol='minmax_vec', labelCol='Survived')

In [59]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [60]:
grid=ParamGridBuilder()

In [61]:
grid=grid.addGrid(rfc.maxDepth, [2,4,8]).addGrid(rfc.numTrees,[100,200,500])

In [62]:
grid=grid.build()

In [63]:
evalu=MulticlassClassificationEvaluator(labelCol='Survived',metricName='accuracy')

In [64]:
gd_cv=CrossValidator(estimator=rfc,estimatorParamMaps=grid, numFolds=5, evaluator=evalu)

In [65]:
pipe=Pipeline(stages=[str_indx,
                     str_indx_1,
                      one_hot_indx,
                     one_hot_indx_1,
                     one_hot_indx_2,
                     assembler,
                     norm,
                     gd_cv])

In [66]:
train, test= df.randomSplit([0.7,0.3], seed=1429)

In [67]:
cls=pipe.fit(train)

In [68]:
pred=cls.transform(test)

In [69]:
evalu.evaluate(pred)

0.8091872791519434

In [1]:
import pyspark
import findspark
from pyspark.sql import SparkSession
from pyspark.ml.classification import (RandomForestClassifier, MultilayerPerceptronClassifier)
from pyspark.ml.feature import (StringIndexer, VectorAssembler, OneHotEncoder, MinMaxScaler)
from pyspark.ml.evaluation import (BinaryClassificationEvaluator, MulticlassClassificationEvaluator)
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import (CrossValidator, ParamGridBuilder)

In [2]:
spark=SparkSession.builder.getOrCreate()

In [3]:
df=spark.read.csv('titanic.csv', header=True, inferSchema=True)

In [4]:
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [5]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [6]:
df=df.select([
 'Pclass','Sex',
 'Age',
 'SibSp',
 'Parch','Fare','Embarked','Survived',])

In [7]:
str_sex=StringIndexer(inputCol='Sex', outputCol='sex_str', handleInvalid='skip')
onh_sex=OneHotEncoder(dropLast=True, inputCol='sex_str', outputCol='str_sex_oh')

In [8]:
str_emb=StringIndexer(inputCol='Embarked', outputCol='emb_str',handleInvalid='skip')
onh_emb=OneHotEncoder(dropLast=True, inputCol='emb_str', outputCol='str_emb_oh')

In [9]:
onh_Pc=OneHotEncoder(dropLast=True, inputCol='Pclass', outputCol='pc_oh')

In [10]:
vec=VectorAssembler(inputCols=['str_sex_oh',
                              'str_emb_oh',
                              'pc_oh',
                              'Age',
                              'SibSp',
                              'Parch','Fare'], outputCol='Features' , handleInvalid='skip')

In [11]:
scaler=MinMaxScaler(inputCol='Features', outputCol='scaled_features')

In [12]:
rfc=RandomForestClassifier(featuresCol='scaled_features', labelCol='Survived')

In [13]:
grid=ParamGridBuilder()

In [15]:
grid=grid.addGrid(rfc.maxDepth, [2,4,8]).addGrid(rfc.numTrees,[100,200,500]).addGrid(rfc.impurity,['Gini','Entropy'])

In [16]:
grid=grid.build()

In [12]:
evalu=MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='Survived', metricName='f1')

In [18]:
gr_cv=CrossValidator(estimator=rfc,estimatorParamMaps=grid,evaluator=evalu, numFolds=10)

In [19]:
pipe=Pipeline(stages=[str_sex,
                     onh_sex,
                     str_emb,
                     onh_emb,
                     onh_Pc,
                     vec,
                    scaler,
                     gr_cv])

In [13]:
train, test= df.randomSplit([0.7,0.3], seed=1429)

In [21]:
cls=pipe.fit(train)

In [23]:
pred=cls.transform(test)

In [37]:
cls.stages[7].bestModel.extractParamMap()

{Param(parent='RandomForestClassifier_84fb9c5395a6', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False,
 Param(parent='RandomForestClassifier_84fb9c5395a6', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10,
 Param(parent='RandomForestClassifier_84fb9c5395a6', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'auto',
 Param(parent='RandomForestClassifier_84fb9c5395a6', name='featuresCol', doc='features column name'): 'scaled_features',
 Param(parent='RandomForestClassifier_84fb9c53

In [24]:
evalu.evaluate(pred)

0.7683226699770817