# "Mini-batch Gradient Descent with PySpark"


In [1]:
import numpy as np
import matplotlib
#matplotlib.rcParams['text.usetex'] = True
import matplotlib.pyplot as plt
from sklearn.datasets import make_regression
from sklearn.metrics import r2_score
import time
import random

random.seed(42)
np.random.seed(24)

In [2]:
# Setting up PySpark
!pip install pyspark
import pyspark
sc = pyspark.SparkContext().getOrCreate()

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=94ca74521d2d255f463205176699a9e2f040e4760c67d624b071787a8bb06d62
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


Calculate the gradient of MSE loss for every example below.

In [3]:
# define the gradient of MSE for every example
def per_example_mse_gradient(example, w):
    x = example[:-1]
    y = example[-1]
    # ... calculate the gradient here
    weights_gradient = 2 * (np.dot(w, x) - y) * np.array(x)
    return weights_gradient

In [4]:
# calculate cumulative sum of gradients
def cum_sum_gradients(row, next_row):
    return [gradient+next_gradient for gradient, next_gradient in zip(row, next_row)]

Write the map and reduce function below, using the per_example_mse_gradient and cum_sum_gradients functions

In [5]:
def distributed_minibatch_gradient_descent(rdd, learning_rate=0.1, n_iters=100, mini_frac=0.1):
    w = np.zeros(len(rdd.first()) - 1).tolist()  # -1 because the last value is y
    print(rdd.count())
    for i in range(n_iters):
        mini_batch = rdd.sample(False,mini_frac)
        m = mini_batch.count()

        # fill out the map and reduce functions using the per_example and cum_sum functions defined above
        rdd_gradient = mini_batch.map(lambda example: per_example_mse_gradient(example, w))\
                                 .reduce(cum_sum_gradients)

        # scaling with m and learning rate
        w_gradient = [learning_rate * (w / m) for w in rdd_gradient]

        # updating weights
        w = [w_j - w_grad_j for w_j, w_grad_j in zip(w, w_gradient)]

        #print([i, m])

    return w

Write a distributed function for gradient descent (without mini-batching) below

In [6]:
# write a distributed function for gradient descent (without mini-batching)
def distributed_gradient_descent(rdd, learning_rate=0.1, epoch=10):
    w = np.zeros(len(rdd.first())-1).tolist() # -1 because the last value is y
    m = rdd.count()
    for i in range(epoch):
        rdd_gradient = rdd.map(lambda example: per_example_mse_gradient(example, w))\
                          .reduce(cum_sum_gradients)

        # scaling with m and learning rate
        w_gradient = [learning_rate*(w/m) for w in rdd_gradient]

        # updating weights
        w = [w_j - w_grad_j for w_j, w_grad_j in zip(w, w_gradient)]

    return w

In [7]:
def comparison_with_coef(w, coef):
  print(f"""Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> {w}
    Scikit-Learn's coefficients  -> {coef}""")

In [8]:
data_kwargs = {"n_samples":1_000_00, "n_features":12, "noise":3, "coef":True, "random_state":101}

In [9]:
def test_mb_gd(n_iters=100, mini_frac=0.1, learning_rate=0.1):
  # Let's create the data
  X, y, coef = make_regression(**data_kwargs)
  coef = [round(c, 2) for c in coef]

  # Parallelize the data to run distributed
  rdd = sc.parallelize(np.hstack([X, y.reshape(-1,1)]).tolist()).cache()
  print(f"Our RDD has {rdd.getNumPartitions()} partitions.")

  w = distributed_minibatch_gradient_descent(rdd, n_iters=n_iters, mini_frac=mini_frac, learning_rate=learning_rate)

  w = [round(w_j, 2) for w_j in w]
  comparison_with_coef(w, coef)

In [10]:
def test_gd(epoch = 10):
  # Let's create the data
  X, y, coef = make_regression(**data_kwargs)
  coef = [round(c, 2) for c in coef]

  # Parallelize the data to run distributed
  rdd = sc.parallelize(np.hstack([X, y.reshape(-1,1)]).tolist()).cache()
  print(f"Our RDD has {rdd.getNumPartitions()} partitions.")

  w = distributed_gradient_descent(rdd, epoch=epoch)

  w = [round(w_j, 2) for w_j in w]
  comparison_with_coef(w, coef)

In [11]:
epoch=10
m_frac=0.4
n_iter = int(1.0/m_frac*epoch)
print(n_iter)

25


In [12]:
test_mb_gd()

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.34, 9.13, 63.71, 64.1, 74.71, 85.57, 55.23, -0.01, 54.22, 19.58, 62.32, -0.02]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [13]:
test_gd()

