In [2]:
from random import seed
from csv import reader
from random import randrange
from math import exp
# Module-level record of training loss: sgdCoeff rebinds this to a fresh list
# holding one summed squared error per epoch, and TestNotebook.test_err reads it.
errors=[]

In [13]:
def fileLoad(filename):
    """Read a semicolon-delimited text file into a list of rows.

    Each line is stripped of surrounding whitespace and split on ';'. Fields
    are returned as strings; numeric conversion is left to the caller.
    Lines that are empty after stripping are skipped, so a trailing blank
    line does not produce a bogus [''] row.

    Args:
        filename: path of the file to read.

    Returns:
        A list of rows, each row a list of string fields.
    """
    # The context manager guarantees the handle is closed even on error.
    with open(filename) as handle:
        return [line.strip().split(';') for line in handle if line.strip()]

In [4]:
def calcAccuracy(actual, predicted):
    """Return the percentage of positions where the prediction equals the label.

    Args:
        actual: sequence of expected values.
        predicted: sequence of predicted values, indexed in step with `actual`.

    Returns:
        A float percentage (0.0 to 100.0).
    """
    total = len(actual)
    hits = sum(1 for idx in range(total) if actual[idx] == predicted[idx])
    return hits / float(total) * 100.0

In [5]:
def normalizeData(data, minmax):
    """Rescale every column of `data` to the range [0, 1], in place.

    Args:
        data: list of rows (lists of numbers); modified in place.
        minmax: per-column [min, max] pairs, e.g. from dataset_minmax.

    Returns:
        None. Rows of `data` are overwritten with the scaled values.
    """
    for row in data:
        for i in range(len(row)):
            low = minmax[i][0]
            span = minmax[i][1] - low
            # A constant column has zero span; map it to 0.0 instead of
            # raising ZeroDivisionError.
            row[i] = (row[i] - low) / span if span != 0 else 0.0

In [6]:
def dataset_minmax(dataset):
    """Compute the [min, max] pair of each column.

    The number of columns is taken from the first row.

    Args:
        dataset: non-empty list of rows.

    Returns:
        A list with one [min, max] list per column.
    """
    bounds = []
    for col in range(len(dataset[0])):
        column = [record[col] for record in dataset]
        bounds.append([min(column), max(column)])
    return bounds

In [17]:
def str_column_to_float(dataset, column):
    """Convert one column of string fields to floats, in place.

    Args:
        dataset: list of rows; modified in place.
        column: index of the column to convert.
    """
    for record in dataset:
        text = record[column]
        record[column] = float(text.strip())

In [7]:
def predict(row, coefficients):
    """Return the logistic (sigmoid) output for one row.

    Args:
        row: feature values followed by one trailing element (the label
            slot), which is ignored.
        coefficients: coefficients[0] is the intercept; coefficients[i + 1]
            is the weight applied to row[i].

    Returns:
        A float in [0.0, 1.0].
    """
    yhat = coefficients[0]
    for i in range(len(row) - 1):
        yhat += coefficients[i + 1] * row[i]
    try:
        return 1.0 / (1.0 + exp(-yhat))
    except OverflowError:
        # exp(-yhat) exceeds the float range for strongly negative yhat;
        # the sigmoid is 0 to within float precision there.
        return 0.0

In [35]:
def sgdCoeff(train, learnRate, epochRate):
    """Fit logistic-regression coefficients by stochastic gradient descent.

    Coefficients start at zero and are updated after every training row
    using error * yhat * (1 - yhat) scaled by the learning rate.

    Side effect: rebinds the module-level `errors` list to a new list holding
    the summed squared error of each epoch.

    Args:
        train: list of rows; the last element of each row is the target.
        learnRate: step size applied to each update.
        epochRate: number of passes over `train`.

    Returns:
        A list of coefficients: intercept first, then one weight per feature.
    """
    global errors
    errors = []
    weights = [0.0] * len(train[0])
    for _ in range(epochRate):
        epochLoss = 0
        for sample in train:
            output = predict(sample, weights)
            residual = sample[-1] - output
            epochLoss += residual**2
            # Factor shared by the intercept and every feature weight.
            step = learnRate * output * residual * (1-output)
            weights[0] = weights[0] + step
            for j in range(len(sample) - 1):
                weights[j + 1] = weights[j + 1] + step * sample[j]
        errors.append(epochLoss)
    return weights

