In [26]:
import matplotlib.pyplot as plt
import pandas

In [27]:
%%writefile optimizers.py
import numpy as np

######################################################################
## class Optimizers()
######################################################################

class Optimizers():

    def __init__(self, all_weights):
        '''all_weights is a vector of all of a neural networks weights concatenated into a one-dimensional vector'''
        
        self.all_weights = all_weights

        # The following initializations are only used by adam.
        # Only initializing m, v, beta1t and beta2t here allows multiple calls to adam to handle training
        # with multiple subsets (batches) of training data.
        self.mt = np.zeros_like(all_weights)
        self.vt = np.zeros_like(all_weights)
        self.beta1 = 0.9
        self.beta2 = 0.999
        self.beta1t = 1
        self.beta2t = 1

        
    def sgd(self, error_f, gradient_f, fargs=[], n_epochs=100, learning_rate=0.001, verbose=True, error_convert_f=None):
        '''
error_f: function that requires X and T as arguments (given in fargs) and returns mean squared error.
gradient_f: function that requires X and T as arguments (in fargs) and returns gradient of mean squared error
            with respect to each weight.
error_convert_f: function that converts the standardized error from error_f to original T units.
        '''

        error_trace = []
        epochs_per_print = n_epochs // 10

        for epoch in range(n_epochs):

            error = error_f(*fargs)
            grad = gradient_f(*fargs)

            # Update all weights using -= to modify their values in-place.
            self.all_weights -= learning_rate * grad

            if error_convert_f:
                error = error_convert_f(error)
            error_trace.append(error)

            if verbose and ((epoch + 1) % max(1, epochs_per_print) == 0):
                print(f'sgd: Epoch {epoch+1:d} Error={error:.5f}')

        return error_trace

    def adam(self, error_f, gradient_f, fargs=[], n_epochs=100, learning_rate=0.001, verbose=True, error_convert_f=None):
        '''
error_f: function that requires X and T as arguments (given in fargs) and returns mean squared error.
gradient_f: function that requires X and T as arguments (in fargs) and returns gradient of mean squared error
            with respect to each weight.
error_convert_f: function that converts the standardized error from error_f to original T units.
        '''

        alpha = learning_rate  # learning rate called alpha in original paper on adam
        epsilon = 1e-8
        error_trace = []
        epochs_per_print = n_epochs // 10

        for epoch in range(n_epochs):
            
            #print("aaaaaaa")

            error = error_f(*fargs)
            grad = gradient_f(*fargs)

            self.mt[:] = self.beta1 * self.mt + (1 - self.beta1) * grad
            self.vt[:] = self.beta2 * self.vt + (1 - self.beta2) * grad * grad
            self.beta1t *= self.beta1
            self.beta2t *= self.beta2

            m_hat = self.mt / (1 - self.beta1t)
            v_hat = self.vt / (1 - self.beta2t)

            # Update all weights using -= to modify their values in-place.
            self.all_weights -= alpha * m_hat / (np.sqrt(v_hat) + epsilon)
    
            if error_convert_f:
                error = error_convert_f(error)
            error_trace.append(error)

            if verbose and ((epoch + 1) % max(1, epochs_per_print) == 0):
                print(f'Adam: Epoch {epoch+1:d} Error={error:.5f}')

        return error_trace

if __name__ == '__main__':

    import matplotlib.pyplot as plt
    plt.ion()

    def parabola(wmin):
        return ((w - wmin) ** 2)[0]

    def parabola_gradient(wmin):
        return 2 * (w - wmin)

    w = np.array([0.0])
    optimizer = Optimizers(w)

    wmin = 5
    optimizer.sgd(parabola, parabola_gradient, [wmin],
                  n_epochs=500, learning_rate=0.1)

    print(f'sgd: Minimum of parabola is at {wmin}. Value found is {w}')

    w = np.array([0.0])
    optimizer = Optimizers(w)
    optimizer.adam(parabola, parabola_gradient, [wmin],
                   n_epochs=500, learning_rate=0.1)
    
    print(f'adam: Minimum of parabola is at {wmin}. Value found is {w}')

Overwriting optimizers.py


In [44]:
import numpy as np
import optimizers
import sys  # for sys.float_info.epsilon

######################################################################
## class NeuralNetwork()
######################################################################

