# Classification and Statistics
<img src='https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Classification/titanic.jpg' width="70%" height="70%"></img>
With Spark, we can easily describe data and use it to make predictions.  We'll be using the famous Titanic data set from Kaggle (https://www.kaggle.com/c/titanic/data) and the machine learning package in Spark to do just that.
## Access your data
We have the Titanic data on an instance of Object Storage, a cloud data store for unstructured content.  We'll configure the connection here.

In [133]:
def set_hadoop_config(credentials):
    prefix = "fs.swift.service." + credentials['name'] 
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + ".auth.url", credentials['auth_url']+'/v3/auth/tokens')
    hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
    hconf.set(prefix + ".tenant", credentials['project_id'])
    hconf.set(prefix + ".username", credentials['user_id'])
    hconf.set(prefix + ".password", credentials['password'])
    hconf.setInt(prefix + ".http.port", 8080)
    hconf.set(prefix + ".region", credentials['region'])
    hconf.setBoolean(prefix + ".public", True)

In [77]:
credentials = {
  'auth_url':'https://identity.open.softlayer.com',
  'project':'object_storage_5a6ce20f_2d5d_4ce5_afa1_a28eb274ef0f',
  'project_id':'44aa87e0d8d8484c9b875270726c0598',
  'region':'dallas',
  'user_id':'e8e24e0512324d3cadfd9c2539afd277',
  'domain_id':'1bb51fba380c4a4e8c5621851fd06eed',
  'domain_name':'853513',
  'username':'Admin_f78408b189a87fe9e90b67b40d92cc515ff458c0',
  'password':"""y-m6Tl_fHDTuu2D.""",
  'filename':'train.csv',
  'container':'notebooks',
  'tenantId':'saa8-5843b0fd5f79d7-67415d73dbb5'
}

credentials['name'] = 'keystone'
set_hadoop_config(credentials)

## Data processing
Once we have the data, all of the processing is done in memory.  Here, we're formatting the data, removing columns, dropping rows with insufficient data, creating a DataFrame, and creating columns using user defined functions.

In [122]:
from pyspark.sql import SQLContext,Row
from pyspark.sql.functions import lit

loadTitanicData = sc.textFile("swift://" + credentials['container'] +"." + credentials['name'] + '/' + credentials['filename'])
header = loadTitanicData.first()
loadTitanicData = loadTitanicData.filter(lambda l: l != header).\
                                map(lambda l: l.split(",")).\
                                map(lambda l: [l[1],l[2],l[4],l[5],l[6],l[7],l[9],l[11]]).\
                                filter(lambda l: len(l[3]) > 0 and len(l[7]) > 0)

print loadTitanicData.count()
sqlContext = SQLContext(sc)

loadTitanicData = loadTitanicData.map(lambda l: Row(survived=int(l[0]),\
                                    classRank=int(l[1]),\
                                    sex=l[2],\
                                    age=float(l[3]),\
                                    sibSpou=int(l[4]),\
                                    parChi=int(l[5]),\
                                    fare=float(l[6]),\
                                    embarked=l[7]))
                                    #cherbourg=0,\
                                    #queenstown=0,\
                                    #southampton=0,\
                                    #male=0,\
                                    #female=0))
titanicDf = sqlContext.createDataFrame(loadTitanicData)

#titanicDf = titanicDf.map(lambda l: l.cherbourg = 1 if l.embarked == 'C' else l.cherbourg = 0)

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType
isCherb = UserDefinedFunction(lambda x: 1 if x == 'C' else 0, IntegerType())
isQueen = UserDefinedFunction(lambda x: 1 if x == 'Q' else 0, IntegerType())
isSouth = UserDefinedFunction(lambda x: 1 if x == 'S' else 0, IntegerType())
isMale = UserDefinedFunction(lambda x: 1 if x == 'male' else 0, IntegerType())
isFemale = UserDefinedFunction(lambda x: 1 if x == 'female' else 0, IntegerType())
titanicDf = titanicDf.withColumn("cherbourg",isCherb(titanicDf.embarked)).\
                    withColumn("queenstown",isQueen(titanicDf.embarked)).\
                    withColumn("southampton",isSouth(titanicDf.embarked)).\
                    withColumn("male",isMale(titanicDf.sex)).\
                    withColumn("female",isFemale(titanicDf.sex))

titanicDf = titanicDf.drop("sex").drop("embarked")
titanicDf.show(2)

712
+----+---------+-------+------+-------+--------+---------+----------+-----------+----+------+
| age|classRank|   fare|parChi|sibSpou|survived|cherbourg|queenstown|southampton|male|female|
+----+---------+-------+------+-------+--------+---------+----------+-----------+----+------+
|22.0|        3|   7.25|     0|      1|       0|        0|         0|          1|   1|     0|
|38.0|        1|71.2833|     0|      1|       1|        1|         0|          0|   0|     1|
+----+---------+-------+------+-------+--------+---------+----------+-----------+----+------+
only showing top 2 rows
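
The `split(",")` call above works only when no field contains a comma. In the Kaggle layout the quoted Name field does contain one, so a CSV-aware parser is the safer choice if your copy keeps that column. The following is a minimal sketch in plain Python (no Spark), using two illustrative rows typed in by hand rather than read from Object Storage; the helper `one_hot` and the constants `PORTS` and `SEXES` are hypothetical names. It mirrors the row filter and the indicator columns built by the UDFs.

```python
import csv

# Illustrative rows in the Kaggle column layout (assumed; not read from Object Storage).
sample_lines = [
    'PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
    '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S',
    '2,1,1,"Cumings, Mrs. John Bradley",female,38,1,0,PC 17599,71.2833,C85,C',
]

# A naive split sees one field too many because of the comma inside the quoted name.
naive_field_count = len(sample_lines[1].split(","))

PORTS = [("cherbourg", "C"), ("queenstown", "Q"), ("southampton", "S")]
SEXES = [("male", "male"), ("female", "female")]

def one_hot(value, categories):
    """Return one 0/1 indicator per (column name, code) pair."""
    return {name: int(value == code) for name, code in categories}

rows = []
for record in csv.DictReader(sample_lines):
    # Drop rows with a missing age or port, as the Spark filter does.
    if not record["Age"] or not record["Embarked"]:
        continue
    row = {
        "survived": int(record["Survived"]),
        "classRank": int(record["Pclass"]),
        "age": float(record["Age"]),
        "fare": float(record["Fare"]),
    }
    row.update(one_hot(record["Embarked"], PORTS))
    row.update(one_hot(record["Sex"], SEXES))
    rows.append(row)
```

Here `naive_field_count` is 13 while the header has 12 columns, which is the off-by-one that `csv.DictReader` avoids.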



## Gaining insight
### Pearson Correlation
Now that our data is formatted, we can start to do some basic statistics.  Let's look at which features correlate with surviving the Titanic sinking.

In [125]:
for col in titanicDf.columns:
    print col + " " + str(titanicDf.corr('survived',col))

age -0.0824458680434
classRank -0.356461588445
fare 0.266099600477
parChi 0.0952652942869
sibSpou -0.0155230236317
survived 1.0
cherbourg 0.195672717021
queenstown -0.0489660937057
southampton -0.159015410677
male -0.536761623349
female 0.536761623349
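
To make the numbers above concrete, here is a small plain-Python sketch of the Pearson coefficient (covariance divided by the product of the standard deviations). The toy lists are hypothetical, not Titanic rows. It also shows why `male` and `female` report the same magnitude with opposite signs: one column is exactly one minus the other.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = float(len(xs))
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical toy data: five passengers.
survived = [0, 1, 1, 0, 1]
male = [1, 0, 0, 1, 1]
female = [1 - m for m in male]  # complementary indicator

r_male = pearson(survived, male)
r_female = pearson(survived, female)  # same magnitude, opposite sign
```

Because the two indicators carry the same information, only one of them is needed when reading the correlation table.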


It doesn't look like age had as much impact as we would have guessed.  Let's try to find the correlation after accounting for gender:

In [135]:
maleTitanicDf = titanicDf.filter(titanicDf.male == 1)
print "male age " + str(maleTitanicDf.corr('survived','age'))
femaleTitanicDf = titanicDf.filter(titanicDf.female == 1)
print "female age " + str(femaleTitanicDf.corr('survived','age'))

male age -0.119617523233

female age 0.110711430946


### Chi Squared Hypothesis Testing
Now that we've seen the correlations, we can check whether each feature's association with survival is statistically significant using a chi-squared test:

In [80]:
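from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics

# Label each row with the survival flag; the features follow in a fixed order.
labRDD = titanicDf.map(lambda l: LabeledPoint(l.survived, [l.classRank,l.age,l.sibSpou,l.parChi,l.fare,\
                                                           l.cherbourg,l.queenstown,l.southampton,l.male,l.female]))
# On an RDD of LabeledPoint, chiSqTest runs one independence test per feature against the label.
independenceTestResults = Statistics.chiSqTest(labRDD)
for result in independenceTestResults:
    print result
    print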

Chi squared test summary:
method: pearson
degrees of freedom = 2 
statistic = 91.08074548791019 
pValue = 0.0 
Very strong presumption against null hypothesis: the occurrence of the outcomes is statistically independent..

Chi squared test summary:
method: pearson
degrees of freedom = 87 
statistic = 104.09890718065553 
pValue = 0.10209429637643497 
No presumption against null hypothesis: the occurrence of the outcomes is statistically independent..

Chi squared test summary:
method: pearson
degrees of freedom = 5 
statistic = 22.454253153661288 
pValue = 4.2907488830223883E-4 
Very strong presumption against null hypothesis: the occurrence of the outcomes is statistically independent..

Chi squared test summary:
method: pearson
degrees of freedom = 6 
statistic = 28.78494064715035 
pValue = 6.681060065050204E-5 
Very strong presumption against null hypothesis: the occurrence of the outcomes is statistically independent..

Chi squared test summary:
method: pearson
degrees of freedom = 
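
As a rough illustration of what each test summary reports, the sketch below computes the Pearson chi-squared statistic and its degrees of freedom for a contingency table in plain Python. The counts are hypothetical and the function name `chi_squared` is an assumption for this example; it is not Spark's implementation.

```python
def chi_squared(table):
    """Return (statistic, degrees_of_freedom) for a table of observed counts."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    total = float(sum(row_totals))
    statistic = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            # Count expected in this cell if rows and columns were independent.
            expected = row_totals[i] * col_totals[j] / total
            statistic += (observed - expected) ** 2 / expected
    dof = (len(table) - 1) * (len(table[0]) - 1)
    return statistic, dof

# Hypothetical counts: rows are a two-valued feature, columns are died / survived.
toy = [[10, 20], [30, 40]]
stat, dof = chi_squared(toy)
```

The degrees of freedom are (categories - 1) x (outcomes - 1). Three passenger classes against two outcomes give 2, matching the first result above. The 87 degrees of freedom in the second result suggest that each distinct age was treated as its own category, so the test is probably less informative for continuous features such as age and fare unless they are binned first.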

## Classification Machine Learning
We can use observed data to make predictions about passengers' survival.  First, we form our data into a usable format, then split it into a training set and a test set, and finally create predictive models.

In [81]:
from pyspark.mllib.linalg import Vectors
titanicDf = titanicDf.map(lambda l: Row(label=float(l.survived),features=\
                                       Vectors.dense([l.age,float(l.classRank),l.fare,float(l.parChi),float(l.sibSpou),\
                                       float(l.cherbourg),float(l.queenstown),float(l.southampton),\
                                       float(l.male),float(l.female)]))).toDF()
testDf, trainDf = titanicDf.randomSplit([.15,.85],1)

print testDf.take(2)
print
print trainDf.take(2)

[Row(features=DenseVector([22.0, 3.0, 7.25, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0]), label=0.0), Row(features=DenseVector([26.0, 3.0, 7.925, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0]), label=1.0)]

[Row(features=DenseVector([38.0, 1.0, 71.2833, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0]), label=1.0), Row(features=DenseVector([35.0, 1.0, 53.1, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0]), label=1.0)]
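
The weights passed to `randomSplit` are probabilities, not exact proportions, so the resulting set sizes vary around 15% and 85%. The sketch below is a plain-Python analogue under that assumption: each row gets one random draw and lands in the split whose cumulative weight range contains it. The function `random_split` is hypothetical and does not reproduce Spark's sampler or its seed behaviour.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.15, 1.0] for weights [0.15, 0.85].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    parts = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for k, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds.
            if draw < upper or k == len(bounds) - 1:
                parts[k].append(row)
                break
    return parts

# 712 stand-in row ids, matching the row count printed earlier.
test_rows, train_rows = random_split(list(range(712)), [0.15, 0.85], seed=1)
```

Every row ends up in exactly one split, and fixing the seed makes the assignment repeatable from run to run.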


### Logistic Regression Model:

In [92]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()
lrModel = lr.fit(trainDf)

lrPred = lrModel.transform(testDf)
print lrPred.map(lambda line: (line.label - line.prediction)**2).mean()

0.171171171171
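
With 0/1 labels and 0/1 predictions, the mean squared difference printed above is simply the fraction of misclassified rows, so accuracy is one minus that value. A minimal plain-Python sketch with hypothetical labels and predictions:

```python
def error_rate(labels, predictions):
    """Mean squared difference; for 0/1 values this is the misclassification rate."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return sum(squared) / float(len(squared))

# Hypothetical values: two of the five predictions are wrong.
labels = [0.0, 1.0, 1.0, 0.0, 1.0]
predictions = [0.0, 1.0, 0.0, 0.0, 0.0]

err = error_rate(labels, predictions)
accuracy = 1.0 - err
```

The reported 0.171171171171 equals 19/111, which is consistent with 19 errors on a 111-row test set, although the notebook does not print the test set size.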


### Decision Tree Model:

In [96]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(trainDf)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(trainDf)

dtc = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
dtcPipeline = Pipeline(stages=[labelIndexer, featureIndexer, dtc])

dtcModel = dtcPipeline.fit(trainDf)
dtcPred = dtcModel.transform(testDf)

print dtcPred.map(lambda line: (line.label - line.prediction)**2).mean()

0.207207207207
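
One caveat about the error calculation in this pipeline: `prediction` is expressed in the indices produced by `StringIndexer`, while `label` holds the original values. By default `StringIndexer` gives index 0 to the most frequent label, so the two scales coincide only when 0.0 is the majority class. The sketch below imitates that ordering in plain Python; `fit_string_index` is a hypothetical helper, and breaking ties by value is an assumption.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency; ties are broken by value (assumed).
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {value: float(index) for index, value in enumerate(ordered)}

mostly_died = [0.0, 0.0, 0.0, 1.0, 1.0]
mostly_survived = [1.0, 1.0, 1.0, 0.0, 0.0]

same_scale = fit_string_index(mostly_died)      # 0.0 -> 0.0, 1.0 -> 1.0
flipped_scale = fit_string_index(mostly_survived)  # 1.0 -> 0.0, 0.0 -> 1.0
```

Most Titanic passengers did not survive, so the mapping is the identity here. Comparing `indexedLabel` with `prediction` would stay correct even if the class balance were reversed.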


### Random Forest Model:

In [99]:
from pyspark.ml.classification import RandomForestClassifier

rfc = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
rfcPipeline = Pipeline(stages=[labelIndexer, featureIndexer, rfc])

rfcModel = rfcPipeline.fit(trainDf)
rfcPred = rfcModel.transform(testDf)

print rfcPred.map(lambda line: (line.label - line.prediction)**2).mean()

0.198198198198


### Gradient Boosted Tree:

In [100]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
gbtPipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

gbtModel = gbtPipeline.fit(trainDf)
gbtPred = gbtModel.transform(testDf)

print gbtPred.map(lambda line: (line.label - line.prediction)**2).mean()

0.234234234234


#### In this case, the logistic regression model had the lowest test error (about 0.17), making it the most accurate of the four.
Finally, the ultimate test: would I survive the Titanic sinking?

In [121]:
#age,classRank,fare,parChi,sibSpou,cherbourg,queenstown,southampton,male,female
userInput = sc.parallelize([Row(features=Vectors.dense([27.0, 2.0, 50.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0]))]).toDF()
lrModel.transform(userInput).show(1)

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[27.0,2.0,50.0,0....|[1.26112758443864...|[0.77922015408913...|       0.0|
+--------------------+--------------------+--------------------+----------+
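
The `probability` column can be recovered from `rawPrediction` with the logistic (sigmoid) function. The sketch below assumes the first element of each vector refers to class 0 (did not survive) and uses only the leading digits visible in the truncated table above.

```python
import math

def sigmoid(margin):
    """Logistic function: maps a real-valued margin to a probability."""
    return 1.0 / (1.0 + math.exp(-margin))

# Leading digits of rawPrediction[0] as displayed (the table truncates the rest).
raw_class0 = 1.26112758443864

prob_class0 = sigmoid(raw_class0)   # about 0.7792, matching the probability column
prob_class1 = 1.0 - prob_class0     # estimated chance of survival
predicted = 0.0 if prob_class0 >= 0.5 else 1.0
```

Read this way, the model gives this passenger roughly a 22% chance of survival, which is why `prediction` is 0.0.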

