# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your http://notebook.acuna.io workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ test for your answer. Think about cases where your code should run even if it passess all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to 
`Kernel`$\rightarrow$`Restart and Run All`). 
- Good luck!

In [2]:
# this code creates the spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
import numpy as np

# Part 1: Map-Reduce: Gradient descent

Throughout this assignment, you should use vanilla Python and not Numpy.

Some statistical models $f(x)$ are learned by optimizing a loss function $L(\Theta)$ that depends on a set of parameters $\Theta$. There are several ways of finding the optimal $\Theta$ for the loss function, one of which is to iteratively update following the gradient:

$$
\nabla L = 
\begin{pmatrix} 
    \frac{\partial L}{\partial \theta_0}\\ 
    \frac{\partial L}{\partial \theta_1} \\ 
    \vdots\\ 
    \frac{\partial L}{\partial \theta_p}
\end{pmatrix}
$$

To then, compute the update
$$\Theta^{t+1} = \Theta^t - \eta \nabla L$$

Because we assume independence between data points, the gradient becomes a summation:

$$\nabla L = \sum_{i=1}^{n} \nabla L_i$$
where $L_i$ is the loss function for the $i$-th data point.

Take as example, the statistical model $f(x) = b_0 + b_1 x$ and loss function $L(\Theta) = (f(x) - y)^2$. If we have a set of three datapoints $D=\{ (x=1,y=2), (x=-2, y=-1), (x=4, y = 3)\}$

Then the loss function for each of them is
$L_1 = \left(b_{0} + b_{1} - 2\right)^{2}$, 
$L_2 = \left(b_{0} - 2 b_{1} + 1\right)^{2}$, and
$L_3 = \left(b_{0} + 4 b_{1} - 3\right)^{2}$

with 
$$\nabla L_i = \left[\begin{matrix}2 b_{0} + 2 b_{1} x_i - 2 y_i\\2 x_i \left(b_{0} + b_{1} x_i - y_i\right)\end{matrix}\right]$$

if we start with a solution $b_0 = 0, b_1 = 1$, then the gradients are:

$$\nabla L_1 = \left[\begin{matrix}-2\\-2\end{matrix}\right]$$
$$\nabla L_2 = \left[\begin{matrix}-2\\4\end{matrix}\right]$$
$$\nabla L_3 = \left[\begin{matrix}2\\8\end{matrix}\right]$$

which after accumulation would yield
$$\nabla L = \left[\begin{matrix}-2\\10\end{matrix}\right]$$

## Question 1 (5 pts)

Create a function `f_linear(b, x)` that receives the parameters `b` and one data point `x` as lists and return the prediction for that data point. Assume that the first element of `b` is the intercept.

In [6]:
# define below the function `f_linear` which performs a linear prediction based on parameters as data point
def f_linear(b, x):
    z = b[0]
    for i in range(len(x)):
        z = z + b[i+1]*x[i]
    return(z)
    

In [7]:
# for the example above, if we assume b = [0, 1], and the first data point x = [1], y = 2
f_linear([0, 1], [1])

In [8]:
# test (5 pts)
assert f_linear([1, 1, 2, 3], [2, 1, 3]) == 14
assert f_linear([1], []) == 1
assert f_linear([0, 1, 0, 1, 0, 1], [0, 10, 10, 10 , 10]) == 20

## Question 2 (5 pts)
Define the function `L(y_pred, y)` that receives a prediction `y_pred` and the actual value `y` and returns the squared error between them.

In [10]:
def L(y_pred, y):
  z= (y - y_pred)**2
  return z


In [11]:
# there should be not error here
L(1, 1)

In [12]:
# 2^2 error
L(0, 2)

In [13]:
# (5 pts)
assert L(1, 1) == 0
assert L(0, 4) == 16

## Question 3 (10 pts)
Create a function `gf_linear(f, b, x, y)` which returns the gradient of the linear function `f` with parameter `b` with respect to the squared loss function, evaluated at `x` and the actual outcome `y`. This function should return a vector with each element $j$ corresponding to the gradient with respect $b_j$, with $j = \{0, 1, \dots, p\}$.

In [15]:
def gf_linear(f, b, x, y):
    f = f_linear(b,x)
    Data_point = []
    if len(x) == 0:
        a = 2*(f - y)
        Data_point.append(a)
     
    else:
      a = 2*(f-y)
      Data_point.append(a)
      Data_point2 =[]
      for i in range(len(x)):
        Data_point2 = 2*x[i]*(f-y)
        Data_point.append(Data_point2)
    return Data_point

In [16]:
# for the example above and first data point
x = [1]
y = 2
b = [0, 1]
gf_linear(f_linear, b, x, y)

In [17]:
# for the example above and second data point
x = [-2]
y = -1
b = [0, 1]
gf_linear(f_linear, b, x, y)

In [18]:
## (10 pts)
np.testing.assert_array_equal(gf_linear(f_linear, [0, 1], [1], 2), [-2, -2])
np.testing.assert_array_equal(gf_linear(f_linear, [0, 1], [-2], -1), [-2, 4])
np.testing.assert_array_equal(gf_linear(f_linear, [1], [], 0), [2])

## Question 4 (15 pts)

