In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Titanic Data") \
    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow") \
    .getOrCreate()

spark

25/02/28 13:21:31 WARN Utils: Your hostname, Djordjes-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.33 instead (on interface en0)
25/02/28 13:21:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/28 13:21:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
df = (spark.read
      .format("csv")
      .option("header", "true")
      .load("titanic/train.csv"))

df.show(3)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows



In [8]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [9]:
from pyspark.ml.classification import RandomForestClassifier

In [10]:
from pyspark.ml import Pipeline

In [13]:
(train_df, test_df) = df.randomSplit([0.8, 0.2], 11)
print('Number of training records: {}'.format(train_df.count()))
print('Number of testing records: {}'.format(test_df.count()))

Number of training records: 703
Number of testing records: 188


In [16]:
Sex_indexer = StringIndexer(inputCol="Sex", outputCol="Gender")
Embarker_indexer = StringIndexer(inputCol="Embarked", outputCol="Boarder")

In [19]:
inputCols = ["Pclass", "Age", "Fare", "Gender", "Boarder"]
output_cols = "features"

vector_assembler = VectorAssembler(inputCols=inputCols, outputCol=output_cols)

dt_model = RandomForestClassifier(featuresCol="features", labelCol="Survived")

In [None]:
from pyspark.sql.functions import col

train_df = train_df.withColumn("Pclass", col("Pclass").cast("integer"))
train_df = train_df.withColumn("Age", col("Age").cast("double"))
train_df = train_df.withColumn("Fare", col("Fare").cast("double"))
train_df = train_df.withColumn("Survived", col("Survived").cast("integer"))


test_df = test_df.withColumn("Pclass", col("Pclass").cast("integer"))
test_df = test_df.withColumn("Age", col("Age").cast("double"))
test_df = test_df.withColumn("Fare", col("Fare").cast("double"))

train_df = train_df.na.drop()
test_df = test_df.na.drop()

train_df.printSchema()
test_df.printSchema()


root
 |-- PassengerId: string (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [27]:
pipeline = Pipeline(stages=[Sex_indexer, Embarker_indexer, vector_assembler, dt_model])

final_pipeline = pipeline.fit(train_df)

test_predictions_from_pipeline = final_pipeline.transform(test_df)
test_predictions_from_pipeline.show(5, truncate=False)

+-----------+--------+------+-----------------------------------------+------+----+-----+-----+--------+-------+-----+--------+------+-------+--------------------------+---------------------------------------+----------------------------------------+----------+
|PassengerId|Survived|Pclass|Name                                     |Sex   |Age |SibSp|Parch|Ticket  |Fare   |Cabin|Embarked|Gender|Boarder|features                  |rawPrediction                          |probability                             |prediction|
+-----------+--------+------+-----------------------------------------+------+----+-----+-----+--------+-------+-----+--------+------+-------+--------------------------+---------------------------------------+----------------------------------------+----------+
|103        |0       |1     |White, Mr. Richard Frasar                |male  |21.0|0    |1    |35281   |77.2875|D26  |S       |0.0   |0.0    |[1.0,21.0,77.2875,0.0,0.0]|[9.758230610939687,10.241769389060313] |[0.48