# PySpark - MLlib

### Scikit-learn
1. *Ease of Use and Accessibility*: It provides a simple and consistent API that is easy for both beginners and experienced practitioners to understand and work with. This makes it an excellent choice for prototyping and experimentation.
2. *Rich Set of Algorithms and Tools*: It offers a wide range of machine learning algorithms, including classification, regression, clustering, dimensionality reduction, and more. Additionally, it provides various pre-processing and model evaluation tools.

### Spark MLlib
1. *Distributed Computing*: It lets you leverage distributed computing across a cluster of machines. The distributed architecture can handle large-scale datasets and complex computations efficiently, which makes it a good choice for big data processing and scalable machine learning tasks.
2. *Integration with Big Data Ecosystem*: Spark MLlib also integrates with other components of the Apache Spark ecosystem, such as Spark SQL for data processing and Spark Streaming for real-time data analysis.

How Is Scikit-Learn Compared with Apache Spark’s MLlib? (n.d.). Quora. https://www.quora.com/How-is-scikit-learn-compared-with-Apache-Sparks-MLlib

## Analyze Titanic Dataset using `pyspark.ml`

This is a famous dataset for machine learning. A description of the dataset can be found on the [Kaggle website](https://www.kaggle.com/c/titanic/data). In the following, we apply the logistic regression model from the `pyspark.ml` package to this dataset. The goal is to predict the survival of passengers on board the Titanic, and to use the pipeline and feature engineering tools from `pyspark.ml`.

![Data Dictionary](https://i.ibb.co/mhY89DG/titanic-datadictionary.png)

### Step 1: Download and load data

2\. Download `titanic.csv` from [http://idsdl.csom.umn.edu/c/share/msba6330/titanic.csv](http://idsdl.csom.umn.edu/c/share/msba6330/titanic.csv) to the local file system of your driver node.

Then load the data into a Spark DataFrame `titanic`.

In [0]:
%%bash
rm -f titanic.csv
wget  http://idsdl.csom.umn.edu/c/share/msba6330/titanic.csv

--2023-10-16 04:32:20--  http://idsdl.csom.umn.edu/c/share/msba6330/titanic.csv
Resolving idsdl.csom.umn.edu (idsdl.csom.umn.edu)... 134.84.138.46, 2607:ea00:101:480a:250:56ff:febb:e76b
Connecting to idsdl.csom.umn.edu (idsdl.csom.umn.edu)|134.84.138.46|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60302 (59K) [text/csv]
Saving to: ‘titanic.csv’

     0K .......... .......... .......... .......... .......... 84%  597K 0s
    50K ........                                              100%  136M=0.08s

2023-10-16 04:32:20 (703 KB/s) - ‘titanic.csv’ saved [60302/60302]



In [0]:
!ls

azure  eventlogs  hadoop_accessed_config.lst  preload_class.lst
conf   ganglia	  logs			      titanic.csv


3\. Verify the schema and display the first 5 rows of data, making sure the fields have proper names and data types.
- **tip**: use the Spark DataFrame's `toPandas()` for a nicer presentation of the data.

In [0]:
titanic = spark.read.csv("file:/databricks/driver/titanic.csv", header=True, inferSchema=True)
titanic.limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [0]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



### Step 2: Data exploration
The goal of this step is to familiarize yourself with the dataset:
- detect data problems
- inform the data engineering steps
- inform the feature selection

4\. Obtain summary statistics on the DataFrame, which will inform our data processing strategies. Pay attention to
- whether there are missing data
- whether a field appears to be continuous or discrete

Fields with missing values:
- Age: continuous
- Cabin: discrete
- Embarked: discrete

In [0]:
titanic.describe().display()

summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [0]:
## find missing data
from pyspark.sql.functions import count, when, col
missing_data = titanic.select([count(when(col(c).isNull(), c)).alias(c) for c in titanic.columns])
missing_data.display()

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,0,0,0,0,177,0,0,0,0,687,2


5\. Value-frequency for categorical columns.

For every column except `Name`, `Ticket`, `Age` and `Fare` (these fields are either poor features for analysis or continuous variables):
- Print the **10 most frequent values** in the column along with how often each value appears, in descending order of the count.

For example, for the Sex column, the value-frequency table could be (numbers made up)

Sex | Count
--|--
male | 52
female | 38


Such a value-frequency table is useful for
- deciding whether a string column can be treated as a categorical variable
- detecting errors and missing values in such columns

*Tip: since we are doing this repeatedly, it may be useful for you to write a function*
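
To make the target output concrete, here is a plain-Python sketch of the value-frequency logic (it is not the Spark API). The helper name `top_values` and the sample list are made up for illustration; in Spark the same table comes from `groupBy(...).count().orderBy(desc("count")).limit(n)`.

```python
from collections import Counter

def top_values(values, n=10):
    """Return up to n (value, count) pairs, most frequent first."""
    return Counter(values).most_common(n)

# Made-up sample column; None stands in for a missing value.
sex = ["male", "female", "male", None, "male", "female"]
for value, count in top_values(sex):
    print(value, count)
```

Missing values show up as their own row, which is how the null entries in `Cabin` and `Embarked` become visible in such a table.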

In [0]:
from pyspark.sql.functions import col, desc
exclude_columns = ['Name', 'Ticket', 'Age', 'Fare']
for col_name in titanic.columns:
    if col_name not in exclude_columns:
        print(f"Top 10 most frequent values in column '{col_name}':")
        t = titanic.groupBy(col_name).count().orderBy(desc("count"))
        t.limit(10).display()

Top 10 most frequent values in column 'PassengerId':


PassengerId,count
148,1
463,1
471,1
496,1
833,1
243,1
392,1
540,1
623,1
737,1


Top 10 most frequent values in column 'Survived':


Survived,count
0,549
1,342


Top 10 most frequent values in column 'Pclass':


Pclass,count
3,491
1,216
2,184


Top 10 most frequent values in column 'Sex':


Sex,count
male,577
female,314


Top 10 most frequent values in column 'SibSp':


SibSp,count
0,608
1,209
2,28
4,18
3,16
8,7
5,5


Top 10 most frequent values in column 'Parch':


Parch,count
0,678
1,118
2,80
3,5
5,5
4,4
6,1


Top 10 most frequent values in column 'Cabin':


Cabin,count
,687
B96 B98,4
G6,4
C23 C25 C27,4
F2,3
C22 C26,3
D,3
E101,3
F33,3
C65,2


Top 10 most frequent values in column 'Embarked':


Embarked,count
S,644
C,168
Q,77
,2


6\. **Distribution of numerical columns**: 

Calculate and display an **approximate histogram** of `age` column; that is:

- Use DataFrame API to calculate how many rows fall into each of the age bins that round to 0, 10, 20, 30, and so on. 
- Visualize the results in a bar chart using Databricks' built-in chart tool.

Then display a histogram of `Age` with Databricks' histogram chart tool, using a random 50% sample of the data.
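
The binning rule ("round to 0, 10, 20, ...") can be sketched in plain Python as follows. This is an illustration with made-up ages, not the Spark code. Spark's `round` rounds halves up, while Python's built-in `round` rounds halves to the nearest even value, so the sketch spells out the half-up rule explicitly; `age_bin` is a hypothetical helper name.

```python
import math
from collections import Counter

def age_bin(age):
    """Round an age to the nearest multiple of 10, with halves rounding up."""
    if age is None:
        return None  # keep missing ages in their own bin
    return math.floor(age / 10 + 0.5) * 10

ages = [22.0, 26.0, None, 35.0, 4.0, 0.42, 25.0]  # made-up sample
histogram = Counter(age_bin(a) for a in ages)
for bin_value, count in histogram.items():
    print(bin_value, count)
```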

In [0]:
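from pyspark.sql import functions as F
# F.round(..., -1) rounds to the nearest 10; the F alias avoids shadowing Python's built-in round().
age_group = titanic.select(F.round(titanic.Age, -1).alias("age_round")).groupBy("age_round").count().orderBy("age_round")
age_group.display()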

age_round,count
,177
0.0,40
10.0,38
20.0,200
30.0,201
40.0,120
50.0,73
60.0,31
70.0,10
80.0,1


Databricks visualization. Run in Databricks to view.

In [0]:
# Histogram version
titanic.select('Age').sample(False, 0.5).display()

Age
22.0
26.0
""
54.0
27.0
14.0
4.0
58.0
39.0
14.0


Databricks visualization. Run in Databricks to view.

### Step 3: Some feature engineering
The goal of this step is to do the necessary feature engineering. Note that `pyspark.ml.feature` currently provides a few feature engineering tools, such as:
- `StringIndexer`: converts string labels into numerical labels (0, 1, ...), ordered by label frequency
- `OneHotEncoder`: maps a column of category indices to a column of binary vectors
- `VectorAssembler`: merges multiple columns into the single vector column that most algorithms require

Here we will focus on
- dealing with missing values
- creating new columns
- converting data types

7\. Create a new DataFrame `titanic_cast` that
- consists of only the feature and label (`Survived`) columns that are plausible and will be used.
- converts all numerical columns into double type. We convert numeric types to doubles because PySpark (in earlier Spark versions) sometimes did not work well with other numerical types.

Then show the first 10 rows and verify the schema.

In [0]:
from pyspark.sql.types import DoubleType
# Drop identifier-like columns, then cast the integer feature columns to double.
titanic_cast = titanic.drop('PassengerId', 'Name', 'Ticket')
for c in ['Pclass', 'SibSp', 'Parch']:
    titanic_cast = titanic_cast.withColumn(c, titanic_cast[c].cast(DoubleType()))
titanic_cast.limit(10).display()

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Cabin,Embarked
0,3.0,male,22.0,1.0,0.0,7.25,,S
1,1.0,female,38.0,1.0,0.0,71.2833,C85,C
1,3.0,female,26.0,0.0,0.0,7.925,,S
1,1.0,female,35.0,1.0,0.0,53.1,C123,S
0,3.0,male,35.0,0.0,0.0,8.05,,S
0,3.0,male,,0.0,0.0,8.4583,,Q
0,1.0,male,54.0,0.0,0.0,51.8625,E46,S
0,3.0,male,2.0,3.0,1.0,21.075,,S
1,3.0,female,27.0,0.0,2.0,11.1333,,S
1,2.0,female,14.0,1.0,0.0,30.0708,,C


In [0]:
titanic_cast.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: double (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



8\. From the summary statistics, we notice that there are missing values in the `Age`, `Cabin` and `Embarked` columns. One way of dealing with this is to drop rows with missing values. Alternatively, we may use a mean replacement strategy. The solution below also fills missing `Cabin` values with the placeholder `Unknown`.

Create a new DataFrame `titanic_final` after you

- replace the null values in the `Age` column with the mean age of `29.69911764705882`.
- create a new binary indicator column **AgeNA** to denote whether the `Age` value was imputed (**hint**: you may use the `isnull()` function from `pyspark.sql.functions` or the `isNull()` column method). This column needs to be a double type. (We include this indicator to allow for the case where a missing age carries a special meaning, in which case AgeNA could be a potent predictor.)
- drop the rows with missing values in the `Embarked` column
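
The mean-replacement strategy with an indicator column can be sketched in plain Python as follows. This is an illustration on a made-up list, with `None` standing in for a null; `impute_mean_with_flag` is a hypothetical helper name, not a Spark function.

```python
def impute_mean_with_flag(values):
    """Fill None with the mean of the observed values and flag imputed rows."""
    observed = [v for v in values if v is not None]
    mean_value = sum(observed) / len(observed)
    filled = [mean_value if v is None else v for v in values]
    was_missing = [1.0 if v is None else 0.0 for v in values]
    return filled, was_missing

ages = [22.0, None, 26.0, 36.0]  # made-up sample
filled, age_na = impute_mean_with_flag(ages)
print(filled)
print(age_na)
```

The flag has to be computed from the original values, before they are filled. Once the mean has been written in, an imputed row can no longer be told apart from an observed one.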

In [0]:
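from pyspark.sql.functions import when, mean
mean_age = titanic_cast.select(mean('Age')).first()[0]
# Flag missing ages before filling them. AgeNA is an integer here; append .cast("double") after .otherwise(0) if a double is required.
titanic_final = titanic_cast.withColumn("AgeNA", when(titanic_cast.Age.isNull(), 1).otherwise(0)).na.fill("Unknown", subset=['Cabin']).na.fill(mean_age, subset=['Age']).dropna(subset=['Embarked'])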

In [0]:
titanic_final.limit(10).display()

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Cabin,Embarked,AgeNA
0,3.0,male,22.0,1.0,0.0,7.25,Unknown,S,0
1,1.0,female,38.0,1.0,0.0,71.2833,C85,C,0
1,3.0,female,26.0,0.0,0.0,7.925,Unknown,S,0
1,1.0,female,35.0,1.0,0.0,53.1,C123,S,0
0,3.0,male,35.0,0.0,0.0,8.05,Unknown,S,0
0,3.0,male,29.69911764705882,0.0,0.0,8.4583,Unknown,Q,1
0,1.0,male,54.0,0.0,0.0,51.8625,E46,S,0
0,3.0,male,2.0,3.0,1.0,21.075,Unknown,S,0
1,3.0,female,27.0,0.0,2.0,11.1333,Unknown,S,0
1,2.0,female,14.0,1.0,0.0,30.0708,Unknown,C,0


9\. Verify the new DataFrame by printing its schema and descriptive statistics (to ascertain there are no missing values).

In [0]:
titanic_final.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: double (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = false)
 |-- Embarked: string (nullable = true)
 |-- AgeNA: integer (nullable = false)



In [0]:
titanic_final.describe().display()

summary,Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Cabin,Embarked,AgeNA
count,889.0,889.0,889,889.0,889.0,889.0,889.0,889,889,889.0
mean,0.3824521934758155,2.3115860517435323,,29.653446370674192,0.5241844769403825,0.3824521934758155,32.09668087739029,,,0.1991001124859392
stddev,0.4862596883147733,0.8346997785705753,,12.968366309252314,1.103704875596923,0.8067607445174785,49.69750431670795,,,0.3995482811002537
min,0.0,1.0,female,0.42,0.0,0.0,0.0,A10,C,0.0
max,1.0,3.0,male,80.0,8.0,6.0,512.3292,Unknown,S,1.0


In [0]:
missing_data = titanic_final.select([count(when(col(c).isNull(), c)).alias(c) for c in titanic_final.columns])
missing_data.display()

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Cabin,Embarked,AgeNA
0,0,0,0,0,0,0,0,0,0


### Step 4: Indexers and Encoders for string/categorical columns (for use with Pipeline)

String columns cannot be used directly with some models, such as LogisticRegression, and neither can categorical columns. Our strategy is to convert string and categorical variables into a series of binary dummies. Fortunately, these steps are already provided as part of `pyspark.ml.feature`.

To convert string values into numerical category indices, we can use `StringIndexer`.

To convert indexed categorical values into a vector of binary indicators, we can leverage `OneHotEncoder`.

Finally, we need to assemble a vector column with all the numerical features. This is achieved by `VectorAssembler`.


Click on each link below to familiarize yourself with them if you need further information.
- [StringIndexer](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer): converts string labels into numerical labels (0, 1, ...), ordered by label frequency
- [OneHotEncoder](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoderEstimator): maps a column of category indices to a column of binary vectors (the last category, which is the least frequent one after `StringIndexer`, is dropped by default). In Spark 2.3 and 2.4 this class was named `OneHotEncoderEstimator`.
- [VectorAssembler](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler): merges multiple columns into the single vector column that most algorithms require
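
To see what indexing and one-hot encoding do to a column, here is a plain-Python sketch of the two rules described above: labels are indexed by descending frequency, and the last index is dropped from the binary vector. It is an illustration rather than the Spark implementation; the helper names are hypothetical, and tie-breaking between equally frequent labels is ignored.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first (index 0)."""
    ordered = [label for label, _ in Counter(values).most_common()]
    return {label: i for i, label in enumerate(ordered)}

def one_hot_drop_last(index, num_categories):
    """Binary vector of length num_categories - 1; the last index is all zeros."""
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec

# Counts taken from the Embarked value-frequency table in Step 2 (nulls excluded).
embarked = ["S"] * 644 + ["C"] * 168 + ["Q"] * 77
index = fit_string_index(embarked)
for label, i in index.items():
    print(label, i, one_hot_drop_last(i, len(index)))
```

With three categories the encoded vector has only two slots: `Q`, the least frequent port, becomes the all-zero baseline.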

10\. Create a string indexer `si` for indexing the string features (the resulting columns should have a suffix of `_idx`).

Then create a OneHotEncoder `ohe` for the indexed string columns (the resulting columns should have a suffix of `_ohe`).

You should test them by applying these transformations and displaying the first 5 rows after each transformation.

In [0]:
titanic_final.columns

Out[68]: ['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Cabin',
 'Embarked',
 'AgeNA']

In [0]:
from pyspark.ml.feature import StringIndexer
si = StringIndexer(inputCols = ['Sex','Embarked'], outputCols=["Sex_idx","Embarked_idx"])

In [0]:
train_si = si.fit(titanic_final).transform(titanic_final)
train_si.limit(5).display()

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Cabin,Embarked,AgeNA,Sex_idx,Embarked_idx
0,3.0,male,22.0,1.0,0.0,7.25,Unknown,S,0,0.0,0.0
1,1.0,female,38.0,1.0,0.0,71.2833,C85,C,0,1.0,1.0
1,3.0,female,26.0,0.0,0.0,7.925,Unknown,S,0,1.0,0.0
1,1.0,female,35.0,1.0,0.0,53.1,C123,S,0,1.0,0.0
0,3.0,male,35.0,0.0,0.0,8.05,Unknown,S,0,0.0,0.0


In [0]:
from pyspark.ml.feature import OneHotEncoder
ohe = OneHotEncoder(inputCols = ['Sex_idx','Embarked_idx'], outputCols=["Sex_ohe","Embarked_ohe"])

In [0]:
train_ohe = ohe.fit(train_si).transform(train_si)
train_ohe.limit(5).display()

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Cabin,Embarked,AgeNA,Sex_idx,Embarked_idx,Sex_ohe,Embarked_ohe
0,3.0,male,22.0,1.0,0.0,7.25,Unknown,S,0,0.0,0.0,"Map(indices -> List(0), length -> 1, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)"
1,1.0,female,38.0,1.0,0.0,71.2833,C85,C,0,1.0,1.0,"Map(indices -> List(), length -> 1, values -> List(), vectorType -> sparse)","Map(indices -> List(1), length -> 2, values -> List(1.0), vectorType -> sparse)"
1,3.0,female,26.0,0.0,0.0,7.925,Unknown,S,0,1.0,0.0,"Map(indices -> List(), length -> 1, values -> List(), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)"
1,1.0,female,35.0,1.0,0.0,53.1,C123,S,0,1.0,0.0,"Map(indices -> List(), length -> 1, values -> List(), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)"
0,3.0,male,35.0,0.0,0.0,8.05,Unknown,S,0,0.0,0.0,"Map(indices -> List(0), length -> 1, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)"


In [0]:
train_ohe.columns

Out[73]: ['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Cabin',
 'Embarked',
 'AgeNA',
 'Sex_idx',
 'Embarked_idx',
 'Sex_ohe',
 'Embarked_ohe']

### Step 5: Assemble feature columns into a feature vector (for use with the pipeline)
11\. Assemble all features into one vector column; we shall call the new column `features`.

- note that both `Age` and `AgeNA` should be included.
- For `Sex` and `Embarked`, only include the output columns of OneHotEncoder for these columns.
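
Conceptually, assembling means concatenating scalar columns and vector columns into one flat vector. The plain-Python sketch below illustrates this for the first passenger, with the one-hot vectors written out densely. The `assemble` helper and the column list are illustrative assumptions, not the Spark API or the exact list used in the solution.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

# First passenger from the encoder output above.
row = {"Age": 22.0, "AgeNA": 0, "Pclass": 3.0, "Fare": 7.25,
       "Sex_ohe": [1.0], "Embarked_ohe": [1.0, 0.0]}
cols = ["Age", "AgeNA", "Pclass", "Fare", "Sex_ohe", "Embarked_ohe"]
print(assemble(row, cols))
```

Six input columns yield seven slots here because `Embarked_ohe` contributes two. This matters later when coefficients are matched back to features by position.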

In [0]:
from pyspark.ml.feature import VectorAssembler
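# Note: 'Sex_ohe' is listed twice, so the Sex dummy occupies two slots (positions 2 and 5) of the 8-element vectors shown below.
featureCols = ['Age','AgeNA','Sex_ohe','Pclass','Fare','Sex_ohe','Embarked_ohe']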

In [0]:
va = VectorAssembler(inputCols=featureCols, outputCol="features")

In [0]:
train_va = va.transform(train_ohe)
train_va.limit(5).display()

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Cabin,Embarked,AgeNA,Sex_idx,Embarked_idx,Sex_ohe,Embarked_ohe,features
0,3.0,male,22.0,1.0,0.0,7.25,Unknown,S,0,0.0,0.0,"Map(indices -> List(0), length -> 1, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(length -> 8, values -> List(22.0, 0.0, 1.0, 3.0, 7.25, 1.0, 1.0, 0.0), vectorType -> dense)"
1,1.0,female,38.0,1.0,0.0,71.2833,C85,C,0,1.0,1.0,"Map(indices -> List(), length -> 1, values -> List(), vectorType -> sparse)","Map(indices -> List(1), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(0, 3, 4, 7), length -> 8, values -> List(38.0, 1.0, 71.2833, 1.0), vectorType -> sparse)"
1,3.0,female,26.0,0.0,0.0,7.925,Unknown,S,0,1.0,0.0,"Map(indices -> List(), length -> 1, values -> List(), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(0, 3, 4, 6), length -> 8, values -> List(26.0, 3.0, 7.925, 1.0), vectorType -> sparse)"
1,1.0,female,35.0,1.0,0.0,53.1,C123,S,0,1.0,0.0,"Map(indices -> List(), length -> 1, values -> List(), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(0, 3, 4, 6), length -> 8, values -> List(35.0, 1.0, 53.1, 1.0), vectorType -> sparse)"
0,3.0,male,35.0,0.0,0.0,8.05,Unknown,S,0,0.0,0.0,"Map(indices -> List(0), length -> 1, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(length -> 8, values -> List(35.0, 0.0, 1.0, 3.0, 8.05, 1.0, 1.0, 0.0), vectorType -> dense)"


### Step 6: Create the LogisticRegression model (for use with the pipeline)
- the documentation of LogisticRegression from pyspark.ml can be [found here](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression).

12\. Create the LogisticRegression estimator `lr`

In [0]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = "features", labelCol="Survived")

### Step 7: Define Pipeline
13\. Put together the stages of the pipeline, with the last stage being the logistic regression.

In [0]:
from pyspark.ml import Pipeline
pl = Pipeline(stages = [si, ohe, va, lr])

### Step 8: Prepare training and test datasets
14\. Use a 70-30 random split here for the training and test datasets respectively, then verify their sizes.

- For grading purposes, please use `seed=0` for the random split.
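
A random split assigns each row independently, so the resulting sizes are only approximately 70% and 30%. The plain-Python sketch below illustrates the idea by drawing one uniform number per row and comparing it against the cumulative weights. It uses Python's own random generator, so its counts will not match Spark's for the same seed; `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by one uniform draw against cumulative weights."""
    total = sum(weights)
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any draw left over by float rounding.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(range(889), [0.7, 0.3], seed=0)
print(len(train_rows), len(test_rows))
```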

In [0]:
train, test = titanic_final.randomSplit([0.7,0.3], seed=0)

In [0]:
print("train {} test {}".format(train.count(), test.count()))

train 618 test 271


### Step 9: Fit the model and use it on the test dataset

15\. Fit the model using the predefined pipeline on the training set

In [0]:
pl_model = pl.fit(train)

16\. Use the fitted model to obtain predictions on the test set, saving the resulting DataFrame as `results`

In [0]:
results = pl_model.transform(test)

17\. Obtain the logistic regression model, and report its intercept and coefficients
- you can obtain a stage of the pipeline using **.stages[index]**
- Please refer to the LogisticRegression documentation on how to obtain the estimated model's intercept and coefficients.
- **Question**: An increase (or a "1" for a binary feature) in which feature(s) seems to have a positive effect on the probability of survival (based on the coefficient signs)?

In [0]:
results.select('features', 'prediction').limit(10).display()

features,prediction
"Map(indices -> List(0, 3, 4, 6), length -> 8, values -> List(2.0, 1.0, 151.55, 1.0), vectorType -> sparse)",1.0
"Map(length -> 8, values -> List(19.0, 0.0, 1.0, 1.0, 53.1, 1.0, 1.0, 0.0), vectorType -> dense)",1.0
"Map(length -> 8, values -> List(24.0, 0.0, 1.0, 1.0, 79.2, 1.0, 0.0, 1.0), vectorType -> dense)",1.0
"Map(length -> 8, values -> List(29.69911764705882, 1.0, 1.0, 1.0, 27.7208, 1.0, 0.0, 1.0), vectorType -> dense)",1.0
"Map(length -> 8, values -> List(29.69911764705882, 1.0, 1.0, 1.0, 27.7208, 1.0, 0.0, 1.0), vectorType -> dense)",1.0
"Map(length -> 8, values -> List(29.69911764705882, 1.0, 1.0, 1.0, 52.0, 1.0, 1.0, 0.0), vectorType -> dense)",0.0
"Map(length -> 8, values -> List(29.69911764705882, 1.0, 1.0, 1.0, 221.7792, 1.0, 1.0, 0.0), vectorType -> dense)",0.0
"Map(length -> 8, values -> List(29.69911764705882, 1.0, 1.0, 1.0, 227.525, 1.0, 0.0, 1.0), vectorType -> dense)",1.0
"Map(length -> 8, values -> List(31.0, 0.0, 1.0, 1.0, 52.0, 1.0, 1.0, 0.0), vectorType -> dense)",0.0
"Map(length -> 8, values -> List(36.0, 0.0, 1.0, 1.0, 78.85, 1.0, 1.0, 0.0), vectorType -> dense)",0.0


In [0]:
# The fitted LogisticRegressionModel is the last stage of the fitted pipeline.
lr_model = pl_model.stages[-1]
intercept = lr_model.intercept
coefficient = lr_model.coefficients

In [0]:
print("Intercept:", intercept)
print("Coefficient:", coefficient)

Intercept: 5.066257350923905
Coefficient: [-0.0380630156863308,-0.09755949255861895,-1.3229258889040447,-1.135422614611272,0.0008042271786788155,-1.3229258889040447,-0.40552151465067027,0.06075028834218951]


In [0]:
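# Embarked_ohe expands into two slots (S and C, with Q as the dropped baseline), so the
# 8 coefficients need 8 slot names rather than the 7 entries of featureCols.
slot_names = ['Age', 'AgeNA', 'Sex_male', 'Pclass', 'Fare', 'Sex_male', 'Embarked_S', 'Embarked_C']
coefficients = lr_model.coefficients
positive_effect_features = []

for feature_name, coef in zip(slot_names, coefficients):
    if coef > 0:
        positive_effect_features.append(feature_name)
        print(f"Feature '{feature_name}' has a positive effect on survival with a coefficient of {coef}")

Feature 'Fare' has a positive effect on survival with a coefficient of 0.0008042271786788155
Feature 'Embarked_C' has a positive effect on survival with a coefficient of 0.06075028834218951


- An increase in `Fare`, or a "1" for the `Embarked` = C dummy, seems to have a positive effect on the probability of survival (based on the coefficient signs). Both coefficients are small, and all other coefficients are negative.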

### Step 10: Evaluate model performance

It is useful to see the content of the `results` DataFrame after applying the model.

18\. Print the first 5 rows of `results`

In [0]:
results.limit(5).display()

Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Cabin,Embarked,AgeNA,Sex_idx,Embarked_idx,Sex_ohe,Embarked_ohe,features,rawPrediction,probability,prediction
0,1.0,female,2.0,1.0,2.0,151.55,C22 C26,S,0,1.0,0.0,"Map(indices -> List(), length -> 1, values -> List(), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(0, 3, 4, 6), length -> 8, values -> List(2.0, 1.0, 151.55, 1.0), vectorType -> sparse)","Map(length -> 2, values -> List(-3.571067819218076, 3.571067819218076), vectorType -> dense)","Map(length -> 2, values -> List(0.0273563840589339, 0.9726436159410661), vectorType -> dense)",1.0
0,1.0,male,19.0,1.0,0.0,53.1,D30,S,0,0.0,0.0,"Map(indices -> List(0), length -> 1, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(0), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(length -> 8, values -> List(19.0, 0.0, 1.0, 1.0, 53.1, 1.0, 1.0, 0.0), vectorType -> dense)","Map(length -> 2, values -> List(-0.1989686090014331, 0.1989686090014331), vectorType -> dense)","Map(length -> 2, values -> List(0.4504213021519494, 0.5495786978480506), vectorType -> dense)",1.0
0,1.0,male,24.0,0.0,0.0,79.2,B86,C,0,0.0,1.0,"Map(indices -> List(0), length -> 1, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(1), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(length -> 8, values -> List(24.0, 0.0, 1.0, 1.0, 79.2, 1.0, 0.0, 1.0), vectorType -> dense)","Map(length -> 2, values -> List(-0.49591566292615674, 0.49591566292615674), vectorType -> dense)","Map(length -> 2, values -> List(0.3785009821527093, 0.6214990178472908), vectorType -> dense)",1.0
0,1.0,male,29.69911764705882,0.0,0.0,27.7208,Unknown,C,1,0.0,1.0,"Map(indices -> List(0), length -> 1, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(1), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(length -> 8, values -> List(29.69911764705882, 1.0, 1.0, 1.0, 27.7208, 1.0, 0.0, 1.0), vectorType -> dense)","Map(length -> 2, values -> List(-0.14002959419265082, 0.14002959419265082), vectorType -> dense)","Map(length -> 2, values -> List(0.4650496924360247, 0.5349503075639753), vectorType -> dense)",1.0
0,1.0,male,29.69911764705882,0.0,0.0,27.7208,Unknown,C,1,0.0,1.0,"Map(indices -> List(0), length -> 1, values -> List(1.0), vectorType -> sparse)","Map(indices -> List(1), length -> 2, values -> List(1.0), vectorType -> sparse)","Map(length -> 8, values -> List(29.69911764705882, 1.0, 1.0, 1.0, 27.7208, 1.0, 0.0, 1.0), vectorType -> dense)","Map(length -> 2, values -> List(-0.14002959419265082, 0.14002959419265082), vectorType -> dense)","Map(length -> 2, values -> List(0.4650496924360247, 0.5349503075639753), vectorType -> dense)",1.0
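
As a check on how `rawPrediction` and `probability` relate to the fitted parameters, the sketch below recomputes them in plain Python for the first row shown above, using the intercept and coefficients printed in Step 9. It assumes the default decision threshold of 0.5; the variable names are illustrative.

```python
import math

# Intercept and coefficients as printed in Step 9.
intercept = 5.066257350923905
coefficients = [-0.0380630156863308, -0.09755949255861895, -1.3229258889040447,
                -1.135422614611272, 0.0008042271786788155, -1.3229258889040447,
                -0.40552151465067027, 0.06075028834218951]

# First test row above, expanded from its sparse form (non-zero indices 0, 3, 4, 6).
features = [2.0, 0.0, 0.0, 1.0, 151.55, 0.0, 1.0, 0.0]

# Linear margin, then the logistic (sigmoid) function turns it into P(Survived = 1).
margin = intercept + sum(c * x for c, x in zip(coefficients, features))
probability = 1.0 / (1.0 + math.exp(-margin))
prediction = 1.0 if probability > 0.5 else 0.0

print(margin)       # compare with the second entry of rawPrediction
print(probability)  # compare with the second entry of probability
print(prediction)
```

The two entries of `rawPrediction` are the negated margin and the margin, and the two entries of `probability` are the probabilities of class 0 and class 1.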


19\. Obtain the area under the ROC curve (AUC), F1, and accuracy for the model.

**tip**: we will report the same metrics a few more times below. It may be handy to create a function that calculates the three metrics.
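
For intuition about two of these metrics, the plain-Python sketch below computes accuracy and a class-weighted F1 from made-up labels and predictions. It assumes that the `f1` metric of `MulticlassClassificationEvaluator` is the per-class F1 averaged with weights proportional to each class's true count; the helper name is hypothetical, and AUC is not covered here.

```python
def accuracy_and_weighted_f1(labels, predictions):
    """Accuracy plus F1 averaged over classes, weighted by each class's true count."""
    n = len(labels)
    pairs = list(zip(labels, predictions))
    accuracy = sum(l == p for l, p in pairs) / n
    weighted_f1 = 0.0
    for cls in set(labels):
        tp = sum(l == cls and p == cls for l, p in pairs)
        actual = sum(l == cls for l in labels)
        predicted = sum(p == cls for p in predictions)
        precision = tp / predicted if predicted else 0.0
        recall = tp / actual
        f1 = 2 * precision * recall / (precision + recall) if tp else 0.0
        weighted_f1 += f1 * actual / n
    return accuracy, weighted_f1

labels      = [1, 0, 0, 1, 0, 1, 0, 0]  # made-up ground truth
predictions = [1, 0, 1, 1, 0, 0, 0, 0]  # made-up model output
acc, f1 = accuracy_and_weighted_f1(labels, predictions)
print(acc, f1)
```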

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
def model_evaluate(data, labelCol):
  e1 = BinaryClassificationEvaluator(labelCol=labelCol)
  e2 = MulticlassClassificationEvaluator(labelCol=labelCol)
  auc = e1.evaluate(data, {e1.metricName: "areaUnderROC"})
  a = e2.evaluate(data, {e2.metricName: "accuracy"})
  f1 = e2.evaluate(data, {e2.metricName: "f1"})
  print(f"AUC: {auc}")
  print(f"Accuracy: {a}")
  print(f"F1: {f1}")

In [0]:
model_evaluate(results, 'Survived')

AUC: 0.8496409847276046
Accuracy: 0.7785977859778598
F1: 0.7782243517117846


20\. Fit a RandomForest model, obtain predictions, and report the same performance metrics as above

In [0]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="Survived")

In [0]:
rf_pl = Pipeline(stages = [si, ohe, va, rf])

In [0]:
rf_model = rf_pl.fit(train)
rf_results = rf_model.transform(test)

In [0]:
# model_evaluate prints the metrics itself and returns None, so there is nothing to assign or print.
model_evaluate(rf_results, 'Survived')

AUC: 0.8738602689765219
Accuracy: 0.8302583025830258
F1: 0.8263863655344694


21\. RandomForest has a few parameters that can be tuned further. We consider these candidate values:

- maxDepth: {5,7}
- numTrees: {20,30}


Use CrossValidator (3 folds), as part of a pipeline, to find the best parameter combination (using areaUnderROC as the metric). Obtain predictions using the best model, and report the same performance metrics as above.
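
To get a feel for the size of this search, the plain-Python sketch below enumerates the parameter grid and counts the model fits that a 3-fold cross-validation implies. It only illustrates the bookkeeping and makes no Spark calls; the variable names are illustrative.

```python
from itertools import product

param_grid = {"maxDepth": [5, 7], "numTrees": [20, 30]}
num_folds = 3

# Every combination of the candidate values, one dict per candidate model.
names = list(param_grid)
combinations = [dict(zip(names, values)) for values in product(*param_grid.values())]
for combo in combinations:
    print(combo)

# Each candidate is trained once per fold; the winner is then refit on all training rows.
fits_during_search = len(combinations) * num_folds
print(fits_during_search, "fits during the search, plus 1 final refit")
```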

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
grid = ParamGridBuilder().addGrid(rf.maxDepth, [5,7]).addGrid(rf.numTrees, [20,30]).build()

In [0]:
auc = BinaryClassificationEvaluator(labelCol="Survived", metricName="areaUnderROC")

In [0]:
cv = CrossValidator(estimator=rf, estimatorParamMaps=grid, evaluator=auc, numFolds=3)

In [0]:
pl_cv = Pipeline(stages = [si, ohe, va, cv])

In [0]:
pl_cv_model = pl_cv.fit(train)

In [0]:
test_cv_predicted = pl_cv_model.transform(test)

In [0]:
model_evaluate(test_cv_predicted, "Survived")

AUC: 0.8738602689765219
Accuracy: 0.8302583025830258
F1: 0.8263863655344694


22\. What are the parameters selected by the CrossValidator?

**tip**: 
- CrossValidator's `bestModel` stores the best model
- You can call the best random forest model's `explainParam(parametername)` to see the value of a given parameter

In [0]:
print(pl_cv_model.stages[3].bestModel.explainParam('maxDepth'))
print(pl_cv_model.stages[3].bestModel.explainParam('numTrees'))

maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30]. (default: 5, current: 5)
numTrees: Number of trees to train (>= 1). (default: 20, current: 20)


In [0]:

print(pl_cv_model.stages[3].bestModel.explainParams())

bootstrap: Whether bootstrap samples are used when building trees. (default: True)
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the featur

In [0]:
print(pl_cv_model.stages[2].explainParams())

handleInvalid: How to handle invalid data (NULL and NaN values). Options are 'skip' (filter out rows with invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the output). Column lengths are taken from the size of ML Attribute Group, which can be set using `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'). (default: error)
inputCols: input column names. (current: ['Age', 'AgeNA', 'Sex_ohe', 'Pclass', 'Fare', 'Sex_ohe', 'Embarked_ohe'])
outputCol: output column name. (default: VectorAssembler_0ac7c03e9eec__output, current: features)
