# Logistic Regression Example

A binary classification example with PySpark's `LogisticRegression`, using the well-known Titanic dataset to predict passenger survival.

Other links to review:
* https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
* https://stats.stackexchange.com/questions/132777/what-does-auc-stand-for-and-what-is-it

## Load Spark and Data

In [5]:
# start spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('logred_titanic').getOrCreate()

In [6]:
# read in the input csv file.
data = spark.read.csv('titanic.csv', inferSchema=True, header=True)

In [32]:
data.count()

891

In [7]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



## Clean Data

In [33]:
# select only desired columns
mycols_data = data.select('Survived','Pclass','Sex','Age','SibSp','Parch','Fare','Embarked')

In [34]:
# list number of NANs or NULLs in each column
from pyspark.sql.functions import count, when, isnan, col
mycols_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in mycols_data.columns]).show()

+--------+------+---+---+-----+-----+----+--------+
|Survived|Pclass|Sex|Age|SibSp|Parch|Fare|Embarked|
+--------+------+---+---+-----+-----+----+--------+
|       0|     0|  0|177|    0|    0|   0|       2|
+--------+------+---+---+-----+-----+----+--------+



In [35]:
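# Drop every row that contains a null in any column
# (891 - 712 = 179 rows, matching the 177 missing Age and 2 missing Embarked values)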
cleaned_data = mycols_data.na.drop()
cleaned_data.count()

712

## Format Categorical and Numerical Data with a Pipeline

Categorical columns are one-hot encoded in two steps (string indexing, then one-hot encoding), and the results are assembled with the numeric columns into a single features vector. All of these steps are configured as stages of one pipeline.

In [38]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

Add the transformations on categorical columns as stages in the Pipeline 

In [40]:
# list all categorical values where we will apply the transformations
categoricalColumns = ['Sex', 'Embarked']

In [41]:
# add the transformations on categorical columns as stages in the Pipeline
stages = []
for categoricalCol in categoricalColumns:
    # Category indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")

    # Use OneHotEncoder to convert the category indices into binary SparseVectors
    encoder = OneHotEncoder(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "Vec")

    # Add both stages to the pipeline
    stages += [stringIndexer, encoder]

Configure the vector assembler

In [42]:
# list all numerical values that need no transformation
numericCols = ['Pclass','Age','SibSp','Parch','Fare']

In [58]:
# What about normalization of numerical columns with a normalizer?
#from pyspark.ml.feature import Normalizer
#normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=2.0)
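
On the question in the cell above: `Normalizer` rescales each row vector to unit p-norm, so it does not put columns such as `Age` and `Fare` on a common scale. Per-column scaling is the job of scalers such as `StandardScaler` in `pyspark.ml.feature`. The plain-Python sketch below contrasts the two operations on two toy (Age, Fare) rows. The function names are hypothetical, and the code only imitates the arithmetic rather than calling Spark.

```python
import math

rows = [[22.0, 7.25], [38.0, 71.2833]]  # toy (Age, Fare) pairs

def l2_normalize_rows(data):
    """Row-wise scaling: divide each row by its own Euclidean norm (the p=2.0 case)."""
    out = []
    for row in data:
        norm = math.sqrt(sum(v * v for v in row))
        out.append([v / norm for v in row])
    return out

def standardize_columns(data):
    """Column-wise scaling: subtract each column's mean, divide by its sample std."""
    n = len(data)
    cols = list(zip(*data))
    means = [sum(c) / n for c in cols]
    stds = [math.sqrt(sum((v - m) ** 2 for v in c) / (n - 1)) for c, m in zip(cols, means)]
    return [[(v - m) / s for v, m, s in zip(row, means, stds)] for row in data]

row_scaled = l2_normalize_rows(rows)    # each row now has length 1
col_scaled = standardize_columns(rows)  # each column now has mean 0
print(row_scaled)
print(col_scaled)
```

After row-wise normalization, `Age` and `Fare` within one passenger are still compared against each other, which is usually not what is wanted for tabular features like these.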

In [44]:
# Configure the vector assembler and add it as a pipeline stage
assemblerInputs = [c + "Vec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [46]:
# the stages so far...
stages

[StringIndexer_42ddbaec25f6043addb2,
 OneHotEncoder_4a11a4ed6b16bfa6fbf3,
 StringIndexer_43a786309767a32df370,
 OneHotEncoder_42ccbaf699903766ea99,
 VectorAssembler_4bae997d3bf54ff3477b]

In [48]:
# Create a Pipeline with all previous actions
features_pipeline = Pipeline(stages=stages)

In [51]:
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
final_data = features_pipeline.fit(cleaned_data).transform(cleaned_data)

# select only features and label columns
final_data = final_data.select('features', 'Survived')

In [54]:
final_data.head(5)

[Row(features=DenseVector([1.0, 1.0, 0.0, 3.0, 22.0, 1.0, 0.0, 7.25]), Survived=0),
 Row(features=DenseVector([0.0, 0.0, 1.0, 1.0, 38.0, 1.0, 0.0, 71.2833]), Survived=1),
 Row(features=SparseVector(8, {1: 1.0, 3: 3.0, 4: 26.0, 7: 7.925}), Survived=1),
 Row(features=DenseVector([0.0, 1.0, 0.0, 1.0, 35.0, 1.0, 0.0, 53.1]), Survived=1),
 Row(features=DenseVector([1.0, 1.0, 0.0, 3.0, 35.0, 0.0, 0.0, 8.05]), Survived=0)]
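
The eight entries in each `features` vector follow the order of the assembler inputs: one slot for `SexVec`, two for `EmbarkedVec`, then the five numeric columns. The two categorical columns take three slots rather than five because `OneHotEncoder` drops the last category by default (`dropLast=True`), and `StringIndexer` gives index 0 to the most frequent label. The plain-Python sketch below imitates that behavior on a toy column. The helper names `string_index` and `one_hot_drop_last` are hypothetical, the label frequencies are assumed, and this is not Spark's implementation.

```python
from collections import Counter

def string_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot_drop_last(index, num_categories):
    """One-hot encode into num_categories - 1 slots; the last category is all zeros."""
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec

# Toy column with assumed frequencies: 'S' most common, then 'C', then 'Q'
embarked = ['S', 'S', 'S', 'C', 'C', 'Q']
mapping = string_index(embarked)
encoded = {label: one_hot_drop_last(idx, len(mapping)) for label, idx in mapping.items()}
print(mapping)
print(encoded)
```

Read this way, the first row `[1.0, 1.0, 0.0, 3.0, 22.0, 1.0, 0.0, 7.25]` splits into `SexVec = [1.0]`, `EmbarkedVec = [1.0, 0.0]`, and `Pclass=3`, `Age=22`, `SibSp=1`, `Parch=0`, `Fare=7.25`.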

In [57]:
# Randomly split the data into training and test sets; set a seed for reproducibility
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=100)
print(train_data.count())
print(test_data.count())

487
225
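
The counts are 487 and 225 rather than an exact 70/30 split of 712 rows (about 498 and 214) because `randomSplit` assigns each row to a split at random according to the weights, so the split sizes are only approximate. A minimal plain-Python sketch of that idea follows. `bernoulli_split` is a hypothetical helper, and the sampling details differ from Spark's.

```python
import random

def bernoulli_split(items, train_weight, seed):
    """Send each item to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for item in items:
        (train if rng.random() < train_weight else test).append(item)
    return train, test

rows = list(range(712))  # stand-in for the 712 cleaned rows
train, test = bernoulli_split(rows, train_weight=0.7, seed=100)
print(len(train), len(test))  # sizes land near 498 and 214, usually not exactly on them
```

Fixing the seed makes the assignment repeatable, but it does not force the sizes to match the weights exactly.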


## Fit Logistic Regression model

In [59]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
log_reg = LogisticRegression(labelCol='Survived', featuresCol='features', maxIter=10)

# Train model with Training Data
log_reg_model = log_reg.fit(train_data)

## Evaluate on test data

In [65]:
test_results = log_reg_model.transform(test_data)
test_results.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [73]:
test_results.select('Survived','prediction','probability').show()

+--------+----------+--------------------+
|Survived|prediction|         probability|
+--------+----------+--------------------+
|       1|       0.0|[0.89845272234831...|
|       0|       0.0|[0.99421909288322...|
|       0|       0.0|[0.99015289928703...|
|       0|       0.0|[0.99071807477584...|
|       0|       0.0|[0.99819441937833...|
|       1|       1.0|[0.06127098232752...|
|       1|       1.0|[0.03259240401987...|
|       1|       1.0|[0.03851987436034...|
|       1|       1.0|[0.04423406999497...|
|       1|       1.0|[0.02292416975057...|
|       1|       1.0|[0.17405298229200...|
|       0|       1.0|[0.18491655095609...|
|       1|       1.0|[0.18491655095609...|
|       1|       1.0|[0.22130337442356...|
|       1|       1.0|[0.23427956583345...|
|       1|       1.0|[0.24795832655046...|
|       0|       1.0|[0.27688037272054...|
|       1|       1.0|[0.30779520830304...|
|       0|       1.0|[0.44737225771414...|
|       0|       1.0|[0.27283654818479...|
+--------+----------+--------------------+
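
In the `probability` column, the first entry is the estimated probability of class 0 (did not survive). The second entry, cut off in the output above, is the probability of class 1. With the default threshold of 0.5, `prediction` is 1.0 when the class 1 probability exceeds 0.5, which is why rows whose first entry is below 0.5 are predicted as survivors. The sketch below reproduces that mapping from a raw margin using the logistic function. The margin values are made up for illustration and are not taken from the fitted model.

```python
import math

def sigmoid(margin):
    """Logistic function: maps a real-valued margin to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-margin))

def predict(margin, threshold=0.5):
    """Return ([P(class 0), P(class 1)], predicted label) for one raw margin."""
    p1 = sigmoid(margin)
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return probability, prediction

# Hypothetical margins (w . x + b); not taken from the fitted model
for margin in (-2.18, 2.73):
    probability, prediction = predict(margin)
    print(margin, probability, prediction)
```

A negative margin gives a class 0 probability above 0.5 and a prediction of 0.0; a positive margin gives the opposite.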

Evaluate with the BinaryClassificationEvaluator

In [81]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
test_evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='Survived')

In [82]:
# Get the area under the ROC curve (areaUnderROC is the evaluator's default metric)
test_evaluator.evaluate(test_results)

0.8024619960343694

An AUC of about 0.80 on the test set: not bad for a first model with no tuning.
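
One way to read the score: the area under the ROC curve equals the probability that a randomly chosen positive example (a survivor) receives a higher score than a randomly chosen negative one, with ties counted as half. The plain-Python sketch below computes it that way on a handful of made-up labels and scores. It illustrates the definition and is not how `BinaryClassificationEvaluator` computes the metric internally.

```python
def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Made-up labels and class-1 scores, for illustration only
labels = [1, 1, 1, 0, 0, 0]
scores = [0.9, 0.8, 0.4, 0.7, 0.3, 0.2]
auc = pairwise_auc(labels, scores)
print(auc)
```

A score of 0.5 corresponds to random ranking and 1.0 to a perfect ordering of positives above negatives.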