In [None]:
# Load the Titanic training data: the first CSV row supplies the column
# names and column types are inferred from the values.
df = spark.read.csv(
    "gs://pdl-dataproc/titanic/train.csv",
    inferSchema=True,
    header=True,
)
df

In [None]:
import pyspark.sql.functions as f  # We tend to need this a lot. Let's import it now.

In [None]:
# The PySpark classifiers look for a column called 'label' by default,
# so give the target column ('Survived') that name.
df = df.withColumnRenamed(existing='Survived', new='label')
df.show(n=5)

In [None]:
# Age must not be null downstream: keep the value when present,
# otherwise fall back to the sentinel -1.
df = df.withColumn('Age', f.coalesce(f.col("Age"), f.lit(-1)))
df.show(n=5)

In [None]:
# Encode Sex as an integer so the ML classifier can use it:
# 'male' -> 1, 'female' -> 0, anything else (including null) -> -1.
df = df.withColumn(
    'Sex',
    f.when(df['Sex'] == 'male', 1)
     .when(df['Sex'] == 'female', 0)
     .otherwise(-1),
)
df.show(n=5)

In [None]:
# Let's see our enum options for Embarked. De-duplicate inside Spark so that
# only the unique rows are collected to the driver, not the whole column.
set(df.select('Embarked').distinct().collect())

In [None]:
# Encode the Embarked column as an integer code:
# 'C' -> 1, 'Q' -> 2, 'S' -> 3, anything else (including null) -> -1.
df = df.withColumn(
    'Embarked',
    f.when(df['Embarked'] == 'C', 1)
     .when(df['Embarked'] == 'Q', 2)
     .when(df['Embarked'] == 'S', 3)
     .otherwise(-1),
)
df.show(n=5)

In [None]:
# Same null handling as Age: keep Fare when present, otherwise use -1.
df = df.withColumn('Fare', f.coalesce(f.col("Fare"), f.lit(-1)))

In [None]:
# Turn the Cabin column into a number: lower-case the value, keep only its
# first character, and take that character's ASCII code. Null cabins
# become -1.
# NOTE: Spark's substring() is 1-based, so the first character is at
# position 1 (the original passed 0, relying on lenient handling of 0).
df = df.withColumn(
    'Cabin',
    f.coalesce(
        f.ascii(f.substring(f.lower(f.col('Cabin')), 1, 1)),
        f.lit(-1),  # replace NULL -> -1
    ),
)

df.show(5)

In [None]:
# Get the list of columns to use as features: every column except the
# identifier, the label, and the free-text Name/Ticket columns.
# The DataFrame's own column order is preserved (rather than going through
# a set, whose iteration order is arbitrary) so the layout of the assembled
# feature vector is deterministic from run to run.
feature_cols = [
    name for name in df.columns
    if name not in {'PassengerId', 'label', 'Name', 'Ticket'}
]

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# PySpark expects all feature values packed into a single vector column;
# VectorAssembler builds that column from the selected input columns.
# "features" is the column name PySpark uses by default.
features = VectorAssembler(outputCol="features", inputCols=feature_cols).transform(df)

features

In [None]:
# Hold out 30% of the rows for testing and train on the remaining 70%;
# the fixed seed keeps the split repeatable.
(train, test) = features.randomSplit(weights=[0.7, 0.3], seed=2018)

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Fit a random forest on the training split. It relies on the classifier's
# default column names ("features" / "label") prepared in the cells above.
# The seed matches the one used for the train/test split so the fitted
# model, and the metric computed from it, are reproducible.
classifier = RandomForestClassifier(seed=2018)

model = classifier.fit(train)

# Score the held-out rows; transform() appends the prediction columns.
predictions = model.transform(test)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Report ROC AUC on the held-out split. areaUnderROC is the evaluator's
# default metric; it is named explicitly to match the printed label.
print(
    'Test Area Under ROC',
    BinaryClassificationEvaluator(metricName='areaUnderROC').evaluate(predictions),
)

In [None]:
# Load the test file and apply exactly the same cleaning steps that were
# applied to the training data above.
df = spark.read.csv("gs://pdl-dataproc/titanic/test.csv", header=True, inferSchema=True)
# withColumnRenamed is a no-op when the file has no 'Survived' column.
df = df.withColumnRenamed('Survived', 'label')
df = df.withColumn('Age', f.when(f.col("Age").isNull(), -1).otherwise(f.col("Age")))
df = df.withColumn('Sex',
    f.when(
        f.col('Sex') == 'male', 1
    ).when(
        f.col('Sex') == 'female', 0
    ).otherwise(-1)
)
df = df.withColumn('Embarked',
    f.when(
        f.col('Embarked') == 'C', 1
    ).when(
        f.col('Embarked') == 'Q', 2
    ).when(
        f.col('Embarked') == 'S', 3
    ).otherwise(-1)
)
df = df.withColumn('Fare', f.when(f.col("Fare").isNull(), -1).otherwise(f.col("Fare")))
df = df.withColumn('Cabin', f.lower(f.col('Cabin')))  # lower case the value
# substring() is 1-based: position 1 is the first character.
df = df.withColumn('Cabin', f.substring(f.col('Cabin'), 1, 1))
df = df.withColumn('Cabin', f.ascii(f.col('Cabin')))  # ASCII code of that character
df = df.withColumn('Cabin', f.when(f.col("Cabin").isNull(), -1).otherwise(f.col("Cabin")))

# Reuse the `feature_cols` list that was used to build the training vectors
# instead of recomputing it from this DataFrame. The model expects the same
# columns in the same order, and rebuilding the list through a set gives no
# ordering guarantee, so the features could be silently shuffled.
features = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
).transform(df)

predictions = model.transform(features)

In [None]:
# Rename the model's output column to 'Survived' for the exported results.
predictions = predictions.withColumnRenamed(existing='prediction', new='Survived')

In [None]:
# Keep only the two output columns and collapse them into a single
# partition before writing them out as CSV.
# NOTE: despite its name, `rdd` is a DataFrame.
rdd = predictions.select('PassengerId', 'Survived').coalesce(1)
rdd.write.csv(path='gs://pdl-dataproc/titanic/results.csv')

In [None]:
from pyspark.sql.types import IntegerType

# Export PassengerId/Survived with a header row, with Survived cast to an
# integer.
# WARNING: coalesce(1) sends all data to a single partition, which could
# cause out-of-memory errors.
rdd = (
    predictions
    .select('PassengerId', 'Survived')
    .coalesce(1)
    .withColumn("Survived", f.col("Survived").cast(IntegerType()))
)
rdd.write.csv('gs://pdl-dataproc/titanic/results', header=True)