In [None]:
from typing import List
import numpy as np
import collections
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.cm as cm

class SuperClassifier:
  """Feed-forward multilayer perceptron for binary classification.

  The network topology is stored in ``self.layers`` (3 input neurons,
  5 hidden neurons, 1 output neuron). Every non-input layer uses a sigmoid
  activation and the network is trained with full-batch gradient descent on
  the (base-2) binary cross-entropy loss.

  Internally samples are stored column-wise: an activation matrix has shape
  (neurons_in_layer, n_samples).
  """

  # Predictions are clipped to [_EPSILON, 1 - _EPSILON] before taking a
  # logarithm or dividing, so a saturated sigmoid (exactly 0.0 or 1.0) cannot
  # produce inf/nan in the loss or its gradient.
  _EPSILON = 1e-12

  def __init__(self, learningRate: float, epoches: int):
    """Store the hyperparameters and create empty parameter containers.

    learningRate: step size used by updateWeights.
    epoches: number of full-batch gradient steps performed by trainIt.
    """
    self.learningRate = learningRate
    self.epoches = epoches
    self.layers = [3, 5, 1]  # neurons per layer: 3 inputs, 5 hidden, 1 output

    # Populated by trainIt: one weight matrix / bias column per non-input
    # layer, and one mean-loss value per epoch.
    self.weights = []
    self.biases = []
    self.loss = []

  # LOSS AND SIGMOID FUNCTIONS

  def ourLossFun(self, y_trueVals: np.ndarray, y_predVals: np.ndarray) -> np.ndarray:
    """Return the element-wise binary cross-entropy (log base 2).

    y_trueVals: target values (0 or 1).
    y_predVals: predicted probabilities, same shape as y_trueVals.
    """
    # TODO: Gabe put in another loss function also remove this comment
    clipped = np.clip(y_predVals, self._EPSILON, 1 - self._EPSILON)
    return (y_trueVals * -1) * np.log2(clipped) - (1 - y_trueVals) * np.log2(1 - clipped)

  def sigmoid(self, dotProd: np.ndarray) -> np.ndarray:
    """Return the element-wise sigmoid 1 / (1 + e^(-x)) of the pre-activations.

    dotProd: weights @ inputs + bias for one layer.
    """
    # np.exp computes e^x for each element x (e is Euler's number).
    return 1/(1+np.exp(-dotProd))

  # DERIVATIVES OF LOSS AND SIGMOID FUNCTIONS

  def deriveLossFun(self, y_trueVals: np.ndarray, y_predVals: np.ndarray) -> np.ndarray:
    """Return the derivative of ourLossFun with respect to the predictions.

    The 1/ln(2) factor comes from differentiating log2 instead of ln.
    """
    clipped = np.clip(y_predVals, self._EPSILON, 1 - self._EPSILON)
    return -(1/np.log(2)) * ( (y_trueVals/clipped) - ( (1-y_trueVals) / (1-clipped) ) )

  def deriveSigmoid(self, dotProd: np.ndarray) -> np.ndarray:
    """Return the derivative of the sigmoid evaluated at the pre-activations."""
    activation = self.sigmoid(dotProd)  # evaluate once, reuse for s * (1 - s)
    return activation * (1 - activation)

  # FORWARD PASS, BACKPROPAGATION AND WEIGHT UPDATING

  def forwardPass(self, X: np.ndarray):
    """Propagate the inputs through every layer of the network.

    X: input features with shape (n_features, n_samples).

    Returns a tuple (activations, derivatives). Both are lists with one entry
    per layer, input layer included: activations[0] is a copy of X and
    activations[-1] is the network output; derivatives[k] is the sigmoid
    derivative at layer k's pre-activation (all zeros for the input layer,
    which has no activation function).
    """
    layerInput = np.copy(X)
    activations = [layerInput]
    derivatives = [np.zeros(X.shape)]

    # weights[0]/biases[0] feed the hidden layer, weights[1]/biases[1] feed
    # the output layer; walking them in order handles both.
    for layerWeights, layerBias in zip(self.weights, self.biases):
      preActivation = np.matmul(layerWeights, layerInput) + layerBias
      layerInput = self.sigmoid(preActivation)
      activations.append(layerInput)
      # Same value as deriveSigmoid(preActivation), without a second exp().
      derivatives.append(layerInput * (1 - layerInput))

    return (activations, derivatives)

  def backpropogation(self, activations: List[np.ndarray], derivatives: List[np.ndarray], y: np.ndarray):
    """Record the current mean loss and compute the parameter gradients.

    activations, derivatives: the lists returned by forwardPass.
    y: target values with shape (1, n_samples).

    Appends the mean loss over the batch to self.loss and returns a tuple
    (weightGradients, biasGradients); each is a list ordered like
    self.weights / self.biases. Bias gradients are 1-D arrays.
    """
    sampleCount = y.shape[1]

    # Mean loss over the batch, kept for plotting the training curve.
    self.loss.append( (1/sampleCount)*np.sum( self.ourLossFun(y, activations[-1]) ) )

    weightGradients = [None] * len(self.weights)
    biasGradients = [None] * len(self.biases)

    # dL/dz for the output layer: loss derivative times sigmoid derivative.
    delta = np.multiply(self.deriveLossFun(y, activations[-1]), derivatives[-1])

    for layer in reversed(range(len(self.weights))):
      # activations[layer] is the input that fed weights[layer].
      weightGradients[layer] = (1/sampleCount)*np.matmul(delta, activations[layer].T)
      biasGradients[layer] = (1/sampleCount)*np.sum(delta, axis = 1)
      if layer > 0:
        # Push the error back through this layer's weights, then through the
        # previous layer's sigmoid.
        delta = np.multiply(np.matmul(self.weights[layer].T, delta), derivatives[layer])

    return (weightGradients, biasGradients)

  def updateWeights(self, derivedWeights: List[np.ndarray], derivedBias: List[np.ndarray]):
    """Apply one gradient-descent step to every weight matrix and bias column.

    derivedWeights, derivedBias: gradients as returned by backpropogation.
    """
    for layer, (weightGradient, biasGradient) in enumerate(zip(derivedWeights, derivedBias)):
      self.weights[layer] -= self.learningRate*weightGradient
      # Bias gradients arrive 1-D; biases are stored as column vectors.
      self.biases[layer] -= self.learningRate*biasGradient.reshape(-1, 1)

  def trainIt(self, X: np.ndarray, y: np.ndarray):
    """Train the model (what is usually called 'fit').

    X: input features with shape (n_samples, n_features).
    y: target values with shape (n_samples, 1); a 1-D array of length
       n_samples is also accepted.

    Re-initialises the weights and biases with small random values, clears
    the loss history, then runs self.epoches full-batch gradient steps.
    """
    features = np.asarray(X, dtype=float)
    targets = np.asarray(y, dtype=float)
    if targets.ndim == 1:
      # backpropogation needs a (1, n_samples) row after transposing.
      targets = targets.reshape(-1, 1)

    # Start from a clean state so repeated calls do not accumulate.
    self.weights.clear()
    self.biases.clear()
    self.loss.clear()

    # Small random initial parameters: one (fanOut, fanIn) matrix and one
    # (fanOut, 1) bias column per pair of consecutive layers.
    for fanIn, fanOut in zip(self.layers[:-1], self.layers[1:]):
      self.weights.append( np.random.randn(fanOut, fanIn) * 0.1 )
      self.biases.append( np.random.randn(fanOut, 1) * 0.1 )

    # The network works on column-wise samples, hence the transposes.
    inputs, labels = features.T, targets.T
    for _ in range(self.epoches):
      activations, derivations = self.forwardPass(inputs)
      deriveWeight, deriveBias = self.backpropogation(activations, derivations, labels)
      self.updateWeights(deriveWeight, deriveBias)

  def predictFun(self, X: np.ndarray) -> np.ndarray:
    """Return the predicted class (0.0 or 1.0) for every sample as a 1-D array.

    X: input features with shape (n_samples, n_features).
    """
    activations, _ = self.forwardPass(X.T)  # the derivatives are not needed here
    # Round the output probabilities to the nearest class label.
    return np.rint(activations[-1]).reshape(-1)


