In [1]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import pyspark.sql.functions
from pyspark.sql.functions import array


In [2]:
#Read the train and test data into pandas data framesfrom pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# This function includes credentials to your Object Storage.
# You might want to remove those credentials before you share your notebook.
def set_hadoop_config_with_credentials_f2ced6cdb4894c42945223d461f16d59(name):
    """This function sets the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage V3 using Spark"""

    prefix = 'fs.swift.service.' + name
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com'+'/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', '5d9314bd83c8420a8be5abb3608958bc')
    hconf.set(prefix + '.username', '5be67aca97714d2781f516dfdd48a567')
    hconf.set(prefix + '.password', 'gtdtV_)A5^rYuev9')
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', 'dallas')
    hconf.setBoolean(prefix + '.public', True)

# you can choose any name
name = 'keystone'
set_hadoop_config_with_credentials_f2ced6cdb4894c42945223d461f16d59(name)

train = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true')\
    .load("swift://notebooks." + name + "/train.csv")

test = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true')\
    .load("swift://notebooks." + name + "/test.csv")


In [3]:
# Difference between Spark dataframes and Pandas; you always need to show() in Spark
train.head()

Row(PassengerId=1, Survived=0, Pclass=3, Name=u'Braund, Mr. Owen Harris', Sex=u'male', Age=22.0, SibSp=1, Parch=0, Ticket=u'A/5 21171', Fare=7.25, Cabin=u'', Embarked=u'S')

In [4]:
train.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|     |       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|     |       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|     |       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [5]:
# The describe() function in SparkSQL is not as useful as the one in Pandas.  I can't get summary stats for non-numeric fields 
train.describe().show()

+-------+-----------------+-------------------+------------------+------------------+------------------+-------------------+------------------+
|summary|      PassengerId|           Survived|            Pclass|               Age|             SibSp|              Parch|              Fare|
+-------+-----------------+-------------------+------------------+------------------+------------------+-------------------+------------------+
|  count|              891|                891|               891|               714|               891|                891|               891|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642| 29.69911764705882|0.5230078563411896|0.38159371492704824|32.204207968574615|
| stddev|257.3538420152301|0.48659245426485737|0.8360712409770491|14.526497332334039| 1.102743432293432| 0.8060572211299486|  49.6934285971809|
|    min|                1|                  0|                 1|              0.42|                 0|                  0|            

In [6]:
train.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [7]:
#Replace missing values in Age
##Compute the average age for passengers by sex and class
train_aggSexPclass = train.groupby(['Sex','Pclass']).agg({"Age":"mean"})
train_aggSexPclass = train_aggSexPclass.withColumn('Age', train_aggSexPclass['avg(Age)']).drop('avg(Age)')
train_aggSexPclass.show(10)


+------+------+------------------+
|   Sex|Pclass|               Age|
+------+------+------------------+
|  male|     1| 41.28138613861386|
|  male|     2| 30.74070707070707|
|  male|     3|26.507588932806325|
|female|     1| 34.61176470588235|
|female|     2|28.722972972972972|
|female|     3|             21.75|
+------+------+------------------+



In [8]:
# Merge the mean Age values by Sex and Pclass with the primary dataframe
# First, merge the mean values into a dataframe with the original records that had null values
train_merge = train.where(train.Age.isNull()).drop('Age').join(train_aggSexPclass, ['Sex','Pclass'], 'inner')
# Union with the original records 
train_ready = train.where(train.Age.isNotNull()).unionAll(train_merge)
train_ready.show(10)


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|     |       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|     |       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|     |       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|      

In [9]:
train_ready.dtypes

