In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
df_titanic = spark.read.csv(path='data/titanic.csv', header=True, inferSchema=True)
df_titanic.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [5]:
df_titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
df_titanic.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [7]:
df_titanic_nona = df_titanic.select(['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']).na.drop()

In [8]:
df_titanic.count(), df_titanic_nona.count()

(891, 712)

In [9]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer

In [12]:
sex_indexer = StringIndexer(inputCol='Sex', outputCol='Sex_indexed')
sex_encoder = OneHotEncoder(inputCol='Sex_indexed', outputCol='Sex_enc')

In [14]:
embark_indexer = StringIndexer(inputCol='Embarked', outputCol='Embarked_indexed')
embark_encoder = OneHotEncoder(inputCol='Embarked_indexed', outputCol='Embarked_enc')

In [15]:
assembler = VectorAssembler(inputCols=['Pclass', 'Sex_enc', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked_enc'],
                           outputCol='features')

### Model Building Using Pipeline

In [18]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [17]:
log_reg_titanic = LogisticRegression(featuresCol='features', labelCol='Survived')

In [19]:
pipeline = Pipeline(stages=[
    sex_indexer, sex_encoder, embark_indexer, embark_encoder, assembler, log_reg_titanic
])

In [20]:
df_titanic_train, df_titanic_test = df_titanic_nona.randomSplit([0.7, 0.3])

In [21]:
mod_pipe = pipeline.fit(df_titanic_train)

In [22]:
mod_pipe_results = mod_pipe.transform(df_titanic_test)

In [24]:
mod_pipe_results.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Sex_indexed: double (nullable = false)
 |-- Sex_enc: vector (nullable = true)
 |-- Embarked_indexed: double (nullable = false)
 |-- Embarked_enc: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



### Evaluation

In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [25]:
bi_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [26]:
auc_metric = bi_eval.evaluate(mod_pipe_results)

In [27]:
auc_metric

0.8082753700731229