In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Titanic Logistic Regression Classification").getOrCreate()

In [3]:
df = spark.read.csv("titanic.csv", inferSchema=True, header=True)

In [4]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [5]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [24]:
my_cols = df.select(['Survived','Pclass','Sex','Age','SibSp',
                     'Parch','Fare','Embarked'])

In [25]:
#handling missing values and dropiing it for this project. 
my_final_data = my_cols.na.drop()

In [26]:
my_final_data.show()

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|       S|
|       0|     3|female|14.0|    0|    0| 7.8542|       

##Working with Categorical Columns

In [27]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer

In [28]:
#using StringIndexer we convert every string into a number and then one hot encode on the stringIndexer object.
gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndexed')
gender_encoder = OneHotEncoder(inputCol='SexIndexed', outputCol='SexVect')

In [29]:
#now lets do the same for embarked column
embarked_indexer= StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')
embarked_encoder= OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedVect')

In [30]:
assembler = VectorAssembler(inputCols=['Pclass','SexVect','EmbarkedVect','Age','SibSp','Parch','Fare'],
                           outputCol='feature')

Now Lets Build Pipeline and Call The String indexer and One hot Encoder that we created earlier.

In [31]:
from pyspark.ml.classification import LogisticRegression

In [32]:
from pyspark.ml import Pipeline

pipeline sets different stages for a very complex process

In [33]:
#lets create out Logistic regression object
logreg_titanic = LogisticRegression(featuresCol='feature', labelCol='Survived')

In [34]:
pipeline = Pipeline(stages=[gender_indexer, embarked_indexer, gender_encoder, embarked_encoder, assembler, logreg_titanic])

In [35]:
train_data, test_data = my_final_data.randomSplit([0.7,0.3])

In [36]:
fit_model = pipeline.fit(train_data)

In [38]:
#due to column doent exist in dataframe error I made a few changes to my_final_data dataFrame
if 'Survived' not in my_final_data.columns:
    print("Survived column does not exist in the DataFrame")
else:
    # Perform operations with 'Survived' column
    # Example: Select the 'Survived' column
    df.select('Survived').show()

+--------+
|Survived|
+--------+
|       0|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       0|
|       1|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       1|
|       0|
|       1|
|       0|
|       1|
+--------+
only showing top 20 rows



In [39]:
#time to transform out test data
results = fit_model.transform(test_data)

In [40]:
#Lets Evaluate the train data with test data
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [42]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [44]:
results.select('Survived','prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [45]:
#lets look at Area Under the Curve
AUC = my_eval.evaluate(results)

In [46]:
#lets look at the AUC
AUC

0.7480650154798762

AUC results shows the curve on side of 1 which means the model prediction was 74% matches. 
AUC = Area Under the Curve.
AUROC = Area Under the Receiver Operating Characteristic curve.