In [1]:
import os, sys, json, collections, itertools
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from inspect import signature

def udf(f):
    returnType = T._type_mappings[signature(f).return_annotation]()
    return F.UserDefinedFunction(f, returnType)

local_dir = "file:///{d}/".format(d=os.getcwd())

if 'sc' not in globals():
    conf = SparkConf().setAppName('appName').setMaster('local')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

# Linear regression with pyspark

In [2]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

## Get the data

It is easier (in this pyspark version) to first load the data as an RDD, and then modify it into a dataFrame. During this process we also remove the header using a combination of _zipWithIndex()_ and _filter()_ (taken from [here][1]). By looking at the file we see the "schema", which is used by the second _map()_.

[1]: http://stackoverflow.com/a/31798247/3121900

In [3]:
weights = spark.read.csv("weight.txt", 
                         header=True, inferSchema=True)
weights.show(5)

+---+---+------+------+
|Sex|Age|Height|Weight|
+---+---+------+------+
|  f| 26| 171.1|  57.0|
|  m| 44| 180.1|  84.7|
|  m| 32| 161.9|  73.6|
|  m| 27| 176.5|  81.0|
|  f| 26| 167.3|  57.4|
+---+---+------+------+
only showing top 5 rows



We already know that the age has no part in the model, so we drop the column.

In [4]:
weights = weights.drop('Age')
weights.show(5)

+---+------+------+
|Sex|Height|Weight|
+---+------+------+
|  f| 171.1|  57.0|
|  m| 180.1|  84.7|
|  m| 161.9|  73.6|
|  m| 176.5|  81.0|
|  f| 167.3|  57.4|
+---+------+------+
only showing top 5 rows



We will illustrate the basics with the boys data and then repeat the process for the girls.

In [5]:
boys = weights.where(weights.Sex == 'm')
boys.show(5)

+---+------+------+
|Sex|Height|Weight|
+---+------+------+
|  m| 180.1|  84.7|
|  m| 161.9|  73.6|
|  m| 176.5|  81.0|
|  m| 165.9|  72.1|
|  m| 168.6|  77.7|
+---+------+------+
only showing top 5 rows



### Vectorizing

While Spark DataFrames were designed to facilitate table-oriented tasks, they are not optimized for the mathematical manipulations required for applying the machine learnign algorithms. To overcome this problem, Spark offers another data structure called **Vector**, which is a list-like data structure.

Its role will be more clear later, but for now we can think of it as a special column, collecting together several not-necessarily-the-same-type columns. Vectors can be created by constructors from the _pyspark.ml.linalg_ module, but they can also be created by assembling existing columns with the [_VectorAssembler_][va] transformer.

[va]: http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler "VectorAssembler() API"

In [6]:
va = VectorAssembler(inputCols=['Height'], outputCol='features')
print (type(va))

<class 'pyspark.ml.feature.VectorAssembler'>


In [7]:
boys = va.transform(boys)
boys.show(5)

+---+------+------+--------+
|Sex|Height|Weight|features|
+---+------+------+--------+
|  m| 180.1|  84.7| [180.1]|
|  m| 161.9|  73.6| [161.9]|
|  m| 176.5|  81.0| [176.5]|
|  m| 165.9|  72.1| [165.9]|
|  m| 168.6|  77.7| [168.6]|
+---+------+------+--------+
only showing top 5 rows



### Splitting the data

In [8]:
train_boys, test_boys = boys.randomSplit([0.7, 0.3], seed=1234)

## Single variable

### Instantiate the model

The model itself is embodied by the [LinearRegression][1] **estimator** class. The initialization of the estimator requires the declaration of the features by the argument _featuresCol_, the target by the argument _labelCol_ and the future prediction column by the argument _predictionCol_. It does **NOT** require the data itself...

[1]: https://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression "LinearRegression API"

In [9]:
boys_lr = LinearRegression(featuresCol='features', 
                           labelCol='Weight', 
                           predictionCol='predicted weight')
print (type(boys_lr))

<class 'pyspark.ml.regression.LinearRegression'>


### Fit the model

Being an **estimator**, a _LinearRegression_ object has a **_fit()_** method. This method applies the linear regression algorithm to fit the data in _featureCol_ to the labels in _labelCol_ to create a **model**, which is a type of **transformer**.

