<a href="https://colab.research.google.com/github/danny1461/CSCI-191T-Machine-Learning/blob/main/assignment_03_linear_logistic_regression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Linear & Logistic Regression with Gradient Descent

By Daniel Flynn

A simple description of machine learning would be the process of evaluating inputs in order to predict an output. By converting our inputs into numbers and assigning each one a weight (how much value we consider that particular input to have in our final decision), we arrive at a simple way of aggregating everything into our final *conclusion*.

A popular technique for determining these weights is Gradient Descent, where we iteratively refine the weights through small tweaks intended to reduce the error of our predictions. We measure that error with an error function and repeatedly step the weights against its gradient. Ideally the error function is convex, because then any local minimum we reach is also the global minimum.

![Gradient Descent](https://miro.medium.com/max/600/1*iNPHcCxIvcm7RwkRaMTx1g.jpeg)

*Source: [Saugat Bhattarai](https://saugatbhattarai.com.np/)*
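As a minimal sketch of the idea (not the implementation used later in this notebook), here is gradient descent on a one-dimensional convex function. The function name `gradient_descent_1d`, the step count, and the learning rate are all illustrative assumptions.

```python
def gradient_descent_1d(derivative, start, learning_rate=0.1, steps=100):
    """Repeatedly step against the derivative, starting from `start`.

    Hypothetical helper for illustration only.
    """
    w = start
    for _ in range(steps):
        # Move opposite to the slope: downhill on the error curve
        w = w - learning_rate * derivative(w)
    return w

# f(w) = (w - 3)**2 is convex, has derivative 2 * (w - 3), and its minimum is at w = 3
w_min = gradient_descent_1d(lambda w: 2 * (w - 3), start=0.0)
print(w_min)  # a value very close to 3
```

Each step shrinks the distance to the minimum by a constant factor here, which is why a fixed number of steps is enough for this toy function.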

---

## Getting Started

Here is a simple model representation. This is over-engineered for most of this assignment but will start to shine with the Iris dataset.

In [None]:
from random import uniform # needed for the random initial weights below

class Model:
  def __init__(self, dimensions, predictFn):
    self.dimensions = dimensions
    self.data = [uniform(-0.1, 0.1) for i in range(dimensions)] # initialize random weights close to zero
    self.predictFn = predictFn

  def __getitem__(self, arg):
    return self.data[arg]

  def __setitem__(self, arg, value):
    self.data[arg] = value

  def __str__(self):
    return str(self.data)

  def __iter__(self):
    # Delegate to the list iterator so repeated or nested iteration works
    return iter(self.data)

  def predict(self, input):
    return self.predictFn(self.data, input)

I'm going with the factory pattern because very little changes between linear and logistic regression, and this leads to the most re-usability.

The following is one way of implementing gradient descent:

In [None]:
%matplotlib inline
%load_ext google.colab.data_table

# All imports here to make it easier to run each problem piecemeal
import math
import matplotlib.pyplot as plt
from random import uniform
from google.colab import data_table

def calculateModelDeltasFactory(convexDerivativeFn):
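  """Build a function that computes one delta (update direction) per model weight.

  convexDerivativeFn is the derivative of the error function, with signature
  (r: float, p: float) -> float, where r is the real output and p is the prediction.
  Its sign tells us which way the weights need to move to get closer to the minimum.
  """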
  def result(outputs, predicted, inputs, dimensions):
    dataPoints = len(predicted)

    delta = []
    for i in range(dimensions):
      d = 1/dataPoints * sum([convexDerivativeFn(r, p) * x[i] for r, p, x in zip(outputs, predicted, inputs)])
      delta.append(d)
    
    return delta

  return result

def problemSolverFactory(
    predictFn, # receives the model and inputs
    convexFn, # Error function
    convexDerivativeFn, # derivative of error function
    learningRate = 0.01, # This value is multiplied by the weight deltas during refinement. Too big and the model doesn't converge, too small and it takes forever
    learningThreshhold = 0.001 # When the RATE of error loss becomes less than this value the model is returned
):
  calculateModelDeltas = calculateModelDeltasFactory(convexDerivativeFn)

  def result(
      inputs, # [ (x0, x1, ...), ... ]
      outputs # [ output0, output1, ... ]
  ):
    dimensions = len(inputs[0])
    model = Model(dimensions, predictFn)

    last_error = 0
    error = float('inf')
    iterations = 0

    while last_error - error > learningThreshhold or math.isinf(error):
      last_error = error

      predictions = [model.predict(i) for i in inputs] # one predicted value for each input of training data
      error = sum([ convexFn(r, p) for r, p in zip(outputs, predictions) ]) # ONE value encompassing the entirety of our model's error
      deltas = calculateModelDeltas(outputs, predictions, inputs, dimensions) # one derivative value per weight of our model
      for i in range(dimensions):
        model[i] = model[i] - learningRate * deltas[i]  # w0 = w0 - learningRate * Δw0, etc...

      iterations += 1

    if error > last_error: # Error was increasing
      raise Exception('Model not converging')

    return {
        'model': model,
        'iterations': iterations
    }

  return result
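Read as math, each pass of the loop above appears to apply the update below. The symbols are my own shorthand rather than names from the code: $\eta$ is `learningRate`, $N$ is the number of training rows, $r_n$ and $p_n$ are the real and predicted outputs for row $n$, $x_{n,i}$ is input $i$ of row $n$, and $g$ is `convexDerivativeFn`.

$$w_i \leftarrow w_i - \eta \cdot \frac{1}{N} \sum_{n=1}^{N} g(r_n, p_n) \, x_{n,i}$$

Note that the deltas are averaged over the rows while the reported error is a plain sum. The two differ only by the constant factor $1/N$, so the direction of each step is the same either way.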


We now have enough boilerplate to dive into each problem set.

---

## Linear Regression

The heart of this process is the linear sum:

$\sum_{i=0}^{d} w_{i} \cdot x_{i}$

In [None]:
def linearSum(model, inputs):
  """Accepts weights and inputs and multiplies each pair together and returns the sum

  The dimensions of model and inputs must be identical
  """
  return sum([w*x for w,x in zip(model, inputs)]) # w0*x0 + w1*x1 +...

linearSolver = problemSolverFactory(
    linearSum,
    lambda r, p: (r - p)**2, # squared error
    lambda r, p: -2 * (r - p) # derivative of squared error with respect to the prediction p
)

We also want to plot how well our model fits the data:

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

def plotLinear(inputs, outputs, model):
  x = [i[1] for i in inputs]
  predicted = [linearSum(model, i) for i in inputs]

  plt.plot(x, outputs, "g+")
  plt.plot(x, predicted, "b")
  plt.show()

#### Noiseless data

In [None]:
data = [((1, -2), 1), ((1, -1), 3), ((1, 0), 5), ((1, 1), 7), ((1, 2), 9), ((1, 3), 11), ((1, 4), 13), ((1, 5), 15), ((1, 6), 17), ((1, 7), 19), ((1, 8), 21), ((1, 9), 23), ((1, 10), 25)]
inputs, outputs = [list(i) for i in zip(*data)] # unzip data

solution = linearSolver(inputs, outputs)
print('Model = ', solution['model'])
print('Iterations = ', solution['iterations'])
plotLinear(inputs, outputs, solution['model'])
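As a sanity check on the gradient descent result, a single-feature least squares fit also has a closed-form solution. The sketch below is a separate technique from the solver above, and `least_squares_line` is a hypothetical helper name. It rebuilds the same noiseless points, which all lie on the line y = 2x + 5.

```python
def least_squares_line(xs, ys):
    """Closed-form intercept and slope for a single-feature linear fit."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # slope = covariance(x, y) / variance(x); the 1/n factors cancel out
    covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    variance = sum((x - mean_x) ** 2 for x in xs)
    slope = covariance / variance
    intercept = mean_y - slope * mean_x
    return intercept, slope

xs = list(range(-2, 11))      # same x values as the noiseless data: -2 .. 10
ys = [2 * x + 5 for x in xs]  # same outputs: 1, 3, 5, ..., 25
intercept, slope = least_squares_line(xs, ys)
print(intercept, slope)
```

The weights printed by `linearSolver` should approach these values (`w0` near the intercept, `w1` near the slope), though they will not match exactly because the solver stops once the error improvement drops below its threshold.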

#### Noisy data

In [None]:
data = [((1, -2), 6.39), ((1, -1), 16.51), ((1, 0), -3.11), ((1, 1), 10.79), ((1, 2), 11.62), ((1, 3), 23.24), ((1, 4), 18.27), ((1, 5), 27.58), ((1, 6), 22.21), ((1, 7), 5.12), ((1, 8), 8.86), ((1, 9), 10.69), ((1, 10), 14.82)]
inputs, outputs = [list(i) for i in zip(*data)] # unzip data

solution = linearSolver(inputs, outputs)
print('Model = ', solution['model'])
print('Iterations = ', solution['iterations'])
plotLinear(inputs, outputs, solution['model'])

---

## Logistic Regression

Logistic regression is similar to linear regression but is used primarily for classification. The linear sum is passed through a sigmoid function, which squeezes the output into the range from 0 to 1, non-inclusive. The closer the value is to 0 or 1, the more confident the model is in its prediction.

$sigmoid(x) = \frac{1}{1 + e^{-x}}$
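One practical caveat: evaluating this formula directly can overflow a Python float when $x$ is very negative, because $e^{-x}$ becomes enormous. The sketch below shows one common workaround that only ever exponentiates a non-positive number. The name `stable_sigmoid` is my own and is not used elsewhere in this notebook.

```python
import math

def stable_sigmoid(x):
    """Sigmoid that avoids overflow by never exponentiating a positive number."""
    if x >= 0:
        # exp(-x) is in (0, 1] here, so this cannot overflow
        return 1.0 / (1.0 + math.exp(-x))
    # For negative x, rewrite 1 / (1 + e^-x) as e^x / (1 + e^x)
    z = math.exp(x)
    return z / (1.0 + z)

print(stable_sigmoid(0))      # the midpoint of the curve
print(stable_sigmoid(-1000))  # no OverflowError for extreme inputs
```

Both branches compute the same function, so swapping this in should only matter for inputs with large magnitudes, such as the wider-ranging values in Training Data 3.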

In [None]:
def sigmoid(x):
  return 1 / (1 + math.e ** (-x))

logisticSolverSquaredError = problemSolverFactory(
    lambda model, inputs: sigmoid(linearSum(model, inputs)),
    lambda r, p: (r - p)**2, # squared error
    lambda r, p: -2 * (r - p)            *    p * (1 - p)
    #            ¯¯¯¯¯¯¯¯¯¯¯¯            *    ¯¯¯¯¯¯¯¯¯¯¯
    #    derivative of squared error     *    derivative of sigmoid
)

logisticSolverCrossEntropy = problemSolverFactory(
    lambda model, inputs: sigmoid(linearSum(model, inputs)),
    lambda r, p: -math.log(p) if r == 1 else -math.log(1 - p), # cross entropy
    lambda r, p: p - r, # derivative of cross entropy with the sigmoid derivative already folded in
    learningRate = 0.005
)
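The two derivative lambdas above can be checked with the chain rule. This is a sketch in my own shorthand: $z$ is the linear sum, $p = sigmoid(z)$ is the prediction, and $r$ is the real label (0 or 1).

$$\frac{dp}{dz} = p \, (1 - p)$$

For squared error, $E = (r - p)^2$:

$$\frac{dE}{dz} = \frac{dE}{dp} \cdot \frac{dp}{dz} = -2 (r - p) \cdot p \, (1 - p)$$

For cross entropy, $E = -r \ln p - (1 - r) \ln (1 - p)$:

$$\frac{dE}{dz} = \left( -\frac{r}{p} + \frac{1 - r}{1 - p} \right) \cdot p \, (1 - p) = -r (1 - p) + (1 - r) p = p - r$$

The sigmoid derivative cancels in the cross entropy case, which is why that lambda is so much simpler.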

#### Plot Helpers

Just like with linear regression, we would like to plot how our model is doing. Unlike the linear regression problems above, each input now has two features, so we plot the points on the (x1, x2) plane, colored by class, and draw the line where the model switches from one class to the other.

In [None]:
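def plotLogistic(
    inputs,
    outputs,
    model,
    positive = 1, # output value treated as the positive class (assumed default)
    showNeg = True, # also plot the negative examples (assumed default)
    show = True # call plt.show() once the plot is built (assumed default)
):
  """Hardcoded for a 3 weight model"""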
  x1Pos = []
  x2Pos = []
  x1Neg = []
  x2Neg = []
  x1Min = float('inf')
  x1Max = float('-inf')

  for i in range(len(outputs)):
    if outputs[i] == positive:
      x1Pos.append(inputs[i][1])
      x2Pos.append(inputs[i][2])
    else:
      x1Neg.append(inputs[i][1])
      x2Neg.append(inputs[i][2])

    x1Min = min(x1Min, inputs[i][1])
    x1Max = max(x1Max, inputs[i][1])

  x1Line = []
  i = x1Min
  while i < x1Max:
    x1Line.append(i)
    i += 1
  x1Line.append(i)

  plt.plot(x1Pos, x2Pos, 'go')
  if showNeg:
    plt.plot(x1Neg, x2Neg, 'ro')

  x2Line = []
  for i in x1Line:
    x2Line.append((-model[0] / model[2]) - (model[1] / model[2]) * i)

  plt.plot(x1Line, x2Line, 'b')
  if show:
    plt.show()
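The blue line drawn by `plotLogistic` is the decision boundary: the set of points where the linear sum is zero, so the sigmoid outputs exactly 0.5. Solving for $x_2$ gives the expression used to build `x2Line` (this assumes $w_2 \neq 0$, otherwise the division fails):

$$w_0 + w_1 x_1 + w_2 x_2 = 0 \quad \Rightarrow \quad x_2 = -\frac{w_0}{w_2} - \frac{w_1}{w_2} x_1$$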



#### Training Data 1

In [None]:
data  = [((1, 0, 0), 1), ((1, 1, 7), 0), ((1, -3, -2), 0), ((1, 8, 9), 1), ((1, 4, 3), 1), ((1, 5, -2), 1), ((1, 0, 0), 1), ((1, 6, 9), 1), ((1, 4, 2), 1), ((1, 1, -9), 1), ((1, -7, 7), 0), ((1, 0, -1), 1), ((1, 9, -4), 1), ((1, 1, 0), 1), ((1, -2, -5), 1), ((1, 2, 3), 1), ((1, -7, 2), 0), ((1, -3, 0), 0), ((1, 5, 0), 1), ((1, 0, -3), 1), ((1, -2, 3), 0), ((1, 9, 6), 1), ((1, 0, -8), 1), ((1, 0, 2), 0), ((1, -8, 6), 0), ((1, 1, 9), 0), ((1, 0, 5), 0), ((1, -4, 9), 0), ((1, 8, 2), 1), ((1, 2, 6), 0)]
inputs, outputs = [list(i) for i in zip(*data)] # unzip data

##### Squared Loss

In [None]:
solution = logisticSolverSquaredError(inputs, outputs)
print('Model = ', solution['model'])
print('Iterations = ', solution['iterations'])
plotLogistic(inputs, outputs, solution['model'])

##### Cross Entropy

In [None]:
solution = logisticSolverCrossEntropy(inputs, outputs)
print('Model = ', solution['model'])
print('Iterations = ', solution['iterations'])
plotLogistic(inputs, outputs, solution['model'])

#### Training Data 2

In [None]:
data = [((1, 0, 0), 0), ((1, 1, 7), 0), ((1, -3, -2), 0), ((1, 8, 9), 1), ((1, 4, 3), 1), ((1, 5, -2), 1), ((1, 0, 0), 0), ((1, 6, 9), 1), ((1, 4, 2), 1), ((1, 1, -9), 1), ((1, -7, 7), 0), ((1, 0, -1), 1), ((1, 9, -4), 1), ((1, 1, 0), 1), ((1, -2, -5), 1), ((1, 2, 3), 1), ((1, -7, 2), 0), ((1, -3, 0), 0), ((1, 5, 0), 1), ((1, 0, -3), 1), ((1, -2, 3), 0), ((1, 9, 6), 1), ((1, 0, -8), 1), ((1, 0, 2), 1), ((1, -8, 6), 0), ((1, 1, 9), 0), ((1, 0, 5), 0), ((1, -4, 9), 0), ((1, 8, 2), 1), ((1, 2, 6), 0)]
inputs, outputs = [list(i) for i in zip(*data)] # unzip data

##### Squared Loss

In [None]:
solution = logisticSolverSquaredError(inputs, outputs)
print('Model = ', solution['model'])
print('Iterations = ', solution['iterations'])
plotLogistic(inputs, outputs, solution['model'])

##### Cross Entropy

In [None]:
solution = logisticSolverCrossEntropy(inputs, outputs)
print('Model = ', solution['model'])
print('Iterations = ', solution['iterations'])
plotLogistic(inputs, outputs, solution['model'])

#### Training Data 3

In [None]:
data = [((1, 2, 7), 0), ((1, 19, 72), 0), ((1, -39, -20), 0), ((1, 86, 95), 1), ((1, 45, 37), 1), ((1, 52, -23), 1), ((1, -8, -8), 0), ((1, 67, 93), 1), ((1, 46, 21), 1), ((1, 18, -97), 1), ((1, -78, 70), 0), ((1, -9, -17), 1), ((1, 94, -40), 1), ((1, 11, -3), 1), ((1, -24, -59), 1), ((1, 25, 33), 1), ((1, -71, 23), 0), ((1, -34, -7), 0), ((1, 55, -3), 1), ((1, -5, -33), 1), ((1, -22, 38), 0), ((1, 94, 66), 1), ((1, 4, -89), 1), ((1, -9, 26), 0), ((1, -83, 61), 0), ((1, 19, 98), 0), ((1, -1, 55), 0), ((1, -43, 90), 0), ((1, 86, 27), 1), ((1, 24, 69), 0)]
inputs, outputs = [list(i) for i in zip(*data)] # unzip data

##### Squared Loss

In [None]:
solution = logisticSolverSquaredError(inputs, outputs)
print('Model = ', solution['model'])
print('Iterations = ', solution['iterations'])
plotLogistic(inputs, outputs, solution['model'])

##### Cross Entropy

In [None]:
solution = logisticSolverCrossEntropy(inputs, outputs)
print('Model = ', solution['model'])
print('Iterations = ', solution['iterations'])
plotLogistic(inputs, outputs, solution['model'])

#### Training Data 4 (Iris dataset)

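This dataset contains more than two classes, so a single binary classifier is not enough. Instead we train one model per class, each answering "is it this class or not?", and then pick the class whose model is the most confident.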
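To make the per-class setup concrete, here is a small sketch of how multi-class labels can be turned into one binary label list per class. The helper name `one_vs_rest_labels` and the toy labels are made up for illustration and are not the Iris data.

```python
def one_vs_rest_labels(labels, target):
    """Return 1 where the label equals `target` and 0 everywhere else."""
    return [1 if label == target else 0 for label in labels]

toy_labels = [0, 1, 2, 1, 0]  # made-up class ids, not real data
for target in sorted(set(toy_labels)):
    # One binary problem per class: "is it this class or not?"
    print(target, one_vs_rest_labels(toy_labels, target))
```

Each resulting list can be fed to a binary solver on its own, which gives one model per class.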

Importing data from GitHub:

In [None]:
import pandas as pd
from enum import Enum

class IrisEnum(Enum):
  setosa = 0
  versicolor = 1
  virginica = 2

iris_data = pd.read_csv(
    'https://raw.githubusercontent.com/danny1461/CSCI-191T-Machine-Learning/main/iris.data.csv',
    names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'classification']
)

inputs = iris_data.iloc[:, 0:4].values.tolist()
outputs = [IrisEnum[n].value for n in iris_data['classification']]

Some helpers to handle the multi-model results:

In [None]:
def solveIris(solverFn):
  models = []

  for i in IrisEnum:
    classOutputs = [(1 if o == i.value else 0) for o in outputs]
    solution = solverFn(inputs, classOutputs)
    models.append(solution['model'])
    print('{} Model = {}'.format(i.name, solution['model']))
    print('    Iterations = ', solution['iterations'])

  plt.show()
  return models

def classifyIris(input, models):
  predictions = [model.predict(input) for model in models]
  bestGuess = max(predictions)
  bestGuessClass = predictions.index(bestGuess)

  return IrisEnum(bestGuessClass)
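Once every row has a predicted class, one simple way to summarize the result is the fraction of rows classified correctly. The sketch below uses a hypothetical `accuracy` helper and made-up label lists. In this notebook the same comparison could be made between the `predicted` and `classification` columns.

```python
def accuracy(predicted, actual):
    """Fraction of positions where the predicted label equals the actual label."""
    if len(predicted) != len(actual):
        raise ValueError('predicted and actual must have the same length')
    matches = sum(1 for p, a in zip(predicted, actual) if p == a)
    return matches / len(actual)

# Made-up labels for illustration only: 3 of the 4 predictions are right
toy_predicted = ['setosa', 'versicolor', 'virginica', 'versicolor']
toy_actual = ['setosa', 'versicolor', 'virginica', 'virginica']
print(accuracy(toy_predicted, toy_actual))
```

Because the models here are scored on the same rows they were trained on, a number computed this way would describe the fit to the training data rather than performance on unseen flowers.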

##### Squared Loss

In [None]:
models = solveIris(logisticSolverSquaredError)
iris_data['predicted'] = [ classifyIris(input, models).name for input in inputs ]
data_table.DataTable(iris_data, num_rows_per_page=10)

##### Cross Entropy

In [None]:
models = solveIris(logisticSolverCrossEntropy)
iris_data['predicted'] = [ classifyIris(input, models).name for input in inputs ]
data_table.DataTable(iris_data, num_rows_per_page=10)