## Quick reminder on Logistic Regression
Logistic Regression is a supervised machine learning algorithm used for classification problems. Unlike linear regression, which predicts continuous values, it predicts the probability that an input belongs to a specific class. It is most commonly used for binary classification, where the output is one of two categories such as Yes/No, True/False, or 0/1. It applies the sigmoid function to a weighted sum of the inputs to produce a probability between 0 and 1. In this article, we will cover the basics of logistic regression and its core concepts.
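
To make the role of the sigmoid concrete, here is a minimal plain-Python sketch (not PySpark). The weights, bias, and inputs are hypothetical values chosen only for illustration; a real model learns them from data.

```python
import math

def sigmoid(z: float) -> float:
    """Map any real-valued score z to a value strictly between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_proba(features, weights, bias):
    """Probability of the positive class for one example."""
    # Weighted sum of the inputs, then squashed by the sigmoid.
    z = bias + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)

# Hypothetical weights and inputs, chosen only for illustration.
p = predict_proba(features=[1.0, 2.0], weights=[0.5, -0.25], bias=0.0)
print(p)  # z = 0.5 - 0.5 = 0, so p = 0.5
```

A score of 0 maps to a probability of exactly 0.5; large positive scores approach 1 and large negative scores approach 0.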

In this tutorial series, we are going to cover **Logistic Regression** using **PySpark**.

**Logistic Regression** is one of the basic ways to perform classification. Despite the word “regression” in its name, it is a classification method.

Some examples of classification are:
- Spam detection
- Disease diagnosis

We will be using the Titanic dataset, which has the columns PassengerId, Survived, Pclass, Name, Sex, Age, SibSp, Parch, Ticket, Fare, Cabin, and Embarked. **We have to predict whether a passenger survived or not using the Logistic Regression machine learning model**. To get started, open a new notebook and follow the steps below:

### 1) Import library

In [1]:
import pyspark
import numpy as np
from pyspark.sql import SparkSession
# SparkSession is the entry point to Spark
# and acts as the gateway to the Spark libraries

### 2) Load the data

In [2]:
# Starting the Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Titanic').getOrCreate()

# Reading the data (first row is the header; column types are inferred)
df = spark.read.csv('titanic.csv', inferSchema=True, header=True)

# Showing the first 5 rows and the total number of rows
df.show(5)
print(df.count())


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

#### Schema

In [3]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [4]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [5]:
df.groupBy('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [6]:
from pyspark.sql import functions as F

df.groupBy('Survived') \
  .agg(F.count('*').alias('count')) \
  .withColumn('percentage', F.round(F.col('count') / df.count() * 100, 2)) \
  .show()


+--------+-----+----------+
|Survived|count|percentage|
+--------+-----+----------+
|       1|  342|     38.38|
|       0|  549|     61.62|
+--------+-----+----------+



Now it's clear that the data is imbalanced: about 62% of the passengers did not survive, while about 38% did.
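
One practical consequence of this class distribution is that a model which always predicts the majority class already reaches a fairly high accuracy, so accuracy alone is a weak yardstick. A minimal plain-Python sketch, using only the counts printed above:

```python
# Class counts taken from the groupBy output above.
counts = {0: 549, 1: 342}

total = sum(counts.values())
majority_class = max(counts, key=counts.get)

# Accuracy of a trivial model that always predicts the majority class.
baseline_accuracy = counts[majority_class] / total

print(total)                        # 891
print(majority_class)               # 0
print(round(baseline_accuracy, 4))  # 0.6162
```

Any trained model should be judged against this baseline, which is one reason to also look at metrics such as ROC AUC and F1.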

### 3) Data cleaning

#### Percentage of null values per column

In [7]:
from pyspark.sql import functions as F

# Total number of rows
total_rows = df.count()

# Percentage of null values in each column
pourcentage_null = df.select([
    (F.count(F.when(F.col(c).isNull(), c)) / total_rows * 100).alias(c)
    for c in df.columns
])

pourcentage_null.show()


+-----------+--------+------+----+---+------------------+-----+-----+------+----+-----------------+-------------------+
|PassengerId|Survived|Pclass|Name|Sex|               Age|SibSp|Parch|Ticket|Fare|            Cabin|           Embarked|
+-----------+--------+------+----+---+------------------+-----+-----+------+----+-----------------+-------------------+
|        0.0|     0.0|   0.0| 0.0|0.0|19.865319865319865|  0.0|  0.0|   0.0| 0.0|77.10437710437711|0.22446689113355783|
+-----------+--------+------+----+---+------------------+-----+-----+------+----+-----------------+-------------------+



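#### Removing rows with null values

The table above shows that Age (about 19.9%), Cabin (about 77.1%), and Embarked (about 0.2%) contain null values. The next step drops the rows that have a null value in Embarked, Survived, Age, or Fare. **We do not need the columns PassengerId, Name, Ticket, and Cabin as they are not required to train and test the model**; they stay in the DataFrame but are left out of the feature vector assembled later.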

In [8]:
# Dropping the rows that have a null value in
# Embarked, Survived, Age, or Fare.

df_clean = df.dropna(subset=['Embarked', 'Survived', 'Age', 'Fare'])

# Again showing the data
df_clean.show(5)
print(df_clean.count())

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

#### Convert string columns to numeric columns
The next task is to convert the string columns (Sex and Embarked) to numeric columns: each one is first indexed with StringIndexer and then one-hot encoded with OneHotEncoder. Without this step, we cannot vectorize the data using VectorAssembler.

In [9]:
# Importing the required libraries
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder

# Converting the Sex Column
sex_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex", handleInvalid="keep")
sex_encoder = OneHotEncoder(inputCol="SexIndex", outputCol="SexVec")


# Converting the Embarked Column
embark_indexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkIndex", handleInvalid="keep")
embark_encoder = OneHotEncoder(inputCol="EmbarkIndex", outputCol="EmbarkVec")

# Assembling the selected columns into a single vector column "features",
# which will be the input to the model
assembler = VectorAssembler(
    inputCols=["Pclass", "SexVec", "Age", "SibSp", "Parch", "Fare", "EmbarkVec"],
    outputCol="features"
)
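
To illustrate what the indexer and encoder stages do conceptually, here is a plain-Python sketch. It is not Spark's implementation; it assumes the default settings (labels ordered by descending frequency, and the last category dropped by the one-hot encoder), and the toy column is made up for illustration.

```python
from collections import Counter

def fit_string_index(values):
    """Assign index 0 to the most frequent label, 1 to the next, and so on."""
    ordered = [label for label, _ in Counter(values).most_common()]
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

# Toy column, made up for illustration.
sex = ["male", "female", "male", "male", "female"]
index = fit_string_index(sex)
print(index)  # {'male': 0, 'female': 1}

# handleInvalid="keep" reserves one extra index for unseen labels,
# so two labels become three categories and a 2-element vector.
num_categories = len(index) + 1
print(one_hot(index["male"], num_categories))    # [1.0, 0.0]
print(one_hot(index["female"], num_categories))  # [0.0, 1.0]
```

Under the same assumptions, Embarked (three labels plus the extra "unseen" bucket) becomes a 3-element vector, so the assembled feature vector has 1 + 2 + 1 + 1 + 1 + 1 + 3 = 10 entries, which matches the vector size of 10 visible in the results output.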


Next, we use a Pipeline to chain these stages in order, and we import and instantiate the Logistic Regression model.

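**Note**: ``LogisticRegression(regParam=0.01, elasticNetParam=0.0)  # Ridge`` applies an L2 penalty and ``LogisticRegression(regParam=0.01, elasticNetParam=1.0)  # Lasso`` applies an L1 penalty.

In particular, ``LogisticRegression(regParam=0.0, elasticNetParam=0.0)  # no regularization`` fits a plain, unpenalized logistic regression (this is maximum likelihood, not ordinary least squares).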
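
The effect of the two parameters can be sketched in plain Python. This assumes the elastic-net form described in Spark's linear-methods guide, where `regParam` scales a mix of the L1 norm and half the squared L2 norm; the weights are hypothetical and the function name is introduced here for illustration only.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty added to the loss: reg_param * (a * L1 + (1 - a) / 2 * L2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    mix = elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared
    return reg_param * mix

weights = [1.0, -2.0]  # hypothetical coefficients

print(round(elastic_net_penalty(weights, 0.01, 0.0), 6))  # Ridge: 0.01 * 0.5 * 5 = 0.025
print(round(elastic_net_penalty(weights, 0.01, 1.0), 6))  # Lasso: 0.01 * 3 = 0.03
print(round(elastic_net_penalty(weights, 0.0, 0.0), 6))   # no regularization: 0.0
```

With `regParam=0.0` the penalty vanishes whatever the value of `elasticNetParam`, which is why that setting corresponds to an unregularized fit.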

In [10]:
# Importing Pipeline and Model
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Defining the logistic regression estimator with L2 regularization
# (elasticNetParam is left at its default of 0.0)
log_reg = LogisticRegression(
    featuresCol="features",
    labelCol="Survived",
    regParam=0.01
)


# Creating the pipeline
pipeline = Pipeline(stages=[sex_indexer, embark_indexer, sex_encoder, embark_encoder, assembler, log_reg])


After defining the pipeline, we will split the data into training data and testing data to train and test the model.

In [11]:
# Splitting the data into train (70%) and test (30%)
train_data, test_data = df_clean.randomSplit([0.7, 0.3], seed=42)

# Fitting the whole pipeline on the training data
fit_model = pipeline.fit(train_data)

# Applying the fitted pipeline to the test data
results = fit_model.transform(test_data)

# Showing the results
results.select('features', 'Survived', 'prediction', 'probability').show(10)


+--------------------+--------+----------+--------------------+
|            features|Survived|prediction|         probability|
+--------------------+--------+----------+--------------------+
|(10,[0,2,3,6,7],[...|       1|       1.0|[0.38793956061383...|
|[3.0,1.0,0.0,2.0,...|       0|       0.0|[0.85469593420884...|
|[2.0,0.0,1.0,14.0...|       1|       1.0|[0.11708252002681...|
|[3.0,0.0,1.0,4.0,...|       1|       1.0|[0.26437873163312...|
|(10,[0,2,3,6,7],[...|       0|       1.0|[0.28840096116707...|
|(10,[0,2,3,6,7],[...|       1|       1.0|[0.39450839653089...|
|[3.0,1.0,0.0,2.0,...|       0|       0.0|[0.92053408959047...|
|(10,[0,2,3,6,9],[...|       1|       1.0|[0.38802353469557...|
|[3.0,0.0,1.0,8.0,...|       0|       1.0|[0.41943696872783...|
|[1.0,1.0,0.0,19.0...|       0|       0.0|[0.50052289937674...|
+--------------------+--------+----------+--------------------+
only showing top 10 rows
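
The relation between the `probability` and `prediction` columns can be sketched in plain Python. This assumes the default decision threshold of 0.5 and that the probability vector is ordered as [P(class 0), P(class 1)]; the function name is introduced here for illustration only.

```python
def predict_label(probability, threshold=0.5):
    """probability is [P(class 0), P(class 1)]; return 1.0 if P(class 1) exceeds the threshold."""
    return 1.0 if probability[1] > threshold else 0.0

# First probability entry copied (rounded) from three rows of the table above;
# the second entry is filled in as 1 - P(class 0).
p0_values = [0.3879, 0.8547, 0.5005]

for p0 in p0_values:
    print(p0, predict_label([p0, 1.0 - p0]))
# 0.3879 1.0
# 0.8547 0.0
# 0.5005 0.0
```

These labels agree with the `prediction` column for the corresponding rows, including the borderline row where P(class 0) is just above 0.5.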


#### Model evaluation using ROC-AUC
Transforming the test data with the fitted pipeline adds the columns rawPrediction, probability, and prediction to the results. We will now compute the AUC (Area Under the ROC Curve), which measures how well the model separates the two classes, together with the area under the precision-recall curve and the F1-score. For this, we will use BinaryClassificationEvaluator and MulticlassClassificationEvaluator as shown:

In [12]:
# Importing the evaluators
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Calling the evaluator (default metric: areaUnderROC).
# Note: rawPredictionCol='prediction' scores the hard 0/1 labels only;
# use rawPredictionCol='rawPrediction' to rank by the model's scores.
res = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

# Evaluating the ROC AUC, PR AUC, and weighted F1 on the results
ROC_AUC = res.evaluate(results)
AUCPR = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived', metricName="areaUnderPR").evaluate(results)
f1 = MulticlassClassificationEvaluator(metricName="f1", labelCol="Survived").evaluate(results)


In [13]:
print(f"ROC AUC  : {ROC_AUC:.4f}")
print(f"PR  AUC  : {AUCPR:.4f}")
print(f"F1-score : {f1:.4f}")

ROC AUC  : 0.8240
PR  AUC  : 0.7178
F1-score : 0.8257


**Note**: As a rule of thumb, an AUC above 0.7 is often considered good, but the value should be compared with the performance that can be expected for the problem and the data before concluding that it is actually good.
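
The evaluator above was given the `prediction` column, which holds hard 0/1 labels. The plain-Python sketch below suggests why an AUC computed from hard labels can differ from one computed from continuous scores. It uses the rank-based definition of ROC AUC (the chance that a random positive is scored above a random negative, with ties counted as one half); the toy data is made up for illustration, and this is not Spark's implementation.

```python
def roc_auc(labels, scores):
    """Probability that a random positive is scored above a random negative (ties count 0.5)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in pos
        for n in neg
    )
    return wins / (len(pos) * len(neg))

# Toy data, made up for illustration.
labels = [1, 1, 1, 0, 0, 0]
probs = [0.9, 0.7, 0.4, 0.6, 0.3, 0.1]             # P(class 1)
hard = [1.0 if p > 0.5 else 0.0 for p in probs]    # thresholded at 0.5

print(round(roc_auc(labels, probs), 4))  # 0.8889
print(round(roc_auc(labels, hard), 4))   # 0.6667
```

Thresholding discards the ranking within each predicted class, so the hard-label AUC describes a single operating point rather than the full curve.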

In [14]:
spark.stop()