### PySpark Example
Logistic regression, plus hyperparameter optimization (HPO) with cross-validation (CV), using PySpark on the Titanic dataset

***
#### Environment
`conda activate pyspark-env`

***
#### Goals
- Explore the PySpark API (DataFrames, ML pipelines, model evaluation and cross-validation)

***
#### References
https://spark.apache.org/docs/latest/api/python/



### Spark initialization

In [None]:
import pyspark
from pyspark.sql import SQLContext, SparkSession

# A single local-mode Spark session (master "local[*]") shared by every cell below.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL Playground")
    .master("local[*]")
    .getOrCreate()
)

# Keep Spark's own logging out of the notebook output unless it is an error.
spark.sparkContext.setLogLevel("ERROR")

# Legacy SQLContext handle bound to the same session.
# NOTE(review): `sqlContext` is not used anywhere else in this notebook, and
# SQLContext is reportedly deprecated in Spark 3.x -- confirm before removing.
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

print(f"Spark Version: {spark.version}")
print(f"PySpark Version: {pyspark.__version__}")

In [None]:
%%html
<style>
  /* Left-align rendered tables (e.g. the variable tables in the markdown cells). */
  table {
    margin-left: 0 !important;
  }
</style>

### CSV Data Read - Train

`titanic/train.csv`

| Variable| Definition| Key| 
|:--- |:---   |:--- |
| PassengerId|  	Passenger id|  	| 
| Survived|  	Survival|  	0 = No, 1 = Yes| 
| Pclass|  	Ticket class|  	1 = 1st, 2 = 2nd, 3 = 3rd| 
| Name|  	Passenger name |  	| 
| Sex|  	Sex|  	| 
| Age|  	Age in years|  	| 
| SibSp|  	# of siblings / spouses aboard the Titanic 	| | 
| Parch|  	# of parents / children aboard the Titanic 	| | 
| Ticket|  	Ticket number | 	| 
| Fare|  	Passenger fare 	| | 
| Cabin|  	Cabin number 	| | 
| Embarked|  	Port of Embarkation|  	C = Cherbourg, Q = Queenstown, S = Southampton| 

In [None]:
# Training data: header row supplies column names, column types are inferred from the values.
train_df = spark.read.csv("data/titanic/train.csv", header=True, inferSchema=True)

train_df.printSchema()
train_df.show(5)

### CSV Data Read - Test

`titanic/test.csv`

| Variable| Definition| Key| 
|:--- |:---   |:--- |
| PassengerId|  	Passenger id|  	| 
| Pclass|  	Ticket class|  	1 = 1st, 2 = 2nd, 3 = 3rd| 
| Name|  	Passenger name |  	| 
| Sex|  	Sex|  	| 
| Age|  	Age in years|  	| 
| SibSp|  	# of siblings / spouses aboard the Titanic 	| | 
| Parch|  	# of parents / children aboard the Titanic 	| | 
| Ticket|  	Ticket number | 	| 
| Fare|  	Passenger fare 	| | 
| Cabin|  	Cabin number 	| | 
| Embarked|  	Port of Embarkation|  	C = Cherbourg, Q = Queenstown, S = Southampton| 

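`titanic/gender_submission.csv`

Note: in the Kaggle Titanic data this file is the example submission. It predicts survival from `Sex` alone (women survive, men do not), and it is used below as a stand-in for test labels. Test-set metrics therefore measure agreement with this gender baseline, not true accuracy.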

| Variable| Definition| Key| 
|:--- |:---   |:--- |
| PassengerId|  	Passenger id|  	| 
| Survived|  	Survival|  	0 = No, 1 = Yes| 


In [None]:
# Test features (no Survived column) and the separate label file; the two are
# joined on PassengerId after scoring.
test_df_no_label = spark.read.csv("data/titanic/test.csv", header=True, inferSchema=True)
test_df_label = spark.read.csv("data/titanic/gender_submission.csv", header=True, inferSchema=True)

### Analyze null values

In [None]:
from pyspark.sql.functions import isnan,isnull, when, count, col

# One output column per input column, holding the number of null cells:
# `when` yields a non-null marker only on null rows and `count` tallies non-null values.
null_counts = [
    count(when(col(name).isNull(), True)).alias(name)
    for name in train_df.columns
]
train_df.select(null_counts).show()

### Data Preparation

In [None]:
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorIndexer, VectorAssembler, MinMaxScaler

# Column groups handled by the feature stages below.
continuous_cols = ["Age", "Fare"]               # imputed, then min-max scaled
string_cols = ["Sex", "Cabin", "Embarked"]      # string-indexed, then one-hot encoded
small_int_cols = ["Pclass", "SibSp", "Parch"]   # assembled, vector-indexed, then scaled

# Fill missing values in the continuous columns -> Age_out, Fare_out.
imputer = Imputer(
    inputCols=continuous_cols,
    outputCols=[f"{name}_out" for name in continuous_cols],
)

# Map strings to indices; handleInvalid='keep' gives unseen/invalid values their own bucket.
indexer = StringIndexer(
    inputCols=string_cols,
    outputCols=[f"{name}_idx" for name in string_cols],
    handleInvalid='keep',
)

# One-hot encode the string indices -> *_idx_ohe.
ohe_str = OneHotEncoder(
    inputCols=indexer.getOutputCols(),
    outputCols=[f"{name}_ohe" for name in indexer.getOutputCols()],
)

# Pack the small integer columns into one vector, then let VectorIndexer decide
# which of its slots to treat as categorical (unseen values kept, not rejected).
va_numeric = VectorAssembler(inputCols=small_int_cols, outputCol='numeric_features_asm')
vi_numeric = VectorIndexer(
    inputCol=va_numeric.getOutputCol(),
    outputCol='numeric_cat',
    handleInvalid='keep',
)

# Imputed continuous values + indexed integer vector -> one vector, rescaled by MinMaxScaler.
va_scalers = VectorAssembler(
    inputCols=imputer.getOutputCols() + [vi_numeric.getOutputCol()],
    outputCol='numeric_va_out',
    handleInvalid='error',
)
scaler = MinMaxScaler(inputCol=va_scalers.getOutputCol(), outputCol="numeric_scaled")

# Final model input: one-hot string features followed by the scaled numeric vector.
va_features = VectorAssembler(
    inputCols=ohe_str.getOutputCols() + [scaler.getOutputCol()],
    outputCol='features',
)

### Model

In [None]:
from pyspark.ml.classification import LogisticRegression

# Classifier over the assembled `features` vector, predicting the `Survived` column.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='Survived',
    maxIter=10,
    regParam=0.03,
)

### Pipeline

In [None]:
from pyspark.ml import Pipeline

# Order matters: every stage reads columns produced by the stages before it,
# and the classifier consumes the final `features` vector.
feature_stages = [
    imputer,
    indexer,
    ohe_str,
    va_numeric,
    vi_numeric,
    va_scalers,
    scaler,
    va_features,
]
pipeline = Pipeline(stages=feature_stages + [lr])


### Train

In [None]:
# Fit every stage of the pipeline, in order, on the training data.
pipeline_model = pipeline.fit(dataset=train_df)

### Show results for training data

In [None]:
# Score the *training* data with the fitted pipeline and print one row vertically,
# so every intermediate feature column produced by the stages is visible.
# A dedicated name keeps these training scores from being confused with the
# test-set `scored_df` created in the "Score Test Data" section (which would
# otherwise silently depend on which cell ran last).
scored_train_df = pipeline_model.transform(train_df)
scored_train_df.show(1, truncate=False, vertical=True)

### Score Test Data

In [None]:
# Run the unlabeled test set through every fitted stage, ending with the model's predictions.
scored_df = pipeline_model.transform(dataset=test_df_no_label)

In [None]:
# Attach the label file to the test predictions (inner join on PassengerId).
scored_df_with_label = scored_df.join(test_df_label, on='PassengerId')

scored_df_with_label.show(1, truncate=False, vertical=True)

### Save prediction in CSV format

In [None]:
# Original test columns, followed by the label and the model's prediction.
export_cols = [
    'PassengerId', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch',
    'Ticket', 'Fare', 'Cabin', 'Embarked',
    'Survived', 'prediction',
]

# Note: Spark writes the path as a directory of part files, not a single CSV file.
(
    scored_df_with_label
    .select(*export_cols)
    .write
    .csv("./scored_titanic.csv", mode="overwrite", header='true')
)

### Model Evaluation

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.functions import vector_to_array

# BinaryClassificationMetrics takes (score, label) pairs and sweeps thresholds
# over the score. The score must be a ranking score -- here P(Survived = 1),
# element 1 of the `probability` vector. Passing the hard 0/1 `prediction`
# instead collapses the ROC and PR curves to a single operating point.
# NOTE(review): the labels come from gender_submission.csv, which in the Kaggle
# Titanic data is an example submission rather than ground truth -- confirm
# before reading these numbers as test accuracy.
pred_label_df = scored_df_with_label.select(
    vector_to_array(scored_df_with_label.probability)[1].alias('score'),
    scored_df_with_label.Survived.cast('double').alias('label'),
)

metrics = BinaryClassificationMetrics(pred_label_df.rdd)

# Area under the ROC curve
print("AuROC curve = %s" % metrics.areaUnderROC)

# Area under the precision-recall curve
print("area under the precision-recall curve = %s" % metrics.areaUnderPR)

### Save Pipeline

In [None]:
# Persist the fitted pipeline model, replacing anything previously saved at this path.
pipeline_model_path = "./spark-logistic-regression-pipeline-model"
pipeline_model.write().overwrite().save(pipeline_model_path)

### Hyperparameter tuning with CrossValidator

In [None]:
%%time
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = ParamGridBuilder() \
    .addGrid(imputer.strategy, ['mean', 'median']) \
    .addGrid(ohe_str.dropLast, [True, False]) \
    .addGrid(lr.regParam, [0.1, 0.01, 0.001]) \
    .build()

print("Searching for the optimal Hyperparameters. This will take a while ...")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol='Survived'),
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_df)

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.functions import vector_to_array

# Score the test set with the cross-validated best model and attach the labels.
scored_df_cv = cvModel.transform(test_df_no_label)
scored_df_with_label_cv = scored_df_cv.join(test_df_label, ['PassengerId'])

# Use P(Survived = 1) (element 1 of `probability`) as the ranking score; the hard
# 0/1 `prediction` would collapse the ROC/PR curves to a single operating point.
# NOTE(review): labels come from gender_submission.csv, which in the Kaggle Titanic
# data is an example submission rather than ground truth -- confirm before
# reading these numbers as test accuracy.
pred_label_df_cv = scored_df_with_label_cv.select(
    vector_to_array(scored_df_with_label_cv.probability)[1].alias('score'),
    scored_df_with_label_cv.Survived.cast('double').alias('label'),
)

metrics_cv = BinaryClassificationMetrics(pred_label_df_cv.rdd)

# Area under the ROC curve
print("CV AuROC curve = %s" % metrics_cv.areaUnderROC)

# Area under the precision-recall curve
print("CV area under the precision-recall curve = %s" % metrics_cv.areaUnderPR)

In [None]:
print("Best model:")
for stage in cvModel.bestModel.stages:
    display(stage)

# The stage reprs above identify the fitted stages but may not list the tuned
# hyperparameter values, so also report the winning grid point together with
# its mean cross-validation metric.
evaluator_cv = cvModel.getEvaluator()
choose_best = max if evaluator_cv.isLargerBetter() else min
best_avg_metric, best_param_map = choose_best(
    zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()),
    key=lambda metric_and_params: metric_and_params[0],
)
print(f"Best mean CV {evaluator_cv.getMetricName()}: {best_avg_metric:.4f}")
for param, value in best_param_map.items():
    print(f"  {param.parent}.{param.name} = {value}")

In [None]:
# Show one randomly sampled scored test row (~10% sample, first row kept).
# The fixed seed makes the displayed row reproducible across Restart & Run All.
(
    scored_df_with_label_cv
    .sample(withReplacement=False, fraction=0.1, seed=42)
    .limit(1)
    .show(1, truncate=False, vertical=True)
)