In [10]:
boys_lm = boys_lr.fit(train_boys)
print (type(boys_lm))

<class 'pyspark.ml.regression.LinearRegressionModel'>


### Inspect the model

In [11]:
print (boys_lm.coefficients)
print (boys_lm.intercept)

[0.8359550207720998]
-65.4164719973857


Being a **transformer**, a _LinearRegressionModel_ object has a **_transform()_** method. This is the equivalent of the _predict()_ method from scikit-learn, and it applies the applies the model to the data and creates a new column with the name _predictionCol_.

In [12]:
train_boys = boys_lm.transform(train_boys)
train_boys.show(5)

+---+------+------+--------+-----------------+
|Sex|Height|Weight|features| predicted weight|
+---+------+------+--------+-----------------+
|  m| 155.7|  64.5| [155.7]|64.74172473683022|
|  m| 156.3|  62.4| [156.3]|65.24329774929352|
|  m| 157.5|  69.1| [157.5]|66.24644377422003|
|  m| 157.8|  68.4| [157.8]|66.49723028045166|
|  m| 158.5|  65.2| [158.5]|67.08239879499213|
+---+------+------+--------+-----------------+
only showing top 5 rows



### Assess the model

The RMSE (and other measures) are available in the _pyspark.ml.evaluation_ module. As usual, we instantiate an evaluator object with the proper arguments, and then apply it to the data.

In [13]:
evaluator = RegressionEvaluator(predictionCol="predicted weight", 
                                labelCol="Weight", 
                                metricName="rmse")
print (evaluator.evaluate(train_boys))

3.061750977117286


### Validate the model

We apply the same steps to the test data ([pipeline][1], anyone?) and hope the results will be similar, otherwise we apparently have an overfitting problem.

[1]: https://spark.apache.org/docs/2.0.2/ml-pipeline.html "pipeline documentation"

In [14]:
test_boys = boys_lm.transform(test_boys)
print (evaluator.evaluate(test_boys))

3.0484188962626835


## Multiple variables

The process is exactly the same, so we will show the entire code without verbal explanations and review it to note the minor differences.

### Get the data

In [15]:
diet = spark.read.csv("diet.txt", 
                      sep=';', header=True, inferSchema=True).drop('id')

for col_name in diet.columns:
  diet = diet.withColumnRenamed(col_name, col_name.replace('.', '_'))
  
diet.show(5)

+----------+-----------+----------+---------------+---------+
|jogging_km|spinning_hr|hamburgers|ice_cream_balls|change_kg|
+----------+-----------+----------+---------------+---------+
|       269|         29|         6|             35|     -7.5|
|        79|         10|        13|             23|     -0.9|
|       112|         46|         4|             22|     -6.1|
|       172|         27|        14|             28|     -4.7|
|       273|         31|        29|             47|     -7.0|
+----------+-----------+----------+---------------+---------+
only showing top 5 rows



> **NOTE:** Spark does not allow features to have a dot (.) in their name.

#### Vectorizing

In [16]:
va = VectorAssembler(inputCols=diet.columns[:-1], outputCol='features')
diet = va.transform(diet)
diet.show(5)

