In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

In [2]:
!rm -rf metastore_db/*.lck

from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

### Step 1
- Load the train and test sets
- Check the schema, the variables have their right types?
- If not, how to correctly load the datasets?

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

customSchema = StructType([StructField("PassengerId", IntegerType(), True),
                           StructField("Survived", DoubleType(), True),
                           StructField("Pclass", IntegerType(), True), 
                           StructField("Name", StringType(), True),
                           StructField("Sex", StringType(), True),
                           StructField("Age", DoubleType(), True),
                           StructField("SibSp", IntegerType(), True),
                           StructField("Parch", IntegerType(), True),
                           StructField("Ticket", StringType(), True),
                           StructField("Fare", DoubleType(), True),
                           StructField("Cabin", StringType(), True),
                           StructField("Embarked", StringType(), True)])

customSchema2 = StructType([StructField("PassengerId", IntegerType(), True),
                           StructField("Pclass", IntegerType(), True), 
                           StructField("Name", StringType(), True),
                           StructField("Sex", StringType(), True),
                           StructField("Age", DoubleType(), True),
                           StructField("SibSp", IntegerType(), True),
                           StructField("Parch", IntegerType(), True),
                           StructField("Ticket", StringType(), True),
                           StructField("Fare", DoubleType(), True),
                           StructField("Cabin", StringType(), True),
                           StructField("Embarked", StringType(), True)])

train = sqlc.read.csv("train.csv", header=True, schema=customSchema)
test = sqlc.read.csv("test.csv", header=True, schema=customSchema2)

### Step 2
- Explore the features of your dataset
- You can use DataFrame's ***describe*** method to get summary statistics
    - hint: ***toPandas*** may be useful to ease the manipulation of small dataframes
- Are there any ***NaN*** values in your dataset?
- If so, define value/values to fill these ***NaN*** values
    - hint: ***na*** property of DataFrames provide several methods of handling NA values

In [4]:
# Calculating summary statistics and turning it into Pandas DF
train_desc = train.describe().toPandas().set_index('summary')
train_desc

Unnamed: 0_level_0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
summary,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [5]:
# Computing correlations between Survived and some features
print({col:train.stat.corr('Survived',col) for col in ['Pclass','Age','SibSp','Parch','Fare']})

# Checking which columns have NULL values
print({col:train.where(train[col].isNull()).count() for col in train.columns})

# Taking the mean age from the Pandas DF
ageMean = float(train_desc.loc['mean']['Age'])
print(ageMean)

# Filling the Age in both train and test datasets
trainFilled = train.na.fill({'Age': ageMean, 'Embarked': 'S'})
testFilled = test.na.fill({'Age': ageMean, 'Embarked': 'S'})

{'Fare': 0.2573065223849626, 'Age': 0.010539215871285682, 'Pclass': -0.3384810359610151, 'Parch': 0.08162940708348339, 'SibSp': -0.0353224988857356}
{'SibSp': 0, 'Embarked': 2, 'Fare': 0, 'Ticket': 0, 'Cabin': 687, 'Name': 0, 'Age': 177, 'Pclass': 0, 'Sex': 0, 'Survived': 0, 'PassengerId': 0, 'Parch': 0}
29.69911764705882


In [6]:
from pyspark.sql import functions as F
train.groupby('Sex','PClass').agg(F.mean('age')).show()

+------+------+------------------+
|   Sex|PClass|          avg(age)|
+------+------+------------------+
|  male|     3|26.507588932806325|
|female|     3|             21.75|
|female|     1| 34.61176470588235|
|female|     2|28.722972972972972|
|  male|     2| 30.74070707070707|
|  male|     1| 41.28138613861386|
+------+------+------------------+



### Step 3
- How to handle categorical features?
    - hint: check the Estimators and Transformers
- Assemble all desired features into a Vector using the VectorAssembler Transformer
- Make sure to end up with a DataFrame with two columns: ***Survived*** and ***vFeatures***

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.util import MLUtils

# Applying Estimators and Transformators
# Here, I actually fitted and transformed them on the training data
# with the purpose of being able to check the intermediate steps

indexer1 = (StringIndexer()
           .setInputCol("Embarked")
           .setOutputCol("nEmbarked")
           .setHandleInvalid('skip'))

indexed1 = indexer1.fit(trainFilled).transform(trainFilled)

indexer2 = (StringIndexer()
           .setInputCol("Sex")
           .setOutputCol("nSex")
           .setHandleInvalid('skip'))

indexed2 = indexer2.fit(indexed1).transform(indexed1)

encoder1 = OneHotEncoder().setInputCol("nEmbarked").setOutputCol("vEmbarked")
encoded1 = encoder1.transform(indexed2)

encoder2 = OneHotEncoder().setInputCol("nSex").setOutputCol("vSex")
encoded2 = encoder2.transform(encoded1)

# Using a VectorAssembler to put together all feature columns
assembler = VectorAssembler(inputCols=['Pclass',
                                       'Age',
                                       'SibSp',
                                       'Parch',
                                       'Fare',
                                       'vSex',
                                       'vEmbarked'], 
                            outputCol='vFeatures')

assembled = assembler.transform(encoded2)

# Keeping only the features and label columns to 
assembled2 = assembled.select("Survived","vFeatures")

In [8]:
assembled2.show(10)

+--------+--------------------+
|Survived|           vFeatures|
+--------+--------------------+
|     0.0|[3.0,22.0,1.0,0.0...|
|     1.0|[1.0,38.0,1.0,0.0...|
|     1.0|(8,[0,1,4,6],[3.0...|
|     1.0|[1.0,35.0,1.0,0.0...|
|     0.0|[3.0,35.0,0.0,0.0...|
|     0.0|(8,[0,1,4,5],[3.0...|
|     0.0|[1.0,54.0,0.0,0.0...|
|     0.0|[3.0,2.0,3.0,1.0,...|
|     1.0|[3.0,27.0,0.0,2.0...|
|     1.0|[2.0,14.0,1.0,0.0...|
+--------+--------------------+
only showing top 10 rows



### Step 4
- In Step 5, you will apply a normalization Estimator
- BUT, it does not accept feature vectors of the Sparse type
- So, it is neccessary to apply an User Defined Function to make all features vectors of type VectorUDT
- In this step, you only have to replace ***YOUR DATAFRAME*** and ***NEW DATAFRAME*** with your variables

In [9]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.ml.linalg import VectorUDT, Vectors

to_vec = UserDefinedFunction(lambda x: Vectors.dense(x.toArray()), 
                             VectorUDT())

# NOT NEEDED ANYMORE, SPARK DOES THE CONVERSION AUTOMATICALLY!
assembled3 = assembled2.select("Survived", to_vec("vFeatures").alias("features"))

### Step 5
- Apply a normalization Estimator of your choice to the ***features*** vector obtained in Step 4

In [10]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler().setInputCol("vFeatures").setOutputCol("scaledFeat").setWithStd(True).setWithMean(True)
scalerModel = scaler.fit(assembled2)
scaled = scalerModel.transform(assembled2)

In [11]:
scaled.show(10)

+--------+--------------------+--------------------+
|Survived|           vFeatures|          scaledFeat|
+--------+--------------------+--------------------+
|     0.0|[3.0,22.0,1.0,0.0...|[0.82691281652436...|
|     1.0|[1.0,38.0,1.0,0.0...|[-1.5652278312782...|
|     1.0|(8,[0,1,4,6],[3.0...|[0.82691281652436...|
|     1.0|[1.0,35.0,1.0,0.0...|[-1.5652278312782...|
|     0.0|[3.0,35.0,0.0,0.0...|[0.82691281652436...|
|     0.0|(8,[0,1,4,5],[3.0...|[0.82691281652436...|
|     0.0|[1.0,54.0,0.0,0.0...|[-1.5652278312782...|
|     0.0|[3.0,2.0,3.0,1.0,...|[0.82691281652436...|
|     1.0|[3.0,27.0,0.0,2.0...|[0.82691281652436...|
|     1.0|[2.0,14.0,1.0,0.0...|[-0.3691575073769...|
+--------+--------------------+--------------------+
only showing top 10 rows



In [12]:
from pyspark.ml.pipeline import Pipeline

pipeline = Pipeline(stages=[indexer1,
                            indexer2,
                            encoder1, 
                            encoder2, 
                            assembler,
                            scaler])

In [13]:
model = pipeline.fit(trainFilled)
scaled = model.transform(trainFilled)

In [14]:
scaled.select('Survived', 'scaledFeat').show(10)

+--------+--------------------+
|Survived|          scaledFeat|
+--------+--------------------+
|     0.0|[0.82691281652436...|
|     1.0|[-1.5652278312782...|
|     1.0|[0.82691281652436...|
|     1.0|[-1.5652278312782...|
|     0.0|[0.82691281652436...|
|     0.0|[0.82691281652436...|
|     0.0|[-1.5652278312782...|
|     0.0|[0.82691281652436...|
|     1.0|[0.82691281652436...|
|     1.0|[-0.3691575073769...|
+--------+--------------------+
only showing top 10 rows



### Step 6
- Train a classifier of your choice (for instance, Random Forest) using your dataset of LabeledPoints
- Make predictions for the training data
- Use the Binary Classification Evaluator to evaluate your model on the training data
- How is your model performing? Try to tune its parameters

In [15]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import RandomForestClassificationModel

# Trains a RF classifier and make predictions
rfC = RandomForestClassifier().setLabelCol("Survived") \
                                .setFeaturesCol("scaledFeat") \
                                .setNumTrees(50)

model = rfC.fit(scaled)

predictions = model.transform(scaled)

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Defines an evaluator based on the metric areaUnderROC
evaluator = BinaryClassificationEvaluator().setLabelCol("Survived") \
                            .setRawPredictionCol("rawPrediction") \
                            .setMetricName("areaUnderROC")

# Evaluate the predictions
roc = evaluator.evaluate(predictions)

print(roc)

ev2 = (MulticlassClassificationEvaluator()
       .setLabelCol('Survived')
       .setPredictionCol('prediction')
       .setMetricName('accuracy'))

acc = ev2.evaluate(predictions)
print(acc)

0.9082036451176516
0.8529741863075196


### Step 7
- Take a look at the test data - use DataFrame's ***createOrReplaceTempView*** method to perform SQL queries over the data
    - hint: check if there are any NULL values in the dataset - if so, handle them
- Apply the transformations to the test data
    - hint: you can use Pipelines to chain several Estimators/Transformers
    - warning: unfortunately, it is not possible to include the UDF from Step 4 in the Pipeline
- Make predictions using the model previously trained and the transformed test data
- Save it as ***submission.csv*** and submit it to Kaggle
- What was your score?

In [17]:
# Make the test set a "table"
testFilled.createOrReplaceTempView('test')

# Runs a series of SQL queries to get the number of null values in the test set
print({col: sqlc.sql("select * from test where " + col + " is null").count() for col in testFilled.columns})

# So, there is one null Fare, let's check it
sqlc.sql("select * from test where Fare is null").toPandas()

# Since the Fare is highly dependent on the class, it makes more sense to use the average for the given class
# But we need to take the average from the TRAINING set
trainFilled.createOrReplaceTempView('train')
avgFare = sqlc.sql("select mean(Fare) from train where Pclass = 3").take(1)[0][0]
print(avgFare)

# Fill the missing value with the calculated average
testFilled = testFilled.na.fill({'Fare': avgFare})

{'Age': 0, 'Pclass': 0, 'Cabin': 327, 'Sex': 0, 'Embarked': 0, 'SibSp': 0, 'Fare': 1, 'Parch': 0, 'PassengerId': 0, 'Ticket': 0, 'Name': 0}
13.675550101832997


In [18]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[indexer1,
                            indexer2,
                            encoder1, 
                            encoder2, 
                            assembler,
                            scaler,
                            rfC])

model = pipeline.fit(trainFilled)

In [19]:
predictions = model.transform(testFilled)

In [20]:
df_predictions = predictions.select('PassengerId',F.col('prediction').alias('Survived')).toPandas()
df_predictions.to_csv('submission.csv', index=False)

In [21]:
answers = sqlc.read.csv('titanic_answers.csv', header=True)
answers = answers.select('PassengerId',F.col('Survived').cast('Double'))

In [22]:
pred_answer = predictions.join(answers, on='PassengerId')

In [23]:
roc = evaluator.evaluate(pred_answer)
print(roc)

0.8095788704965916


In [24]:
acc = ev2.evaluate(pred_answer)
print(acc)

0.777511961722488


## Example Window Function

In [37]:
from pyspark.sql import Window
w = (Window()
     .partitionBy('PClass')
     .orderBy('Fare')
     .rowsBetween(Window.currentRow - 2, Window.currentRow))
x = trainFilled.withColumn('avgFare', F.mean('Fare').over(w))
x.select('PClass','Fare','avgFare').show(50)

+------+-------+------------------+
|PClass|   Fare|           avgFare|
+------+-------+------------------+
|     1|    0.0|               0.0|
|     1|    0.0|               0.0|
|     1|    0.0|               0.0|
|     1|    0.0|               0.0|
|     1|    0.0|               0.0|
|     1|    5.0|1.6666666666666667|
|     1|25.5875|10.195833333333333|
|     1| 25.925|18.837500000000002|
|     1|25.9292|           25.8139|
|     1|25.9292|           25.9278|
|     1|   26.0|           25.9528|
|     1|   26.0|           25.9764|
|     1|26.2833| 26.09443333333333|
|     1|26.2875|26.190266666666663|
|     1|26.2875|26.286100000000005|
|     1|26.2875|26.287500000000005|
|     1|26.3875|26.320833333333336|
|     1|  26.55| 26.40833333333333|
|     1|  26.55|26.495833333333334|
|     1|  26.55|             26.55|
|     1|  26.55|             26.55|
|     1|  26.55|             26.55|
|     1|  26.55|             26.55|
|     1|  26.55|             26.55|
|     1|  26.55|            