# Combient Workshop Exercise, June 22nd, 2016

You are given the file housing.csv, which contains data about houses sold in the Seattle area. The goal of this exercise is to use Spark to build a model that predicts the sale price of a house from information such as its size, age, area, etc.

You can find help in the Spark documentation https://spark.apache.org/docs/1.6.0/


This example is inspired by the Coursera course [Machine Learning Foundations](https://www.coursera.org/learn/ml-foundations/) by the University of Washington, and was ported to Spark by Combient. 

-----------------------




Initialise Spark

In [None]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()   #This line is not needed on some platforms. Comment it out if it causes an error. 
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

The next line tells the notebook that we want all the figures as images inside the notebook.

In [None]:
%matplotlib inline
#%matplotlib notebook #Creates interactive plots

## Importing the data

##### Option 1
Use Pandas to read the file from the local HDD, then push it to Spark. (This only works for small datasets, because Pandas loads the whole file into the memory of the local machine.) 


In [None]:
import pandas as pd
pd_dataframe = pd.read_csv(...)
pd_dataframe.head()

In [None]:
spark_dataframe = sqlContext.createDataFrame(pd_dataframe)
spark_dataframe.printSchema()

In [None]:
spark_dataframe.head()

##### Option 2
Use SFrame to read the file from the local HDD, then push it to Spark. (This works for any dataset that fits on the HDD of the local computer.) 


In [None]:
import sframe as sf
sf_sframe = sf.SFrame.read_csv(...)
sf_sframe.head()

In [None]:
spark_dataframe = sf_sframe.to_spark_dataframe(sc,sqlContext,2) #2 = number of partitions for the dataframe
spark_dataframe.printSchema()

In [None]:
spark_dataframe.head()

##### Option 3

Read the file from HDFS directly using Spark and [pyspark_csv](https://github.com/seahboonsiew/pyspark-csv). This approach works for truly big files. **Do not forget to put the file on HDFS first!**

Simply make sure the file pyspark_csv.py is in the same directory as the notebook and run the block below.

In [None]:
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py') #Ship pyspark_csv.py to the Python interpreters running on the Spark worker nodes
plainTextRdd = sc.textFile(...) #Read the file in a distributed fashion.
spark_dataframe = pycsv.csvToDataFrame(sqlContext, plainTextRdd, parseDate=False)
spark_dataframe.printSchema()

In [None]:
spark_dataframe.head()

##### Option 4 [Will not work on the VM supplied]
Read the file from HDFS directly using Spark and spark-csv, similarly to what we did in Scala for the NASA prototype.

This is still a work in progress on the VM we are using today ;)

In [None]:
#spark_dataframe = sqlContext.read.format('com.databricks.spark.csv').option('header', 'true').option('inferschema', 'true').load('housing.csv')

## Data exploration

Let's start by plotting the relation between the size of the house ("sqft_living") and the price.

Extract the price and sqft_living columns from the Spark dataframe.

In [None]:
prices_sqftliving = spark_dataframe.select(...,...).sample(False,0.1).toPandas()

What have we just done here?
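As we understand Spark's behaviour, `sample(False, 0.1)` samples without replacement. It keeps each row independently with probability 0.1, so the result holds roughly 10% of the rows but not exactly 10%, and it changes between runs unless a seed is passed. Only that small sample is then brought to the local machine by `toPandas()`. The plain-Python sketch below mimics the idea on a list. `bernoulli_sample` is a hypothetical helper, not Spark code.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (sampling without replacement)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(100000))            # stand-in for the rows of a DataFrame
sample = bernoulli_sample(rows, 0.1, seed=0)
print(len(sample))                    # close to, but usually not exactly, 10000
```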

Now, make a scatter plot using Pandas ( http://pandas.pydata.org/pandas-docs/version/0.15.0/visualization.html#scatter-plot ) or Seaborn (https://web.stanford.edu/~mwaskom/software/seaborn/generated/seaborn.jointplot.html#seaborn.jointplot).

In [None]:
prices_sqftliving.plot(kind='scatter', x=..., y=...)

In [None]:
import seaborn as sns
sns.jointplot(x=...,y=...,data=prices_sqftliving)

Let's do the same for the lot size, number of bathrooms, construction year and latitude.

In [None]:
prices_others = spark_dataframe....

NB: In this dataset, bathrooms with a bath tub are counted as 1, bathrooms with only a shower are counted as 0.5 and those with only a toilet and sink are counted as 0.25. It is thus normal to see non-integer numbers of bathrooms!

## Split the data into training and test sets

We split the data into a training set and a test set. From now on, **you cannot touch the test set until the very end of the exercise!**

In [None]:
(spark_DF_train,spark_DF_test) = spark_dataframe.randomSplit([0.8,0.2])
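Conceptually, `randomSplit([0.8, 0.2])` normalizes the weights so they sum to 1. It then assigns each row to exactly one split by comparing a uniform random number with the cumulative weight boundaries. The split sizes are therefore only approximately 80/20. Passing a seed, as in `randomSplit([0.8, 0.2], seed=42)`, makes the split reproducible. The plain-Python sketch below reproduces that idea. `random_split` is a hypothetical helper and not Spark's actual implementation.

```python
import random
from bisect import bisect_right

def random_split(rows, weights, seed=None):
    """Assign each row to exactly one split, with probabilities given by the normalized weights."""
    total = float(sum(weights))
    boundaries = []                    # cumulative upper bounds, e.g. [0.8, 1.0]
    acc = 0.0
    for w in weights:
        acc += w / total
        boundaries.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()               # uniform in [0, 1)
        # clamp guards against a last boundary slightly below 1.0 due to rounding
        index = min(bisect_right(boundaries, u), len(weights) - 1)
        splits[index].append(row)
    return splits

train, test = random_split(range(10000), [0.8, 0.2], seed=42)
print(len(train), len(test))           # roughly 8000 and 2000
```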

# Simple linear regression

Below, you are asked to perform a linear regression to predict the price from the living area.
We will provide guidelines for Spark.mllib (the "old way" in Spark, using RDDs) and Spark.ml (the "new way", using DataFrames).


### Spark.mllib

https://spark.apache.org/docs/1.6.0/mllib-linear-methods.html#linear-least-squares-lasso-and-ridge-regression



In [None]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

Select the label (price) and the variables you want to use as features for the regression.

In [None]:
spark_DF_price_sqftliv = spark_DF_train.select(...)

Models in Spark.mllib take an RDD of LabeledPoints as input. A LabeledPoint pairs a label (a float) with a feature vector (a vector of floats). The next lines map the DataFrame created above into a suitable RDD.

In [None]:
spark_RDD_price_sqftliv = spark_DF_price_sqftliv.map( lambda x : LabeledPoint(float(x.price),[ float(x.sqft_living)] ))
spark_RDD_price_sqftliv.take(10)   #Show the first 10 LabeledPoints without collecting the whole RDD

Learn the model!
intercept is a boolean parameter that controls whether the model includes a constant term. If True, we learn a model "y = a1 * x + a0". If False, we learn "y = a1 * x", i.e. a line forced through the origin.

In [None]:
model = LinearRegressionWithSGD.train(...,intercept=...,iterations=1000,step=0.0000001)
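The sketch below helps explain why such a tiny step is used. It is a simplified plain-Python version of gradient descent on the least-squares loss. It assumes full-batch updates (the default miniBatchFraction=1.0) and a step size that decays as step / sqrt(t), which is our understanding of what LinearRegressionWithSGD does internally. The gradient of each term is (prediction - label) * x, so the stable step size shrinks roughly like 1/x². With living areas in the thousands of square feet, x² is in the millions, which is consistent with a step of the order of 1e-7. The function sgd_linear_regression is hypothetical. It uses (label, features) pairs as a stand-in for LabeledPoints.

```python
import math

def sgd_linear_regression(points, iterations=1000, step=1.0, intercept=True):
    """Gradient descent on the least-squares loss (1 / 2n) * sum((w . x + b - y) ** 2).

    points: list of (label, [feature, ...]) pairs, a stand-in for LabeledPoints.
    Every iteration uses all points (full batch) and a step of step / sqrt(t).
    Returns (weights, bias); bias stays 0.0 when intercept is False.
    """
    n = float(len(points))
    weights = [0.0] * len(points[0][1])
    bias = 0.0
    for t in range(1, iterations + 1):
        grad_w = [0.0] * len(weights)
        grad_b = 0.0
        for label, features in points:
            prediction = sum(w * x for w, x in zip(weights, features)) + bias
            residual = prediction - label          # derivative of the loss w.r.t. the prediction
            for j, x in enumerate(features):
                grad_w[j] += residual * x          # proportional to x: large features give large gradients
            grad_b += residual
        step_t = step / math.sqrt(t)               # decaying step size
        weights = [w - step_t * g / n for w, g in zip(weights, grad_w)]
        if intercept:
            bias -= step_t * grad_b / n
    return weights, bias

# Toy data on y = 2x + 1, with x centred on 0 so slope and intercept are learned independently
points = [(2.0 * x + 1.0, [x]) for x in [-1.0, -0.5, 0.0, 0.5, 1.0]]
weights, bias = sgd_linear_regression(points)
print(weights, bias)
```

On these small, centred toy values a step of 1.0 is stable, and the sketch recovers a slope close to 2 and an intercept close to 1.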

Now, jump to the Spark.mllib part of the "Evaluating the model" section.

### Spark.ml

This is similar to what we did for the Santander Bank problem.

http://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression



Create a list with the names of the columns that you want to use as features.

In [None]:
FeaturesCol = ...

Annoying feature: the label needs to be of type DoubleType(), while 'price' is of type int. The next block fixes that.

In [None]:
from pyspark.sql.types import DoubleType
spark_DF_train = spark_DF_train.withColumn("price_float",spark_DF_train["price"].cast(DoubleType()))

#Let's do it on the test data too so that we can forget about it.
spark_DF_test = spark_DF_test.withColumn("price_float",spark_DF_test["price"].cast(DoubleType()))


In [None]:
from pyspark.ml.feature import VectorAssembler,StandardScaler,MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

#Create an empty list
PipelineStages = ...

#Create a vectorAssembler that packs the selected features into a vector in a column called "features"
vectorAssembler = VectorAssembler(
    inputCols=...,
    outputCol=...)

#Append the vectorAssembler to the list of stages
PipelineStages.append(...)

#Optionally, normalize the data (try it without normalization first)
#feature_scaler = StandardScaler(inputCol=..., outputCol=...,withStd=..., withMean=...)
#PipelineStages.append(feature_scaler)  #The pipeline fits the scaler on the training data when .fit() is called

#From the doc: 
#class pyspark.ml.regression.LinearRegression(self, featuresCol="features", labelCol="label", 
#predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, 
#standardization=True, solver="auto", weightCol=None)
lr = LinearRegression(...)

#Add it to the pipeline: 
PipelineStages...

#Create a pipeline for the model
modelPipeline = Pipeline(stages=PipelineStages)
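Here is one way to picture the first stage. For each row, VectorAssembler reads the columns listed in inputCols and packs their values, in that order, into a single vector stored in outputCol. The regression stage then only looks at that vector column and at the label column. The plain-Python sketch below shows that transformation, with rows represented as dicts. assemble_features is a hypothetical helper, and Spark itself may store the result as a sparse vector.

```python
def assemble_features(rows, input_cols, output_col="features"):
    """Return new rows with the selected numeric columns packed, in order, into one list."""
    assembled = []
    for row in rows:
        new_row = dict(row)  # keep the original columns, like DataFrame.withColumn does
        new_row[output_col] = [float(row[col]) for col in input_cols]
        assembled.append(new_row)
    return assembled

rows = [  # hypothetical rows, not taken from housing.csv
    {"price": 300000, "sqft_living": 1500, "bedrooms": 3},
    {"price": 550000, "sqft_living": 2400, "bedrooms": 4},
]
out = assemble_features(rows, ["sqft_living", "bedrooms"])
for row in out:
    print(row["price"], row["features"])
```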

Prepare the data and fit the model by calling the .fit() method of the pipeline on the training data.

In [None]:
Pipe_model = ...

# Evaluating the model


## Spark.mllib 

Let's start by looking at the model parameters:

In [None]:
print(model.weights)
print(model.intercept)

Is this number reasonable? Calculate the mean price per square foot! Does it look reasonable or completely off? 
(Inside a lambda function, you can access the value of one column of a dataframe row as "variable.column_name".)

TIP: What is the type of the object returned by map? There is a very simple way of calculating its mean value!

In [None]:
spark_DF_price_sqftliv.map(lambda x: ... ) ....
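The quantity to compare with model.weights is the average of price / sqft_living over the houses. The plain-Python sketch below computes it on a few hypothetical rows, not taken from housing.csv. It uses a namedtuple so that columns are read as row.column_name, as in the lambda functions above.

```python
from collections import namedtuple

# Hypothetical stand-in for a DataFrame Row with the two selected columns
House = namedtuple("House", ["price", "sqft_living"])

houses = [House(300000, 1500), House(550000, 2500), House(420000, 2100)]

# Price per square foot for each house, then its average
price_per_sqft = [h.price / float(h.sqft_living) for h in houses]
mean_price_per_sqft = sum(price_per_sqft) / len(price_per_sqft)
print(mean_price_per_sqft)
```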

A linear regression of one variable is simply a line! Can you plot it on the figure to get a feeling of how good the model is ? 

In [None]:
g = sns.jointplot(x="sqft_living",y="price",data=prices_sqftliving,size=12) #Already done above
g.ax_joint.plot(...,...,'--r')

Predict the house prices and calculate the mean squared error (MSE), first on the training data. The code prints its square root (the RMSE), which is expressed in the same unit as the price: 

In [None]:
#Transform the RDD from (price,features) to (price,prediction)
spark_RDD_price_prediction = spark_RDD_price_sqftliv.map(lambda x: (x.label,model.predict(x.features)))
spark_RDD_price_prediction.take(10)   #Show the first 10 (label, prediction) pairs

In [None]:
#Calculate the square error for each prediction (In python, square of x is x**2)
spark_RDD_Error = spark_RDD_price_prediction.map(lambda x: ... ) 

from math import sqrt
print("sqrt(MSE) : %f" % sqrt(spark_RDD_Error.mean()))
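For reference, the same calculation in plain Python on a list of hypothetical (label, prediction) pairs looks like this. Square each error, average the squares, then take the square root to get the RMSE. The helper name rmse is hypothetical.

```python
from math import sqrt

def rmse(label_prediction_pairs):
    """Root mean squared error of a list of (label, prediction) pairs."""
    squared_errors = [(label - prediction) ** 2 for label, prediction in label_prediction_pairs]
    return sqrt(sum(squared_errors) / len(squared_errors))

pairs = [(300000.0, 320000.0), (550000.0, 500000.0), (420000.0, 430000.0)]  # hypothetical
print("sqrt(MSE) : %f" % rmse(pairs))
```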

Do the same on the test data. Remember that you have to prepare the data in the exact same way as you did with the training data!

In [None]:
spark_DF_price_sqftliv_test = ...
spark_RDD_price_sqftliv_test = ...
spark_RDD_price_prediction_test = ...
spark_RDD_Error_test= ...

print("sqrt(MSE) : %f" % sqrt(spark_RDD_Error_test.mean()))

Annoying, isn't it? =)

## Spark.ml

Make predictions on the training data:

In [None]:
train_with_predictions = Pipe_model.transform(spark_DF_train)
train_with_predictions.head()

Calculate the MSE. Note that we are now using a DataFrame and not an RDD.

In [None]:
MSE = train_with_predictions.map(lambda x: ...) ....
from math import sqrt
print("sqrt(MSE) : %f" % sqrt(MSE))

A more general way of visualizing the result of the model, which also works when there are several features, is to plot the relative prediction error against the price. 

Calculate the relative prediction error and add it to the dataframe as a new column.

In [None]:
train_with_predictions = train_with_predictions.withColumn("error",...)
train_with_predictions.head()
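The exercise leaves the exact definition of the relative error open. One common choice, assumed here, is (prediction - price) / price. With this choice, positive values mean the model overestimates the price and negative values mean it underestimates it. A plain-Python sketch on hypothetical values, with relative_error as a hypothetical helper:

```python
def relative_error(price, prediction):
    """Relative prediction error, assumed here to be (prediction - price) / price."""
    return (prediction - price) / float(price)

# Hypothetical (price, prediction) pairs, not results from the model above
pairs = [(200000.0, 260000.0), (800000.0, 640000.0)]
errors = [relative_error(price, prediction) for price, prediction in pairs]
print(errors)
```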

To be able to plot it, you have to retrieve the data from the cluster to the local machine as a Pandas dataframe. Think about the data size when you do this! (Select only the columns you need, and sample the data if necessary.)

In [None]:
pd_train_with_predictions = train_with_predictions.select(...).......toPandas()

And plot the data

In [None]:
g = sns.jointplot(...,...,...,size=12)

For an ideal model, the points should be distributed randomly around the y = 0 axis. This is not the case here...
The model overestimates the price of cheap houses and underestimates the price of expensive ones.



Finally, make predictions on the test data

In [None]:
test_with_predictions = ...
test_with_predictions.head()

Isn't it easier than Spark.mllib? =)


Do the same error analysis for the test data and comment.

# Multivariate regression

Copy-paste the code of one of the two approaches above and choose two (or more) new variables to add to the linear regression model. Check if this improves the predictive power of the model. 

If you use categorical variables as features, these need to be indexed. An example can be found in section 4 here: https://github.com/waichee/pyspark-ipython-notebook/blob/856b45eb17b2243bc08d3842659c6b44892256fa/spark-pyspark-mllib-101.ipynb
Note that the distinction between numbers and categories isn't clear for every variable. For example, the number of bedrooms can be viewed either as a number or as a category. On one hand, if it is taken as the only variable, more bedrooms are likely to always increase the price, and using the number of bedrooms as a number is probably fine. On the other hand, for a given living area, more bedrooms also means smaller bedrooms, which might ultimately affect the price negatively. In that case, a categorical view of the bedroom variable might be better. 
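Spark's StringIndexer maps each distinct category to an index ordered by frequency, so the most frequent category gets index 0. Numeric columns are cast to strings first. OneHotEncoder can then turn that index into a binary vector. The plain-Python sketch below mimics both steps on a hypothetical list of bedroom counts treated as categories. The helper names are hypothetical, and ties are broken by string order here, which is our own choice. Unlike Spark's OneHotEncoder, which drops the last category by default (dropLast=True), the sketch keeps one slot per category.

```python
from collections import Counter

def string_index(values):
    """Map each distinct value (as a string) to an index, most frequent first."""
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def one_hot(index, size):
    """Binary vector of length `size` with a 1.0 at position `index`."""
    vector = [0.0] * size
    vector[index] = 1.0
    return vector

bedrooms = [3, 3, 4, 2, 3, 4]            # hypothetical values
mapping = string_index(bedrooms)         # {'3': 0, '4': 1, '2': 2}
encoded = [one_hot(mapping[str(b)], len(mapping)) for b in bedrooms]
print(mapping)
print(encoded[0])                        # [1.0, 0.0, 0.0]
```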


Finally, please note that when using variables whose values differ by orders of magnitude, normalizing the data before learning the model might be required (particularly if you use L1 or L2 regularization).   http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.feature.StandardScalerModel
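Standardization rescales each feature to zero mean and unit standard deviation, z = (x - mean) / std. After this, features such as the living area (in the thousands) and the number of bathrooms (single digits) end up on comparable scales. The plain-Python sketch below uses the sample standard deviation (n - 1 in the denominator), which we believe matches Spark's StandardScaler. Spark's scaler also lets you switch centring and scaling on or off with withMean and withStd. The helper name standardize is hypothetical.

```python
from math import sqrt

def standardize(values):
    """Return (x - mean) / std for each value, using the sample standard deviation."""
    n = len(values)
    mean = sum(values) / float(n)
    std = sqrt(sum((x - mean) ** 2 for x in values) / (n - 1))
    return [(x - mean) / std for x in values]

sqft_living = [1000.0, 2000.0, 3000.0]   # hypothetical values
print(standardize(sqft_living))          # [-1.0, 0.0, 1.0]
```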