In [1]:
import numpy as np
import pandas as pd
from mlxtend.preprocessing import shuffle_arrays_unison
from sklearn import linear_model
import unittest

In [2]:
def MSE(y, y_hat):
    """Return the mean squared error between targets and predictions.

    Parameters
    ----------
    y : array-like
        Observed target values. Lists, tuples and NumPy arrays are accepted;
        the input is flattened to one dimension.
    y_hat : array-like
        Predicted values, with the same number of elements as ``y``.

    Returns
    -------
    numpy.float64
        ``sum((y - y_hat) ** 2) / n`` where ``n`` is the number of elements.

    Raises
    ------
    ValueError
        If the inputs are empty or do not contain the same number of elements.
    """
    y = np.asarray(y, dtype=float).ravel()
    y_hat = np.asarray(y_hat, dtype=float).ravel()
    if y.size == 0:
        raise ValueError("MSE is undefined for empty input")
    if y.size != y_hat.size:
        # Checked explicitly so that NumPy broadcasting can never silently
        # pair up values that do not belong together.
        raise ValueError(
            "y and y_hat must have the same number of elements "
            "(got %d and %d)" % (y.size, y_hat.size)
        )
    residuals = y - y_hat
    return np.dot(residuals, residuals) / y.size

In [3]:
def SGD(X, y, _lambda, eta, epochs):
    """Fit L2-regularised linear regression with stochastic gradient descent.

    For every sample ``(x_i, y_i)`` the weights are moved against the gradient
    of ``0.5 * (y_i - x_i . beta) ** 2 + 0.5 * _lambda * ||beta|| ** 2``, i.e.
    ``_lambda * beta - (y_i - x_i . beta) * x_i``.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Design matrix. Every column is regularised, including a column of
        ones if the caller adds one to model an intercept.
    y : array-like of shape (n_samples,)
        Target values.
    _lambda : float
        L2 regularisation strength.
    eta : float
        Constant learning rate.
    epochs : int
        Number of full passes over the data.

    Returns
    -------
    beta : numpy.ndarray of shape (n_features,)
        Fitted weights.
    mse : float
        Mean squared error of ``X . beta`` against ``y`` on the training data.

    Raises
    ------
    ValueError
        If ``X`` is not two-dimensional or ``y`` does not have one value per
        row of ``X``.
    """
    X = np.asarray(X, dtype=float)
    y = np.asarray(y, dtype=float).ravel()
    if X.ndim != 2:
        raise ValueError("X must be a 2-D array of shape (n_samples, n_features)")
    n, k = X.shape
    if y.size != n:
        raise ValueError("y must contain exactly one target per row of X")

    beta = np.zeros(k)
    for _ in range(epochs):
        # Reorder samples and targets together before each pass; the seed is
        # kept at the original fixed value.
        X, y = shuffle_arrays_unison(arrays=[X, y], random_seed=3)
        for i in range(n):
            x_i = X[i]
            # Compute the residual once, from the weights as they were before
            # this sample, so all coordinates take the same gradient step.
            residual = y[i] - np.dot(x_i, beta)
            beta -= eta * (_lambda * beta - residual * x_i)
    return beta, MSE(y, np.dot(X, beta))

In [4]:
class SGDTestCase(unittest.TestCase):
    """Checks the hand-written ``SGD`` against scikit-learn reference models."""

    def test_SGD(self):
        """Fit ``sample_data.csv`` and compare with sklearn estimators.

        The last CSV column is used as the target and all other columns as
        features. The fitted weights must give a training MSE of at most 0.1
        and lie within 0.1 of the coefficients and intercept found by
        SGDRegressor, LinearRegression and Ridge.
        """
        tolerance = 0.1

        data = np.genfromtxt('sample_data.csv', delimiter=",")
        # Prepend a column of ones so that beta[0] plays the role of the intercept.
        X = np.hstack((np.ones((len(data), 1)), data[:, :-1]))
        y = data[:, -1]
        _lambda = 0.001
        _eta = 0.01
        epochs = 10000

        beta, error = SGD(X, y, _lambda, _eta, epochs)
        intercept = beta[0]
        coefficients = beta[1:]

        # The fit itself must be accurate on the training data.
        self.assertLessEqual(error, 0.1)

        # Test the coefficients and intercept against those obtained by
        # fitting the data with SGDRegressor, Linear Regression and Ridge
        # Regression. The sklearn models fit their own intercept, so they are
        # given the features without the column of ones.
        features = X[:, 1:]
        references = {
            # random_state pins sklearn's shuffling so the test is repeatable.
            'SGDRegressor': linear_model.SGDRegressor(
                penalty='l2', alpha=_lambda, max_iter=epochs, shuffle=True,
                learning_rate='constant', eta0=_eta, random_state=3),
            'LinearRegression': linear_model.LinearRegression(),
            'Ridge': linear_model.Ridge(alpha=_lambda),
        }

        for name, model in references.items():
            model.fit(features, y)
            for i, expected in enumerate(coefficients):
                with self.subTest(model=name, coefficient=i):
                    self.assertAlmostEqual(
                        float(model.coef_[i]), expected, delta=tolerance)
            with self.subTest(model=name, coefficient='intercept'):
                # intercept_ may be a scalar or a one-element array depending
                # on the estimator; normalise it to a plain float.
                self.assertAlmostEqual(
                    float(np.ravel(model.intercept_)[0]), intercept,
                    delta=tolerance)
        
# Run the tests in-process. An explicit placeholder argv stops unittest from
# parsing the host process's own command-line arguments (the first entry is
# treated as the program name), and exit=False keeps unittest.main from
# calling sys.exit() once the run finishes.
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 26.494s

OK
