<a href="https://colab.research.google.com/github/MarinaChau/IASD_classes/blob/master/Criteo/td2_mllib_questions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Presentation
In this workshop we will discover MLlib features and apply them to the Titanic dataset.

We will try to predict passenger survival based on a few features, with a logistic regression model.

## Install Spark Environment
Since we are not running on Databricks, we need to install Spark ourselves every time we start a session.  
We need to install Spark as well as a Java Runtime Environment.  
Then we need to set up a few environment variables.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!curl -O https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  287M  100  287M    0     0   231M      0  0:00:01  0:00:01 --:--:--  231M


In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master('local[*]').getOrCreate()

## Optional step: Enable Spark UI through a secure tunnel
This step is useful if you want to look at the Spark UI.
First, you need to create a free ngrok account: https://dashboard.ngrok.com/login.  
Then log in on the website and copy your AuthToken.

In [4]:
# this step downloads ngrok, configures your AuthToken, then starts the tunnel
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!./ngrok authtoken my_ngrok_auth_token_retrieved_from_website # <-------------- change this line !
get_ipython().system_raw('./ngrok http 4050 &')

--2022-04-01 12:49:47--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 18.205.222.128, 54.161.241.46, 52.202.168.65, ...
Connecting to bin.equinox.io (bin.equinox.io)|18.205.222.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2022-04-01 12:49:52 (2.67 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   
Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml


**Now** get the Spark UI URL at https://dashboard.ngrok.com/endpoints/status. We're done!

## Load dataset
We need to download the dataset and put it where Spark can read it: the DBFS file storage on Databricks, or simply the local file system on Colab.

In [10]:
# Databricks-only variant: download the dataset, then move it to DBFS.
# Outside Databricks (e.g. on Colab) `dbutils` is not available and this cell fails;
# use the next cell instead.
import urllib.request

def get_dbutils(spark):
    try:
        from pyspark.dbutils import DBUtils
        dbutils = DBUtils(spark)
    except ImportError:
        import IPython
        dbutils = IPython.get_ipython().user_ns["dbutils"]
    return dbutils

dbutils = get_dbutils(spark)

url = "https://www.dropbox.com/s/1tl236ptjuwvcib/titanic-passengers.csv?dl=1"
urllib.request.urlretrieve(url, "titanic.csv")
dbutils.fs.ls("file:/databricks/driver/")

# move the dataset to the file storage
dbutils.fs.mv("file:/databricks/driver/titanic.csv", "dbfs:/titanic.csv", recurse=True)

NameError: ignored

In [11]:
# download dataset, make sure it is available on your gateway
from urllib import request
url = "https://www.dropbox.com/s/1tl236ptjuwvcib/titanic-passengers.csv?dl=1"

request.urlretrieve(url, "titanic.csv")

('titanic.csv', <http.client.HTTPMessage at 0x7f14bee15490>)

## Tools of the trade
We need a few imports to train a model with MLlib.

In [12]:
from pyspark.sql import functions as F # you already know this one! needed whenever you want to transform columns
from pyspark.ml.feature import *       # this package contains most of the MLlib feature engineering tools
from pyspark.ml import Pipeline        # Pipeline is used to chain stages (feature transformers and the final model)

## Question 0
Load the dataset.

Make sure the inferred schema is correct.

In [42]:
csv_2_df = spark.read.csv("titanic.csv", sep=";", header=True, inferSchema=True)
csv_2_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+-----------------+------------------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|           Ticket|              Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-----------------+------------------+-----+--------+
|        343|      No|     2|Collander, Mr. Er...|  male|28.0|    0|    0|           248740|              13.0| null|       S|
|         76|      No|     3|Moen, Mr. Sigurd ...|  male|25.0|    0|    0|           348123|              7.65|F G73|       S|
|        641|      No|     3|Jensen, Mr. Hans ...|  male|20.0|    0|    0|           350050|7.8542000000000005| null|       S|
|        568|      No|     3|Palsson, Mrs. Nil...|female|29.0|    0|    4|           349909|            21.075| null|       S|
|        672|      No|     1|Davidson, Mr. Tho...|  male|31.0|    1|    0|       F.C. 12750|              52.0|

In [43]:
csv_2_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [44]:
train, test = csv_2_df.cache().randomSplit([0.9, 0.1], seed=12345)

## Question 1
On the training set, fit a model that predicts the passenger survival probability as a function of the ticket price (Fare).

You will need to convert the Survived column to 0/1 before passing it to the logistic regression. Transform it with StringIndexer, which by default assigns index 0 to the most frequent label.

Use a pipeline ending with a logistic regression.

Compute the model AUC on the validation set.

Documentation:
- https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression
- https://spark.apache.org/docs/latest/ml-pipeline.html#example-pipeline
- https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#binary-classification
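
To make the metric concrete: the AUC can be read as the probability that a randomly chosen survivor receives a higher score than a randomly chosen non-survivor. The sketch below computes it by brute force over all positive/negative pairs in plain Python. The function name `roc_auc` and the sample labels and scores are made up for illustration; Spark's evaluator works from a ROC curve instead, so its value may differ slightly on real data.

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count for half a pair.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Illustrative values only (not taken from the Titanic dataset).
example_labels = [0, 0, 1, 1]
example_scores = [0.1, 0.4, 0.35, 0.8]
example_auc = roc_auc(example_labels, example_scores)
print(example_auc)
```

An AUC of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which gives a scale for reading the values reported in this notebook.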

In [45]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [46]:
# Convert Survived column with StringIndexer
indexer = StringIndexer(inputCol="Survived", outputCol="Survived_index")
indexed = indexer.fit(train).transform(train)
indexed.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+------------------+-----+--------+--------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|              Fare|Cabin|Embarked|Survived_index|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+------------------+-----+--------+--------------+
|          1|      No|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|              7.25| null|       S|           0.0|
|          2|     Yes|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|           71.2833|  C85|       C|           1.0|
|          3|     Yes|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|             7.925| null|       S|           1.0|
|          4|     Yes|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|              53.1| C123|       S|           1.0|
|          5|

In [47]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

In [48]:
indexed['Survived_index', 'Fare']

DataFrame[Survived_index: double, Fare: double]

In [54]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["Fare"],outputCol="Fare_vec")
indexed = assembler.transform(indexed)
indexed.show(5)


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------------+---------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Survived_index| Fare_vec|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------------+---------+
|          1|      No|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|           0.0|   [7.25]|
|          2|     Yes|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|           1.0|[71.2833]|
|          3|     Yes|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|           1.0|  [7.925]|
|          4|     Yes|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|           1.0|   [53.1]|
|          5|      N

In [63]:
# Configure an ML pipeline
indexer = StringIndexer(inputCol="Survived", outputCol="Survived_index")
assembler = VectorAssembler(inputCols=["Fare"],outputCol="Fare_vec")
lr = LogisticRegression(maxIter=10, regParam=0.001, featuresCol='Fare_vec', labelCol='Survived_index')
pipeline = Pipeline(stages=[indexer, assembler, lr])

training = train

# Fit the pipeline on the training set.
model = pipeline.fit(training)

# Make predictions on the test set.
prediction = model.transform(test)

In [79]:
model

PipelineModel_5d9b7a81ebbc

In [76]:
preds = prediction.select("Survived_index", "rawPrediction")
preds = preds.withColumnRenamed("Survived_index","label")

In [78]:
# AUC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()

evaluation = evaluator.evaluate(preds)

print('Test Area Under Roc',evaluation)

Test Area Under Roc 0.7175925925925926


## Question 2
We will do a lot of feature engineering now, and we don't want you to copy-paste code all along.

Write the following function:

Inputs:
- pipeline
- training set
- validation set

Outputs:
- auc
- transformed dataset (with prediction)

Make sure it works on the previous pipeline.

In [81]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def make_prediction(pipeline, training_set, validation_set):

    print("AliceTLabest")

    # Fit the pipeline on the training set.
    model = pipeline.fit(training_set)

    # Make predictions on the validation set.
    prediction = model.transform(validation_set)

    # Get the transformed training dataset (feature columns plus predictions).
    training_transformed = model.transform(training_set)

    # Evaluate the validation predictions with AUC (the label column comes from the StringIndexer stage).
    evaluator = BinaryClassificationEvaluator(labelCol="Survived_index")
    evaluation = evaluator.evaluate(prediction)
    print('Test Area Under Roc',evaluation)

    return evaluation, training_transformed



In [82]:
auc, df_transformed = make_prediction(pipeline, train, test)

AliceTLabest
Test Area Under Roc 0.7175925925925926


## Question 3
Relying on a raw continuous feature may be a bit rough.
We can try to bucketize the numeric feature into five buckets instead.

In [92]:
# find min of the Fare column
df_transformed.agg({'Fare': 'min'}).show()

# find max of the Fare column
df_transformed.agg({'Fare': 'max'}).show()

+---------+
|min(Fare)|
+---------+
|      0.0|
+---------+

+---------+
|max(Fare)|
+---------+
| 512.3292|
+---------+



In [98]:
import numpy as np

buckets = np.arange(0, 513, 102)
buckets

array([  0, 102, 204, 306, 408, 510])
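
The edges above are equal-width, whereas a quantile-based discretizer places the edges so that each bucket holds roughly the same number of rows. The plain-Python sketch below contrasts the two on a skewed sample. The helper names and the `fares` list are illustrative assumptions (a handful of fare-like values, not the full dataset), and Spark computes approximate quantiles, so its edges would not match these exactly.

```python
from bisect import bisect_right
from statistics import quantiles


def equal_width_edges(values, n_buckets):
    """Inner edges that split [min, max] into n_buckets intervals of equal width."""
    lo, hi = min(values), max(values)
    step = (hi - lo) / n_buckets
    return [lo + k * step for k in range(1, n_buckets)]


def quantile_edges(values, n_buckets):
    """Inner edges placed at the 1/n, 2/n, ... quantiles of the sample."""
    return quantiles(values, n=n_buckets, method="inclusive")


def bucket_counts(values, inner_edges):
    """Number of values falling in each bucket defined by the inner edges."""
    counts = [0] * (len(inner_edges) + 1)
    for v in values:
        counts[bisect_right(inner_edges, v)] += 1
    return counts


# Illustrative, heavily right-skewed sample of fare-like values.
fares = [0.0, 7.25, 7.925, 8.05, 13.0, 21.075, 26.0, 52.0, 71.2833, 512.3292]

width_counts = bucket_counts(fares, equal_width_edges(fares, 5))
quantile_counts = bucket_counts(fares, quantile_edges(fares, 5))
print("equal width:", width_counts)
print("quantiles:  ", quantile_counts)
```

With a skewed feature, equal-width buckets put almost every row in the first bucket, which is one reason to prefer quantile-based buckets here.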

In [100]:
# Create 5 buckets according to the column "Fare"
from pyspark.ml.feature import QuantileDiscretizer
qd = QuantileDiscretizer(numBuckets=5, inputCol="Fare", outputCol="Buckets_fare", relativeError=0.001)
bucketizer = qd.fit(train)
train_bucketed = bucketizer.transform(train)
train_bucketed.show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+------------------+-----+--------+------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|              Fare|Cabin|Embarked|Buckets_fare|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+------------------+-----+--------+------------+
|          1|      No|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|              7.25| null|       S|         0.0|
|          2|     Yes|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|           71.2833|  C85|       C|         4.0|
|          3|     Yes|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|             7.925| null|       S|         1.0|
|          4|     Yes|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|              53.1| C123|       S|         4.0|
|          5|      No|     

## Question 4
Why don't you try to rely on other numerical features now?

You can try to leverage 'Age', and maybe 'PassengerId' while we're at it.

Is it better?

In [132]:
# count the training rows where Age is missing
train.filter(F.col("Age").isNull()).count()

160

In [111]:
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [127]:
from pyspark.sql.functions import col
print(train.filter(col("Age").isNotNull()).count())

656


In [117]:
# Configure an ML pipeline
indexer = StringIndexer(inputCol="Survived", outputCol="Survived_index")
qd = QuantileDiscretizer(numBuckets=5, inputCol="Fare", outputCol="Buckets_fare", relativeError=0.001)
assembler = VectorAssembler(inputCols=["Buckets_fare", "Age"], outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001, featuresCol='features', labelCol='Survived_index')
pipeline_2 = Pipeline(stages=[indexer, qd, assembler, lr])

In [118]:
auc, df_transformed = make_prediction(pipeline_2, train, test)

AliceTLabest


Py4JJavaError: ignored

## Question 5
We should try to use categorical features.

Remember, Spark only understands vectors, so you need to convert categories into vectors with OneHotEncoder.

Try several categories and identify what works.

Sex is not numeric; we need to convert it to a numeric index before one-hot encoding it!

## Question 6

These are open questions you can tackle in any order:
- cross features, e.g. a feature such as "passenger is male and older than 30 years"
- use feature hashing
- rely on the name feature

### Feature Hashing
In this one, you will need to create a custom transformer that maps a sparse vector to another sparse vector of lower dimension (MLlib does not have exactly what we want here).
- you can rely on this post to see how to create a custom transformer: https://csyhuang.github.io/2020/08/01/custom-transformer/
- look at the following classes for your UDF: VectorUDT, SparseVector