[('PassengerId', 'string'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'string'),
 ('Ticket', 'string'),
 ('Fare', 'string'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [10]:
# Train a RandomForest model.
from pyspark.ml.classification import RandomForestClassifier
estimator_rf = RandomForestClassifier(labelCol="indexed", numTrees=3, maxDepth=2, seed=42)


In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
vecAssembler = VectorAssembler(inputCols=['Age','Pclass','SibSp'], outputCol="features")
stringIndexer = StringIndexer(inputCol="Survived", outputCol="indexed")
pipeline = Pipeline(stages=[vecAssembler, stringIndexer, estimator_rf])
model = pipeline.fit(train_ready)


In [None]:
# Some testing of VectorAssembler to see if I was using it correctly
#trainingData = vecAssembler.transform(train_ready)
#trainingData.dtypes

In [None]:
###  SUCCESS!  Now I'd like to turn the whole process from start to finish into a pipeline in order to apply the transformations to the test set
###  however, it looks like join can't be pipelined



In [13]:
# Merge the mean Age values by Sex and Pclass with the test dataframe
#test_merge = test.join(train_aggSexPclass, ['Sex','Pclass'], 'inner')
# Replace the missing values of Age with Age_Mean, then drop Age_Mean
#test_ready = test_merge.where(test_merge.Age.isNotNull()).unionAll(test_merge.where(test_merge.Age.isNull()).withColumn('Age',test_merge.Age_Mean)).drop('Age_Mean')

# Merge the mean Age values by Sex and Pclass with the primary dataframe
# First, merge the mean values into a dataframe with the original records that had null values
test_merge = test.where(test.Age.isNull()).drop('Age').join(train_aggSexPclass, ['Sex','Pclass'], 'inner')
# Union with the original records 
test_ready = test.where(test.Age.isNotNull()).unionAll(test_merge)
test_ready.show(10)


+-----------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0|   330911| 7.8292|     |       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0|   363272|    7.0|     |       S|
|        894|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0|   240276| 9.6875|     |       Q|
|        895|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0|   315154| 8.6625|     |       S|
|        896|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|  3101298|12.2875|     |       S|
|        897|     3|Svensson, Mr. Joh...|  male|14.0|    0|    0|     7538|  9.225|     |       S|
|        898|     3|Connolly, Miss. Kate|female|30.0|    0|    0|   330972| 7.6292|     |       Q|
|        8

In [14]:
prediction = model.transform(test_ready)
prediction.select('PassengerId','prediction').show()

+-----------+----------+
|PassengerId|prediction|
+-----------+----------+
|        892|       0.0|
|        893|       0.0|
|        894|       0.0|
|        895|       0.0|
|        896|       0.0|
|        897|       0.0|
|        898|       0.0|
|        899|       0.0|
|        900|       0.0|
|        901|       0.0|
|        903|       0.0|
|        904|       0.0|
|        905|       0.0|
|        906|       0.0|
|        907|       0.0|
|        908|       0.0|
|        909|       0.0|
|        910|       0.0|
|        911|       0.0|
|        912|       0.0|
+-----------+----------+
only showing top 20 rows



In [52]:
# Hey, pipelines can also be used just for transformations, though it's a little clunky
pipe2 = Pipeline(stages=[vecAssembler, stringIndexer])
model2 = pipe2.fit(train_ready)
pred2 = model2.transform(train_ready)
pred2.show(10)

+----+------+-----------+--------+--------------------+----+-----+-----+-----------+-------+-----------+--------+------------------+-------+
| Sex|Pclass|PassengerId|Survived|                Name| Age|SibSp|Parch|     Ticket|   Fare|      Cabin|Embarked|          features|indexed|
+----+------+-----------+--------+--------------------+----+-----+-----+-----------+-------+-----------+--------+------------------+-------+
|male|     1|          7|       0|McCarthy, Mr. Tim...|54.0|    0|    0|      17463|51.8625|        E46|       S|[54.0,1.0,0.0,0.0]|    0.0|
|male|     1|         24|       1|Sloper, Mr. Willi...|28.0|    0|    0|     113788|   35.5|         A6|       S|[28.0,1.0,0.0,0.0]|    1.0|
|male|     1|         28|       0|Fortune, Mr. Char...|19.0|    3|    2|      19950|  263.0|C23 C25 C27|       S|[19.0,1.0,3.0,2.0]|    0.0|
|male|     1|         31|       0|Uruchurtu, Don. M...|40.0|    0|    0|   PC 17601|27.7208|           |       C|[40.0,1.0,0.0,0.0]|    0.0|
|male|     1|