In [None]:
#data_loader.py

import pandas as pd
import pickle as cPickle
import numpy as np


def load_data():
    """Load the four pickled data subsets and add noise to the last one.

    Reads './data/data_cleaned.csv' as a pickle stream that must unpack into
    exactly four objects (training, validation, test and "garbage" data).
    Gaussian noise (mean 0, standard deviation 0.4) is added to the fourth
    subset, which therefore must expose a ``.shape`` attribute.

    Returns a list [training_data, validation_data, test_data, garbage_data].
    """
    # NOTE(review): the path has a .csv extension but is read with pickle --
    # confirm the file on disk really is a pickle and not the CSV export.
    # SECURITY: unpickling can execute arbitrary code; only load trusted files.
    # Pickle streams are bytes, so the file has to be opened in binary mode;
    # the context manager closes it even if loading raises.
    with open('./data/data_cleaned.csv', 'rb') as f:
        training_data, validation_data, test_data, garbage_data = cPickle.load(f, encoding='latin')

    #make one of the subsets noisy for fun
    noise = np.random.normal(0, 0.4, garbage_data.shape)
    garbage_data = garbage_data + noise

    #return as list
    return [training_data, validation_data, test_data, garbage_data]

In [None]:
#data_cleaner.py

import pandas as pd
from sklearn.preprocessing import LabelEncoder
from pandas.plotting import scatter_matrix