In [9]:
def evalMethod(data, numFolds, *args):
    """Score the `lr` model with k-fold cross-validation.

    Rows are drawn at random (without replacement) into `numFolds` folds of
    int(len(data) / numFolds) rows each; leftover rows are unused. Each fold
    in turn is held out, the model is trained on the remaining folds, and the
    accuracy on the held-out fold is recorded.

    Args:
        data: list of rows; the last element of each row is the label.
        numFolds: number of folds.
        *args: extra positional arguments forwarded to `lr`.

    Returns:
        A list with one accuracy percentage per fold.
    """
    remaining = list(data)
    foldSize = int(len(data) / numFolds)
    folds = []
    for _ in range(numFolds):
        chunk = []
        while len(chunk) < foldSize:
            chunk.append(remaining.pop(randrange(len(remaining))))
        folds.append(chunk)

    scores = []
    for heldOut in folds:
        others = list(folds)
        others.remove(heldOut)
        trainingSet = [row for part in others for row in part]
        # Test rows are copies with the label blanked out so the model
        # cannot see it.
        testingSet = []
        for row in heldOut:
            masked = list(row)
            masked[-1] = None
            testingSet.append(masked)
        predicted = lr(trainingSet, testingSet, *args)
        actual = [row[-1] for row in heldOut]
        scores.append(calcAccuracy(actual, predicted))
    return scores

In [10]:
def lr(trainData, testData, learnRate, epochRate):
    """Train on `trainData` and return rounded predictions for `testData`.

    Args:
        trainData: rows used to fit the coefficients via sgdCoeff.
        testData: rows to predict; the last element of each row is ignored.
        learnRate: learning rate passed to sgdCoeff.
        epochRate: number of epochs passed to sgdCoeff.

    Returns:
        A list with one rounded sigmoid output per test row.
    """
    weights = sgdCoeff(trainData, learnRate, epochRate)
    return [round(predict(sample, weights)) for sample in testData]

In [56]:
def driver(epochRate):
    """Run the full pipeline on 'winequality-red.csv' and print the result.

    Seeds the random generator, loads the file, converts every column to
    float, min-max normalizes all columns, then runs 5-fold cross-validation
    with a learning rate of 0.01 and prints the mean fold accuracy.

    Args:
        epochRate: number of training epochs; coerced with int().
    """
    seed(100)
    dataset = fileLoad('winequality-red.csv')
    for i in range(len(dataset[0])):
        str_column_to_float(dataset, i)
    # NOTE(review): this also rescales the last (label) column, while lr
    # emits round(sigmoid); presumably only labels at the column min or max
    # can ever match a prediction -- confirm this is intended.
    normalizeData(dataset, dataset_minmax(dataset))

    epochRate = int(epochRate)
    numFolds = 5
    learnRate = 0.01

    scores = evalMethod(dataset, numFolds, learnRate, epochRate)
    # The scores are accuracy percentages from calcAccuracy, not an RMSE,
    # so label the output accordingly.
    print('Mean accuracy: %.3f%%' % (sum(scores) / float(len(scores))))

In [58]:
# Run the full pipeline with 100 training epochs.
driver(100)

RMSE: 1.693%


In [59]:
import unittest
class TestNotebook(unittest.TestCase):
    """Sanity checks for the cross-validated logistic regression above."""

    def test_acc(self):
          """Mean accuracy should not decrease as the epoch count grows.

          Runs 10-fold evaluation (learning rate 0.1) on a 10-row toy dataset
          with 10, 50 and 100 epochs. Each evalMethod call draws its own
          random fold assignment from the generator seeded once below.
          """
          seed(123)
          dataset = [[2.7810836,2.550537003,0],
                    [1.38807019,1.850220317,0],
                    [1.465489372,2.362125076,0],
                    [3.396561688,4.400293529,0],
                    [7.627531214,2.759262235,1],        
                    [3.06407232,3.005305973,0],
                    [5.332441248,2.088626775,1],
                    [6.922596716,1.77106367,1],
                    [8.675418651,-0.242068655,1],
                    [7.673756466,3.508563011,1]];
          # Normalizes every column in place, including the 0/1 label column.
          normalizeData(dataset, dataset_minmax(dataset))
          scores = evalMethod(dataset,10,0.1,10)
          acc10 = (sum(scores) / float(len(scores)))
          scores = evalMethod(dataset,10,0.1,50)
          acc50 = (sum(scores) / float(len(scores)))
          scores = evalMethod(dataset,10,0.1,100)
          acc100 = (sum(scores) / float(len(scores)))
          self.assertGreaterEqual(acc50, acc10)
          self.assertGreaterEqual(acc100, acc50)
          self.assertGreaterEqual(acc100, acc10)

    def test_err(self):
        """The per-epoch summed squared errors should be non-increasing.

        NOTE(review): this reads the module-level `errors` list as left by
        the most recent sgdCoeff call (an earlier test or notebook cell); it
        trains nothing itself and passes vacuously when the list has fewer
        than two entries.
        """
        global errors
        for i in range(0,len(errors)-1):
            self.assertGreaterEqual(errors[i],errors[i+1])

In [60]:
# Run the tests inside the notebook kernel. argv=[''] stops unittest from
# parsing the kernel's own command-line arguments, and exit=False stops it
# from calling sys.exit() when the run finishes.
if __name__ == '__main__':
    unittest.main(argv=[''],verbosity=2, exit=False)

test_acc (__main__.TestNotebook) ... ok
test_err (__main__.TestNotebook) ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.037s

OK