class NeuralNetwork():


    def __init__(self, n_inputs, n_hiddens_per_layer, n_outputs, activation_function='tanh'):
        self.n_inputs = n_inputs
        self.n_outputs = n_outputs
        self.activation_function = activation_function
        
        self.classes = np.arange(10)
        (Ttrain == classes).shape

        # Set self.n_hiddens_per_layer to [] if argument is 0, [], or [0]
        if n_hiddens_per_layer == 0 or n_hiddens_per_layer == [] or n_hiddens_per_layer == [0]:
            self.n_hiddens_per_layer = []
        else:
            self.n_hiddens_per_layer = n_hiddens_per_layer

        # Initialize weights, by first building list of all weight matrix shapes.
        n_in = n_inputs
        shapes = []
        for nh in self.n_hiddens_per_layer:
            shapes.append((n_in + 1, nh))
            n_in = nh
        shapes.append((n_in + 1, n_outputs))

        # self.all_weights:  vector of all weights
        # self.Ws: list of weight matrices by layer
        self.all_weights, self.Ws = self.make_weights_and_views(shapes)

        # Define arrays to hold gradient values.
        # One array for each W array with same shape.
        self.all_gradients, self.dE_dWs = self.make_weights_and_views(shapes)

        self.trained = False
        self.total_epochs = 0
        self.error_trace = []
        self.Xmeans = None
        self.Xstds = None
        self.Tmeans = None
        self.Tstds = None


    def make_weights_and_views(self, shapes):
        # vector of all weights built by horizontally stacking flatenned matrices
        # for each layer initialized with uniformly-distributed values.
        all_weights = np.hstack([np.random.uniform(size=shape).flat / np.sqrt(shape[0])
                                 for shape in shapes])
        # Build list of views by reshaping corresponding elements from vector of all weights
        # into correct shape for each layer.
        views = []
        start = 0
        for shape in shapes:
            size =shape[0] * shape[1]
            views.append(all_weights[start:start + size].reshape(shape))
            start += size
        return all_weights, views


    # Return string that shows how the constructor was called
    def __repr__(self):
        return f'{type(self).__name__}({self.n_inputs}, {self.n_hiddens_per_layer}, {self.n_outputs}, \'{self.activation_function}\')'


    # Return string that is more informative to the user about the state of this neural network.
    def __str__(self):
        result = self.__repr__()
        if len(self.error_trace) > 0:
            return self.__repr__() + f' trained for {len(self.error_trace)} epochs, final training error {self.error_trace[-1]:.4f}'


    def train(self, X, T, n_epochs, learning_rate, method='adam', verbose=True):
        '''
train: 
  X: n_samples x n_inputs matrix of input samples, one per row
  T: n_samples x n_outputs matrix of target output values, one sample per row
  n_epochs: number of passes to take through all samples updating weights each pass
  learning_rate: factor controlling the step size of each update
  method: is either 'sgd' or 'adam'
        '''

        # Setup standardization parameters
        if self.Xmeans is None:
            self.Xmeans = X.mean(axis=0)
            self.Xstds = X.std(axis=0)
            self.Xstds[self.Xstds == 0] = 1  # So we don't divide by zero when standardizing
            self.Tmeans = T.mean(axis=0)
            self.Tstds = T.std(axis=0)
            
        # Standardize X and T
        X = (X - self.Xmeans) / self.Xstds
        T = (T - self.Tmeans) / self.Tstds

        # Instantiate Optimizers object by giving it vector of all weights
        optimizer = optimizers.Optimizers(self.all_weights)

        # Define function to convert value from error_f into error in original T units, 
        # but only if the network has a single output. Multiplying by self.Tstds for 
        # multiple outputs does not correctly unstandardize the error.
        if len(self.Tstds) == 1:
            #error_convert_f = lambda err: np.sqrt(err)#[0]
            error_convert_f = lambda err: (np.sqrt(err) * self.Tstds)[0]# to scalar
            #print("hi")
        else:
            #print("hi2")
            error_convert_f = lambda err: np.sqrt(err)[0] # to scalar
            

        if method == 'sgd':

            error_trace = optimizer.sgd(self.error_f, self.gradient_f,
                                        fargs=[X, T], n_epochs=n_epochs,
                                        learning_rate=learning_rate,
                                        verbose=True,
                                        error_convert_f=error_convert_f)

        elif method == 'adam':

            error_trace = optimizer.adam(self.error_f, self.gradient_f,
                                         fargs=[X, T], n_epochs=n_epochs,
                                         learning_rate=learning_rate,
                                         verbose=True,
                                         error_convert_f=error_convert_f)

        else:
            raise Exception("method must be 'sgd' or 'adam'")
        
        self.error_trace = error_trace

        # Return neural network object to allow applying other methods after training.
        #  Example:    Y = nnet.train(X, T, 100, 0.01).use(X)
        return self
     
        
    def relu(self, s):
        s[s < 0] = 0
        return s

    def grad_relu(self, s):
        return (s > 0).astype(int)
    
    
    
    def forward_pass(self, X):
        
        #print("bv")
        '''X assumed already standardized. Output returned as standardized.'''
        self.Ys = [X]
        for W in self.Ws[:-1]:
            if self.activation_function == 'relu':
                self.Ys.append(self.relu(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
            else:
                self.Ys.append(np.tanh(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
        last_W = self.Ws[-1]
        self.Ys.append(self.Ys[-1] @ last_W[1:, :] + last_W[0:1, :])
        Y1=np.array(self.Ys[-1])
        #print(Y1.shape)
        #1/0
        return self.Ys

    # Function to be minimized by optimizer method, mean squared error
    def error_f(self, X, T):
        Ys = self.forward_pass(X)
        Ys=np.array(Ys[-1])
        #print(Ys.shape)
        mean_sq_error = np.mean((T - Ys) ** 2)
        return mean_sq_error

    # Gradient of function to be minimized for use by optimizer method
    def gradient_f(self, X, T):
        '''Assumes forward_pass just called with layer outputs in self.Ys.'''
        error = T - self.Ys[-1]
        n_samples = X.shape[0]
        n_outputs = T.shape[1]
        delta = - error / (n_samples * n_outputs)
        #print("hi")
        #print(delta.shape)
        
        
        n_layers = len(self.n_hiddens_per_layer) + 1
        #print(n_layers)
        # Step backwards through the layers to back-propagate the error (delta)
        for layeri in range(n_layers - 1, -1, -1):
            # gradient of all but bias weights
            #print("hi2")
            #print(self.dE_dWs[layeri][1:, :].shape)
            #print(self.Ys[layeri].shape)
            #1/0
            self.dE_dWs[layeri][1:, :] = self.Ys[layeri].T @ delta
            # gradient of just the bias weights
            self.dE_dWs[layeri][0:1, :] = np.sum(delta, 0)
            # Back-propagate this layer's delta to previous layer
            if self.activation_function == 'relu':
                delta = delta @ self.Ws[layeri][1:, :].T * self.grad_relu(self.Ys[layeri])
            else:
                delta = delta @ self.Ws[layeri][1:, :].T * (1 - self.Ys[layeri] ** 2)
                #print("ff")
        return self.all_gradients

   

In [45]:
import optimizers
import sys
import numpy as np
class NeuralNetworkClassifier(NeuralNetwork):
  
    
    def train(self, X, T, n_epochs, learning_rate, method='adam', verbose=True):
        
        if self.Xmeans is None:
            
            self.Xmeans = X.mean(axis=0)
            self.Xstds = X.std(axis=0)
            self.Xstds[self.Xstds == 0] = 1  # So we don't divide by zero when standardizing
            self.Tstds=T.std(axis=0)
            
        
        X = (X - self.Xmeans) / self.Xstds
        
        # Instantiate Optimizers object by giving it vector of all weights
        optimizer = optimizers.Optimizers(self.all_weights)

       
        error_convert_f = lambda err: np.sqrt(err)#[0] # to scalar
            #print("ko")
            #error_convert_f = lambda err: (np.sqrt(err) * self.Tstds)[0] # to scalar
        #else:
            #error_convert_f = lambda err: np.sqrt(err)[0] # to scalar
        
        

        if method == 'sgd':

            error_trace = optimizer.sgd(self.error_f, self.gradient_f,
                                        fargs=[X, T], n_epochs=n_epochs,
                                        learning_rate=learning_rate,
                                        verbose=True,
                                        error_convert_f=error_convert_f)

        elif method == 'adam':
            
            
            

            error_trace = optimizer.adam(self.error_f, self.gradient_f,
                                         fargs=[X, T], n_epochs=n_epochs,
                                         learning_rate=learning_rate,
                                         verbose=True,
                                         error_convert_f=error_convert_f)

        else:
            raise Exception("method must be 'sgd' or 'adam'")
        
        self.error_trace = error_trace
        
    def forward_pass(self, X):
        
        
        #print('a')
        
        self.Ys = [X]
        for W in self.Ws[:-1]:
            if self.activation_function == 'relu':
                self.Ys.append(self.relu(self.Ys[-1] @ W[1:, :] + W[0:1, :]))
            else:
                self.Ys.append(np.tanh(self.Ys[-1] @ W[1:, :] + W[0:1, :]))

        last_W = self.Ws[-1]
        self.Ys.append(self.Ys[-1] @ last_W[1:, :] + last_W[0:1, :])
        
        return self.Ys
        # Return neural network object to allow applying other methods after training.
        #  Example:    Y = nnet.train(X, T, 100, 0.01).use(X)
            
    def softmax(self,X):
        
        fs = np.exp(X)  # N x K
        denom = np.sum(fs, axis=1).reshape((-1, 1))
        gs = fs / denom
        
        return gs

    def makeIndicatorVars(self,T):
        # Make sure T is two-dimensional. Should be nSamples x 1.
        if T.ndim == 1:
            T = T.reshape((-1, 1)) 
        #print("red") 
        return (T == np.unique(T)).astype(int)

    #TtrainI = makeIndicatorVars(T)
      
          #neg_log_likelihood
    def error_f(self, X, T):
        
        Ys = self.forward_pass(X)
        Ys=np.array(Ys[-1])
        #print(T.shape)
        #print("help")
        #print(Ys.shape)
        #1/0
        # w = warg.reshape((-1,K))  
        Y = self.softmax(Ys)  
        #print("help")
        #1/0
        #print(- np.mean(self.makeIndicatorVars(T) * np.log(Y)))
        return - np.mean(self.makeIndicatorVars(T) * np.log(Y))
    
    def gradient_neg_log_likelihood(self,Ys,T):
        
        #print("kil")
        #print(self.Ys[-1].shape )
    
        Y = self.softmax(self.Ys[-1])
        #print("Y")
        #print(Y.shape)
        #1/0
        #print((self.makeIndicatorVars(T).shape))
        #print("one")
        #1/0
        #* self.makeIndicatorVars(T).shape[1]))
        #print(Ys.T.shape)
        #1/0
        #self.Ys[-1].T @
        #print(Ttrain.shape)
        #print("redbull1")
        #print(T.shape)
        #1/0
        grad =  (Y - self.makeIndicatorVars(T)) / (self.makeIndicatorVars(T).shape[0] * self.makeIndicatorVars(T).shape[1])
        #print("redbull2")
        #print(grad.shape)
        #print("one")
        #1/0
        return grad
        
    def gradient_f(self, X, T):
        '''Assumes forward_pass just called with layer outputs in self.Ys.'''
        error = self.makeIndicatorVars(T) - self.Ys[-1]
        #print("red")
        #print(T.shape)
        n_samples = X.shape[0]
        n_outputs = self.makeIndicatorVars(T).shape[1]
        #print("bull")
        #1/0
        delta = - error / (n_samples * n_outputs)
        delta1=self.gradient_neg_log_likelihood(self.Ys[-1],T)
        #print("bekow")
        #print(delta1.shape)
        #1/0
        n_layers = len(self.n_hiddens_per_layer) + 1
        # Step backwards through the layers to back-propagate the error (delta)
        for layeri in range(n_layers - 1, -1, -1):
            #print("hi")
            #1/0
            # gradient of all but bias weights
            #print((self.Ys[layeri]).shape)
            #print(self.dE_dWs[layeri][1:, :].shape)
            #print("done")
            #print(delta.shape)

            self.dE_dWs[layeri][1:, :] = self.Ys[layeri].T @ delta1
            #print("hi2")
            
            # gradient of just the bias weights
            self.dE_dWs[layeri][0:1, :] = np.sum(delta1, 0)
            # Back-propagate this layer's delta to previous layer
            if self.activation_function == 'relu':
                delta1 = delta1 @ self.Ws[layeri][1:, :].T * self.grad_relu(self.Ys[layeri])
            else:
                delta1 = delta1 @ self.Ws[layeri][1:, :].T * (1 - self.Ys[layeri] ** 2)
            #print("redbull")
            #1/0
        return self.all_gradients

    def use(self,X):
        
        X = (X - self.Xmeans) / self.Xstds
        Ys = self.forward_pass(X)
        Y = Ys[-1]
        prob=self.softmax(Y)
        self.classes =np.argmax(prob[:,:],axis=1)
        
        return self.classes,prob

        
    
            
                    
            
        #return self
    
    


In [30]:
import pickle
import gzip

with gzip.open('mnist.pkl.gz', 'rb') as f:
    train_set, valid_set, test_set = pickle.load(f, encoding='latin1')

Xtrain = train_set[0]
Ttrain = train_set[1].reshape(-1, 1)

Xval = valid_set[0]
Tval = valid_set[1].reshape(-1, 1)

Xtest = test_set[0]
Ttest = test_set[1].reshape(-1, 1)

print(Xtrain.shape, Ttrain.shape,  Xval.shape, Tval.shape,  Xtest.shape, Ttest.shape)

(50000, 784) (50000, 1) (10000, 784) (10000, 1) (10000, 784) (10000, 1)


In [31]:
classes = np.arange(10)
(Ttrain == classes).shape


(50000, 10)

In [32]:
n_epochs = 5
learning_rate = 0.01

np.random.seed(142)

nnet = NeuralNetworkClassifier(Xtrain.shape[1], [5], len(classes))


In [33]:
nnet.train(Xtrain, Ttrain, n_epochs, learning_rate, method='adam', verbose=True)

Adam: Epoch 1 Error=0.48043
Adam: Epoch 2 Error=0.46889
Adam: Epoch 3 Error=0.46022
Adam: Epoch 4 Error=0.45391
Adam: Epoch 5 Error=0.44925


In [34]:
print(nnet)  # uses the __str__ method

NeuralNetworkClassifier(784, [5], 10, 'tanh') trained for 5 epochs, final training error 0.4492


In [35]:
Y_classes, Y_Probs = nnet.use(Xtest)
print(Y_classes.shape)

(10000,)


In [36]:
import pandas

result = []
result.append(['Train', *(Ttrain == classes).sum(axis=0) / Ttrain.shape[0]])
result.append(['Tval', *(Tval == classes).sum(axis=0) / Tval.shape[0]])
result.append(['Ttest', *(Ttest == classes).sum(axis=0) / Ttest.shape[0]])
pandas.DataFrame(result)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10
0,Train,0.09864,0.11356,0.09936,0.10202,0.09718,0.09012,0.09902,0.1035,0.09684,0.09976
1,Tval,0.0991,0.1064,0.099,0.103,0.0983,0.0915,0.0967,0.109,0.1009,0.0961
2,Ttest,0.098,0.1135,0.1032,0.101,0.0982,0.0892,0.0958,0.1028,0.0974,0.1009


In [37]:
def confusion_matrix(Y_classes, Ttest):
    table = []
    for true_class in range(10):
        row = []
        Ttest=Ttest.reshape(-1)
        for predicted_class in range(10):
            row.append(100 * np.mean(Y_classes[Ttest == true_class] == predicted_class))
        table.append(row)
    table
    
    def T_unique(T):
        return np.unique(T)
    Ttest_names=T_unique(Ttest)
    Y_classes=T_unique(Y_classes)
    return pandas.DataFrame(table, index=Ttest_names, columns=Y_classes)

In [38]:
Y_classes, Probs = nnet.use(Xtest)
#print(Y_classes.shape)
#print("into con")
confusion_matrix(Y_classes,Ttest)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,43.265306,0.0,1.122449,0.102041,0.306122,1.020408,13.061224,41.122449,0.0,0.0
1,0.0,19.559471,41.85022,18.854626,15.594714,0.088106,0.440529,3.524229,0.088106,0.0
2,2.034884,0.096899,59.496124,33.72093,0.775194,0.484496,0.678295,2.51938,0.096899,0.096899
3,0.39604,0.29703,10.693069,80.594059,0.29703,0.19802,2.673267,4.158416,0.29703,0.39604
4,27.800407,4.07332,11.405295,0.814664,27.393075,3.564155,0.814664,4.276986,0.509165,19.348269
5,10.089686,0.44843,7.399103,11.995516,5.044843,43.049327,13.2287,4.820628,1.345291,2.578475
6,2.296451,0.0,3.966597,8.350731,0.730689,5.532359,68.893528,9.812109,0.313152,0.104384
7,6.614786,1.945525,3.501946,4.863813,0.680934,0.0,0.291829,81.225681,0.0,0.875486
8,2.977413,0.205339,18.583162,33.880903,1.129363,14.989733,11.190965,3.285421,11.293634,2.464066
9,38.255699,0.49554,4.063429,1.684836,2.675917,0.792864,0.891972,12.983152,0.594648,37.561943