Develop a map-reduce job that produces a value so that the first element of the value is the mean loss function across all the data. You might use other pieces of information as part of the value to create your computation.

You will implement your map function as `map_mse(f, b, L, xy)` where `f` is the function `b` are the parameters of the function `L` is the loss function and `xy` is the data. Assume that the data will come as an RDD where each element is of the format:

`[x, y]` where `x` is a list and `y` is a scalar.

Since the key does not matter for this map reduce job, just put a constant of your choice.

In [20]:
# data
rdd_data = sc.parallelize([
    [[1, 2], 3],
    [[3, 1], 4],
    [[-1, 1.5], 0],
    [[-9, 3], 0]
])

In [21]:

# create function `map_mse` below
def map_mse(f, b, L, xy):
    f = f_linear(b,xy[0])
    L= L(f,xy[1])
    return [1,[L,1]]

You should apply the map function in the following way:

```python
# for an example set of `b = [0, 0, 0]`
rdd_data.map(lambda x: map_generator(f_linear, [0, 0, 0], L, x))
```

In [23]:
# try it here
rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).collect()

In [24]:
# (10 pts)
assert rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).count() == 4
assert rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).map(lambda x: len(x)).\
    distinct().\
    first() == 2

assert rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).count() == 4
# the first element should be a number
assert isinstance((rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).first()[1][0]), 
                  (int, float, complex))
# try with other initializations
assert isinstance((rdd_data.map(lambda x: map_mse(f_linear, [1, 2, 3], L, x)).first()[1][0]), 
                  (int, float, complex))

You will now create a reduce job that receives two values of a previous reduce (or map) and merge them appropriately. Remember that at the end of the reduce job, the first element of the value should be the mean squared error. Create the function `reduce_mse(v1, v2)` below.

In [26]:
# create function `reduce_mse` below
def reduce_mse(v1, v2):
    a1,n1 = v1
    a2,n2 = v2
    return[(a1*n1+a2*n2)/(n1+n2),n1+n2]
    raise NotImplementedError()

In [27]:
# the following function call should return the mean squared error
rdd_data.\
    map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).\
    reduceByKey(reduce_mse).first()[1][0]

In [28]:
# the following function call should return the mean squared error
rdd_data.\
    map(lambda x: map_mse(f_linear, [2, 2, 3], L, x)).\
    reduceByKey(reduce_mse).first()[1][0]

In [29]:
# (5 pts)
assert rdd_data.\
    map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).\
    reduceByKey(reduce_mse).first()[1][0] == 6.25

assert rdd_data.\
    map(lambda x: map_mse(f_linear, [2, 0, 0], L, x)).\
    reduceByKey(reduce_mse).first()[1][0] == 3.25

assert rdd_data.\
    map(lambda x: map_mse(f_linear, [2, 2, 3], L, x)).\
    reduceByKey(reduce_mse).first()[1][0] == 41.8125

## Question 5 (10 pts)

In this question, you will compute the cumulative gradient of a model on the data. You will define a map function `map_gradient(f, gf, b, xy)` that would receive a function `f`, its gradient `gf`, its parameters `b`, and a data point `xy = [x, y]`. Also you will define a function `reduce_gradient(v1, v2)` that combines the two values appropriately. In the map function, you probably do not need to keep extra values beyond the actual gradient.

In [31]:
# define the function `map_gradient` below
def map_gradient(f, gf, b, xy):
    gf = gf_linear(f,b,xy[0],xy[1])
    print(gf)
    return [1,gf]
    raise NotImplementedError()

In [32]:
# (5 pts)
assert len(rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, [0, 0, 0], xy)[1]).first()) == 3

In [33]:
# define the function `reduce_gradient` below
def reduce_gradient(v1, v2):
    reduce_grad=[]
    for i in range(len(v1)):
        reduce_grad.append(v1[i]+v2[i])
    return(reduce_grad)
    raise NotImplementedError()

In [34]:
rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, [0, 0, 0], xy))


In [35]:
# (5 pts)
np.testing.assert_array_equal(
    rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, [0, 0, 0], xy)).\
    reduceByKey(reduce_gradient).first()[1],
    [-14.0, -30.0, -20.0])

np.testing.assert_array_equal(
    rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, [0, 0, 0], xy)).\
    reduceByKey(reduce_gradient).first()[1],
    [-14.0, -30.0, -20.0])

if all your answers are correct, then we can run an optimization below, and the MSE should decrease with each iteration

In [37]:
b = [0, 0, 0]
learning_rate = 0.07
print("Initial solution: \t", b)
for _ in range(10):
    print("New iteration")
    print("=============")
    gradient = rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, b, xy)).\
        reduceByKey(reduce_gradient).first()[1]
    b = [b0 - learning_rate*g0 for b0, g0 in zip(b, gradient)]
    print("Current solution: \t", b)
    mse = rdd_data.\
        map(lambda x: map_mse(f_linear, b, L, x)).\
        reduceByKey(reduce_mse).first()[1][0]
    print("Current MSE: \t\t", mse)
    
    

**(5 pts)** In the code, above, play with the value of `learning_rate` less than 1.0 until the optimizer diverges (the loss function goes down and then goes *up*). What is this learning rate?

In [39]:
#To reach the optimal solution we use a rate at which we adjust our optimal soltion during each process. 