In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('titanic').getOrCreate()

In [9]:
df = spark.read.csv(path='data/titanic.csv', inferSchema=True, header=True)
print(df.count(), len(df.columns))

891 12


In [10]:
df.createOrReplaceTempView('titanic_csv')

In [13]:
test = spark.sql("SELECT * FROM titanic_csv")
# print(test.count(), len(test.count()))
print(test.printSchema())

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

None


In [14]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [17]:
my_cols = df.select([
    'Survived',
    'Pclass',
    'Sex',
    'Age',
    'SibSp',
    'Parch', 
    'Fare', 
    'Embarked']
)
print(my_cols.count())
my_final_data = my_cols.na.drop()
print(my_final_data.count())

891
712


In [18]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)

In [19]:
my_final_data.show(5)

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 5 rows



In [21]:
gender_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')
one_hot_encoder_gender = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex', outputCol='EmbarkVec')

assembler = VectorAssembler(inputCols=[
    'Pclass',
    'SexVec',
    'EmbarkVec',
    'Age',
    'SibSp',
    'Parch',
    'Fare'],
    outputCol='features'
)

In [23]:
from pyspark.ml.classification import LogisticRegression

from pyspark.ml import Pipeline

In [24]:
log_reg_titanic = LogisticRegression(featuresCol='features', labelCol='Survived')

pipeline = Pipeline(stages=[
    gender_indexer, 
    embark_indexer, 
    one_hot_encoder_gender, 
    embark_encoder, 
    assembler, 
    log_reg_titanic]
)

In [28]:
train_data, test_data = my_final_data.randomSplit([0.7,0.3])
fit_model = pipeline.fit(train_data)
results = fit_model.transform(test_data)
results

DataFrame[Survived: int, Pclass: int, Sex: string, Age: double, SibSp: int, Parch: int, Fare: double, Embarked: string, SexIndex: double, EmbarkIndex: double, SexVec: vector, EmbarkVec: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [29]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [32]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')
my_eval.evaluate(results)

0.8048469387755102

In [34]:
results.select('Survived', 'prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



### Evaluator

In [37]:
AUC = my_eval.evaluate(results)
print(AUC)

0.8048469387755102