+----------+-----------+----------+---------------+---------+--------------------+
|jogging_km|spinning_hr|hamburgers|ice_cream_balls|change_kg|            features|
+----------+-----------+----------+---------------+---------+--------------------+
|       269|         29|         6|             35|     -7.5|[269.0,29.0,6.0,3...|
|        79|         10|        13|             23|     -0.9|[79.0,10.0,13.0,2...|
|       112|         46|         4|             22|     -6.1|[112.0,46.0,4.0,2...|
|       172|         27|        14|             28|     -4.7|[172.0,27.0,14.0,...|
|       273|         31|        29|             47|     -7.0|[273.0,31.0,29.0,...|
+----------+-----------+----------+---------------+---------+--------------------+
only showing top 5 rows



### Split the data

In [17]:
train_diet, test_diet = diet.randomSplit([0.7, 0.3], seed=1729)

### Instantiate the model

In [18]:
diet_lr = LinearRegression(featuresCol='features', 
                           labelCol='change_kg', 
                           predictionCol='predicted change')

### Fit the model

In [19]:
diet_lm = diet_lr.fit(train_diet)

### Inspect the model

In [20]:
print (diet_lm.coefficients)
print (diet_lm.intercept)

[-0.04011369017559605,-0.09906748087233982,0.11823546860099315,0.10502143858543478]
0.37139828664060204


### Apply the model

In [21]:
train_diet = diet_lm.transform(train_diet)
train_diet.show(5)

+----------+-----------+----------+---------------+---------+--------------------+-------------------+
|jogging_km|spinning_hr|hamburgers|ice_cream_balls|change_kg|            features|   predicted change|
+----------+-----------+----------+---------------+---------+--------------------+-------------------+
|         3|         26|        10|             30|      2.4|[3.0,26.0,10.0,30.0]|  2.008300557005953|
|         7|         20|        30|             24|      3.9|[7.0,20.0,30.0,24.0]|  4.176831422044862|
|        17|         49|         4|              8|     -4.1| [17.0,49.0,4.0,8.0]| -3.851727626001731|
|        18|          2|         3|              9|      0.7|  [18.0,2.0,3.0,9.0]| 0.7511162548070859|
|        21|         45|        20|              7|     -0.4|[21.0,45.0,20.0,7.0]|-1.8291664041843005|
+----------+-----------+----------+---------------+---------+--------------------+-------------------+
only showing top 5 rows



### Assess the model

In [22]:
evaluator = RegressionEvaluator(predictionCol="predicted change", 
                                labelCol="change_kg", 
                                metricName="rmse")
print (evaluator.evaluate(train_diet))

0.9169297083439855


### Validate the model

In [23]:
test_diet = diet_lm.transform(test_diet)
print (evaluator.evaluate(test_diet))

0.9171880251134429


# Exercise 1:
Read the grades.txt file. For the sake of this exercise you may ignore the splitting step and use the entire data for the regression.

* Part I - Fit three single-variable regression models for the SAT grade based on each of the math grade, the english grade and the literature grade, and analyze them. Which of the models is the best?
* Part II - Fit a new linear regression model with all three grades as predictors, and analyze the model. Is the new model better than the previous ones?

## Dummy variables

The concept of dummy variables is implemented in _pyspark.ml_ by a combination of two optional **estimators and transformers** - [_StringIndexer_][1] and [_OneHotEncoder_][2]. _StringIndexer_ maps a "categorical" feature column of type string into arbitrary integers, and _OneHotEncoder_ maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. It sounds complicated, but it is not...

More generally, the module [_features_][3] of _pyspark.ml_ supports a large family of data transformers, which are documented [here][4].

[1]: https://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer "StringIndexer API"
[2]: https://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoder "OneHotEncoder API"
[3]: https://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#module-pyspark.ml.feature "ml.features API"
[4]: https://spark.apache.org/docs/2.0.2/ml-features.html "ml.features documentation"

To apply this concept to the _gender_ feature we roll back to the step before the vectorizing. This time we consider both boys and girls.

#### Indexing

In [24]:
si = StringIndexer(inputCol='Sex', outputCol='Sex (indexed)')
si_model = si.fit(weights)
weights = si_model.transform(weights)
weights.show(5)

+---+------+------+-------------+
|Sex|Height|Weight|Sex (indexed)|
+---+------+------+-------------+
|  f| 171.1|  57.0|          1.0|
|  m| 180.1|  84.7|          0.0|
|  m| 161.9|  73.6|          0.0|
|  m| 176.5|  81.0|          0.0|
|  f| 167.3|  57.4|          1.0|
+---+------+------+-------------+
only showing top 5 rows



#### Encoding

In [25]:
ohe = OneHotEncoder(inputCol='Sex (indexed)', outputCol='Sex (one hot)')
ohe.setDropLast(False)
weights = ohe.transform(weights)
weights.show(5)

+---+------+------+-------------+-------------+
|Sex|Height|Weight|Sex (indexed)|Sex (one hot)|
+---+------+------+-------------+-------------+
|  f| 171.1|  57.0|          1.0|(2,[1],[1.0])|
|  m| 180.1|  84.7|          0.0|(2,[0],[1.0])|
|  m| 161.9|  73.6|          0.0|(2,[0],[1.0])|
|  m| 176.5|  81.0|          0.0|(2,[0],[1.0])|
|  f| 167.3|  57.4|          1.0|(2,[1],[1.0])|
+---+------+------+-------------+-------------+
only showing top 5 rows



> **NOTE:** _OneHotEncoder()_ returns [sparse vectors][1], which is a standard representation of arrays with a lot of zeroes. In this representation, the tuple (_n_, [_locs_], [_vals_]) means there are _n_ elements in the vector, and the value in location _locs[i]_ is _vals[i]_. This makes the illustration not very intuitive, but we will have to deal with that...

[1]: https://en.wikipedia.org/wiki/Sparse_array "Sparse array - Wikipedia"

#### Vectorizing

In [26]:
va = VectorAssembler(inputCols=['Height', 'Sex (one hot)'], outputCol='features')
weights = va.transform(weights)
weights.show(5)

+---+------+------+-------------+-------------+---------------+
|Sex|Height|Weight|Sex (indexed)|Sex (one hot)|       features|
+---+------+------+-------------+-------------+---------------+
|  f| 171.1|  57.0|          1.0|(2,[1],[1.0])|[171.1,0.0,1.0]|
|  m| 180.1|  84.7|          0.0|(2,[0],[1.0])|[180.1,1.0,0.0]|
|  m| 161.9|  73.6|          0.0|(2,[0],[1.0])|[161.9,1.0,0.0]|
|  m| 176.5|  81.0|          0.0|(2,[0],[1.0])|[176.5,1.0,0.0]|
|  f| 167.3|  57.4|          1.0|(2,[1],[1.0])|[167.3,0.0,1.0]|
+---+------+------+-------------+-------------+---------------+
only showing top 5 rows



### Split the data

In [27]:
train_weights, test_weights = weights.randomSplit([0.7, 0.3], seed=8128)

### Instantiate the model

In [30]:
weight_lr = LinearRegression(featuresCol='features', 
                             labelCol='Weight', 
                             predictionCol='predicted weight')

### Fit the model

In [31]:
weight_lm = weight_lr.fit(weights)

### Inspect the model

In [32]:
print (weight_lm.coefficients)
print (weight_lm.intercept)

[0.7398664663003207,7.981299963618155,-7.981299963620369]
-57.164935986522075


### Apply the model

In [33]:
train_weights = weight_lm.transform(train_weights)
train_weights.show(5)

+---+------+------+-------------+-------------+---------------+------------------+
|Sex|Height|Weight|Sex (indexed)|Sex (one hot)|       features|  predicted weight|
+---+------+------+-------------+-------------+---------------+------------------+
|  f| 153.0|  52.1|          1.0|(2,[1],[1.0])|[153.0,0.0,1.0]| 48.05333339380662|
|  f| 155.9|  52.6|          1.0|(2,[1],[1.0])|[155.9,0.0,1.0]| 50.19894614607755|
|  f| 156.1|  53.6|          1.0|(2,[1],[1.0])|[156.1,0.0,1.0]| 50.34691943933762|
|  f| 156.3|  48.3|          1.0|(2,[1],[1.0])|[156.3,0.0,1.0]|50.494892732597684|
|  f| 156.6|  51.4|          1.0|(2,[1],[1.0])|[156.6,0.0,1.0]| 50.71685267248777|
+---+------+------+-------------+-------------+---------------+------------------+
only showing top 5 rows



### Assess the model

In [34]:
evaluator = RegressionEvaluator(predictionCol="predicted weight", 
                                labelCol="Weight", 
                                metricName="rmse")
print (evaluator.evaluate(train_weights))

3.001442785651967


### Validate the model

In [35]:
test_weights = weight_lm.transform(test_weights)
print (evaluator.evaluate(test_weights))

2.8654785372425797


# Execrise 2:
The file prices.csv contains rental details for many apartments in several cities. Read the file, use its data to create two linear models for estimating the price (part I and part II below), and explain which one is better and why.

* Part I - The ‘Rooms’ feature is an integer.
* Part II - The ‘Rooms’ feature is a dummy variables.