In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession \
    .builder \
    .appName("Titanic Data") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse") \
    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow") \
    .getOrCreate()

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/10 10:44:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/10 10:44:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
df = (spark.read
      .format("csv")
      .option("header", "true")
      .load("Data/titanic/train.csv")
      )

df.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

In [6]:
from pyspark.sql import functions as F
from pyspark.sql import types as T


from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml import Pipeline

In [7]:
(train_df, test_df) = df.randomSplit([0.8, 0.2], 11)
print('Number of train samples: ' + str(train_df.count()))
print('Number of test samples: ' + str(test_df.count()))

Number of train samples: 703
Number of test samples: 188


In [8]:
#Label Encoding of categorical variables without any .fit or .transform
Sex_indexer = StringIndexer(inputCol="Sex", outputCol="Gender")
Embarker_indexer = StringIndexer(inputCol="Embarked", outputCol="Boarded")

In [9]:
#Assemble all the features with VectorAssembler
inputCols = ["Pclass", "Age", "Fare", "Gender", "Boarded"]
output_cols = "features"

vector_assembler = VectorAssembler(inputCols=inputCols, outputCol=output_cols)

#Modeling using DecisionTreeClassifier
dt_model = RandomForestClassifier(featuresCol="features", labelCol="Survived")

In [10]:
from pyspark.sql.functions import col

train_df = train_df.withColumn("Pclass", col("Pclass").cast("integer"))
train_df = train_df.withColumn("Age", col("Age").cast("double"))
train_df = train_df.withColumn("Fare", col("Fare").cast("double"))
train_df = train_df.withColumn("Survived", col("Survived").cast("integer"))


test_df = test_df.withColumn("Pclass", col("Pclass").cast("integer"))
test_df = test_df.withColumn("Age", col("Age").cast("double"))
test_df = test_df.withColumn("Fare", col("Fare").cast("double"))

train_df = train_df.na.drop()
test_df = test_df.na.drop()

train_df.printSchema()
test_df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [11]:
pipeline = Pipeline(stages=[Sex_indexer, Embarker_indexer, vector_assembler, dt_model])

final_pipeline = pipeline.fit(train_df)

test_predictions_from_pipeline = final_pipeline.transform(test_df)
test_predictions_from_pipeline.show(5, truncate=False)

+-----------+--------+------+-----------------------------------------+------+----+-----+-----+--------+-------+-----+--------+------+-------+--------------------------+---------------------------------------+-----------------------------------------+----------+
|PassengerId|Survived|Pclass|Name                                     |Sex   |Age |SibSp|Parch|Ticket  |Fare   |Cabin|Embarked|Gender|Boarded|features                  |rawPrediction                          |probability                              |prediction|
+-----------+--------+------+-----------------------------------------+------+----+-----+-----+--------+-------+-----+--------+------+-------+--------------------------+---------------------------------------+-----------------------------------------+----------+
|103        |0       |1     |White, Mr. Richard Frasar                |male  |21.0|0    |1    |35281   |77.2875|D26  |S       |0.0   |0.0    |[1.0,21.0,77.2875,0.0,0.0]|[9.891685977383917,10.108314022616085] |[0