Our RDD has 2 partitions.
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [8.16, 8.19, 57.14, 57.26, 66.6, 76.73, 49.38, -0.08, 48.35, 17.45, 55.63, 0.26]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


2. Compare your solution for distributed GD and distributed mini-batch DG with
that of Sklearn. What do you observe and why?
(Select all that are correct):
A. Distributed mini-batch GD is converging faster (less time)  
B. Distributed GD is converging faster  (less time)  
C. Distributed mini-batch GD has a closer solution to that of sklearn  
D. Distributed GD has a closer solution to that of sklearn  
E. Distributed GD and distributed mini-batch GD behave the same.  

In [14]:
test_gd(20)

Our RDD has 2 partitions.
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.19, 9.03, 63.05, 63.38, 73.83, 84.65, 54.61, -0.02, 53.6, 19.35, 61.59, 0.04]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [15]:
test_gd(50)

Our RDD has 2 partitions.
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.34, 9.12, 63.73, 64.11, 74.72, 85.56, 55.23, -0.0, 54.24, 19.59, 62.31, -0.02]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [16]:
test_gd(80)

Our RDD has 2 partitions.
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.34, 9.12, 63.73, 64.11, 74.72, 85.56, 55.23, -0.0, 54.24, 19.59, 62.31, -0.02]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [17]:
test_gd(100)

Our RDD has 2 partitions.
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.34, 9.12, 63.73, 64.11, 74.72, 85.56, 55.23, -0.0, 54.24, 19.59, 62.31, -0.02]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


3. Increase the number of iterations for distributed GD and compare the solutions. What do you observe?  
A. Distributed GD’s solution gets farther away from the sklearn solution  
B. Distributed GD’s solution gets closer to the sklearn solution  
C. More iterations do not change the output of Distributed GD.  


In [18]:
test_mb_gd(mini_frac=0.5)

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.34, 9.12, 63.74, 64.1, 74.71, 85.57, 55.22, -0.0, 54.23, 19.59, 62.31, -0.01]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [19]:
test_mb_gd(mini_frac=0.6)

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.34, 9.12, 63.73, 64.11, 74.72, 85.56, 55.23, -0.0, 54.24, 19.59, 62.31, -0.02]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [20]:
test_mb_gd(mini_frac=0.7)

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.34, 9.12, 63.73, 64.11, 74.72, 85.56, 55.23, -0.0, 54.23, 19.59, 62.31, -0.02]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [21]:
test_mb_gd(mini_frac=0.8)

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.34, 9.12, 63.73, 64.11, 74.72, 85.56, 55.23, -0.0, 54.23, 19.59, 62.31, -0.01]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


4. Change the mini-batch size to a larger batch size for distributed mini-batch GD and compare the solutions again. What do you observe?  
A. Distributed mini-batch GD now converges faster (less time)  
B. Distributed mini-batch GD now converges slower (less time)  
C. Change of batch size does not influence the behavior of Distributed mini-batch GD.    

In [22]:
test_mb_gd(learning_rate=0.1)

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.34, 9.11, 63.74, 64.1, 74.72, 85.57, 55.21, -0.01, 54.25, 19.57, 62.32, -0.01]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [23]:
test_mb_gd(learning_rate=0.01)

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [7.97, 7.98, 55.5, 55.64, 64.66, 74.57, 47.89, -0.04, 47.01, 16.96, 53.94, 0.29]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [24]:
test_mb_gd(learning_rate=0.001)

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [1.54, 1.74, 11.69, 11.66, 13.5, 15.72, 10.04, -0.08, 9.84, 3.52, 11.3, 0.19]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [25]:
test_mb_gd(learning_rate=0.5)

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [9.32, 9.14, 63.68, 64.09, 74.74, 85.61, 55.22, 0.02, 54.23, 19.6, 62.3, 0.02]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


In [26]:
test_mb_gd(learning_rate=1.0)

Our RDD has 2 partitions.
100000
Here is a side-by-side comparison of our coefficients with Scikit-Learn's:
    distributed gradient descent -> [150.47, -28.52, -371.69, -611.63, -10.65, -1044.47, -393.71, 403.81, -15.58, 8.11, 102.37, -626.36]
    Scikit-Learn's coefficients  -> [9.34, 9.12, 63.74, 64.1, 74.72, 85.55, 55.22, 0.0, 54.25, 19.59, 62.32, 0.0]


5. Select the learning rate for distributed mini-batch GD with a batch size of 40\% (on the master node), that converges closest to the sklearn solution in 10 iterations.  
A. 0.1  
B. 0.01  
C. 0.001   
D. 0.5  
E. 1.0

In [27]:
# shutting down spark context
sc.stop()