In [None]:
import pyspark
sc = pyspark.SparkContext('local[*]')

In [2]:
!rm -rf metastore_db/*.lck

from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

### Step 1
- Load the train and test sets
- Check the schema, the variables have their right types?
- If not, how to correctly load the datasets?

In [3]:
# adding the inferSchema option as true retains the correct variable types in the dataframe

train = sqlc.read.format("com.databricks.spark.csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("train.csv")

test = sqlc.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("test.csv")

In [4]:
train.show(5)
test.show(5)
print("N of train rows = "+str(train.count()))
print("N of test rows = "+str(test.count()))

train.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

### Step 2
- Explore the features of your dataset
- You can use DataFrame's ***describe*** method to get summary statistics
    - hint: ***toPandas*** may be useful to ease the manipulation of small dataframes
- Are there any ***NaN*** values in your dataset?
- If so, define value/values to fill these ***NaN*** values
    - hint: ***na*** property of DataFrames provide several methods of handling NA values

In [5]:
# you can easily convert spark dataframes to pandas, but when you're using big data across different machines, you need to use spark

train_pd = train.toPandas()
test_pd = test.toPandas()

In [7]:
# find the number of null values in a given column. You can use the functions module in pyspark to get the .isnull function
# filter the database where 'Age' is null, and then count the number of rows

from pyspark.sql import functions as F
train_age_nulls = train.filter(F.isnull(train['Age']))
count_age_nulls = train_age_nulls.count()
print("N of nulls in Age variable = "+str(count_age_nulls))

# we can do this same thing as above by using the 'where' clause and calling .isNull() on the selection
print('N of nulls in Age variable = '+str(train.where(train['Age'].isNull()).count()))

N of nulls in Age variable = 177
N of nulls in Age variable = 177


In [8]:
# Count how many columns have null values
# this is a pyspark list enumeration function that puts the results into a dictionary

print({col:train.where(train[col].isNull()).count() for col in train.columns})

{'PassengerId': 0, 'Survived': 0, 'Pclass': 0, 'Name': 0, 'Sex': 0, 'Age': 177, 'SibSp': 0, 'Parch': 0, 'Ticket': 0, 'Fare': 0, 'Cabin': 687, 'Embarked': 2}


In [9]:
# count number of passengers by 'Embarked' variable
train.groupby('Embarked').count().show()

# make this a list instead, where each row is a tuple
embarked_df = train.groupby('Embarked').count().sort(F.desc('count')).take(1)
mode_embarked = embarked_df[0].Embarked
print(mode_embarked)

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+

S


In [10]:
# Find out what the mean age is by using my pandas DFs
# I can see from this that the mean() function automatically dismisses null values anyway
meanage_withnulls = train_pd['Age'].mean()
meanage_withoutnulls = train_pd['Age'].dropna().mean()
print(meanage_withnulls)
print(meanage_withoutnulls)

# hold mean age in variable
ageMean = round(float(train_pd.Age.mean()),0)

# now fill all nan values in Age column with ageMean
trainFilled = train.fillna({'Age':ageMean})
testFilled = test.fillna({'Age':ageMean})

# check results
print('There are now '+str(trainFilled.where(trainFilled['Age'].isNull()).count())+' nulls in this column')

# we can continue filling in NaNs in other columns also
trainFilled = trainFilled.fillna({'Embarked':mode_embarked, 'Cabin':0})
testFilled = testFilled.fillna({'Embarked':mode_embarked, 'Cabin':0})

print({col:trainFilled.where(trainFilled[col].isNull()).count() for col in trainFilled.columns})
print({col:testFilled.where(testFilled[col].isNull()).count() for col in testFilled.columns})

# round the Fare column
trainFilled = trainFilled.withColumn('Fare', F.round(trainFilled["Fare"], 1))

29.69911764705882
29.69911764705882
There are now 0 nulls in this column
{'PassengerId': 0, 'Survived': 0, 'Pclass': 0, 'Name': 0, 'Sex': 0, 'Age': 0, 'SibSp': 0, 'Parch': 0, 'Ticket': 0, 'Fare': 0, 'Cabin': 0, 'Embarked': 0}
{'PassengerId': 0, 'Pclass': 0, 'Name': 0, 'Sex': 0, 'Age': 0, 'SibSp': 0, 'Parch': 0, 'Ticket': 0, 'Fare': 1, 'Cabin': 0, 'Embarked': 0}


### Step 3
- How to handle categorical features?
    - hint: check the Estimators and Transformers
- Assemble all desired features into a Vector using the VectorAssembler Transformer
- Make sure to end up with a DataFrame with two columns: ***Survived*** and ***vFeatures***

In [19]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.util import MLUtils

# this cell allows me to just add the variables I want as features, and it works out which are strings to be vectorized

df = trainFilled

# which variables do we want to be features?
featurevars = ['Pclass','Sex','Age','Fare','Embarked']
featurevars_types = df[featurevars].dtypes

# which ones are strings?
stringvars = []
for i in range(len(featurevars_types)):
    if featurevars_types[i][1] == 'string':
        stringvars.append(featurevars_types[i][0])

# which ones are numeric?
numericvars = featurevars
for i in range(len(stringvars)):
    if stringvars[i] in featurevars:
        numericvars.remove(stringvars[i])
print(stringvars)
print(numericvars)

['Sex', 'Embarked']
['Pclass', 'Age', 'Fare']


In [12]:
# let's try putting code from previous cell in a for loop
# index string variables so we can one hot encode them

for i in range(len(stringvars)):
    indexer = StringIndexer().setInputCol(stringvars[i]).setOutputCol('{}Index'.format(stringvars[i]))
    if i == 0:
        indexeddf = indexer.fit(df).transform(df)
    else:
        indexeddf = indexer.fit(indexeddf).transform(indexeddf)

# now one hot encode them
cols = indexeddf.columns[-len(stringvars):]
for i in range(len(cols)):
    encoder = OneHotEncoder().setInputCol(cols[i]).setOutputCol('{}V'.format(cols[i])).setDropLast(False)
    if i == 0:
        encodeddf = encoder.transform(indexeddf)
    else:
        encodeddf = encoder.transform(encodeddf)

# now create features that are vectorized in one column
for i in range(len(cols)):
    cols[i] = cols[i]+'V'

cols = cols + numericvars

assembler = VectorAssembler().setInputCols(cols).setOutputCol('vFeatures')
vectorizeddf = assembler.transform(encodeddf)
vectorizeddf = vectorizeddf['Survived','vFeatures']
#vectorizeddf.show()
vectorizeddf.toPandas().head(10)

Unnamed: 0,Survived,vFeatures
0,0,"[1.0, 0.0, 1.0, 0.0, 0.0, 3.0, 22.0, 7.3]"
1,1,"[0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 38.0, 71.3]"
2,1,"[0.0, 1.0, 1.0, 0.0, 0.0, 3.0, 26.0, 7.9]"
3,1,"[0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 35.0, 53.1]"
4,0,"[1.0, 0.0, 1.0, 0.0, 0.0, 3.0, 35.0, 8.1]"
5,0,"[1.0, 0.0, 0.0, 0.0, 1.0, 3.0, 30.0, 8.5]"
6,0,"[1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 54.0, 51.9]"
7,0,"[1.0, 0.0, 1.0, 0.0, 0.0, 3.0, 2.0, 21.1]"
8,1,"[0.0, 1.0, 1.0, 0.0, 0.0, 3.0, 27.0, 11.1]"
9,1,"[0.0, 1.0, 0.0, 1.0, 0.0, 2.0, 14.0, 30.1]"


### Step 4
- Apply a normalization Estimator of your choice to the ***features*** vector obtained in Step 3

In [13]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler().setInputCol('vFeatures').setOutputCol('scaledFeatures').setWithStd(True).setWithMean(True)
scalerModel = scaler.fit(vectorizeddf).transform(vectorizeddf)
scalerModel.toPandas().head(10)

Unnamed: 0,Survived,vFeatures,scaledFeatures
0,0,"[1.0, 0.0, 1.0, 0.0, 0.0, 3.0, 22.0, 7.3]","[0.7372810452296833, -0.7372810452296834, 0.61..."
1,1,"[0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 38.0, 71.3]","[-1.354812621329705, 1.354812621329705, -1.622..."
2,1,"[0.0, 1.0, 1.0, 0.0, 0.0, 3.0, 26.0, 7.9]","[-1.354812621329705, 1.354812621329705, 0.6154..."
3,1,"[0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 35.0, 53.1]","[-1.354812621329705, 1.354812621329705, 0.6154..."
4,0,"[1.0, 0.0, 1.0, 0.0, 0.0, 3.0, 35.0, 8.1]","[0.7372810452296833, -0.7372810452296834, 0.61..."
5,0,"[1.0, 0.0, 0.0, 0.0, 1.0, 3.0, 30.0, 8.5]","[0.7372810452296833, -0.7372810452296834, -1.6..."
6,0,"[1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 54.0, 51.9]","[0.7372810452296833, -0.7372810452296834, 0.61..."
7,0,"[1.0, 0.0, 1.0, 0.0, 0.0, 3.0, 2.0, 21.1]","[0.7372810452296833, -0.7372810452296834, 0.61..."
8,1,"[0.0, 1.0, 1.0, 0.0, 0.0, 3.0, 27.0, 11.1]","[-1.354812621329705, 1.354812621329705, 0.6154..."
9,1,"[0.0, 1.0, 0.0, 1.0, 0.0, 2.0, 14.0, 30.1]","[-1.354812621329705, 1.354812621329705, -1.622..."


### Step 5
- Instead of doing transformations on separate steps, put everything together with a Pipeline

In [23]:
from pyspark.ml.pipeline import Pipeline

indexer = StringIndexer().setInputCol('Sex').setOutputCol('SexIndex')
indexer2 = StringIndexer().setInputCol('Embarked').setOutputCol('EmbarkedIndex')
encoder = OneHotEncoder().setInputCol('SexIndex').setOutputCol('SexIndexV').setDropLast(False)
encoder2 = OneHotEncoder().setInputCol('EmbarkedIndex').setOutputCol('EmbarkedIndexV').setDropLast(False)
assembler = VectorAssembler().setInputCols(['SexIndexV','EmbarkedIndexV','Pclass','Age','Fare']).setOutputCol('vFeatures')
scaler = StandardScaler().setInputCol('vFeatures').setOutputCol('scaledFeatures').setWithStd(True).setWithMean(True)

pipeline = Pipeline().setStages([indexer,indexer2,encoder,encoder2,assembler,scaler])
pipelineModel = pipeline.fit(trainFilled)
model = pipelineModel.transform(trainFilled)
model = model['Survived','scaledFeatures']
model.toPandas().head(10)

Unnamed: 0,Survived,scaledFeatures
0,0,"[0.7372810452296833, -0.7372810452296834, 0.61..."
1,1,"[-1.354812621329705, 1.354812621329705, -1.622..."
2,1,"[-1.354812621329705, 1.354812621329705, 0.6154..."
3,1,"[-1.354812621329705, 1.354812621329705, 0.6154..."
4,0,"[0.7372810452296833, -0.7372810452296834, 0.61..."
5,0,"[0.7372810452296833, -0.7372810452296834, -1.6..."
6,0,"[0.7372810452296833, -0.7372810452296834, 0.61..."
7,0,"[0.7372810452296833, -0.7372810452296834, 0.61..."
8,1,"[-1.354812621329705, 1.354812621329705, 0.6154..."
9,1,"[-1.354812621329705, 1.354812621329705, -1.622..."


### Step 6
- Train a classifier of your choice (for instance, Random Forest) using your dataset of LabeledPoints
- Make predictions for the training data
- Use the evaluators to find the Area Under ROC and Accuracy of your model
- How is your model performing? Try to tune its parameters

In [64]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

rfC = RandomForestClassifier().setLabelCol('Survived').setFeaturesCol('scaledFeatures').setNumTrees(100)
predict_rfC = rfC.fit(model).transform(model)
predict_rfC.show(10)

# something that's strange about pyspark is that you can only get Area Under ROC as a metric out of the binaryevaluator
# to get accuracy, even for a binary problem, you have to build a multipleclassificationevaluator, and then call the accuracy metric from it

binaryevaluator = BinaryClassificationEvaluator().setLabelCol('Survived')\
.setRawPredictionCol('rawPrediction')\
.setMetricName('areaUnderROC')
bievaluated_rfC = binaryevaluator.evaluate(predict_rfC)
print('Area under ROC = '+str(round(bievaluated_rfC,3)))

# so then use multiclass evaluator here to get accuracy
multievaluator = MulticlassClassificationEvaluator().setLabelCol('Survived')\
.setMetricName('accuracy')
multievaluated_rfC = multievaluator.evaluate(predict_rfC)
print('Accuracy = '+str(round(multievaluated_rfC,3)))

+--------+--------------------+--------------------+--------------------+----------+
|Survived|      scaledFeatures|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+--------------------+----------+
|       0|[0.73728104522968...|[90.291677539239,...|[0.90291677539239...|       0.0|
|       1|[-1.3548126213297...|[2.05747820498528...|[0.02057478204985...|       1.0|
|       1|[-1.3548126213297...|[53.8465428305998...|[0.53846542830599...|       0.0|
|       1|[-1.3548126213297...|[3.96761464748734...|[0.03967614647487...|       1.0|
|       0|[0.73728104522968...|[89.9860679848828...|[0.89986067984882...|       0.0|
|       0|[0.73728104522968...|[89.2719053157988...|[0.89271905315798...|       0.0|
|       0|[0.73728104522968...|[70.0919721637712...|[0.70091972163771...|       0.0|
|       0|[0.73728104522968...|[57.2963772601435...|[0.57296377260143...|       0.0|
|       1|[-1.3548126213297...|[49.6961446300845...|[0.4969614463

### Step 7
- Take a look at the test data - use DataFrame's ***createOrReplaceTempView*** method to perform SQL queries over the data
    - hint: check if there are any NULL values in the dataset - if so, handle them
- Apply the transformations to the test data
    - hint: include the model to the pipeline
- Make predictions using the model previously trained and the transformed test data

In [None]:
### INSERT YOUR CODE HERE

### Step 8

- Load the answers for the ***test*** data
- Combine it with your predictions into a single DataFrame
- Use the evaluator you created on ***Step 6***
- What was your score?

In [None]:
### INSERT YOUR CODE HERE