# Least Squares Regression

This notebook explains how to implement least squares regression using PySpark Map-Reduce.

Spark exposes two interfaces to data:

1. An [RDD](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD) interface, which represents a collection of rows, each of which can be any Python object.
1. A [dataframe](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html) interface, which is similar to Pandas (but with less functionality) and is built on top of the RDD interface.

Today Spark users are encouraged to use the dataframe interface, which provides additional system performance optimizations. However, in the homework and in this notebook we will use a bit of both to get some exposure to the low-level side of distributed data processing.

## Loading the Diamond Dataset

We begin by loading data describing diamonds.  We will load this data using the dataframe interface to Spark.

In [4]:
diamonds = (
  sqlContext.read.format('csv')
    .options(header='true', inferSchema='true')
    .load('/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv')
    .cache()
  )

We can take a look at a table summarizing this data:

In [6]:
display(diamonds)

## Data Preparation

For this notebook we will examine a subset of the columns and only expensive diamonds.  In the following we use the Dataframe API to subselect the data.

In [9]:
data = (
  diamonds
    .where(diamonds['price'] > 1000)
    .select(['cut', 'color', 'carat', 'clarity', 'price'])
)

In [10]:
display(data.sample(False, 0.01).select(['carat', 'price']))

# Spark RDD Exercise

Let's count the number of diamonds of each color:

In [13]:
# We can access the underlying RDD for any dataframe
rdd = data.rdd
rdd

Spark is implemented in Scala (which runs on the Java Virtual Machine) and thus much of the backend of Spark is backed by Java functionality (hence `JavaToPython`).  We will now try a few functions on the `rdd`.  This RDD is composed of [`Row` objects](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.Row) that behave like Python dictionaries and even have a convenient `asDict()` function that returns a native Python dictionary.  The keys in the row correspond to the columns in the dataframe.

In [15]:
rdd.count()

The following code snippet will take an RDD and transform each row into a tuple consisting of the string corresponding to the color of the diamond in that row and the integer 1.  The [`reduceByKey`](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD.reduceByKey) function assumes that the input RDD consists of `(key, value)` tuples. It takes a function that takes two values (or intermediate values) and combines them to produce a new value.  Notice that nothing happens when we run this cell, because Spark evaluates transformations lazily:

In [17]:
counts_rdd = rdd.map(lambda row: (row['color'], 1)).reduceByKey(lambda a, b: a + b)
counts_rdd

To compute an actual value we instead must invoke an action like [`collect()`](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD.collect) or [`count()`](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD.count).

In [19]:
counts_rdd.collect()
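
To make the semantics of the `map` and `reduceByKey` steps concrete, here is a minimal local sketch in plain Python. It is not how Spark executes the job (Spark partitions the data and combines values on each worker before shuffling); `toy_rows` and `local_reduce_by_key` are hypothetical names introduced only for illustration.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical stand-in rows; the real RDD holds pyspark.sql.Row objects.
toy_rows = [
    {"color": "E", "price": 1200},
    {"color": "G", "price": 1500},
    {"color": "E", "price": 3100},
    {"color": "J", "price": 1050},
    {"color": "G", "price": 2700},
]

def local_reduce_by_key(pairs, combine):
    """Group (key, value) pairs by key, then fold each group's values with combine."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(combine, values) for key, values in groups.items()}

# map step: one (color, 1) pair per row
toy_color_pairs = [(row["color"], 1) for row in toy_rows]
# reduce step: add up the 1s that share a key
toy_counts = local_reduce_by_key(toy_color_pairs, lambda a, b: a + b)
print(toy_counts)  # {'E': 2, 'G': 2, 'J': 1}
```

Because Spark may combine values in any order and grouping, the function passed to `reduceByKey` should be associative and commutative, as addition is here.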

# Least Squares Regression in Map-Reduce

To implement least squares regression in map-reduce we recall that the optimal parameter value is given by the **normal equation**:

$$
\hat{\theta} = \left(X^TX\right)^{-1} X^T Y
$$

We can compute a single entry of the \\(X^T X\\) matrix as a sum over the \\(n\\) rows of \\(X\\):

$$
\left(X^T X\right)_{i,j} 
= \sum_{k=1}^n X_{k,i} X_{k,j}
$$
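
As a small sanity check on how the entries of \\(X^T X\\) are built from the rows of \\(X\\), the sketch below computes the matrix two ways on a made-up 3-by-2 example: entry by entry, and as a running sum of per-row outer products. The names `X_toy`, `p_toy`, and `xtx_entry` are hypothetical and exist only for this illustration.

```python
# Toy design matrix with 3 rows and 2 columns (values are made up).
X_toy = [
    [1.0, 2.0],
    [3.0, 4.0],
    [5.0, 6.0],
]
p_toy = len(X_toy[0])

def xtx_entry(X, i, j):
    """Entry (i, j) of X^T X: sum over rows k of X[k][i] * X[k][j]."""
    return sum(row[i] * row[j] for row in X)

# Build X^T X entry by entry.
xtx_by_entry = [[xtx_entry(X_toy, i, j) for j in range(p_toy)] for i in range(p_toy)]

# Build X^T X as a sum of per-row outer products x_k x_k^T.
xtx_by_rows = [[0.0] * p_toy for _ in range(p_toy)]
for row in X_toy:
    for i in range(p_toy):
        for j in range(p_toy):
            xtx_by_rows[i][j] += row[i] * row[j]

print(xtx_by_entry)  # [[35.0, 44.0], [44.0, 56.0]]
```

The second form is the one that suits map-reduce: each row contributes its own outer product independently, and the contributions are simply added together.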

In the following we specify the set of features and which features are categorical.

In [22]:
features = ['cut', 'color', 'carat', 'clarity']
categorical = set(['cut', 'color', 'clarity'])
label = 'price'

We then define two map functions:
1. `xtx_map`: computes the entries of the matrix \\(X^T X = \sum_{i=1}^n x_i x_i^T\\)
1. `xty_map`: computes the entries of the vector \\(X^T Y = \sum_{i=1}^n x_i y_i\\)

These functions are slightly more complex because they implement one hot encoding of the categorical features.  Try to figure out how they work. A few observations:
1. They each return multiple values using the Python `yield` syntax
1. The values correspond to entries in the matrix \\(x x^T\\) and \\(x y\\) respectively.
1. Categorical features (e.g., `cut="Ideal"`) are mapped to new features (e.g., `cut_Ideal=1.0`) with value 1.0
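
The one-hot mapping described in the list above can be tried on a plain dictionary before looking at the map functions. This is a minimal sketch, assuming a row is a dict of column name to value; `encode`, `toy_features`, `toy_categorical`, and `toy_row` are hypothetical names, and the row values are made up.

```python
# Hypothetical stand-ins that mirror the notebook's feature settings.
toy_features = ["cut", "color", "carat", "clarity"]
toy_categorical = {"cut", "color", "clarity"}

def encode(row, features, categorical):
    """Return {feature_name: value} with categorical features one-hot encoded."""
    encoded = {}
    for name in features:
        if name in categorical:
            # e.g. cut="Ideal" becomes the indicator feature cut_Ideal = 1.0
            encoded[name + "_" + row[name]] = 1.0
        else:
            # numeric features keep their name and value
            encoded[name] = float(row[name])
    return encoded

toy_row = {"cut": "Ideal", "color": "E", "carat": 0.7, "clarity": "VS2", "price": 2757}
toy_encoded = encode(toy_row, toy_features, toy_categorical)
print(toy_encoded)
# {'cut_Ideal': 1.0, 'color_E': 1.0, 'carat': 0.7, 'clarity_VS2': 1.0}
```

Only the indicator for the level present in the row is produced; all other levels are implicitly zero and never need to be emitted.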

In [24]:
def xty_map(row):
  row = row.asDict()
  for i in features:
    # If the feature is not categorical (e.g., carat) then:
    #    the feature name is just the name of the feature and 
    #    the value is just the value
    # otherwise the feature is categorical and we implement a one-hot-encoding
    #    the feature name is the name + "_" + value (e.g., cut_Ideal)
    #    the feature value is 1.0
    (ki, vi) = (i, row[i]) if i not in categorical else (i+"_"+row[i], 1.0)
    yield (ki, vi * row[label])

In [25]:
def xtx_map(row):
  row = row.asDict()
  # Iterating over features = ['cut', 'carat', ...]
  # we are computing the outer product over each feature value:
  for i in features:
    (ki, vi) = (i, row[i]) if i not in categorical else (i+"_"+row[i], 1.0)
    for j in features:
      (kj, vj) = (j, row[j]) if j not in categorical else (j+"_"+row[j], 1.0)
      yield ((ki,kj), vi * vj)

## Debugging

Because debugging in Spark can be difficult we will want to test these functions outside of Spark first:

In [27]:
row = data.take(1)[0]
row

In [28]:
[a for a in xtx_map(row)]

In [29]:
[a for a in xty_map(row)]

## FlatMap and Reduce

Here we use the [`flatMap`](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD.flatMap) function which is nearly identical to the [`map`](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD.map) function except that it may output multiple values which are all concatenated to the output.
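
The difference between `map` and `flatMap` can be mimicked locally with `itertools.chain`. This is only an analogy in plain Python, not Spark code; `pairs_with_squares` is a hypothetical generator that, like the map functions above, yields several outputs per input.

```python
from itertools import chain

def pairs_with_squares(x):
    """Yield two (key, value) outputs for a single input."""
    yield ("value", x)
    yield ("square", x * x)

toy_inputs = [1, 2, 3]

# map: one output element per input (here, each element is a whole list)
mapped = [list(pairs_with_squares(x)) for x in toy_inputs]
# flatMap: the per-input outputs are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(pairs_with_squares(x) for x in toy_inputs))

print(len(mapped), len(flat_mapped))  # 3 6
```

A flat sequence of `(key, value)` pairs is exactly the shape that `reduceByKey` expects, which is why the generators are applied with `flatMap` rather than `map`.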

The following lines will trigger computation across the data center by shipping our `xtx_map` and `xty_map` functions to remote workers where the data is stored.

In [31]:
xtx_data = ( data.rdd
  .flatMap(xtx_map)
  .reduceByKey(lambda a, b: a + b)
  .collect()
)

xty_data = ( data.rdd
  .flatMap(xty_map)
  .reduceByKey(lambda a, b: a + b)
  .collect()
)

In the following block of code we compute the index in the feature vector for each of the string features.

In [33]:
index = dict(zip([r[0] for r in xty_data], range(len(xty_data))))
p = len(index)
print("Dimensions", p)
index

Using the index which translates each feature name to a particular location in an `x` vector we then fill out the entries of the local `XTY` and `XTX` matrices.  Notice that this is done entirely in this python notebook and not by calling Spark.  For very high dimensional problems this could be an issue.

In [35]:
import numpy as np
XTY = np.zeros((p,1))
for (k,v) in xty_data:
  XTY[index[k]] = v

XTX = np.zeros((p,p))
for ((k1,k2),v) in xtx_data:
  XTX[index[k1], index[k2]] = v
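
The same fill pattern can be checked on a tiny hand-written example. The sketch below assumes the reduced output has the shapes used above, a list of `(name, value)` pairs and a list of `((name_i, name_j), value)` pairs; all names prefixed with `toy_` and all numbers are made up for illustration.

```python
import numpy as np

# Hypothetical reduced output for two features (values are made up).
toy_xty = [("carat", 3.0), ("cut_Ideal", 2.0)]
toy_xtx = [
    (("carat", "carat"), 1.25),
    (("carat", "cut_Ideal"), 0.5),
    (("cut_Ideal", "carat"), 0.5),
    (("cut_Ideal", "cut_Ideal"), 1.0),
]

# Assign each feature name a position in the vector.
toy_index = {name: pos for pos, (name, _) in enumerate(toy_xty)}
p_toy = len(toy_index)

toy_XTY = np.zeros((p_toy, 1))
for name, value in toy_xty:
    toy_XTY[toy_index[name], 0] = value

toy_XTX = np.zeros((p_toy, p_toy))
for (name_i, name_j), value in toy_xtx:
    toy_XTX[toy_index[name_i], toy_index[name_j]] = value

# X^T X is symmetric by construction, which makes a cheap sanity check.
print(np.allclose(toy_XTX, toy_XTX.T))  # True
```

Pairs of features that never occur in the same row, such as two different levels of the same categorical feature, are never emitted by the map step, so their entries keep the initial value of zero.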

To compute the optimal \\(\hat{\theta}\\) value we need to solve the \\(p \times p\\) system of linear equations.  This is done by calling the linear algebra library in numpy.  This computation is running locally inside of this notebook.

In [37]:
theta = np.linalg.solve(XTX, XTY)
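
One caveat is worth checking. When every level of several categorical features gets its own indicator column, the indicator columns of each feature sum to the all-ones vector, so the columns of \\(X\\) are linearly dependent and \\(X^T X\\) is singular. In that case `np.linalg.solve` may raise a `LinAlgError` or return numerically unstable values. The sketch below, on a made-up design matrix with hypothetical names, shows one possible fallback: `np.linalg.lstsq`, which returns a minimum-norm solution of the same normal equations. It is an illustration under these assumptions, not a change to the notebook's method.

```python
import numpy as np

# Toy design matrix: two one-hot groups (2 levels each) plus one numeric column.
# Columns: a_0, a_1, b_0, b_1, x  (values are made up)
X_demo = np.array([
    [1, 0, 1, 0, 0.5],
    [1, 0, 0, 1, 1.0],
    [0, 1, 1, 0, 1.5],
    [0, 1, 0, 1, 2.0],
    [1, 0, 1, 0, 2.5],
], dtype=float)
y_demo = np.array([[1.0], [2.0], [3.0], [5.0], [4.0]])

XTX_demo = X_demo.T @ X_demo
XTY_demo = X_demo.T @ y_demo

# a_0 + a_1 equals b_0 + b_1 (both are all ones), so the rank is below 5.
rank_demo = np.linalg.matrix_rank(XTX_demo)

# lstsq copes with the rank deficiency and returns a minimum-norm solution.
theta_demo, _, _, _ = np.linalg.lstsq(XTX_demo, XTY_demo, rcond=None)

# The solution still satisfies the normal equations X^T X theta = X^T Y.
normal_eq_ok = np.allclose(XTX_demo @ theta_demo, XTY_demo)
print(rank_demo, normal_eq_ok)
```

Other common remedies are dropping one level per categorical feature or adding a small ridge penalty to the diagonal; which one fits depends on how the coefficients will be interpreted.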

# Rendering Predictions

In the following block of code we define a new **user defined function** to run on the PySpark dataframe and render predictions.  We first demonstrate how this function can be applied to the raw RDD:

In [39]:
def predict(row):
  row = row.asDict()
  pred = 0.0
  for i in features:
    (ki, vi) = (i, row[i]) if i not in categorical else (i+"_"+row[i], 1.0)
    pred += vi * theta[index[ki],0]
  return float(pred)

The following code computes the root mean squared error in the predicted price by running a sequence of map functions across the datacenter.

In [41]:
rmse = np.sqrt(data.rdd
   .map(lambda row: (row[label], predict(row)))
   .map(lambda pair: (pair[1] - pair[0]) ** 2)  # pair is (y, yhat); tuple-unpacking lambdas are invalid in Python 3
   .mean()
 )
print("RMSE:", rmse)
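
The same quantity can be computed locally on a handful of values to confirm what the pipeline measures: the square root of the mean squared difference between predicted and actual price. This is a minimal sketch in plain Python; `rmse_local` and `price_pairs` are hypothetical names and the prices are made up.

```python
import math

def rmse_local(pairs):
    """Root mean squared error over a list of (y, yhat) pairs."""
    squared_errors = [(yhat - y) ** 2 for y, yhat in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up (actual, predicted) prices, each off by exactly 100.
price_pairs = [(1000.0, 1100.0), (2000.0, 1900.0), (3000.0, 3100.0)]
print(rmse_local(price_pairs))  # 100.0
```

Since RMSE is in the same units as the label, the value reads directly as a typical prediction error in price.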

Ideally we would like to add the predictions directly into our dataframe.  The following block of code creates a **user defined function** (UDF) and then uses the UDF to add an additional column to the Dataframe.

In [43]:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import FloatType

# A UDF must have a defined return type
predict_udf = udf(predict, FloatType())

# the Struct of features states which columns in the dataframe are given to the UDF
data = data.withColumn("pred", predict_udf(struct(features)))

In [44]:
from pyspark.sql.functions import mean
data = data.withColumn("resid", data['pred'] - data['price'])
np.sqrt(data.select(mean(data['resid'] * data['resid'])).collect()[0][0])

In [45]:
display(data.sample(False, 0.1).select('resid'))

In [46]:
display(data.sample(False, 0.01))

In [47]:
from pyspark.sql.functions import lit
avg_price = data.select(mean(data['price'])).collect()[0][0]
# Baseline for comparison: the residual of always predicting the average price
data = data.withColumn("mean_resid", data['price'] - lit(avg_price))
np.sqrt(data.select(mean(data['mean_resid'] * data['mean_resid'])).collect()[0][0])