In [1]:
# Basic setup and acquisition of data
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext, DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# NOTE: `sc` is not created in this notebook; it is expected to be supplied by the runtime.
sqlCtx = SQLContext(sc)


def load_titanic_csv(path):
    """Read a CSV file with a header row through spark-csv, with schema inference disabled.

    :param path: location of the CSV file to load
    :return: the DataFrame produced by the spark-csv reader
    """
    reader = sqlCtx.read.format('com.databricks.spark.csv')
    return reader.options(header='true', inferSchema='false').load(path)


titanic_train = load_titanic_csv('/mnt/s3_data/train.csv')
titanic_test = load_titanic_csv('/mnt/s3_data/test.csv')

# Split the labelled data 80/20 into a training set and a hold-out set.
# A fixed seed keeps the split (and every metric derived from it) stable across
# kernel restarts instead of changing on each run.
SPLIT_SEED = 42
(df_train, df_test) = titanic_train.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Parenthesised form prints the same text under Python 2 and also parses under Python 3.
print(df_train.take(1))

In [2]:
#setting up pipeline - this first attempt gets the StringType error
# Configure an ML pipeline, which consists of these stages: VectorAssembler, Indexer and the classifier
assembler = VectorAssembler(inputCols=["Pclass", "Gender"], outputCol="features")
indexer = StringIndexer(inputCol="Survived", outputCol="label")

rf = RandomForestClassifier(numTrees=3, maxDepth=4)

pipeline = Pipeline(stages=[assembler, indexer, rf])
# Fit the pipeline to the training data.
# The failure is the point of this cell, so catch it and show the message rather than
# letting the exception abort a "Restart & Run All". `indexer` and `rf` stay defined
# for the later cells either way.
try:
    model = pipeline.fit(df_train)
except Exception as fit_error:
    print("Pipeline fit failed: " + str(fit_error))


In [3]:
#we need to create our own Transformer to perform proper feature extraction and casting
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import mean, min, max

class PickFeatures(Transformer):
    """Pipeline stage that casts the raw columns used as features to numeric types.

    ``Pclass`` is cast to int, ``Age`` and ``Fare`` are cast to float, and missing
    ``Age``/``Fare`` values are replaced by the mean of the known values in the
    dataset being transformed.
    """

    def __init__(self):
        super(PickFeatures, self).__init__()

    def convertColumn(self, df, name, new_type):
        """Return ``df`` with column ``name`` cast to ``new_type``.

        The column is renamed to a temporary ``swap`` column, re-created under its
        original name with the cast applied, and the temporary column is dropped.
        """
        df_1 = df.withColumnRenamed(name, "swap")
        return df_1.withColumn(name, df_1.swap.cast(new_type)).drop("swap")

    def _fillWithMean(self, df, name):
        """Return ``df`` with nulls in numeric column ``name`` replaced by the column mean.

        The mean is taken over the non-null values only, so the missing entries do
        not drag the imputed value down (which is what happened when blanks were
        first turned into 0.0 and then averaged together with the real values).
        """
        avg = df.select([mean(name)]).first()[0]
        if avg is None:
            # No known value to impute from (empty dataset or all values missing).
            return df
        return df.fillna(avg, subset=[name])

    def _transform(self, dataset):
        """Cast Pclass/Age/Fare to numeric types and impute missing Age/Fare values.

        :param dataset: input DataFrame containing ``Pclass``, ``Age`` and ``Fare`` columns
        :return: DataFrame with those three columns converted
        """
        dataset = self.convertColumn(dataset, "Pclass", "int")

        # Blank or unparseable strings become null on the cast, so no placeholder
        # value is needed and a genuine age of 0 is never mistaken for "missing".
        dataset = self.convertColumn(dataset, "Age", "float")
        # The mean comes from whichever dataset is passed in (training or new data).
        dataset = self._fillWithMean(dataset, "Age")

        dataset = self.convertColumn(dataset, "Fare", "float")
        # Same treatment for Fare so that a blank fare does not reach the
        # downstream feature assembler as a null.
        dataset = self._fillWithMean(dataset, "Fare")

        return dataset

In [4]:
#after creating our own Transformer, we are ready to try building the model again
featurizer = PickFeatures()
# Encode the string column "Sex" as the numeric column "Gender" expected by the assembler
encoder = StringIndexer(inputCol="Sex", outputCol="Gender")

#redefine assembler to include Age and Fare in features
assembler = VectorAssembler(inputCols=["Pclass", "Age", "Fare", "Gender"], outputCol="features")

# Random-forest variant of the pipeline, kept for reference:
#pipeline = Pipeline(stages=[featurizer, encoder, assembler, indexer, rf])

#swap the algos
# `indexer` (Survived -> label) is the StringIndexer defined in the earlier pipeline cell.
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[featurizer, encoder, assembler, indexer, lr])

# Fit the pipeline to the training data.
model = pipeline.fit(df_train)

# Make predictions on the held-out set of labeled data
prediction = model.transform(df_test)
# Evaluate the performance of the model
# NOTE(review): the 'precision' metric is reported below as "Accuracy" - confirm that
# this metric name is supported by, and means accuracy in, the Spark version in use.
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='precision')

# Evaluate the predictions done on the hold-out set
accuracy = evaluator.evaluate(prediction)
# Parenthesised form prints the same text under Python 2 and also parses under Python 3.
print("Accuracy = " + str(accuracy))


In [5]:
#make prediction on the new (unlabeled) data
#this is the killer line that makes all the previous steps worthwhile
test_prediction = model.transform(titanic_test)
# Keep only the passenger identifier and the predicted class
selected = test_prediction.select("PassengerId", "prediction")
# collect() brings every selected row back to the driver before printing.
for row in selected.collect():
    # Parenthesised form prints the same text under Python 2 and also parses under Python 3.
    print(row)

#create a submission .csv file
#selected.repartition(1).write.format('com.databricks.spark.csv').save('/mnt/s3_data/prediction7.csv')


In [6]:
#Improvements to our initial model
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#1. Grid search
# The grid below varies parameters of the random forest `rf`, so the estimator that is
# cross-validated has to contain `rf`. The existing `pipeline` ends in logistic
# regression, so tuning rf's parameters against it would not tune the fitted model.
rf_pipeline = Pipeline(stages=[featurizer, encoder, assembler, indexer, rf])

paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, (3, 4, 5, 6))
             .addGrid(rf.maxDepth, (2, 3, 4, 5, 6))
             .build())

#2. Model selection via cross-validation
# Tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately
cv = CrossValidator(estimator=rf_pipeline, estimatorParamMaps=paramGrid, evaluator=MulticlassClassificationEvaluator(), numFolds=4)

model = cv.fit(df_train)
#make predictions using new model on our holdout set
prediction = model.transform(df_test)

# Evaluate the performance of the model
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='precision')
# Evaluate the predictions done on the hold-out set
accuracy = evaluator.evaluate(prediction)
# Parenthesised form prints the same text under Python 2 and also parses under Python 3.
print("Accuracy = " + str(accuracy))

# Note that we could also use a TrainValidationSplit, which is less computationally expensive but also will not produce
#  as reliable results when the training dataset is not sufficiently large.
# NOTE: despite its name, `tv` is still a CrossValidator and is not fitted in this cell;
#  replace it with a TrainValidationSplit if the installed pyspark.ml.tuning provides one.
tv = CrossValidator(estimator=rf_pipeline, estimatorParamMaps=paramGrid, evaluator=MulticlassClassificationEvaluator(), numFolds=4)
