In [None]:
import numpy as np
import math , sys
import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.mllib.regression import LabeledPoint

# hyper-parameters and configuration options
# number of epochs executed by the training loop
NUM_ITERATIONS = 100
# step size to use when updating the weights (see the exercise instructions)
LEARNING_RATE = 0.3
# when True, every epoch prints the current line equation and plots its boundary
SHOW_PROGRESS = True
# matplotlib format string for the intermediate decision boundaries (':' = dotted line)
DECISION_BOUNDARY_STYLE = ':'


def sigmoid(gamma):
  """Return the logistic function 1 / (1 + exp(-gamma)) of a scalar gamma.

  Both branches only call math.exp on a non-positive argument, so the
  exponential may underflow to 0.0 but never raises OverflowError.

  For gamma < 0 the value is computed as exp(gamma) / (1 + exp(gamma))
  instead of 1 - 1 / (1 + exp(gamma)): the subtraction form cancels
  catastrophically and rounds to exactly 0.0 once exp(gamma) falls below
  floating-point resolution (roughly gamma < -37), whereas the quotient
  keeps full relative precision.
  """
  if gamma < 0:
    exp_gamma = math.exp(gamma)
    return exp_gamma / (1 + exp_gamma)
  return 1 / (1 + math.exp(-gamma))

def plot_decision_boundary(w):
  """Plot the line w[0]*x + w[1]*y + w[2] = 0 with matplotlib.

  The line is evaluated at 50 evenly spaced x positions between 1 and 10
  and drawn using the DECISION_BOUNDARY_STYLE format string.
  """
  x_coef, y_coef, bias = w[0], w[1], w[2]
  xs = np.linspace(1, 10, num=50)
  # solve x_coef*x + y_coef*y + bias = 0 for y
  ys = -(x_coef * xs + bias) / y_coef
  plt.plot(xs, ys, DECISION_BOUNDARY_STYLE)


# generate artificial datasets for binary classification
# Seed NumPy's global generator first so that the same points (and any later
# np.random draws in the notebook) are reproduced on every Restart & Run All.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# 100 points per class, drawn from identity-covariance Gaussians
# centred at (3, 3) and (8, 8) respectively
class_a = np.random.multivariate_normal([3, 3], [[1, 0], [0, 1]], 100)
class_b = np.random.multivariate_normal([8, 8], [[1, 0], [0, 1]], 100)

In [None]:
# prepare the dataset for logistic regression and distribute it
def _to_labeled_rdd(samples, label):
  """Distribute `samples` and wrap every row in a LabeledPoint carrying `label`.

  A constant 1 is appended to each feature vector so that the third weight
  can serve as the bias term.
  """
  return sc.parallelize(samples).map(
      lambda features: LabeledPoint(label, np.append(features, [1])))

part_a = _to_labeled_rdd(class_a, 1)
part_b = _to_labeled_rdd(class_b, -1)
training_data = part_a.union(part_b).cache()


# initialize the parameters: three standard-normal draws
# (one weight per feature plus the bias)
w = np.random.randn(3)

# visualize the dataset together with the initial decision boundary
for points, marker in ((class_a, 'bo'), (class_b, 'ro')):
  plt.plot(points[:, 0], points[:, 1], marker)
plot_decision_boundary(w)
plt.show()

The dataset you saw upon running the cells above has 2 classes, and you will implement Logistic Regression with stochastic gradient descent to perform classification. The steps are as follows:

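* The data has been put into an RDD `training_data` for you. Each element `x` of this RDD carries a label and a feature vector, which you can access as `x.label` and `x.features` inside the function you pass to the map phase. The feature vector already has a constant 1 appended to it, so `w[2]` acts as the bias term.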
* The weights are available as `w` and are initialized to the values corresponding to the line you see in the above plot.
* The _map_ phase is to compute the gradient using the usual logistic regression update. This is found as
$$ (\sigma (y \cdot w^{T}x) - 1)\cdot y \cdot x $$
  where $\sigma$ is the logistic function defined for you as `sigmoid`, $x$ is the feature vector and $y$ is the label which is given as +1 or -1.
* Next, draw a random minibatch with `gradient.sample(False, 0.05)`, where `gradient` is the RDD of per-example gradients produced by the map phase. This samples without replacement and returns an RDD holding the gradients of roughly 5 percent of the data.
* The _reduce_ phase is just to sum up these gradients as before.
* Once you have the minibatch gradient $g$, update the weights by stepping against it, $w \leftarrow w - \eta \, g$, where the learning rate $\eta$ is defined for you in the variable `LEARNING_RATE`.

In [None]:
# train logistic regression with distributed gradient descent
for i in range(1, NUM_ITERATIONS+1):
  # epoch header; deliberately written without a newline so that the line
  # equation printed below continues on the same output line
  sys.stdout.write( "EPOCH %2d :"% (i))

  # Your code to do the gradient update
  
  ###########

  if SHOW_PROGRESS:
    # print the current line equation and overlay its decision boundary
    print( " %7.2f x + %7.2f y + %7.2f " % (w[0], w[1], w[2]))
    plot_decision_boundary(w)
  else:
    # nothing else is printed for this epoch, so end the header line here;
    # otherwise all epoch headers run together on a single output line
    sys.stdout.write("\n")

# plot the final decision boundary as a thick green line over x in [1, 12]
x = np.linspace(1, 12, 20)
y = -(w[0]*x + w[2])/w[1]
plt.plot(x, y,'g', linewidth=2)

# plot the dataset
plt.plot(class_a[:,0], class_a[:,1], 'bo')
plt.plot(class_b[:,0], class_b[:,1], 'ro')
# clamp the y-range so steep intermediate boundaries do not stretch the figure
axes = plt.gca()
axes.set_ylim([-4, 12])

# show the evolution of decision boundary along with the dataset
plt.show()