In [46]:
%tensorflow_version 2.x  # this line is not required unless you are in google colab
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras
import pandas as pd
from sklearn.utils import shuffle

# Helper libraries
import numpy as np
import matplotlib.pyplot as plt

`%tensorflow_version` only switches the major version: 1.x or 2.x.
You set: `2.x  # this line is not required unless you are in google colab`. This will be interpreted as: `2.x`.


TensorFlow is already loaded. Please restart the runtime to change versions.


In [47]:
# Formatting
df=pd.read_csv('Wine.csv',header=None)
df.columns = [  'name'
                 ,'alcohol'
             	,'malicAcid'
             	,'ash'
            	,'ashalcalinity'
             	,'magnesium'
            	,'totalPhenols'
             	,'flavanoids'
             	,'nonFlavanoidPhenols'
             	,'proanthocyanins'
            	,'colorIntensity'
             	,'hue'
             	,'od280_od315'
             	,'proline'
                ]
X= df.drop(['name','ash'], axis=1)

KeyError: ignored

In [14]:
X.head(2)

Unnamed: 0,alcohol,malicAcid,ashalcalinity,magnesium,totalPhenols,flavanoids,nonFlavanoidPhenols,proanthocyanins,colorIntensity,hue,od280_od315,proline
0,14.23,1.71,15.6,127,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065
1,13.2,1.78,11.2,100,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050


In [15]:
Y=df.iloc[:,:1]
Y.head(2)

Unnamed: 0,name
0,1
1,1


In [16]:
from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.3, random_state=0)

print(X_train.shape)
print(X_test.shape)

(124, 12)
(54, 12)


In [37]:
#VECTORIZED VERSION
@np.vectorize
def sigmoid(z):
    if z >= 0:
      return 1.0 / (1.0 + np.e**(-z))
    else:
      return np.e**(z) / (1 + np.e**(z))

@np.vectorize
def sigmoidPrime(z):
    # Derivative of Sigmoid Function
      return sigmoid(z) * (1-sigmoid(z))

