# In this example, we will use the Titanic dataset to build a logistic regression model to answer the question “what types of people were more likely to survive?” 

## Required data file: titanic.csv

Resources:

Spark MLlib classification and regression documentation
https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression

Spark extracting, transforming, and selecting features documentation
https://spark.apache.org/docs/latest/ml-features.html 

Spark data types
https://spark.apache.org/docs/2.3.0/mllib-data-types.html

Spark StringIndexer
https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/ml/feature/StringIndexer.html

Spark OneHotEncoder
https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/ml/feature/OneHotEncoder.html

In [3]:
# These lines are not needed for CCAST OnDemand
#import findspark
#findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('logrex').getOrCreate()

## Step 1: Read and explore the data

In [5]:
data = spark.read.csv('titanic.csv',inferSchema=True,header=True)

In [6]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [8]:
data.groupBy('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       0|  549|
|       1|  342|
+--------+-----+



### select columns as features

In [9]:
my_cols=data.select(['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'])

### drop missing data

In [10]:
new_data = my_cols.na.drop()

In [11]:
new_data.groupBy('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       0|  424|
|       1|  288|
+--------+-----+



## Step 2: Transform data using pipeline

### Convert non-numerical data to numerical data 

In [12]:
from pyspark.ml.feature import (VectorAssembler,
                               VectorIndexer,
                               OneHotEncoder,
                               StringIndexer)

In [13]:
# StringIndexer: create index (0,1,2,...) for categories ('A','B','C',...)
gender_indexer = StringIndexer(inputCol='Sex',outputCol='SexIndex')

In [14]:
# StringIndexer (aka label encoding in SK Learn) assumes higher the categorical
#    value, better the category
# OneHotEncoder: create a vector indicating category
# for example: categories (0,1,2)
# category 0 would be [1,0,0]
# category 1 would be [0,1,0]
# category 2 would be [0,0,1]
gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')

In [15]:
embark_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkIndex')

In [16]:
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')

### Combine the selected columns into a "features" column

In [17]:
assembler = VectorAssembler(inputCols=['Pclass','SexVec','EmbarkVec','Age',
                                      'SibSp','Parch','Fare'],outputCol='features')

### Process data using pipeline

In [18]:
from pyspark.ml import Pipeline

In [19]:
pipeline = Pipeline(stages=[gender_indexer,embark_indexer,
                           gender_encoder,embark_encoder,
                           assembler])

In [20]:
output = pipeline.fit(new_data).transform(new_data)

In [21]:
output.head()

Row(Survived=0, Pclass=3, Sex='male', Age=22.0, SibSp=1, Parch=0, Fare=7.25, Embarked='S', SexIndex=0.0, EmbarkIndex=0.0, SexVec=SparseVector(1, {0: 1.0}), EmbarkVec=SparseVector(2, {0: 1.0}), features=DenseVector([3.0, 1.0, 1.0, 0.0, 22.0, 1.0, 0.0, 7.25]))

In [22]:
final_data = output.select(['features','Survived'])

In [23]:
final_data.show()

+--------------------+--------+
|            features|Survived|
+--------------------+--------+
|[3.0,1.0,1.0,0.0,...|       0|
|[1.0,0.0,0.0,1.0,...|       1|
|(8,[0,2,4,7],[3.0...|       1|
|[1.0,0.0,1.0,0.0,...|       1|
|[3.0,1.0,1.0,0.0,...|       0|
|[1.0,1.0,1.0,0.0,...|       0|
|[3.0,1.0,1.0,0.0,...|       0|
|[3.0,0.0,1.0,0.0,...|       1|
|[2.0,0.0,0.0,1.0,...|       1|
|[3.0,0.0,1.0,0.0,...|       1|
|(8,[0,2,4,7],[1.0...|       1|
|[3.0,1.0,1.0,0.0,...|       0|
|[3.0,1.0,1.0,0.0,...|       0|
|(8,[0,2,4,7],[3.0...|       0|
|(8,[0,2,4,7],[2.0...|       1|
|[3.0,1.0,0.0,0.0,...|       0|
|[3.0,0.0,1.0,0.0,...|       0|
|[2.0,1.0,1.0,0.0,...|       0|
|[2.0,1.0,1.0,0.0,...|       1|
|(8,[0,4,7],[3.0,1...|       1|
+--------------------+--------+
only showing top 20 rows



## Step 3: build the model

### Split data into training and testing sets

In [24]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

### Train the model

In [26]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features',labelCol='Survived')
lr_model = lr.fit(train_data)

In [28]:
# output coefficients
lr_model.coefficients

DenseVector([-1.2037, -2.5198, 0.642, 1.1132, -0.0371, -0.2543, -0.0269, 0.0001])

In [30]:
lr_model.summary.areaUnderROC

0.8555486246348588

## Step 4: make predictions on the test set

In [31]:
results = lr_model.transform(test_data)

In [32]:
results.select('survived','prediction').show()

+--------+----------+
|survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       1|       1.0|
|       1|       1.0|
|       1|       1.0|
|       1|       1.0|
|       1|       1.0|
|       1|       1.0|
|       1|       1.0|
|       1|       1.0|
|       1|       1.0|
|       0|       1.0|
|       1|       1.0|
|       0|       1.0|
|       1|       1.0|
|       1|       1.0|
|       0|       1.0|
|       1|       1.0|
|       1|       1.0|
+--------+----------+
only showing top 20 rows



In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [34]:
eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='Survived')

In [35]:
AUC = eval.evaluate(results)

In [36]:
AUC

0.7930555555555556