# Logistic Regression Code Along
This is a code along of the famous titanic dataset, its always nice to start off with this dataset because it is an example you will find across pretty much every data analysis language.

In [1]:
import findspark
findspark.init('/home/huascar/spark-3.5.6-bin-hadoop3')

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('myproj').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/19 12:27:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [55]:
data = spark.read.csv('titanic.csv',inferSchema=True,header=True)

In [5]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [56]:
data['PassengerId','Pclass','Age', 'Sex'].describe().show()
data['SibSp', 'Parch', 'Ticket','Fare','Cabin', 'Embarked'].describe().show()

+-------+-----------------+------------------+------------------+------+
|summary|      PassengerId|            Pclass|               Age|   Sex|
+-------+-----------------+------------------+------------------+------+
|  count|              891|               891|               714|   891|
|   mean|            446.0| 2.308641975308642| 29.69911764705882|  NULL|
| stddev|257.3538420152301|0.8360712409770491|14.526497332334035|  NULL|
|    min|                1|                 1|              0.42|female|
|    max|              891|                 3|              80.0|  male|
+-------+-----------------+------------------+------------------+------+

+-------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|               891|           

In [52]:
#Find number of nulls in each column
from pyspark.sql import functions as F
data.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0|    0|       0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [54]:
data.select("Embarked").distinct().show()

+--------+
|Embarked|
+--------+
|       Q|
|       C|
|       S|
|    NULL|
+--------+



In [6]:
#Treat NULLS in Embarked
data = data.fillna({"Embarked": "X"})
data.select("Embarked").distinct().show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------+
|Embarked|
+--------+
|       Q|
|       C|
|       X|
|       S|
+--------+



                                                                                

In [7]:
#Treat NULLS in Age: replace with mean value = 29.7
data = data.fillna({"Age": 29.7})

In [57]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [58]:
#Remove cabin column
data = data['Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
    'Embarked']

In [60]:
#Bypass nulls treatment removing rows with nulls
data = data.na.drop()

In [61]:
data.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+--------+------+----+---+---+-----+-----+------+----+--------+
|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Embarked|
+--------+------+----+---+---+-----+-----+------+----+--------+
|       0|     0|   0|  0|  0|    0|    0|     0|   0|       0|
+--------+------+----+---+---+-----+-----+------+----+--------+



In [64]:
data['Pclass','Age', 'Sex'].describe().show()
data['SibSp', 'Parch', 'Ticket','Fare', 'Embarked'].describe().show()

+-------+------------------+-----------------+------+
|summary|            Pclass|              Age|   Sex|
+-------+------------------+-----------------+------+
|  count|               712|              712|   712|
|   mean| 2.240168539325843|29.64209269662921|  NULL|
| stddev|0.8368543166903446|14.49293290032352|  NULL|
|    min|                 1|             0.42|female|
|    max|                 3|             80.0|  male|
+-------+------------------+-----------------+------+

+-------+------------------+-------------------+-----------------+------------------+--------+
|summary|             SibSp|              Parch|           Ticket|              Fare|Embarked|
+-------+------------------+-------------------+-----------------+------------------+--------+
|  count|               712|                712|              712|               712|     712|
|   mean|0.5140449438202247|0.43258426966292135|276349.1541425819| 34.56725140449432|    NULL|
| stddev|0.9306921267673427| 0.8541814

In [65]:
#Create indexer for Embarked and Sex
from pyspark.ml.feature import StringIndexer
embarkedIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
data = embarkedIndexer.fit(data).transform(data)
sexIndexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
data = sexIndexer.fit(data).transform(data)
data.show()
data.select("Embarked", "EmbarkedIndex").distinct().show()
data.select("Sex", "SexIndex").distinct().show()

+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------------+--------+
|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|EmbarkedIndex|SexIndex|
+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------------+--------+
|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|          0.0|     0.0|
|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|          1.0|     1.0|
|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|          0.0|     1.0|
|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|          0.0|     1.0|
|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|       S|          0.0|     0.0|
|       0|     1|McCarthy, Mr. Tim...|  

In [9]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked',
 'EmbarkedIndex',
 'SexIndex']

In [97]:
my_data = data.select(['Survived',
 'Pclass',
 'SexIndex',
 'Age',
 #'SibSp',
 #'Parch',
 #'Fare',
 'EmbarkedIndex'])
my_data.show()