class Model3:
    def __init__(self, model_structure=[2, 3, 2], modelActivationFunctions=["sigmoid", "sigmoid"], sig=None, sigPrime=None):
        self.model_structure = model_structure
        self.modelActivationFunctions = modelActivationFunctions
        self.modelWidth = len(model_structure)

        self.sigmoid = sig
        self.sigmoidPrime = sigPrime

        # Safety Check to make sure model structure is legitimate
        if self.modelWidth<3 or self.modelWidth-1!=len(self.modelActivationFunctions):
            print("Model Structure Error!")
            exit(1)  

        # Weights (Parameters) - Randomly Assigned
        self.weights = list()
        self.weights.append(np.random.randn(self.model_structure[1],self.model_structure[0])) # Input Layer Weights
        for i in range(1, self.modelWidth-1):
            self.weights.append(np.random.randn(self.model_structure[i+1], self.model_structure[i]))

        # Biases - Randomly Assigned
        self.biases = list()
        for i in range(1, self.modelWidth):
            self.biases.append(np.random.randn(self.model_structure[i], 1))  # Length should be number of columns of X
        '''self.weights = [np.array(([[0.15, 0.3], [0.2, 0.35], [0.25, 0.4]]), dtype=float), np.array(([[0.5, 0.6, 0.7], [0.55, 0.65, 0.75]]), dtype=float)]
        self.biases = [[[0.45], [0.45], [0.45]], [[0.8], [0.8]]]'''
    
    def softmax(self, x):
        return np.exp(x) / np.sum(np.exp(x), axis=0)
    '''
    def softmax(z):
        z -= np.max(z)
        sm = (np.exp(z).T / np.sum(np.exp(z), axis=0)).T
        return sm
    '''
    def softmaxPrime(self, z):
        z -= np.max(z)
        sm = (np.exp(z).T / np.sum(np.exp(z), axis=0)).T
        s = sm.reshape(-1,1)# Reshape the 1-d softmax to 2-d so that np.dot will do the matrix multiplication
        return np.diagflat(s) - np.dot(s, s.T)

    def relu(self, z):
        return np.maximum(0, z)
    
    def reluPrime(self, x):
        x[x<=0] = 0
        x[x>0] = 1
        return x
    
    def applyActivationFunction(self, values, activation):
        if activation=='sigmoid':
            return self.sigmoid(values)
        elif activation=='softmax':
            return self.softmax(values)
        elif activation=='relu':
            return self.relu(values)
        else:
            print("Unknown Activation Function! Got:", activation)
            exit(1)
    
    def applyActivationFunctionPrime(self, values, activation):
        if activation=='sigmoid':
            return self.sigmoidPrime(values)
        elif activation=='softmax':
            return self.softmaxPrime(values)
        elif activation=='relu':
            return self.reluPrime(values)
        else:
            print("Unknown Activation Function! Got:", activation)
            exit(1)

    def forward(self, X):
        # Propogate inputs through networks
        self.aValues = list()
        self.zValues = list()
        self.aValues.append(X) # First a value = input values
        for i in range(0, len(self.weights)):
            self.zValues.append(np.dot(self.weights[i], self.aValues[i]) + self.biases[i])
            self.aValues.append(self.applyActivationFunction(self.zValues[i], self.modelActivationFunctions[i]))
        yHat = self.aValues[-1]
        return yHat

    def costFunction(self, X, y):
        # Compute cost using the weights already stored
        self.yHat = self.forward(X)
        J = 0.5*sum((y-self.yHat)**2)
        return J
    
    def costFunctionPrime(self, X, y):
        # Computes partial derivatives of Cost function with respect to weights & biases
        self.yHat = self.forward(X)
        
        weightDerivatives = list() # Derivative of Cost function with respect to weights
        biasDerivatives = list() # Derivative of Cost function with respect to biases

        # Last Layer derivatives
        delta = np.multiply(-(y-self.aValues[-1]), self.sigmoidPrime(self.zValues[-1]))
        weightDerivatives.insert(0, np.dot(delta, self.aValues[-2].T))
        biasDerivatives.insert(0, delta.sum(axis=1).reshape(delta.shape[0],1)) 
        
        # Derivatives for the other layers (L-1, L-2, ...)
        for i in range(self.modelWidth-2, 0, -1):
            delta = np.multiply(np.dot(self.weights[i].T, delta), self.applyActivationFunctionPrime(self.zValues[i-1], self.modelActivationFunctions[i])) 
            weightDerivatives.insert(0, np.dot(delta, self.aValues[i-1].T))
            biasDerivatives.insert(0, delta.sum(axis=1).reshape(delta.shape[0],1))

        return weightDerivatives, biasDerivatives
    
    def tuneParams(self, X, y, learning_rate=0.5, getLoss=False):
        # Get Derivatives of Weights & Biases, and then adjust weights/biases with learning rate*derivatives
        # getLoss: if you want to calculate loss (before adjusting weights)
        self.weightDerivatives, self.biasDerivatives = self.costFunctionPrime(X, y)
        
        loss = None
        if getLoss:
           loss = ((y-self.yHat)**2)/2
           # print(X.shape)
           loss = loss.sum()/X.shape[1]

        scalar = learning_rate # learning rate divided by number of samples
        for i in range(0, len(self.weights)):
            self.weights[i] = self.weights[i] - (scalar*self.weightDerivatives[i] / X.shape[1])
            self.biases[i] = self.biases[i] - (scalar*self.biasDerivatives[i] / X.shape[1])
        
        return loss
    
    def one_hot_encoder(self, expected):
        # formats label into array of zeros except for the index of the correct prediction
        #expected = expected.to_numpy()
        expected = expected.reshape(expected.shape[0],)
        b = np.zeros((expected.size, int(expected.max()+1)))
        b[np.arange(expected.size), expected] = 1
        return b.T

    def fit(self, X, y, TestDataX=None, TestDatay=None, learning_rate=0.5, epochs=10, batch_size=32, flatten=False, stagger=True):
        # X: Training Data
        # y: Labels for Training Data
        # TestingDataX: Testing Data (if staggering)
        # TestingDatay: Labels for Testing Data (if staggering)
        # learning_rate: rate at which to update the weights with their derivatives
        # epochs: number of "runs" to execute. Each run is going through the entire dataset once
        # batch_size: number of samples to go through before tuning parameters
        # Flatten: if each sample of the test data needs to be flatten (like if each sample is an image)
        # Stagger: if to test model after every epoch

        if X.shape[0] % batch_size != 0:
          print("# of training samples isn't divisible by entered batch size!")
          exit(1)
        
        if stagger and ((TestDataX is None) or (TestDatay is None)):
          print("You can't stagger when you don't pass in testing data and/or it's labels!")
          exit(1)

        if flatten:
          inputNumber = 1
          #print(X.shape)
          for i in range(1, len(X.shape)):
            inputNumber*=X.shape[i]
          X = X.reshape(X.shape[0], inputNumber)
          #print(X.shape)
          TestDataX = TestDataX.reshape(TestDataX.shape[0], inputNumber)
        
        y_formatted = self.one_hot_encoder(TestDatay)
        loss_list_average = list() # Keeps all the average losses from each epoch
        train_accuracies = list() # Keeps all the training accuracies from each epoch (which is apparently: (1-averageLoss))
        test_accuracies = list() # Keeps all the testing accuracies from each epoch
        for i in range(1, epochs+1):
          print("Epoch:", i)
          loss_list = list() # This is for keeping all the losses from each batch in the epoch; used to calculate lost_list_average
          for j in range(1, int(X.shape[0]/batch_size)+1): # Going through each batch
            if j % int((int(X.shape[0]/batch_size))) == 0:
              print("Iteration:", j)
            # Slice appropriate batch sizes
            X_batch = X[(j-1)*batch_size : j*batch_size].T #'''Why do we need to transpose here???'''
            # print(y_formatted.shape)
            y_batch_formatted = y_formatted[:, (j-1)*batch_size : j*batch_size]
            # print(y_batch_formatted.shape)
            # Training & Getting Loss
            loss = self.tuneParams(X_batch, y_batch_formatted, learning_rate, getLoss=True)
            # print("Loss:", loss)
            loss_list.append(loss)
          # print(model.weights)
          loss_list_average.append((sum(loss_list)/len(loss_list)))
          train_accuracies.append(1 - loss_list_average[i-1])
          print("Average loss across batches:", loss_list_average[i-1])
          print("Training Accuracy:", train_accuracies[i-1])

          if stagger:
            outputs = self.forward(TestDataX.T)
            # print(outputs.shape)
            predictions = list()
            outputs = outputs.T
            for output in outputs: # Going through output for each sample to format prediction
              #print(output)
              predictions.append(np.argmax(output))
            numberOfCorrect = 0
            # print(len(predictions))
            # print(TestDatay.shape)
            for j in range(0, len(TestDatay)): # Calculating the accruacy
              # print(str(j)+"|", "Prediction:", predictions[j], "Actual:", TestDatay[j])
              if TestDatay[j] == predictions[j]:
                numberOfCorrect+=1
            print("Number of Correct:", numberOfCorrect)
            print("Total Number:", len(TestDatay))
            testAccuracy = numberOfCorrect/len(TestDatay)
            print("Testing Accuracy:", testAccuracy)
            test_accuracies.append(testAccuracy)
          print()
          
          # Shuffling
          # print(X.shape)
          # print(y_formatted.shape)
          X_shuffled, y_formatted_shuffled = shuffle(X, y_formatted.T, random_state=0)
          X = X_shuffled
          y_formatted = y_formatted_shuffled.T

        return loss_list_average, train_accuracies, test_accuracies
        

In [41]:
model = Model3(model_structure=[12, 144, 2], modelActivationFunctions=['relu', 'sigmoid']) # Another test

In [48]:
# Training the model

X_train, X_test, Y_train, Y_test

model.fit(X_train.to_numpy(), Y_train.to_numpy(), X_test.to_numpy(), Y_test.to_numpy(), learning_rate=0.1, epochs=10, batch_size=31, flatten=False, stagger=True)

Epoch: 1


TypeError: ignored