#load dataset
data = pd.read_csv('data/data.csv')
scatter_matrix(data, alpha=0.5, figsize=(20, 20)) #Setting up a scatter plot of the initial data

#clean data
#Columns that are removed from the cleaned output. '2020 Census Tract' is
#dropped directly: filling its missing values first would be wasted work.
data.drop(columns=['VIN (1-10)', '2020 Census Tract', 'DOL Vehicle ID'], inplace=True)

#Fill missing County/City values with the most frequent value. This runs
#before the Postal Code rows are removed, so the mode is taken over all rows.
for column in ('County', 'City'):
    data[column] = data[column].fillna(data[column].mode()[0])

#Rows without a postal code are discarded rather than filled
data.dropna(subset=['Postal Code'], inplace=True)

#Remaining categorical gaps get the most frequent value of their column...
for column in ('Model', 'Legislative District', 'Electric Utility'):
    data[column] = data[column].fillna(data[column].mode()[0])
#...while the numeric Electric Range gets the median
data['Electric Range'] = data['Electric Range'].fillna(data['Electric Range'].median())

#converting categorical data to numeric data
#The encoder is re-fitted for every column, so after the loop it only holds
#the classes of the last column; each column is encoded exactly once.
label_encoder = LabelEncoder()
for column in ('E.V_Type', 'County', 'City', 'State', 'Make', 'Model',
               'CAFV', 'Vehicle Location', 'Electric Utility'):
    data[column] = label_encoder.fit_transform(data[column])


#export cleaned data
data.to_csv('data/data_cleaned.csv', index = False)

In [None]:
#We plot our loss with this function. The inputs are our hyperparameters, letting us modify them easily and check for overfitting
def plotFunction(lr : float, ec : int):
  """Train a fresh SuperClassifier and plot its per-epoch training loss.

  lr: the learning rate passed to the classifier.
  ec: the number of epochs passed to the classifier.
  """

  dataList = load_data()
  #NOTE(review): load_data returns [training, validation, test, noisy "garbage"]
  #subsets, yet the first two entries are used here as features and targets --
  #confirm this is the intended pairing.
  X_train, y_train = dataList[0], dataList[1]

  model = SuperClassifier(learningRate=lr, epoches=ec)
  model.trainIt(X_train, y_train)

  #Now to plot the loss...
  plt.plot(model.loss)
  plt.title("Training Loss")
  #The second positional argument of xlabel/ylabel is a font dictionary, so the
  #hyperparameter values have to be formatted into the label text itself.
  plt.xlabel(f"epochs = {ec}")
  plt.ylabel(f"loss (learning rate = {lr})")
  plt.show()

#Hyperparameter settings to compare, as (learning rate, number of epochs)
for rate, epoch_count in (
  (0.4, 50),   #Standard hyperparameters
  (0.1, 50),   #Very small learning rate
  (0.95, 70),  #Very large learning rate and more epochs
  (0.4, 200),  #Normal learning rate, large number of epochs
):
  plotFunction(rate, epoch_count)