+--------+------+--------+----+-------------+
|Survived|Pclass|SexIndex| Age|EmbarkedIndex|
+--------+------+--------+----+-------------+
|       0|     3|     0.0|22.0|          0.0|
|       1|     1|     1.0|38.0|          1.0|
|       1|     3|     1.0|26.0|          0.0|
|       1|     1|     1.0|35.0|          0.0|
|       0|     3|     0.0|35.0|          0.0|
|       0|     1|     0.0|54.0|          0.0|
|       0|     3|     0.0| 2.0|          0.0|
|       1|     3|     1.0|27.0|          0.0|
|       1|     2|     1.0|14.0|          1.0|
|       1|     3|     1.0| 4.0|          0.0|
|       1|     1|     1.0|58.0|          0.0|
|       0|     3|     0.0|20.0|          0.0|
|       0|     3|     0.0|39.0|          0.0|
|       0|     3|     1.0|14.0|          0.0|
|       1|     2|     1.0|55.0|          0.0|
|       0|     3|     0.0| 2.0|          2.0|
|       0|     3|     1.0|31.0|          0.0|
|       0|     2|     0.0|35.0|          0.0|
|       1|     2|     0.0|34.0|   

In [29]:
#my_final_data = my_cols.na.drop()

### Working with Categorical Columns

Let's break this down into multiple steps to make it all clear.

In [12]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

In [13]:
gender_indexer = StringIndexer(inputCol='Sex',outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')

In [14]:
embark_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')

In [98]:
assembler = VectorAssembler(inputCols=['Pclass',
 'SexVec',
 'Age',
 #'SibSp',
 #'Parch',
 #'Fare',
 'EmbarkVec'],outputCol='features')

In [67]:
from pyspark.ml.feature import VectorAssembler

In [99]:
assembler = VectorAssembler(inputCols=['Pclass',
 'SexIndex',
 'Age',
 #'SibSp',
 #'Parch',
 #'Fare',
 'EmbarkedIndex'],outputCol='features')
df = assembler.transform(my_data)
df.show()

+--------+------+--------+----+-------------+------------------+
|Survived|Pclass|SexIndex| Age|EmbarkedIndex|          features|
+--------+------+--------+----+-------------+------------------+
|       0|     3|     0.0|22.0|          0.0|[3.0,0.0,22.0,0.0]|
|       1|     1|     1.0|38.0|          1.0|[1.0,1.0,38.0,1.0]|
|       1|     3|     1.0|26.0|          0.0|[3.0,1.0,26.0,0.0]|
|       1|     1|     1.0|35.0|          0.0|[1.0,1.0,35.0,0.0]|
|       0|     3|     0.0|35.0|          0.0|[3.0,0.0,35.0,0.0]|
|       0|     1|     0.0|54.0|          0.0|[1.0,0.0,54.0,0.0]|
|       0|     3|     0.0| 2.0|          0.0| [3.0,0.0,2.0,0.0]|
|       1|     3|     1.0|27.0|          0.0|[3.0,1.0,27.0,0.0]|
|       1|     2|     1.0|14.0|          1.0|[2.0,1.0,14.0,1.0]|
|       1|     3|     1.0| 4.0|          0.0| [3.0,1.0,4.0,0.0]|
|       1|     1|     1.0|58.0|          0.0|[1.0,1.0,58.0,0.0]|
|       0|     3|     0.0|20.0|          0.0|[3.0,0.0,20.0,0.0]|
|       0|     3|     0.0

In [25]:
df.where(df.PassengerId.isin([1])).select('features').first()

Row(features=DenseVector([3.0, 0.0, 22.0, 1.0, 0.0, 7.25, 0.0]))

In [26]:
df.where(df.PassengerId.isin([5])).select('features').first()

Row(features=SparseVector(7, {0: 3.0, 2: 35.0, 5: 8.05}))

In [69]:
from pyspark.ml.classification import LogisticRegression

In [100]:
logRec = LogisticRegression(featuresCol='features',labelCol='Survived')

In [101]:
train, test = df.randomSplit([0.7,.3])

In [102]:
logRecModel = logRec.fit(train)

In [103]:
results = logRecModel.transform(test)

In [104]:
results['Survived','prediction'].show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [105]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [106]:
evaluation = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [107]:
AUC = evaluation.evaluate(results)
AUC

0.7589995324918185

## Pipelines 

Let's see an example of how to use pipelines (we'll get a lot more practice with these later!)

In [17]:
from pyspark.ml import Pipeline

In [18]:
log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived')

In [19]:
pipeline = Pipeline(stages=[gender_indexer,embark_indexer,
                           gender_encoder,embark_encoder,
                           assembler,log_reg_titanic])

In [20]:
train_titanic_data, test_titanic_data = my_final_data.randomSplit([0.7,.3])

In [21]:
fit_model = pipeline.fit(train_titanic_data)

In [22]:
results = fit_model.transform(test_titanic_data)

In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [24]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='Survived')

In [26]:
results.select('Survived','prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
+--------+----------+
only showing top 20 rows



In [27]:
AUC = my_eval.evaluate(results)

In [28]:
AUC

0.7918269230769232

## Great Job!