In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.appName("DS 2.3 Final Project").getOrCreate()

In [3]:
df = spark.read.csv("titanic.csv", header=True, inferSchema=True)
df = df.na.drop()

In [4]:
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|     Ticket|    Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   PC 17599| 71.2833|        C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|     113803|    53.1|       C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|      17463| 51.8625|        E46|       S|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|    PP 9549|    16.7|         G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|     113783|   26.55|       C103|       S|
|         22|       1|     2|Beesley, Mr. Lawr...|  male|34.0|  

In [5]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
df = df.select(col("Survived"), col("Pclass").cast("float"), col("Sex"), col("Age").cast("float"), col("SibSp").cast("float"), col("Fare").cast("float"), col("Embarked"))

In [7]:
train_data, test_data = df.randomSplit([0.7, 0.3])

In [8]:
indexer = StringIndexer(inputCols=["Survived", "Sex", "Embarked"], outputCols=["Survived_i", "Sex_i", "Embarked_i"])
encoder = OneHotEncoder(inputCols=["Sex_i", "Embarked_i"], outputCols=["Sex_v", "Embarked_v"])
assembler = VectorAssembler(inputCols=["Pclass", "Sex_v", "Age", "SibSp", "Fare", "Embarked_v"], outputCol="features")
 
pipeline = Pipeline(stages=[indexer, encoder, assembler])
model = pipeline.fit(train_data)
predict = model.transform(test_data)
predict.select("features", "Survived")

log_reg = LogisticRegression(featuresCol="features", labelCol="Survived", maxIter=10)
log_reg_model = log_reg.fit(predict)

In [9]:
print("\nPrecision (Positive Predictive Value): {}".format(log_reg_model.summary.weightedPrecision))
print("\nRecall (True Positive Rate): {}".format(log_reg_model.summary.weightedRecall))
print("\nF-measure: {}".format(log_reg_model.summary.weightedFMeasure()))
# print("\nROC: {}".format(log_reg_model.summary.roc))
print("\nArea Under ROC Curve: {}".format(log_reg_model.summary.areaUnderROC))
# print("\nArea Under Precision-Recall Curve: {}".format(log_reg_model.summary.pr))

print("\nCoefficients: {}".format(log_reg_model.coefficients))
print("\nIntercept: {}".format(log_reg_model.intercept))


Precision (Positive Predictive Value): 0.7911635148477254

Recall (True Positive Rate): 0.7936507936507937

F-measure: 0.7921912059843095

Area Under ROC Curve: 0.897674418604651

Coefficients: [-2.997003035927543,4.97410161601156,-0.10211241654689895,0.016533331275285428,-0.0041760776284109555,4.488084408150434,5.425050932867806]

Intercept: 2.5853567694179147
